XRootD
XrdXrootdAioTask.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d X r o o t d A i o T a s k . c c */
4 /* */
5 /* (c) 2021 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cerrno>
32 #include <cstdio>
33 #include <ctime>
34 #include <limits.h>
35 #include <sys/uio.h>
36 #include "Xrd/XrdLink.hh"
37 #include "Xrd/XrdScheduler.hh"
39 #include "XrdSys/XrdSysError.hh"
40 #include "XrdSys/XrdSysE2T.hh"
41 #include "XrdSys/XrdSysTimer.hh"
47 
48 #define TRACELINK dataLink
49 
50 /******************************************************************************/
51 /* G l o b a l S t a t i c s */
52 /******************************************************************************/
53 
55 
56 namespace XrdXrootd
57 {
60 }
61 using namespace XrdXrootd;
62 
63 /******************************************************************************/
64 /* S t a t i c M e m b e r s */
65 /******************************************************************************/
66 
67 const char *XrdXrootdAioTask::TraceID = "AioTask";
68 
69 /******************************************************************************/
70 /* C o m p l e t e d */
71 /******************************************************************************/
72 
74 {
75 // Lock this code path
76 //
77  aioMutex.Lock();
78 
79 // If this request is not running and completed then take a shortcut.
80 //
81  if (Status == Offline && isDone)
82  {aioP->Recycle();
83  inFlight--;
84  aioMutex.UnLock();
85  if (inFlight <= 0) Recycle(true);
86  return;
87  }
88 
89 // Add this element to the end of the queue
90 //
91  aioP->next = 0;
92  if (!pendQ) pendQEnd = pendQ = aioP;
93  else {pendQEnd->next = aioP;
94  pendQEnd = aioP;
95  }
96 
97 // Check if the request is waiting for our buffer tell it now has one. Otherwise,
98 // if the task is offline then it cannot be done (see above); so schedule it.
99 //
100  if (Status != Running)
101  {if (Status == Waiting) aioReady.Signal();
102  else Sched->Schedule(this);
103  Status = Running;
104  }
105 
106  aioMutex.UnLock();
107 }
108 
109 /******************************************************************************/
110 /* Protected: D r a i n */
111 /******************************************************************************/
112 
114 {
115  XrdXrootdAioBuff *aioP;
116  int maxWait = 6; // Max seconds to wait for outstanding requests
117 
118 // Reap as many aio object as you can
119 //
120  aioMutex.Lock();
121  while(inFlight > 0)
122  {while((aioP = pendQ))
123  {if (!(pendQ = aioP->next)) pendQEnd = 0;
124  aioMutex.UnLock(); // Open a window of opportunity
125  inFlight--;
126  aioP->Recycle();
127  aioMutex.Lock();
128  }
129  if (inFlight <= 0 || !Wait4Buff(maxWait)) break;
130  }
131 
132 // If there are still in flight requets, issue message and we will run the
133 // drain in the background.
134 //
135  if (inFlight > 0)
136  {char buff[128];
137  snprintf(buff, sizeof(buff),
138  "aio%c overdue %d inflight request%s for",
139  (aioState & aioRead ? 'R' : 'W'), int(inFlight),
140  (inFlight > 1 ? "s" : ""));
141  eLog.Emsg("AioTask", buff, dataLink->ID, dataFile->FileKey);
142  }
143 
144 // Indicate we are going offline and tell the caller if we need to stay
145 // alive to drain the tardy requests in the background.
146 //
147  Status = Offline;
148  isDone = true;
149  aioMutex.UnLock();
150  return inFlight <= 0;
151 }
152 
153 /******************************************************************************/
154 /* Private: g d D o n e */
155 /******************************************************************************/
156 
157 int XrdXrootdAioTask::gdDone() // Only called for link to file transfers!
158 {
159  XrdXrootdAioBuff *bP = pendWrite;
160  int rc;
161 
162 // Do some debugging
163 //
164  TRACEP(DEBUG,"gdDone: "<<(void *)this<<" pendWrite "
165  <<(pendWrite != 0 ? "set":"not set"));
166 
167 // This is a callback indicating the pending aio object has all of the data.
168 // Resume sending data to the destination.
169 //
170  pendWrite = 0;
171  if (!bP) rc = CopyL2F();
172  else {if (CopyL2F(bP) && (inFlight || !isDone)) rc = CopyL2F();
173  else rc = 0;
174  }
175 
176 // Do some debugging
177 //
178  TRACEP(DEBUG,"gdDone: "<<(void *)this<<" ending rc="<<rc);
179 
180 // If we are not pausing for data to be delivered. Drain any oustanding aio
181 // requests and discard left over bytes, if any. Note we must copy the left
182 // over length as we may recycle before discarding as discard must be last.
183 //
184  if (rc <= 0)
185  {XrdXrootdProtocol* prot = Protocol;
186  int dlen = dataLen;
187  if (!inFlight) Recycle(true);
188  else Recycle(Drain());
189  if (!rc && dlen) return prot->getDump(Comment, dlen);
190  }
191  return rc;
192 }
193 
194 /******************************************************************************/
195 /* Private: g d F a i l */
196 /******************************************************************************/
197 
199 {
200  char eBuff[512];
201 
202 // Do some tracing
203 //
204  TRACEP(DEBUG,"gdFail: "<<(void *)this);
205 
206 // Format message for display
207 //
208  snprintf(eBuff, sizeof(eBuff), "link error aborted %s for", Comment);
209  eLog.Emsg("AioTask", eBuff, dataLink->ID, dataFile->FileKey);
210 
211 // This is a callback indicating the link is dead. Terminate this operation.
212 //
213  isDone = true;
214  aioState |= aioDead;
215  dataLen = 0;
216  if (pendWrite) {pendWrite->Recycle(); pendWrite = 0;}
217 
218 // If this is a read, cancel all queued read requests
219 //
220  if (aioState & aioRead) dataFile->aioFob->Reset(Protocol);
221 
222 // If we still have any requests in flight drain them.
223 //
224  if (!inFlight) Recycle(true);
225  else Recycle(Drain());
226 }
227 
228 /******************************************************************************/
229 /* Protected: g e t B u f f */
230 /******************************************************************************/
231 
233 {
234  XrdXrootdAioBuff* aioP;
235 
236 // Try to get the next buffer
237 //
238  aioMutex.Lock();
239 do{if ((aioP = pendQ))
240  {if (!(pendQ = aioP->next)) pendQEnd = 0;
241  aioMutex.UnLock();
242  inFlight--;
243  return aioP;
244  }
245 
246 // If the caller does not want to wait or if there is nothing in flight, return
247 //
248  if (!wait || !inFlight)
249  {aioMutex.UnLock();
250  return 0;
251  }
252 
253 // So, wait for a buffer to arrive
254 //
255  } while(Wait4Buff());
256 
257 // We timed out and this is considered an error
258 //
259  aioMutex.UnLock();
260  SendError(ETIMEDOUT, (aioState & aioRead ? "aio file read timed out"
261  : "aio file write timed out"));
262  return 0;
263 }
264 
265 /******************************************************************************/
266 /* I D */
267 /******************************************************************************/
268 
269 const char *XrdXrootdAioTask::ID() {return dataLink->ID;}
270 
271 /******************************************************************************/
272 /* I n i t */
273 /******************************************************************************/
274 
276  XrdXrootdResponse &resp,
277  XrdXrootdFile *fP)
278 {
279 
280 // Reset the object
281 //
282  pendQEnd = pendQ = 0;
283  finalRead = 0; // Also sets pendWrite
284  Protocol = protP;
285  dataLink = resp.theLink();
286  Response = resp;
287  dataFile = fP;
288  aioState = 0;
289  inFlight = 0;
290  isDone = false;
291  Status = Running;
292 }
293 
294 /******************************************************************************/
295 /* Protected: S e n d E r r o r */
296 /******************************************************************************/
297 
298 void XrdXrootdAioTask::SendError(int rc, const char *eText)
299 {
300  char eBuff[1024];
301 
302 // If there is no error text, use the rc
303 //
304  if (!eText) eText = (rc ? XrdSysE2T(rc) : "invalid error code");
305 
306 // For message for display
307 //
308  snprintf(eBuff, sizeof(eBuff), "async %s failed for %s;",
309  (aioState & aioRead ? "read" : "write"), dataLink->ID);
310  eLog.Emsg("AioTask", eBuff, eText, dataFile->FileKey);
311 
312 // If this request is still active, send the error to the client
313 //
314  if (!isDone)
316  if (Response.Send(eCode, eText))
317  {aioState |= aioDead;
318  dataLen = 0;
319  } else if (aioState & aioRead) dataLen = 0;
320  isDone = true;
321  }
322 }
323 
324 /******************************************************************************/
325 /* Protected: S e n d F S E r r o r */
326 /******************************************************************************/
327 
329 {
330  XrdOucErrInfo &myError = dataFile->XrdSfsp->error;
331  int eCode;
332 
333 // We can only handle actual errors. Under some conditions a redirect (e.g.
334 // Xcache) can return other error codes. We treat these as server errors.
335 //
336  if (rc != SFS_ERROR)
337  {char eBuff[256];
338  snprintf(eBuff, sizeof(eBuff), "fs returned unexpected rc %d", rc);
339  SendError(EFAULT, eBuff);
340  if (myError.extData()) myError.Reset();
341  return;
342  }
343 
344 // Handle file system error but only if we are still alive
345 //
346  if (!isDone)
347  {const char *eMsg = myError.getErrText(eCode);
348  eLog.Emsg("AioTask", dataLink->ID, eMsg, dataFile->FileKey);
349  int rc = XProtocol::mapError(eCode);
350  if (Response.Send((XErrorCode)rc, eMsg))
351  {aioState |= aioDead;
352  dataLen = 0;
353  } else if (aioState & aioRead) dataLen = 0;
354  isDone = true;
355  }
356 
357 // Clear error message and recycle aio object if need be
358 //
359  if (myError.extData()) myError.Reset();
360 }
361 
362 /******************************************************************************/
363 /* Protected: V a l i d a t e */
364 /******************************************************************************/
365 
367 {
368  ssize_t aioResult = aioP->Result;
369  off_t aioOffset = aioP->sfsAio.aio_offset;
370  int aioLength = aioP->sfsAio.aio_nbytes;
371 
372 // Step 1: Check if this request is already completed. This may be the case
373 // if we had a previous error.
374 //
375  if (isDone) return false;
376 
377 // Step 2: Check if an error occurred as this will terminate the request even
378 // if we have not sent all of the data.
379 //
380  if (aioP->Result < 0)
381  {SendError(-aioP->Result, 0);
382  return false;
383  }
384 
385 // Step 3: Check for a short read which signals that no more data past this
386 // offset is forthcomming. Save it as we will send a final response
387 // using this element. We discard zero length reads. It's an error if we
388 // get more than one short read with data or if its offset is less than
389 // the highest full read element.
390 //
391  if (aioResult < aioLength)
392  {dataLen = 0;
393  if (!aioResult)
394  {if ((finalRead && aioOffset < finalRead->sfsAio.aio_offset)
395  || aioOffset < highOffset) SendError(EFAULT, "embedded null block");
396  return false;
397  } else {
398  if (aioOffset < highOffset)
399  {SendError(ENODEV, "embedded short block");
400  return false;
401  } else {
402  if (finalRead) SendError(ENODEV, "multiple short blocks");
403  else {finalRead = aioP;
404  highOffset = aioOffset;
405  }
406  }
407  }
408  return false;
409  }
410 
411 // Step 4: This is a full read and its offset must be lower than the offset of
412 // any short read we have encountered.
413 //
414  if (finalRead && aioOffset >= finalRead->sfsAio.aio_offset)
415  {SendError(ENODEV, "read offset past EOD");
416  return false;
417  }
418  if (aioOffset > highOffset) highOffset = aioOffset;
419  return true;
420 }
421 
422 /******************************************************************************/
423 /* Private: W a i t 4 B u f f */
424 /******************************************************************************/
425 
426 // Called with aioMutex locked and returns with it still locked.
427 
428 bool XrdXrootdAioTask::Wait4Buff(int maxWait)
429 {
430  static const int msgInterval = 30;
431  time_t begWait;
432  int aioWait, msgWait = msgInterval, totWait = 0;
433 
434 // Return success if somehow we got a buffer
435 //
436  if (pendQ) return true;
437 
438 // Make sure that something will actually arrive but issue a warning
439 // message and sleep a bit to avoid a loop as there is clearly a logic error.
440 //
441  if (!inFlight)
442  {eLog.Emsg("Wait4Buff", dataLink->ID, "has nothing inflight for",
443  dataFile->FileKey);
445  return false;
446  }
447 
448 // Calculate wait time and when we should produce a message
449 //
450  if (maxWait <= 0) maxWait = (XrdXrootdProtocol::as_timeout ?
452  aioWait = (maxWait > msgInterval ? msgInterval : maxWait);
453 
454 // Wait for a buffer to arrive.
455 //
456  begWait = time(0);
457  while(totWait < maxWait)
458  {Status = Waiting;
459  aioReady.Wait(aioWait);
460  if (pendQ) break;
461  totWait = (time(0) - begWait); // Spurious wakeup
462  int tmpWait = maxWait - totWait;
463  if (tmpWait > 0 && tmpWait < aioWait) aioWait = tmpWait;
464  if (totWait >= msgWait)
465  {char buff[80];
466  int inF = inFlight;
467  msgWait += aioWait;
468  snprintf(buff, sizeof(buff), "%d tardy aio%c requests for",
469  inF, (aioState & aioRead ? 'R' : 'W'));
470  eLog.Emsg("Wait4Buff", dataLink->ID, buff, dataFile->FileKey);
471  }
472  }
473 
474 // If we are here either we actually have a buffer available or timed out.
475 //
476  Status = Running;
477  return (pendQ != 0);
478 }
XErrorCode
Definition: XProtocol.hh:989
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define eMsg(x)
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
#define SFS_ERROR
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
XrdSysTrace XrdXrootdTrace
#define TRACEP(act, x)
static int mapError(int rc)
Definition: XProtocol.hh:1361
const char * getErrText()
void Reset()
Reset object to no message state. Call this method to release appendages.
void Schedule(XrdJob *jp)
ssize_t Result
Definition: XrdSfsAio.hh:65
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static void Snooze(int seconds)
Definition: XrdSysTimer.cc:168
XrdXrootdAioBuff * next
virtual void Recycle() override
int gdDone() override
bool Validate(XrdXrootdAioBuff *aioP)
XrdXrootdAioBuff * getBuff(bool wait)
void SendError(int rc, const char *eText)
void Completed(XrdXrootdAioBuff *aioP)
void Init(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP)
void SendFSError(int rc)
const char * ID()
static const char * TraceID
void gdFail() override
int getDump(const char *dtype, int dlen)
ProtocolImpl< false > Protocol
XrdScheduler * Sched
XrdSysError eLog