// Maps the name TRACELINK to dataLink; presumably consumed by the trace
// macros (e.g. TRACEP) used in this file -- confirm against the trace header.
46 #define TRACELINK dataLink
// Definition of the class's static TraceID string.
65 const char *XrdXrootdNormAio::TraceID =
"NormAio";
// Upper bound that numFree is compared against (numFree >= maxKeep) elsewhere
// in this file; presumably caps how many recycled objects are retained.
77 static const int maxKeep = 64;
// Advance the list head fqFirst to the entry linked after reqP
// (presumably taking reqP off a free queue -- surrounding lines not shown).
94 {fqFirst = reqP->nextNorm;
// Hand protP, resp and fP to reqP's Init().
105 reqP->
Init(protP, resp, fP);
// With reads still in flight, return true rather than reporting an error.
122 {
if (inFlight)
return true;
// Otherwise emit ENOMEM with the text below via SendError().
123 SendError(ENOMEM,
"insufficient memory");
// Trace the read being started: dlen, dataOffset and the in-flight count.
137 TRACEP(FSAIO,
"aioR beg " <<dlen <<
'@' <<dataOffset
138 <<
" inF=" <<
int(inFlight));
// Pass Protocol to the Schedule() method of the file's aioFob.
142 {dataFile->aioFob->Schedule(
Protocol);
// Read-side driver (name suggests file-to-link -- confirm). The statements
// below are a non-contiguous excerpt of the body: they obtain buffers, queue
// or send them, report a missing-data error, and release remaining resources.
153 void XrdXrootdNormAio::CopyF2L()
// No buffer obtained: leave the loop if the request is done or if
// CopyF2L_Add2Q() fails.
163 if (!(aioP = getBuff(doWait)))
164 {
if (isDone || !CopyF2L_Add2Q())
break;
172 <<
" result="<<aioP->
Result<<
" D-S="<<isDone<<
'-'<<
int(Status)
173 <<
" inF="<<
int(inFlight));
// Recycle the buffer unless it is the one currently held as finalRead.
178 {
if (aioP != finalRead) aioP->
Recycle();
// Step the (bPP, bP) pair one node along the list.
189 bPP = bP; bP = bP->
next;
// Link aioP after bPP when a predecessor exists.
192 if (bPP) bPP->
next = aioP;
// Nothing in flight, no data left and no finalRead chosen yet.
202 if (inFlight == 0 && dataLen == 0 && !finalRead)
// Walk to the last node, then detach it and keep it as finalRead; when
// there is no predecessor, finalRead takes sendQ and sendQ is cleared.
208 while(bP->
next) {bPP = bP; bP = bP->
next;}
209 if (bPP) {finalRead = bP; bPP->
next = 0;}
210 else {finalRead = sendQ; sendQ = 0;}
// Recycle the buffer when done, when Send() fails, or when no data remains
// (Send() is only attempted if isDone is false); otherwise try to queue
// another read with it and leave the loop if that fails.
217 if (isDone || !
Send(aioP) || dataLen <= 0) aioP->
Recycle();
218 else if (!CopyF2L_Add2Q(aioP))
break;
// Continue while the head of sendQ sits exactly at sendOffset and aOK holds.
222 while(sendQ && sendQ->sfsAio.aio_offset == sendOffset && aOK)
226 if (!isDone &&
Send(aioP) && dataLen) aOK = CopyF2L_Add2Q(aioP);
230 }
while(inFlight > 0 && aOK);
// Format an error naming sendOffset and report it as ENODEV.
238 snprintf(ebuff,
sizeof(ebuff),
"aio read failed at offset %lld; "
239 "missing data",
static_cast<long long>(sendOffset));
240 SendError(ENODEV, ebuff);
241 }
else Send(finalRead,
true);
// Release finalRead (if any) and every buffer still chained on sendQ.
246 if (finalRead) finalRead->Recycle();
247 while((aioP = sendQ)) {sendQ = sendQ->next; aioP->
Recycle();}
// aioDead set: call aioFob->Reset(Protocol). Otherwise call
// aioFob->Schedule(Protocol) unless the aioSchd bit is already set.
252 if (aioState & aioDead) dataFile->aioFob->Reset(
Protocol);
253 else if (!(aioState & aioSchd)) dataFile->aioFob->Schedule(
Protocol);
// Recycle with release=true when nothing is in flight; otherwise pass
// the result of Drain() as the release flag.
259 if (!inFlight) Recycle(
true);
260 else Recycle(Drain());
// Write-side driver (name suggests link-to-file -- confirm). Returns int; the
// visible return paths yield 0 or -1. Only part of the body appears here.
267 int XrdXrootdNormAio::CopyL2F()
// No buffer obtained: return 0 at once if the request is already done.
278 if (!(aioP = getBuff(doWait)))
279 {
if (isDone)
return 0;
// Report ENOMEM via SendError() (the guarding condition is not shown).
281 {SendError(ENOMEM,
"insufficient memory");
288 <<
" D-S="<<isDone<<
'-'<<
int(Status)<<
" inF="<<
int(inFlight));
// Report the failure using the negated Result value as the error code.
293 {SendError(-aioP->
Result, 0);
300 if (dataLen <= 0 || isDone)
// A positive rc records aioP as the pending write.
318 {
if (rc > 0) pendWrite = aioP;
// Return 0 when the per-buffer CopyL2F(aioP) call yields a false value.
327 if (!CopyL2F(aioP))
return 0;
// No data left: call Send(0) and map its result to 0 (true) or -1 (false).
334 {
if (!dataLen)
return (
Send(0) ? 0 : -1);
// Otherwise report EIDRM and log the condition together with the link ID
// and the file key.
335 SendError(EIDRM,
"aioWrite encountered an impossible condition");
336 eLog.
Emsg(
"NormAio",
"write logic error for",
337 dataLink->ID, dataFile->FileKey);
// Submit aioP to the file object's write(), passing it as an XrdSfsAio*;
// the return code is kept in rc.
// NOTE(review): C-style pointer cast -- a named cast would make the intended
// conversion explicit; confirm the relationship between the two types.
352 int rc = dataFile->XrdSfsp->write((
XrdSfsAio *)aioP);
// Requests with the aioRead bit set are driven through CopyF2L().
379 if (aioState & aioRead) CopyF2L();
// Start dataOffset, highOffset and sendOffset all at the supplied offs.
391 dataOffset = highOffset = sendOffset = offs;
// Pass this object to the Schedule() method of the file's aioFob.
406 dataFile->aioFob->Schedule(
this);
// Taken only when the aioHeld bit is clear (guarded statements not shown).
417 if (!(aioState & aioHeld))
419 if (aioState & aioRead)
// Calls setRef(-1) on the link -- presumably dropping one reference; confirm.
421 dataLink->setRef(-1);
// Trace the recycle: 'R' or 'W' chosen by the aioRead bit, " hold" appended
// when release is false, then the reorders count, isDone and Status.
428 TRACEP(FSAIO,
"aio"<<(aioState & aioRead ?
'R' :
'W')<<
" recycle"
429 <<(release ?
"" :
" hold")<<
"; reorders="<<reorders
430 <<
" D-S="<<isDone<<
'-'<<
int(Status));
// numFree has reached maxKeep (the action taken is not shown in this excerpt).
437 if (numFree >= maxKeep)
// Alternative branch: call Response.Send() with no arguments and keep its
// return code in rc.
463 }
else rc = Response.Send();
// Any non-zero rc sets the aioDead bit in aioState.
470 if (rc) aioState |= aioDead;
// Clear the aioRead bit, then start dataOffset and highOffset at offs.
488 aioState &= ~aioRead;
489 dataOffset = highOffset = offs;
// Signatures of symbols related to the code above. These are declarations
// only (no bodies, no terminating semicolons) and are not part of the
// implementation itself.
XrdSysTrace XrdXrootdTrace
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
virtual void Recycle() override
static XrdXrootdAioBuff * Alloc(XrdXrootdAioTask *arp)
void Init(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP)
int Write(long long offs, int dlen) override
void Recycle(bool release) override
void Read(long long offs, int dlen) override
static XrdXrootdNormAio * Alloc(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP)
ProtocolImpl< false > Protocol
// NOTE(review): this two-argument (int, KernelBuffer&) signature does not
// match the Send(...) calls above, which pass a buffer pointer or a single 0;
// it likely belongs to a different class -- verify before relying on it.
ssize_t Send(int fd, KernelBuffer &buffer)