XRootD
XrdPfcIOFileBlock.cc
Go to the documentation of this file.
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 #include <math.h>
20 #include <sstream>
21 #include <cstdio>
22 #include <iostream>
23 #include <assert.h>
24 #include <fcntl.h>
25 
26 #include "XrdPfcIOFileBlock.hh"
27 #include "XrdPfc.hh"
28 #include "XrdPfcStats.hh"
29 #include "XrdPfcTrace.hh"
30 
31 #include "XrdSys/XrdSysError.hh"
33 
34 #include "XrdOuc/XrdOucEnv.hh"
35 
36 using namespace XrdPfc;
37 
38 //______________________________________________________________________________
40  IO(io, cache), m_localStat(0), m_info(cache.GetTrace(), false), m_info_file(0)
41 {
43  GetBlockSizeFromPath();
44  initLocalStat();
45 }
46 
47 //______________________________________________________________________________
49 {
50  // called from Detach() if no sync is needed or
51  // from Cache's sync thread
52 
53  TRACEIO(Debug, "deleting IOFileBlock");
54 }
55 
56 // Check if m_mutex is needed at all, it is only used in ioActive and DetachFinalize
57 // and in Read for block selection -- see if Prefetch Read requires mutex
58 // to be held.
59 // I think I need it in ioActive and Read.
60 
61 //______________________________________________________________________________
63 {
64  IO::Update(iocp);
65  {
66  XrdSysMutexHelper lock(&m_mutex);
67 
68  for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
69  {
70  // Need to update all File / block objects.
71  if (it->second) it->second->ioUpdated(this);
72  }
73  }
74 }
75 
76 //______________________________________________________________________________
78 {
79  // Called from XrdPosixFile when local connection is closed.
80 
82 
83  bool active = false;
84  {
85  XrdSysMutexHelper lock(&m_mutex);
86 
87  for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
88  {
89  // Need to initiate stop on all File / block objects.
90  if (it->second && it->second->ioActive(this))
91  {
92  active = true;
93  }
94  }
95  }
96 
97  return active;
98 }
99 
100 //______________________________________________________________________________
102 {
103  // Effectively a destructor.
104 
105  TRACEIO(Info, "DetachFinalize() " << this);
106 
107  CloseInfoFile();
108  {
109  XrdSysMutexHelper lock(&m_mutex);
110  for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
111  {
112  if (it->second)
113  {
114  it->second->RequestSyncOfDetachStats();
115  m_cache.ReleaseFile(it->second, this);
116  }
117  }
118  }
119 
120  delete this;
121 }
122 
123 //______________________________________________________________________________
124 void IOFileBlock::CloseInfoFile()
125 {
126  // write access statistics to info file and close it
127  // detach time is needed for file purge
128  if (m_info_file)
129  {
130  if (m_info.GetFileSize() > 0)
131  {
132  // We do not maintain access statistics for individual blocks.
133  Stats as;
134  m_info.WriteIOStatDetach(as);
135  }
136  m_info.Write(m_info_file, GetFilename().c_str());
137  m_info_file->Fsync();
138  m_info_file->Close();
139 
140  delete m_info_file;
141  m_info_file = 0;
142  }
143 }
144 
145 //______________________________________________________________________________
146 void IOFileBlock::GetBlockSizeFromPath()
147 {
148  const static std::string tag = "hdfsbsize=";
149 
150  std::string path = GetInput()->Path();
151  size_t pos1 = path.find(tag);
152  size_t t = tag.length();
153 
154  if (pos1 != path.npos)
155  {
156  pos1 += t;
157  size_t pos2 = path.find("&", pos1);
158  if (pos2 != path.npos )
159  {
160  std::string bs = path.substr(pos1, pos2 - pos1);
161  m_blocksize = atoi(bs.c_str());
162  }
163  else
164  {
165  m_blocksize = atoi(path.substr(pos1).c_str());
166  }
167 
168  TRACEIO(Debug, "GetBlockSizeFromPath(), blocksize = " << m_blocksize );
169  }
170 }
171 
172 //______________________________________________________________________________
173 File* IOFileBlock::newBlockFile(long long off, int blocksize)
174 {
175  // NOTE: Can return 0 if opening of a local file fails!
176 
177  std::string fname = GetFilename();
178 
179  std::stringstream ss;
180  ss << fname;
181  char offExt[64];
182  // filename like <origpath>___<size>_<offset>
183  sprintf(&offExt[0], "___%lld_%lld", m_blocksize, off);
184  ss << &offExt[0];
185  fname = ss.str();
186 
187  TRACEIO(Debug, "FileBlock(), create XrdPfcFile ");
188 
189  File *file = Cache::GetInstance().GetFile(fname, this, off, blocksize);
190  return file;
191 }
192 
193 //______________________________________________________________________________
194 int IOFileBlock::Fstat(struct stat &sbuff)
195 {
196  // local stat is create in constructor. if file was on disk before
197  // attach that the only way stat was not successful is becuse there
198  // were info file read errors
199  if ( ! m_localStat) return -ENOENT;
200 
201  memcpy(&sbuff, m_localStat, sizeof(struct stat));
202  return 0;
203 }
204 
205 //______________________________________________________________________________
207 {
208  if ( ! m_localStat) return -ENOENT;
209 
210  return m_localStat->st_size;
211 }
212 
213 //______________________________________________________________________________
214 int IOFileBlock::initLocalStat()
215 {
216  std::string path = GetFilename() + Info::s_infoExtension;
217 
218  int res = -1;
219  struct stat tmpStat;
220  XrdOucEnv myEnv;
221 
222  // try to read from existing file
223  if (m_cache.GetOss()->Stat(path.c_str(), &tmpStat) == XrdOssOK)
224  {
225  m_info_file = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str());
226  if (m_info_file->Open(path.c_str(), O_RDWR, 0600, myEnv) == XrdOssOK)
227  {
228  if (m_info.Read(m_info_file, path.c_str()))
229  {
230  tmpStat.st_size = m_info.GetFileSize();
231  TRACEIO(Info, "initCachedStat successfully read size from existing info file = " << tmpStat.st_size);
232  res = 0;
233  }
234  else
235  {
236  // file exist but can't read it
237  TRACEIO(Debug, "initCachedStat info file is not complete");
238  }
239  }
240  }
241 
242  // if there is no local info file, try to read from client and then save stat into a new *cinfo file
243  if (res)
244  {
245  if (m_info_file) { delete m_info_file; m_info_file = 0; }
246 
247  res = GetInput()->Fstat(tmpStat);
248  TRACEIO(Debug, "initCachedStat get stat from client res= " << res << "size = " << tmpStat.st_size);
249  if (res == 0)
250  {
251  if (m_cache.GetOss()->Create(m_cache.RefConfiguration().m_username.c_str(), path.c_str(), 0600, myEnv, XRDOSS_mkpath) == XrdOssOK)
252  {
253  m_info_file = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str());
254  if (m_info_file->Open(path.c_str(), O_RDWR, 0600, myEnv) == XrdOssOK)
255  {
256  // This is writing the top-level cinfo
257  // The info file is used to get file size on defer open
258  // don't initalize buffer, it does not hold useful information in this case
260  // m_info.DisableDownloadStatus(); -- this stopped working a while back.
261  m_info.Write(m_info_file, path.c_str());
262  m_info_file->Fsync();
263  }
264  else
265  {
266  TRACEIO(Error, "initCachedStat can't open info file path");
267  }
268  }
269  else
270  {
271  TRACEIO(Error, "initCachedStat can't create info file path");
272  }
273  }
274  }
275 
276  if (res == 0)
277  {
278  m_localStat = new struct stat;
279  memcpy(m_localStat, &tmpStat, sizeof(struct stat));
280  }
281 
282  return res;
283 }
284 
285 //______________________________________________________________________________
286 int IOFileBlock::Read(char *buff, long long off, int size)
287 {
288  // protect from reads over the file size
289 
290  long long fileSize = FSize();
291 
292  if (off >= fileSize)
293  return 0;
294  if (off < 0)
295  {
296  return -EINVAL;
297  }
298  if (off + size > fileSize)
299  size = fileSize - off;
300 
301  long long off0 = off;
302  int idx_first = off0 / m_blocksize;
303  int idx_last = (off0 + size - 1) / m_blocksize;
304  int bytes_read = 0;
305  TRACEIO(Dump, "Read() "<< off << "@" << size << " block range ["<< idx_first << ", " << idx_last << "]");
306 
307  for (int blockIdx = idx_first; blockIdx <= idx_last; ++blockIdx)
308  {
309  // locate block
310  File *fb;
311  m_mutex.Lock();
312  std::map<int, File*>::iterator it = m_blocks.find(blockIdx);
313  if (it != m_blocks.end())
314  {
315  fb = it->second;
316  }
317  else
318  {
319  size_t pbs = m_blocksize;
320  // check if this is last block
321  int lastIOFileBlock = (fileSize-1)/m_blocksize;
322  if (blockIdx == lastIOFileBlock )
323  {
324  pbs = fileSize - blockIdx*m_blocksize;
325  // TRACEIO(Dump, "Read() last block, change output file size to " << pbs);
326  }
327 
328  // Note: File* can be 0 and stored as 0 if local open fails!
329  fb = newBlockFile(blockIdx*m_blocksize, pbs);
330  m_blocks.insert(std::make_pair(blockIdx, fb));
331  }
332  m_mutex.UnLock();
333 
334  // edit size if read request is reaching more than a block
335  int readBlockSize = size;
336  if (idx_first != idx_last)
337  {
338  if (blockIdx == idx_first)
339  {
340  readBlockSize = (blockIdx + 1) * m_blocksize - off0;
341  TRACEIO(Dump, "Read partially till the end of the block");
342  }
343  else if (blockIdx == idx_last)
344  {
345  readBlockSize = (off0 + size) - blockIdx * m_blocksize;
346  TRACEIO(Dump, "Read partially till the end of the block");
347  }
348  else
349  {
350  readBlockSize = m_blocksize;
351  }
352  }
353 
354  TRACEIO(Dump, "Read() block[ " << blockIdx << "] read-block-size[" << readBlockSize << "], offset[" << readBlockSize << "] off = " << off );
355 
356  int retvalBlock;
357  if (fb != 0)
358  {
359  struct ZHandler : public ReadReqRH
360  { using ReadReqRH::ReadReqRH;
361  XrdSysCondVar m_cond {0};
362  int m_retval {0};
363 
364  void Done(int result) override
365  { m_cond.Lock(); m_retval = result; m_cond.Signal(); m_cond.UnLock(); }
366  };
367 
368  ReadReqRHCond rh(ObtainReadSid(), nullptr);
369 
370  rh.m_cond.Lock();
371  retvalBlock = fb->Read(this, buff, off, readBlockSize, &rh);
372  if (retvalBlock == -EWOULDBLOCK)
373  {
374  rh.m_cond.Wait();
375  retvalBlock = rh.m_retval;
376  }
377  rh.m_cond.UnLock();
378  }
379  else
380  {
381  retvalBlock = GetInput()->Read(buff, off, readBlockSize);
382  }
383 
384  TRACEIO(Dump, "Read() Block read returned " << retvalBlock);
385  if (retvalBlock == readBlockSize)
386  {
387  bytes_read += retvalBlock;
388  buff += retvalBlock;
389  off += retvalBlock;
390  }
391  else if (retvalBlock >= 0)
392  {
393  TRACEIO(Warning, "Read() incomplete read, missing bytes " << readBlockSize-retvalBlock);
394  return -EIO;
395  }
396  else
397  {
398  TRACEIO(Error, "Read() read error, retval" << retvalBlock);
399  return retvalBlock;
400  }
401  }
402 
403  return bytes_read;
404 }
@ Warning
#define XrdOssOK
Definition: XrdOss.hh:50
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
#define TRACEIO(act, x)
Definition: XrdPfcTrace.hh:63
int stat(const char *path, struct stat *buf)
virtual int Fsync()
Definition: XrdOss.hh:144
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual const char * Path()=0
virtual int Read(char *buff, long long offs, int rlen)=0
virtual int Fstat(struct stat &sbuff)
Definition: XrdOucCache.hh:148
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:267
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition: XrdPfc.cc:415
XrdOss * GetOss() const
Definition: XrdPfc.hh:389
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:319
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:163
void ReleaseFile(File *, IO *)
Definition: XrdPfc.cc:497
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
Definition: XrdPfcFile.cc:676
void Update(XrdOucCacheIO &iocp) override
long long FSize() override
virtual int Read(char *buff, long long offs, int rlen)=0
Pass Read request to the corresponding File object.
int Fstat(struct stat &sbuff) override
IOFileBlock(XrdOucCacheIO *io, Cache &cache)
bool ioActive() override
Abstract virtual method of XrdPfc::IO Called to check if destruction needs to be done in a separate t...
void DetachFinalize() override
Abstract virtual method of XrdPfc::IO Called to destruct the IO object after it is no longer used.
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:18
std::string GetFilename()
Definition: XrdPfcIO.hh:56
XrdOucCacheIO * GetInput()
Definition: XrdPfcIO.cc:30
Cache & m_cache
reference to Cache object
Definition: XrdPfcIO.hh:52
const char * RefreshLocation()
Definition: XrdPfcIO.hh:57
void Update(XrdOucCacheIO &iocp) override
Definition: XrdPfcIO.cc:16
unsigned short ObtainReadSid()
Definition: XrdPfcIO.hh:59
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:45
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:313
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
Definition: XrdPfcInfo.cc:268
long long GetFileSize() const
Get file size.
Definition: XrdPfcInfo.hh:446
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Definition: XrdPfcInfo.cc:296
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
Definition: XrdPfcInfo.cc:163
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
Definition: XrdPfcInfo.cc:440
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:31
Definition: XrdPfc.hh:41
XrdSysTrace * GetTrace()
Definition: XrdPfcPurge.cc:16
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition: XrdPfc.hh:108
long long m_bufferSize
prefetch buffer size, default 1MB
Definition: XrdPfc.hh:101
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:81
XrdSysCondVar m_cond
Definition: XrdPfcIO.hh:63
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition: XrdPfcFile.hh:67