48 #define TRACELINK dataLink
81 if (Status == Offline && isDone)
85 if (inFlight <= 0) Recycle(
true);
92 if (!pendQ) pendQEnd = pendQ = aioP;
93 else {pendQEnd->
next = aioP;
100 if (Status != Running)
101 {
if (Status == Waiting) aioReady.Signal();
122 {
while((aioP = pendQ))
123 {
if (!(pendQ = aioP->
next)) pendQEnd = 0;
129 if (inFlight <= 0 || !Wait4Buff(maxWait))
break;
137 snprintf(buff,
sizeof(buff),
138 "aio%c overdue %d inflight request%s for",
139 (aioState & aioRead ?
'R' :
'W'),
int(inFlight),
140 (inFlight > 1 ?
"s" :
""));
141 eLog.
Emsg(
"AioTask", buff, dataLink->ID, dataFile->FileKey);
150 return inFlight <= 0;
164 TRACEP(
DEBUG,
"gdDone: "<<(
void *)
this<<
" pendWrite "
165 <<(pendWrite != 0 ?
"set":
"not set"));
171 if (!bP) rc = CopyL2F();
172 else {
if (CopyL2F(bP) && (inFlight || !isDone)) rc = CopyL2F();
178 TRACEP(
DEBUG,
"gdDone: "<<(
void *)
this<<
" ending rc="<<rc);
187 if (!inFlight) Recycle(
true);
188 else Recycle(Drain());
189 if (!rc && dlen)
return prot->
getDump(Comment, dlen);
208 snprintf(eBuff,
sizeof(eBuff),
"link error aborted %s for", Comment);
209 eLog.
Emsg(
"AioTask", eBuff, dataLink->ID, dataFile->FileKey);
216 if (pendWrite) {pendWrite->Recycle(); pendWrite = 0;}
220 if (aioState & aioRead) dataFile->aioFob->Reset(
Protocol);
224 if (!inFlight) Recycle(
true);
225 else Recycle(Drain());
239 do{
if ((aioP = pendQ))
240 {
if (!(pendQ = aioP->
next)) pendQEnd = 0;
248 if (!wait || !inFlight)
255 }
while(Wait4Buff());
260 SendError(ETIMEDOUT, (aioState & aioRead ?
"aio file read timed out"
261 :
"aio file write timed out"));
282 pendQEnd = pendQ = 0;
304 if (!eText) eText = (rc ?
XrdSysE2T(rc) :
"invalid error code");
308 snprintf(eBuff,
sizeof(eBuff),
"async %s failed for %s;",
309 (aioState & aioRead ?
"read" :
"write"), dataLink->ID);
310 eLog.
Emsg(
"AioTask", eBuff, eText, dataFile->FileKey);
316 if (Response.Send(eCode, eText))
317 {aioState |= aioDead;
319 }
else if (aioState & aioRead) dataLen = 0;
338 snprintf(eBuff,
sizeof(eBuff),
"fs returned unexpected rc %d", rc);
339 SendError(EFAULT, eBuff);
348 eLog.
Emsg(
"AioTask", dataLink->ID,
eMsg, dataFile->FileKey);
351 {aioState |= aioDead;
353 }
else if (aioState & aioRead) dataLen = 0;
368 ssize_t aioResult = aioP->
Result;
375 if (isDone)
return false;
381 {SendError(-aioP->
Result, 0);
391 if (aioResult < aioLength)
394 {
if ((finalRead && aioOffset < finalRead->sfsAio.aio_offset)
395 || aioOffset < highOffset) SendError(EFAULT,
"embedded null block");
398 if (aioOffset < highOffset)
399 {SendError(ENODEV,
"embedded short block");
402 if (finalRead) SendError(ENODEV,
"multiple short blocks");
403 else {finalRead = aioP;
404 highOffset = aioOffset;
414 if (finalRead && aioOffset >= finalRead->sfsAio.aio_offset)
415 {SendError(ENODEV,
"read offset past EOD");
418 if (aioOffset > highOffset) highOffset = aioOffset;
428 bool XrdXrootdAioTask::Wait4Buff(
int maxWait)
430 static const int msgInterval = 30;
432 int aioWait, msgWait = msgInterval, totWait = 0;
436 if (pendQ)
return true;
442 {
eLog.
Emsg(
"Wait4Buff", dataLink->ID,
"has nothing inflight for",
452 aioWait = (maxWait > msgInterval ? msgInterval : maxWait);
457 while(totWait < maxWait)
459 aioReady.Wait(aioWait);
461 totWait = (time(0) - begWait);
462 int tmpWait = maxWait - totWait;
463 if (tmpWait > 0 && tmpWait < aioWait) aioWait = tmpWait;
464 if (totWait >= msgWait)
468 snprintf(buff,
sizeof(buff),
"%d tardy aio%c requests for",
469 inF, (aioState & aioRead ?
'R' :
'W'));
470 eLog.
Emsg(
"Wait4Buff", dataLink->ID, buff, dataFile->FileKey);
const char * XrdSysE2T(int errcode)
XrdSysTrace XrdXrootdTrace
static int mapError(int rc)
const char * getErrText()
void Reset()
Reset object to no message state. Call this method to release appendages.
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Snooze(int seconds)
virtual void Recycle() override
bool Validate(XrdXrootdAioBuff *aioP)
XrdXrootdAioBuff * getBuff(bool wait)
void SendError(int rc, const char *eText)
void Completed(XrdXrootdAioBuff *aioP)
void Init(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP)
static const char * TraceID
int getDump(const char *dtype, int dlen)
ProtocolImpl< false > Protocol