XRootD
XrdPfcFile.cc
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 
20 #include "XrdPfcFile.hh"
21 #include "XrdPfcIO.hh"
22 #include "XrdPfcTrace.hh"
23 #include <cstdio>
24 #include <sstream>
25 #include <fcntl.h>
26 #include <assert.h>
27 #include "XrdCl/XrdClLog.hh"
28 #include "XrdCl/XrdClConstants.hh"
29 #include "XrdCl/XrdClFile.hh"
30 #include "XrdSys/XrdSysPthread.hh"
31 #include "XrdSys/XrdSysTimer.hh"
32 #include "XrdOss/XrdOss.hh"
33 #include "XrdOuc/XrdOucEnv.hh"
35 #include "XrdPfc.hh"
36 
37 
38 using namespace XrdPfc;
39 
40 namespace
41 {
42 
43 const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
44 
45 Cache* cache() { return &Cache::GetInstance(); }
46 
47 }
48 
49 const char *File::m_traceID = "File";
50 
51 //------------------------------------------------------------------------------
52 
53 File::File(const std::string& path, long long iOffset, long long iFileSize) :
54  m_ref_cnt(0),
55  m_data_file(0),
56  m_info_file(0),
57  m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
58  m_filename(path),
59  m_offset(iOffset),
60  m_file_size(iFileSize),
61  m_current_io(m_io_set.end()),
62  m_ios_in_detach(0),
63  m_non_flushed_cnt(0),
64  m_in_sync(false),
65  m_detach_time_logged(false),
66  m_in_shutdown(false),
67  m_state_cond(0),
68  m_block_size(0),
69  m_num_blocks(0),
70  m_prefetch_state(kOff),
71  m_prefetch_read_cnt(0),
72  m_prefetch_hit_cnt(0),
73  m_prefetch_score(0)
74 {}
75 
76 File::~File()
77 {
78  if (m_info_file)
79  {
80  TRACEF(Debug, "~File() close info ");
81  m_info_file->Close();
82  delete m_info_file;
83  m_info_file = NULL;
84  }
85 
86  if (m_data_file)
87  {
88  TRACEF(Debug, "~File() close output ");
89  m_data_file->Close();
90  delete m_data_file;
91  m_data_file = NULL;
92  }
93 
94  TRACEF(Debug, "~File() ended, prefetch score = " << m_prefetch_score);
95 }
96 
97 //------------------------------------------------------------------------------
98 
99 File* File::FileOpen(const std::string &path, long long offset, long long fileSize)
100 {
101  File *file = new File(path, offset, fileSize);
102  if ( ! file->Open())
103  {
104  delete file;
105  file = 0;
106  }
107  return file;
108 }
109 
110 //------------------------------------------------------------------------------
111 
112 void File::initiate_emergency_shutdown()
113 {
114  // Called from Cache::Unlink() when the file is currently open.
115  // Cache::Unlink is also called on FSync error and when wrong number of bytes
116  // is received from a remote read.
117  //
118  // From this point onward the file will not be written to, cinfo file will
119  // not be updated, and all new read requests will return -ENOENT.
120  //
121  // File's entry in the Cache's active map is set to nullptr and will be
122  // removed from there shortly, in any case, well before this File object
123  // shuts down. So we do not communicate to Cache about our destruction when
124  // it happens.
125 
126  {
127  XrdSysCondVarHelper _lck(m_state_cond);
128 
129  m_in_shutdown = true;
130 
131  if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
132  {
133  m_prefetch_state = kStopped;
134  cache()->DeRegisterPrefetchFile(this);
135  }
136  }
137 
138 }
139 
140 //------------------------------------------------------------------------------
141 
142 Stats File::DeltaStatsFromLastCall()
143 {
144  // Not locked, only used from Cache / Purge thread.
145 
146  Stats delta = m_last_stats;
147 
148  m_last_stats = m_stats.Clone();
149 
150  delta.DeltaToReference(m_last_stats);
151 
152  return delta;
153 }
154 
155 //------------------------------------------------------------------------------
156 
157 void File::BlockRemovedFromWriteQ(Block* b)
158 {
159  TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
160 
161  XrdSysCondVarHelper _lck(m_state_cond);
162  dec_ref_count(b);
163 }
164 
165 void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
166 {
167  TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
168 
169  XrdSysCondVarHelper _lck(m_state_cond);
170 
171  for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
172  {
173  dec_ref_count(*i);
174  }
175 }
176 
177 //------------------------------------------------------------------------------
178 
179 void File::ioUpdated(IO *io)
180 {
181  std::string loc(io->GetLocation());
182  XrdSysCondVarHelper _lck(m_state_cond);
183  insert_remote_location(loc);
184 }
185 
186 //------------------------------------------------------------------------------
187 
188 bool File::ioActive(IO *io)
189 {
190  // Returns true if delay is needed.
191 
192  TRACEF(Debug, "ioActive start for io " << io);
193 
194  std::string loc(io->GetLocation());
195 
196  {
197  XrdSysCondVarHelper _lck(m_state_cond);
198 
199  IoSet_i mi = m_io_set.find(io);
200 
201  if (mi != m_io_set.end())
202  {
203  unsigned int n_active_reads = io->m_active_read_reqs;
204 
205  TRACE(Info, "ioActive for io " << io <<
206  ", active_reads " << n_active_reads <<
207  ", active_prefetches " << io->m_active_prefetches <<
208  ", allow_prefetching " << io->m_allow_prefetching <<
209  ", ios_in_detach " << m_ios_in_detach);
210  TRACEF(Info,
211  "\tio_map.size() " << m_io_set.size() <<
212  ", block_map.size() " << m_block_map.size() << ", file");
213 
214  insert_remote_location(loc);
215 
216  io->m_allow_prefetching = false;
217  io->m_in_detach = true;
218 
219  // Check if any IO is still available for prefetching. If not, stop it.
220  if (m_prefetch_state == kOn || m_prefetch_state == kHold)
221  {
222  if ( ! select_current_io_or_disable_prefetching(false) )
223  {
224  TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
225  }
226  }
227 
228  // On last IO, consider write queue blocks. Note, this also contains
229  // blocks being prefetched.
230 
231  bool io_active_result;
232 
233  if (n_active_reads > 0)
234  {
235  io_active_result = true;
236  }
237  else if (m_io_set.size() - m_ios_in_detach == 1)
238  {
239  io_active_result = ! m_block_map.empty();
240  }
241  else
242  {
243  io_active_result = io->m_active_prefetches > 0;
244  }
245 
246  if ( ! io_active_result)
247  {
248  ++m_ios_in_detach;
249  }
250 
251  TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
252 
253  return io_active_result;
254  }
255  else
256  {
257  TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
258  return false;
259  }
260  }
261 }
262 
263 //------------------------------------------------------------------------------
264 
265 void File::RequestSyncOfDetachStats()
266 {
267  XrdSysCondVarHelper _lck(m_state_cond);
268  m_detach_time_logged = false;
269 }
270 
271 bool File::FinalizeSyncBeforeExit()
272 {
273  // Returns true if sync is required.
274  // This method is called after corresponding IO is detached from PosixCache.
275 
276  XrdSysCondVarHelper _lck(m_state_cond);
277  if ( ! m_in_shutdown)
278  {
279  if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
280  {
281  Stats loc_stats = m_stats.Clone();
282  m_cfi.WriteIOStatDetach(loc_stats);
283  m_detach_time_logged = true;
284  m_in_sync = true;
285  TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
286  return true;
287  }
288  }
289  TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
290  return false;
291 }
292 
293 //------------------------------------------------------------------------------
294 
295 void File::AddIO(IO *io)
296 {
297  // Called from Cache::GetFile() when a new IO asks for the file.
298 
299  TRACEF(Debug, "AddIO() io = " << (void*)io);
300 
301  time_t now = time(0);
302  std::string loc(io->GetLocation());
303 
304  m_state_cond.Lock();
305 
306  IoSet_i mi = m_io_set.find(io);
307 
308  if (mi == m_io_set.end())
309  {
310  m_io_set.insert(io);
311  io->m_attach_time = now;
312  m_stats.IoAttach();
313 
314  insert_remote_location(loc);
315 
316  if (m_prefetch_state == kStopped)
317  {
318  m_prefetch_state = kOn;
319  cache()->RegisterPrefetchFile(this);
320  }
321  }
322  else
323  {
324  TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
325  }
326 
327  m_state_cond.UnLock();
328 }
329 
330 //------------------------------------------------------------------------------
331 
332 void File::RemoveIO(IO *io)
333 {
334  // Called from Cache::ReleaseFile.
335 
336  TRACEF(Debug, "RemoveIO() io = " << (void*)io);
337 
338  time_t now = time(0);
339 
340  m_state_cond.Lock();
341 
342  IoSet_i mi = m_io_set.find(io);
343 
344  if (mi != m_io_set.end())
345  {
346  if (mi == m_current_io)
347  {
348  ++m_current_io;
349  }
350 
351  m_stats.IoDetach(now - io->m_attach_time);
352  m_io_set.erase(mi);
353  --m_ios_in_detach;
354 
355  if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
356  {
357  TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
358  m_prefetch_state = kStopped;
359  cache()->DeRegisterPrefetchFile(this);
360  }
361  }
362  else
363  {
364  TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
365  }
366 
367  m_state_cond.UnLock();
368 }
369 
370 //------------------------------------------------------------------------------
371 
372 bool File::Open()
373 {
374  // Sets errno accordingly.
375 
376  static const char *tpfx = "Open() ";
377 
378  TRACEF(Dump, tpfx << "open file for disk cache");
379 
379 
380  const Configuration &conf = Cache::GetInstance().RefConfiguration();
381 
382  XrdOss &myOss = * Cache::GetInstance().GetOss();
383  const char *myUser = conf.m_username.c_str();
384  XrdOucEnv myEnv;
385  struct stat data_stat, info_stat;
386 
387  std::string ifn = m_filename + Info::s_infoExtension;
388 
389  bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
390  bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);
391 
392  // Create the data file itself.
393  char size_str[32]; sprintf(size_str, "%lld", m_file_size);
394  myEnv.Put("oss.asize", size_str);
395  myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
396 
397  int res;
398 
399  if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
400  {
401  TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
402  errno = -res;
403  return false;
404  }
405 
406  m_data_file = myOss.newFile(myUser);
407  if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
408  {
409  TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
410  errno = -res;
411  delete m_data_file; m_data_file = 0;
412  return false;
413  }
414 
415  myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
416  myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
417  if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
418  {
419  TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
420  errno = -res;
421  m_data_file->Close(); delete m_data_file; m_data_file = 0;
422  return false;
423  }
424 
425  m_info_file = myOss.newFile(myUser);
426  if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
427  {
428  TRACEF(Error, tpfx << "Failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
429  errno = -res;
430  delete m_info_file; m_info_file = 0;
431  m_data_file->Close(); delete m_data_file; m_data_file = 0;
432  return false;
433  }
434 
435  bool initialize_info_file = true;
436 
437  if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
438  {
439  TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
440  ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
441  ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() << ")");
442 
443  // Check if data file exists and is of reasonable size.
444  if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
445  {
446  initialize_info_file = false;
447  } else {
448  TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
449  m_cfi.ResetAllAccessStats();
450  m_data_file->Ftruncate(0);
451  }
452  }
453 
454  if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
455  {
456  if (conf.does_cschk_have_missing_bits(m_cfi.GetCkSumState()) &&
457  conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
458  {
459  TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
460  initialize_info_file = true;
461  m_cfi.ResetAllAccessStats();
462  m_data_file->Ftruncate(0);
463  } else {
464  // TODO: If the file is complete, we don't need to reset net cksums.
465  m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
466  }
467  }
468 
469  if (initialize_info_file)
470  {
471  m_cfi.SetBufferSizeFileSizeAndCreationTime(conf.m_bufferSize, m_file_size);
472  m_cfi.SetCkSumState(conf.get_cs_Chk());
473  m_cfi.ResetNoCkSumTime();
474  m_cfi.Write(m_info_file, ifn.c_str());
475  m_info_file->Fsync();
476  TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks());
477  }
478 
479  m_cfi.WriteIOStatAttach();
480  m_state_cond.Lock();
481  m_block_size = m_cfi.GetBufferSize();
482  m_num_blocks = m_cfi.GetNBlocks();
483  m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().
484  m_state_cond.UnLock();
485 
486  return true;
487 }
488 
489 
490 //==============================================================================
491 // Read and helpers
492 //==============================================================================
493 
494 bool File::overlap(int blk, // block to query
495  long long blk_size, //
496  long long req_off, // offset of user request
497  int req_size, // size of user request
498  // output:
499  long long &off, // offset in user buffer
500  long long &blk_off, // offset in block
501  int &size) // size to copy
502 {
503  const long long beg = blk * blk_size;
504  const long long end = beg + blk_size;
505  const long long req_end = req_off + req_size;
506 
507  if (req_off < end && req_end > beg)
508  {
509  const long long ovlp_beg = std::max(beg, req_off);
510  const long long ovlp_end = std::min(end, req_end);
511 
512  off = ovlp_beg - req_off;
513  blk_off = ovlp_beg - beg;
514  size = (int) (ovlp_end - ovlp_beg);
515 
516  assert(size <= blk_size);
517  return true;
518  }
519  else
520  {
521  return false;
522  }
523 }
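// Illustrative note (not part of the upstream source): overlap() clips a user request
// against a single block. Assuming blk_size = 4096 and a request with req_off = 6000,
// req_size = 3000 (so req_end = 9000):
//   blk = 1 -> beg = 4096, end = 8192  -> off = 0,    blk_off = 1904, size = 2192
//   blk = 2 -> beg = 8192, end = 12288 -> off = 2192, blk_off = 0,    size = 808
// The two pieces (2192 + 808 bytes) together cover the full 3000-byte request.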
524 
525 //------------------------------------------------------------------------------
526 
527 Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
528 {
529  // Must be called w/ state_cond locked.
530  // Checks on size etc should be done before.
531  //
532  // Reference count is 0 so increase it in calling function if you want to
533  // catch the block while still in memory.
534 
535  const long long off = i * m_block_size;
536  const int last_block = m_num_blocks - 1;
537  const bool cs_net = cache()->RefConfiguration().is_cschk_net();
538 
539  int blk_size, req_size;
540  if (i == last_block) {
541  blk_size = req_size = m_file_size - off;
542  if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
543  } else {
544  blk_size = req_size = m_block_size;
545  }
546 
547  Block *b = 0;
548  char *buf = cache()->RequestRAM(req_size);
549 
550  if (buf)
551  {
552  b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
553 
554  if (b)
555  {
556  m_block_map[i] = b;
557 
558  // Actual Read request is issued in ProcessBlockRequests().
559 
560  if (m_prefetch_state == kOn && (int) m_block_map.size() >= Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
561  {
562  m_prefetch_state = kHold;
563  cache()->DeRegisterPrefetchFile(this);
564  }
565  }
566  else
567  {
568  TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
569  }
570  }
571 
572  return b;
573 }
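// Illustrative note (not part of the upstream source): for the last block with network
// checksums (cs_net) enabled, req_size is rounded up to a whole number of 4 KiB pages and
// the RAM buffer is allocated with that rounded size, while blk_size keeps the true tail
// length that is copied to readers and written to disk. For example, a 10000-byte tail
// block is requested over the network as 12288 bytes (0x3000).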
574 
575 void File::ProcessBlockRequest(Block *b)
576 {
577  // This *must not* be called with block_map locked.
578 
579  BlockResponseHandler *brh = new BlockResponseHandler(b);
580 
581  if (XRD_TRACE What >= TRACE_Dump) {
582  char buf[256];
583  snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
584  b->get_offset()/m_block_size, b, b->m_prefetch, b->get_offset(), b->get_req_size(), b->get_buff(), brh);
585  TRACEF(Dump, "ProcessBlockRequest() " << buf);
586  }
587 
588  if (b->req_cksum_net())
589  {
590  b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
591  b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
592  } else {
593  b->get_io()->GetInput()-> Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
594  }
595 }
596 
597 void File::ProcessBlockRequests(BlockList_t& blks)
598 {
599  // This *must not* be called with block_map locked.
600 
601  for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
602  {
603  ProcessBlockRequest(*bi);
604  }
605 }
606 
607 //------------------------------------------------------------------------------
608 
609 void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
610 {
611  int n_chunks = ioVec.size();
612  int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;
613 
614  TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
615  ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);
616 
617  DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);
618 
619  int pos = 0;
620  while (n_chunks > XrdProto::maxRvecsz) {
621  io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
622  pos += XrdProto::maxRvecsz;
623  n_chunks -= XrdProto::maxRvecsz;
624  }
625  io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
626 }
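// Illustrative note (not part of the upstream source): the vector read is split so that no
// single ReadV carries more than XrdProto::maxRvecsz chunks. DirectResponseHandler is
// created with n_vec_reads and calls ProcessDirectReadFinished() only after every partial
// ReadV has reported through Done(). E.g., with maxRvecsz = 1024, a 2500-chunk ioVec is
// issued as three ReadVs of 1024, 1024 and 452 chunks.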
627 
628 //------------------------------------------------------------------------------
629 
630 int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
631 {
632  TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
633 
634  long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());
635 
636  if (rs < 0)
637  {
638  TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
639  return rs;
640  }
641 
642  if (rs != expected_size)
643  {
644  TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
645  return -EIO;
646  }
647 
648  return (int) rs;
649 }
650 
651 //------------------------------------------------------------------------------
652 
653 int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
654 {
655  // The read response handler (rh) is ONLY invoked from async processing.
656  // If this function returns anything other than -EWOULDBLOCK, the caller must act on the result itself.
657  // This streamlines implementation of synchronous IO::Read().
658 
659  TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);
660 
661  m_state_cond.Lock();
662 
663  if (m_in_shutdown || io->m_in_detach)
664  {
665  m_state_cond.UnLock();
666  return m_in_shutdown ? -ENOENT : -EBADF;
667  }
668 
669  // Shortcut -- file is fully downloaded.
670 
671  if (m_cfi.IsComplete())
672  {
673  m_state_cond.UnLock();
674  int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
675  if (ret > 0) m_stats.AddBytesHit(ret);
676  return ret;
677  }
678 
679  XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
680 
681  return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
682 }
683 
684 //------------------------------------------------------------------------------
685 
686 int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
687 {
688  TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");
689 
690  m_state_cond.Lock();
691 
692  if (m_in_shutdown || io->m_in_detach)
693  {
694  m_state_cond.UnLock();
695  return m_in_shutdown ? -ENOENT : -EBADF;
696  }
697 
698  // Shortcut -- file is fully downloaded.
699 
700  if (m_cfi.IsComplete())
701  {
702  m_state_cond.UnLock();
703  int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
704  if (ret > 0) m_stats.AddBytesHit(ret);
705  return ret;
706  }
707 
708  return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
709 }
710 
711 //------------------------------------------------------------------------------
712 
713 int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
714  ReadReqRH *rh, const char *tpfx)
715 {
716  // Non-trivial processing for Read and ReadV.
717  // Entered under lock.
718  //
719  // loop over required blocks:
720  // - if on disk, ok;
721  // - if in ram or incoming, inc ref-count
722  // - otherwise request and inc ref count (unless RAM full => request direct)
723  // unlock
724 
725  int prefetch_cnt = 0;
726 
727  ReadRequest *read_req = nullptr;
728  BlockList_t blks_to_request; // blocks we are issuing a new remote request for
729 
730  std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
731 
732  std::vector<XrdOucIOVec> iovec_disk;
733  std::vector<XrdOucIOVec> iovec_direct;
734  int iovec_disk_total = 0;
735  int iovec_direct_total = 0;
736 
737  for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
738  {
739  const XrdOucIOVec &iov = readV[iov_idx];
740  long long iUserOff = iov.offset;
741  int iUserSize = iov.size;
742  char *iUserBuff = iov.data;
743 
744  const int idx_first = iUserOff / m_block_size;
745  const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
746 
747  TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);
748 
749  enum LastBlock_e { LB_other, LB_disk, LB_direct };
750 
751  LastBlock_e lbe = LB_other;
752 
753  for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
754  {
755  TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
756  BlockMap_i bi = m_block_map.find(block_idx);
757 
758  // overlap and read
759  long long off; // offset in user buffer
760  long long blk_off; // offset in block
761  int size; // size to copy
762 
763  overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
764 
765  // In RAM or incoming?
766  if (bi != m_block_map.end())
767  {
768  inc_ref_count(bi->second);
769  TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);
770 
771  if (bi->second->is_finished())
772  {
773  // note, blocks with error should not be here !!!
774  // they should be either removed or reissued in ProcessBlockResponse()
775  assert(bi->second->is_ok());
776 
777  blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );
778 
779  if (bi->second->m_prefetch)
780  ++prefetch_cnt;
781  }
782  else
783  {
784  if ( ! read_req)
785  read_req = new ReadRequest(io, rh);
786 
787  // We have a lock on state_cond --> as we register the request before releasing the lock,
788  // we are sure to get a call-in via the ChunkRequest handling when this block arrives.
789 
790  bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
791  ++read_req->m_n_chunk_reqs;
792  }
793 
794  lbe = LB_other;
795  }
796  // On disk?
797  else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
798  {
799  TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);
800 
801  if (lbe == LB_disk)
802  iovec_disk.back().size += size;
803  else
804  iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
805  iovec_disk_total += size;
806 
807  if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
808  ++prefetch_cnt;
809 
810  lbe = LB_disk;
811  }
812  // Neither ... then we have to go get it ...
813  else
814  {
815  if ( ! read_req)
816  read_req = new ReadRequest(io, rh);
817 
818  // Is there room for one more RAM Block?
819  Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
820  if (b)
821  {
822  TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
823  inc_ref_count(b);
824  blks_to_request.push_back(b);
825 
826  b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
827  ++read_req->m_n_chunk_reqs;
828 
829  lbe = LB_other;
830  }
831  else // Nope ... read this directly without caching.
832  {
833  TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);
834 
835  iovec_direct_total += size;
836  read_req->m_direct_done = false;
837 
838  // Make sure we do not issue a ReadV with a chunk size above XrdProto::maxRVdsz.
839  // The number of ReadV calls needed to stay within the XrdProto::maxRvecsz limit
840  // is determined in RequestBlocksDirect().
841  if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
842  iovec_direct.back().size += size;
843  } else {
844  long long in_offset = block_idx * m_block_size + blk_off;
845  char *out_pos = iUserBuff + off;
846  while (size > XrdProto::maxRVdsz) {
847  iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
848  in_offset += XrdProto::maxRVdsz;
849  out_pos += XrdProto::maxRVdsz;
850  size -= XrdProto::maxRVdsz;
851  }
852  iovec_direct.push_back( { in_offset, size, 0, out_pos } );
853  }
854 
855  lbe = LB_direct;
856  }
857  }
858  } // end for over blocks in an IOVec
859  } // end for over readV IOVec
860 
861  inc_prefetch_hit_cnt(prefetch_cnt);
862 
863  m_state_cond.UnLock();
864 
865  // First, send out remote requests for new blocks.
866  if ( ! blks_to_request.empty())
867  {
868  ProcessBlockRequests(blks_to_request);
869  blks_to_request.clear();
870  }
871 
872  // Second, send out remote direct read requests.
873  if ( ! iovec_direct.empty())
874  {
875  RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
876 
877  TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
878  }
879 
880  // Begin synchronous part where we process data that is already in RAM or on disk.
881 
882  long long bytes_read = 0;
883  int error_cond = 0; // to be set to -errno
884 
885  // Third, process blocks that are available in RAM.
886  if ( ! blks_ready.empty())
887  {
888  for (auto &bvi : blks_ready)
889  {
890  for (auto &cr : bvi.second)
891  {
892  TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
893  memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
894  bytes_read += cr.m_size;
895  }
896  }
897  }
898 
899  // Fourth, read blocks from disk.
900  if ( ! iovec_disk.empty())
901  {
902  int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
903  TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
904  if (rc >= 0)
905  {
906  bytes_read += rc;
907  }
908  else
909  {
910  error_cond = rc;
911  TRACEF(Error, tpfx << "failed read from disk");
912  }
913  }
914 
915  // End synchronous part -- update with sync stats and determine actual state of this read.
916  // Note: remote reads might have already finished during disk-read!
917 
918  m_state_cond.Lock();
919 
920  for (auto &bvi : blks_ready)
921  dec_ref_count(bvi.first, (int) bvi.second.size());
922 
923  if (read_req)
924  {
925  read_req->m_bytes_read += bytes_read;
926  read_req->update_error_cond(error_cond);
927  read_req->m_stats.m_BytesHit += bytes_read;
928  read_req->m_sync_done = true;
929 
930  if (read_req->is_complete())
931  {
932  // Almost like FinalizeReadRequest(read_req) -- but no callout!
933  m_state_cond.UnLock();
934 
935  m_stats.AddReadStats(read_req->m_stats);
936 
937  int ret = read_req->return_value();
938  delete read_req;
939  return ret;
940  }
941  else
942  {
943  m_state_cond.UnLock();
944  return -EWOULDBLOCK;
945  }
946  }
947  else
948  {
949  m_stats.m_BytesHit += bytes_read;
950  m_state_cond.UnLock();
951 
952  // !!! No callout.
953 
954  return error_cond ? error_cond : bytes_read;
955  }
956 }
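// Summary note (not part of the upstream source) on ReadOpusCoalescere() above:
// (1) under m_state_cond, classify every chunk as in-RAM/incoming, on-disk, newly
//     requested, or direct-read; (2) after unlocking, issue the remote block requests and
//     the direct ReadVs; (3) synchronously memcpy from already-finished RAM blocks and
//     ReadV the on-disk pieces; (4) re-lock and fold the synchronous results into the
//     ReadRequest. If no ReadRequest was needed, the byte count (or error) is returned
//     directly; otherwise -EWOULDBLOCK is returned unless all asynchronous pieces happened
//     to finish already, in which case the final value is returned without a callout.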
957 
958 
959 //==============================================================================
960 // WriteBlock and Sync
961 //==============================================================================
962 
963 void File::WriteBlockToDisk(Block* b)
964 {
965  // write block buffer into disk file
966  long long offset = b->m_offset - m_offset;
967  long long size = b->get_size();
968  ssize_t retval;
969 
970  if (m_cfi.IsCkSumCache())
971  if (b->has_cksums())
972  retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
973  else
974  retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
975  else
976  retval = m_data_file->Write(b->get_buff(), offset, size);
977 
978  if (retval < size)
979  {
980  if (retval < 0)
981  {
982  GetLog()->Emsg("WriteToDisk()", -retval, "write block to disk", GetLocalPath().c_str());
983  }
984  else
985  {
986  TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
987  }
988 
989  XrdSysCondVarHelper _lck(m_state_cond);
990 
991  dec_ref_count(b);
992 
993  return;
994  }
995 
996  const int blk_idx = (b->m_offset - m_offset) / m_block_size;
997 
998  // Set written bit.
999  TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1000 
1001  bool schedule_sync = false;
1002  {
1003  XrdSysCondVarHelper _lck(m_state_cond);
1004 
1005  m_cfi.SetBitWritten(blk_idx);
1006 
1007  if (b->m_prefetch)
1008  {
1009  m_cfi.SetBitPrefetch(blk_idx);
1010  }
1011  if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1012  {
1013  m_cfi.ResetCkSumNet();
1014  }
1015 
1016  dec_ref_count(b);
1017 
1018  // Set synced bit or stash block index if in actual sync.
1019  // Synced state is only written out to cinfo file when data file is synced.
1020  if (m_in_sync)
1021  {
1022  m_writes_during_sync.push_back(blk_idx);
1023  }
1024  else
1025  {
1026  m_cfi.SetBitSynced(blk_idx);
1027  ++m_non_flushed_cnt;
1028  if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1029  ! m_in_shutdown)
1030  {
1031  schedule_sync = true;
1032  m_in_sync = true;
1033  m_non_flushed_cnt = 0;
1034  }
1035  }
1036  }
1037 
1038  if (schedule_sync)
1039  {
1040  cache()->ScheduleFileSync(this);
1041  }
1042 }
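// Clarifying note (not part of the upstream source): a sync is scheduled from here once
// m_flushCnt unsynced blocks have accumulated or the file has just become complete, unless
// a sync is already in flight (m_in_sync) or the file is in shutdown. Blocks written while
// a sync is running are parked in m_writes_during_sync and are marked synced when Sync()
// finishes.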
1043 
1044 //------------------------------------------------------------------------------
1045 
1046 void File::Sync()
1047 {
1048  TRACEF(Dump, "Sync()");
1049 
1050  int ret = m_data_file->Fsync();
1051  bool errorp = false;
1052  if (ret == XrdOssOK)
1053  {
1054  Stats loc_stats = m_stats.Clone();
1055  m_cfi.WriteIOStat(loc_stats);
1056  m_cfi.Write(m_info_file, m_filename.c_str());
1057  int cret = m_info_file->Fsync();
1058  if (cret != XrdOssOK)
1059  {
1060  TRACEF(Error, "Sync cinfo file sync error " << cret);
1061  errorp = true;
1062  }
1063  }
1064  else
1065  {
1066  TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1067  errorp = true;
1068  }
1069 
1070  if (errorp)
1071  {
1072  TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1073 
1074  // Unlink will also call this->initiate_emergency_shutdown()
1075  Cache::GetInstance().UnlinkFile(m_filename, false);
1076 
1077  XrdSysCondVarHelper _lck(&m_state_cond);
1078 
1079  m_writes_during_sync.clear();
1080  m_in_sync = false;
1081 
1082  return;
1083  }
1084 
1085  int written_while_in_sync;
1086  bool resync = false;
1087  {
1088  XrdSysCondVarHelper _lck(&m_state_cond);
1089  for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1090  {
1091  m_cfi.SetBitSynced(*i);
1092  }
1093  written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1094  m_writes_during_sync.clear();
1095 
1096  // If there were writes during sync and the file is now complete,
1097  // let us call Sync again without resetting the m_in_sync flag.
1098  if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1099  resync = true;
1100  else
1101  m_in_sync = false;
1102  }
1103  TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1104 
1105  if (resync)
1106  Sync();
1107 }
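// Clarifying note (not part of the upstream source): on a data or cinfo fsync error the
// local files are unlinked via Cache::UnlinkFile(), which also drives
// initiate_emergency_shutdown() on this object. On success, blocks written during the sync
// are marked synced; if they completed the file, Sync() is called again immediately while
// m_in_sync is still held.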
1108 
1109 
1110 //==============================================================================
1111 // Block processing
1112 //==============================================================================
1113 
1114 void File::free_block(Block* b)
1115 {
1116  // Method always called under lock.
1117  int i = b->m_offset / m_block_size;
1118  TRACEF(Dump, "free_block block " << b << " idx = " << i);
1119  size_t ret = m_block_map.erase(i);
1120  if (ret != 1)
1121  {
1122  // assert might be a better option than a warning
1123  TRACEF(Error, "free_block did not erase " << i << " from map");
1124  }
1125  else
1126  {
1127  cache()->ReleaseRAM(b->m_buff, b->m_req_size);
1128  delete b;
1129  }
1130 
1131  if (m_prefetch_state == kHold && (int) m_block_map.size() < Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
1132  {
1133  m_prefetch_state = kOn;
1134  cache()->RegisterPrefetchFile(this);
1135  }
1136 }
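// Clarifying note (not part of the upstream source): together with PrepareBlockRequest()
// this implements a simple hysteresis -- prefetching is put on hold (kHold) when the
// number of blocks held in RAM reaches m_prefetch_max_blocks and is resumed (kOn) here as
// soon as freeing a block drops the count back below that limit.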
1137 
1138 //------------------------------------------------------------------------------
1139 
1140 bool File::select_current_io_or_disable_prefetching(bool skip_current)
1141 {
1142  // Method always called under lock. It also expects prefetch to be active.
1143 
1144  int io_size = (int) m_io_set.size();
1145  bool io_ok = false;
1146 
1147  if (io_size == 1)
1148  {
1149  io_ok = (*m_io_set.begin())->m_allow_prefetching;
1150  if (io_ok)
1151  {
1152  m_current_io = m_io_set.begin();
1153  }
1154  }
1155  else if (io_size > 1)
1156  {
1157  IoSet_i mi = m_current_io;
1158  if (skip_current && mi != m_io_set.end()) ++mi;
1159 
1160  for (int i = 0; i < io_size; ++i)
1161  {
1162  if (mi == m_io_set.end()) mi = m_io_set.begin();
1163 
1164  if ((*mi)->m_allow_prefetching)
1165  {
1166  m_current_io = mi;
1167  io_ok = true;
1168  break;
1169  }
1170  ++mi;
1171  }
1172  }
1173 
1174  if ( ! io_ok)
1175  {
1176  m_current_io = m_io_set.end();
1177  m_prefetch_state = kStopped;
1178  cache()->DeRegisterPrefetchFile(this);
1179  }
1180 
1181  return io_ok;
1182 }
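// Clarifying note (not part of the upstream source): m_current_io advances round-robin
// over m_io_set until an IO with m_allow_prefetching is found; if none qualifies,
// prefetching for the whole file is stopped and the file is deregistered from the cache's
// prefetch list. Prefetch() passes skip_current = true so successive prefetches rotate
// over the available IOs.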
1183 
1184 //------------------------------------------------------------------------------
1185 
1186 void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
1187 {
1188  // Called from DirectResponseHandler.
1189  // NOT under lock.
1190 
1191  if (error_cond)
1192  TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));
1193 
1194  m_state_cond.Lock();
1195 
1196  if (error_cond)
1197  rreq->update_error_cond(error_cond);
1198  else {
1199  rreq->m_stats.m_BytesBypassed += bytes_read;
1200  rreq->m_bytes_read += bytes_read;
1201  }
1202 
1203  rreq->m_direct_done = true;
1204 
1205  bool rreq_complete = rreq->is_complete();
1206 
1207  m_state_cond.UnLock();
1208 
1209  if (rreq_complete)
1210  FinalizeReadRequest(rreq);
1211 }
1212 
1213 void File::ProcessBlockError(Block *b, ReadRequest *rreq)
1214 {
1215  // Called from ProcessBlockResponse().
1216  // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
1217  // Does not manage m_read_req.
1218  // Will not complete the request.
1219 
1220  TRACEF(Error, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size <<
1221  " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));
1222 
1223  rreq->update_error_cond(b->get_error());
1224  --rreq->m_n_chunk_reqs;
1225 
1226  dec_ref_count(b);
1227 }
1228 
1229 void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
1230 {
1231  // Called from ProcessBlockResponse().
1232  // NOT under lock as it does a memcpy of existing block data.
1233  // Acquires lock for block, m_read_req and rreq state update.
1234 
1235  ReadRequest *rreq = creq.m_read_req;
1236 
1237  TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
1238  memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);
1239 
1240  m_state_cond.Lock();
1241 
1242  rreq->m_bytes_read += creq.m_size;
1243 
1244  if (b->get_req_id() == (void*) rreq)
1245  rreq->m_stats.m_BytesMissed += creq.m_size;
1246  else
1247  rreq->m_stats.m_BytesHit += creq.m_size;
1248 
1249  --rreq->m_n_chunk_reqs;
1250 
1251  if (b->m_prefetch)
1252  inc_prefetch_hit_cnt(1);
1253 
1254  dec_ref_count(b);
1255 
1256  bool rreq_complete = rreq->is_complete();
1257 
1258  m_state_cond.UnLock();
1259 
1260  if (rreq_complete)
1261  FinalizeReadRequest(rreq);
1262 }
1263 
1264 void File::FinalizeReadRequest(ReadRequest *rreq)
1265 {
1266  // called from ProcessBlockResponse()
1267  // NOT under lock -- does callout
1268 
1269  m_stats.AddReadStats(rreq->m_stats);
1270 
1271  rreq->m_rh->Done(rreq->return_value());
1272  delete rreq;
1273 }
1274 
1275 void File::ProcessBlockResponse(Block *b, int res)
1276 {
1277  static const char* tpfx = "ProcessBlockResponse ";
1278 
1279  TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);
1280 
1281  if (res >= 0 && res != b->get_size())
1282  {
1283  // Incorrect number of bytes received, apparently size of the file on the remote
1284  // is different than what the cache expects it to be.
1285  TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1286  Cache::GetInstance().UnlinkFile(m_filename, false);
1287  }
1288 
1289  m_state_cond.Lock();
1290 
1291  // Deregister block from IO's prefetch count, if needed.
1292  if (b->m_prefetch)
1293  {
1294  IO *io = b->get_io();
1295  IoSet_i mi = m_io_set.find(io);
1296  if (mi != m_io_set.end())
1297  {
1298  --io->m_active_prefetches;
1299 
1300  // If failed and IO is still prefetching -- disable prefetching on this IO.
1301  if (res < 0 && io->m_allow_prefetching)
1302  {
1303  TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
1304  io->m_allow_prefetching = false;
1305 
1306  // Check if any IO is still available for prefetching. If not, stop it.
1307  if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1308  {
1309  if ( ! select_current_io_or_disable_prefetching(false) )
1310  {
1311  TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
1312  }
1313  }
1314  }
1315 
1316  // If failed with no subscribers -- delete the block and exit.
1317  if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
1318  {
1319  free_block(b);
1320  m_state_cond.UnLock();
1321  return;
1322  }
1323  }
1324  else
1325  {
1326  TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
1327  }
1328  }
1329 
1330  if (res == b->get_size())
1331  {
1332  b->set_downloaded();
1333  TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
1334  if ( ! m_in_shutdown)
1335  {
1336  // Increase ref-count for the writer.
1337  inc_ref_count(b);
1338  m_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
1339  cache()->AddWriteTask(b, true);
1340  }
1341 
1342  // Swap chunk-reqs vector out of Block, it will be processed outside of lock.
1343  vChunkRequest_t creqs_to_notify;
1344  creqs_to_notify.swap( b->m_chunk_reqs );
1345 
1346  m_state_cond.UnLock();
1347 
1348  for (auto &creq : creqs_to_notify)
1349  {
1350  ProcessBlockSuccess(b, creq);
1351  }
1352  }
1353  else
1354  {
1355  if (res < 0) {
1356  TRACEF(Error, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << " error=" << res);
1357  } else {
1358  TRACEF(Error, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << " incomplete, got " << res << " expected " << b->get_size());
1359 #if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1360  res = -EIO;
1361 #else
1362  res = -EREMOTEIO;
1363 #endif
1364  }
1365  b->set_error(res);
1366 
1367  // Loop over Block's chunk-reqs vector, error out ones with the same IO.
1368  // Collect others with a different IO, the first of them will be used to reissue the request.
1369  // This is then done outside of lock.
1370  std::list<ReadRequest*> rreqs_to_complete;
1371  vChunkRequest_t creqs_to_keep;
1372 
1373  for(ChunkRequest &creq : b->m_chunk_reqs)
1374  {
1375  ReadRequest *rreq = creq.m_read_req;
1376 
1377  if (rreq->m_io == b->get_io())
1378  {
1379  ProcessBlockError(b, rreq);
1380  if (rreq->is_complete())
1381  {
1382  rreqs_to_complete.push_back(rreq);
1383  }
1384  }
1385  else
1386  {
1387  creqs_to_keep.push_back(creq);
1388  }
1389  }
1390 
1391  bool reissue = false;
1392  if ( ! creqs_to_keep.empty())
1393  {
1394  ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1395 
1396  TRACEF(Info, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
1397  b->get_io() << " - reissuing request with my io " << rreq->m_io);
1398 
1399  b->reset_error_and_set_io(rreq->m_io, rreq);
1400  b->m_chunk_reqs.swap( creqs_to_keep );
1401  reissue = true;
1402  }
1403 
1404  m_state_cond.UnLock();
1405 
1406  for (auto rreq : rreqs_to_complete)
1407  FinalizeReadRequest(rreq);
1408 
1409  if (reissue)
1410  ProcessBlockRequest(b);
1411  }
1412 }
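// Clarifying note (not part of the upstream source) on the error path above: chunk
// requests issued through the same IO that failed are errored out (and their ReadRequests
// finalized if this was their last outstanding piece), while chunk requests from other IOs
// are kept, the block's error is cleared, and the read is reissued once through the first
// surviving requester's IO.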
1413 
1414 //------------------------------------------------------------------------------
1415 
1416 const char* File::lPath() const
1417 {
1418  return m_filename.c_str();
1419 }
1420 
1421 //------------------------------------------------------------------------------
1422 
1423 int File::offsetIdx(int iIdx) const
1424 {
1425  return iIdx - m_offset/m_block_size;
1426 }
1427 
1428 
1429 //------------------------------------------------------------------------------
1430 
1431 void File::Prefetch()
1432 {
1433  // Check that block is not on disk and not in RAM.
1434  // TODO: Could prefetch several blocks at once!
1435  // blks_max could be an argument
1436 
1437  BlockList_t blks;
1438 
1439  TRACEF(DumpXL, "Prefetch() entering.");
1440  {
1441  XrdSysCondVarHelper _lck(m_state_cond);
1442 
1443  if (m_prefetch_state != kOn)
1444  {
1445  return;
1446  }
1447 
1448  if ( ! select_current_io_or_disable_prefetching(true) )
1449  {
1450  TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1451  return;
1452  }
1453 
1454  // Select block(s) to fetch.
1455  for (int f = 0; f < m_num_blocks; ++f)
1456  {
1457  if ( ! m_cfi.TestBitWritten(f))
1458  {
1459  int f_act = f + m_offset / m_block_size;
1460 
1461  BlockMap_i bi = m_block_map.find(f_act);
1462  if (bi == m_block_map.end())
1463  {
1464  Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1465  if (b)
1466  {
1467  TRACEF(Dump, "Prefetch take block " << f_act);
1468  blks.push_back(b);
1469  // Note: block ref_cnt not increased, it will be when placed into write queue.
1470 
1471  inc_prefetch_read_cnt(1);
1472  }
1473  else
1474  {
1475  // This shouldn't happen as prefetching stops when RAM is 70% full.
1476  TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1477  }
1478  break;
1479  }
1480  }
1481  }
1482 
1483  if (blks.empty())
1484  {
1485  TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1486  m_prefetch_state = kComplete;
1487  cache()->DeRegisterPrefetchFile(this);
1488  }
1489  else
1490  {
1491  (*m_current_io)->m_active_prefetches += (int) blks.size();
1492  }
1493  }
1494 
1495  if ( ! blks.empty())
1496  {
1497  ProcessBlockRequests(blks);
1498  }
1499 }
1500 
1501 
1502 //------------------------------------------------------------------------------
1503 
1504 float File::GetPrefetchScore() const
1505 {
1506  return m_prefetch_score;
1507 }
1508 
1509 XrdSysError* File::GetLog()
1510 {
1511  return Cache::GetInstance().GetLog();
1512 }
1513 
1514 XrdSysTrace* File::GetTrace()
1515 {
1516  return Cache::GetInstance().GetTrace();
1517 }
1518 
1519 void File::insert_remote_location(const std::string &loc)
1520 {
1521  if ( ! loc.empty())
1522  {
1523  size_t p = loc.find_first_of('@');
1524  m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1525  }
1526 }
1527 
1528 std::string File::GetRemoteLocations() const
1529 {
1530  std::string s;
1531  if ( ! m_remote_locations.empty())
1532  {
1533  size_t sl = 0;
1534  int nl = 0;
1535  for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1536  {
1537  sl += i->size();
1538  }
1539  s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1540  s = '[';
1541  int j = 1;
1542  for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1543  {
1544  s += '"'; s += *i; s += '"';
1545  if (j < nl) s += ',';
1546  }
1547  s += ']';
1548  }
1549  else
1550  {
1551  s = "[]";
1552  }
1553  return s;
1554 }
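// Illustrative note (not part of the upstream source): this produces a JSON-style array of
// the hosts recorded via insert_remote_location(), e.g.
// ["host1.example.org:1094","host2.example.org:1094"] (hypothetical hostnames); an empty
// set yields "[]".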
1555 
1556 //==============================================================================
1557 //======================= RESPONSE HANDLERS ==============================
1558 //==============================================================================
1559 
1560 void BlockResponseHandler::Done(int res)
1561 {
1562  m_block->m_file->ProcessBlockResponse(m_block, res);
1563  delete this;
1564 }
1565 
1566 //------------------------------------------------------------------------------
1567 
1568 void DirectResponseHandler::Done(int res)
1569 {
1570  m_mutex.Lock();
1571 
1572  int n_left = --m_to_wait;
1573 
1574  if (res < 0) {
1575  if (m_errno == 0) m_errno = res; // store first reported error
1576  } else {
1577  m_bytes_read += res;
1578  }
1579 
1580  m_mutex.UnLock();
1581 
1582  if (n_left == 0)
1583  {
1584  m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
1585  delete this;
1586  }
1587 }
Definition: XrdPfcFile.hh:79