XRootD
XrdOssCsiFile.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d O s s C s i F i l e . c c */
4 /* */
5 /* (C) Copyright 2021 CERN. */
6 /* */
7 /* This file is part of the XRootD software suite. */
8 /* */
9 /* XRootD is free software: you can redistribute it and/or modify it under */
10 /* the terms of the GNU Lesser General Public License as published by the */
11 /* Free Software Foundation, either version 3 of the License, or (at your */
12 /* option) any later version. */
13 /* */
14 /* In applying this licence, CERN does not waive the privileges and */
15 /* immunities granted to it by virtue of its status as an Intergovernmental */
16 /* Organization or submit itself to any jurisdiction. */
17 /* */
18 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
19 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
20 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
21 /* License for more details. */
22 /* */
23 /* You should have received a copy of the GNU Lesser General Public License */
24 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
25 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
26 /* */
27 /* The copyright holder's institutional names and contributor's names may not */
28 /* be used to endorse or promote products derived from this software without */
29 /* specific prior written permission of the institution or contributor. */
30 /******************************************************************************/
31 
32 #include "XrdOssCsi.hh"
33 #include "XrdOssCsiTrace.hh"
34 #include "XrdOssCsiTagstoreFile.hh"
35 #include "XrdOssCsiPages.hh"
36 #include "XrdOssCsiRanges.hh"
37 #include "XrdOuc/XrdOucCRC.hh"
38 #include "XrdOuc/XrdOucEnv.hh"
39 #include "XrdSfs/XrdSfsAio.hh"
40 #include "XrdSys/XrdSysPageSize.hh"
41 #include "XrdVersion.hh"
42 #include "XrdSfs/XrdSfsAio.hh"
43 
44 #include <string>
45 #include <algorithm>
46 
47 #include <sys/types.h>
48 #include <sys/stat.h>
49 #include <unistd.h>
50 #include <fcntl.h>
51 #include <limits.h>
52 #include <assert.h>
53 
56 
57 // storage for class members
59 std::unordered_map<std::string, std::shared_ptr<XrdOssCsiFile::puMapItem_t> > XrdOssCsiFile::pumap_;
60 
61 //
62 // If no others hold a pointer to the Pages object, close it and remove the pagemap info object.
63 //
64 int XrdOssCsiFile::pageMapClose()
65 {
66  if (!pmi_) return -EBADF;
67  bool doclose = false;
68 
69  XrdSysMutexHelper lck(pmi_->mtx);
70  if (mapRelease(pmi_)) doclose = true;
71 
72  int cpret = 0;
73  if (doclose)
74  {
75  if (pmi_->pages)
76  {
77  cpret = pmi_->pages->Close();
78  pmi_->pages.reset();
79  }
80  }
81 
82  lck.UnLock();
83  pmi_.reset();
84 
85  return cpret;
86 }
87 
88 void XrdOssCsiFile::mapTake(const std::string &key, std::shared_ptr<puMapItem_t> &pmi, const bool create)
89 {
91  auto mapidx = pumap_.find(key);
92  if (mapidx == pumap_.end())
93  {
94  if (!create) return;
95  pmi.reset(new puMapItem_t());
96  pmi->tpath = key;
97  if (!key.empty())
98  {
99  pumap_.insert(std::make_pair(key, pmi));
100  }
101  }
102  else
103  {
104  pmi = mapidx->second;
105  }
106  pmi->refcount++;
107 }
108 
109 int XrdOssCsiFile::mapRelease(std::shared_ptr<puMapItem_t> &pmi, XrdSysMutexHelper *plck)
110 {
112  pmi->refcount--;
113  auto mapidx = pumap_.find(pmi->tpath);
114  if (pmi->refcount == 0 || pmi->unlinked)
115  {
116  if (mapidx != pumap_.end() && mapidx->second == pmi)
117  {
118  pumap_.erase(mapidx);
119  }
120  }
121  if (plck) plck->UnLock();
122  return (pmi->refcount == 0) ? 1 : 0;
123 }
124 
125 int XrdOssCsiFile::pageAndFileOpen(const char *fn, const int dflags, const int Oflag, const mode_t Mode, XrdOucEnv &Env)
126 {
127  if (pmi_) return -EBADF;
128 
129  {
130  std::string tpath = config_.tagParam_.makeTagFilename(fn);
131  mapTake(tpath, pmi_);
132  }
133 
134  XrdSysMutexHelper lck(pmi_->mtx);
135  pmi_->dpath = fn;
136  if (pmi_->unlinked)
137  {
138  mapRelease(pmi_, &lck);
139  // filename replaced since check, try again
140  pmi_.reset();
141  return pageAndFileOpen(fn, dflags, Oflag, Mode, Env);
142  }
143 
144  if ((dflags & O_TRUNC) && pmi_->pages)
145  {
146  // truncate of already open file at open() not supported
147  mapRelease(pmi_, &lck);
148  pmi_.reset();
149  return -EDEADLK;
150  }
151 
152  const int dataret = successor_->Open(pmi_->dpath.c_str(), dflags, Mode, Env);
153  int pageret = XrdOssOK;
154  if (dataret == XrdOssOK)
155  {
156  if (pmi_->pages)
157  {
158  return XrdOssOK;
159  }
160 
161  pageret = createPageUpdater(Oflag, Env);
162  if (pageret == XrdOssOK)
163  {
164  return XrdOssOK;
165  }
166 
167  // failed to open the datafile or create the page object.
168  // close datafile if needed
169  (void) successor_->Close();
170  }
171 
172  mapRelease(pmi_, &lck);
173  pmi_.reset();
174 
175  return (dataret != XrdOssOK) ? dataret : pageret;
176 }
177 
179 {
180  if (pmi_)
181  {
182  (void)Close();
183  }
184 }
185 
186 int XrdOssCsiFile::Close(long long *retsz)
187 {
188  if (!pmi_)
189  {
190  return -EBADF;
191  }
192 
193  // wait for any ongoing aios to finish
194  aioWait();
195 
196  const int cpret = pageMapClose();
197 
198  const int csret = successor_->Close(retsz);
199  if (cpret<0) return cpret;
200  return csret;
201 }
202 
203 int XrdOssCsiFile::createPageUpdater(const int Oflag, XrdOucEnv &Env)
204 {
205  std::unique_ptr<XrdOucEnv> tagEnv = XrdOssCsi::tagOpenEnv(config_, Env);
206 
207  // get information about data file size
208  off_t dsize = 0;
209  if (!(Oflag & O_EXCL) && !(Oflag & O_TRUNC))
210  {
211  struct stat sb;
212  const int sstat = successor_->Fstat(&sb);
213  if (sstat<0)
214  {
215  return sstat;
216  }
217  dsize = sb.st_size;
218  }
219 
220  // tag file always opened O_RDWR as the Tagstore/Pages object associated will be shared
221  // between any File instances which concurrently access the file
222  // (some of which may be RDWR, some RDONLY)
223  int tagFlags = O_RDWR;
224 
225  // data file was truncated, do same to tag file and let it be reset
226  if ((Oflag & O_TRUNC)) tagFlags |= O_TRUNC;
227 
228  // The concern with allowing creation of a new tag file is that the data file may
229  // already exist. Creating a new empty tag file would usually cause subsequent access
230  // errors, but not if the data file starts empty. In addition we may have been
231  // configured to ignore missing tag files. Approach taken is that:
232  // If the data file creation was wanted and it is currently zero length then
233  // allow creation of tag file.
234  if ((Oflag & O_CREAT) && dsize == 0)
235  {
236  tagFlags |= O_CREAT;
237  }
238 
239  // be sure the leading directories exist for the tag file
240  if ((tagFlags & O_CREAT))
241  {
242  int mkdret = XrdOssOK;
243  {
244  std::string base = pmi_->tpath;
245  const size_t idx = base.rfind("/");
246  base = base.substr(0,idx);
247  if (!base.empty())
248  {
249  const int AMode = S_IRWXU|S_IRWXG|S_IROTH|S_IXOTH; // 775
250  mkdret = parentOss_->Mkdir(base.c_str(), AMode, 1, tagEnv.get());
251  }
252  }
253  if (mkdret != XrdOssOK && mkdret != -EEXIST)
254  {
255  return mkdret;
256  }
257  }
258 
259  std::unique_ptr<XrdOssDF> integFile(parentOss_->newFile(tident));
260  std::unique_ptr<XrdOssCsiTagstore> ts(new
261  XrdOssCsiTagstoreFile(pmi_->dpath, std::move(integFile), tident));
262  std::unique_ptr<XrdOssCsiPages> pages(new
263  XrdOssCsiPages(pmi_->dpath, std::move(ts), config_.fillFileHole(), config_.allowMissingTags(),
264  config_.disablePgExtend(), config_.disableLooseWrite(), tident));
265 
266  int puret = pages->Open(pmi_->tpath.c_str(), dsize, tagFlags, *tagEnv);
267  if (puret<0)
268  {
269  if ((puret == -EROFS || puret == -EACCES) && rdonly_)
270  {
271  // try to open tag file readonly
272  puret = pages->Open(pmi_->tpath.c_str(), dsize, O_RDONLY, *tagEnv);
273  }
274  }
275 
276  if (puret<0)
277  {
278  return puret;
279  }
280 
281  pages->BasicConsistencyCheck(successor_);
282  pmi_->pages = std::move(pages);
283  return XrdOssOK;
284 }
285 
286 int XrdOssCsiFile::Open(const char *path, const int Oflag, const mode_t Mode, XrdOucEnv &Env)
287 {
288  char cxid[4];
289 
290  if (pmi_)
291  {
292  // already open
293  return -EINVAL;
294  }
295 
296  if (!path)
297  {
298  return -EINVAL;
299  }
300  if (config_.tagParam_.isTagFile(path))
301  {
302  if ((Oflag & O_CREAT)) return -EACCES;
303  return -ENOENT;
304  }
305 
306  int dflags = Oflag;
307  if ((dflags & O_ACCMODE) == O_WRONLY)
308  {
309  // for non-aligned writes it may be needed to do read-modify-write
310  dflags &= ~O_ACCMODE;
311  dflags |= O_RDWR;
312  }
313 
314  rdonly_ = true;
315  if ((dflags & O_ACCMODE) != O_RDONLY)
316  {
317  rdonly_ = false;
318  }
319 
320  const int oret = pageAndFileOpen(path, dflags, Oflag, Mode, Env);
321  if (oret<0)
322  {
323  return oret;
324  }
325 
326  if (successor_->isCompressed(cxid)>0)
327  {
328  (void)Close();
329  return -ENOTSUP;
330  }
331 
332  if (Pages()->IsReadOnly() && !rdonly_)
333  {
334  (void)Close();
335  return -EACCES;
336  }
337  return XrdOssOK;
338 }
339 
340 ssize_t XrdOssCsiFile::Read(off_t offset, size_t blen)
341 {
342  return successor_->Read(offset, blen);
343 }
344 
345 ssize_t XrdOssCsiFile::Read(void *buff, off_t offset, size_t blen)
346 {
347  if (!pmi_) return -EBADF;
348 
350  Pages()->LockTrackinglen(rg, offset, offset+blen, true);
351 
352  const ssize_t bread = successor_->Read(buff, offset, blen);
353  if (bread<0 || blen==0) return bread;
354 
355  const ssize_t puret = Pages()->VerifyRange(successor_, buff, offset, bread, rg);
356  if (puret<0) return puret;
357  return bread;
358 }
359 
360 ssize_t XrdOssCsiFile::ReadRaw(void *buff, off_t offset, size_t blen)
361 {
362  if (!pmi_) return -EBADF;
363 
365  Pages()->LockTrackinglen(rg, offset, offset+blen, true);
366 
367  const ssize_t bread = successor_->ReadRaw(buff, offset, blen);
368  if (bread<0 || blen==0) return bread;
369 
370  const ssize_t puret = Pages()->VerifyRange(successor_, buff, offset, bread, rg);
371  if (puret<0) return puret;
372  return bread;
373 }
374 
375 ssize_t XrdOssCsiFile::ReadV(XrdOucIOVec *readV, int n)
376 {
377  if (!pmi_) return -EBADF;
378  if (n==0) return 0;
379 
381  off_t start = readV[0].offset;
382  off_t end = start + (off_t)readV[0].size;
383  for(int i=1; i<n; i++)
384  {
385  const off_t p1 = readV[i].offset;
386  const off_t p2 = p1 + (off_t)readV[i].size;
387  if (p1<start) start = p1;
388  if (p2>end) end = p2;
389  }
390  Pages()->LockTrackinglen(rg, start, end, true);
391 
392  // standard OSS gives -ESPIPE in case of partial read of an element
393  ssize_t rret = successor_->ReadV(readV, n);
394  if (rret<0) return rret;
395  for (int i=0; i<n; i++)
396  {
397  if (readV[i].size == 0) continue;
398  ssize_t puret = Pages()->VerifyRange(successor_, readV[i].data, readV[i].offset, readV[i].size, rg);
399  if (puret<0) return puret;
400  }
401  return rret;
402 }
403 
404 ssize_t XrdOssCsiFile::Write(const void *buff, off_t offset, size_t blen)
405 {
406  if (!pmi_) return -EBADF;
407  if (rdonly_) return -EBADF;
408 
410  Pages()->LockTrackinglen(rg, offset, offset+blen, false);
411 
412  int puret = Pages()->UpdateRange(successor_, buff, offset, blen, rg);
413  if (puret<0)
414  {
415  rg.ReleaseAll();
416  resyncSizes();
417  return (ssize_t)puret;
418  }
419  ssize_t towrite = blen;
420  ssize_t bwritten = 0;
421  const uint8_t *p = (uint8_t*)buff;
422  while(towrite>0)
423  {
424  ssize_t wret = successor_->Write(&p[bwritten], offset+bwritten, towrite);
425  if (wret<0)
426  {
427  rg.ReleaseAll();
428  resyncSizes();
429  return wret;
430  }
431  towrite -= wret;
432  bwritten += wret;
433  }
434  return bwritten;
435 }
436 
437 ssize_t XrdOssCsiFile::WriteV(XrdOucIOVec *writeV, int n)
438 {
439  if (!pmi_) return -EBADF;
440  if (rdonly_) return -EBADF;
441  if (n==0) return 0;
442 
444  off_t start = writeV[0].offset;
445  off_t end = start + (off_t)writeV[0].size;
446  for(int i=1; i<n; i++)
447  {
448  const off_t p1 = writeV[i].offset;
449  const off_t p2 = p1 + (off_t)writeV[i].size;
450  if (p1<start) start = p1;
451  if (p2>end) end = p2;
452  }
453  Pages()->LockTrackinglen(rg, start, end, false);
454 
455  for (int i=0; i<n; i++)
456  {
457  int ret = Pages()->UpdateRange(successor_, writeV[i].data, writeV[i].offset, writeV[i].size, rg);
458  if (ret<0)
459  {
460  rg.ReleaseAll();
461  resyncSizes();
462  return ret;
463  }
464  }
465  // standard OSS gives -ESPIPE in case of partial write of an element
466  ssize_t wret = successor_->WriteV(writeV, n);
467  if (wret<0)
468  {
469  rg.ReleaseAll();
470  resyncSizes();
471  }
472  return wret;
473 }
474 
475 ssize_t XrdOssCsiFile::pgRead(void *buffer, off_t offset, size_t rdlen, uint32_t *csvec, uint64_t opts)
476 {
477  if (!pmi_) return -EBADF;
478 
480  Pages()->LockTrackinglen(rg, offset, offset+rdlen, true);
481 
482  // if we return a short amount of data the caller will have to deal with
483  // joining csvec values from repeated reads: for simplicity try to read as
484  // such as possible up to the request read length
485  ssize_t toread = rdlen;
486  ssize_t bread = 0;
487  uint8_t *const p = (uint8_t*)buffer;
488  do
489  {
490  ssize_t rret = successor_->Read(&p[bread], offset+bread, toread);
491  if (rret<0) return rret;
492  if (rret==0) break;
493  toread -= rret;
494  bread += rret;
495  } while(toread>0);
496  if (rdlen == 0) return bread;
497 
498  ssize_t puret = Pages()->FetchRange(successor_, buffer, offset, bread, csvec, opts, rg);
499  if (puret<0) return puret;
500  return bread;
501 }
502 
503 ssize_t XrdOssCsiFile::pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
504 {
505  if (!pmi_) return -EBADF;
506  if (rdonly_) return -EBADF;
507  uint64_t pgopts = opts;
508 
509  const int prec = XrdOssCsiPages::pgWritePrelockCheck(buffer, offset, wrlen, csvec, opts);
510  if (prec < 0)
511  {
512  return prec;
513  }
514 
516  Pages()->LockTrackinglen(rg, offset, offset+wrlen, false);
517 
518  int puret = Pages()->StoreRange(successor_, buffer, offset, wrlen, csvec, pgopts, rg);
519  if (puret<0) {
520  rg.ReleaseAll();
521  resyncSizes();
522  return (ssize_t)puret;
523  }
524  ssize_t towrite = wrlen;
525  ssize_t bwritten = 0;
526  const uint8_t *p = (uint8_t*)buffer;
527  do
528  {
529  ssize_t wret = successor_->Write(&p[bwritten], offset+bwritten, towrite);
530  if (wret<0)
531  {
532  rg.ReleaseAll();
533  resyncSizes();
534  return wret;
535  }
536  towrite -= wret;
537  bwritten += wret;
538  } while(towrite>0);
539  return bwritten;
540 }
541 
543 {
544  if (!pmi_) return -EBADF;
545 
546  const int psret = Pages()->Fsync();
547  const int ssret = successor_->Fsync();
548  if (psret<0) return psret;
549  return ssret;
550 }
551 
552 int XrdOssCsiFile::Ftruncate(unsigned long long flen)
553 {
554  if (!pmi_) return -EBADF;
555  if (rdonly_) return -EBADF;
556 
558  Pages()->LockTrackinglen(rg, flen, LLONG_MAX, false);
559  int ret = Pages()->truncate(successor_, flen, rg);
560  if (ret<0)
561  {
562  rg.ReleaseAll();
563  resyncSizes();
564  return ret;
565  }
566  ret = successor_->Ftruncate(flen);
567  if (ret<0)
568  {
569  rg.ReleaseAll();
570  resyncSizes();
571  }
572  return ret;
573 }
574 
575 int XrdOssCsiFile::Fstat(struct stat *buff)
576 {
577  if (!pmi_) return -EBADF;
579  const int tsret = Pages()->TrackedSizesGet(sizes, false);
580  const int fsret = successor_->Fstat(buff);
581  if (fsret<0) return fsret;
582  if (tsret<0) return 0;
583  buff->st_size = std::max(sizes.first, sizes.second);
584  return 0;
585 }
586 
587 int XrdOssCsiFile::resyncSizes()
588 {
590  Pages()->LockTrackinglen(rg, 0, LLONG_MAX, false);
591  struct stat sbuff;
592  int ret = successor_->Fstat(&sbuff);
593  if (ret<0) return ret;
594  Pages()->LockResetSizes(successor_, sbuff.st_size);
595  return 0;
596 }
597 
599 {
600  if (!pmi_) return;
601 
602  Pages()->Flush();
603  successor_->Flush();
604 }
605 
607 {
608  if (!pmi_) return 0;
609  return Pages()->VerificationStatus();
610 }
#define tident
XrdOucTrace OssCsiTrace
XrdSysError OssCsiEroute
Definition: XrdOssCsi.cc:52
#define XrdOssOK
Definition: XrdOss.hh:50
int stat(const char *path, struct stat *buf)
int Mode
struct myOpts opts
std::string makeTagFilename(const char *path)
bool isTagFile(const char *path)
bool fillFileHole() const
bool disableLooseWrite() const
bool disablePgExtend() const
bool allowMissingTags() const
virtual ssize_t pgWrite(void *, off_t, size_t, uint32_t *, uint64_t)
virtual int Fsync()
virtual ssize_t Write(const void *, off_t, size_t)
virtual int Ftruncate(unsigned long long)
virtual ssize_t pgRead(void *, off_t, size_t, uint32_t *, uint64_t)
virtual ssize_t ReadV(XrdOucIOVec *readV, int n)
virtual ssize_t Read(off_t, size_t)
int VerificationStatus()
virtual int Open(const char *, int, mode_t, XrdOucEnv &)
virtual int Close(long long *retsz=0)
virtual void Flush()
Flush filesystem cached pages for this file (used for checksums).
virtual ~XrdOssCsiFile()
static XrdSysMutex pumtx_
Definition: XrdOssCsi.hh:159
virtual int Fstat(struct stat *)
static std::unordered_map< std::string, std::shared_ptr< puMapItem_t > > pumap_
Definition: XrdOssCsi.hh:160
void aioWait()
Definition: XrdOssCsi.hh:126
virtual ssize_t ReadRaw(void *, off_t, size_t)
virtual ssize_t WriteV(XrdOucIOVec *writeV, int n)
XrdOssCsiPages * Pages()
Definition: XrdOssCsi.hh:140
static void mapTake(const std::string &, std::shared_ptr< puMapItem_t > &, bool create=true)
static int mapRelease(std::shared_ptr< puMapItem_t > &, XrdSysMutexHelper *plck=NULL)
std::pair< off_t, off_t > Sizes_t
int truncate(XrdOssDF *, off_t, XrdOssCsiRangeGuard &)
int LockResetSizes(XrdOssDF *, off_t)
int FetchRange(XrdOssDF *, const void *, off_t, size_t, uint32_t *, uint64_t, XrdOssCsiRangeGuard &)
void LockTrackinglen(XrdOssCsiRangeGuard &, off_t, off_t, bool)
int TrackedSizesGet(Sizes_t &, bool)
static int pgWritePrelockCheck(const void *, off_t, size_t, const uint32_t *, uint64_t)
int StoreRange(XrdOssDF *, const void *, off_t, size_t, uint32_t *, uint64_t, XrdOssCsiRangeGuard &)
int UpdateRange(XrdOssDF *, const void *, off_t, size_t, XrdOssCsiRangeGuard &)
bool IsReadOnly() const
int VerifyRange(XrdOssDF *, const void *, off_t, size_t, XrdOssCsiRangeGuard &)
static std::unique_ptr< XrdOucEnv > tagOpenEnv(const XrdOssCsiConfig &, XrdOucEnv &)
Definition: XrdOssCsi.cc:467
XrdOssDF * successor_
virtual int Fsync()
Definition: XrdOss.hh:144
virtual ssize_t WriteV(XrdOucIOVec *writeV, int wrvcnt)
Definition: XrdOss.cc:257
virtual int isCompressed(char *cxidp=0)
Definition: XrdOss.hh:187
virtual int Ftruncate(unsigned long long flen)
Definition: XrdOss.hh:164
virtual int Fstat(struct stat *buf)
Definition: XrdOss.hh:136
virtual ssize_t ReadRaw(void *buffer, off_t offset, size_t size)
Definition: XrdOss.hh:319
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual ssize_t Read(off_t offset, size_t size)
Definition: XrdOss.hh:281
virtual void Flush()
Flush filesystem cached pages for this file (used for checksums).
Definition: XrdOss.hh:126
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
Definition: XrdOss.cc:236
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition: XrdOss.hh:345
virtual int Mkdir(const char *path, mode_t mode, int mkpath=0, XrdOucEnv *envP=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
long long offset
Definition: XrdOucIOVec.hh:42