XRootD
XrdPfcFile.hh
Go to the documentation of this file.
1 #ifndef __XRDPFC_FILE_HH__
2 #define __XRDPFC_FILE_HH__
3 //----------------------------------------------------------------------------------
4 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
5 // Author: Alja Mrak-Tadel, Matevz Tadel
6 //----------------------------------------------------------------------------------
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //----------------------------------------------------------------------------------
20 
21 #include "XrdPfcTypes.hh"
22 #include "XrdPfcInfo.hh"
23 #include "XrdPfcStats.hh"
24 
25 #include "XrdOuc/XrdOucCache.hh"
26 #include "XrdOuc/XrdOucIOVec.hh"
27 
28 #include <functional>
29 #include <list>
30 #include <map>
31 #include <set>
32 #include <string>
33 
34 class XrdJob;
35 class XrdOucIOVec;
36 
37 namespace XrdPfc
38 {
39 class File;
40 class BlockResponseHandler;
41 class DirectResponseHandler;
42 class IO;
43 
44 struct ReadVBlockListRAM;
45 struct ReadVChunkListRAM;
46 struct ReadVBlockListDisk;
47 struct ReadVChunkListDisk;
48 
49 struct ReadReqRH : public XrdOucCacheIOCB
50 {
51  int m_expected_size = 0;
52  int m_n_chunks = 0; // Only set for ReadV().
53  unsigned short m_seq_id;
54  XrdOucCacheIOCB *m_iocb; // External callback passed into IO::Read().
55 
56  ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb) :
57  m_seq_id(sid), m_iocb(iocb)
58  {}
59 };
60 
61 // -------------------------------------------------------------
62 
64 {
65  IO *m_io;
66  ReadReqRH *m_rh; // Internal callback created in IO::Read().
67 
68  long long m_bytes_read = 0;
69  int m_error_cond = 0; // to be set to -errno
70  int m_error_count = 0;
72 
73  int m_n_chunk_reqs = 0;
74  bool m_sync_done = false;
75  bool m_direct_done = true;
76 
77  ReadRequest(IO *io, ReadReqRH *rh) :
78  m_io(io), m_rh(rh)
79  {}
80 
82 
83  bool is_complete() const { return m_n_chunk_reqs == 0 && m_sync_done && m_direct_done; }
84  int return_value() const { return m_error_cond ? m_error_cond : m_bytes_read; }
85 };
86 
87 // -------------------------------------------------------------
88 
90 {
92  char *m_buf; // Where to place the data chunk.
93  long long m_off; // Offset *within* the corresponding block.
94  int m_size; // Size of the data chunk.
95 
96  ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size) :
97  m_read_req(rreq), m_buf(buf), m_off(off), m_size(size)
98  {}
99 };
100 
101 using vChunkRequest_t = std::vector<ChunkRequest>;
102 using vChunkRequest_i = std::vector<ChunkRequest>::iterator;
103 
104 // ================================================================
105 
106 class Block
107 {
108 public:
110  IO *m_io; // IO that handled current request, used for == / != comparisons only
111  void *m_req_id; // Identity of requestor -- used for stats.
112 
113  char *m_buff;
114  long long m_offset;
115  int m_size;
117  int m_refcnt;
118  int m_errno; // stores negative errno
124 
126 
127  Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize,
128  bool m_prefetch, bool cks_net) :
129  m_file(f), m_io(io), m_req_id(rid),
130  m_buff(buf), m_offset(off), m_size(size), m_req_size(rsize),
132  m_req_cksum_net(cks_net), m_n_cksum_errors(0)
133  {}
134 
135  char* get_buff() const { return m_buff; }
136  int get_size() const { return m_size; }
137  int get_req_size() const { return m_req_size; }
138  long long get_offset() const { return m_offset; }
139 
140  File* get_file() const { return m_file; }
141  IO* get_io() const { return m_io; }
142  void* get_req_id() const { return m_req_id; }
143 
144  bool is_finished() const { return m_downloaded || m_errno != 0; }
145  bool is_ok() const { return m_downloaded; }
146  bool is_failed() const { return m_errno != 0; }
147 
148  void set_downloaded() { m_downloaded = true; }
149  void set_error(int err) { m_errno = err; }
150  int get_error() const { return m_errno; }
151 
152  void reset_error_and_set_io(IO *io, void *rid)
153  {
154  m_errno = 0;
155  m_io = io;
156  m_req_id = rid;
157  }
158 
159  bool req_cksum_net() const { return m_req_cksum_net; }
160  bool has_cksums() const { return ! m_cksum_vec.empty(); }
164 };
165 
166 using BlockList_t = std::list<Block*>;
167 using BlockList_i = std::list<Block*>::iterator;
168 
169 // ================================================================
170 
172 {
173 public:
175 
177 
178  void Done(int result) override;
179 };
180 
181 // ----------------------------------------------------------------
182 
184 {
185 public:
190  int m_bytes_read = 0;
191  int m_errno = 0;
192 
193  DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait) :
194  m_file(file), m_read_req(rreq), m_to_wait(to_wait)
195  {}
196 
197  void Done(int result) override;
198 };
199 
200 // ================================================================
201 
202 class File
203 {
204  friend class Cache;
205  friend class BlockResponseHandler;
206  friend class DirectResponseHandler;
207 public:
208  // Constructor, destructor, Open() and Close() are private.
209 
211  static File* FileOpen(const std::string &path, long long offset, long long fileSize);
212 
215 
217  void BlocksRemovedFromWriteQ(std::list<Block*>&);
218 
220  int Read(IO *io, char* buff, long long offset, int size, ReadReqRH *rh);
221 
223  int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh);
224 
225  //----------------------------------------------------------------------
227  //----------------------------------------------------------------------
228  void ioUpdated(IO *io);
229 
230  //----------------------------------------------------------------------
233  //----------------------------------------------------------------------
234  bool ioActive(IO *io);
235 
236  //----------------------------------------------------------------------
239  //----------------------------------------------------------------------
241 
242  //----------------------------------------------------------------------
245  //----------------------------------------------------------------------
246  bool FinalizeSyncBeforeExit();
247 
248  //----------------------------------------------------------------------
250  //----------------------------------------------------------------------
251  void Sync();
252 
253  void WriteBlockToDisk(Block* b);
254 
255  void Prefetch();
256 
257  float GetPrefetchScore() const;
258 
260  const char* lPath() const;
261 
262  const std::string& GetLocalPath() const { return m_filename; }
263 
264  XrdSysError* GetLog();
266 
267  long long GetFileSize() const { return m_file_size; }
268 
269  void AddIO(IO *io);
272  void RemoveIO(IO *io);
273 
274  std::string GetRemoteLocations() const;
275  const Info::AStat* GetLastAccessStats() const { return m_cfi.GetLastAccessStats(); }
276  size_t GetAccessCnt() const { return m_cfi.GetAccessCnt(); }
277  int GetBlockSize() const { return m_cfi.GetBufferSize(); }
278  int GetNBlocks() const { return m_cfi.GetNBlocks(); }
279  int GetNDownloadedBlocks() const { return m_cfi.GetNDownloadedBlocks(); }
280  long long GetPrefetchedBytes() const { return m_prefetch_bytes; }
281  const Stats& RefStats() const { return m_stats; }
282 
283  int Fstat(struct stat &sbuff);
284 
285  // These three methods are called under Cache's m_active lock
286  int get_ref_cnt() { return m_ref_cnt; }
287  int inc_ref_cnt() { return ++m_ref_cnt; }
288  int dec_ref_cnt() { return --m_ref_cnt; }
289 
290  long long initiate_emergency_shutdown();
291  bool is_in_emergency_shutdown() { return m_in_shutdown; }
292 
293 private:
295  File(const std::string &path, long long offset, long long fileSize);
296 
298  ~File();
299 
301  void Close();
302 
304  bool Open();
305 
306  static const char *m_traceID;
307 
308  int m_ref_cnt;
309 
310  XrdOssDF *m_data_file;
311  XrdOssDF *m_info_file;
312  Info m_cfi;
313 
314  const std::string m_filename;
315  const long long m_offset;
316  const long long m_file_size;
317 
318  // IO objects attached to this file.
319 
320  typedef std::set<IO*> IoSet_t;
321  typedef IoSet_t::iterator IoSet_i;
322 
323  IoSet_t m_io_set;
324  IoSet_i m_current_io;
325  int m_ios_in_detach;
326 
327  // FSync
328 
329  std::vector<int> m_writes_during_sync;
330  int m_non_flushed_cnt;
331  bool m_in_sync;
332  bool m_detach_time_logged;
333  bool m_in_shutdown;
334 
335  // Block state and management
336 
337  typedef std::list<int> IntList_t;
338  typedef IntList_t::iterator IntList_i;
339 
340  typedef std::map<int, Block*> BlockMap_t;
341  typedef BlockMap_t::iterator BlockMap_i;
342 
343  BlockMap_t m_block_map;
344  XrdSysCondVar m_state_cond;
345  long long m_block_size;
346  int m_num_blocks;
347 
348  // Stats and ResourceMonitor interface
349 
350  Stats m_stats;
351  Stats m_delta_stats;
352  long long m_st_blocks;
353  long long m_resmon_report_threshold;
354  int m_resmon_token;
355 
356  void check_delta_stats();
357  void report_and_merge_delta_stats();
358 
359  std::set<std::string> m_remote_locations;
360  void insert_remote_location(const std::string &loc);
361 
362  // Prefetch
363 
364  enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete };
365 
366  PrefetchState_e m_prefetch_state;
367 
368  long long m_prefetch_bytes;
369  int m_prefetch_read_cnt;
370  int m_prefetch_hit_cnt;
371  float m_prefetch_score; // cached
372 
373  void inc_prefetch_read_cnt(int prc) { if (prc) { m_prefetch_read_cnt += prc; calc_prefetch_score(); } }
374  void inc_prefetch_hit_cnt (int phc) { if (phc) { m_prefetch_hit_cnt += phc; calc_prefetch_score(); } }
375  void calc_prefetch_score() { m_prefetch_score = float(m_prefetch_hit_cnt) / m_prefetch_read_cnt; }
376 
377  // Helpers
378 
379  bool overlap(int blk, // block to query
380  long long blk_size, //
381  long long req_off, // offset of user request
382  int req_size, // size of user request
383  // output:
384  long long &off, // offset in user buffer
385  long long &blk_off, // offset in block
386  int &size);
387 
388  // Read & ReadV
389 
390  Block* PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch);
391 
392  void ProcessBlockRequest (Block *b);
393  void ProcessBlockRequests(BlockList_t& blks);
394 
395  void RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size);
396 
397  int ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size);
398 
399  int ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
400  ReadReqRH *rh, const char *tpfx);
401 
402  void ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond);
403  void ProcessBlockError(Block *b, ReadRequest *rreq);
404  void ProcessBlockSuccess(Block *b, ChunkRequest &creq);
405  void FinalizeReadRequest(ReadRequest *rreq);
406 
407  void ProcessBlockResponse(Block *b, int res);
408 
409  // Block management
410 
411  void inc_ref_count(Block* b);
412  void dec_ref_count(Block* b, int count = 1);
413  void free_block(Block*);
414 
415  bool select_current_io_or_disable_prefetching(bool skip_current);
416 
417  int offsetIdx(int idx) const;
418 };
419 
420 //------------------------------------------------------------------------------
421 
422 inline void File::inc_ref_count(Block* b)
423 {
424  // Method always called under lock.
425  b->m_refcnt++;
426 }
427 
428 //------------------------------------------------------------------------------
429 
430 inline void File::dec_ref_count(Block* b, int count)
431 {
432  // Method always called under lock.
433  assert(b->is_finished());
434  b->m_refcnt -= count;
435  assert(b->m_refcnt >= 0);
436 
437  if (b->m_refcnt == 0)
438  {
439  free_block(b);
440  }
441 }
442 
443 }
444 
445 #endif
int stat(const char *path, struct stat *buf)
XrdOucString File
Definition: XrdJob.hh:43
void Done(int result) override
Definition: XrdPfcFile.cc:1678
int * ptr_n_cksum_errors()
Definition: XrdPfcFile.hh:163
int get_size() const
Definition: XrdPfcFile.hh:136
int get_error() const
Definition: XrdPfcFile.hh:150
int get_n_cksum_errors()
Definition: XrdPfcFile.hh:162
Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize, bool m_prefetch, bool cks_net)
Definition: XrdPfcFile.hh:127
void * get_req_id() const
Definition: XrdPfcFile.hh:142
long long get_offset() const
Definition: XrdPfcFile.hh:138
vChunkRequest_t m_chunk_reqs
Definition: XrdPfcFile.hh:125
bool is_finished() const
Definition: XrdPfcFile.hh:144
bool is_ok() const
Definition: XrdPfcFile.hh:145
void set_error(int err)
Definition: XrdPfcFile.hh:149
vCkSum_t & ref_cksum_vec()
Definition: XrdPfcFile.hh:161
int m_n_cksum_errors
Definition: XrdPfcFile.hh:123
char * get_buff() const
Definition: XrdPfcFile.hh:135
IO * get_io() const
Definition: XrdPfcFile.hh:141
void set_downloaded()
Definition: XrdPfcFile.hh:148
bool req_cksum_net() const
Definition: XrdPfcFile.hh:159
void * m_req_id
Definition: XrdPfcFile.hh:111
bool has_cksums() const
Definition: XrdPfcFile.hh:160
File * get_file() const
Definition: XrdPfcFile.hh:140
bool is_failed() const
Definition: XrdPfcFile.hh:146
long long m_offset
Definition: XrdPfcFile.hh:114
vCkSum_t m_cksum_vec
Definition: XrdPfcFile.hh:122
void reset_error_and_set_io(IO *io, void *rid)
Definition: XrdPfcFile.hh:152
bool m_req_cksum_net
Definition: XrdPfcFile.hh:121
int get_req_size() const
Definition: XrdPfcFile.hh:137
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:152
void Done(int result) override
Definition: XrdPfcFile.cc:1686
DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait)
Definition: XrdPfcFile.hh:193
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
Definition: XrdPfcFile.cc:322
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1534
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
Definition: XrdPfcFile.cc:785
XrdSysTrace * GetTrace()
Definition: XrdPfcFile.cc:1632
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:1068
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:138
float GetPrefetchScore() const
Definition: XrdPfcFile.cc:1622
XrdSysError * GetLog()
Definition: XrdPfcFile.cc:1627
int GetNBlocks() const
Definition: XrdPfcFile.hh:278
void Prefetch()
Definition: XrdPfcFile.cc:1549
void StopPrefetchingOnIO(IO *io)
std::string GetRemoteLocations() const
Definition: XrdPfcFile.cc:1646
const Info::AStat * GetLastAccessStats() const
Definition: XrdPfcFile.hh:275
size_t GetAccessCnt() const
Definition: XrdPfcFile.hh:276
int Fstat(struct stat &sbuff)
Definition: XrdPfcFile.cc:563
void AddIO(IO *io)
Definition: XrdPfcFile.cc:346
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
Definition: XrdPfcFile.cc:316
long long GetPrefetchedBytes() const
Definition: XrdPfcFile.hh:280
int GetBlockSize() const
Definition: XrdPfcFile.hh:277
int GetNDownloadedBlocks() const
Definition: XrdPfcFile.hh:279
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
Definition: XrdPfcFile.cc:216
int inc_ref_cnt()
Definition: XrdPfcFile.hh:287
int GetPrefetchCountOnIO(IO *io)
void Sync()
Sync file cache inf o and output data with disk.
Definition: XrdPfcFile.cc:1148
int dec_ref_cnt()
Definition: XrdPfcFile.hh:288
int get_ref_cnt()
Definition: XrdPfcFile.hh:286
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
Definition: XrdPfcFile.cc:748
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
Definition: XrdPfcFile.cc:230
long long initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:151
long long GetFileSize() const
Definition: XrdPfcFile.hh:267
const Stats & RefStats() const
Definition: XrdPfcFile.hh:281
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:383
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Definition: XrdPfcFile.cc:208
const std::string & GetLocalPath() const
Definition: XrdPfcFile.hh:262
bool is_in_emergency_shutdown()
Definition: XrdPfcFile.hh:291
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Definition: XrdPfcFile.cc:239
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
const AStat * GetLastAccessStats() const
Get latest access stats.
Definition: XrdPfcInfo.cc:489
long long GetBufferSize() const
Get prefetch buffer size.
Definition: XrdPfcInfo.hh:469
int GetNDownloadedBlocks() const
Get number of downloaded blocks.
Definition: XrdPfcInfo.hh:398
size_t GetAccessCnt() const
Get number of accesses.
Definition: XrdPfcInfo.hh:261
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Definition: XrdPfcInfo.hh:437
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
Definition: XrdPfc.hh:41
std::vector< ChunkRequest > vChunkRequest_t
Definition: XrdPfcFile.hh:101
std::vector< ChunkRequest >::iterator vChunkRequest_i
Definition: XrdPfcFile.hh:102
std::list< Block * > BlockList_t
Definition: XrdPfcFile.hh:166
std::vector< uint32_t > vCkSum_t
Definition: XrdPfcTypes.hh:31
std::list< Block * >::iterator BlockList_i
Definition: XrdPfcFile.hh:167
ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size)
Definition: XrdPfcFile.hh:96
ReadRequest * m_read_req
Definition: XrdPfcFile.hh:91
Access statistics.
Definition: XrdPfcInfo.hh:57
XrdOucCacheIOCB * m_iocb
Definition: XrdPfcFile.hh:54
unsigned short m_seq_id
Definition: XrdPfcFile.hh:53
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition: XrdPfcFile.hh:56
void update_error_cond(int ec)
Definition: XrdPfcFile.hh:81
ReadRequest(IO *io, ReadReqRH *rh)
Definition: XrdPfcFile.hh:77
ReadReqRH * m_rh
Definition: XrdPfcFile.hh:66
bool is_complete() const
Definition: XrdPfcFile.hh:83
int return_value() const
Definition: XrdPfcFile.hh:84
long long m_bytes_read
Definition: XrdPfcFile.hh:68