XRootD
XrdPfcFile.cc
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 
20 #include "XrdPfcFile.hh"
21 #include "XrdPfc.hh"
22 #include "XrdPfcResourceMonitor.hh"
23 #include "XrdPfcIO.hh"
24 #include "XrdPfcTrace.hh"
25 
26 #include "XrdCl/XrdClLog.hh"
27 #include "XrdCl/XrdClConstants.hh"
28 #include "XrdCl/XrdClFile.hh"
29 #include "XrdSys/XrdSysTimer.hh"
30 #include "XrdOss/XrdOss.hh"
31 #include "XrdOuc/XrdOucEnv.hh"
33 
34 #include <cstdio>
35 #include <sstream>
36 #include <fcntl.h>
37 #include <cassert>
38 
39 
40 using namespace XrdPfc;
41 
42 namespace
43 {
44 
45 const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
46 
47 Cache* cache() { return &Cache::GetInstance(); }
48 
49 }
50 
51 const char *File::m_traceID = "File";
52 
53 //------------------------------------------------------------------------------
54 
55 File::File(const std::string& path, long long iOffset, long long iFileSize) :
56  m_ref_cnt(0),
57  m_data_file(0),
58  m_info_file(0),
59  m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
60  m_filename(path),
61  m_offset(iOffset),
62  m_file_size(iFileSize),
63  m_current_io(m_io_set.end()),
64  m_ios_in_detach(0),
65  m_non_flushed_cnt(0),
66  m_in_sync(false),
67  m_detach_time_logged(false),
68  m_in_shutdown(false),
69  m_state_cond(0),
70  m_block_size(0),
71  m_num_blocks(0),
72  m_resmon_token(-1),
73  m_prefetch_state(kOff),
74  m_prefetch_bytes(0),
75  m_prefetch_read_cnt(0),
76  m_prefetch_hit_cnt(0),
77  m_prefetch_score(0)
78 {}
79 
80 File::~File()
81 {
82  if (m_info_file)
83  {
84  TRACEF(Debug, "~File() close info ");
85  m_info_file->Close();
86  delete m_info_file;
87  m_info_file = nullptr;
88  }
89 
90  if (m_data_file)
91  {
92  TRACEF(Debug, "~File() close output ");
93  m_data_file->Close();
94  delete m_data_file;
95  m_data_file = nullptr;
96  }
97 
98  if (m_resmon_token >= 0)
99  {
100  // Last update of file stats has been sent from the final Sync.
101  Cache::ResMon().register_file_close(m_resmon_token, time(0), m_stats);
102  }
103 
104  TRACEF(Debug, "~File() ended, prefetch score = " << m_prefetch_score);
105 }
106 
107 //------------------------------------------------------------------------------
108 
109 File* File::FileOpen(const std::string &path, long long offset, long long fileSize)
110 {
111  File *file = new File(path, offset, fileSize);
112  if ( ! file->Open())
113  {
114  delete file;
115  file = 0;
116  }
117  return file;
118 }
119 
120 //------------------------------------------------------------------------------
121 
122 void File::initiate_emergency_shutdown()
123 {
124  // Called from Cache::Unlink() when the file is currently open.
125  // Cache::Unlink is also called on FSync error and when wrong number of bytes
126  // is received from a remote read.
127  //
128  // From this point onward the file will not be written to, cinfo file will
129  // not be updated, and all new read requests will return -ENOENT.
130  //
131  // File's entry in the Cache's active map is set to nullptr and will be
132  // removed from there shortly, in any case, well before this File object
133  // shuts down. So we do not communicate to Cache about our destruction when
134  // it happens.
135 
136  {
137  XrdSysCondVarHelper _lck(m_state_cond);
138 
139  m_in_shutdown = true;
140 
141  if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
142  {
143  m_prefetch_state = kStopped;
144  cache()->DeRegisterPrefetchFile(this);
145  }
146  }
147 }
148 
149 //------------------------------------------------------------------------------
150 
151 void File::check_delta_stats()
152 {
153  // Called under m_state_cond lock.
154  // BytesWritten indirectly trigger an unconditional merge through periodic Sync().
155  if (m_delta_stats.BytesRead() >= m_resmon_report_threshold)
156  report_and_merge_delta_stats();
157 }
158 
159 void File::report_and_merge_delta_stats()
160 {
161  // Called under m_state_cond lock.
162  struct stat s;
163  m_data_file->Fstat(&s);
164  m_delta_stats.m_StBlocksAdded = s.st_blocks - m_st_blocks;
165  m_st_blocks = s.st_blocks;
166  Cache::ResMon().register_file_update_stats(m_resmon_token, m_delta_stats);
167  m_stats.AddUp(m_delta_stats);
168  m_delta_stats.Reset();
169 }
170 
171 //------------------------------------------------------------------------------
172 
173 void File::BlockRemovedFromWriteQ(Block* b)
174 {
175  TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
176 
177  XrdSysCondVarHelper _lck(m_state_cond);
178  dec_ref_count(b);
179 }
180 
181 void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
182 {
183  TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
184 
185  XrdSysCondVarHelper _lck(m_state_cond);
186 
187  for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
188  {
189  dec_ref_count(*i);
190  }
191 }
192 
193 //------------------------------------------------------------------------------
194 
195 void File::ioUpdated(IO *io)
196 {
197  std::string loc(io->GetLocation());
198  XrdSysCondVarHelper _lck(m_state_cond);
199  insert_remote_location(loc);
200 }
201 
202 //------------------------------------------------------------------------------
203 
204 bool File::ioActive(IO *io)
205 {
206  // Returns true if delay is needed.
207 
208  TRACEF(Debug, "ioActive start for io " << io);
209 
210  std::string loc(io->GetLocation());
211 
212  {
213  XrdSysCondVarHelper _lck(m_state_cond);
214 
215  IoSet_i mi = m_io_set.find(io);
216 
217  if (mi != m_io_set.end())
218  {
219  unsigned int n_active_reads = io->m_active_read_reqs;
220 
221  TRACE(Info, "ioActive for io " << io <<
222  ", active_reads " << n_active_reads <<
223  ", active_prefetches " << io->m_active_prefetches <<
224  ", allow_prefetching " << io->m_allow_prefetching <<
225  ", ios_in_detach " << m_ios_in_detach);
226  TRACEF(Info,
227  "\tio_map.size() " << m_io_set.size() <<
228  ", block_map.size() " << m_block_map.size() << ", file");
229 
230  insert_remote_location(loc);
231 
232  io->m_allow_prefetching = false;
233  io->m_in_detach = true;
234 
235  // Check if any IO is still available for prefetching. If not, stop it.
236  if (m_prefetch_state == kOn || m_prefetch_state == kHold)
237  {
238  if ( ! select_current_io_or_disable_prefetching(false) )
239  {
240  TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
241  }
242  }
243 
244  // On last IO, consider write queue blocks. Note, this also contains
245  // blocks being prefetched.
246 
247  bool io_active_result;
248 
249  if (n_active_reads > 0)
250  {
251  io_active_result = true;
252  }
253  else if (m_io_set.size() - m_ios_in_detach == 1)
254  {
255  io_active_result = ! m_block_map.empty();
256  }
257  else
258  {
259  io_active_result = io->m_active_prefetches > 0;
260  }
261 
262  if ( ! io_active_result)
263  {
264  ++m_ios_in_detach;
265  }
266 
267  TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
268 
269  return io_active_result;
270  }
271  else
272  {
273  TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
274  return false;
275  }
276  }
277 }
278 
279 //------------------------------------------------------------------------------
280 
281 void File::RequestSyncOfDetachStats()
282 {
283  XrdSysCondVarHelper _lck(m_state_cond);
284  m_detach_time_logged = false;
285 }
286 
287 bool File::FinalizeSyncBeforeExit()
288 {
289  // Returns true if sync is required.
290  // This method is called after corresponding IO is detached from PosixCache.
291 
292  XrdSysCondVarHelper _lck(m_state_cond);
293  if ( ! m_in_shutdown)
294  {
295  if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
296  {
297  report_and_merge_delta_stats();
298  m_cfi.WriteIOStatDetach(m_stats);
299  m_detach_time_logged = true;
300  m_in_sync = true;
301  TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
302  return true;
303  }
304  }
305  TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
306  return false;
307 }
308 
309 //------------------------------------------------------------------------------
310 
311 void File::AddIO(IO *io)
312 {
313  // Called from Cache::GetFile() when a new IO asks for the file.
314 
315  TRACEF(Debug, "AddIO() io = " << (void*)io);
316 
317  time_t now = time(0);
318  std::string loc(io->GetLocation());
319 
320  m_state_cond.Lock();
321 
322  IoSet_i mi = m_io_set.find(io);
323 
324  if (mi == m_io_set.end())
325  {
326  m_io_set.insert(io);
327  io->m_attach_time = now;
328  m_delta_stats.IoAttach();
329 
330  insert_remote_location(loc);
331 
332  if (m_prefetch_state == kStopped)
333  {
334  m_prefetch_state = kOn;
335  cache()->RegisterPrefetchFile(this);
336  }
337  }
338  else
339  {
340  TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
341  }
342 
343  m_state_cond.UnLock();
344 }
345 
346 //------------------------------------------------------------------------------
347 
348 void File::RemoveIO(IO *io)
349 {
350  // Called from Cache::ReleaseFile.
351 
352  TRACEF(Debug, "RemoveIO() io = " << (void*)io);
353 
354  time_t now = time(0);
355 
356  m_state_cond.Lock();
357 
358  IoSet_i mi = m_io_set.find(io);
359 
360  if (mi != m_io_set.end())
361  {
362  if (mi == m_current_io)
363  {
364  ++m_current_io;
365  }
366 
367  m_delta_stats.IoDetach(now - io->m_attach_time);
368  m_io_set.erase(mi);
369  --m_ios_in_detach;
370 
371  if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
372  {
373  TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
374  m_prefetch_state = kStopped;
375  cache()->DeRegisterPrefetchFile(this);
376  }
377  }
378  else
379  {
380  TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
381  }
382 
383  m_state_cond.UnLock();
384 }
385 
386 //------------------------------------------------------------------------------
387 
388 bool File::Open()
389 {
390  // Sets errno accordingly.
391 
392  static const char *tpfx = "Open() ";
393 
394  TRACEF(Dump, tpfx << "entered");
395 
396  // Before touching anything, check with ResourceMonitor if a scan is in progress.
397  // This function will wait internally if needed until it is safe to proceed.
398  Cache::ResMon().CrossCheckIfScanIsInProgress(m_filename, m_state_cond);
399 
400  const Configuration &conf = Cache::GetInstance().RefConfiguration();
401 
402  XrdOss &myOss = * Cache::GetInstance().GetOss();
403  const char *myUser = conf.m_username.c_str();
404  XrdOucEnv myEnv;
405  struct stat data_stat, info_stat;
406 
407  std::string ifn = m_filename + Info::s_infoExtension;
408 
409  bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
410  bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);
411 
412  // Create the data file itself.
413  char size_str[32]; sprintf(size_str, "%lld", m_file_size);
414  myEnv.Put("oss.asize", size_str);
415  myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
416 
417  int res;
418 
419  if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
420  {
421  TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
422  errno = -res;
423  return false;
424  }
425 
426  m_data_file = myOss.newFile(myUser);
427  if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
428  {
429  TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
430  errno = -res;
431  delete m_data_file; m_data_file = 0;
432  return false;
433  }
434 
435  myEnv.Put("oss.asize", "64k"); // Advisory, block-map and access list lengths vary.
436  myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
437  if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
438  {
439  TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
440  errno = -res;
441  m_data_file->Close(); delete m_data_file; m_data_file = 0;
442  return false;
443  }
444 
445  m_info_file = myOss.newFile(myUser);
446  if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
447  {
448  TRACEF(Error, tpfx << "Failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
449  errno = -res;
450  delete m_info_file; m_info_file = 0;
451  m_data_file->Close(); delete m_data_file; m_data_file = 0;
452  return false;
453  }
454 
455  bool initialize_info_file = true;
456 
457  if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
458  {
459  TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
460  ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
461  ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() << ")");
462 
463  // Check if data file exists and is of reasonable size.
464  if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
465  {
466  initialize_info_file = false;
467  } else {
468  TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
469  m_cfi.ResetAllAccessStats();
470  m_data_file->Ftruncate(0);
471  Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
472  }
473  }
474 
475  if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
476  {
477  if (conf.does_cschk_have_missing_bits(m_cfi.GetCkSumState()) &&
478  conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
479  {
480  TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
481  initialize_info_file = true;
482  m_cfi.ResetAllAccessStats();
483  m_data_file->Ftruncate(0);
484  Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
485  } else {
486  // TODO: If the file is complete, we don't need to reset net cksums.
487  m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
488  }
489  }
490 
491  if (initialize_info_file)
492  {
493  m_cfi.SetBufferSizeFileSizeAndCreationTime(conf.m_bufferSize, m_file_size);
494  m_cfi.SetCkSumState(conf.get_cs_Chk());
495  m_cfi.ResetNoCkSumTime();
496  m_cfi.Write(m_info_file, ifn.c_str());
497  m_info_file->Fsync();
498  cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
499  TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks());
500  }
501  else
502  {
503  if (futimens(m_info_file->getFD(), NULL)) {
504  TRACEF(Error, tpfx << "failed setting modification time " << ERRNO_AND_ERRSTR(errno));
505  }
506  }
507 
508  m_cfi.WriteIOStatAttach();
509  m_state_cond.Lock();
510  m_block_size = m_cfi.GetBufferSize();
511  m_num_blocks = m_cfi.GetNBlocks();
512  m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().
513 
514  m_data_file->Fstat(&data_stat);
515  m_st_blocks = data_stat.st_blocks;
516 
517  m_resmon_token = Cache::ResMon().register_file_open(m_filename, time(0), data_existed);
518  m_resmon_report_threshold = std::min(std::max(200ll * 1024, m_file_size / 50), 500ll * 1024 * 1024);
519  // m_resmon_report_threshold_scaler; // something like 10% of original threshold, to adjust
520  // actual threshold based on return values from register_file_update_stats().
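 // Illustrative numbers for the threshold above (values assumed for the example):
 // a 100 MB file reports after every max(200 KiB, 100 MB / 50) = 2 MB of reads,
 // while a very large file is capped at reporting every 500 MiB.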
521 
522  m_state_cond.UnLock();
523 
524  return true;
525 }
526 
527 int File::Fstat(struct stat &sbuff)
528 {
529  // Stat on an open file.
530  // Corrects size to actual full size of the file.
531  // Sets atime to 0 if the file is only partially downloaded, in accordance
532  // with pfc.onlyifcached settings.
533  // Called from IO::Fstat() and Cache::Stat() when the file is active.
534  // Returns 0 on success, -errno on error.
535 
536  int res;
537 
538  if ((res = m_data_file->Fstat(&sbuff))) return res;
539 
540  sbuff.st_size = m_file_size;
541 
542  bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
543  if ( ! is_cached)
544  sbuff.st_atime = 0;
545 
546  return 0;
547 }
548 
549 //==============================================================================
550 // Read and helpers
551 //==============================================================================
552 
553 bool File::overlap(int blk, // block to query
554  long long blk_size, //
555  long long req_off, // offset of user request
556  int req_size, // size of user request
557  // output:
558  long long &off, // offset in user buffer
559  long long &blk_off, // offset in block
560  int &size) // size to copy
561 {
562  const long long beg = blk * blk_size;
563  const long long end = beg + blk_size;
564  const long long req_end = req_off + req_size;
565 
566  if (req_off < end && req_end > beg)
567  {
568  const long long ovlp_beg = std::max(beg, req_off);
569  const long long ovlp_end = std::min(end, req_end);
570 
571  off = ovlp_beg - req_off;
572  blk_off = ovlp_beg - beg;
573  size = (int) (ovlp_end - ovlp_beg);
574 
575  assert(size <= blk_size);
576  return true;
577  }
578  else
579  {
580  return false;
581  }
582 }
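// Worked example for overlap() (values chosen for illustration): with blk = 2,
// blk_size = 1 MiB, req_off = 2.5 MiB and req_size = 1 MiB, the request overlaps
// the first half of the block, so off = 0, blk_off = 524288, size = 524288 and
// the function returns true; a request that ends at or before beg, or starts at
// or after end, returns false and leaves the output parameters untouched.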
583 
584 //------------------------------------------------------------------------------
585 
586 Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
587 {
588  // Must be called w/ state_cond locked.
589  // Checks on size etc should be done before.
590  //
591  // Reference count is 0 so increase it in calling function if you want to
592  // catch the block while still in memory.
593 
594  const long long off = i * m_block_size;
595  const int last_block = m_num_blocks - 1;
596  const bool cs_net = cache()->RefConfiguration().is_cschk_net();
597 
598  int blk_size, req_size;
599  if (i == last_block) {
600  blk_size = req_size = m_file_size - off;
601  if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
602  } else {
603  blk_size = req_size = m_block_size;
604  }
605 
606  Block *b = 0;
607  char *buf = cache()->RequestRAM(req_size);
608 
609  if (buf)
610  {
611  b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
612 
613  if (b)
614  {
615  m_block_map[i] = b;
616 
617  // Actual Read request is issued in ProcessBlockRequests().
618 
619  if (m_prefetch_state == kOn && (int) m_block_map.size() >= Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
620  {
621  m_prefetch_state = kHold;
622  cache()->DeRegisterPrefetchFile(this);
623  }
624  }
625  else
626  {
627  TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
628  }
629  }
630 
631  return b;
632 }
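// Note on the last-block sizing in PrepareBlockRequest() (illustrative arithmetic):
// with network checksums enabled, req_size is rounded up to a 4 KiB boundary so the
// pgRead request covers whole checksum pages, while blk_size keeps the true length.
// E.g. a final block of 100000 bytes keeps blk_size = 100000 but requests
// req_size = (100000 & ~0xFFF) + 0x1000 = 102400 bytes.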
633 
634 void File::ProcessBlockRequest(Block *b)
635 {
636  // This *must not* be called with block_map locked.
637 
638  BlockResponseHandler *brh = new BlockResponseHandler(b);
639 
640  if (XRD_TRACE What >= TRACE_Dump) {
641  char buf[256];
642  snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
643  b->get_offset()/m_block_size, b, b->m_prefetch, b->get_offset(), b->get_req_size(), b->get_buff(), brh);
644  TRACEF(Dump, "ProcessBlockRequest() " << buf);
645  }
646 
647  if (b->req_cksum_net())
648  {
649  b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
650  b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
651  } else {
652  b->get_io()->GetInput()-> Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
653  }
654 }
655 
656 void File::ProcessBlockRequests(BlockList_t& blks)
657 {
658  // This *must not* be called with block_map locked.
659 
660  for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
661  {
662  ProcessBlockRequest(*bi);
663  }
664 }
665 
666 //------------------------------------------------------------------------------
667 
668 void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
669 {
670  int n_chunks = ioVec.size();
671  int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;
672 
673  TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
674  ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);
675 
676  DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);
677 
678  int pos = 0;
679  while (n_chunks > XrdProto::maxRvecsz) {
680  io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
681  pos += XrdProto::maxRvecsz;
682  n_chunks -= XrdProto::maxRvecsz;
683  }
684  io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
685 }
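// Illustrative split for RequestBlocksDirect() (the element limit comes from
// XrdProto::maxRvecsz; 1024 is assumed here for the example): 2500 chunks would be
// issued as three ReadV calls of 1024, 1024 and 452 elements, all reporting to a
// single DirectResponseHandler created with n_vec_reads = (2500 - 1) / 1024 + 1 = 3.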
686 
687 //------------------------------------------------------------------------------
688 
689 int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
690 {
691  TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
692 
693  long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());
694 
695  if (rs < 0)
696  {
697  TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
698  return rs;
699  }
700 
701  if (rs != expected_size)
702  {
703  TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
704  return -EIO;
705  }
706 
707  return (int) rs;
708 }
709 
710 //------------------------------------------------------------------------------
711 
712 int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
713 {
714  // rrc_func is ONLY called from async processing.
715  // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
716  // This streamlines implementation of synchronous IO::Read().
717 
718  TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);
719 
720  m_state_cond.Lock();
721 
722  if (m_in_shutdown || io->m_in_detach)
723  {
724  m_state_cond.UnLock();
725  return m_in_shutdown ? -ENOENT : -EBADF;
726  }
727 
728  // Shortcut -- file is fully downloaded.
729 
730  if (m_cfi.IsComplete())
731  {
732  m_state_cond.UnLock();
733  int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
734  if (ret > 0) {
735  XrdSysCondVarHelper _lck(m_state_cond);
736  m_delta_stats.AddBytesHit(ret);
737  check_delta_stats();
738  }
739  return ret;
740  }
741 
742  XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
743 
744  return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
745 }
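// Caller-side sketch of the convention documented in Read() (hypothetical caller,
// not part of this file): the response handler is invoked by the caller unless the
// request went asynchronous.
//
//   int rc = file->Read(io, buff, offset, size, rh);
//   if (rc != -EWOULDBLOCK)
//      rh->Done(rc);   // synchronous completion, including errors
//   // otherwise rh->Done() is called later from block / direct-read processing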
746 
747 //------------------------------------------------------------------------------
748 
749 int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
750 {
751  TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");
752 
753  m_state_cond.Lock();
754 
755  if (m_in_shutdown || io->m_in_detach)
756  {
757  m_state_cond.UnLock();
758  return m_in_shutdown ? -ENOENT : -EBADF;
759  }
760 
761  // Shortcut -- file is fully downloaded.
762 
763  if (m_cfi.IsComplete())
764  {
765  m_state_cond.UnLock();
766  int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
767  if (ret > 0) {
768  XrdSysCondVarHelper _lck(m_state_cond);
769  m_delta_stats.AddBytesHit(ret);
770  check_delta_stats();
771  }
772  return ret;
773  }
774 
775  return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
776 }
777 
778 //------------------------------------------------------------------------------
779 
780 int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
781  ReadReqRH *rh, const char *tpfx)
782 {
783  // Non-trivial processing for Read and ReadV.
784  // Entered under lock.
785  //
786  // loop over required blocks:
787  // - if on disk, ok;
788  // - if in ram or incoming, inc ref-count
789  // - otherwise request and inc ref count (unless RAM full => request direct)
790  // unlock
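 // Example walk-through (block size and offsets chosen for illustration): with
 // 1 MiB blocks, a 3 MiB request at offset 2.5 MiB touches blocks 2..5. A block
 // already present in m_block_map gets a ChunkRequest attached (or is copied right
 // away if already finished), a block whose written-bit is set is added to
 // iovec_disk, and the remaining blocks become new remote requests, or entries in
 // iovec_direct when RequestRAM() declines to provide a buffer.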
791 
792  int prefetch_cnt = 0;
793 
794  ReadRequest *read_req = nullptr;
795  BlockList_t blks_to_request; // blocks we are issuing a new remote request for
796 
797  std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
798 
799  std::vector<XrdOucIOVec> iovec_disk;
800  std::vector<XrdOucIOVec> iovec_direct;
801  int iovec_disk_total = 0;
802  int iovec_direct_total = 0;
803 
804  for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
805  {
806  const XrdOucIOVec &iov = readV[iov_idx];
807  long long iUserOff = iov.offset;
808  int iUserSize = iov.size;
809  char *iUserBuff = iov.data;
810 
811  const int idx_first = iUserOff / m_block_size;
812  const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
813 
814  TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);
815 
816  enum LastBlock_e { LB_other, LB_disk, LB_direct };
817 
818  LastBlock_e lbe = LB_other;
819 
820  for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
821  {
822  TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
823  BlockMap_i bi = m_block_map.find(block_idx);
824 
825  // overlap and read
826  long long off; // offset in user buffer
827  long long blk_off; // offset in block
828  int size; // size to copy
829 
830  overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
831 
832  // In RAM or incoming?
833  if (bi != m_block_map.end())
834  {
835  inc_ref_count(bi->second);
836  TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);
837 
838  if (bi->second->is_finished())
839  {
840  // note, blocks with error should not be here !!!
841  // they should be either removed or reissued in ProcessBlockResponse()
842  assert(bi->second->is_ok());
843 
844  blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );
845 
846  if (bi->second->m_prefetch)
847  ++prefetch_cnt;
848  }
849  else
850  {
851  if ( ! read_req)
852  read_req = new ReadRequest(io, rh);
853 
854  // We have a lock on state_cond --> as we register the request before releasing the lock,
855  // we are sure to get a call-in via the ChunkRequest handling when this block arrives.
856 
857  bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
858  ++read_req->m_n_chunk_reqs;
859  }
860 
861  lbe = LB_other;
862  }
863  // On disk?
864  else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
865  {
866  TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);
867 
868  if (lbe == LB_disk)
869  iovec_disk.back().size += size;
870  else
871  iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
872  iovec_disk_total += size;
873 
874  if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
875  ++prefetch_cnt;
876 
877  lbe = LB_disk;
878  }
879  // Neither ... then we have to go get it ...
880  else
881  {
882  if ( ! read_req)
883  read_req = new ReadRequest(io, rh);
884 
885  // Is there room for one more RAM Block?
886  Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
887  if (b)
888  {
889  TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
890  inc_ref_count(b);
891  blks_to_request.push_back(b);
892 
893  b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
894  ++read_req->m_n_chunk_reqs;
895 
896  lbe = LB_other;
897  }
898  else // Nope ... read this directly without caching.
899  {
900  TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);
901 
902  iovec_direct_total += size;
903  read_req->m_direct_done = false;
904 
905  // Make sure we do not issue a ReadV with chunk size above XrdProto::maxRVdsz.
906  // Number of actual ReadVs issued so as to not exceed the XrdProto::maxRvecsz limit
907  // is determined in the RequestBlocksDirect().
908  if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
909  iovec_direct.back().size += size;
910  } else {
911  long long in_offset = block_idx * m_block_size + blk_off;
912  char *out_pos = iUserBuff + off;
913  while (size > XrdProto::maxRVdsz) {
914  iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
915  in_offset += XrdProto::maxRVdsz;
916  out_pos += XrdProto::maxRVdsz;
917  size -= XrdProto::maxRVdsz;
918  }
919  iovec_direct.push_back( { in_offset, size, 0, out_pos } );
920  }
921 
922  lbe = LB_direct;
923  }
924  }
925  } // end for over blocks in an IOVec
926  } // end for over readV IOVec
927 
928  inc_prefetch_hit_cnt(prefetch_cnt);
929 
930  m_state_cond.UnLock();
931 
932  // First, send out remote requests for new blocks.
933  if ( ! blks_to_request.empty())
934  {
935  ProcessBlockRequests(blks_to_request);
936  blks_to_request.clear();
937  }
938 
939  // Second, send out remote direct read requests.
940  if ( ! iovec_direct.empty())
941  {
942  RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
943 
944  TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
945  }
946 
947  // Begin synchronous part where we process data that is already in RAM or on disk.
948 
949  long long bytes_read = 0;
950  int error_cond = 0; // to be set to -errno
951 
952  // Third, process blocks that are available in RAM.
953  if ( ! blks_ready.empty())
954  {
955  for (auto &bvi : blks_ready)
956  {
957  for (auto &cr : bvi.second)
958  {
959  TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
960  memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
961  bytes_read += cr.m_size;
962  }
963  }
964  }
965 
966  // Fourth, read blocks from disk.
967  if ( ! iovec_disk.empty())
968  {
969  int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
970  TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
971  if (rc >= 0)
972  {
973  bytes_read += rc;
974  }
975  else
976  {
977  error_cond = rc;
978  TRACEF(Error, tpfx << "failed read from disk");
979  }
980  }
981 
982  // End synchronous part -- update with sync stats and determine actual state of this read.
983  // Note: remote reads might have already finished during disk-read!
984 
985  m_state_cond.Lock();
986 
987  for (auto &bvi : blks_ready)
988  dec_ref_count(bvi.first, (int) bvi.second.size());
989 
990  if (read_req)
991  {
992  read_req->m_bytes_read += bytes_read;
993  if (error_cond)
994  read_req->update_error_cond(error_cond);
995  read_req->m_stats.m_BytesHit += bytes_read;
996  read_req->m_sync_done = true;
997 
998  if (read_req->is_complete())
999  {
1000  // Almost like FinalizeReadRequest(read_req) -- but no callout!
1001  m_delta_stats.AddReadStats(read_req->m_stats);
1002  check_delta_stats();
1003  m_state_cond.UnLock();
1004 
1005  int ret = read_req->return_value();
1006  delete read_req;
1007  return ret;
1008  }
1009  else
1010  {
1011  m_state_cond.UnLock();
1012  return -EWOULDBLOCK;
1013  }
1014  }
1015  else
1016  {
1017  m_delta_stats.m_BytesHit += bytes_read;
1018  check_delta_stats();
1019  m_state_cond.UnLock();
1020 
1021  // !!! No callout.
1022 
1023  return error_cond ? error_cond : bytes_read;
1024  }
1025 }
1026 
1027 
1028 //==============================================================================
1029 // WriteBlock and Sync
1030 //==============================================================================
1031 
1032 void File::WriteBlockToDisk(Block* b)
1033 {
1034  // write block buffer into disk file
1035  long long offset = b->m_offset - m_offset;
1036  long long size = b->get_size();
1037  ssize_t retval;
1038 
1039  if (m_cfi.IsCkSumCache())
1040  if (b->has_cksums())
1041  retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
1042  else
1043  retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
1044  else
1045  retval = m_data_file->Write(b->get_buff(), offset, size);
1046 
1047  if (retval < size)
1048  {
1049  if (retval < 0)
1050  {
1051  GetLog()->Emsg("WriteToDisk()", -retval, "write block to disk", GetLocalPath().c_str());
1052  }
1053  else
1054  {
1055  TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1056  }
1057 
1058  XrdSysCondVarHelper _lck(m_state_cond);
1059 
1060  dec_ref_count(b);
1061 
1062  return;
1063  }
1064 
1065  const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1066 
1067  // Set written bit.
1068  TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1069 
1070  bool schedule_sync = false;
1071  {
1072  XrdSysCondVarHelper _lck(m_state_cond);
1073 
1074  m_cfi.SetBitWritten(blk_idx);
1075 
1076  if (b->m_prefetch)
1077  {
1078  m_cfi.SetBitPrefetch(blk_idx);
1079  }
1080  if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1081  {
1082  m_cfi.ResetCkSumNet();
1083  }
1084 
1085  dec_ref_count(b);
1086 
1087  // Set synced bit or stash block index if in actual sync.
1088  // Synced state is only written out to cinfo file when data file is synced.
1089  if (m_in_sync)
1090  {
1091  m_writes_during_sync.push_back(blk_idx);
1092  }
1093  else
1094  {
1095  m_cfi.SetBitSynced(blk_idx);
1096  ++m_non_flushed_cnt;
1097  if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1098  ! m_in_shutdown)
1099  {
1100  schedule_sync = true;
1101  m_in_sync = true;
1102  m_non_flushed_cnt = 0;
1103  }
1104  }
1105  }
1106 
1107  if (schedule_sync)
1108  {
1109  cache()->ScheduleFileSync(this);
1110  }
1111 }
1112 
1113 //------------------------------------------------------------------------------
1114 
1115 void File::Sync()
1116 {
1117  TRACEF(Dump, "Sync()");
1118 
1119  int ret = m_data_file->Fsync();
1120  bool errorp = false;
1121  if (ret == XrdOssOK)
1122  {
1123  Stats loc_stats;
1124  {
1125  XrdSysCondVarHelper _lck(&m_state_cond);
1126  report_and_merge_delta_stats();
1127  loc_stats = m_stats;
1128  }
1129  m_cfi.WriteIOStat(loc_stats);
1130  m_cfi.Write(m_info_file, m_filename.c_str());
1131  int cret = m_info_file->Fsync();
1132  if (cret != XrdOssOK)
1133  {
1134  TRACEF(Error, "Sync cinfo file sync error " << cret);
1135  errorp = true;
1136  }
1137  }
1138  else
1139  {
1140  TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1141  errorp = true;
1142  }
1143 
1144  if (errorp)
1145  {
1146  TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1147 
1148  // Unlink will also call this->initiate_emergency_shutdown()
1149  Cache::GetInstance().UnlinkFile(m_filename, false);
1150 
1151  XrdSysCondVarHelper _lck(&m_state_cond);
1152 
1153  m_writes_during_sync.clear();
1154  m_in_sync = false;
1155 
1156  return;
1157  }
1158 
1159  int written_while_in_sync;
1160  bool resync = false;
1161  {
1162  XrdSysCondVarHelper _lck(&m_state_cond);
1163  for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1164  {
1165  m_cfi.SetBitSynced(*i);
1166  }
1167  written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1168  m_writes_during_sync.clear();
1169 
1170  // If there were writes during sync and the file is now complete,
1171  // let us call Sync again without resetting the m_in_sync flag.
1172  if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1173  resync = true;
1174  else
1175  m_in_sync = false;
1176  }
1177  TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1178 
1179  if (resync)
1180  Sync();
1181 }
1182 
1183 
1184 //==============================================================================
1185 // Block processing
1186 //==============================================================================
1187 
1188 void File::free_block(Block* b)
1189 {
1190  // Method always called under lock.
1191  int i = b->m_offset / m_block_size;
1192  TRACEF(Dump, "free_block block " << b << " idx = " << i);
1193  size_t ret = m_block_map.erase(i);
1194  if (ret != 1)
1195  {
1196  // assert might be a better option than a warning
1197  TRACEF(Error, "free_block did not erase " << i << " from map");
1198  }
1199  else
1200  {
1201  cache()->ReleaseRAM(b->m_buff, b->m_req_size);
1202  delete b;
1203  }
1204 
1205  if (m_prefetch_state == kHold && (int) m_block_map.size() < Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
1206  {
1207  m_prefetch_state = kOn;
1208  cache()->RegisterPrefetchFile(this);
1209  }
1210 }
1211 
1212 //------------------------------------------------------------------------------
1213 
1214 bool File::select_current_io_or_disable_prefetching(bool skip_current)
1215 {
1216  // Method always called under lock. It also expects prefetch to be active.
1217 
1218  int io_size = (int) m_io_set.size();
1219  bool io_ok = false;
1220 
1221  if (io_size == 1)
1222  {
1223  io_ok = (*m_io_set.begin())->m_allow_prefetching;
1224  if (io_ok)
1225  {
1226  m_current_io = m_io_set.begin();
1227  }
1228  }
1229  else if (io_size > 1)
1230  {
1231  IoSet_i mi = m_current_io;
1232  if (skip_current && mi != m_io_set.end()) ++mi;
1233 
1234  for (int i = 0; i < io_size; ++i)
1235  {
1236  if (mi == m_io_set.end()) mi = m_io_set.begin();
1237 
1238  if ((*mi)->m_allow_prefetching)
1239  {
1240  m_current_io = mi;
1241  io_ok = true;
1242  break;
1243  }
1244  ++mi;
1245  }
1246  }
1247 
1248  if ( ! io_ok)
1249  {
1250  m_current_io = m_io_set.end();
1251  m_prefetch_state = kStopped;
1252  cache()->DeRegisterPrefetchFile(this);
1253  }
1254 
1255  return io_ok;
1256 }
1257 
1258 //------------------------------------------------------------------------------
1259 
1260 void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
1261 {
1262  // Called from DirectResponseHandler.
1263  // NOT under lock.
1264 
1265  if (error_cond)
1266  TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));
1267 
1268  m_state_cond.Lock();
1269 
1270  if (error_cond)
1271  rreq->update_error_cond(error_cond);
1272  else {
1273  rreq->m_stats.m_BytesBypassed += bytes_read;
1274  rreq->m_bytes_read += bytes_read;
1275  }
1276 
1277  rreq->m_direct_done = true;
1278 
1279  bool rreq_complete = rreq->is_complete();
1280 
1281  m_state_cond.UnLock();
1282 
1283  if (rreq_complete)
1284  FinalizeReadRequest(rreq);
1285 }
1286 
1287 void File::ProcessBlockError(Block *b, ReadRequest *rreq)
1288 {
1289  // Called from ProcessBlockResponse().
1290  // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
1291  // Does not manage m_read_req.
1292  // Will not complete the request.
1293 
1294  TRACEF(Debug, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size <<
1295  " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));
1296 
1297  rreq->update_error_cond(b->get_error());
1298  --rreq->m_n_chunk_reqs;
1299 
1300  dec_ref_count(b);
1301 }
1302 
1303 void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
1304 {
1305  // Called from ProcessBlockResponse().
1306  // NOT under lock as it does memcpy of existing block data.
1307  // Acquires lock for block, m_read_req and rreq state update.
1308 
1309  ReadRequest *rreq = creq.m_read_req;
1310 
1311  TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
1312  memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);
1313 
1314  m_state_cond.Lock();
1315 
1316  rreq->m_bytes_read += creq.m_size;
1317 
1318  if (b->get_req_id() == (void*) rreq)
1319  rreq->m_stats.m_BytesMissed += creq.m_size;
1320  else
1321  rreq->m_stats.m_BytesHit += creq.m_size;
1322 
1323  --rreq->m_n_chunk_reqs;
1324 
1325  if (b->m_prefetch)
1326  inc_prefetch_hit_cnt(1);
1327 
1328  dec_ref_count(b);
1329 
1330  bool rreq_complete = rreq->is_complete();
1331 
1332  m_state_cond.UnLock();
1333 
1334  if (rreq_complete)
1335  FinalizeReadRequest(rreq);
1336 }
1337 
1338 void File::FinalizeReadRequest(ReadRequest *rreq)
1339 {
1340  // called from ProcessBlockResponse()
1341  // NOT under lock -- does callout
1342  {
1343  XrdSysCondVarHelper _lck(m_state_cond);
1344  m_delta_stats.AddReadStats(rreq->m_stats);
1345  check_delta_stats();
1346  }
1347 
1348  rreq->m_rh->Done(rreq->return_value());
1349  delete rreq;
1350 }
1351 
1352 void File::ProcessBlockResponse(Block *b, int res)
1353 {
1354  static const char* tpfx = "ProcessBlockResponse ";
1355 
1356  TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);
1357 
1358  if (res >= 0 && res != b->get_size())
1359  {
1360  // Incorrect number of bytes received, apparently size of the file on the remote
1361  // is different than what the cache expects it to be.
1362  TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1363  Cache::GetInstance().UnlinkFile(m_filename, false);
1364  }
1365 
1366  m_state_cond.Lock();
1367 
1368  // Deregister block from IO's prefetch count, if needed.
1369  if (b->m_prefetch)
1370  {
1371  IO *io = b->get_io();
1372  IoSet_i mi = m_io_set.find(io);
1373  if (mi != m_io_set.end())
1374  {
1375  --io->m_active_prefetches;
1376 
1377  // If failed and IO is still prefetching -- disable prefetching on this IO.
1378  if (res < 0 && io->m_allow_prefetching)
1379  {
1380  TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
1381  io->m_allow_prefetching = false;
1382 
1383  // Check if any IO is still available for prefetching. If not, stop it.
1384  if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1385  {
1386  if ( ! select_current_io_or_disable_prefetching(false) )
1387  {
1388  TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
1389  }
1390  }
1391  }
1392 
1393  // If failed with no subscribers -- delete the block and exit.
1394  if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
1395  {
1396  free_block(b);
1397  m_state_cond.UnLock();
1398  return;
1399  }
1400  m_prefetch_bytes += b->get_size();
1401  }
1402  else
1403  {
1404  TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
1405  }
1406  }
1407 
1408  if (res == b->get_size())
1409  {
1410  b->set_downloaded();
1411  TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
1412  if ( ! m_in_shutdown)
1413  {
1414  // Increase ref-count for the writer.
1415  inc_ref_count(b);
1416  m_delta_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
1417  // No check for writes, report-and-merge forced during Sync().
1418  cache()->AddWriteTask(b, true);
1419  }
1420 
1421  // Swap chunk-reqs vector out of Block, it will be processed outside of lock.
1422  vChunkRequest_t creqs_to_notify;
1423  creqs_to_notify.swap( b->m_chunk_reqs );
1424 
1425  m_state_cond.UnLock();
1426 
1427  for (auto &creq : creqs_to_notify)
1428  {
1429  ProcessBlockSuccess(b, creq);
1430  }
1431  }
1432  else
1433  {
1434  if (res < 0) {
1435  bool new_error = b->get_io()->register_block_error(res);
1436  int tlvl = new_error ? TRACE_Error : TRACE_Debug;
1437  TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
1438  << ", io=" << b->get_io() << ", error=" << res);
1439  } else {
1440  bool first_p = b->get_io()->register_incomplete_read();
1441  int tlvl = first_p ? TRACE_Error : TRACE_Debug;
1442  TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
1443  << ", io=" << b->get_io() << " incomplete, got " << res << " expected " << b->get_size());
1444 #if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1445  res = -EIO;
1446 #else
1447  res = -EREMOTEIO;
1448 #endif
1449  }
1450  b->set_error(res);
1451 
1452  // Loop over Block's chunk-reqs vector, error out ones with the same IO.
1453  // Collect others with a different IO, the first of them will be used to reissue the request.
1454  // This is then done outside of lock.
1455  std::list<ReadRequest*> rreqs_to_complete;
1456  vChunkRequest_t creqs_to_keep;
1457 
1458  for(ChunkRequest &creq : b->m_chunk_reqs)
1459  {
1460  ReadRequest *rreq = creq.m_read_req;
1461 
1462  if (rreq->m_io == b->get_io())
1463  {
1464  ProcessBlockError(b, rreq);
1465  if (rreq->is_complete())
1466  {
1467  rreqs_to_complete.push_back(rreq);
1468  }
1469  }
1470  else
1471  {
1472  creqs_to_keep.push_back(creq);
1473  }
1474  }
1475 
1476  bool reissue = false;
1477  if ( ! creqs_to_keep.empty())
1478  {
1479  ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1480 
1481  TRACEF(Debug, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
1482  b->get_io() << " - reissuing request with my io " << rreq->m_io);
1483 
1484  b->reset_error_and_set_io(rreq->m_io, rreq);
1485  b->m_chunk_reqs.swap( creqs_to_keep );
1486  reissue = true;
1487  }
1488 
1489  m_state_cond.UnLock();
1490 
1491  for (auto rreq : rreqs_to_complete)
1492  FinalizeReadRequest(rreq);
1493 
1494  if (reissue)
1495  ProcessBlockRequest(b);
1496  }
1497 }
1498 
1499 //------------------------------------------------------------------------------
1500 
1501 const char* File::lPath() const
1502 {
1503  return m_filename.c_str();
1504 }
1505 
1506 //------------------------------------------------------------------------------
1507 
1508 int File::offsetIdx(int iIdx) const
1509 {
1510  return iIdx - m_offset/m_block_size;
1511 }
1512 
1513 
1514 //------------------------------------------------------------------------------
1515 
1516 void File::Prefetch()
1517 {
1518  // Check that block is not on disk and not in RAM.
1519  // TODO: Could prefetch several blocks at once!
1520  // blks_max could be an argument
1521 
1522  BlockList_t blks;
1523 
1524  TRACEF(DumpXL, "Prefetch() entering.");
1525  {
1526  XrdSysCondVarHelper _lck(m_state_cond);
1527 
1528  if (m_prefetch_state != kOn)
1529  {
1530  return;
1531  }
1532 
1533  if ( ! select_current_io_or_disable_prefetching(true) )
1534  {
1535  TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1536  return;
1537  }
1538 
1539  // Select block(s) to fetch.
1540  for (int f = 0; f < m_num_blocks; ++f)
1541  {
1542  if ( ! m_cfi.TestBitWritten(f))
1543  {
1544  int f_act = f + m_offset / m_block_size;
1545 
1546  BlockMap_i bi = m_block_map.find(f_act);
1547  if (bi == m_block_map.end())
1548  {
1549  Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1550  if (b)
1551  {
1552  TRACEF(Dump, "Prefetch take block " << f_act);
1553  blks.push_back(b);
1554  // Note: block ref_cnt not increased, it will be when placed into write queue.
1555 
1556  inc_prefetch_read_cnt(1);
1557  }
1558  else
1559  {
1560  // This shouldn't happen as prefetching stops when RAM is 70% full.
1561  TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1562  }
1563  break;
1564  }
1565  }
1566  }
1567 
1568  if (blks.empty())
1569  {
1570  TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1571  m_prefetch_state = kComplete;
1572  cache()->DeRegisterPrefetchFile(this);
1573  }
1574  else
1575  {
1576  (*m_current_io)->m_active_prefetches += (int) blks.size();
1577  }
1578  }
1579 
1580  if ( ! blks.empty())
1581  {
1582  ProcessBlockRequests(blks);
1583  }
1584 }
1585 
1586 
1587 //------------------------------------------------------------------------------
1588 
1589 float File::GetPrefetchScore() const
1590 {
1591  return m_prefetch_score;
1592 }
1593 
1594 XrdSysError* File::GetLog()
1595 {
1596  return Cache::GetInstance().GetLog();
1597 }
1598 
1599 XrdSysTrace* File::GetTrace()
1600 {
1601  return Cache::GetInstance().GetTrace();
1602 }
1603 
1604 void File::insert_remote_location(const std::string &loc)
1605 {
1606  if ( ! loc.empty())
1607  {
1608  size_t p = loc.find_first_of('@');
1609  m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1610  }
1611 }
1612 
1613 std::string File::GetRemoteLocations() const
1614 {
1615  std::string s;
1616  if ( ! m_remote_locations.empty())
1617  {
1618  size_t sl = 0;
1619  int nl = 0;
1620  for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1621  {
1622  sl += i->size();
1623  }
1624  s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1625  s = '[';
1626  int j = 1;
1627  for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1628  {
1629  s += '"'; s += *i; s += '"';
1630  if (j < nl) s += ',';
1631  }
1632  s += ']';
1633  }
1634  else
1635  {
1636  s = "[]";
1637  }
1638  return s;
1639 }
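// Example for the two helpers above (hostnames hypothetical): a location string of
// the form "daemon.123:45@xrd1.example.org:1094" is stored by insert_remote_location()
// as "xrd1.example.org:1094"; with a second location on xrd2.example.org the getter
// returns ["xrd1.example.org:1094","xrd2.example.org:1094"], or "[]" when empty.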
1640 
1641 //==============================================================================
1642 //======================= RESPONSE HANDLERS ==============================
1643 //==============================================================================
1644 
1645 void BlockResponseHandler::Done(int res)
1646 {
1647  m_block->m_file->ProcessBlockResponse(m_block, res);
1648  delete this;
1649 }
1650 
1651 //------------------------------------------------------------------------------
1652 
1653 void DirectResponseHandler::Done(int res)
1654 {
1655  m_mutex.Lock();
1656 
1657  int n_left = --m_to_wait;
1658 
1659  if (res < 0) {
1660  if (m_errno == 0) m_errno = res; // store first reported error
1661  } else {
1662  m_bytes_read += res;
1663  }
1664 
1665  m_mutex.UnLock();
1666 
1667  if (n_left == 0)
1668  {
1669  m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
1670  delete this;
1671  }
1672 }
Definition: XrdPfcFile.hh:74