XRootD
XrdPfcIOFileBlock.cc
Go to the documentation of this file.
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 #include "XrdPfcIOFileBlock.hh"
20 #include "XrdPfc.hh"
21 #include "XrdPfcStats.hh"
22 #include "XrdPfcTrace.hh"
23 
24 #include "XrdOss/XrdOss.hh"
26 #include "XrdSys/XrdSysError.hh"
27 
28 #include "XrdOuc/XrdOucEnv.hh"
29 
30 #include <cmath>
31 #include <sstream>
32 #include <cstdio>
33 #include <iostream>
34 #include <cassert>
35 #include <fcntl.h>
36 
37 using namespace XrdPfc;
38 
39 //______________________________________________________________________________
41  IO(io, cache), m_localStat(0), m_info(cache.GetTrace(), false), m_info_file(0)
42 {
44  GetBlockSizeFromPath();
45  initLocalStat();
46 }
47 
48 //______________________________________________________________________________
50 {
51  // called from Detach() if no sync is needed or
52  // from Cache's sync thread
53 
54  TRACEIO(Debug, "deleting IOFileBlock");
55 }
56 
57 // Check if m_mutex is needed at all, it is only used in ioActive and DetachFinalize
58 // and in Read for block selection -- see if Prefetch Read requires mutex
59 // to be held.
60 // I think I need it in ioActive and Read.
61 
62 //______________________________________________________________________________
64 {
65  IO::Update(iocp);
66  {
67  XrdSysMutexHelper lock(&m_mutex);
68 
69  for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
70  {
71  // Need to update all File / block objects.
72  if (it->second) it->second->ioUpdated(this);
73  }
74  }
75 }
76 
77 //______________________________________________________________________________
79 {
80  // Called from XrdPosixFile when local connection is closed.
81 
83 
84  bool active = false;
85  {
86  XrdSysMutexHelper lock(&m_mutex);
87 
88  for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
89  {
90  // Need to initiate stop on all File / block objects.
91  if (it->second && it->second->ioActive(this))
92  {
93  active = true;
94  }
95  }
96  }
97 
98  return active;
99 }
100 
101 //______________________________________________________________________________
103 {
104  // Effectively a destructor.
105 
106  TRACEIO(Info, "DetachFinalize() " << this);
107 
108  CloseInfoFile();
109  {
110  XrdSysMutexHelper lock(&m_mutex);
111  for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
112  {
113  if (it->second)
114  {
115  it->second->RequestSyncOfDetachStats();
116  m_cache.ReleaseFile(it->second, this);
117  }
118  }
119  }
120 
121  delete this;
122 }
123 
124 //______________________________________________________________________________
125 void IOFileBlock::CloseInfoFile()
126 {
127  // write access statistics to info file and close it
128  // detach time is needed for file purge
129  if (m_info_file)
130  {
131  if (m_info.GetFileSize() > 0)
132  {
133  // We do not maintain access statistics for individual blocks.
134  Stats as;
135  m_info.WriteIOStatDetach(as);
136  }
137  m_info.Write(m_info_file, GetFilename().c_str());
138  m_info_file->Fsync();
139  m_info_file->Close();
140 
141  delete m_info_file;
142  m_info_file = 0;
143  }
144 }
145 
146 //______________________________________________________________________________
147 void IOFileBlock::GetBlockSizeFromPath()
148 {
149  const static std::string tag = "hdfsbsize=";
150 
151  std::string path = GetInput()->Path();
152  size_t pos1 = path.find(tag);
153  size_t t = tag.length();
154 
155  if (pos1 != path.npos)
156  {
157  pos1 += t;
158  size_t pos2 = path.find("&", pos1);
159  if (pos2 != path.npos )
160  {
161  std::string bs = path.substr(pos1, pos2 - pos1);
162  m_blocksize = atoi(bs.c_str());
163  }
164  else
165  {
166  m_blocksize = atoi(path.substr(pos1).c_str());
167  }
168 
169  TRACEIO(Debug, "GetBlockSizeFromPath(), blocksize = " << m_blocksize );
170  }
171 }
172 
173 //______________________________________________________________________________
174 File* IOFileBlock::newBlockFile(long long off, int blocksize)
175 {
176  // NOTE: Can return 0 if opening of a local file fails!
177 
178  std::string fname = GetFilename();
179 
180  std::stringstream ss;
181  ss << fname;
182  char offExt[64];
183  // filename like <origpath>___<size>_<offset>
184  sprintf(&offExt[0], "___%lld_%lld", m_blocksize, off);
185  ss << &offExt[0];
186  fname = ss.str();
187 
188  TRACEIO(Debug, "FileBlock(), create XrdPfcFile ");
189 
190  File *file = Cache::GetInstance().GetFile(fname, this, off, blocksize);
191  return file;
192 }
193 
194 //______________________________________________________________________________
195 int IOFileBlock::Fstat(struct stat &sbuff)
196 {
197  // local stat is create in constructor. if file was on disk before
198  // attach that the only way stat was not successful is becuse there
199  // were info file read errors
200  if ( ! m_localStat) return -ENOENT;
201 
202  memcpy(&sbuff, m_localStat, sizeof(struct stat));
203  return 0;
204 }
205 
206 //______________________________________________________________________________
208 {
209  if ( ! m_localStat) return -ENOENT;
210 
211  return m_localStat->st_size;
212 }
213 
214 //______________________________________________________________________________
215 int IOFileBlock::initLocalStat()
216 {
217  std::string path = GetFilename() + Info::s_infoExtension;
218 
219  int res = -1;
220  struct stat tmpStat;
221  XrdOucEnv myEnv;
222 
223  // try to read from existing file
224  if (m_cache.GetOss()->Stat(path.c_str(), &tmpStat) == XrdOssOK)
225  {
226  m_info_file = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str());
227  if (m_info_file->Open(path.c_str(), O_RDWR, 0600, myEnv) == XrdOssOK)
228  {
229  if (m_info.Read(m_info_file, path.c_str()))
230  {
231  tmpStat.st_size = m_info.GetFileSize();
232  TRACEIO(Info, "initCachedStat successfully read size from existing info file = " << tmpStat.st_size);
233  res = 0;
234  }
235  else
236  {
237  // file exist but can't read it
238  TRACEIO(Debug, "initCachedStat info file is not complete");
239  }
240  }
241  }
242 
243  // if there is no local info file, try to read from client and then save stat into a new *cinfo file
244  if (res)
245  {
246  if (m_info_file) { delete m_info_file; m_info_file = 0; }
247 
248  res = GetInput()->Fstat(tmpStat);
249  TRACEIO(Debug, "initCachedStat get stat from client res= " << res << "size = " << tmpStat.st_size);
250  if (res == 0)
251  {
252  if (m_cache.GetOss()->Create(m_cache.RefConfiguration().m_username.c_str(), path.c_str(), 0600, myEnv, XRDOSS_mkpath) == XrdOssOK)
253  {
254  m_info_file = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str());
255  if (m_info_file->Open(path.c_str(), O_RDWR, 0600, myEnv) == XrdOssOK)
256  {
257  // This is writing the top-level cinfo
258  // The info file is used to get file size on defer open
259  // don't initalize buffer, it does not hold useful information in this case
261  // m_info.DisableDownloadStatus(); -- this stopped working a while back.
262  m_info.Write(m_info_file, path.c_str());
263  m_info_file->Fsync();
264  }
265  else
266  {
267  TRACEIO(Error, "initCachedStat can't open info file path");
268  }
269  }
270  else
271  {
272  TRACEIO(Error, "initCachedStat can't create info file path");
273  }
274  }
275  }
276 
277  if (res == 0)
278  {
279  m_localStat = new struct stat;
280  memcpy(m_localStat, &tmpStat, sizeof(struct stat));
281  }
282 
283  return res;
284 }
285 
286 //______________________________________________________________________________
287 int IOFileBlock::Read(char *buff, long long off, int size)
288 {
289  // protect from reads over the file size
290 
291  long long fileSize = FSize();
292 
293  if (off >= fileSize)
294  return 0;
295  if (off < 0)
296  {
297  return -EINVAL;
298  }
299  if (off + size > fileSize)
300  size = fileSize - off;
301 
302  long long off0 = off;
303  int idx_first = off0 / m_blocksize;
304  int idx_last = (off0 + size - 1) / m_blocksize;
305  int bytes_read = 0;
306  TRACEIO(Dump, "Read() "<< off << "@" << size << " block range ["<< idx_first << ", " << idx_last << "]");
307 
308  for (int blockIdx = idx_first; blockIdx <= idx_last; ++blockIdx)
309  {
310  // locate block
311  File *fb;
312  m_mutex.Lock();
313  std::map<int, File*>::iterator it = m_blocks.find(blockIdx);
314  if (it != m_blocks.end())
315  {
316  fb = it->second;
317  }
318  else
319  {
320  size_t pbs = m_blocksize;
321  // check if this is last block
322  int lastIOFileBlock = (fileSize-1)/m_blocksize;
323  if (blockIdx == lastIOFileBlock )
324  {
325  pbs = fileSize - blockIdx*m_blocksize;
326  // TRACEIO(Dump, "Read() last block, change output file size to " << pbs);
327  }
328 
329  // Note: File* can be 0 and stored as 0 if local open fails!
330  fb = newBlockFile(blockIdx*m_blocksize, pbs);
331  m_blocks.insert(std::make_pair(blockIdx, fb));
332  }
333  m_mutex.UnLock();
334 
335  // edit size if read request is reaching more than a block
336  int readBlockSize = size;
337  if (idx_first != idx_last)
338  {
339  if (blockIdx == idx_first)
340  {
341  readBlockSize = (blockIdx + 1) * m_blocksize - off0;
342  TRACEIO(Dump, "Read partially till the end of the block");
343  }
344  else if (blockIdx == idx_last)
345  {
346  readBlockSize = (off0 + size) - blockIdx * m_blocksize;
347  TRACEIO(Dump, "Read partially till the end of the block");
348  }
349  else
350  {
351  readBlockSize = m_blocksize;
352  }
353  }
354 
355  TRACEIO(Dump, "Read() block[ " << blockIdx << "] read-block-size[" << readBlockSize << "], offset[" << readBlockSize << "] off = " << off );
356 
357  int retvalBlock;
358  if (fb != 0)
359  {
360  struct ZHandler : public ReadReqRH
361  { using ReadReqRH::ReadReqRH;
362  XrdSysCondVar m_cond {0};
363  int m_retval {0};
364 
365  void Done(int result) override
366  { m_cond.Lock(); m_retval = result; m_cond.Signal(); m_cond.UnLock(); }
367  };
368 
369  ReadReqRHCond rh(ObtainReadSid(), nullptr);
370 
371  rh.m_cond.Lock();
372  retvalBlock = fb->Read(this, buff, off, readBlockSize, &rh);
373  if (retvalBlock == -EWOULDBLOCK)
374  {
375  rh.m_cond.Wait();
376  retvalBlock = rh.m_retval;
377  }
378  rh.m_cond.UnLock();
379  }
380  else
381  {
382  retvalBlock = GetInput()->Read(buff, off, readBlockSize);
383  }
384 
385  TRACEIO(Dump, "Read() Block read returned " << retvalBlock);
386  if (retvalBlock == readBlockSize)
387  {
388  bytes_read += retvalBlock;
389  buff += retvalBlock;
390  off += retvalBlock;
391  }
392  else if (retvalBlock >= 0)
393  {
394  TRACEIO(Warning, "Read() incomplete read, missing bytes " << readBlockSize-retvalBlock);
395  return -EIO;
396  }
397  else
398  {
399  TRACEIO(Error, "Read() read error, retval" << retvalBlock);
400  return retvalBlock;
401  }
402  }
403 
404  return bytes_read;
405 }
@ Warning
#define XrdOssOK
Definition: XrdOss.hh:50
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
#define TRACEIO(act, x)
Definition: XrdPfcTrace.hh:63
int stat(const char *path, struct stat *buf)
virtual int Fsync()
Definition: XrdOss.hh:144
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual const char * Path()=0
virtual int Read(char *buff, long long offs, int rlen)=0
virtual int Fstat(struct stat &sbuff)
Definition: XrdOucCache.hh:148
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:152
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition: XrdPfc.cc:389
XrdOss * GetOss() const
Definition: XrdPfc.hh:268
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:204
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:132
void ReleaseFile(File *, IO *)
Definition: XrdPfc.cc:471
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
Definition: XrdPfcFile.cc:748
void Update(XrdOucCacheIO &iocp) override
long long FSize() override
virtual int Read(char *buff, long long offs, int rlen)=0
Pass Read request to the corresponding File object.
int Fstat(struct stat &sbuff) override
IOFileBlock(XrdOucCacheIO *io, Cache &cache)
bool ioActive() override
Abstract virtual method of XrdPfc::IO Called to check if destruction needs to be done in a separate t...
void DetachFinalize() override
Abstract virtual method of XrdPfc::IO Called to destruct the IO object after it is no longer used.
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
XrdOucCacheIO * GetInput()
Definition: XrdPfcIO.cc:31
Cache & m_cache
reference to Cache object
Definition: XrdPfcIO.hh:50
const char * RefreshLocation()
Definition: XrdPfcIO.hh:55
void Update(XrdOucCacheIO &iocp) override
Definition: XrdPfcIO.cc:17
unsigned short ObtainReadSid()
Definition: XrdPfcIO.hh:57
std::string GetFilename()
Definition: XrdPfcIO.cc:36
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:309
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
Definition: XrdPfcInfo.cc:266
long long GetFileSize() const
Get file size.
Definition: XrdPfcInfo.hh:442
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Definition: XrdPfcInfo.cc:294
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
Definition: XrdPfcInfo.cc:161
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
Definition: XrdPfcInfo.cc:438
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
Definition: XrdPfc.hh:41
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition: XrdPfc.hh:115
long long m_bufferSize
prefetch buffer size, default 1MB
Definition: XrdPfc.hh:108
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:87
XrdSysCondVar m_cond
Definition: XrdPfcIO.hh:61
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition: XrdPfcFile.hh:56