XRootD
XrdPfcIOFile.cc
Go to the documentation of this file.
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 #include <cstdio>
20 #include <fcntl.h>
21 
22 #include "XrdSys/XrdSysError.hh"
24 #include "XrdSys/XrdSysPthread.hh"
25 
26 #include "XrdPfcIOFile.hh"
27 #include "XrdPfcStats.hh"
28 #include "XrdPfcTrace.hh"
29 
30 #include "XrdOuc/XrdOucEnv.hh"
32 
33 using namespace XrdPfc;
34 
35 //______________________________________________________________________________
37  IO(io, cache),
38  m_file(0),
39  m_localStat(0)
40 {
41  m_file = Cache::GetInstance().GetFile(GetFilename(), this);
42 }
43 
44 //______________________________________________________________________________
46 {
47  // called from Detach() if no sync is needed or
48  // from Cache's sync thread
49  TRACEIO(Debug, "~IOFile() " << this);
50 
51  delete m_localStat;
52 }
53 
54 //______________________________________________________________________________
55 int IOFile::Fstat(struct stat &sbuff)
56 {
57  int res = 0;
58  if( ! m_localStat)
59  {
60  res = initCachedStat();
61  if (res) return res;
62  }
63 
64  memcpy(&sbuff, m_localStat, sizeof(struct stat));
65  return 0;
66 }
67 
68 //______________________________________________________________________________
69 long long IOFile::FSize()
70 {
71  return m_file->GetFileSize();
72 }
73 
74 //______________________________________________________________________________
75 int IOFile::initCachedStat()
76 {
77  // Called indirectly from the constructor.
78 
79  static const char* trace_pfx = "initCachedStat ";
80 
81  int res = -1;
82  struct stat tmpStat;
83 
84  std::string fname = GetFilename();
85  std::string iname = fname + Info::s_infoExtension;
86  if (m_cache.GetOss()->Stat(fname.c_str(), &tmpStat) == XrdOssOK)
87  {
88  XrdOssDF* infoFile = m_cache.GetOss()->newFile(Cache::GetInstance().RefConfiguration().m_username.c_str());
89  XrdOucEnv myEnv;
90  int res_open;
91  if ((res_open = infoFile->Open(iname.c_str(), O_RDONLY, 0600, myEnv)) == XrdOssOK)
92  {
93  Info info(m_cache.GetTrace());
94  if (info.Read(infoFile, iname.c_str()))
95  {
96  // The filesize from the file itself may be misleading if its download is incomplete; take it from the cinfo.
97  tmpStat.st_size = info.GetFileSize();
98  // We are arguably abusing the mtime to be the creation time of the file; then ctime becomes the
99  // last time additional data was cached. Note we only set the mtime if the file is complete; this
100  // helps clients know whether there was a cache hit or miss.
101  if (info.IsComplete())
102  {
103  tmpStat.st_mtime = info.GetCreationTime();
104  TRACEIO(Info, trace_pfx << "successfully read size " << tmpStat.st_size << " and creation time " << tmpStat.st_mtime << " from info file");
105  }
106  else
107  {
108  tmpStat.st_mtime = 0;
109  TRACEIO(Info, trace_pfx << "successfully read size from info file = " << tmpStat.st_size);
110  }
111  res = 0;
112  }
113  else
114  {
115  // file exist but can't read it
116  TRACEIO(Info, trace_pfx << "info file is incomplete or corrupt");
117  }
118  }
119  else
120  {
121  TRACEIO(Error, trace_pfx << "can't open info file " << XrdSysE2T(-res_open));
122  }
123  infoFile->Close();
124  delete infoFile;
125  }
126 
127  if (res)
128  {
129  res = GetInput()->Fstat(tmpStat);
130  TRACEIO(Debug, trace_pfx << "got stat from client res = " << res << ", size = " << tmpStat.st_size);
131  // The mtime / atime / ctime for cached responses come from the file on disk in the cache hit case.
132  // In the cache miss cache, we set the various *time attributes to 0 as they are meant to indicate
133  // the age of the cache information and 0 indicates non-existence.
134  tmpStat.st_ctime = tmpStat.st_mtime = tmpStat.st_atime = 0;
135  }
136 
137  if (res == 0)
138  {
139  m_localStat = new struct stat;
140  memcpy(m_localStat, &tmpStat, sizeof(struct stat));
141  }
142  return res;
143 }
144 
145 //______________________________________________________________________________
147 {
148  IO::Update(iocp);
149  m_file->ioUpdated(this);
150 }
151 
152 //______________________________________________________________________________
154 {
155  RefreshLocation();
156  return m_file->ioActive(this);
157 }
158 
159 //______________________________________________________________________________
161 {
162  // Effectively a destructor.
163 
164  TRACE(Info, "DetachFinalize() " << this);
165 
166  m_file->RequestSyncOfDetachStats();
167  Cache::GetInstance().ReleaseFile(m_file, this);
168 
169  delete this;
170 }
171 
172 
173 //==============================================================================
174 // Read and pgRead - sync / async and helpers
175 //==============================================================================
176 
177 //______________________________________________________________________________
178 int IOFile::Read(char *buff, long long off, int size)
179 {
181 
182  auto *rh = new ReadReqRHCond(ObtainReadSid(), nullptr);
183 
184  TRACEIO(Dump, "Read() sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size);
185 
186  rh->m_cond.Lock();
187  int retval = ReadBegin(buff, off, size, rh);
188  if (retval == -EWOULDBLOCK)
189  {
190  rh->m_cond.Wait();
191  retval = rh->m_retval;
192  }
193  rh->m_cond.UnLock();
194 
195  return ReadEnd(retval, rh);
196 }
197 
198 //______________________________________________________________________________
199 void IOFile::Read(XrdOucCacheIOCB &iocb, char *buff, long long off, int size)
200 {
201  struct ZHandler : ReadReqRH
202  { using ReadReqRH::ReadReqRH;
203  IOFile *m_io = nullptr;
204 
205  void Done(int result) override {
206  m_io->ReadEnd(result, this);
207  }
208  };
209 
211 
212  auto *rh = new ZHandler(ObtainReadSid(), &iocb);
213  rh->m_io = this;
214 
215  TRACEIO(Dump, "Read() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size);
216 
217  int retval = ReadBegin(buff, off, size, rh);
218  if (retval != -EWOULDBLOCK)
219  {
220  rh->Done(retval);
221  }
222 }
223 
224 //______________________________________________________________________________
225 void IOFile::pgRead(XrdOucCacheIOCB &iocb, char *buff, long long off, int size,
226  std::vector<uint32_t> &csvec, uint64_t opts, int *csfix)
227 {
228  struct ZHandler : ReadReqRH
229  { using ReadReqRH::ReadReqRH;
230  IOFile *m_io = nullptr;
231  std::function<void (int)> m_lambda {0};
232 
233  void Done(int result) override {
234  if (m_lambda) m_lambda(result);
235  m_io->ReadEnd(result, this);
236  }
237  };
238 
240 
241  auto *rh = new ZHandler(ObtainReadSid(), &iocb);
242  rh->m_io = this;
243 
244  TRACEIO(Dump, "pgRead() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size);
245 
247  rh->m_lambda = [=, &csvec](int result) {
248  if (result > 0)
249  XrdOucPgrwUtils::csCalc((const char *)buff, (ssize_t)off, (size_t)result, csvec);
250  };
251 
252  int retval = ReadBegin(buff, off, size, rh);
253  if (retval != -EWOULDBLOCK)
254  {
255  rh->Done(retval);
256  }
257 }
258 
259 //______________________________________________________________________________
260 int IOFile::ReadBegin(char *buff, long long off, int size, ReadReqRH *rh)
261 {
262  // protect from reads over the file size
263  if (off >= FSize()) {
264  size = 0;
265  return 0;
266  }
267  if (off < 0) {
268  return -EINVAL;
269  }
270  if (off + size > FSize()) {
271  size = FSize() - off;
272  }
273  rh->m_expected_size = size;
274 
275  return m_file->Read(this, buff, off, size, rh);
276 }
277 
278 //______________________________________________________________________________
279 int IOFile::ReadEnd(int retval, ReadReqRH *rh)
280 {
281  TRACEIO(Dump, "ReadEnd() " << (rh->m_iocb ? "a" : "") << "sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " retval: " << retval << " expected_size: " << rh->m_expected_size);
282 
283  if (retval < 0) {
284  TRACEIO(Warning, "ReadEnd() error in File::Read(), exit status=" << retval << ", error=" << XrdSysE2T(-retval) << " sid: " << Xrd::hex1 << rh->m_seq_id);
285  } else if (retval < rh->m_expected_size) {
286  TRACEIO(Warning, "ReadEnd() bytes missed " << rh->m_expected_size - retval << " sid: " << Xrd::hex1 << rh->m_seq_id);
287  }
288  if (rh->m_iocb)
289  rh->m_iocb->Done(retval);
290 
291  delete rh;
292 
294 
295  return retval;
296 }
297 
298 
299 //==============================================================================
300 // ReadV
301 //==============================================================================
302 
303 //______________________________________________________________________________
304 int IOFile::ReadV(const XrdOucIOVec *readV, int n)
305 {
307 
308  auto *rh = new ReadReqRHCond(ObtainReadSid(), nullptr);
309 
310  TRACEIO(Dump, "ReadV() sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " n_chunks: " << n);
311 
312  rh->m_cond.Lock();
313  int retval = ReadVBegin(readV, n, rh);
314  if (retval == -EWOULDBLOCK)
315  {
316  rh->m_cond.Wait();
317  retval = rh->m_retval;
318  }
319  rh->m_cond.UnLock();
320  return ReadVEnd(retval, rh);
321 }
322 
323 //______________________________________________________________________________
324 void IOFile::ReadV(XrdOucCacheIOCB &iocb, const XrdOucIOVec *readV, int n)
325 {
326  struct ZHandler : ReadReqRH
327  { using ReadReqRH::ReadReqRH;
328  IOFile *m_io = nullptr;
329 
330  void Done(int result) override { m_io-> ReadVEnd(result, this); }
331  };
332 
334 
335  auto *rh = new ZHandler(ObtainReadSid(), &iocb);
336  rh->m_io = this;
337 
338  TRACEIO(Dump, "ReadV() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " n_chunks: " << n);
339 
340  int retval = ReadVBegin(readV, n, rh);
341  if (retval != -EWOULDBLOCK)
342  {
343  rh->Done(retval);
344  }
345 }
346 
347 //______________________________________________________________________________
348 int IOFile::ReadVBegin(const XrdOucIOVec *readV, int n, ReadReqRH *rh)
349 {
350  long long file_size = FSize();
351  for (int i = 0; i < n; ++i)
352  {
353  const XrdOucIOVec &vr = readV[i];
354  if (vr.offset < 0 || vr.offset >= file_size ||
355  vr.offset + vr.size > file_size)
356  {
357  return -EINVAL;
358  }
359  rh->m_expected_size += vr.size;
360  }
361  rh->m_n_chunks = n;
362 
363  return m_file->ReadV(this, readV, n, rh);
364 }
365 
366 //______________________________________________________________________________
367 int IOFile::ReadVEnd(int retval, ReadReqRH *rh)
368 {
369  TRACEIO(Dump, "ReadVEnd() " << (rh->m_iocb ? "a" : "") << "sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id <<
370  " retval: " << retval << " n_chunks: " << rh->m_n_chunks << " expected_size: " << rh->m_expected_size);
371 
372  if (retval < 0) {
373  TRACEIO(Warning, "ReadVEnd() error in File::ReadV(), exit status=" << retval << ", error=" << XrdSysE2T(-retval));
374  } else if (retval < rh->m_expected_size) {
375  TRACEIO(Warning, "ReadVEnd() bytes missed " << rh->m_expected_size - retval);
376  }
377  if (rh->m_iocb)
378  rh->m_iocb->Done(retval);
379 
380  delete rh;
381 
383 
384  return retval;
385 }
#define XrdOssOK
Definition: XrdOss.hh:50
#define TRACEIO(act, x)
Definition: XrdPfcTrace.hh:59
int stat(const char *path, struct stat *buf)
struct myOpts opts
@ Warning
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:99
#define TRACE(act, x)
Definition: XrdTrace.hh:63
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int Fstat(struct stat &sbuff)
Definition: XrdOucCache.hh:148
static const uint64_t forceCS
Definition: XrdOucCache.hh:188
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:267
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition: XrdPfc.cc:412
XrdOss * GetOss() const
Definition: XrdPfc.hh:385
XrdSysTrace * GetTrace()
Definition: XrdPfc.hh:398
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:160
void ReleaseFile(File *, IO *)
Definition: XrdPfc.cc:493
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
Definition: XrdPfcFile.cc:686
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
Definition: XrdPfcFile.cc:265
long long GetFileSize()
Definition: XrdPfcFile.hh:279
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
Definition: XrdPfcFile.cc:653
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
Definition: XrdPfcFile.cc:179
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Definition: XrdPfcFile.cc:188
Downloads original file into a single file on local disk. Handles read requests as they come along.
Definition: XrdPfcIOFile.hh:40
void Update(XrdOucCacheIO &iocp) override
void DetachFinalize() override
Abstract virtual method of XrdPfc::IO Called to destruct the IO object after it is no longer used.
int Read(char *buff, long long off, int size) override
Pass Read request to the corresponding File object.
int Fstat(struct stat &sbuff) override
Definition: XrdPfcIOFile.cc:55
bool ioActive() override
Abstract virtual method of XrdPfc::IO Called to check if destruction needs to be done in a separate t...
int ReadV(const XrdOucIOVec *readV, int n) override
Pass ReadV request to the corresponding File object.
long long FSize() override
Definition: XrdPfcIOFile.cc:69
IOFile(XrdOucCacheIO *io, Cache &cache)
Definition: XrdPfcIOFile.cc:36
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
Definition: XrdOucCache.cc:39
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:18
std::string GetFilename()
Definition: XrdPfcIO.hh:56
XrdOucCacheIO * GetInput()
Definition: XrdPfcIO.cc:30
Cache & m_cache
reference to Cache object
Definition: XrdPfcIO.hh:52
RAtomic_int m_active_read_reqs
number of active read requests
Definition: XrdPfcIO.hh:72
const char * RefreshLocation()
Definition: XrdPfcIO.hh:57
void Update(XrdOucCacheIO &iocp) override
Definition: XrdPfcIO.cc:16
unsigned short ObtainReadSid()
Definition: XrdPfcIO.hh:59
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:45
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:313
Definition: XrdPfc.hh:41
@ hex1
Definition: XrdSysTrace.hh:42
long long offset
Definition: XrdOucIOVec.hh:42
XrdOucCacheIOCB * m_iocb
Definition: XrdPfcFile.hh:65
unsigned short m_seq_id
Definition: XrdPfcFile.hh:64
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition: XrdPfcFile.hh:67