80 bool XrdXrootdProtocol::do_PgClose(
XrdXrootdFile *fP,
int &rc)
83 int numErrs, numFixes, numLeft;
87 if (!fobP)
return true;
91 numLeft = fobP->
numOffs(&numErrs, &numFixes);
99 snprintf(ebuff,
sizeof(ebuff),
"%d uncorrected checksum errors",numLeft);
113 int XrdXrootdProtocol::do_PgRead()
133 "pgread does not refer to an open file");
140 pathID =
static_cast<int>(rargs->
pathid);
173 if (!pathID) pP =
this;
174 else {
if (!(pP =
VerifyStream(rc, pathID,
false)))
return rc;
188 if (pathID)
return do_Offload(&XrdXrootdProtocol::do_PgRIO, pathID);
203 int XrdXrootdProtocol::do_PgRIO()
209 static const int maxCSSZ = maxIOVZ/2 - 1;
210 static const int maxPGRD = maxCSSZ*pgPageSize;
211 static const int infoLen =
sizeof(
kXR_int64);
213 struct pgReadResponse
219 uint64_t pgrOpts = 0;
220 int dlen, fLen, lLen, rc, xframt, Quantum;
221 uint32_t csVec[maxCSSZ];
222 struct iovec
iov[maxIOVZ];
232 memset(pgrResp.rsp.bdy.reserved, 0,
sizeof(pgrResp.rsp.bdy.reserved));
237 int pgOff, rPages, rLen =
IO.
IOLen;
243 if (rPages < Quantum) Quantum = rPages;
249 {
if ((rc = getBuff(1, Quantum)) <= 0)
return rc;}
251 if (
argp->
bsize > maxPGRD) Quantum = maxPGRD;
257 int items = Quantum / pgPageSize;
263 uint32_t *csVP = csVec;
265 int i = 1, n = items * 2;
267 {
iov[i ].iov_base = csVP++;
268 iov[i++].iov_len =
sizeof(uint32_t);
269 iov[i ].iov_base = buff;
270 iov[i++].iov_len = pgPageSize;
278 if ((pgOff =
IO.Offset & pgPageMask))
279 {rLen = pgPageSize - pgOff;
281 iov[2].iov_base = buff;
282 iov[2].iov_len = rLen;
283 rLen += Quantum - pgPageSize;
294 long long ioOffset =
IO.Offset;
295 do {
if ((xframt = sfsP->
pgRead(
IO.Offset, buff, rLen, csVec, pgrOpts)) <= 0)
299 iov[2].iov_len = fLen;
300 if (items > 1)
iov[items<<1].iov_len = lLen;
302 if (xframt < rLen || xframt ==
IO.
IOLen)
310 for (
int i = 0; i < items; i++) csVec[i] = htonl(csVec[i]);
312 pgrResp.ofs = htonll(ioOffset);
317 dlen = xframt + (items *
sizeof(uint32_t));
318 if ((rc =
Response.
Send(pgrResp.rsp, infoLen,
iov, items*2+1, dlen)) < 0)
323 iov[2].iov_len = pgPageSize;
328 ioOffset =
IO.Offset;
333 if (xframt < 0)
return fsError(xframt, 0, sfsP->
error, 0, 0);
339 pgrResp.rsp.bdy.dlen = 0;
340 pgrResp.ofs = htonll(
IO.Offset);
350 int XrdXrootdProtocol::do_PgWrite()
380 return do_WriteNone(pathID);
389 TRACEP(FSIO, pathID<<
" pgwrite "
407 if (pathID)
return do_Offload(&XrdXrootdProtocol::do_PgWIO, pathID);
411 return do_PgWIO(
true);
423 bool XrdXrootdProtocol::do_PgWAIO(
int &rc)
457 int XrdXrootdProtocol::do_PgWIO() {
return do_PgWIO(
true);}
459 int XrdXrootdProtocol::do_PgWIO(
bool isFresh)
466 int n, rc, Quantum, iovLen, iovNum, csNum;
482 && !isRetry && do_PgWAIO(rc))
return rc;
483 if (isRetry && !do_PgWIORetry(rc))
return rc;
484 if (!do_PgWIOSetup(
pgwCtl))
return -1;
494 if ((rc =
getData(
this,
"pgwrite", ioV, iovNum)))
return rc;
504 for (
int i = 0; i < csNum; i++) csVec[i] = ntohl(csVec[i]);
521 if ((rc = sfsP->
pgWrite(
IO.Offset, buff, Quantum, csVec)) <= 0)
522 {
IO.EInfo[0] = rc;
IO.EInfo[1] = 0;
523 return do_WriteNone();
533 IO.Offset += Quantum;
554 bool XrdXrootdProtocol::do_PgWIORetry(
int &rc)
556 static const int csLen =
sizeof(
kXR_unt32);
563 if (
IO.Offset & pgPageMask)
564 {
int n = pgPageSize - (
IO.Offset & pgPageMask);
565 isBad =
IO.
IOLen > (n + csLen);
566 }
else isBad =
IO.
IOLen > pgUnitSize;
572 "pgwrite retry of more than one page not allowed");
581 snprintf(buff,
sizeof(buff),
"retry %d@%lld",
IO.
IOLen-csLen,
IO.Offset);
613 if (!
argp || Quantum < halfBSize || argp->bsize < Quantum
615 {
if (getBuff(0, Quantum) <= 0)
return -1;}
struct ClientPgReadRequest pgread
struct ClientPgWriteRequest pgwrite
struct ClientRequestHdr header
XrdSysTrace XrdXrootdTrace
int setEtext(const char *text)
static bool csVer(dataInfo &dInfo, off_t &bado, int &badc)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static const uint64_t Verify
Options for pgRead() and pgWrite() as noted below.
virtual XrdSfsXferSize pgRead(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize rdlen, uint32_t *csvec, uint64_t opts=0)
virtual XrdSfsXferSize pgWrite(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize wrlen, uint32_t *csvec, uint64_t opts=0)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void pgrOps(int rsz, bool isRetry=false)
void pgwOps(int wsz, bool isRetry=false)
void pgUpdt(int wErrs, int wFixd, int wUnc)
XrdXrootdFile * Get(int fnum)
void Add_rd(kXR_unt32 dictid, kXR_int32 rlen, kXR_int64 offset)
void Add_wr(kXR_unt32 dictid, kXR_int32 wlen, kXR_int64 offset)
void Read(long long offs, int dlen) override
static XrdXrootdPgrwAio * Alloc(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP, XrdXrootdPgwBadCS *bcsP=0)
int Write(long long offs, int dlen) override
const char * boAdd(XrdXrootdFile *fP, kXR_int64 foffs, int dlen=XrdProto::kXR_pgPageSZ)
char * boInfo(int &boLen)
ServerResponseStatus resp
static const int maxBSize
struct iovec * FrameInfo(int &iovn, int &rdlen)
const char * Setup(XrdBuffer *buffP, kXR_int64 fOffs, int totlen)
ServerResponseBody_pgWrite info
bool hasOffs(kXR_int64 foffs, int dlen)
bool delOffs(kXR_int64 foffs, int dlen)
int numOffs(int *errs=0, int *fixs=0)
static XrdXrootdStats * SI
XrdXrootdProtocol * VerifyStream(int &rc, int pID, bool lok=true)
XrdXrootdProtocol * Stream[maxStreams]
XrdXrootdFileTable * FTab
static XrdSysError & eDest
int getData(gdCallBack *gdcbP, const char *dtype, char *buff, int blen)
XrdXrootdMonitor::User Monitor
XrdXrootdResponse Response
static const int maxStreams
static RAtomic_int srvrAioOps
static const int kXR_pgUnitSZ
static const int kXR_pgPageSZ
static const int kXR_pgRetry