XRootD
XrdXrootdPgrwAio.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d X r o o t d P g r w A i o . c c */
4 /* */
5 /* (c) 2021 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cerrno>
32 #include <cstdio>
33 #include <sys/uio.h>
34 
35 #include "Xrd/XrdLink.hh"
36 #include "Xrd/XrdScheduler.hh"
38 #include "XrdOuc/XrdOucCRC.hh"
40 #include "XrdSys/XrdSysError.hh"
41 #include "XrdSys/XrdSysPlatform.hh"
48 
49 #define TRACELINK dataLink
50 
51 /******************************************************************************/
52 /* G l o b a l S t a t i c s */
53 /******************************************************************************/
54 
56 
57 namespace XrdXrootd
58 {
59 extern XrdSysError eLog;
60 extern XrdScheduler *Sched;
61 }
62 using namespace XrdXrootd;
63 
64 /******************************************************************************/
65 /* S t a t i c   M e m b e r s */
66 /******************************************************************************/
67 
68 const char *XrdXrootdPgrwAio::TraceID = "PgrwAio";
69 
70 /******************************************************************************/
71 /* L o c a l S t a t i c s */
72 /******************************************************************************/
73 
74 namespace
75 {
76 XrdSysMutex fqMutex;
77 XrdXrootdPgrwAio *fqFirst = 0;
78 int numFree = 0;
79 
80 static const int maxKeep = 64; // 4 MB to keep in reserve
81 }
82 
83 /******************************************************************************/
84 /* A l l o c */
85 /******************************************************************************/
86 
88  XrdXrootdResponse &resp,
89  XrdXrootdFile *fP,
90  XrdXrootdPgwBadCS *bcsP)
91 {
92  XrdXrootdPgrwAio *reqP;
93 
94 // Obtain a preallocated aio request object
95 //
96  fqMutex.Lock();
97  if ((reqP = fqFirst))
98  {fqFirst = reqP->nextPgrw;
99  numFree--;
100  }
101  fqMutex.UnLock();
102 
103 // If we have no object, create a new one
104 //
105  if (!reqP) reqP = new XrdXrootdPgrwAio;
106 
107 // Initialize the object and return it
108 //
109  reqP->Init(protP, resp, fP);
110  reqP->nextPgrw = 0;
111  reqP->badCSP = bcsP;
112  return reqP;
113 }
114 
115 /******************************************************************************/
116 /* Private: C o p y F 2 L _ A d d 2 Q */
117 /******************************************************************************/
118 
119 bool XrdXrootdPgrwAio::CopyF2L_Add2Q(XrdXrootdAioPgrw *aioP)
120 {
121  const char *eMsg;
122  int dlen, rc;
123 
124 // Dispatch the requested number of aio requests if we have enough data
125 //
126  if (dataLen > 0)
127  {if (!aioP && !(aioP = XrdXrootdAioPgrw::Alloc(this)))
128  {if (inFlight) return true;
129  SendError(ENOMEM, "insufficient memory");
130  return false;
131  }
132  if (!(dlen = aioP->Setup2Send(dataOffset, dataLen, eMsg)))
133  {SendError(EINVAL, eMsg);
134  aioP->Recycle();
135  return false;
136  }
137  if ((rc = dataFile->XrdSfsp->pgRead((XrdSfsAio *)aioP)) != SFS_OK)
138  {SendFSError(rc);
139  aioP->Recycle();
140  return false;
141  }
142  inFlight++;
143  TRACEP(FSAIO, "pgrd beg " <<dlen <<'@' <<dataOffset
144  <<" inF=" <<int(inFlight));
145  dataOffset += dlen;
146  dataLen -= dlen;
147  if (dataLen <= 0)
148  {dataFile->aioFob->Schedule(Protocol);
149  aioState |= aioSchd;
150  }
151  }
152  return true;
153 }
154 
155 /******************************************************************************/
156 /* Private: C o p y F 2 L */
157 /******************************************************************************/
158 
159 void XrdXrootdPgrwAio::CopyF2L()
160 {
161  XrdXrootdAioBuff *bP;
162  XrdXrootdAioPgrw *aioP;
163 
164 // Pick a finished element off the pendQ. Wait for an oustanding buffer if we
165 // reached our buffer limit. Otherwise, ask for a return if we can start anew.
166 // Note: We asked getBuff() if it returns nil to not release the lock.
167 //
168 do{bool doWait = dataLen <= 0 || inFlight >= XrdXrootdProtocol::as_maxperreq;
169  if (!(bP = getBuff(doWait)))
170  {if (isDone || !CopyF2L_Add2Q()) break;
171  continue;
172  }
173 
174 // Step 1: do some tracing
175 //
176  TRACEP(FSAIO,"pgrd end "<<bP->sfsAio.aio_nbytes<<'@'<<bP->sfsAio.aio_offset
177  <<" result="<<bP->Result<<" D-S="<<isDone<<'-'<<int(Status)
178  <<" inF="<<int(inFlight));
179 
180 // Step 2: Validate this buffer
181 //
182  if (!Validate(bP))
183  {if (bP != finalRead) bP->Recycle();
184  continue;
185  }
186 
187 // Step 3: Get a pointer to the derived type (we avoid dynamic cast)
188 //
189  aioP = bP->pgrwP;
190 
191 // Step 4: If this aio request was simulated (indicated by cksVec being nil)
192 // we have to compute the checksums and reset the pointer via noChkSums().
193 //
194  if (aioP->noChkSums() && aioP->Result > 0)
195  XrdOucPgrwUtils::csCalc((char *)aioP->sfsAio.aio_buf,
196  aioP->sfsAio.aio_offset, aioP->Result, aioP->cksVec);
197 
198 // Step 5: If this is the last block to be read then save it for final status
199 //
200  if (inFlight == 0 && dataLen == 0 && !finalRead)
201  {finalRead = aioP;
202  break;
203  }
204 
205 // Step 8: Send the data to the client and if successful, see if we need to
206 // schedule more data to be read from the data source.
207 //
208  if (!isDone && SendData(aioP) && dataLen) {if (!CopyF2L_Add2Q(aioP)) break;}
209  else aioP->Recycle();
210 
211  } while(inFlight > 0);
212 
213 // If we are here then the request has finished. If all went well,
214 // fire off the final response.
215 //
216  if (!isDone) SendData(finalRead, true);
217  if (finalRead) finalRead->Recycle();
218 
219 // If we encountered a fatal link error then cancel any pending aio reads on
220 // this link. Otherwise if we have not yet scheduled the next aio, do so.
221 //
222  if (aioState & aioDead) dataFile->aioFob->Reset(Protocol);
223  else if (!(aioState & aioSchd)) dataFile->aioFob->Schedule(Protocol);
224 
225 // Do a quick drain if something is still in flight for logging purposes.
226 // If the quick drain wasn't successful, then draining will be done in
227 // the background; which, of course, might never complete. Otherwise, recycle.
228 //
229  if (!inFlight) Recycle(true);
230  else Recycle(Drain());
231 }
232 
233 /******************************************************************************/
234 /* Private: C o p y L 2 F */
235 /******************************************************************************/
236 
237 int XrdXrootdPgrwAio::CopyL2F()
238 {
239  XrdXrootdAioBuff *bP;
240  XrdXrootdAioPgrw *aioP;
241  const char *eMsg;
242  int dLen, ioVNum, rc;
243 
244 // Pick a finished element off the pendQ. If there are no elements then get
245 // a new one if we can. Otherwise, we will have to wait for one to come back.
246 // Unlike read() writes are bound to a socket and we cannot reliably
247 // give up the thread by returning to level 0.
248 //
249 do{bool doWait = dataLen <= 0 || inFlight >= XrdXrootdProtocol::as_maxperreq;
250  if (!(bP = getBuff(doWait)))
251  {if (isDone) return 0;
252  if (!(aioP = XrdXrootdAioPgrw::Alloc(this)))
253  {SendError(ENOMEM, "insufficient memory");
254  return 0;
255  }
256  } else {
257  aioP = bP->pgrwP;
258 
259  TRACEP(FSAIO,"pgwr end "<<aioP->sfsAio.aio_nbytes<<'@'
260  <<aioP->sfsAio.aio_offset<<" result="<<aioP->Result
261  <<" D-S="<<isDone<<'-'<<int(Status)<<" inF="<<int(inFlight));
262 
263 // If the aio failed, send an error
264 //
265  if (aioP->Result <= 0)
266  {SendError(-aioP->Result, 0);
267  aioP->Recycle();
268  return 0; // Caller will drain
269  }
270 
271 // If we have no data or status was posted, ignore the result
272 //
273  if (dataLen <= 0 || isDone)
274  {aioP->Recycle();
275  continue;
276  }
277  }
278 
279 // Setup the aio object
280 //
281  dLen = aioP->Setup2Recv(dataOffset, dataLen, eMsg);
282  if (!dLen)
283  {SendError(EINVAL, eMsg);
284  aioP->Recycle();
285  return 0;
286  }
287  dataOffset += aioP->sfsAio.aio_nbytes;
288  dataLen -= dLen;
289 
290 // Get the iovec information
291 //
292  struct iovec *ioV = aioP->iov4Recv(ioVNum);
293 
294 // Issue the read to get the data into the buffer
295 //
296  if ((rc = Protocol->getData(this, "pgWrite", ioV, ioVNum)))
297  {if (rc > 0) pendWrite = aioP;
298  else {aioP->Recycle(); // rc must be < 0!
299  dataLen = 0;
300  }
301  return rc;
302  }
303 
304 // Complete the write operation
305 //
306  if (!CopyL2F(aioP)) return 0;
307 
308  } while(inFlight);
309 
310 // If we finished successfully, send off final response otherwise its an error.
311 //
312  if (!isDone)
313  {if (!dataLen) return SendDone();
314  SendError(EIDRM, "pgWrite encountered an impossible condition");
315  eLog.Emsg("PgrwAio", "pgWrite logic error for",
316  dataLink->ID, dataFile->FileKey);
317  }
318 
319 // Cleanup as we don't know where we will return
320 //
321  return 0;
322 }
323 
324 /******************************************************************************/
325 
326 bool XrdXrootdPgrwAio::CopyL2F(XrdXrootdAioBuff *bP)
327 {
328 
329 // Verify the checksums. Upon success, write out the data.
330 //
331  if (VerCks(bP->pgrwP))
332  {int rc = dataFile->XrdSfsp->pgWrite((XrdSfsAio *)bP);
333  if (rc != SFS_OK) {SendFSError(rc); bP->Recycle();}
334  else {inFlight++;
335  TRACEP(FSAIO, "pgwr beg " <<bP->sfsAio.aio_nbytes <<'@'
336  <<bP->sfsAio.aio_offset
337  <<" inF=" <<int(inFlight));
338  return true;
339  }
340  }
341  return false;
342 }
343 
344 /******************************************************************************/
345 /* D o I t */
346 /******************************************************************************/
347 
348 // This method is invoked when we have run out of aio objects but have inflight
349 // objects during reading. In that case, we must relinquish the thread. When an
350 // aio object completes it will reschedule this object on a new thread.
351 
353 {
354 // Reads run disconnected as they will never read from the link.
355 //
356  if (aioState & aioRead) CopyF2L();
357 }
358 
359 /******************************************************************************/
360 /* R e a d */
361 /******************************************************************************/
362 
363 void XrdXrootdPgrwAio::Read(long long offs, int dlen)
364 {
365 
366 // Setup the copy from the file to the network
367 //
368  dataOffset = highOffset = offs;
369  dataLen = dlen;
370  aioState = aioRead | aioPage;
371 
372 // Reads run disconnected and are self-terminating, so we need to inclreas the
373 // refcount for the link we will be using to prevent it from disaapearing.
374 // Recycle will decrement it but does so only for reads. We always up the file
375 // refcount and number of requests.
376 //
377  dataLink->setRef(1);
378  dataFile->Ref(1);
379  Protocol->aioUpdReq(1);
380 
381 // Schedule ourselves to run this asynchronously and return
382 //
383  dataFile->aioFob->Schedule(this);
384 }
385 
386 /******************************************************************************/
387 /* R e c y c l e */
388 /******************************************************************************/
389 
390 void XrdXrootdPgrwAio::Recycle(bool release)
391 {
392 // Update request count, file and link reference count
393 //
394  if (!(aioState & aioHeld))
395  {Protocol->aioUpdReq(-1);
396  if (aioState & aioRead)
397  {dataLink->setRef(-1);
398  dataFile->Ref(-1);
399  }
400  aioState |= aioHeld;
401  }
402 
403 // Do some traceing
404 //
405  TRACEP(FSAIO,"pgrw recycle "<<(release ? "" : "hold ")
406  <<(aioState & aioRead ? 'R' : 'W')<<" D-S="
407  <<isDone<<'-'<<int(Status));
408 
409 // Place the object on the free queue if possible
410 //
411  if (release)
412  {fqMutex.Lock();
413  if (numFree >= maxKeep)
414  {fqMutex.UnLock();
415  delete this;
416  } else {
417  nextPgrw = fqFirst;
418  fqFirst = this;
419  numFree++;
420  fqMutex.UnLock();
421  }
422  }
423 }
424 
425 /******************************************************************************/
426 /* Private: S e n d D a t a */
427 /******************************************************************************/
428 
429 bool XrdXrootdPgrwAio::SendData(XrdXrootdAioBuff *bP, bool final)
430 {
431  static const int infoLen = sizeof(kXR_int64);
432  struct pgReadResponse
434  kXR_int64 ofs;
435  } pgrResp;
436  int rc;
437 
438 // Preinitialize the header
439 //
440  pgrResp.rsp.bdy.requestid = kXR_pgread - kXR_1stRequest;
441  pgrResp.rsp.bdy.resptype = (final ? XrdProto::kXR_FinalResult
443  memset(pgrResp.rsp.bdy.reserved, 0, sizeof(pgrResp.rsp.bdy.reserved));
444 
445 // Send the data; we might not have any (typically in a final response)
446 //
447  if (bP)
448  {int iovLen, iovNum;
449  struct iovec *ioVec = bP->pgrwP->iov4Send(iovNum, iovLen, true);
450  pgrResp.ofs = htonll(bP->sfsAio.aio_offset);
451 // char trBuff[512];
452 // snprintf(trBuff, sizeof(trBuff), "Aio PGR: %d@%ld (%ld)\n",
453 // iovLen, bP->sfsAio.aio_offset, (bP->sfsAio.aio_offset>>12));
454 // std::cerr<<trBuff<<std::flush;
455  rc = Response.Send(pgrResp.rsp, infoLen, ioVec, iovNum, iovLen);
456  } else {
457  pgrResp.rsp.bdy.dlen = 0;
458  pgrResp.ofs = htonll(dataOffset);
459  rc = Response.Send(pgrResp.rsp, infoLen);
460  }
461 
462 // Diagnose any errors
463 //
464  if (rc || final)
465  {isDone = true;
466  dataLen = 0;
467  if (rc) aioState |= aioDead;
468  }
469  return rc == 0;
470 }
471 
472 /******************************************************************************/
473 /* Private: S e n d D o n e */
474 /******************************************************************************/
475 
476 int XrdXrootdPgrwAio::SendDone()
477 {
478  static const int infoLen = sizeof(kXR_int64);
479  struct {ServerResponseStatus rsp;
480  ServerResponseBody_pgWrite info; // info.offset
481  } pgwResp;
482  char *buff;
483  int n, rc;
484 
485 // Preinitialize the header
486 //
487  pgwResp.rsp.bdy.requestid = kXR_pgwrite - kXR_1stRequest;
488  pgwResp.rsp.bdy.resptype = XrdProto::kXR_FinalResult;
489  pgwResp.info.offset = htonll(highOffset);
490  memset(pgwResp.rsp.bdy.reserved, 0, sizeof(pgwResp.rsp.bdy.reserved));
491 
492 // Get any checksum correction information we should turn
493 //
494  buff = badCSP->boInfo(n);
495 
496 // Send the final response
497 //
498  if ((rc = Response.Send(pgwResp.rsp, infoLen, buff, n))) dataLen = 0;
499  isDone = true;
500  if (rc) aioState |= aioDead;
501  return rc;
502 }
503 
504 /******************************************************************************/
505 /* V e r C k s */
506 /******************************************************************************/
507 
508 bool XrdXrootdPgrwAio::VerCks(XrdXrootdAioPgrw *aioP)
509 {
510  off_t dOffset = aioP->sfsAio.aio_offset;
511  uint32_t *csVec, *csVP, csVal;
512  int ioVNum, dLen;
513 
514 // Get the iovec information as this will drive the checksum
515 //
516  struct iovec *ioV = aioP->iov4Data(ioVNum);
517  csVP = csVec = (uint32_t*)ioV[0].iov_base;
518 
519 // Verify each page or page segment
520 //
521  for (int i = 1; i < ioVNum; i +=2)
522  {dLen = ioV[i].iov_len;
523  csVal = ntohl(*csVP); *csVP++ = csVal;
524  if (!XrdOucCRC::Ver32C(ioV[i].iov_base, dLen, csVal))
525  {const char *eMsg = badCSP->boAdd(dataFile, dOffset, dLen);
526  if (eMsg) {SendError(ETOOMANYREFS, eMsg);
527  aioP->Recycle();
528  return false;
529  }
530  }
531  dOffset += dLen;
532  }
533 
534 // All done, while we may have checksum error there is nothing we can do about
535 // it and it's up to the client to send corrections.
536 //
537  return true;
538 }
539 
540 /******************************************************************************/
541 /* W r i t e */
542 /******************************************************************************/
543 
544 int XrdXrootdPgrwAio::Write(long long offs, int dlen)
545 {
546 
547 // Update request count. Note that dataLink and dataFile references are
548 // handled outboard as writes are inextricably tied to the data link.
549 //
550  Protocol->aioUpdReq(1);
551 
552 // Setup the copy from the network to the file
553 //
554  aioState &= ~aioRead;
555  dataOffset = highOffset = offs;
556  dataLen = dlen;
557 
558 // Since this thread can't do anything else since it's blocked by the socket
559 // we simply initiate the write operation via a simulated getData() callback.
560 //
561  return gdDone();
562 }
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1261
@ kXR_1stRequest
Definition: XProtocol.hh:111
@ kXR_pgread
Definition: XProtocol.hh:142
@ kXR_pgwrite
Definition: XProtocol.hh:138
long long kXR_int64
Definition: XPtypes.hh:98
#define eMsg(x)
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
void * aio_buf
Definition: XrdSfsAio.hh:47
#define SFS_OK
XrdSysTrace XrdXrootdTrace
#define TRACEP(act, x)
static bool Ver32C(const void *data, size_t count, const uint32_t csval, uint32_t *csbad=0)
Definition: XrdOucCRC.cc:222
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
uint32_t * cksVec
Definition: XrdSfsAio.hh:63
ssize_t Result
Definition: XrdSfsAio.hh:65
virtual void Recycle()=0
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
virtual void Recycle() override
XrdXrootdAioPgrw *const pgrwP
int Setup2Send(off_t offs, int dlen, const char *&eMsg)
struct iovec * iov4Send(int &iovNum, int &iovLen, bool cs2net=false)
struct iovec * iov4Recv(int &iovNum)
void Recycle() override
bool noChkSums(bool reset=true)
static XrdXrootdAioPgrw * Alloc(XrdXrootdAioTask *arp)
struct iovec * iov4Data(int &iovNum)
int Setup2Recv(off_t offs, int dlen, const char *&eMsg)
void Init(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP)
void DoIt() override
void Read(long long offs, int dlen) override
static XrdXrootdPgrwAio * Alloc(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP, XrdXrootdPgwBadCS *bcsP=0)
int Write(long long offs, int dlen) override
void Recycle(bool release) override
ProtocolImpl< false > Protocol
@ kXR_PartialResult
Definition: XProtocol.hh:1250
@ kXR_FinalResult
Definition: XProtocol.hh:1249
XrdScheduler * Sched
XrdSysError eLog