37 #include <sys/param.h>
38 #include <sys/types.h>
64 memset((
void *)&HdrData, 0,
sizeof(HdrData));
66 strcpy(buff, fn); strcat(buff,
".lock");
78 rqMonitor rqMon(isAgent);
84 if (!FileLock()) {FailAdd(rP->
LFN, 0);
return;}
88 if ((fP = HdrData.Free))
89 {
if (!reqRead((
void *)&tmpReq, fP)) {FailAdd(rP->
LFN, 1);
return;}
90 HdrData.Free = tmpReq.Next;
93 if (
fstat(reqFD, &buf))
94 {
Say.
Emsg(
"Add",errno,
"stat",reqFN); FailAdd(rP->
LFN, 1);
return;}
101 {
if (!(rP->
Next = HdrData.First)) HdrData.Last = fP;
104 if (HdrData.First && HdrData.Last)
105 {
if (!reqRead((
void *)&tmpReq, HdrData.Last))
106 {FailAdd(rP->
LFN, 1);
return;}
108 if (!reqWrite((
void *)&tmpReq, HdrData.Last, 0))
109 {FailAdd(rP->
LFN, 1);
return;}
110 }
else HdrData.First = fP;
111 HdrData.Last = fP; rP->
Next = 0;
117 if (!reqWrite(rP, fP)) FailAdd(rP->
LFN, 0);
127 rqMonitor rqMon(isAgent);
129 int Offs, numCan = 0, numBad = 0;
135 if (!FileLock() ||
fstat(reqFD, &buf)) {FailCan(rP->
ID, 0);
return;}
139 for (Offs = ReqSize; Offs < buf.st_size; Offs += ReqSize)
140 {
if (!reqRead((
void *)&tmpReq, Offs))
return FailCan(rP->
ID);
141 if (!strcmp(tmpReq.ID, rP->
ID))
142 {tmpReq.LFN[0] =
'\0';
143 if (!reqWrite((
void *)&tmpReq, Offs, 0)) numBad++;
150 if (numCan)
fsync(reqFD);
154 if (numCan || numBad)
155 {sprintf(txt,
"has %d entries; %d removed (%d failures).",
156 numCan+numBad, numCan, numBad);
168 rqMonitor rqMon(isAgent);
173 if (!FileLock()) {FailDel(rP->
LFN, 0);
return;}
177 memset(&tmpReq, 0,
sizeof(tmpReq));
178 tmpReq.
Next = HdrData.Free;
179 HdrData.Free = rP->
This;
180 if (!reqWrite((
void *)&tmpReq, rP->
This)) FailDel(rP->
LFN, 0);
194 if (!FileLock())
return 0;
198 while((fP = HdrData.First))
199 {
if (!reqRead((
void *)rP, fP)) {FileLock(lkNone);
return 0;}
200 HdrData.First= rP->Next;
201 if (*(rP->LFN)) {reqWrite(0,0,1);
break;}
202 rP->Next = HdrData.Free;
204 if (!reqWrite(rP, fP)) {fP = 0;
break;}
206 if (fP) rc = (HdrData.First ? 1 : -1);
219 static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
222 recEnt *RegList = 0, *First = 0, *rP, *pP, *tP;
223 int Offs, rc, numreq = 0;
227 if ((lokFD = XrdSysFD_Open(lokFN, O_RDWR|O_CREAT,
Mode)) < 0)
228 {
Say.
Emsg(
"Init",errno,
"open",lokFN);
return 0;}
232 if (!FileLock(lkInit))
return 0;
236 if ((reqFD = XrdSysFD_Open(reqFN, O_RDWR|O_CREAT,
Mode)) < 0)
238 Say.
Emsg(
"Init",errno,
"open",reqFN);
244 if (
fstat(reqFD, &buf))
return FailIni(
"stat");
245 if (buf.st_size < ReqSize)
246 {memset(&tmpReq, 0,
sizeof(tmpReq));
247 HdrData.Free = ReqSize;
248 if (!reqWrite((
void *)&tmpReq, ReqSize))
return FailIni(
"init file");
262 for (Offs = ReqSize; Offs < buf.st_size; Offs += ReqSize)
263 {
if (!reqRead((
void *)&tmpReq, Offs))
return FailIni(
"read file");
264 if (*tmpReq.LFN ==
'\0' || !tmpReq.addTOD
265 || tmpReq.Opaque >=
int(
sizeof(tmpReq.LFN)))
continue;
266 pP = 0; rP = First; tP =
new recEnt(tmpReq); numreq++;
268 {tP->Next = RegList; RegList = tP;
270 while(rP && rP->reqData.addTOD < tmpReq.addTOD) {pP=rP;rP=rP->Next;}
271 if (pP) pP->Next = tP;
279 while((rP = RegList))
287 DEBUG(numreq <<
" request(s) recovered from " <<reqFN);
294 CID.
Ref(tP->reqData.iName);
311 rqMonitor rqMon(isAgent);
317 if (Offs < ReqSize) Offs = ReqSize;
321 if (!FileLock(lkShare))
return 0;
325 do{
do {rc =
pread(reqFD, (
void *)&tmpReq, ReqSize, Offs);}
326 while(rc < 0 && errno == EINTR);
329 if (*tmpReq.
LFN ==
'\0' || !tmpReq.
addTOD
330 || tmpReq.
Opaque >=
int(
sizeof(tmpReq.
LFN))
333 if (!ITNum || !ITList)
strlcpy(Buff, tmpReq.
LFN, bsz);
334 else ListL(tmpReq, Buff, bsz, ITList, ITNum);
337 }
while(rc == ReqSize);
341 if (rc < 0)
Say.
Emsg(
"List",errno,
"read",reqFN);
358 int i, k, n, bln = bsz-2, Lfo;
360 for (i = 0; i < ITNum && bln > 0; i++)
365 n = strlen(tmpReq.
LFN);
370 n = strlen(tmpReq.
LFN+Lfo);
376 n = strlen(tmpReq.
LFN); tmpReq.
LFN[n] =
'?';
377 if (!tmpReq.
Opaque) tmpReq.
LFN[n+1] =
'\0';
379 k = strlen(tmpReq.
LFN);
380 tmpReq.
LFN[n] =
'\0'; n = k;
384 n = strlen(tmpReq.
LFN); tmpReq.
LFN[n] =
'?';
385 if (!tmpReq.
Opaque) tmpReq.
LFN[n+1] =
'\0';
387 k = strlen(tmpReq.
LFN+Lfo);
388 tmpReq.
LFN[n] =
'\0'; n = k;
395 if (bln) {Buff[n] = What; n++;}
397 if (bln-n > 0) {Buff[n] =
'f'; n++;}
399 if (bln-n > 0) {Buff[n] =
'n'; n++;}
403 n = strlen(tmpReq.
Notify);
413 if (tmpReq.
Prty == 2) What =
'2';
414 else if (tmpReq.
Prty == 1) What =
'1';
417 if (bln) *Buff = What;
424 if ((n = sprintf(tbuf,
"%lld", tval)) >= 0)
429 n = strlen(tmpReq.
ID);
434 n = strlen(tmpReq.
User);
438 default: n = 0;
break;
440 if (bln > 0) {bln -= n; Buff += n;}
441 if (bln > 0) {*Buff++ =
' '; bln--;}
450 void XrdFrcReqFile::FailAdd(
char *lfn,
int unlk)
452 Say.
Emsg(
"Add", lfn,
"not added to prestage queue.");
453 if (unlk) FileLock(lkNone);
460 void XrdFrcReqFile::FailCan(
char *rid,
int unlk)
462 Say.
Emsg(
"Can", rid,
"request not removed from prestage queue.");
463 if (unlk) FileLock(lkNone);
470 void XrdFrcReqFile::FailDel(
char *lfn,
int unlk)
472 Say.
Emsg(
"Del", lfn,
"not removed from prestage queue.");
473 if (unlk) FileLock(lkNone);
480 int XrdFrcReqFile::FailIni(
const char *txt)
482 Say.
Emsg(
"Init", errno, txt, reqFN);
491 int XrdFrcReqFile::FileLock(LockType lktype)
499 memset(&lock_args, 0,
sizeof(lock_args));
500 lock_args.l_whence = SEEK_SET;
501 if (lktype == lkNone)
502 {lock_args.l_type = F_UNLCK; What =
"unlock";
503 if (isAgent && reqFD >= 0) {
close(reqFD); reqFD = -1;}
505 else {lock_args.l_type = (lktype == lkShare ? F_RDLCK : F_WRLCK);
512 do {rc =
fcntl(lokFD,F_SETLKW,&lock_args);}
513 while(rc < 0 && errno == EINTR);
514 if (rc < 0) {
Say.
Emsg(
"FileLock", errno, What , lokFN);
return 0;}
518 if (lktype == lkExcl || lktype == lkShare)
519 {
if (reqFD < 0 && (reqFD = XrdSysFD_Open(reqFN, O_RDWR)) < 0)
520 {
Say.
Emsg(
"FileLock",errno,
"open",reqFN);
524 do {rc =
pread(reqFD, (
void *)&HdrData,
sizeof(HdrData), 0);}
525 while(rc < 0 && errno == EINTR);
526 if (rc < 0) {
Say.
Emsg(
"reqRead",errno,
"refresh hdr from", reqFN);
527 FileLock(lkNone);
return 0;
529 }
else if (lktype == lkNone) flMutex.UnLock();
540 int XrdFrcReqFile::reqRead(
void *Buff,
int Offs)
544 do {rc =
pread(reqFD, Buff, ReqSize, Offs);}
while(rc < 0 && errno == EINTR);
545 if (rc < 0) {
Say.
Emsg(
"reqRead",errno,
"read",reqFN);
return 0;}
553 int XrdFrcReqFile::reqWrite(
void *Buff,
int Offs,
int updthdr)
557 if (Buff && Offs)
do {rc =
pwrite(reqFD, Buff, ReqSize, Offs);}
558 while(rc < 0 && errno == EINTR);
559 if (rc >= 0 && updthdr){
do {rc =
pwrite(reqFD,&HdrData,
sizeof(HdrData), 0);}
560 while(rc < 0 && errno == EINTR);
561 if (rc >= 0) rc =
fsync(reqFD);
563 if (rc < 0) {
Say.
Emsg(
"reqWrite",errno,
"write", reqFN);
return 0;}
571 int XrdFrcReqFile::ReWrite(XrdFrcReqFile::recEnt *rP)
573 static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
574 char newFN[MAXPATHLEN], *oldFN;
575 int newFD, oldFD, Offs = ReqSize, aOK = 1;
579 strcpy(newFN, reqFN); strcat(newFN,
".new");
580 if ((newFD = XrdSysFD_Open(newFN, O_RDWR|O_CREAT|O_TRUNC,
Mode)) < 0)
581 {
Say.
Emsg(
"ReWrite",errno,
"open",newFN); FileLock(lkNone);
return 0;}
585 oldFD = reqFD; reqFD = newFD;
586 oldFN = reqFN; reqFN = newFN;
591 {HdrData.First = Offs;
593 {rP->reqData.This = Offs;
594 rP->reqData.Next = (rP->Next ? Offs+ReqSize : 0);
595 if (!reqWrite((
void *)&rP->reqData, Offs, 0)) {aOK = 0;
break;}
599 HdrData.Last = Offs - ReqSize;
601 HdrData.First = HdrData.Last = 0;
603 {
Say.
Emsg(
"ReWrite",errno,
"trunc",newFN); aOK = 0;}
609 if (aOK && !(aOK = reqWrite(0, 0)))
610 Say.
Emsg(
"ReWrite",errno,
"write header",newFN);
614 if (aOK &&
rename(newFN, oldFN) < 0)
615 {
Say.
Emsg(
"ReWrite",errno,
"rename",newFN); aOK = 0;}
619 if (aOK)
close(oldFD);
620 else {
close(newFD); reqFD = oldFD;}
ssize_t pwrite(int fildes, const void *buf, size_t nbyte, off_t offset)
int stat(const char *path, struct stat *buf)
int ftruncate(int fildes, off_t offset)
ssize_t pread(int fildes, void *buf, size_t nbyte, off_t offset)
int fstat(int fildes, struct stat *buf)
int fcntl(int fd, int cmd,...)
int rename(const char *oldpath, const char *newpath)
void Ref(const char *iName)
int Get(XrdFrcRequest *rP)
void Del(XrdFrcRequest *rP)
char * List(char *Buff, int bsz, int &Offs, XrdFrcRequest::Item *ITList=0, int ITNum=0)
void Add(XrdFrcRequest *rP)
XrdFrcReqFile(const char *fn, int aVal)
void ListL(XrdFrcRequest &tmpReq, char *Buff, int bsz, XrdFrcRequest::Item *ITList, int ITNum)
void Can(XrdFrcRequest *rP)
static const int Register
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)