XRootD
XrdXrootdAioTask Class Referenceabstract

#include <XrdXrootdAioTask.hh>

+ Inheritance diagram for XrdXrootdAioTask:
+ Collaboration diagram for XrdXrootdAioTask:

Public Member Functions

void Completed (XrdXrootdAioBuff *aioP)
 
const char * ID ()
 
void Init (XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP)
 
virtual void Read (long long offs, int dlen)=0
 
virtual void Recycle (bool release)=0
 
XrdXrootdProtocolurProtocol ()
 
virtual int Write (long long offs, int dlen)=0
 
- Public Member Functions inherited from XrdJob
 XrdJob (const char *desc="")
 
virtual ~XrdJob ()
 
virtual void DoIt ()=0
 

Protected Member Functions

 XrdXrootdAioTask (const char *what="aio request")
 
virtual ~XrdXrootdAioTask ()
 
virtual void CopyF2L ()=0
 
virtual int CopyL2F ()=0
 
virtual bool CopyL2F (XrdXrootdAioBuff *aioP)=0
 
bool Drain ()
 
int gdDone () override
 
void gdFail () override
 
XrdXrootdAioBuffgetBuff (bool wait)
 
void SendError (int rc, const char *eText)
 
void SendFSError (int rc)
 
bool Validate (XrdXrootdAioBuff *aioP)
 

Protected Attributes

union {
XrdXrootdNormAionextNorm
 
XrdXrootdPgrwAionextPgrw
 
XrdXrootdAioTasknextTask
 
}; 
 
union {
XrdXrootdAioBufffinalRead
 
XrdXrootdAioBuffpendWrite
 
}; 
 
XrdSysMutex aioMutex
 
XrdSysCondVar2 aioReady
 
char aioState
 
XrdXrootdFiledataFile
 
int dataLen
 
XrdLinkdataLink
 
off_t dataOffset
 
off_t highOffset
 
RAtomic_uchar inFlight
 
RAtomic_bool isDone
 
XrdXrootdAioBuffpendQ
 
XrdXrootdAioBuffpendQEnd
 
XrdXrootdProtocolProtocol
 
XrdXrootdResponse Response
 
char Status
 

Static Protected Attributes

static const int aioDead = 0x01
 
static const int aioHeld = 0x02
 
static const int aioPage = 0x04
 
static const int aioRead = 0x08
 
static const int aioSchd = 0x10
 
static const int Offline = 0
 
static const int Running = 1
 
static const char * TraceID = "AioTask"
 
static const int Waiting = 2
 

Friends

class XrdXrootdAioFob
 

Additional Inherited Members

- Public Attributes inherited from XrdJob
const char * Comment
 
XrdJobNextJob
 

Detailed Description

Definition at line 46 of file XrdXrootdAioTask.hh.

Constructor & Destructor Documentation

◆ XrdXrootdAioTask()

XrdXrootdAioTask::XrdXrootdAioTask ( const char *  what = "aio request")
inlineprotected

Definition at line 69 of file XrdXrootdAioTask.hh.

70  : XrdJob(what), aioReady(aioMutex) {}
XrdJob(const char *desc="")
Definition: XrdJob.hh:51
XrdSysCondVar2 aioReady
XrdSysMutex aioMutex

◆ ~XrdXrootdAioTask()

virtual XrdXrootdAioTask::~XrdXrootdAioTask ( )
inlineprotectedvirtual

Definition at line 71 of file XrdXrootdAioTask.hh.

71 {}

Member Function Documentation

◆ Completed()

void XrdXrootdAioTask::Completed ( XrdXrootdAioBuff aioP)

Definition at line 73 of file XrdXrootdAioTask.cc.

74 {
75 // Lock this code path
76 //
77  aioMutex.Lock();
78 
79 // If this request is not running and completed then take a shortcut.
80 //
81  if (Status == Offline && isDone)
82  {aioP->Recycle();
83  inFlight--;
84  aioMutex.UnLock();
85  if (inFlight <= 0) Recycle(true);
86  return;
87  }
88 
89 // Add this element to the end of the queue
90 //
91  aioP->next = 0;
92  if (!pendQ) pendQEnd = pendQ = aioP;
93  else {pendQEnd->next = aioP;
94  pendQEnd = aioP;
95  }
96 
97 // Check if the request is waiting for our buffer tell it now has one. Otherwise,
98 // if the task is offline then it cannot be done (see above); so schedule it.
99 //
100  if (Status != Running)
101  {if (Status == Waiting) aioReady.Signal();
102  else Sched->Schedule(this);
103  Status = Running;
104  }
105 
106  aioMutex.UnLock();
107 }
void Schedule(XrdJob *jp)
XrdXrootdAioBuff * next
virtual void Recycle() override
static const int Offline
static const int Waiting
virtual void Recycle(bool release)=0
XrdXrootdAioBuff * pendQ
static const int Running
XrdXrootdAioBuff * pendQEnd
RAtomic_uchar inFlight
XrdScheduler Sched
Definition: XrdLinkCtl.cc:54

