49 #define TRACELINK dataLink
68 const char *XrdXrootdPgrwAio::TraceID =
"PgrwAio";
80 static const int maxKeep = 64;
98 {fqFirst = reqP->nextPgrw;
109 reqP->
Init(protP, resp, fP);
128 {
if (inFlight)
return true;
129 SendError(ENOMEM,
"insufficient memory");
133 {SendError(EINVAL,
eMsg);
143 TRACEP(FSAIO,
"pgrd beg " <<dlen <<
'@' <<dataOffset
144 <<
" inF=" <<
int(inFlight));
148 {dataFile->aioFob->Schedule(
Protocol);
159 void XrdXrootdPgrwAio::CopyF2L()
169 if (!(bP = getBuff(doWait)))
170 {
if (isDone || !CopyF2L_Add2Q())
break;
177 <<
" result="<<bP->
Result<<
" D-S="<<isDone<<
'-'<<
int(Status)
178 <<
" inF="<<
int(inFlight));
183 {
if (bP != finalRead) bP->
Recycle();
200 if (inFlight == 0 && dataLen == 0 && !finalRead)
208 if (!isDone && SendData(aioP) && dataLen) {
if (!CopyF2L_Add2Q(aioP))
break;}
211 }
while(inFlight > 0);
216 if (!isDone) SendData(finalRead,
true);
217 if (finalRead) finalRead->Recycle();
222 if (aioState & aioDead) dataFile->aioFob->Reset(
Protocol);
223 else if (!(aioState & aioSchd)) dataFile->aioFob->Schedule(
Protocol);
229 if (!inFlight) Recycle(
true);
230 else Recycle(Drain());
237 int XrdXrootdPgrwAio::CopyL2F()
242 int dLen, ioVNum, rc;
250 if (!(bP = getBuff(doWait)))
251 {
if (isDone)
return 0;
253 {SendError(ENOMEM,
"insufficient memory");
261 <<
" D-S="<<isDone<<
'-'<<
int(Status)<<
" inF="<<
int(inFlight));
266 {SendError(-aioP->
Result, 0);
273 if (dataLen <= 0 || isDone)
283 {SendError(EINVAL,
eMsg);
292 struct iovec *ioV = aioP->
iov4Recv(ioVNum);
296 if ((rc =
Protocol->getData(
this,
"pgWrite", ioV, ioVNum)))
297 {
if (rc > 0) pendWrite = aioP;
306 if (!CopyL2F(aioP))
return 0;
313 {
if (!dataLen)
return SendDone();
314 SendError(EIDRM,
"pgWrite encountered an impossible condition");
315 eLog.
Emsg(
"PgrwAio",
"pgWrite logic error for",
316 dataLink->ID, dataFile->FileKey);
331 if (VerCks(bP->
pgrwP))
332 {
int rc = dataFile->XrdSfsp->pgWrite((
XrdSfsAio *)bP);
337 <<
" inF=" <<
int(inFlight));
356 if (aioState & aioRead) CopyF2L();
368 dataOffset = highOffset = offs;
370 aioState = aioRead | aioPage;
383 dataFile->aioFob->Schedule(
this);
394 if (!(aioState & aioHeld))
396 if (aioState & aioRead)
397 {dataLink->setRef(-1);
405 TRACEP(FSAIO,
"pgrw recycle "<<(release ?
"" :
"hold ")
406 <<(aioState & aioRead ?
'R' :
'W')<<
" D-S="
407 <<isDone<<
'-'<<
int(Status));
413 if (numFree >= maxKeep)
431 static const int infoLen =
sizeof(
kXR_int64);
432 struct pgReadResponse
443 memset(pgrResp.rsp.bdy.reserved, 0,
sizeof(pgrResp.rsp.bdy.reserved));
449 struct iovec *ioVec = bP->
pgrwP->
iov4Send(iovNum, iovLen,
true);
455 rc = Response.Send(pgrResp.rsp, infoLen, ioVec, iovNum, iovLen);
457 pgrResp.rsp.bdy.dlen = 0;
458 pgrResp.ofs = htonll(dataOffset);
459 rc = Response.Send(pgrResp.rsp, infoLen);
467 if (rc) aioState |= aioDead;
476 int XrdXrootdPgrwAio::SendDone()
478 static const int infoLen =
sizeof(
kXR_int64);
489 pgwResp.info.offset = htonll(highOffset);
490 memset(pgwResp.rsp.bdy.reserved, 0,
sizeof(pgwResp.rsp.bdy.reserved));
494 buff = badCSP->boInfo(n);
498 if ((rc = Response.Send(pgwResp.rsp, infoLen, buff, n))) dataLen = 0;
500 if (rc) aioState |= aioDead;
511 uint32_t *csVec, *csVP, csVal;
516 struct iovec *ioV = aioP->
iov4Data(ioVNum);
517 csVP = csVec = (uint32_t*)ioV[0].iov_base;
521 for (
int i = 1; i < ioVNum; i +=2)
522 {dLen = ioV[i].iov_len;
523 csVal = ntohl(*csVP); *csVP++ = csVal;
525 {
const char *
eMsg = badCSP->boAdd(dataFile, dOffset, dLen);
526 if (
eMsg) {SendError(ETOOMANYREFS,
eMsg);
554 aioState &= ~aioRead;
555 dataOffset = highOffset = offs;
struct ServerResponseBody_Status bdy
XrdSysTrace XrdXrootdTrace
static bool Ver32C(const void *data, size_t count, const uint32_t csval, uint32_t *csbad=0)
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
virtual void Recycle() override
XrdXrootdAioPgrw *const pgrwP
int Setup2Send(off_t offs, int dlen, const char *&eMsg)
struct iovec * iov4Send(int &iovNum, int &iovLen, bool cs2net=false)
struct iovec * iov4Recv(int &iovNum)
bool noChkSums(bool reset=true)
static XrdXrootdAioPgrw * Alloc(XrdXrootdAioTask *arp)
struct iovec * iov4Data(int &iovNum)
int Setup2Recv(off_t offs, int dlen, const char *&eMsg)
void Init(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP)
void Read(long long offs, int dlen) override
static XrdXrootdPgrwAio * Alloc(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP, XrdXrootdPgwBadCS *bcsP=0)
int Write(long long offs, int dlen) override
void Recycle(bool release) override
ProtocolImpl< false > Protocol