XRootD
XrdPfcFile.hh
Go to the documentation of this file.
1 #ifndef __XRDPFC_FILE_HH__
2 #define __XRDPFC_FILE_HH__
3 //----------------------------------------------------------------------------------
4 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
5 // Author: Alja Mrak-Tadel, Matevz Tadel
6 //----------------------------------------------------------------------------------
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //----------------------------------------------------------------------------------
20 
22 #include "XrdCl/XrdClDefaultEnv.hh"
23 
24 #include "XrdOuc/XrdOucCache.hh"
25 #include "XrdOuc/XrdOucIOVec.hh"
26 
27 #include "XrdPfcInfo.hh"
28 #include "XrdPfcStats.hh"
29 
30 #include <functional>
31 #include <map>
32 #include <set>
33 #include <string>
34 
35 class XrdJob;
36 class XrdOucIOVec;
37 
38 namespace XrdCl
39 {
40 class Log;
41 }
42 
43 namespace XrdPfc
44 {
45 class BlockResponseHandler;
46 class DirectResponseHandler;
47 class IO;
48 
49 struct ReadVBlockListRAM;
50 struct ReadVChunkListRAM;
51 struct ReadVBlockListDisk;
52 struct ReadVChunkListDisk;
53 }
54 
55 
56 namespace XrdPfc
57 {
58 class File;
59 
60 struct ReadReqRH : public XrdOucCacheIOCB
61 {
62  int m_expected_size = 0;
63  int m_n_chunks = 0; // Only set for ReadV().
64  unsigned short m_seq_id;
65  XrdOucCacheIOCB *m_iocb; // External callback passed into IO::Read().
66 
67  ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb) :
68  m_seq_id(sid), m_iocb(iocb)
69  {}
70 };
71 
72 // -------------------------------------------------------------
73 
75 {
76  IO *m_io;
77  ReadReqRH *m_rh; // Internal callback created in IO::Read().
78 
79  long long m_bytes_read = 0;
80  int m_error_cond = 0; // to be set to -errno
81  int m_error_count = 0;
83 
84  int m_n_chunk_reqs = 0;
85  bool m_sync_done = false;
86  bool m_direct_done = true;
87 
88  ReadRequest(IO *io, ReadReqRH *rh) :
89  m_io(io), m_rh(rh)
90  {}
91 
93 
94  bool is_complete() const { return m_n_chunk_reqs == 0 && m_sync_done && m_direct_done; }
95  int return_value() const { return m_error_cond ? m_error_cond : m_bytes_read; }
96 };
97 
98 // -------------------------------------------------------------
99 
101 {
103  char *m_buf; // Where to place the data chunk.
104  long long m_off; // Offset *within* the corresponding block.
105  int m_size; // Size of the data chunk.
106 
107  ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size) :
108  m_read_req(rreq), m_buf(buf), m_off(off), m_size(size)
109  {}
110 };
111 
112 using vChunkRequest_t = std::vector<ChunkRequest>;
113 using vChunkRequest_i = std::vector<ChunkRequest>::iterator;
114 
115 // ================================================================
116 
117 class Block
118 {
119 public:
121  IO *m_io; // IO that handled current request, used for == / != comparisons only
122  void *m_req_id; // Identity of requestor -- used for stats.
123 
124  char *m_buff;
125  long long m_offset;
126  int m_size;
128  int m_refcnt;
129  int m_errno; // stores negative errno
135 
137 
138  Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize,
139  bool m_prefetch, bool cks_net) :
140  m_file(f), m_io(io), m_req_id(rid),
141  m_buff(buf), m_offset(off), m_size(size), m_req_size(rsize),
143  m_req_cksum_net(cks_net), m_n_cksum_errors(0)
144  {}
145 
146  char* get_buff() const { return m_buff; }
147  int get_size() const { return m_size; }
148  int get_req_size() const { return m_req_size; }
149  long long get_offset() const { return m_offset; }
150 
151  File* get_file() const { return m_file; }
152  IO* get_io() const { return m_io; }
153  void* get_req_id() const { return m_req_id; }
154 
155  bool is_finished() const { return m_downloaded || m_errno != 0; }
156  bool is_ok() const { return m_downloaded; }
157  bool is_failed() const { return m_errno != 0; }
158 
159  void set_downloaded() { m_downloaded = true; }
160  void set_error(int err) { m_errno = err; }
161  int get_error() const { return m_errno; }
162 
163  void reset_error_and_set_io(IO *io, void *rid)
164  {
165  m_errno = 0;
166  m_io = io;
167  m_req_id = rid;
168  }
169 
170  bool req_cksum_net() const { return m_req_cksum_net; }
171  bool has_cksums() const { return ! m_cksum_vec.empty(); }
175 };
176 
177 using BlockList_t = std::list<Block*>;
178 using BlockList_i = std::list<Block*>::iterator;
179 
180 // ================================================================
181 
183 {
184 public:
186 
188 
189  void Done(int result) override;
190 };
191 
192 // ----------------------------------------------------------------
193 
195 {
196 public:
201  int m_bytes_read = 0;
202  int m_errno = 0;
203 
204  DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait) :
205  m_file(file), m_read_req(rreq), m_to_wait(to_wait)
206  {}
207 
208  void Done(int result) override;
209 };
210 
211 // ================================================================
212 
213 class File
214 {
215  friend class BlockResponseHandler;
216  friend class DirectResponseHandler;
217 public:
218  // Constructor and Open() are private.
219 
221  static File* FileOpen(const std::string &path, long long offset, long long fileSize);
222 
224  ~File();
225 
228 
230  void BlocksRemovedFromWriteQ(std::list<Block*>&);
231 
233  int Read(IO *io, char* buff, long long offset, int size, ReadReqRH *rh);
234 
236  int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh);
237 
238  //----------------------------------------------------------------------
240  //----------------------------------------------------------------------
241  void ioUpdated(IO *io);
242 
243  //----------------------------------------------------------------------
246  //----------------------------------------------------------------------
247  bool ioActive(IO *io);
248 
249  //----------------------------------------------------------------------
252  //----------------------------------------------------------------------
254 
255  //----------------------------------------------------------------------
258  //----------------------------------------------------------------------
259  bool FinalizeSyncBeforeExit();
260 
261  //----------------------------------------------------------------------
263  //----------------------------------------------------------------------
264  void Sync();
265 
266  void WriteBlockToDisk(Block* b);
267 
268  void Prefetch();
269 
270  float GetPrefetchScore() const;
271 
273  const char* lPath() const;
274 
275  std::string& GetLocalPath() { return m_filename; }
276 
277  XrdSysError* GetLog();
279 
280  long long GetFileSize() { return m_file_size; }
281 
282  void AddIO(IO *io);
285  void RemoveIO(IO *io);
286 
288 
289  std::string GetRemoteLocations() const;
290  const Info::AStat* GetLastAccessStats() const { return m_cfi.GetLastAccessStats(); }
291  size_t GetAccessCnt() const { return m_cfi.GetAccessCnt(); }
292  int GetBlockSize() const { return m_cfi.GetBufferSize(); }
293  int GetNBlocks() const { return m_cfi.GetNBlocks(); }
294  int GetNDownloadedBlocks() const { return m_cfi.GetNDownloadedBlocks(); }
295  long long GetPrefetchedBytes() const { return m_prefetch_bytes; }
296  const Stats& RefStats() const { return m_stats; }
297 
298  int Fstat(struct stat &sbuff);
299 
300  // These three methods are called under Cache's m_active lock
301  int get_ref_cnt() { return m_ref_cnt; }
302  int inc_ref_cnt() { return ++m_ref_cnt; }
303  int dec_ref_cnt() { return --m_ref_cnt; }
304 
306  bool is_in_emergency_shutdown() { return m_in_shutdown; }
307 
308 private:
310  File(const std::string &path, long long offset, long long fileSize);
311 
313  bool Open();
314 
315  static const char *m_traceID;
316 
317  int m_ref_cnt;
318 
319  XrdOssDF *m_data_file;
320  XrdOssDF *m_info_file;
321  Info m_cfi;
322 
323  std::string m_filename;
324  long long m_offset;
325  long long m_file_size;
326 
327  // IO objects attached to this file.
328 
329  typedef std::set<IO*> IoSet_t;
330  typedef IoSet_t::iterator IoSet_i;
331 
332  IoSet_t m_io_set;
333  IoSet_i m_current_io;
334  int m_ios_in_detach;
335 
336  // FSync
337 
338  std::vector<int> m_writes_during_sync;
339  int m_non_flushed_cnt;
340  bool m_in_sync;
341  bool m_detach_time_logged;
342  bool m_in_shutdown;
343 
344  // Block state and management
345 
346  typedef std::list<int> IntList_t;
347  typedef IntList_t::iterator IntList_i;
348 
349  typedef std::map<int, Block*> BlockMap_t;
350  typedef BlockMap_t::iterator BlockMap_i;
351 
352  BlockMap_t m_block_map;
353  XrdSysCondVar m_state_cond;
354  long long m_block_size;
355  int m_num_blocks;
356 
357  // Stats
358 
359  Stats m_stats;
360  Stats m_last_stats;
361 
362  std::set<std::string> m_remote_locations;
363  void insert_remote_location(const std::string &loc);
364 
365  // Prefetch
366 
367  enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete };
368 
369  PrefetchState_e m_prefetch_state;
370 
371  long long m_prefetch_bytes;
372  int m_prefetch_read_cnt;
373  int m_prefetch_hit_cnt;
374  float m_prefetch_score; // cached
375 
376  void inc_prefetch_read_cnt(int prc) { if (prc) { m_prefetch_read_cnt += prc; calc_prefetch_score(); } }
377  void inc_prefetch_hit_cnt (int phc) { if (phc) { m_prefetch_hit_cnt += phc; calc_prefetch_score(); } }
378  void calc_prefetch_score() { m_prefetch_score = float(m_prefetch_hit_cnt) / m_prefetch_read_cnt; }
379 
380  // Helpers
381 
382  bool overlap(int blk, // block to query
383  long long blk_size, //
384  long long req_off, // offset of user request
385  int req_size, // size of user request
386  // output:
387  long long &off, // offset in user buffer
388  long long &blk_off, // offset in block
389  int &size);
390 
391  // Read & ReadV
392 
393  Block* PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch);
394 
395  void ProcessBlockRequest (Block *b);
396  void ProcessBlockRequests(BlockList_t& blks);
397 
398  void RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size);
399 
400  int ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size);
401 
402  int ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
403  ReadReqRH *rh, const char *tpfx);
404 
405  void ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond);
406  void ProcessBlockError(Block *b, ReadRequest *rreq);
407  void ProcessBlockSuccess(Block *b, ChunkRequest &creq);
408  void FinalizeReadRequest(ReadRequest *rreq);
409 
410  void ProcessBlockResponse(Block *b, int res);
411 
412  // Block management
413 
414  void inc_ref_count(Block* b);
415  void dec_ref_count(Block* b, int count = 1);
416  void free_block(Block*);
417 
418  bool select_current_io_or_disable_prefetching(bool skip_current);
419 
420  int offsetIdx(int idx) const;
421 };
422 
423 //------------------------------------------------------------------------------
424 
425 inline void File::inc_ref_count(Block* b)
426 {
427  // Method always called under lock.
428  b->m_refcnt++;
429 }
430 
431 //------------------------------------------------------------------------------
432 
433 inline void File::dec_ref_count(Block* b, int count)
434 {
435  // Method always called under lock.
436  assert(b->is_finished());
437  b->m_refcnt -= count;
438  assert(b->m_refcnt >= 0);
439 
440  if (b->m_refcnt == 0)
441  {
442  free_block(b);
443  }
444 }
445 
446 }
447 
448 #endif
int stat(const char *path, struct stat *buf)
XrdOucString File
Definition: XrdJob.hh:43
void Done(int result) override
Definition: XrdPfcFile.cc:1591
int * ptr_n_cksum_errors()
Definition: XrdPfcFile.hh:174
int get_size() const
Definition: XrdPfcFile.hh:147
int get_error() const
Definition: XrdPfcFile.hh:161
int get_n_cksum_errors()
Definition: XrdPfcFile.hh:173
Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize, bool m_prefetch, bool cks_net)
Definition: XrdPfcFile.hh:138
void * get_req_id() const
Definition: XrdPfcFile.hh:153
long long get_offset() const
Definition: XrdPfcFile.hh:149
vChunkRequest_t m_chunk_reqs
Definition: XrdPfcFile.hh:136
bool is_finished() const
Definition: XrdPfcFile.hh:155
bool is_ok() const
Definition: XrdPfcFile.hh:156
void set_error(int err)
Definition: XrdPfcFile.hh:160
vCkSum_t & ref_cksum_vec()
Definition: XrdPfcFile.hh:172
int m_n_cksum_errors
Definition: XrdPfcFile.hh:134
char * get_buff() const
Definition: XrdPfcFile.hh:146
IO * get_io() const
Definition: XrdPfcFile.hh:152
void set_downloaded()
Definition: XrdPfcFile.hh:159
bool req_cksum_net() const
Definition: XrdPfcFile.hh:170
void * m_req_id
Definition: XrdPfcFile.hh:122
bool has_cksums() const
Definition: XrdPfcFile.hh:171
File * get_file() const
Definition: XrdPfcFile.hh:151
bool is_failed() const
Definition: XrdPfcFile.hh:157
long long m_offset
Definition: XrdPfcFile.hh:125
vCkSum_t m_cksum_vec
Definition: XrdPfcFile.hh:133
void reset_error_and_set_io(IO *io, void *rid)
Definition: XrdPfcFile.hh:163
bool m_req_cksum_net
Definition: XrdPfcFile.hh:132
int get_req_size() const
Definition: XrdPfcFile.hh:148
void Done(int result) override
Definition: XrdPfcFile.cc:1599
DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait)
Definition: XrdPfcFile.hh:204
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
Definition: XrdPfcFile.cc:272
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1447
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
Definition: XrdPfcFile.cc:709
XrdSysTrace * GetTrace()
Definition: XrdPfcFile.cc:1545
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:987
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:100
float GetPrefetchScore() const
Definition: XrdPfcFile.cc:1535
XrdSysError * GetLog()
Definition: XrdPfcFile.cc:1540
int GetNBlocks() const
Definition: XrdPfcFile.hh:293
void Prefetch()
Definition: XrdPfcFile.cc:1462
void StopPrefetchingOnIO(IO *io)
std::string GetRemoteLocations() const
Definition: XrdPfcFile.cc:1559
const Info::AStat * GetLastAccessStats() const
Definition: XrdPfcFile.hh:290
size_t GetAccessCnt() const
Definition: XrdPfcFile.hh:291
int Fstat(struct stat &sbuff)
Definition: XrdPfcFile.cc:491
void AddIO(IO *io)
Definition: XrdPfcFile.cc:296
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
Definition: XrdPfcFile.cc:266
long long GetPrefetchedBytes() const
Definition: XrdPfcFile.hh:295
int GetBlockSize() const
Definition: XrdPfcFile.hh:292
int GetNDownloadedBlocks() const
Definition: XrdPfcFile.hh:294
long long GetFileSize()
Definition: XrdPfcFile.hh:280
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
Definition: XrdPfcFile.cc:166
void initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:113
int inc_ref_cnt()
Definition: XrdPfcFile.hh:302
int GetPrefetchCountOnIO(IO *io)
void Sync()
Sync file cache inf o and output data with disk.
Definition: XrdPfcFile.cc:1070
int dec_ref_cnt()
Definition: XrdPfcFile.hh:303
int get_ref_cnt()
Definition: XrdPfcFile.hh:301
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
Definition: XrdPfcFile.cc:676
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
Definition: XrdPfcFile.cc:180
const Stats & RefStats() const
Definition: XrdPfcFile.hh:296
~File()
Destructor.
Definition: XrdPfcFile.cc:77
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:333
std::string & GetLocalPath()
Definition: XrdPfcFile.hh:275
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Definition: XrdPfcFile.cc:158
Stats DeltaStatsFromLastCall()
Definition: XrdPfcFile.cc:143
bool is_in_emergency_shutdown()
Definition: XrdPfcFile.hh:306
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Definition: XrdPfcFile.cc:189
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:18
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:45
const AStat * GetLastAccessStats() const
Get latest access stats.
Definition: XrdPfcInfo.cc:491
long long GetBufferSize() const
Get prefetch buffer size.
Definition: XrdPfcInfo.hh:473
int GetNDownloadedBlocks() const
Get number of downloaded blocks.
Definition: XrdPfcInfo.hh:402
size_t GetAccessCnt() const
Get number of accesses.
Definition: XrdPfcInfo.hh:265
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Definition: XrdPfcInfo.hh:441
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:31
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
XrdSysError Log
Definition: XrdConfig.cc:112
Definition: XrdPfc.hh:41
std::vector< ChunkRequest > vChunkRequest_t
Definition: XrdPfcFile.hh:112
std::vector< ChunkRequest >::iterator vChunkRequest_i
Definition: XrdPfcFile.hh:113
std::list< Block * > BlockList_t
Definition: XrdPfcFile.hh:177
std::vector< uint32_t > vCkSum_t
Definition: XrdPfcTypes.hh:27
std::list< Block * >::iterator BlockList_i
Definition: XrdPfcFile.hh:178
ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size)
Definition: XrdPfcFile.hh:107
ReadRequest * m_read_req
Definition: XrdPfcFile.hh:102
Access statistics.
Definition: XrdPfcInfo.hh:61
XrdOucCacheIOCB * m_iocb
Definition: XrdPfcFile.hh:65
unsigned short m_seq_id
Definition: XrdPfcFile.hh:64
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition: XrdPfcFile.hh:67
void update_error_cond(int ec)
Definition: XrdPfcFile.hh:92
ReadRequest(IO *io, ReadReqRH *rh)
Definition: XrdPfcFile.hh:88
ReadReqRH * m_rh
Definition: XrdPfcFile.hh:77
bool is_complete() const
Definition: XrdPfcFile.hh:94
int return_value() const
Definition: XrdPfcFile.hh:95
long long m_bytes_read
Definition: XrdPfcFile.hh:79