References XrdXrootdAioBuff::next, XrdXrootdAioBuff::Recycle(), XrdXrootd::Sched, and XrdScheduler::Schedule().

+ Here is the call graph for this function:

◆ CopyF2L()

virtual void XrdXrootdAioTask::CopyF2L ( )
protectedpure virtual

◆ CopyL2F() [1/2]

virtual int XrdXrootdAioTask::CopyL2F ( )
protectedpure virtual

◆ CopyL2F() [2/2]

virtual bool XrdXrootdAioTask::CopyL2F ( XrdXrootdAioBuff aioP)
protectedpure virtual

◆ Drain()

bool XrdXrootdAioTask::Drain ( )
protected

Definition at line 113 of file XrdXrootdAioTask.cc.

114 {
115  XrdXrootdAioBuff *aioP;
116  int maxWait = 6; // Max seconds to wait for outstanding requests
117 
118 // Reap as many aio object as you can
119 //
120  aioMutex.Lock();
121  while(inFlight > 0)
122  {while((aioP = pendQ))
123  {if (!(pendQ = aioP->next)) pendQEnd = 0;
124  aioMutex.UnLock(); // Open a window of opportunity
125  inFlight--;
126  aioP->Recycle();
127  aioMutex.Lock();
128  }
129  if (inFlight <= 0 || !Wait4Buff(maxWait)) break;
130  }
131 
132 // If there are still in flight requets, issue message and we will run the
133 // drain in the background.
134 //
135  if (inFlight > 0)
136  {char buff[128];
137  snprintf(buff, sizeof(buff),
138  "aio%c overdue %d inflight request%s for",
139  (aioState & aioRead ? 'R' : 'W'), int(inFlight),
140  (inFlight > 1 ? "s" : ""));
141  eLog.Emsg("AioTask", buff, dataLink->ID, dataFile->FileKey);
142  }
143 
144 // Indicate we are going offline and tell the caller if we need to stay
145 // alive to drain the tardy requests in the background.
146 //
147  Status = Offline;
148  isDone = true;
149  aioMutex.UnLock();
150  return inFlight <= 0;
151 }
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdXrootdFile * dataFile
static const int aioRead
XrdSysError * eLog

References XrdXrootd::eLog, XrdSysError::Emsg(), XrdXrootdAioBuff::next, and XrdXrootdAioBuff::Recycle().

+ Here is the call graph for this function:

◆ gdDone()

int XrdXrootdAioTask::gdDone ( )
overrideprotected

Definition at line 157 of file XrdXrootdAioTask.cc.

