XRootD
XrdOssCsiPagesUnaligned.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d O s s C s i P a g e s U n a l i g n e d . c c */
4 /* */
5 /* (C) Copyright 2021 CERN. */
6 /* */
7 /* This file is part of the XRootD software suite. */
8 /* */
9 /* XRootD is free software: you can redistribute it and/or modify it under */
10 /* the terms of the GNU Lesser General Public License as published by the */
11 /* Free Software Foundation, either version 3 of the License, or (at your */
12 /* option) any later version. */
13 /* */
14 /* In applying this licence, CERN does not waive the privileges and */
15 /* immunities granted to it by virtue of its status as an Intergovernmental */
16 /* Organization or submit itself to any jurisdiction. */
17 /* */
18 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
19 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
20 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
21 /* License for more details. */
22 /* */
23 /* You should have received a copy of the GNU Lesser General Public License */
24 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
25 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
26 /* */
27 /* The copyright holder's institutional names and contributor's names may not */
28 /* be used to endorse or promote products derived from this software without */
29 /* specific prior written permission of the institution or contributor. */
30 /******************************************************************************/
31 
32 #include "XrdOssCsiTrace.hh"
33 #include "XrdOssCsiPages.hh"
34 #include "XrdOssCsiCrcUtils.hh"
35 #include "XrdOuc/XrdOucCRC.hh"
36 #include "XrdSys/XrdSysPageSize.hh"
37 
38 #include <vector>
39 #include <assert.h>
40 
43 
44 //
45 // UpdateRangeHoleUntilPage
46 //
47 // Used pgWrite/Write (both aligned and unaligned cases) when extending a file
48 // with implied zeros after then current end of file and the new one.
49 // fd (data file descriptor pointer) required only when last page in file is partial.
50 // current implementation does not use fd in this case, but requires it be set.
51 //
52 int XrdOssCsiPages::UpdateRangeHoleUntilPage(XrdOssDF *fd, const off_t until, const Sizes_t &sizes)
53 {
54  EPNAME("UpdateRangeHoleUntilPage");
55 
56  static const uint32_t crczero = CrcUtils.crc32c_extendwith_zero(0u, XrdSys::PageSize);
57  static const std::vector<uint32_t> crc32Vec(stsize_, crczero);
58 
59  const off_t trackinglen = sizes.first;
60  const off_t tracked_page = trackinglen / XrdSys::PageSize;
61  if (until <= tracked_page) return 0;
62 
63  const size_t tracked_off = trackinglen % XrdSys::PageSize;
64 
65  // if last tracked page is before page "until" extend it
66  if (tracked_off>0)
67  {
68  if (fd == NULL)
69  {
70  TRACE(Warn, "Unexpected partially filled last page " << fn_);
71  return -EDOM;
72  }
73 
74  uint32_t prevtag;
75  const ssize_t rret = ts_->ReadTags(&prevtag, tracked_page, 1);
76  if (rret < 0)
77  {
78  TRACE(Warn, TagsReadError(tracked_page, 1, rret));
79  return rret;
80  }
81 
82  // extend prevtag up to PageSize. If there is a mismatch it will only be
83  // discovered during a later read (but this saves a read now).
84  const uint32_t crc32c = CrcUtils.crc32c_extendwith_zero(prevtag, XrdSys::PageSize - tracked_off);
85  const ssize_t wret = ts_->WriteTags(&crc32c, tracked_page, 1);
86  if (wret < 0)
87  {
88  TRACE(Warn, TagsWriteError(tracked_page, 1, wret) << " (prev)");
89  return wret;
90  }
91  }
92 
93  if (!writeHoles_) return 0;
94 
95  const off_t nAllEmpty = (tracked_off>0) ? (until - tracked_page - 1) : (until - tracked_page);
96  const off_t firstEmpty = (tracked_off>0) ? (tracked_page + 1) : tracked_page;
97 
98  off_t towrite = nAllEmpty;
99  off_t nwritten = 0;
100  while(towrite>0)
101  {
102  const size_t nw = std::min(towrite, (off_t)crc32Vec.size());
103  const ssize_t wret = ts_->WriteTags(&crc32Vec[0], firstEmpty+nwritten, nw);
104  if (wret<0)
105  {
106  TRACE(Warn, TagsWriteError(firstEmpty+nwritten, nw, wret) << " (new)");
107  return wret;
108  }
109  towrite -= wret;
110  nwritten += wret;
111  }
112 
113  return 0;
114 }
115 
116 // UpdateRangeUnaligned
117 //
118 // Used by Write for various cases with mis-alignment that need checksum recalculation. See StoreRangeUnaligned for list of conditions.
119 //
120 int XrdOssCsiPages::UpdateRangeUnaligned(XrdOssDF *const fd, const void *buff, const off_t offset, const size_t blen, const Sizes_t &sizes)
121 {
122  return StoreRangeUnaligned(fd, buff, offset, blen, sizes, NULL);
123 }
124 
125 //
126 // used by StoreRangeUnaligned when the supplied data does not cover the whole of the first corresponding page in the file
127 //
128 // offset: offset in file for start of write
129 // blen: length of write in first page
130 //
131 int XrdOssCsiPages::StoreRangeUnaligned_preblock(XrdOssDF *const fd, const void *const buff, const size_t blen,
132  const off_t offset, const off_t trackinglen,
133  const uint32_t *const csvec, uint32_t &prepageval)
134 {
135  EPNAME("StoreRangeUnaligned_preblock");
136  const off_t p1 = offset / XrdSys::PageSize;
137  const size_t p1_off = offset % XrdSys::PageSize;
138 
139  const off_t tracked_page = trackinglen / XrdSys::PageSize;
140  const size_t tracked_off = trackinglen % XrdSys::PageSize;
141 
142  if (p1 > tracked_page)
143  {
144  // the start of will have a number of implied zero bytes
145  uint32_t crc32c = CrcUtils.crc32c_extendwith_zero(0u, p1_off);
146  if (csvec)
147  {
148  crc32c = CrcUtils.crc32c_combine(crc32c, csvec[0], blen);
149  }
150  else
151  {
152  crc32c = XrdOucCRC::Calc32C(buff, blen, crc32c);
153  }
154  prepageval = crc32c;
155  return 0;
156  }
157 
158  // we're appending, or appending within the last page after a gap of zeros
159  if (p1 == tracked_page && p1_off >= tracked_off)
160  {
161  // appending: with or without some implied zeros.
162 
163  // zero initialised value may be used
164  uint32_t crc32v = 0;
165  if (tracked_off > 0)
166  {
167  const ssize_t rret = ts_->ReadTags(&crc32v, p1, 1);
168  if (rret<0)
169  {
170  TRACE(Warn, TagsReadError(p1, 1, rret) << " (append)");
171  return rret;
172  }
173  }
174 
175  uint32_t crc32c = 0;
176 
177  // only do the loosewrite extending check one time for the page which was the
178  // last page according to the trackinglen at time the check was configured (open or size-resync).
179  // don't do the check every time because it needs an extra read compared to the non loose case;
180  // checklastpg_ is checked and modified here, but is protected from concurrent
181  // access because of the condition that p1==lastpgforloose_
182 
184  {
185  checklastpg_ = false;
186  uint8_t b[XrdSys::PageSize];
187 
188  // this will reissue read() until eof, or tracked_off bytes read but accept up to PageSize
189  const ssize_t rlen = XrdOssCsiPages::maxread(fd, b, XrdSys::PageSize * p1, XrdSys::PageSize, tracked_off);
190 
191  if (rlen<0)
192  {
193  TRACE(Warn, PageReadError(tracked_off, p1, rlen));
194  return rlen;
195  }
196  memset(&b[rlen], 0, XrdSys::PageSize - rlen);
197 
198  // in the loose-write mode, the new crc is based on the crc of data
199  // read from file up to p1_off, not on the previously stored tag.
200  // However must check if the data read were consistent with stored tag (crc32v)
201 
202  uint32_t crc32x = XrdOucCRC::Calc32C(b, tracked_off, 0u);
203  crc32c = XrdOucCRC::Calc32C(&b[tracked_off], p1_off-tracked_off, crc32x);
204 
205  do
206  {
207  if (static_cast<size_t>(rlen) == tracked_off)
208  {
209  // this is the expected match
210  if (tracked_off==0 || crc32x == crc32v) break;
211  }
212 
213  // any bytes on disk beyond p1_off+blan would not be included in the new crc.
214  // if tracked_off==0 we have no meaningful crc32v value.
215  if ((tracked_off>0 || p1_off==0) && static_cast<size_t>(rlen) <= p1_off+blen)
216  {
217 
218  if (tracked_off != 0)
219  {
220  TRACE(Warn, CRCMismatchError(tracked_off, p1, crc32x, crc32v) << " (loose match, still trying)");
221  }
222 
223  // there was no tag recorded for the page, and we're completely overwriting anything on disk in the page
224  if (tracked_off==0)
225  {
226  TRACE(Warn, "Recovered page with no tag at offset " << (XrdSys::PageSize * p1) <<
227  " of file " << fn_ << " rlen=" << rlen << " (append)");
228  break;
229  }
230 
231  if (static_cast<size_t>(rlen) != tracked_off && rlen>0)
232  {
233  crc32x = XrdOucCRC::Calc32C(b, rlen, 0u);
234  if (crc32x == crc32v)
235  {
236  TRACE(Warn, "Recovered page at offset " << (XrdSys::PageSize * p1)+p1_off << " of file " << fn_ << " (append)");
237  break;
238  }
239  TRACE(Warn, CRCMismatchError(rlen, p1, crc32x, crc32v) << " (loose match, still trying)");
240  }
241 
242  memcpy(&b[p1_off], buff, blen);
243  crc32x = XrdOucCRC::Calc32C(b, p1_off+blen, 0u);
244  if (crc32x == crc32v)
245  {
246  TRACE(Warn, "Recovered matching write at offset " << (XrdSys::PageSize * p1)+p1_off <<
247  " of file " << fn_ << " (append)");
248  break;
249  }
250  TRACE(Warn, CRCMismatchError(p1_off+blen, p1, crc32x, crc32v) << " (append)");
251  }
252  else
253  {
254  if (tracked_off>0)
255  {
256  TRACE(Warn, CRCMismatchError(tracked_off, p1, crc32x, crc32v) << " (append)");
257  }
258  else
259  {
260  TRACE(Warn, "Unexpected content, write at page at offset " << (XrdSys::PageSize * p1) <<
261  " of file " << fn_ << ", offset-in-page=" << p1_off << " rlen=" << rlen << " (append)");
262  }
263  }
264  return -EDOM;
265  } while(0);
266  }
267  else
268  {
269  // non-loose case;
270  // can recalc crc with new data without re-reading existing partial block's data
271  const size_t nz = p1_off - tracked_off;
272  crc32c = CrcUtils.crc32c_extendwith_zero(crc32v, nz);
273  }
274 
275  // crc32c is crc up to p1_off. Now add the user's data.
276  if (csvec)
277  {
278  crc32c = CrcUtils.crc32c_combine(crc32c, csvec[0], blen);
279  }
280  else
281  {
282  crc32c = XrdOucCRC::Calc32C(buff, blen, crc32c);
283  }
284  prepageval = crc32c;
285  return 0;
286  }
287 
288  const size_t bavail = (p1==tracked_page) ? tracked_off : XrdSys::PageSize;
289 
290  // assert we're overwriting some (or all) of the previous data (other case was above)
291  assert(p1_off < bavail);
292 
293  // case p1_off==0 && blen>=bavail is either handled by aligned case (p1==tracked_page)
294  // or not sent to preblock, so will need to read some preexisting data
295  assert(p1_off !=0 || blen<bavail);
296  uint8_t b[XrdSys::PageSize];
297 
298  uint32_t crc32v;
299  ssize_t rret = ts_->ReadTags(&crc32v, p1, 1);
300  if (rret<0)
301  {
302  TRACE(Warn, TagsReadError(p1, 1, rret) << " (overwrite)");
303  return rret;
304  }
305 
306  // in either loosewrite or non-loosewrite a read-modify-write sequence is done and the
307  // final crc is that of the modified block. The difference between loose and non-loose
308  // case if that the looser checks are done on the block.
309  //
310  // in either case there are implicit verification(s) (e.g. pgWrite may return EDOM without Verify requested)
311  // as it's not clear if there is a meaningful way to crc a mismatching page during a partial overwrite
312 
313  if (loosewrite_)
314  {
315  // this will reissue read() until eof, or bavail bytes read but accept up to PageSize
316  const ssize_t rlen = XrdOssCsiPages::maxread(fd, b, XrdSys::PageSize * p1, XrdSys::PageSize, bavail);
317  if (rlen<0)
318  {
319  TRACE(Warn, PageReadError(bavail, p1, rlen));
320  return rlen;
321  }
322  memset(&b[rlen], 0, XrdSys::PageSize - rlen);
323  do
324  {
325  uint32_t crc32c = XrdOucCRC::Calc32C(b, bavail, 0U);
326  // this is the expected case
327  if (static_cast<size_t>(rlen) == bavail && crc32c == crc32v) break;
328 
329  // after this write there will be nothing changed between p1_off+blen
330  // and bavail; if there is nothing on disk in this range it will not
331  // be added by the write. So don't try to match crc with implied zero
332  // in this range. Beyond bavail bytes on disk will not be included
333  // in the new crc.
334  const size_t rmin = (p1_off+blen < bavail) ? bavail : 0;
335  if (static_cast<size_t>(rlen) >= rmin && static_cast<size_t>(rlen)<=bavail)
336  {
337  if (crc32c == crc32v)
338  {
339  TRACE(Warn, "Recovered page at offset " << (XrdSys::PageSize * p1) << " of file " << fn_ << " (overwrite)");
340  break;
341  }
342  TRACE(Warn, CRCMismatchError(bavail, p1, crc32c, crc32v) << " (loose match, still trying)");
343 
344  if (static_cast<size_t>(rlen) != bavail && rlen > 0)
345  {
346  crc32c = XrdOucCRC::Calc32C(b, rlen, 0U);
347  if (crc32c == crc32v)
348  {
349  TRACE(Warn, "Recovered page (2) at offset " << (XrdSys::PageSize * p1) << " of file " << fn_ << " (overwrite)");
350  break;
351  }
352  TRACE(Warn, CRCMismatchError(rlen, p1, crc32c, crc32v) << " (loose match, still trying)");
353  }
354 
355  memcpy(&b[p1_off], buff, blen);
356  const size_t vl = std::max(bavail, p1_off+blen);
357  crc32c = XrdOucCRC::Calc32C(b, vl, 0U);
358  if (crc32c == crc32v)
359  {
360  TRACE(Warn, "Recovered matching write at offset " << (XrdSys::PageSize * p1)+p1_off << " of file " << fn_ << " (overwrite)");
361  break;
362  }
363  TRACE(Warn, CRCMismatchError(vl, p1, crc32c, crc32v) << " (overwrite)");
364  }
365  else
366  {
367  TRACE(Warn, CRCMismatchError(bavail, p1, crc32c, crc32v) << " (overwrite)");
368  }
369  return -EDOM;
370  } while(0);
371  }
372  else
373  {
374  // non-loose case
375  rret = XrdOssCsiPages::fullread(fd, b, XrdSys::PageSize * p1, bavail);
376  if (rret<0)
377  {
378  TRACE(Warn, PageReadError(bavail, p1, rret));
379  return rret;
380  }
381  const uint32_t crc32c = XrdOucCRC::Calc32C(b, bavail, 0U);
382  if (crc32v != crc32c)
383  {
384  TRACE(Warn, CRCMismatchError(bavail, p1, crc32c, crc32v));
385  return -EDOM;
386  }
387  }
388 
389  uint32_t crc32c = XrdOucCRC::Calc32C(b, p1_off, 0U);
390  if (csvec)
391  {
392  crc32c = CrcUtils.crc32c_combine(crc32c, csvec[0], blen);
393  }
394  else
395  {
396  crc32c = XrdOucCRC::Calc32C(buff, blen, crc32c);
397  }
398  if (p1_off+blen < bavail)
399  {
400  const uint32_t cl = XrdOucCRC::Calc32C(&b[p1_off+blen], bavail-p1_off-blen, 0U);
401  crc32c = CrcUtils.crc32c_combine(crc32c, cl, bavail-p1_off-blen);
402  }
403  prepageval = crc32c;
404  return 0;
405 }
406 
407 //
408 // used by StoreRangeUnaligned when the end of supplied data is not page aligned
409 // and is before the end of file
410 //
411 // offset: first offset in file at which write is page aligned
412 // blen: length of write after offset
413 //
414 int XrdOssCsiPages::StoreRangeUnaligned_postblock(XrdOssDF *const fd, const void *const buff, const size_t blen,
415  const off_t offset, const off_t trackinglen,
416  const uint32_t *const csvec, uint32_t &lastpageval)
417 {
418  EPNAME("StoreRangeUnaligned_postblock");
419 
420  const uint8_t *const p = (uint8_t*)buff;
421  const off_t p2 = (offset+blen) / XrdSys::PageSize;
422  const size_t p2_off = (offset+blen) % XrdSys::PageSize;
423 
424  const off_t tracked_page = trackinglen / XrdSys::PageSize;
425  const size_t tracked_off = trackinglen % XrdSys::PageSize;
426 
427  // we should not be called in this case
428  assert(p2_off != 0);
429 
430  // how much existing data this last (p2) page
431  const size_t bavail = (p2==tracked_page) ? tracked_off : XrdSys::PageSize;
432 
433  // how much of that data will not be overwritten
434  const size_t bremain = (p2_off < bavail) ? bavail-p2_off : 0;
435 
436  // we should not be called if it is a complete overwrite
437  assert(bremain>0);
438 
439  // need to use remaining data to calculate the crc of the new p2 page.
440  // read and verify it now.
441 
442  uint32_t crc32v;
443  ssize_t rret = ts_->ReadTags(&crc32v, p2, 1);
444  if (rret<0)
445  {
446  TRACE(Warn, TagsReadError(p2, 1, rret));
447  return rret;
448  }
449 
450  uint8_t b[XrdSys::PageSize];
451  rret = XrdOssCsiPages::fullread(fd, b, XrdSys::PageSize * p2, bavail);
452  if (rret<0)
453  {
454  TRACE(Warn, PageReadError(bavail, p2, rret));
455  return rret;
456  }
457 
458  // calculate crc of new data with remaining data at the end:
459  uint32_t crc32c = 0;
460  if (csvec)
461  {
462  crc32c = csvec[(blen-1)/XrdSys::PageSize];
463  }
464  else
465  {
466  crc32c = XrdOucCRC::Calc32C(&p[blen-p2_off], p2_off, 0U);
467  }
468 
469  const uint32_t cl = XrdOucCRC::Calc32C(&b[p2_off], bremain, 0U);
470  // crc of page with new data
471  crc32c = CrcUtils.crc32c_combine(crc32c, cl, bremain);
472  // crc of current page (before write)
473  const uint32_t crc32prev = XrdOucCRC::Calc32C(b, bavail, 0U);
474 
475  // check(s) to see if remaining data was valid
476 
477  // usual check; unmodified block is consistent with stored crc
478  // for loose write we allow case were the new crc has already been stored in the tagfile
479 
480  // this may be an implicit verification (e.g. pgWrite may return EDOM without Verify requested)
481  // however, it's not clear if there is a meaningful way to crc a mismatching page during a partial overwrite
482  if (crc32v != crc32prev)
483  {
484  if (loosewrite_ && crc32c != crc32prev)
485  {
486  // log that we chceked if the tag was matching the previous data
487  TRACE(Warn, CRCMismatchError(bavail, p2, crc32prev, crc32v) << " (loose match, still trying)");
488  if (crc32c == crc32v)
489  {
490  TRACE(Warn, "Recovered matching write at offset " << (XrdSys::PageSize * p2) << " of file " << fn_);
491  lastpageval = crc32c;
492  return 0;
493  }
494  TRACE(Warn, CRCMismatchError(bavail, p2, crc32c, crc32v));
495  }
496  else
497  {
498  TRACE(Warn, CRCMismatchError(bavail, p2, crc32prev, crc32v));
499  }
500  return -EDOM;
501  }
502 
503  lastpageval = crc32c;
504  return 0;
505 }
506 
507 //
508 // StoreRangeUnaligned
509 //
510 // Used by pgWrite or Write (via UpdateRangeUnaligned) where the start of this update is not page aligned within the file
511 // OR where the end of this update is before the end of the file and is not page aligned
512 // OR where end of the file is not page aligned and this update starts after it
513 // i.e. where checksums of last current page of file, or the first or last pages after writing this buffer will need to be recomputed
514 //
515 int XrdOssCsiPages::StoreRangeUnaligned(XrdOssDF *const fd, const void *buff, const off_t offset, const size_t blen, const Sizes_t &sizes, const uint32_t *const csvec)
516 {
517  EPNAME("StoreRangeUnaligned");
518  const off_t p1 = offset / XrdSys::PageSize;
519 
520  const off_t trackinglen = sizes.first;
521  if (offset > trackinglen)
522  {
523  const int ret = UpdateRangeHoleUntilPage(fd, p1, sizes);
524  if (ret<0)
525  {
526  TRACE(Warn, "Error updating tags for holes, error=" << ret);
527  return ret;
528  }
529  }
530 
531  const size_t p1_off = offset % XrdSys::PageSize;
532  const size_t p2_off = (offset+blen) % XrdSys::PageSize;
533 
534  bool hasprepage = false;
535  uint32_t prepageval;
536 
537  // deal with partial first page
538  if ( p1_off>0 || blen < static_cast<size_t>(XrdSys::PageSize) )
539  {
540  const size_t bavail = (XrdSys::PageSize-p1_off > blen) ? blen : (XrdSys::PageSize-p1_off);
541  const int ret = StoreRangeUnaligned_preblock(fd, buff, bavail, offset, trackinglen, csvec, prepageval);
542  if (ret<0)
543  {
544  return ret;
545  }
546  hasprepage = true;
547  }
548 
549  // next page (if any)
550  const off_t np = hasprepage ? p1+1 : p1;
551  // next page starts at buffer offset
552  const size_t npoff = hasprepage ? (XrdSys::PageSize - p1_off) : 0;
553 
554  // anything in next page?
555  if (blen <= npoff)
556  {
557  // only need to write the first, partial page
558  if (hasprepage)
559  {
560  const ssize_t wret = ts_->WriteTags(&prepageval, p1, 1);
561  if (wret<0)
562  {
563  TRACE(Warn, TagsWriteError(p1, 1, wret));
564  return wret;
565  }
566  }
567  return 0;
568  }
569 
570  const uint8_t *const p = (uint8_t*)buff;
571  const uint32_t *csp = csvec;
572  if (csp && hasprepage) csp++;
573 
574  // see if there will be no old data to account for in the last page
575  if (p2_off == 0 || (offset + blen >= static_cast<size_t>(trackinglen)))
576  {
577  // write any precomputed prepage, then write full pages and last partial page (computing or using supplied csvec)
578  const ssize_t aret = apply_sequential_aligned_modify(&p[npoff], np, blen-npoff, csp, hasprepage, false, prepageval, 0U);
579  if (aret<0)
580  {
581  TRACE(Warn, "Error updating tags, error=" << aret);
582  return aret;
583  }
584  return 0;
585  }
586 
587  // last page contains existing data that has to be read to modify it
588 
589  uint32_t lastpageval;
590  const int ret = StoreRangeUnaligned_postblock(fd, &p[npoff], blen-npoff, offset+npoff, trackinglen, csp, lastpageval);
591  if (ret<0)
592  {
593  return ret;
594  }
595 
596  // write any precomputed prepage, then write full pages (computing or using supplied csvec) and finally write precomputed last page
597  const ssize_t aret = apply_sequential_aligned_modify(&p[npoff], np, blen-npoff, csp, hasprepage, true, prepageval, lastpageval);
598  if (aret<0)
599  {
600  TRACE(Warn, "Error updating tags, error=" << aret);
601  return aret;
602  }
603 
604  return 0;
605 }
606 
607 // VerifyRangeUnaligned
608 //
609 // Used by Read for various cases with mis-alignment. See FetchRangeUnaligned for list of conditions.
610 //
611 int XrdOssCsiPages::VerifyRangeUnaligned(XrdOssDF *const fd, const void *const buff, const off_t offset, const size_t blen, const Sizes_t &sizes)
612 {
613  return FetchRangeUnaligned(fd, buff, offset, blen, sizes, NULL, XrdOssDF::Verify);
614 }
615 
616 //
617 // used by FetchRangeUnaligned when only part of the data in the first page is needed, or the page is short
618 //
619 // offset: offset in file for start of read
620 // blen: total length of read
621 //
622 int XrdOssCsiPages::FetchRangeUnaligned_preblock(XrdOssDF *const fd, const void *const buff, const off_t offset, const size_t blen,
623  const off_t trackinglen, uint32_t *const tbuf, uint32_t *const csvec, const uint64_t opts)
624 {
625  EPNAME("FetchRangeUnaligned_preblock");
626 
627  const off_t p1 = offset / XrdSys::PageSize;
628  const size_t p1_off = offset % XrdSys::PageSize;
629 
630  // bavail is length of data in this page
631  const size_t bavail = std::min(trackinglen - (XrdSys::PageSize*p1), (off_t)XrdSys::PageSize);
632 
633  // bcommon is length of data in this page that user wants
634  const size_t bcommon = std::min(bavail - p1_off, blen);
635 
636  uint8_t b[XrdSys::PageSize];
637  const uint8_t *ub = (uint8_t*)buff;
638  if (bavail>bcommon)
639  {
640  // will need more data to either verify or return crc of the user's data
641  // (in case of no verify and no csvec FetchRange() returns early)
642  const ssize_t rret = XrdOssCsiPages::fullread(fd, b, XrdSys::PageSize*p1, bavail);
643  if (rret<0)
644  {
645  TRACE(Warn, PageReadError(bavail, p1, rret));
646  return rret;
647  }
648  // if we're going to verify, make sure we just read the same overlapping data as that in the user's buffer
649  if ((opts & XrdOssDF::Verify))
650  {
651  if (memcmp(buff, &b[p1_off], bcommon))
652  {
653  size_t badoff;
654  for(badoff=0;badoff<bcommon;badoff++) { if (((uint8_t*)buff)[badoff] != b[p1_off+badoff]) break; }
655  badoff = (badoff < bcommon) ? badoff : 0; // may be possible with concurrent modification
656  TRACE(Warn, ByteMismatchError(bavail, XrdSys::PageSize*p1+p1_off+badoff, ((uint8_t*)buff)[badoff], b[p1_off+badoff]));
657  return -EDOM;
658  }
659  }
660  ub = b;
661  }
662  // verify; based on whole block, or user's buffer (if it contains the whole block)
663  if ((opts & XrdOssDF::Verify))
664  {
665  const uint32_t crc32calc = XrdOucCRC::Calc32C(ub, bavail, 0U);
666  if (tbuf[0] != crc32calc)
667  {
668  TRACE(Warn, CRCMismatchError(bavail, p1, crc32calc, tbuf[0]));
669  return -EDOM;
670  }
671  }
672 
673  // if we're returning csvec values and this first block
674  // needs adjustment because user requested a subset..
675  if (bavail>bcommon && csvec)
676  {
677  // make sure csvec[0] corresponds to only the data the user wanted, not whole page.
678  // if we have already verified the page + common part matches user's, take checksum of common.
679  // (Use local copy of page, perhaps less chance of accidental concurrent modification than buffer)
680  // Otherwise base on saved checksum.
681  if ((opts & XrdOssDF::Verify))
682  {
683  csvec[0] = XrdOucCRC::Calc32C(&b[p1_off], bcommon, 0u);
684  }
685  else
686  {
687  // calculate expected user checksum based on block's recorded checksum, adjusting
688  // for data not included in user's request. If either the returned data or the
689  // data not included in the user's request are corrupt the returned checksum and
690  // returned data will (probably) mismatch.
691 
692  // remove block data before p1_off from checksum
693  uint32_t crc32c = XrdOucCRC::Calc32C(b, p1_off, 0u);
694  csvec[0] = CrcUtils.crc32c_split2(csvec[0], crc32c, bavail-p1_off);
695 
696  // remove block data after p1_off+bcommon upto bavail
697  crc32c = XrdOucCRC::Calc32C(&b[p1_off+bcommon], bavail-p1_off-bcommon, 0u);
698  csvec[0] = CrcUtils.crc32c_split1(csvec[0], crc32c, bavail-p1_off-bcommon);
699  }
700  }
701  return 0;
702 }
703 
704 //
705 // used by FetchRangeUnaligned when only part of a page of data is needed from the last page
706 //
707 // offset: offset in file for start of read
708 // blen: total length of read
709 //
710 int XrdOssCsiPages::FetchRangeUnaligned_postblock(XrdOssDF *const fd, const void *const buff, const off_t offset, const size_t blen,
711  const off_t trackinglen, uint32_t *const tbuf, uint32_t *const csvec, const size_t tidx, const uint64_t opts)
712 {
713  EPNAME("FetchRangeUnaligned_postblock");
714 
715  const off_t p2 = (offset+blen) / XrdSys::PageSize;
716  const size_t p2_off = (offset+blen) % XrdSys::PageSize;
717 
718  // length of data in last (p2) page
719  const size_t bavail = std::min(trackinglen - (XrdSys::PageSize*p2), (off_t)XrdSys::PageSize);
720 
721  // how much of that data is not being returned
722  const size_t bremain = (p2_off < bavail) ? bavail-p2_off : 0;
723  uint8_t b[XrdSys::PageSize];
724  const uint8_t *ub = &((uint8_t*)buff)[blen-p2_off];
725  if (bremain>0)
726  {
727  const ssize_t rret = XrdOssCsiPages::fullread(fd, b, XrdSys::PageSize*p2, bavail);
728  if (rret<0)
729  {
730  TRACE(Warn, PageReadError(bavail, p2, rret));
731  return rret;
732  }
733  // if we're verifying make sure overlapping part of data just read matches user's buffer
734  if ((opts & XrdOssDF::Verify))
735  {
736  const uint8_t *const p = (uint8_t*)buff;
737  if (memcmp(&p[blen-p2_off], b, p2_off))
738  {
739  size_t badoff;
740  for(badoff=0;badoff<p2_off;badoff++) { if (p[blen-p2_off+badoff] != b[badoff]) break; }
741  badoff = (badoff < p2_off) ? badoff : 0; // may be possible with concurrent modification
742  TRACE(Warn, ByteMismatchError(bavail, XrdSys::PageSize*p2+badoff, p[blen-p2_off+badoff], b[badoff]));
743  return -EDOM;
744  }
745  }
746  ub = b;
747  }
748  if ((opts & XrdOssDF::Verify))
749  {
750  const uint32_t crc32calc = XrdOucCRC::Calc32C(ub, bavail, 0U);
751  if (tbuf[tidx] != crc32calc)
752  {
753  TRACE(Warn, CRCMismatchError(bavail, p2, crc32calc, tbuf[tidx]));
754  return -EDOM;
755  }
756  }
757  // if we're returning csvec and user only request part of page
758  // adjust the crc
759  if (csvec && bremain>0)
760  {
761  if ((opts & XrdOssDF::Verify))
762  {
763  // verified; calculate crc based on common part of page.
764  csvec[tidx] = XrdOucCRC::Calc32C(b, p2_off, 0u);
765  }
766  else
767  {
768  // recalculate crc based on recorded checksum and adjusting for part of data not returned.
769  // If either the returned data or the data not included in the user's request are
770  // corrupt the returned checksum and returned data will (probably) mismatch.
771 
772  const uint32_t crc32c = XrdOucCRC::Calc32C(&b[p2_off], bremain, 0u);
773  csvec[tidx] = CrcUtils.crc32c_split1(csvec[tidx], crc32c, bremain);
774  }
775  }
776 
777  return 0;
778 }
779 
780 //
781 // FetchRangeUnaligned
782 //
783 // Used by pgRead/Read when reading a range not starting at a page boundary within the file
784 // OR when the length is not a multiple of the page-size and the read finishes not at the end of file.
785 //
786 int XrdOssCsiPages::FetchRangeUnaligned(XrdOssDF *const fd, const void *const buff, const off_t offset, const size_t blen, const Sizes_t &sizes, uint32_t *const csvec, const uint64_t opts)
787 {
788  EPNAME("FetchRangeUnaligned");
789 
790  const off_t p1 = offset / XrdSys::PageSize;
791  const size_t p1_off = offset % XrdSys::PageSize;
792  const off_t p2 = (offset+blen) / XrdSys::PageSize;
793  const size_t p2_off = (offset+blen) % XrdSys::PageSize;
794 
795  const off_t trackinglen = sizes.first;
796 
797  size_t ntagstoread = (p2_off>0) ? p2-p1+1 : p2-p1;
798  size_t ntagsbase = p1;
799  uint32_t tbufint[stsize_], *tbuf=0;
800  size_t tbufsz = 0;
801  if (!csvec)
802  {
803  tbuf = tbufint;
804  tbufsz = sizeof(tbufint)/sizeof(uint32_t);
805  }
806  else
807  {
808  tbuf = csvec;
809  tbufsz = ntagstoread;
810  }
811 
812  size_t tcnt = std::min(ntagstoread, tbufsz);
813  ssize_t rret = ts_->ReadTags(tbuf, ntagsbase, tcnt);
814  if (rret<0)
815  {
816  TRACE(Warn, TagsReadError(ntagsbase, tcnt, rret) << " (first)");
817  return rret;
818  }
819  ntagstoread -= tcnt;
820 
821  // deal with partial first page
822  if ( p1_off>0 || blen < static_cast<size_t>(XrdSys::PageSize) )
823  {
824  const int ret = FetchRangeUnaligned_preblock(fd, buff, offset, blen, trackinglen, tbuf, csvec, opts);
825  if (ret<0)
826  {
827  return ret;
828  }
829  }
830 
831  // first (inclusive) and last (exclusive) full page
832  const off_t fp = (p1_off != 0) ? p1+1 : p1;
833  const off_t lp = p2;
834 
835  // verify full pages if wanted
836  if (fp<lp && (opts & XrdOssDF::Verify))
837  {
838  const uint8_t *const p = (uint8_t*)buff;
839  uint32_t calcbuf[stsize_];
840  const size_t cbufsz = sizeof(calcbuf)/sizeof(uint32_t);
841  size_t toread = lp-fp;
842  size_t nread = 0;
843  while(toread>0)
844  {
845  const size_t ccnt = std::min(toread, cbufsz);
846  XrdOucCRC::Calc32C(&p[(p1_off ? XrdSys::PageSize-p1_off : 0)+XrdSys::PageSize*nread],ccnt*XrdSys::PageSize,calcbuf);
847  size_t tovalid = ccnt;
848  size_t nvalid = 0;
849  while(tovalid>0)
850  {
851  const size_t tidx=fp+nread+nvalid - ntagsbase;
852  const size_t nv = std::min(tovalid, tbufsz-tidx);
853  if (nv == 0)
854  {
855  assert(csvec == NULL);
856  ntagsbase += tbufsz;
857  tcnt = std::min(ntagstoread, tbufsz);
858  rret = ts_->ReadTags(tbuf, ntagsbase, tcnt);
859  if (rret<0)
860  {
861  TRACE(Warn, TagsReadError(ntagsbase, tcnt, rret) << " (mid)");
862  return rret;
863  }
864  ntagstoread -= tcnt;
865  continue;
866  }
867  if (memcmp(&calcbuf[nvalid], &tbuf[tidx], 4*nv))
868  {
869  size_t badpg;
870  for(badpg=0;badpg<nv;badpg++) { if (memcmp(&calcbuf[nvalid+badpg], &tbuf[tidx+badpg],4)) break; }
872  (ntagsbase+tidx+badpg),
873  calcbuf[nvalid+badpg], tbuf[tidx+badpg]));
874  return -EDOM;
875  }
876  tovalid -= nv;
877  nvalid += nv;
878  }
879  toread -= ccnt;
880  nread += ccnt;
881  }
882  }
883 
884  // last partial page
885  if (p2>p1 && p2_off > 0)
886  {
887  // make sure we have last tag;
888  // (should already have all of them if we're returning them in csvec)
889  size_t tidx = p2 - ntagsbase;
890  if (tidx >= tbufsz)
891  {
892  assert(csvec == NULL);
893  tidx = 0;
894  ntagsbase = p2;
895  rret = ts_->ReadTags(tbuf, ntagsbase, 1);
896  if (rret<0)
897  {
898  TRACE(Warn, TagsReadError(ntagsbase, 1, rret) << " (last)");
899  return rret;
900  }
901  ntagstoread = 0;
902  }
903 
904  const int ret = FetchRangeUnaligned_postblock(fd, buff, offset, blen, trackinglen, tbuf, csvec, tidx, opts);
905  if (ret<0)
906  {
907  return ret;
908  }
909  }
910 
911  return 0;
912 }
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
XrdOucTrace OssCsiTrace
static XrdOssCsiCrcUtils CrcUtils
uint32_t crc32c(uint32_t crc, void const *buf, size_t len)
struct myOpts opts
#define TRACE(act, x)
Definition: XrdTrace.hh:63
static uint32_t crc32c_extendwith_zero(uint32_t crc, size_t len)
static uint32_t crc32c_combine(uint32_t crc1, uint32_t crc2, size_t len2)
static uint32_t crc32c_split1(uint32_t crctot, uint32_t crc2, size_t len2)
static uint32_t crc32c_split2(uint32_t crctot, uint32_t crc1, size_t len2)
int StoreRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &, const uint32_t *)
ssize_t apply_sequential_aligned_modify(const void *, off_t, size_t, const uint32_t *, bool, bool, uint32_t, uint32_t)
std::string ByteMismatchError(size_t blen, off_t off, uint8_t user, uint8_t page)
static ssize_t maxread(XrdOssDF *fd, void *buff, const off_t off, const size_t sz, size_t tg=0)
std::string TagsReadError(off_t start, size_t n, int ret)
std::unique_ptr< XrdOssCsiTagstore > ts_
int UpdateRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &)
std::string TagsWriteError(off_t start, size_t n, int ret)
int FetchRangeUnaligned_preblock(XrdOssDF *, const void *, off_t, size_t, off_t, uint32_t *, uint32_t *, uint64_t)
int UpdateRangeHoleUntilPage(XrdOssDF *, off_t, const Sizes_t &)
static ssize_t fullread(XrdOssDF *fd, void *buff, const off_t off, const size_t sz)
std::pair< off_t, off_t > Sizes_t
int FetchRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &, uint32_t *, uint64_t)
int FetchRangeUnaligned_postblock(XrdOssDF *, const void *, off_t, size_t, off_t, uint32_t *, uint32_t *, size_t, uint64_t)
int VerifyRangeUnaligned(XrdOssDF *, const void *, off_t, size_t, const Sizes_t &)
std::string CRCMismatchError(size_t blen, off_t pgnum, uint32_t got, uint32_t expected)
int StoreRangeUnaligned_preblock(XrdOssDF *, const void *, size_t, off_t, off_t, const uint32_t *, uint32_t &)
std::string PageReadError(size_t blen, off_t pgnum, int ret)
int StoreRangeUnaligned_postblock(XrdOssDF *, const void *, size_t, off_t, off_t, const uint32_t *, uint32_t &)
const std::string fn_
static const size_t stsize_
static const uint64_t Verify
all: Verify checksums
Definition: XrdOss.hh:223
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
static const int PageSize