XRootD
XrdXrootdNormAio.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d X r o o t d N o r m A i o . c c */
4 /* */
5 /* (c) 2021 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cerrno>
32 #include <cstdio>
33 #include <sys/uio.h>
34 
35 #include "Xrd/XrdLink.hh"
36 #include "Xrd/XrdScheduler.hh"
38 #include "XrdSys/XrdSysError.hh"
39 #include "XrdSys/XrdSysPlatform.hh"
45 
// Link identifier source for the TRACEP() trace records emitted in this file;
// here it is the request's dataLink. (NOTE(review): assumes TRACEP expands
// TRACELINK, as the macro name suggests — confirm in XrdXrootdTrace.hh.)
46 #define TRACELINK dataLink
47
48 /******************************************************************************/
49 /* G l o b a l S t a t i c s */
50 /******************************************************************************/
51
53
// Process-wide objects defined elsewhere in the XrdXrootd namespace. eLog is
// used below to report write logic errors.
54 namespace XrdXrootd
55 {
56 extern XrdSysError eLog;
57 extern XrdScheduler *Sched;
58 }
59 using namespace XrdXrootd;
60 
61 /******************************************************************************/
62 /* S t a t i c M e m b e r s */
63 /******************************************************************************/
64 
65 const char *XrdXrootdNormAio::TraceID = "NormAio";
66 
67 /******************************************************************************/
68 /* L o c a l S t a t i c s */
69 /******************************************************************************/
70 
71 namespace
72 {
73 XrdSysMutex fqMutex;
74 XrdXrootdNormAio *fqFirst = 0;
75 int numFree = 0;
76 
77 static const int maxKeep = 64; // Keep in reserve
78 }
79 
80 /******************************************************************************/
81 /* A l l o c */
82 /******************************************************************************/
83 
85  XrdXrootdResponse &resp,
86  XrdXrootdFile *fP)
87 {
88  XrdXrootdNormAio *reqP;
89 
90 // Obtain a preallocated aio request object
91 //
92  fqMutex.Lock();
93  if ((reqP = fqFirst))
94  {fqFirst = reqP->nextNorm;
95  numFree--;
96  }
97  fqMutex.UnLock();
98 
99 // If we have no object, create a new one
100 //
101  if (!reqP) reqP = new XrdXrootdNormAio;
102 
103 // Initialize the object and return it
104 //
105  reqP->Init(protP, resp, fP);
106  reqP->nextNorm = 0;
107  return reqP;
108 }
109 
110 /******************************************************************************/
111 /* Private: C o p y F 2 L _ A d d 2 Q */
112 /******************************************************************************/
113 
114 bool XrdXrootdNormAio::CopyF2L_Add2Q(XrdXrootdAioBuff *aioP)
115 {
116  int dlen, rc;
117 
118 // Dispatch the requested number of aio requests if we have enough data
119 //
120  if (dataLen > 0)
121  {if (!aioP && !(aioP = XrdXrootdAioBuff::Alloc(this)))
122  {if (inFlight) return true;
123  SendError(ENOMEM, "insufficient memory");
124  return false;
125  }
126  aioP->sfsAio.aio_offset = dataOffset;
127  if (dataLen >= (int)aioP->sfsAio.aio_nbytes)
128  dlen = aioP->sfsAio.aio_nbytes;
129  else dlen = aioP->sfsAio.aio_nbytes = dataLen;
130 
131  if ((rc = dataFile->XrdSfsp->read((XrdSfsAio *)aioP)) != SFS_OK)
132  {SendFSError(rc);
133  aioP->Recycle();
134  return false;
135  }
136  inFlight++;
137  TRACEP(FSAIO, "aioR beg " <<dlen <<'@' <<dataOffset
138  <<" inF=" <<int(inFlight));
139  dataOffset += dlen;
140  dataLen -= dlen;
141  if (dataLen <= 0)
142  {dataFile->aioFob->Schedule(Protocol);
143  aioState |= aioSchd;
144  }
145  }
146  return true;
147 }
148 
149 /******************************************************************************/
150 /* Private: C o p y F 2 L */
151 /******************************************************************************/
152 
// Copy file data to the link for a read request. Dispatches aio reads via
// CopyF2L_Add2Q(), sends completed buffers to the client strictly in offset
// order (out-of-order buffers wait on sendQ, sorted by offset), holds back the
// last buffer as finalRead so it can carry the final (kXR_ok) response, then
// releases this object via Recycle() (or leaves it held for background drain).
//
153 void XrdXrootdNormAio::CopyF2L()
154 {
155  XrdXrootdAioBuff *aioP;
// aOK becomes false when CopyF2L_Add2Q() fails in step 6; that ends the loop.
156  bool aOK = true;
157
158 // Pick a finished element off the pendQ. Wait for an outstanding buffer if we
159 // reached our buffer limit. Otherwise, ask for a return if we can start anew.
160 // Note: We asked getBuff() if it returns nil to not release the lock.
161 //
// We also wait once every byte has been requested (dataLen <= 0).
162 do{bool doWait = dataLen <= 0 || inFlight >= XrdXrootdProtocol::as_maxperreq;
// No finished buffer: stop if the request is done or another read could not
// be dispatched; otherwise re-test the loop condition (continue in do/while).
163  if (!(aioP = getBuff(doWait)))
164  {if (isDone || !CopyF2L_Add2Q()) break;
165  continue;
166  }
167
168 // Step 1: do some tracing
169 //
170  TRACEP(FSAIO,"aioR end "<<aioP->sfsAio.aio_nbytes
171  <<'@'<<aioP->sfsAio.aio_offset
172  <<" result="<<aioP->Result<<" D-S="<<isDone<<'-'<<int(Status)
173  <<" inF="<<int(inFlight));
174
175 // Step 2: Validate this buffer
176 //
// A rejected buffer is recycled here unless it is already held as finalRead
// (which is recycled during cleanup below).
177  if (!Validate(aioP))
178  {if (aioP != finalRead) aioP->Recycle();
179  continue;
180  }
181
182 // Step 3: Since blocks may come back out of order we need to make sure we are
183 // sending them in proper order with no gaps.
184 //
// Insert before the first queued block with a larger offset, keeping sendQ
// sorted by ascending aio_offset. Once isDone, blocks are not queued at all.
185  if (aioP->sfsAio.aio_offset != sendOffset && !isDone)
186  {XrdXrootdAioBuff *bP = sendQ, *bPP = 0;
187  while(bP)
188  {if (aioP->sfsAio.aio_offset < bP->sfsAio.aio_offset) break;
189  bPP = bP; bP = bP->next;
190  }
191  aioP->next = bP;
192  if (bPP) bPP->next = aioP;
193  else sendQ = aioP;
194  reorders++;
195  TRACEP(FSAIO,"aioR inQ "<<aioP->Result<<'@'<<aioP->sfsAio.aio_offset);
196  continue;
197  }
198
199 // Step 4: If this is the last block to be read then establish the actual
200 // last block to be used for final status to avoid an extra response.
201 //
// With nothing in flight and nothing left to read, the highest-offset block
// becomes finalRead: this one if sendQ is empty (then leave the loop so it is
// sent below with final status), otherwise the tail of sendQ, which is unlinked.
202  if (inFlight == 0 && dataLen == 0 && !finalRead)
203  {if (!sendQ)
204  {finalRead = aioP;
205  break;
206  } else {
207  XrdXrootdAioBuff *bP = sendQ, *bPP = 0;
208  while(bP->next) {bPP = bP; bP = bP->next;}
209  if (bPP) {finalRead = bP; bPP->next = 0;}
210  else {finalRead = sendQ; sendQ = 0;}
211  }
212  }
213
214 // Step 5: Send the data to the client and if successful, see if we need to
215 // schedule more data to be read from the data source.
216 //
// On success the sent buffer is reused for the next read. If that dispatch
// fails, CopyF2L_Add2Q() has already sent an error and recycled the buffer.
217  if (isDone || !Send(aioP) || dataLen <= 0) aioP->Recycle();
218  else if (!CopyF2L_Add2Q(aioP)) break;
219
220 // Step 6: Now send any queued messages that are eligible to be sent
221 //
// NOTE(review): this tests dataLen != 0 whereas step 5 tests dataLen <= 0;
// the two only differ if dataLen could ever go negative.
222  while(sendQ && sendQ->sfsAio.aio_offset == sendOffset && aOK)
223  {aioP = sendQ;
224  sendQ = sendQ->next;
225  TRACEP(FSAIO,"aioR deQ "<<aioP->Result<<'@'<<aioP->sfsAio.aio_offset);
226  if (!isDone && Send(aioP) && dataLen) aOK = CopyF2L_Add2Q(aioP);
227  else aioP->Recycle();
228  }
229
230  } while(inFlight > 0 && aOK);
231
232 // If we are here then the request has finished. If all went well,
233 // fire off the final response.
234 //
// Anything still on sendQ could not be sent in order, so data is missing.
// NOTE(review): when finalRead was taken from the tail of sendQ in step 4, its
// offset is not re-checked against sendOffset here; confirm Validate() rules
// out a gap before it in that case.
235  if (!isDone)
236  {if (sendQ)
237  {char ebuff[80];
238  snprintf(ebuff, sizeof(ebuff), "aio read failed at offset %lld; "
239  "missing data", static_cast<long long>(sendOffset));
240  SendError(ENODEV, ebuff);
241  } else Send(finalRead, true);
242  }
243
244 // Cleanup anything left over
245 //
246  if (finalRead) finalRead->Recycle();
247  while((aioP = sendQ)) {sendQ = sendQ->next; aioP->Recycle();}
248
249 // If we encountered a fatal link error then cancel any pending aio reads on
250 // this link. Otherwise if we have not yet scheduled the next aio, do so.
251 //
// aioDead is set by Send() when a response could not be sent; aioSchd is set
// by CopyF2L_Add2Q() once it scheduled the next aio itself.
252  if (aioState & aioDead) dataFile->aioFob->Reset(Protocol);
253  else if (!(aioState & aioSchd)) dataFile->aioFob->Schedule(Protocol);
254
255 // Do a quick drain if something is still in flight for logging purposes.
256 // If the quick drain wasn't successful, then draining will be done in
257 // the background; which, of course, might never complete. Otherwise, recycle.
258 //
259  if (!inFlight) Recycle(true);
260  else Recycle(Drain());
261 }
262 
263 /******************************************************************************/
264 /* Private: C o p y L 2 F */
265 /******************************************************************************/
266 
267 int XrdXrootdNormAio::CopyL2F()
268 {
269  XrdXrootdAioBuff *aioP;
270  int dLen, rc;
271 
272 // Pick a finished element off the pendQ. If there are no elements then get
273 // a new one if we can. Otherwise, we will have to wait for one to come back.
274 // Unlike read() writes are bound to a socket and we cannot reliably
275 // give up the thread by returning to level 0.
276 //
277 do{bool doWait = dataLen <= 0 || inFlight >= XrdXrootdProtocol::as_maxperreq;
278  if (!(aioP = getBuff(doWait)))
279  {if (isDone) return 0;
280  if (!(aioP = XrdXrootdAioBuff::Alloc(this)))
281  {SendError(ENOMEM, "insufficient memory");
282  return 0;
283  }
284  } else {
285 
286  TRACEP(FSAIO, "aioW end "<<aioP->sfsAio.aio_nbytes<<'@'
287  <<aioP->sfsAio.aio_offset<<" result="<<aioP->Result
288  <<" D-S="<<isDone<<'-'<<int(Status)<<" inF="<<int(inFlight));
289 
290 // If the aio failed, send an error
291 //
292  if (aioP->Result <= 0)
293  {SendError(-aioP->Result, 0);
294  aioP->Recycle();
295  return 0; // Caller will drain
296  }
297 
298 // If we have no data or status was posted, ignore the result
299 //
300  if (dataLen <= 0 || isDone)
301  {aioP->Recycle();
302  continue;
303  }
304  }
305 
306 // Setup the aio object
307 //
308  aioP->sfsAio.aio_offset = dataOffset;
309  if (dataLen >= (int)aioP->sfsAio.aio_nbytes)
310  dLen = aioP->sfsAio.aio_nbytes;
311  else dLen = aioP->sfsAio.aio_nbytes = dataLen;
312  dataOffset += dLen;
313  dataLen -= dLen;
314 
315 // Issue the read to get the data into the buffer
316 //
317  if ((rc = Protocol->getData(this,"aiowr",(char *)aioP->sfsAio.aio_buf,dLen)))
318  {if (rc > 0) pendWrite = aioP;
319  else {aioP->Recycle(); // rc must be < 0!
320  dataLen = 0;
321  }
322  return rc;
323  }
324 
325 // Complete the write operation
326 //
327  if (!CopyL2F(aioP)) return 0;
328 
329  } while(inFlight);
330 
331 // If we finished successfully, send off final response otherwise its an error.
332 //
333  if (!isDone)
334  {if (!dataLen) return (Send(0) ? 0 : -1);
335  SendError(EIDRM, "aioWrite encountered an impossible condition");
336  eLog.Emsg("NormAio", "write logic error for",
337  dataLink->ID, dataFile->FileKey);
338  }
339 
340 // Cleanup as we don't know where we will return
341 //
342  return 0;
343 }
344 
345 /******************************************************************************/
346 
347 bool XrdXrootdNormAio::CopyL2F(XrdXrootdAioBuff *aioP)
348 {
349 
350 // Write out the data
351 //
352  int rc = dataFile->XrdSfsp->write((XrdSfsAio *)aioP);
353  if (rc != SFS_OK)
354  {SendFSError(rc);
355  aioP->Recycle();
356  return false;
357  }
358 
359 // Do some tracing and return
360 //
361  inFlight++;
362  TRACEP(FSAIO,"aioW beg "<<aioP->sfsAio.aio_nbytes <<'@'
363  <<aioP->sfsAio.aio_offset <<" inF=" <<int(inFlight));
364  return true;
365 }
366 
367 /******************************************************************************/
368 /* D o I t */
369 /******************************************************************************/
370 
371 // This method is invoked when we have run out of aio objects but have inflight
372 // objects during reading. In that case, we must relinquish the thread. When an
373 // aio object completes it will reschedule this object on a new thread.
374 
376 {
377 // Reads run disconnected as they will never read from the link.
378 //
379  if (aioState & aioRead) CopyF2L();
380 }
381 
382 /******************************************************************************/
383 /* R e a d */
384 /******************************************************************************/
385 
386 void XrdXrootdNormAio::Read(long long offs, int dlen)
387 {
388 
389 // Setup the copy from the file to the network
390 //
391  dataOffset = highOffset = sendOffset = offs;
392  dataLen = dlen;
393  aioState = aioRead;
394 
395 // Reads run disconnected and are self-terminating, so we need to increase the
396 // refcount for the link we will be using to prevent it from disapearing.
397 // Recycle will decrement it but does so only for reads. We always update
398 // the file refcount and increase the request count.
399 //
400  dataLink->setRef(1);
401  dataFile->Ref(1);
402  Protocol->aioUpdReq(1);
403 
404 // Schedule ourselves to run this asynchronously and return
405 //
406  dataFile->aioFob->Schedule(this);
407 }
408 
409 /******************************************************************************/
410 /* R e c y c l e */
411 /******************************************************************************/
412 
413 void XrdXrootdNormAio::Recycle(bool release)
414 {
415 // Update request count, file and link reference count
416 //
417  if (!(aioState & aioHeld))
418  {Protocol->aioUpdReq(-1);
419  if (aioState & aioRead)
420  {dataFile->Ref(-1);
421  dataLink->setRef(-1);
422  }
423  aioState |= aioHeld;
424  }
425 
426 // Do some tracing and reset reorder counter
427 //
428  TRACEP(FSAIO,"aio"<<(aioState & aioRead ? 'R' : 'W')<<" recycle"
429  <<(release ? "" : " hold")<<"; reorders="<<reorders
430  <<" D-S="<<isDone<<'-'<<int(Status));
431  reorders = 0;
432 
433 // Place the object on the free queue if possible
434 //
435  if (release)
436  {fqMutex.Lock();
437  if (numFree >= maxKeep)
438  {fqMutex.UnLock();
439  delete this;
440  } else {
441  nextNorm = fqFirst;
442  fqFirst = this;
443  numFree++;
444  fqMutex.UnLock();
445  }
446  }
447 }
448 
449 /******************************************************************************/
450 /* Private: S e n d */
451 /******************************************************************************/
452 
453 bool XrdXrootdNormAio::Send(XrdXrootdAioBuff *aioP, bool final)
454 {
455  XResponseType code = (final ? kXR_ok : kXR_oksofar);
456  int rc;
457 
458 // Send the data (note that no data means it's a finalresponse)
459 //
460  if (aioP)
461  {rc = Response.Send(code,(void*)aioP->sfsAio.aio_buf,aioP->Result);
462  sendOffset = aioP->sfsAio.aio_offset + aioP->Result;
463  } else rc = Response.Send();
464 
465 // Diagnose any errors
466 //
467  if (rc || final)
468  {isDone = true;
469  dataLen = 0;
470  if (rc) aioState |= aioDead;
471  }
472  return rc == 0;
473 }
474 
475 /******************************************************************************/
476 /* W r i t e */
477 /******************************************************************************/
478 
479 int XrdXrootdNormAio::Write(long long offs, int dlen)
480 {
481 // Update request count. Note that dataLink and dataFile references are
482 // handled outboard as writes are inextricably tied to the data link.
483 //
484  Protocol->aioUpdReq(1);
485 
486 // Setup the copy from the network to the file
487 //
488  aioState &= ~aioRead;
489  dataOffset = highOffset = offs;
490  dataLen = dlen;
491 
492 // Since this thread can't do anything else since it's blocked by the socket
493 // we simply initiate the write operation via a simulated getData() callback.
494 //
495  return gdDone();
496 }
XResponseType
Definition: XProtocol.hh:898
@ kXR_oksofar
Definition: XProtocol.hh:900
@ kXR_ok
Definition: XProtocol.hh:899
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
void * aio_buf
Definition: XrdSfsAio.hh:47
#define SFS_OK
XrdSysTrace XrdXrootdTrace
#define TRACEP(act, x)
ssize_t Result
Definition: XrdSfsAio.hh:65
virtual void Recycle()=0
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdXrootdAioBuff * next
virtual void Recycle() override
static XrdXrootdAioBuff * Alloc(XrdXrootdAioTask *arp)
void Init(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP)
void DoIt() override
int Write(long long offs, int dlen) override
void Recycle(bool release) override
void Read(long long offs, int dlen) override
static XrdXrootdNormAio * Alloc(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP)
ProtocolImpl< false > Protocol
ssize_t Send(int fd, KernelBuffer &buffer)
XrdScheduler * Sched
XrdSysError eLog