158 {
159  XrdXrootdAioBuff *bP = pendWrite;
160  int rc;
161 
162 // Do some debugging
163 //
164  TRACEP(DEBUG,"gdDone: "<<(void *)this<<" pendWrite "
165  <<(pendWrite != 0 ? "set":"not set"));
166 
167 // This is a callback indicating the pending aio object has all of the data.
168 // Resume sending data to the destination.
169 //
170  pendWrite = 0;
171  if (!bP) rc = CopyL2F();
172  else {if (CopyL2F(bP) && (inFlight || !isDone)) rc = CopyL2F();
173  else rc = 0;
174  }
175 
176 // Do some debugging
177 //
178  TRACEP(DEBUG,"gdDone: "<<(void *)this<<" ending rc="<<rc);
179 
180 // If we are not pausing for data to be delivered. Drain any oustanding aio
181 // requests and discard left over bytes, if any. Note we must copy the left
182 // over length as we may recycle before discarding as discard must be last.
183 //
184  if (rc <= 0)
185  {XrdXrootdProtocol* prot = Protocol;
186  int dlen = dataLen;
187  if (!inFlight) Recycle(true);
188  else Recycle(Drain());
189  if (!rc && dlen) return prot->getDump(Comment, dlen);
190  }
191  return rc;
192 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define TRACEP(act, x)
const char * Comment
Definition: XrdJob.hh:47
virtual int CopyL2F()=0
XrdXrootdProtocol * Protocol
int getDump(const char *dtype, int dlen)

References DEBUG, XrdXrootdProtocol::getDump(), and TRACEP.

+ Here is the call graph for this function:

◆ gdFail()

void XrdXrootdAioTask::gdFail ( )
overrideprotected

Definition at line 198 of file XrdXrootdAioTask.cc.

199 {
200  char eBuff[512];
201 
202 // Do some tracing
203 //
204  TRACEP(DEBUG,"gdFail: "<<(void *)this);
205 
206 // Format message for display
207 //
208  snprintf(eBuff, sizeof(eBuff), "link error aborted %s for", Comment);
209  eLog.Emsg("AioTask", eBuff, dataLink->ID, dataFile->FileKey);
210 
211 // This is a callback indicating the link is dead. Terminate this operation.
212 //
213  isDone = true;
214  aioState |= aioDead;
215  dataLen = 0;
216  if (pendWrite) {pendWrite->Recycle(); pendWrite = 0;}
217 
218 // If this is a read, cancel all queued read requests
219 //
221 
222 // If we still have any requests in flight drain them.
223 //
224  if (!inFlight) Recycle(true);
225  else Recycle(Drain());
226 }
static const int aioDead
XrdXrootdAioFob * aioFob

References DEBUG, XrdXrootd::eLog, XrdSysError::Emsg(), and TRACEP.

+ Here is the call graph for this function:

◆ getBuff()

XrdXrootdAioBuff * XrdXrootdAioTask::getBuff ( bool  wait)
protected

Definition at line 232 of file XrdXrootdAioTask.cc.

233 {
234  XrdXrootdAioBuff* aioP;
235 
236 // Try to get the next buffer
237 //
238  aioMutex.Lock();
239 do{if ((aioP = pendQ))
240  {if (!(pendQ = aioP->next)) pendQEnd = 0;
241  aioMutex.UnLock();
242  inFlight--;
243  return aioP;
244  }
245 
246 // If the caller does not want to wait or if there is nothing in flight, return
247 //
248  if (!wait || !inFlight)
249  {aioMutex.UnLock();
250  return 0;
251  }
252 
253 // So, wait for a buffer to arrive
254 //
255  } while(Wait4Buff());
256 
257 // We timed out and this is considered an error
258 //
259  aioMutex.UnLock();
260  SendError(ETIMEDOUT, (aioState & aioRead ? "aio file read timed out"
261  : "aio file write timed out"));
262  return 0;
263 }
void SendError(int rc, const char *eText)

References XrdXrootdAioBuff::next.

◆ ID()

const char * XrdXrootdAioTask::ID ( void  )

Definition at line 269 of file XrdXrootdAioTask.cc.

269 {return dataLink->ID;}

◆ Init()

void XrdXrootdAioTask::Init ( XrdXrootdProtocol protP,
XrdXrootdResponse resp,
XrdXrootdFile fP 
)

Definition at line 275 of file XrdXrootdAioTask.cc.

278 {
279 
280 // Reset the object
281 //
282  pendQEnd = pendQ = 0;
283  finalRead = 0; // Also sets pendWrite
284  Protocol = protP;
285  dataLink = resp.theLink();
286  Response = resp;
287  dataFile = fP;
288  aioState = 0;
289  inFlight = 0;
290  isDone = false;
291  Status = Running;
292 }
XrdXrootdResponse Response

References XrdXrootdResponse::theLink().

Referenced by XrdXrootdNormAio::Alloc(), and XrdXrootdPgrwAio::Alloc().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Read()

virtual void XrdXrootdAioTask::Read ( long long  offs,
int  dlen 
)
pure virtual

Implemented in XrdXrootdPgrwAio, and XrdXrootdNormAio.

◆ Recycle()

virtual void XrdXrootdAioTask::Recycle ( bool  release)
pure virtual

Implemented in XrdXrootdPgrwAio, and XrdXrootdNormAio.

Referenced by XrdXrootdAioFob::Reset().

+ Here is the caller graph for this function:

◆ SendError()

void XrdXrootdAioTask::SendError ( int  rc,
const char *  eText 
)
protected

Definition at line 298 of file XrdXrootdAioTask.cc.

299 {
300  char eBuff[1024];
301 
302 // If there is no error text, use the rc
303 //
304  if (!eText) eText = (rc ? XrdSysE2T(rc) : "invalid error code");
305 
306 // For message for display
307 //
308  snprintf(eBuff, sizeof(eBuff), "async %s failed for %s;",
309  (aioState & aioRead ? "read" : "write"), dataLink->ID);
310  eLog.Emsg("AioTask", eBuff, eText, dataFile->FileKey);
311 
312 // If this request is still active, send the error to the client
313 //
314  if (!isDone)
316  if (Response.Send(eCode, eText))
317  {aioState |= aioDead;
318  dataLen = 0;
319  } else if (aioState & aioRead) dataLen = 0;
320  isDone = true;
321  }
322 }
XErrorCode
Definition: XProtocol.hh:989
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
static int mapError(int rc)
Definition: XProtocol.hh:1361

