XRootD
XrdOssCsiPages.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d O s s C s i P a g e s . c c */
4 /* */
5 /* (C) Copyright 2021 CERN. */
6 /* */
7 /* This file is part of the XRootD software suite. */
8 /* */
9 /* XRootD is free software: you can redistribute it and/or modify it under */
10 /* the terms of the GNU Lesser General Public License as published by the */
11 /* Free Software Foundation, either version 3 of the License, or (at your */
12 /* option) any later version. */
13 /* */
14 /* In applying this licence, CERN does not waive the privileges and */
15 /* immunities granted to it by virtue of its status as an Intergovernmental */
16 /* Organization or submit itself to any jurisdiction. */
17 /* */
18 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
19 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
20 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
21 /* License for more details. */
22 /* */
23 /* You should have received a copy of the GNU Lesser General Public License */
24 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
25 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
26 /* */
27 /* The copyright holder's institutional names and contributor's names may not */
28 /* be used to endorse or promote products derived from this software without */
29 /* specific prior written permission of the institution or contributor. */
30 /******************************************************************************/
31 
32 #include "XrdOssCsiTrace.hh"
33 #include "XrdOssCsiPages.hh"
34 #include "XrdOuc/XrdOucCRC.hh"
35 
36 #include <sys/types.h>
37 #include <sys/stat.h>
38 #include <fcntl.h>
39 
40 #include <assert.h>
41 
43 
44 XrdOssCsiPages::XrdOssCsiPages(const std::string &fn, std::unique_ptr<XrdOssCsiTagstore> ts, bool wh, bool am, bool dpe, bool dlw, const char *tid) :
45  ts_(std::move(ts)),
46  writeHoles_(wh),
47  allowMissingTags_(am),
48  disablePgExtend_(dpe),
49  hasMissingTags_(false),
50  rdonly_(false),
51  loosewriteConfigured_(!dlw),
52  loosewrite_(false),
53  tscond_(0),
54  tsforupdate_(false),
55  fn_(fn),
56  tident_(tid),
57  tident(tident_.c_str()),
58  lastpgforloose_(0),
59  checklastpg_(false)
60 {
61  // empty constructor
62 }
63 
64 int XrdOssCsiPages::Open(const char *path, off_t dsize, int flags, XrdOucEnv &envP)
65 {
66  EPNAME("Pages::Open");
67  hasMissingTags_ = false;
68  rdonly_ = false;
69  int ret = ts_->Open(path, dsize, flags, envP);
70  if (ret == -ENOENT)
71  {
72  // no existing tag
74  {
75  TRACE(Info, "Opening with missing tagfile: " << fn_);
76  hasMissingTags_ = true;
77  return 0;
78  }
79  TRACE(Warn, "Could not open tagfile for " << fn_ << " error " << ret);
80  return -EDOM;
81  }
82  if (ret<0) return ret;
83  if ((flags & O_ACCMODE) == O_RDONLY) rdonly_ = true;
84  loosewrite_ = (dsize==0 && ts_->GetTrackedTagSize()==0) ? false : loosewriteConfigured_;
85  return 0;
86 }
87 
89 {
90  if (hasMissingTags_)
91  {
92  hasMissingTags_ = false;
93  return 0;
94  }
95  return ts_->Close();
96 }
97 
99 {
100  if (!hasMissingTags_) ts_->Flush();
101 }
102 
104 {
105  if (hasMissingTags_) return 0;
106  return ts_->Fsync();
107 }
108 
110 {
111  if (hasMissingTags_) return -ENOENT;
112 
114  while (tsforupdate_)
115  {
116  tscond_.Wait();
117  }
118  off_t tagsize = ts_->GetTrackedTagSize();
119  off_t datasize = ts_->GetTrackedDataSize();
120  if (forupdate)
121  {
122  tsforupdate_ = true;
123  }
124  rsizes = std::make_pair(tagsize,datasize);
125  return 0;
126 }
127 
129 {
131  return ts_->SetTrackedSize(sz);
132 }
133 
134 int XrdOssCsiPages::LockResetSizes(XrdOssDF *fd, const off_t sz)
135 {
136  // nothing to do is no tag file
137  if (hasMissingTags_) return 0;
138 
140  const int ret = ts_->ResetSizes(sz);
143  return ret;
144 }
145 
146 int XrdOssCsiPages::LockTruncateSize(const off_t sz, const bool datatoo)
147 {
149  return ts_->Truncate(sz,datatoo);
150 }
151 
153 {
155  return ts_->SetUnverified();
156 }
157 
159 {
161  assert(tsforupdate_ == true);
162 
163  tsforupdate_ = false;
164  tscond_.Broadcast();
165 }
166 
167 // Used by Write: At this point the user's data has not yet been written to the file.
168 //
169 int XrdOssCsiPages::UpdateRange(XrdOssDF *const fd, const void *buff, const off_t offset, const size_t blen, XrdOssCsiRangeGuard &rg)
170 {
171  if (offset<0)
172  {
173  return -EINVAL;
174  }
175 
176  if (blen == 0)
177  {
178  return 0;
179  }
180 
181  // if the tag file is missing we don't need to store anything
182  if (hasMissingTags_)
183  {
184  return 0;
185  }
186 
187  // update of file were checksums are based on the file data suppplied: as there's no separate
188  // source of checksum information mark this file as having unverified checksums
190 
191  const Sizes_t sizes = rg.getTrackinglens();
192 
193  const off_t trackinglen = sizes.first;
194  if (offset+blen > static_cast<size_t>(trackinglen))
195  {
196  LockSetTrackedSize(offset+blen);
197  rg.unlockTrackinglen();
198  }
199 
200  int ret;
201  if ((offset % XrdSys::PageSize) != 0 ||
202  (offset+blen < static_cast<size_t>(trackinglen) && (blen % XrdSys::PageSize) != 0) ||
203  ((trackinglen % XrdSys::PageSize) !=0 && offset > trackinglen))
204  {
205  ret = UpdateRangeUnaligned(fd, buff, offset, blen, sizes);
206  }
207  else
208  {
209  ret = UpdateRangeAligned(buff, offset, blen, sizes);
210  }
211 
212  return ret;
213 }
214 
215 // Used by Read: At this point the user's buffer has already been filled from the file.
216 // offset: offset within the file at which the read starts
217 // blen : the length of the read already read into the buffer
218 // (which may be less than what was originally requested)
219 //
220 int XrdOssCsiPages::VerifyRange(XrdOssDF *const fd, const void *buff, const off_t offset, const size_t blen, XrdOssCsiRangeGuard &rg)
221 {
222  EPNAME("VerifyRange");
223 
224  if (offset<0)
225  {
226  return -EINVAL;
227  }
228 
229  // if the tag file is missing we don't verify anything
230  if (hasMissingTags_)
231  {
232  return 0;
233  }
234 
235  const Sizes_t sizes = rg.getTrackinglens();
236  const off_t trackinglen = sizes.first;
237 
238  if (offset >= trackinglen && blen == 0)
239  {
240  return 0;
241  }
242 
243  if (blen == 0)
244  {
245  // if offset is before the tracked len we should not be requested to verify zero bytes:
246  // the file may have been truncated
247  TRACE(Warn, "Verify request for zero bytes " << fn_ << ", file may be truncated");
248  return -EDOM;
249  }
250 
251  if (offset+blen > static_cast<size_t>(trackinglen))
252  {
253  TRACE(Warn, "Verify request for " << (offset+blen-trackinglen) << " bytes from " << fn_ << " beyond tracked length");
254  return -EDOM;
255  }
256 
257  int vret;
258  if ((offset % XrdSys::PageSize) != 0 || (offset+blen != static_cast<size_t>(trackinglen) && (blen % XrdSys::PageSize) != 0))
259  {
260  vret = VerifyRangeUnaligned(fd, buff, offset, blen, sizes);
261  }
262  else
263  {
264  vret = VerifyRangeAligned(buff, offset, blen, sizes);
265  }
266 
267  return vret;
268 }
269 
270 // apply_sequential_aligned_modify: Internal func used during Write/pgWrite
271 // (both aligned/unaligned cases) to update multiple tags.
272 //
273 // Write series of crc32c values to a file's tag file, with optional pre-block or lastblock
274 // values. Tries to reduce the number of calls to WriteTags() by using an internal buffer
275 // to make all the crc32c values available from a contiguous start location.
276 //
277 // buff: start of data buffer: Page aligned within the file, used if calculating crc32
278 // startp: page index corresponding to the start of buff
279 // nbytes: length of buffer
280 // csvec: optional pre-computed crc32 values covering nbytes of buffer
281 // preblockset: true/false. A value for a crc32 value to be placed at startp-1
282 // lastblockset:true/false. If true the last page of buff must be partial, and instead of
283 // calculating or fetching a crc32 value from csvec[], a supplied
284 // value is used.
285 // cspre: value to use for preblock crc32 (used if preblockset is true)
286 // cslast: value to use for last partial-block crc32 (used if lastblockset is true)
287 //
289  const void *const buff, const off_t startp, const size_t nbytes, const uint32_t *csvec,
290  const bool preblockset, const bool lastblockset, const uint32_t cspre, const uint32_t cslast)
291 {
292  EPNAME("apply_sequential_aligned_modify");
293 
294  if (lastblockset && (nbytes % XrdSys::PageSize)==0)
295  {
296  return -EINVAL;
297  }
298  if (preblockset && startp==0)
299  {
300  return -EINVAL;
301  }
302 
303  uint32_t calcbuf[stsize_];
304  const size_t calcbufsz = sizeof(calcbuf)/sizeof(uint32_t);
305  const uint8_t *const p = (uint8_t*)buff;
306 
307  // will be using calcbuf
308  bool useinternal = true;
309  if (csvec && !preblockset && !lastblockset)
310  {
311  useinternal = false;
312  }
313 
314  bool dopre = preblockset;
315  const off_t sp = preblockset ? startp-1 : startp;
316 
317  size_t blktowrite = ((nbytes+XrdSys::PageSize-1)/XrdSys::PageSize) + (preblockset ? 1 : 0);
318  size_t nblkwritten = 0;
319  size_t calcbytot = 0;
320  while(blktowrite>0)
321  {
322  size_t blkwcnt = blktowrite;
323  if (useinternal)
324  {
325  size_t cidx = 0;
326  size_t calcbycnt = nbytes - calcbytot;
327  if (nblkwritten == 0 && dopre)
328  {
329  calcbycnt = std::min(calcbycnt, (calcbufsz-1)*XrdSys::PageSize);
330  blkwcnt = (calcbycnt+XrdSys::PageSize-1)/XrdSys::PageSize;
331  calcbuf[cidx] = cspre;
332  cidx++;
333  blkwcnt++;
334  dopre = false;
335  }
336  else
337  {
338  calcbycnt = std::min(calcbycnt, calcbufsz*XrdSys::PageSize);
339  blkwcnt = (calcbycnt+XrdSys::PageSize-1)/XrdSys::PageSize;
340  }
341  if ((calcbycnt % XrdSys::PageSize)!=0 && lastblockset)
342  {
343  const size_t x = calcbycnt / XrdSys::PageSize;
344  calcbycnt = XrdSys::PageSize * x;
345  calcbuf[cidx + x] = cslast;
346  }
347  if (csvec)
348  {
349  memcpy(&calcbuf[cidx], &csvec[calcbytot/XrdSys::PageSize], 4*((calcbycnt+XrdSys::PageSize-1)/XrdSys::PageSize));
350  }
351  else
352  {
353  XrdOucCRC::Calc32C(&p[calcbytot], calcbycnt, &calcbuf[cidx]);
354  }
355  calcbytot += calcbycnt;
356  }
357  const ssize_t wret = ts_->WriteTags(useinternal ? calcbuf : &csvec[nblkwritten], sp+nblkwritten, blkwcnt);
358  if (wret<0)
359  {
360  TRACE(Warn, TagsWriteError(sp+nblkwritten, blkwcnt, wret));
361  return wret;
362  }
363  blktowrite -= blkwcnt;
364  nblkwritten += blkwcnt;
365  }
366  return nblkwritten;
367 }
368 
369 //
370 // FetchRangeAligned
371 //
372 // Used by pgRead or Read (via VerifyRangeAligned) when the read offset is at a page boundary within the file
373 // AND the length is a multiple of page size or the read is up to exactly the end of file.
374 //
375 int XrdOssCsiPages::FetchRangeAligned(const void *const buff, const off_t offset, const size_t blen, const Sizes_t & /* sizes */, uint32_t *const csvec, const uint64_t opts)
376 {
377  EPNAME("FetchRangeAligned");
378  uint32_t rdvec[stsize_],vrbuf[stsize_];
379 
380  const off_t p1 = offset / XrdSys::PageSize;
381  const off_t p2 = (offset+blen) / XrdSys::PageSize;
382  const size_t p2_off = (offset+blen) % XrdSys::PageSize;
383  const size_t nfull = p2-p1;
384 
385  uint32_t *rdbuf;
386  size_t rdbufsz;
387  if (csvec == NULL)
388  {
389  // use fixed sized stack buffer
390  rdbuf = rdvec;
391  rdbufsz = sizeof(rdvec)/sizeof(uint32_t);
392  }
393  else
394  {
395  // use supplied buffer, assumed to be large enough
396  rdbuf = csvec;
397  rdbufsz = (p2_off==0) ? nfull : (nfull+1);
398  }
399 
400  // always use stack based, fixed sized buffer for verify
401  const size_t vrbufsz = sizeof(vrbuf)/sizeof(uint32_t);
402 
403  // pointer to data
404  const uint8_t *const p = (uint8_t*)buff;
405 
406  // process full pages + any partial page
407  size_t toread = (p2_off>0) ? nfull+1 : nfull;
408  size_t nread = 0;
409  while(toread>0)
410  {
411  const size_t rcnt = std::min(toread, rdbufsz-(nread%rdbufsz));
412  const ssize_t rret = ts_->ReadTags(&rdbuf[nread%rdbufsz], p1+nread, rcnt);
413  if (rret<0)
414  {
415  TRACE(Warn, TagsReadError(p1+nread, rcnt, rret));
416  return rret;
417  }
418  if ((opts & XrdOssDF::Verify))
419  {
420  size_t toverif = rcnt;
421  size_t nverif = 0;
422  while(toverif>0)
423  {
424  const size_t vcnt = std::min(toverif, vrbufsz);
425  const size_t databytes = (nread+nverif+vcnt <= nfull) ? (vcnt*XrdSys::PageSize) : ((vcnt-1)*XrdSys::PageSize+p2_off);
426  XrdOucCRC::Calc32C(&p[XrdSys::PageSize*(nread+nverif)],databytes,vrbuf);
427  if (memcmp(vrbuf, &rdbuf[(nread+nverif)%rdbufsz], 4*vcnt))
428  {
429  size_t badpg;
430  for(badpg=0;badpg<vcnt;++badpg) { if (memcmp(&vrbuf[badpg],&rdbuf[(nread+nverif+badpg)%rdbufsz],4)) break; }
431  TRACE(Warn, CRCMismatchError( (nread+nverif+badpg<nfull) ? XrdSys::PageSize : p2_off,
432  (p1+nread+nverif+badpg),
433  vrbuf[badpg],
434  rdbuf[(nread+nverif+badpg)%rdbufsz] ));
435  return -EDOM;
436  }
437  toverif -= vcnt;
438  nverif += vcnt;
439  }
440  }
441  toread -= rcnt;
442  nread += rcnt;
443  }
444 
445  return 0;
446 }
447 
448 int XrdOssCsiPages::VerifyRangeAligned(const void *const buff, const off_t offset, const size_t blen, const Sizes_t &sizes)
449 {
450  return FetchRangeAligned(buff,offset,blen,sizes,NULL,XrdOssDF::Verify);
451 }
452 
453 int XrdOssCsiPages::StoreRangeAligned(const void *const buff, const off_t offset, const size_t blen, const Sizes_t &sizes, uint32_t *csvec)
454 {
455  EPNAME("StoreRangeAligned");
456 
457  // if csvec given store those values
458  // if no csvec then calculate against data and store
459 
460  const off_t p1 = offset / XrdSys::PageSize;
461  const off_t trackinglen = sizes.first;
462 
463  if (offset > trackinglen)
464  {
465  const int ret = UpdateRangeHoleUntilPage(NULL, p1, sizes);
466  if (ret<0)
467  {
468  TRACE(Warn, "Error updating tags for holes, error=" << ret);
469  return ret;
470  }
471  }
472 
473  const ssize_t aret = apply_sequential_aligned_modify(buff, p1, blen, csvec, false, false, 0U, 0U);
474  if (aret<0)
475  {
476  TRACE(Warn, "Error updating tags, error=" << aret);
477  return aret;
478  }
479 
480  return 0;
481 }
482 
483 // Used by Read for aligned reads. See StoreRangeAligned for conditions.
484 //
485 int XrdOssCsiPages::UpdateRangeAligned(const void *const buff, const off_t offset, const size_t blen, const Sizes_t &sizes)
486 {
487  return StoreRangeAligned(buff, offset, blen, sizes, NULL);
488 }
489 
490 //
491 // LockTrackinglen: obtain current tracking counts and lock the following as necessary:
492 // tracking counts and file byte range [offset, offend). Lock will be applied
493 // at the page level.
494 //
495 // offset - byte offset of first page to apply lock
496 // offend - end of range byte (excluding byte at end) of page at which to end lock
497 // rdonly - will be a read-only operation
498 //
499 void XrdOssCsiPages::LockTrackinglen(XrdOssCsiRangeGuard &rg, const off_t offset, const off_t offend, const bool rdonly)
500 {
501  // no need to lock if we don't have a tag file
502  if (hasMissingTags_) return;
503 
504  // in case of empty range the tracking len is not copied
505  if (offset == offend) return;
506 
507  {
509 
510  Sizes_t sizes;
511  (void)TrackedSizesGet(sizes, !rdonly);
512 
513  // tag tracking data filesize, as recorded in the tagfile and for which the tagfile
514  // should be approprately sized, is sizes.first: usually the same as the in
515  // memory "actual" data filesize (sizes.second), but may differ after crashes or write failure.
516  const off_t trackinglen = sizes.first;
517 
518  const off_t p1 = (offset>trackinglen ? trackinglen : offset) / XrdSys::PageSize;
519  bool unlock = false;
520  if (!rdonly && offend <= trackinglen)
521  {
522  unlock = true;
523  }
524 
525  off_t p2 = offend / XrdSys::PageSize;
526  const size_t p2_off = offend % XrdSys::PageSize;
527 
528  // range is exclusive
529  if (p2_off ==0) p2--;
530 
531  ranges_.AddRange(p1, p2, rg, rdonly);
532 
533  if (unlock)
534  {
536  }
537  rg.SetTrackingInfo(this, sizes, (!rdonly && !unlock));
538  }
539 
540  rg.Wait();
541 }
542 
543 int XrdOssCsiPages::truncate(XrdOssDF *const fd, const off_t len, XrdOssCsiRangeGuard &rg)
544 {
545  EPNAME("truncate");
546 
547  if (len<0) return -EINVAL;
548 
549  // nothing to truncate if there is no tag file
550  if (hasMissingTags_) return 0;
551 
552  const Sizes_t sizes = rg.getTrackinglens();
553 
554  const off_t trackinglen = sizes.first;
555  const off_t p_until = len / XrdSys::PageSize;
556  const size_t p_off = len % XrdSys::PageSize;
557 
558  if (len>trackinglen)
559  {
560  int ret = UpdateRangeHoleUntilPage(fd,p_until,sizes);
561  if (ret<0)
562  {
563  TRACE(Warn, "Error updating tags for holes, error=" << ret);
564  return ret;
565  }
566  }
567 
568  if (len != trackinglen && p_off != 0)
569  {
570  const off_t tracked_page = trackinglen / XrdSys::PageSize;
571  const size_t tracked_off = trackinglen % XrdSys::PageSize;
572  size_t toread = tracked_off;
573  if (len>trackinglen)
574  {
575  if (p_until != tracked_page) toread = 0;
576  }
577  else
578  {
579  if (p_until != tracked_page) toread = XrdSys::PageSize;
580  }
581  uint8_t b[XrdSys::PageSize];
582  if (toread>0)
583  {
584  ssize_t rret = XrdOssCsiPages::fullread(fd, b, p_until*XrdSys::PageSize, toread);
585  if (rret<0)
586  {
587  TRACE(Warn, PageReadError(toread, p_until, rret));
588  return rret;
589  }
590  const uint32_t crc32c = XrdOucCRC::Calc32C(b, toread, 0U);
591  uint32_t crc32v;
592  rret = ts_->ReadTags(&crc32v, p_until, 1);
593  if (rret<0)
594  {
595  TRACE(Warn, TagsReadError(p_until, 1, rret));
596  return rret;
597  }
598  if (crc32v != crc32c)
599  {
600  TRACE(Warn, CRCMismatchError(toread, p_until, crc32c, crc32v));
601  return -EDOM;
602  }
603  }
604  if (p_off > toread)
605  {
606  memset(&b[toread],0,p_off-toread);
607  }
608  const uint32_t crc32c = XrdOucCRC::Calc32C(b, p_off, 0U);
609  const ssize_t wret = ts_->WriteTags(&crc32c, p_until, 1);
610  if (wret < 0)
611  {
612  TRACE(Warn, TagsWriteError(p_until, 1, wret));
613  return wret;
614  }
615  }
616 
617  LockTruncateSize(len,true);
618  rg.unlockTrackinglen();
619  return 0;
620 }
621 
622 // used by pgRead: At this point the user's buffer has already been filled from the file
623 // offset: offset within the file at which the read starts
624 // blen : the length of the read already read into the buffer
625 // (which may be less than what was originally requested)
626 //
628  XrdOssDF *const fd, const void *buff, const off_t offset, const size_t blen,
629  uint32_t *csvec, const uint64_t opts, XrdOssCsiRangeGuard &rg)
630 {
631  EPNAME("FetchRange");
632  if (offset<0)
633  {
634  return -EINVAL;
635  }
636 
637  // if the tag file is missing there is nothing to fetch or verify
638  // but if we should return a list of checksums calculate them from the data
639  if (hasMissingTags_)
640  {
641  if (csvec)
642  {
643  pgDoCalc(buff, offset, blen, csvec);
644  }
645  return 0;
646  }
647 
648  const Sizes_t sizes = rg.getTrackinglens();
649  const off_t trackinglen = sizes.first;
650 
651  if (offset >= trackinglen && blen == 0)
652  {
653  return 0;
654  }
655 
656  if (blen == 0)
657  {
658  // if offset is before the tracked len we should not be requested to verify zero bytes:
659  // the file may have been truncated
660  TRACE(Warn, "Fetch request for zero bytes " << fn_ << ", file may be truncated");
661  return -EDOM;
662  }
663 
664  if (offset+blen > static_cast<size_t>(trackinglen))
665  {
666  TRACE(Warn, "Fetch request for " << (offset+blen-trackinglen) << " bytes from " << fn_ << " beyond tracked length");
667  return -EDOM;
668  }
669 
670  if (csvec == NULL && !(opts & XrdOssDF::Verify))
671  {
672  // if the crc values are not wanted nor checks against data, then
673  // there's nothing more to do here
674  return 0;
675  }
676 
677  int fret;
678  if ((offset % XrdSys::PageSize) != 0 || (offset+blen != static_cast<size_t>(trackinglen) && (blen % XrdSys::PageSize) != 0))
679  {
680  fret = FetchRangeUnaligned(fd, buff, offset, blen, sizes, csvec, opts);
681  }
682  else
683  {
684  fret = FetchRangeAligned(buff,offset,blen,sizes,csvec,opts);
685  }
686  return fret;
687 }
688 
689 // Used by pgWrite: At this point the user's data has not yet been written to the file.
690 //
691 int XrdOssCsiPages::StoreRange(XrdOssDF *const fd, const void *buff, const off_t offset, const size_t blen, uint32_t *csvec, const uint64_t opts, XrdOssCsiRangeGuard &rg)
692 {
693  if (offset<0)
694  {
695  return -EINVAL;
696  }
697 
698  if (blen == 0)
699  {
700  return 0;
701  }
702 
703  // if the tag file is missing there is nothing to store
704  // but do calculate checksums to return, if requested to do so
705  if (hasMissingTags_)
706  {
707  if (csvec && (opts & XrdOssDF::doCalc))
708  {
709  pgDoCalc(buff, offset, blen, csvec);
710  }
711  return 0;
712  }
713 
714  const Sizes_t sizes = rg.getTrackinglens();
715  const off_t trackinglen = sizes.first;
716 
717  // in the original specification of pgWrite there was the idea of a logical-eof, set by
718  // the a pgWrite with non-page aligned length: We support an option to approximate that
719  // by disallowing pgWrite past the current (non page aligned) eof.
720  if (disablePgExtend_ && (trackinglen % XrdSys::PageSize) !=0 && offset+blen > static_cast<size_t>(trackinglen))
721  {
722  return -ESPIPE;
723  }
724 
725  // if doCalc is set and we have a csvec buffer fill it with calculated values
726  if (csvec && (opts & XrdOssDF::doCalc))
727  {
728  pgDoCalc(buff, offset, blen, csvec);
729  }
730 
731  // if no vector of crc have been given and not specifically requested to calculate,
732  // then mark this file as having unverified checksums
733  if (!csvec && !(opts & XrdOssDF::doCalc))
734  {
736  }
737 
738  if (offset+blen > static_cast<size_t>(trackinglen))
739  {
740  LockSetTrackedSize(offset+blen);
741  rg.unlockTrackinglen();
742  }
743 
744  int ret;
745  if ((offset % XrdSys::PageSize) != 0 ||
746  (offset+blen < static_cast<size_t>(trackinglen) && (blen % XrdSys::PageSize) != 0) ||
747  ((trackinglen % XrdSys::PageSize) !=0 && offset > trackinglen))
748  {
749  ret = StoreRangeUnaligned(fd,buff,offset,blen,sizes,csvec);
750  }
751  else
752  {
753  ret = StoreRangeAligned(buff,offset,blen,sizes,csvec);
754  }
755 
756  return ret;
757 }
758 
760 {
761  if (hasMissingTags_)
762  {
763  return 0;
764  }
765  bool iv;
766  {
768  iv = ts_->IsVerified();
769  }
770  if (iv)
771  {
772  return XrdOss::PF_csVer;
773  }
774  return XrdOss::PF_csVun;
775 }
776 
777 void XrdOssCsiPages::pgDoCalc(const void *buffer, off_t offset, size_t wrlen, uint32_t *csvec)
778 {
779  const size_t p_off = offset % XrdSys::PageSize;
780  const size_t p_alen = (p_off > 0) ? (XrdSys::PageSize - p_off) : wrlen;
781  if (p_alen < wrlen)
782  {
783  XrdOucCRC::Calc32C((uint8_t *)buffer+p_alen, wrlen-p_alen, &csvec[1]);
784  }
785  XrdOucCRC::Calc32C((void*)buffer, std::min(p_alen, wrlen), csvec);
786 }
787 
788 int XrdOssCsiPages::pgWritePrelockCheck(const void *buffer, off_t offset, size_t wrlen, const uint32_t *csvec, uint64_t opts)
789 {
790  // do verify before taking locks to allow for faster fail
791  if (csvec && (opts & XrdOssDF::Verify))
792  {
793  uint32_t valcs;
794  const size_t p_off = offset % XrdSys::PageSize;
795  const size_t p_alen = (p_off > 0) ? (XrdSys::PageSize - p_off) : wrlen;
796  if (p_alen < wrlen)
797  {
798  if (XrdOucCRC::Ver32C((uint8_t *)buffer+p_alen, wrlen-p_alen, &csvec[1], valcs)>=0)
799  {
800  return -EDOM;
801  }
802  }
803  if (XrdOucCRC::Ver32C((void*)buffer, std::min(p_alen, wrlen), csvec, valcs)>=0)
804  {
805  return -EDOM;
806  }
807  }
808 
809  return 0;
810 }
811 
812 //
813 // Do some consistency checks and repair in case the datafile length is inconsistent
814 // with the length in the tag file. Called on open() and after write failures.
815 // Disabled if loosewrite_ off, or file readonly. Sets lastpgforloose_.
816 //
817 // May be called under tscond_ lock from LockResetSizes, or directly from
818 // CsiFile just after open.
819 //
821 {
822  EPNAME("BasicConsistencyCheck");
823 
824  if (!loosewrite_ || rdonly_) return;
825 
826  uint8_t b[XrdSys::PageSize];
827  static const uint8_t bz[XrdSys::PageSize] = {0};
828 
829  const off_t tagsize = ts_->GetTrackedTagSize();
830  const off_t datasize = ts_->GetTrackedDataSize();
831 
832  off_t taglp = 0, datalp = 0;
833  size_t tag_len = 0, data_len = 0;
834 
835  if (tagsize>0)
836  {
837  taglp = (tagsize - 1) / XrdSys::PageSize;
838  tag_len = tagsize % XrdSys::PageSize;
839  tag_len = tag_len ? tag_len : XrdSys::PageSize;
840  }
841  if (datasize>0)
842  {
843  datalp = (datasize - 1) / XrdSys::PageSize;
844  data_len = datasize % XrdSys::PageSize;
845  data_len = data_len ? data_len : XrdSys::PageSize;
846  }
847 
848  lastpgforloose_ = taglp;
849  checklastpg_ = true;
850 
851  if (datasize>0 && taglp > datalp)
852  {
853  ssize_t rlen = XrdOssCsiPages::maxread(fd, b, XrdSys::PageSize * datalp, XrdSys::PageSize);
854  if (rlen<0)
855  {
856  TRACE(Warn, PageReadError(XrdSys::PageSize, datalp, rlen));
857  return;
858  }
859 
860  memset(&b[rlen], 0, XrdSys::PageSize-rlen);
861  const uint32_t data_crc = XrdOucCRC::Calc32C(b, data_len, 0u);
862  const uint32_t data_crc_z = XrdOucCRC::Calc32C(b, XrdSys::PageSize, 0u);
863  uint32_t tagv;
864  ssize_t rret = ts_->ReadTags(&tagv, datalp, 1);
865  if (rret<0)
866  {
867  TRACE(Warn, TagsReadError(datalp, 1, rret));
868  return;
869  }
870 
871  if (tagv == data_crc_z)
872  {
873  // expected
874  }
875  else if (tagv == data_crc)
876  {
877  // should set tagv to data_crc_z
878  TRACE(Warn, "Resetting tag for page at " << datalp*XrdSys::PageSize << " to zero-extended");
879  const ssize_t wret = ts_->WriteTags(&data_crc_z, datalp, 1);
880  if (wret < 0)
881  {
882  TRACE(Warn, TagsWriteError(datalp, 1, wret));
883  return;
884  }
885  }
886  else
887  {
888  // something else wrong
889  TRACE(Warn, CRCMismatchError(data_len, datalp, data_crc, tagv) << " (ignoring)");
890  }
891  }
892  else if (tagsize>0 && taglp < datalp)
893  {
894  // datafile has more pages than recorded in the tag file:
895  // the tag file should have a crc corresponding to the relevant data fragment that is tracked in the last page.
896  // If it has the crc for a whole page (and there no non-zero content later in the page) reset it.
897  // This is so that a subsequnt UpdateRangeHoleUntilPage can zero-extend the CRC and get a consistent CRC.
898 
899  ssize_t rlen = XrdOssCsiPages::maxread(fd, b, XrdSys::PageSize * taglp, XrdSys::PageSize);
900  if (rlen<0)
901  {
902  TRACE(Warn, PageReadError(XrdSys::PageSize, taglp, rlen));
903  return;
904  }
905 
906  memset(&b[rlen], 0, XrdSys::PageSize-rlen);
907  const uint32_t tag_crc = XrdOucCRC::Calc32C(b, tag_len, 0u);
908  const uint32_t tag_crc_z = XrdOucCRC::Calc32C(b, XrdSys::PageSize, 0u);
909  const uint32_t dp_ext_is_zero = !memcmp(&b[tag_len], bz, XrdSys::PageSize-tag_len);
910  uint32_t tagv;
911  ssize_t rret = ts_->ReadTags(&tagv, taglp, 1);
912  if (rret<0)
913  {
914  TRACE(Warn, TagsReadError(taglp, 1, rret));
915  return;
916  }
917 
918  if (tagv == tag_crc)
919  {
920  // expected
921  }
922  else if (tagv == tag_crc_z && dp_ext_is_zero)
923  {
924  // should set tagv to tag_crc
925  TRACE(Warn, "Resetting tag for page at " << taglp*XrdSys::PageSize << " to not zero-extended");
926  const ssize_t wret = ts_->WriteTags(&tag_crc, taglp, 1);
927  if (wret < 0)
928  {
929  TRACE(Warn, TagsWriteError(taglp, 1, wret));
930  return;
931  }
932  }
933  else
934  {
935  // something else wrong
936  TRACE(Warn, CRCMismatchError(tag_len, taglp, tag_crc, tagv) << " dp_ext_is_zero=" << dp_ext_is_zero << " (ignoring)");
937  }
938  }
939 }
#define tident
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
XrdOucTrace OssCsiTrace
@ Info
uint32_t crc32c(uint32_t crc, void const *buf, size_t len)
struct myOpts opts
#define TRACE(act, x)
Definition: XrdTrace.hh:63
XrdSysCondVar tscond_
int UpdateRangeAligned(const void *, off_t, size_t, const Sizes_t &)
int StoreRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &, const uint32_t *)
ssize_t apply_sequential_aligned_modify(const void *, off_t, size_t, const uint32_t *, bool, bool, uint32_t, uint32_t)
XrdOssCsiRanges ranges_
static ssize_t maxread(XrdOssDF *fd, void *buff, const off_t off, const size_t sz, size_t tg=0)
std::string TagsReadError(off_t start, size_t n, int ret)
std::unique_ptr< XrdOssCsiTagstore > ts_
int FetchRangeAligned(const void *, off_t, size_t, const Sizes_t &, uint32_t *, uint64_t)
int LockTruncateSize(off_t, bool)
XrdOssCsiPages(const std::string &fn, std::unique_ptr< XrdOssCsiTagstore > ts, bool wh, bool am, bool dpe, bool dlw, const char *)
int UpdateRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &)
std::string TagsWriteError(off_t start, size_t n, int ret)
int StoreRangeAligned(const void *, off_t, size_t, const Sizes_t &, uint32_t *)
void TrackedSizeRelease()
int UpdateRangeHoleUntilPage(XrdOssDF *, off_t, const Sizes_t &)
static ssize_t fullread(XrdOssDF *fd, void *buff, const off_t off, const size_t sz)
std::pair< off_t, off_t > Sizes_t
int FetchRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &, uint32_t *, uint64_t)
int truncate(XrdOssDF *, off_t, XrdOssCsiRangeGuard &)
XrdSysMutex rangeaddmtx_
int LockResetSizes(XrdOssDF *, off_t)
int VerifyRangeAligned(const void *, off_t, size_t, const Sizes_t &)
int VerifyRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &)
int LockSetTrackedSize(off_t)
int FetchRange(XrdOssDF *, const void *, off_t, size_t, uint32_t *, uint64_t, XrdOssCsiRangeGuard &)
std::string CRCMismatchError(size_t blen, off_t pgnum, uint32_t got, uint32_t expected)
void BasicConsistencyCheck(XrdOssDF *)
void LockTrackinglen(XrdOssCsiRangeGuard &, off_t, off_t, bool)
std::string PageReadError(size_t blen, off_t pgnum, int ret)
int Open(const char *path, off_t dsize, int flags, XrdOucEnv &envP)
int TrackedSizesGet(Sizes_t &, bool)
const bool loosewriteConfigured_
static int pgWritePrelockCheck(const void *, off_t, size_t, const uint32_t *, uint64_t)
const std::string fn_
int StoreRange(XrdOssDF *, const void *, off_t, size_t, uint32_t *, uint64_t, XrdOssCsiRangeGuard &)
static const size_t stsize_
int UpdateRange(XrdOssDF *, const void *, off_t, size_t, XrdOssCsiRangeGuard &)
static void pgDoCalc(const void *, off_t, size_t, uint32_t *)
int VerifyRange(XrdOssDF *, const void *, off_t, size_t, XrdOssCsiRangeGuard &)
void SetTrackingInfo(XrdOssCsiPages *p, const std::pair< off_t, off_t > &tsizes, bool locked)
const std::pair< off_t, off_t > & getTrackinglens() const
void AddRange(const off_t start, const off_t end, XrdOssCsiRangeGuard &rg, bool rdonly)
static const uint64_t doCalc
pgw: Calculate checksums
Definition: XrdOss.hh:225
static const uint64_t Verify
all: Verify checksums
Definition: XrdOss.hh:223
static const int PF_csVer
verified file checksums present
Definition: XrdOss.hh:778
static const int PF_csVun
unverified file checksums present
Definition: XrdOss.hh:779
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
static bool Ver32C(const void *data, size_t count, const uint32_t csval, uint32_t *csbad=0)
Definition: XrdOucCRC.cc:222
XrdOucEnv * envP
Definition: XrdPss.cc:109
static const int PageSize