XRootD
XrdPfcIOFile.cc
Go to the documentation of this file.
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 #include "XrdPfcIOFile.hh"
20 #include "XrdPfcStats.hh"
21 #include "XrdPfcTrace.hh"
22 
23 #include "XrdOss/XrdOss.hh"
25 #include "XrdSys/XrdSysError.hh"
26 
27 #include "XrdOuc/XrdOucEnv.hh"
29 
30 #include <cstdio>
31 #include <fcntl.h>
32 
33 using namespace XrdPfc;
34 
35 //______________________________________________________________________________
37  IO(io, cache),
38  m_file(0)
39 {
40  m_file = Cache::GetInstance().GetFile(GetFilename(), this);
41 }
42 
43 //______________________________________________________________________________
45 {
46  // called from Detach() if no sync is needed or
47  // from Cache's sync thread
48  TRACEIO(Debug, "~IOFile() " << this);
49 }
50 
51 //______________________________________________________________________________
52 int IOFile::Fstat(struct stat &sbuff)
53 {
54  // This is only called during Cache::Attach / Cache::GetFile() for file creation.
55  // Should really be separate name but one needs to change virtual interface
56  // so initialStat() becomes virtual in the base-class.
57  // Also, IOFileBlock should be ditched.
58  if ( ! m_file) {
59  return initialStat(sbuff);
60  }
61 
62  return m_file->Fstat(sbuff);
63 }
64 
65 //______________________________________________________________________________
66 long long IOFile::FSize()
67 {
68  return m_file->GetFileSize();
69 }
70 
71 //______________________________________________________________________________
72 int IOFile::initialStat(struct stat &sbuff)
73 {
74  // Get stat to determine file-size.
75  // Called indirectly from the constructor, via Cache::GetFile().
76  // Either read it from cinfo file or obtain it from the remote IO.
77 
78  static const char* trace_pfx = "initialStat ";
79 
80  std::string fname = GetFilename();
81  if (m_cache.GetOss()->Stat(fname.c_str(), &sbuff) == XrdOssOK)
82  {
83  long long file_size = m_cache.DetermineFullFileSize(fname + Info::s_infoExtension);
84  if (file_size >= 0)
85  {
86  sbuff.st_size = file_size;
87  TRACEIO(Info, trace_pfx << "successfully read size " << sbuff.st_size << " from info file");
88  return 0;
89  }
90  TRACEIO(Error, trace_pfx << "failed reading from info file " << XrdSysE2T(-file_size));
91  }
92 
93  int res = GetInput()->Fstat(sbuff);
94  TRACEIO(Debug, trace_pfx << "stat from client res = " << res << ", size = " << sbuff.st_size);
95 
96  return res;
97 }
98 
99 //______________________________________________________________________________
101 {
102  IO::Update(iocp);
103  m_file->ioUpdated(this);
104 }
105 
106 //______________________________________________________________________________
108 {
109  RefreshLocation();
110  return m_file->ioActive(this);
111 }
112 
113 //______________________________________________________________________________
115 {
116  // Effectively a destructor.
117 
118  TRACE(Debug, "DetachFinalize() " << this);
119 
120  m_file->RequestSyncOfDetachStats();
121  Cache::GetInstance().ReleaseFile(m_file, this);
122 
123  if (( ! m_error_counts.empty() || m_incomplete_count > 0) && XRD_TRACE What >= TRACE_Error) {
124  char info[1024];
125  int pos = 0, cap = 1024;
126  bool truncated = false;
127  for (auto [err, cnt] : m_error_counts) {
128  int len = snprintf(&info[pos], cap, " ( %d : %d)", err, cnt);
129  if (len >= cap) {
130  truncated = true;
131  break;
132  }
133  pos += len;
134  cap -= len;
135  }
136  TRACE(Error, "DetachFinalize() " << this << " n_incomplete_reads=" << m_incomplete_count
137  << ", block (error : count) report:" << info << (truncated ? " - truncated" : ""));
138  }
139 
140  delete this;
141 }
142 
143 
144 //==============================================================================
145 // Read and pgRead - sync / async and helpers
146 //==============================================================================
147 
148 //______________________________________________________________________________
149 int IOFile::Read(char *buff, long long off, int size)
150 {
152 
153  auto *rh = new ReadReqRHCond(ObtainReadSid(), nullptr);
154 
155  TRACEIO(Dump, "Read() sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size);
156 
157  rh->m_cond.Lock();
158  int retval = ReadBegin(buff, off, size, rh);
159  if (retval == -EWOULDBLOCK)
160  {
161  rh->m_cond.Wait();
162  retval = rh->m_retval;
163  }
164  rh->m_cond.UnLock();
165 
166  return ReadEnd(retval, rh);
167 }
168 
169 //______________________________________________________________________________
170 void IOFile::Read(XrdOucCacheIOCB &iocb, char *buff, long long off, int size)
171 {
172  struct ZHandler : ReadReqRH
173  { using ReadReqRH::ReadReqRH;
174  IOFile *m_io = nullptr;
175 
176  void Done(int result) override {
177  m_io->ReadEnd(result, this);
178  }
179  };
180 
182 
183  auto *rh = new ZHandler(ObtainReadSid(), &iocb);
184  rh->m_io = this;
185 
186  TRACEIO(Dump, "Read() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size);
187 
188  int retval = ReadBegin(buff, off, size, rh);
189  if (retval != -EWOULDBLOCK)
190  {
191  rh->Done(retval);
192  }
193 }
194 
195 //______________________________________________________________________________
196 void IOFile::pgRead(XrdOucCacheIOCB &iocb, char *buff, long long off, int size,
197  std::vector<uint32_t> &csvec, uint64_t opts, int *csfix)
198 {
199  struct ZHandler : ReadReqRH
200  { using ReadReqRH::ReadReqRH;
201  IOFile *m_io = nullptr;
202  std::function<void (int)> m_lambda {0};
203 
204  void Done(int result) override {
205  if (m_lambda) m_lambda(result);
206  m_io->ReadEnd(result, this);
207  }
208  };
209 
211 
212  auto *rh = new ZHandler(ObtainReadSid(), &iocb);
213  rh->m_io = this;
214 
215  TRACEIO(Dump, "pgRead() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size);
216 
218  rh->m_lambda = [=, &csvec](int result) {
219  if (result > 0)
220  XrdOucPgrwUtils::csCalc((const char *)buff, (ssize_t)off, (size_t)result, csvec);
221  };
222 
223  int retval = ReadBegin(buff, off, size, rh);
224  if (retval != -EWOULDBLOCK)
225  {
226  rh->Done(retval);
227  }
228 }
229 
230 //______________________________________________________________________________
231 int IOFile::ReadBegin(char *buff, long long off, int size, ReadReqRH *rh)
232 {
233  // protect from reads over the file size
234  if (off >= FSize()) {
235  size = 0;
236  return 0;
237  }
238  if (off < 0) {
239  return -EINVAL;
240  }
241  if (off + size > FSize()) {
242  size = FSize() - off;
243  }
244  rh->m_expected_size = size;
245 
246  return m_file->Read(this, buff, off, size, rh);
247 }
248 
249 //______________________________________________________________________________
250 int IOFile::ReadEnd(int retval, ReadReqRH *rh)
251 {
252  TRACEIO(Dump, "ReadEnd() " << (rh->m_iocb ? "a" : "") << "sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " retval: " << retval << " expected_size: " << rh->m_expected_size);
253 
254  if (retval < 0) {
255  TRACEIO(Debug, "ReadEnd() error in File::Read(), exit status=" << retval << ", error=" << XrdSysE2T(-retval) << " sid: " << Xrd::hex1 << rh->m_seq_id);
256  } else if (retval < rh->m_expected_size) {
257  TRACEIO(Debug, "ReadEnd() bytes missed " << rh->m_expected_size - retval << " sid: " << Xrd::hex1 << rh->m_seq_id);
258  }
259  if (rh->m_iocb)
260  rh->m_iocb->Done(retval);
261 
262  delete rh;
263 
265 
266  return retval;
267 }
268 
269 
270 //==============================================================================
271 // ReadV
272 //==============================================================================
273 
274 //______________________________________________________________________________
275 int IOFile::ReadV(const XrdOucIOVec *readV, int n)
276 {
278 
279  auto *rh = new ReadReqRHCond(ObtainReadSid(), nullptr);
280 
281  TRACEIO(Dump, "ReadV() sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " n_chunks: " << n);
282 
283  rh->m_cond.Lock();
284  int retval = ReadVBegin(readV, n, rh);
285  if (retval == -EWOULDBLOCK)
286  {
287  rh->m_cond.Wait();
288  retval = rh->m_retval;
289  }
290  rh->m_cond.UnLock();
291  return ReadVEnd(retval, rh);
292 }
293 
294 //______________________________________________________________________________
295 void IOFile::ReadV(XrdOucCacheIOCB &iocb, const XrdOucIOVec *readV, int n)
296 {
297  struct ZHandler : ReadReqRH
298  { using ReadReqRH::ReadReqRH;
299  IOFile *m_io = nullptr;
300 
301  void Done(int result) override { m_io-> ReadVEnd(result, this); }
302  };
303 
305 
306  auto *rh = new ZHandler(ObtainReadSid(), &iocb);
307  rh->m_io = this;
308 
309  TRACEIO(Dump, "ReadV() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " n_chunks: " << n);
310 
311  int retval = ReadVBegin(readV, n, rh);
312  if (retval != -EWOULDBLOCK)
313  {
314  rh->Done(retval);
315  }
316 }
317 
318 //______________________________________________________________________________
319 int IOFile::ReadVBegin(const XrdOucIOVec *readV, int n, ReadReqRH *rh)
320 {
321  long long file_size = FSize();
322  for (int i = 0; i < n; ++i)
323  {
324  const XrdOucIOVec &vr = readV[i];
325  if (vr.offset < 0 || vr.offset >= file_size ||
326  vr.offset + vr.size > file_size)
327  {
328  return -EINVAL;
329  }
330  rh->m_expected_size += vr.size;
331  }
332  rh->m_n_chunks = n;
333 
334  return m_file->ReadV(this, readV, n, rh);
335 }
336 
337 //______________________________________________________________________________
338 int IOFile::ReadVEnd(int retval, ReadReqRH *rh)
339 {
340  TRACEIO(Dump, "ReadVEnd() " << (rh->m_iocb ? "a" : "") << "sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id <<
341  " retval: " << retval << " n_chunks: " << rh->m_n_chunks << " expected_size: " << rh->m_expected_size);
342 
343  if (retval < 0) {
344  TRACEIO(Debug, "ReadVEnd() error in File::ReadV(), exit status=" << retval << ", error=" << XrdSysE2T(-retval));
345  } else if (retval < rh->m_expected_size) {
346  TRACEIO(Debug, "ReadVEnd() bytes missed " << rh->m_expected_size - retval);
347  }
348  if (rh->m_iocb)
349  rh->m_iocb->Done(retval);
350 
351  delete rh;
352 
354 
355  return retval;
356 }
#define XrdOssOK
Definition: XrdOss.hh:50
#define TRACE_Error
Definition: XrdPfcTrace.hh:7
#define TRACEIO(act, x)
Definition: XrdPfcTrace.hh:63
int stat(const char *path, struct stat *buf)
#define XRD_TRACE
Definition: XrdScheduler.cc:48
struct myOpts opts
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
#define TRACE(act, x)
Definition: XrdTrace.hh:63
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int Fstat(struct stat &sbuff)
Definition: XrdOucCache.hh:148
static const uint64_t forceCS
Definition: XrdOucCache.hh:188
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:152
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition: XrdPfc.cc:925
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition: XrdPfc.cc:389
XrdOss * GetOss() const
Definition: XrdPfc.hh:268
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:132
void ReleaseFile(File *, IO *)
Definition: XrdPfc.cc:471
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
Definition: XrdPfcFile.cc:785
int Fstat(struct stat &sbuff)
Definition: XrdPfcFile.cc:563
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
Definition: XrdPfcFile.cc:316
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
Definition: XrdPfcFile.cc:748
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
Definition: XrdPfcFile.cc:230
long long GetFileSize() const
Definition: XrdPfcFile.hh:267
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Definition: XrdPfcFile.cc:239
Downloads original file into a single file on local disk. Handles read requests as they come along.
Definition: XrdPfcIOFile.hh:39
void Update(XrdOucCacheIO &iocp) override
void DetachFinalize() override
Abstract virtual method of XrdPfc::IO Called to destruct the IO object after it is no longer used.
int Read(char *buff, long long off, int size) override
Pass Read request to the corresponding File object.
int Fstat(struct stat &sbuff) override
Definition: XrdPfcIOFile.cc:52
bool ioActive() override
Abstract virtual method of XrdPfc::IO Called to check if destruction needs to be done in a separate t...
int ReadV(const XrdOucIOVec *readV, int n) override
Pass ReadV request to the corresponding File object.
long long FSize() override
Definition: XrdPfcIOFile.cc:66
IOFile(XrdOucCacheIO *io, Cache &cache)
Definition: XrdPfcIOFile.cc:36
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
Definition: XrdOucCache.cc:39
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
int m_incomplete_count
Definition: XrdPfcIO.hh:88
std::map< int, int > m_error_counts
Definition: XrdPfcIO.hh:89
XrdOucCacheIO * GetInput()
Definition: XrdPfcIO.cc:31
Cache & m_cache
reference to Cache object
Definition: XrdPfcIO.hh:50
RAtomic_int m_active_read_reqs
number of active read requests
Definition: XrdPfcIO.hh:70
const char * RefreshLocation()
Definition: XrdPfcIO.hh:55
void Update(XrdOucCacheIO &iocp) override
Definition: XrdPfcIO.cc:17
unsigned short ObtainReadSid()
Definition: XrdPfcIO.hh:57
std::string GetFilename()
Definition: XrdPfcIO.cc:36
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:309
Definition: XrdPfc.hh:41
@ hex1
Definition: XrdSysTrace.hh:42
long long offset
Definition: XrdOucIOVec.hh:42
XrdOucCacheIOCB * m_iocb
Definition: XrdPfcFile.hh:54
unsigned short m_seq_id
Definition: XrdPfcFile.hh:53
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition: XrdPfcFile.hh:56