References XrdXrootd::eLog, XrdSysError::Emsg(), XProtocol::mapError(), and XrdSysE2T().

+ Here is the call graph for this function:

◆ SendFSError()

void XrdXrootdAioTask::SendFSError ( int  rc)
protected

Definition at line 328 of file XrdXrootdAioTask.cc.

329 {
330  XrdOucErrInfo &myError = dataFile->XrdSfsp->error;
331  int eCode;
332 
333 // We can only handle actual errors. Under some conditions a redirect (e.g.
334 // Xcache) can return other error codes. We treat these as server errors.
335 //
336  if (rc != SFS_ERROR)
337  {char eBuff[256];
338  snprintf(eBuff, sizeof(eBuff), "fs returned unexpected rc %d", rc);
339  SendError(EFAULT, eBuff);
340  if (myError.extData()) myError.Reset();
341  return;
342  }
343 
344 // Handle file system error but only if we are still alive
345 //
346  if (!isDone)
347  {const char *eMsg = myError.getErrText(eCode);
348  eLog.Emsg("AioTask", dataLink->ID, eMsg, dataFile->FileKey);
349  int rc = XProtocol::mapError(eCode);
350  if (Response.Send((XErrorCode)rc, eMsg))
351  {aioState |= aioDead;
352  dataLen = 0;
353  } else if (aioState & aioRead) dataLen = 0;
354  isDone = true;
355  }
356 
357 // Clear error message and recycle aio object if need be
358 //
359  if (myError.extData()) myError.Reset();
360 }
#define eMsg(x)
#define SFS_ERROR
const char * getErrText()
void Reset()
Reset object to no message state. Call this method to release appendages.
XrdOucErrInfo & error
XrdSfsFile * XrdSfsp

References XrdXrootd::eLog, XrdSysError::Emsg(), eMsg, XrdOucErrInfo::extData(), XrdOucErrInfo::getErrText(), XProtocol::mapError(), XrdOucErrInfo::Reset(), and SFS_ERROR.

+ Here is the call graph for this function:

◆ urProtocol()

XrdXrootdProtocol* XrdXrootdAioTask::urProtocol ( )
inline

Definition at line 63 of file XrdXrootdAioTask.hh.

63 {return Protocol;}

Referenced by XrdXrootdAioBuff::Alloc(), XrdXrootdAioPgrw::Alloc(), and XrdXrootdAioPgrw::Recycle().

+ Here is the caller graph for this function:

◆ Validate()

bool XrdXrootdAioTask::Validate ( XrdXrootdAioBuff aioP)
protected

Definition at line 366 of file XrdXrootdAioTask.cc.

367 {
368  ssize_t aioResult = aioP->Result;
369  off_t aioOffset = aioP->sfsAio.aio_offset;
370  int aioLength = aioP->sfsAio.aio_nbytes;
371 
372 // Step 1: Check if this request is already completed. This may be the case
373 // if we had a previous error.
374 //
375  if (isDone) return false;
376 
377 // Step 2: Check if an error occurred as this will terminate the request even
378 // if we have not sent all of the data.
379 //
380  if (aioP->Result < 0)
381  {SendError(-aioP->Result, 0);
382  return false;
383  }
384 
385 // Step 3: Check for a short read which signals that no more data past this
386 // offset is forthcomming. Save it as we will send a final response
387 // using this element. We discard zero length reads. It's an error if we
388 // get more than one short read with data or if its offset is less than
389 // the highest full read element.
390 //
391  if (aioResult < aioLength)
392  {dataLen = 0;
393  if (!aioResult)
394  {if ((finalRead && aioOffset < finalRead->sfsAio.aio_offset)
395  || aioOffset < highOffset) SendError(EFAULT, "embedded null block");
396  return false;
397  } else {
398  if (aioOffset < highOffset)
399  {SendError(ENODEV, "embedded short block");
400  return false;
401  } else {
402  if (finalRead) SendError(ENODEV, "multiple short blocks");
403  else {finalRead = aioP;
404  highOffset = aioOffset;
405  }
406  }
407  }
408  return false;
409  }
410 
411 // Step 4: This is a full read and its offset must be lower than the offset of
412 // any short read we have encountered.
413 //
414  if (finalRead && aioOffset >= finalRead->sfsAio.aio_offset)
415  {SendError(ENODEV, "read offset past EOD");
416  return false;
417  }
418  if (aioOffset > highOffset) highOffset = aioOffset;
419  return true;
420 }
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
ssize_t Result
Definition: XrdSfsAio.hh:65
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62

References aiocb::aio_nbytes, aiocb::aio_offset, XrdSfsAio::Result, and XrdSfsAio::sfsAio.

◆ Write()

virtual int XrdXrootdAioTask::Write ( long long  offs,
int  dlen 
)
pure virtual

Implemented in XrdXrootdPgrwAio, and XrdXrootdNormAio.

Friends And Related Function Documentation

◆ XrdXrootdAioFob

friend class XrdXrootdAioFob
friend

Definition at line 49 of file XrdXrootdAioTask.hh.

Member Data Documentation

◆ 

union { ... }

◆ 

union { ... }

◆ aioDead

const int XrdXrootdAioTask::aioDead = 0x01
staticprotected

Definition at line 115 of file XrdXrootdAioTask.hh.

◆ aioHeld

const int XrdXrootdAioTask::aioHeld = 0x02
staticprotected

Definition at line 116 of file XrdXrootdAioTask.hh.

◆ aioMutex

XrdSysMutex XrdXrootdAioTask::aioMutex
protected

Definition at line 86 of file XrdXrootdAioTask.hh.

◆ aioPage

const int XrdXrootdAioTask::aioPage = 0x04
staticprotected

Definition at line 117 of file XrdXrootdAioTask.hh.

◆ aioRead

const int XrdXrootdAioTask::aioRead = 0x08
staticprotected

Definition at line 118 of file XrdXrootdAioTask.hh.

◆ aioReady

XrdSysCondVar2 XrdXrootdAioTask::aioReady
protected

Definition at line 87 of file XrdXrootdAioTask.hh.

◆ aioSchd

const int XrdXrootdAioTask::aioSchd = 0x10
staticprotected

Definition at line 119 of file XrdXrootdAioTask.hh.

◆ aioState

char XrdXrootdAioTask::aioState
protected

Definition at line 106 of file XrdXrootdAioTask.hh.

◆ dataFile

XrdXrootdFile* XrdXrootdAioTask::dataFile
protected

Definition at line 98 of file XrdXrootdAioTask.hh.

◆ dataLen

int XrdXrootdAioTask::dataLen
protected

Definition at line 104 of file XrdXrootdAioTask.hh.

◆ dataLink

XrdLink* XrdXrootdAioTask::dataLink
protected

Definition at line 97 of file XrdXrootdAioTask.hh.

◆ dataOffset

off_t XrdXrootdAioTask::dataOffset
protected

Definition at line 103 of file XrdXrootdAioTask.hh.

◆ highOffset

off_t XrdXrootdAioTask::highOffset
protected

Definition at line 102 of file XrdXrootdAioTask.hh.

◆ inFlight

RAtomic_uchar XrdXrootdAioTask::inFlight
protected

Definition at line 107 of file XrdXrootdAioTask.hh.

◆ isDone

RAtomic_bool XrdXrootdAioTask::isDone
protected

Definition at line 108 of file XrdXrootdAioTask.hh.

◆ Offline

const int XrdXrootdAioTask::Offline = 0
staticprotected

Definition at line 123 of file XrdXrootdAioTask.hh.

◆ pendQ

XrdXrootdAioBuff* XrdXrootdAioTask::pendQ
protected

Definition at line 88 of file XrdXrootdAioTask.hh.

◆ pendQEnd

XrdXrootdAioBuff* XrdXrootdAioTask::pendQEnd
protected

Definition at line 89 of file XrdXrootdAioTask.hh.

◆ Protocol

XrdXrootdProtocol* XrdXrootdAioTask::Protocol
protected

Definition at line 96 of file XrdXrootdAioTask.hh.

Referenced by XrdXrootdAioFob::Schedule().

◆ Response

XrdXrootdResponse XrdXrootdAioTask::Response
protected

Definition at line 111 of file XrdXrootdAioTask.hh.

◆ Running

const int XrdXrootdAioTask::Running = 1
staticprotected

Definition at line 124 of file XrdXrootdAioTask.hh.

◆ Status

char XrdXrootdAioTask::Status
protected

Definition at line 109 of file XrdXrootdAioTask.hh.

◆ TraceID

const char * XrdXrootdAioTask::TraceID = "AioTask"
staticprotected

Definition at line 84 of file XrdXrootdAioTask.hh.

◆ Waiting

const int XrdXrootdAioTask::Waiting = 2
staticprotected

Definition at line 125 of file XrdXrootdAioTask.hh.


The documentation for this class was generated from the following files: