38 #include <sys/param.h>
39 #include <sys/types.h>
// Constructor fragment: both slot-list heads start out empty (null).
61 SlotList = SlotLust = 0;
// Clamp the caller-supplied sv into the range [0, 32767].
// NOTE(review): sv is presumably the sync interval later kept in pocSV; the
// assignment is not visible in this excerpt -- confirm.
63 if (sv > 32767) sv = 32767;
64 else if (sv < 0) sv = 0;
// Add fragment. 'it' starts at end(), i.e. "no existing map entry found yet".
75 std::map<std::string,int>::iterator it = pqMap.end();
// NOTE(review): the condition guarding this brace is not visible in this
// excerpt; the comments below describe only the statements that are shown.
88 {
// A request flagged as new is rejected with -EEXIST inside this guarded branch.
if (isNew)
return -EEXIST;
// Otherwise look the name up in pqMap; if an entry exists and its recorded
// offset passes VerOffset(), that offset is returned as-is.
89 it = pqMap.find(std::string(Lfn));
90 if (it != pqMap.end() && VerOffset(Lfn, it->second))
return it->second;
// Choose the file offset (fP) for the request. When SlotList is non-empty its
// head supplies the offset, is unlinked from SlotList, and has its Next
// pointed at the current SlotLust head.
102 if ((freeSlot = SlotList))
103 {fP = freeSlot->Offset;
104 SlotList = freeSlot->Next;
105 freeSlot->Next = SlotLust;
107 }
// With no slot on SlotList, the current pocSZ is used and pocSZ grows by
// ReqSize.
else {fP = pocSZ; pocSZ +=
ReqSize;}
// Write the whole request record at offset fP; a failed write is logged.
// NOTE(review): what is returned after the message is not visible here.
112 if (!reqWrite((
void *)&tmpReq,
sizeof(tmpReq), fP))
113 {
eDest->
Emsg(
"Add", Lfn,
"not added to the persist queue.");
// Remember fP for this name: overwrite the entry found earlier, else insert.
120 if (it != pqMap.end()) it->second = fP;
121 else pqMap[std::string(Lfn)] = fP;
// Commit fragment. addT holds the current time(0) value as a long long.
134 long long addT =
static_cast<long long>(time(0));
// The caller-supplied Offset must pass VerOffset(); otherwise -EINVAL.
138 if (!VerOffset(Lfn, Offset))
return -EINVAL;
// Write sizeof(addT) bytes of addT at Offset, i.e. at the very start of the
// record; a failed write is logged.
// NOTE(review): the value returned after the message is not visible here.
142 if (!reqWrite((
void *)&addT,
sizeof(addT), Offset))
143 {
eDest->
Emsg(
"Commit", Lfn,
"not committed to the persist queue.");
// Drop the name from the in-memory name -> offset map.
// NOTE(review): any locking around this erase is not visible in this excerpt.
150 pqMap.erase(std::string(Lfn));
// Del fragment. The caller-supplied Offset must pass VerOffset(); otherwise
// -EINVAL is returned.
167 if (!VerOffset(Lfn, Offset))
return -EINVAL;
// When Unlink is non-zero the file is removed through ossFS->Unlink(). A
// non-zero result other than -ENOENT is logged and returned as a negative
// value (negated if it came back positive).
171 if (
Unlink && (retc = ossFS->
Unlink(Lfn)) && retc != -ENOENT)
172 {
eDest->
Emsg(
"Del", retc,
"remove", Lfn);
173 return (retc < 0 ? retc : -retc);
// Overwrite sizeof(Zero) bytes at the LFN field of the on-disk record with
// Zero; a failed write is logged.
// NOTE(review): the type/size of Zero and the value returned on failure are
// not visible in this excerpt.
178 if (!reqWrite((
void *)&Zero,
sizeof(Zero), Offset+offsetof(
Request,LFN)))
179 {
eDest->
Emsg(
"Del", Lfn,
"not removed from the persist queue.");
// Obtain a FileSlot object: take the head of SlotLust when there is one,
// otherwise allocate a fresh one.
186 if ((freeSlot = SlotLust)) SlotLust = freeSlot->Next;
187 else freeSlot =
new FileSlot;
// Record the released offset in the slot and point it at the SlotList head.
// NOTE(review): the statement making this slot the new SlotList head is not
// visible in this excerpt -- confirm it exists.
188 freeSlot->Offset = Offset;
189 freeSlot->Next = SlotList;
// Decrement pocIQ, never letting it go below zero.
191 if (pocIQ > 0) pocIQ--;
// Drop the name from the in-memory name -> offset map.
195 pqMap.erase(std::string(Lfn));
// Init fragment. Creation mode for the queue file: read/write for the owner,
// read-only for group and others.
209 static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
// numreq starts at zero and is reported in the recovery message further down.
214 int rc, Offs, numreq = 0;
// Open the queue file pocFN read/write, creating it if needed.
// NOTE(review): the failure branch of this test is not visible here.
222 if ((pocFD = XrdSysFD_Open(pocFN, O_RDWR|O_CREAT,
Mode)) < 0)
// A failing fstat() is passed to FailIni("stat") and 0 is returned.
229 if (
fstat(pocFD, &buf)) {FailIni(
"stat");
return 0;}
// Read one ReqSize record at Offs into tmpReq, retrying while pread() is
// interrupted (EINTR).
// NOTE(review): the enclosing loop header is not visible in this excerpt.
243 {
do {rc =
pread(pocFD, (
void *)&tmpReq,
ReqSize, Offs);}
244 while(rc < 0 && errno == EINTR);
// A read error is logged and whatever First currently holds is returned.
245 if (rc < 0) {
eDest->
Emsg(
"Init",errno,
"read",pocFN);
return First;}
// Test for a record whose LFN begins with a NUL character.
// NOTE(review): the remainder of this condition is not visible here.
246 if (*tmpReq.
LFN ==
'\0'
// Format " <numreq> pending create(s)", pluralised unless numreq == 1.
// NOTE(review): sprintf is unbounded and Buff's size is not visible -- confirm
// it can hold the longest possible message.
254 sprintf(Buff,
" %d pending create%s", numreq, (numreq != 1 ?
"s" :
""));
// Announce how many entries were recovered and from which file.
255 eDest->
Say(
"Init", Buff,
" recovered from ", pocFN);
// Ok is set to 1 only when ReWrite(First) succeeds.
256 if (ReWrite(First)) Ok = 1;
// List fragment. Open theFN read-only; a failure is logged through Say.
// NOTE(review): this message and the stat message below are tagged "Init",
// whereas the read failure further down is tagged "List" -- confirm intended.
273 if ((theFD = XrdSysFD_Open(theFN, O_RDONLY)) < 0)
274 {
Say->
Emsg(
"Init",errno,
"open",theFN);
// A failing fstat() on the opened file is logged the same way.
// NOTE(review): the rest of both failure branches is not visible here.
280 if (
fstat(theFD, &buf))
281 {
Say->
Emsg(
"Init",errno,
"stat",theFN);
// A file smaller than ReqSize is treated as having size zero.
285 if (buf.st_size <
ReqSize) buf.st_size = 0;
// Read one ReqSize record at Offs into tmpReq, retrying while pread() is
// interrupted (EINTR).
// NOTE(review): the enclosing loop header is not visible in this excerpt.
290 {
do {rc =
pread(theFD, (
void *)&tmpReq,
ReqSize, Offs);}
291 while(rc < 0 && errno == EINTR);
// On a read error: log it, close the descriptor, and return the entries
// collected in First up to this point.
292 if (rc < 0) {
Say->
Emsg(
"List",errno,
"read",theFN);
293 close(theFD);
return First;
// A record whose LFN does not begin with NUL is wrapped in a new recEnt that
// is linked in front of the current First.
295 if (*tmpReq.
LFN !=
'\0') First =
new recEnt(tmpReq, 0, First);
// FailIni: takes a short text argument; the visible call site passes the name
// of the operation that failed ("stat").
// NOTE(review): the body is not visible in this excerpt; presumably it reports
// the failure using txt -- confirm.
308 void XrdOfsPoscq::FailIni(
const char *txt)
// reqWrite: write Bsz bytes from Buff to the queue file (pocFD) at byte offset
// Offs. The visible failure path logs the error and returns false.
// NOTE(review): the success return is not visible in this excerpt.
317 bool XrdOfsPoscq::reqWrite(
void *Buff,
int Bsz,
int Offs)
// Positional write, retried for as long as it is interrupted (EINTR).
// NOTE(review): only rc < 0 is handled in the visible code; a short write
// (0 <= rc < Bsz) is not detected here -- confirm that is acceptable.
321 do {rc =
pwrite(pocFD, Buff, Bsz, Offs);}
while(rc < 0 && errno == EINTR);
// Only successful writes larger than 8 bytes enter the sync logic: when pocWS
// is zero it is reloaded from pocSV and the file is fsync()ed, with the
// fsync() result replacing rc.
// NOTE(review): the branch taken when pocWS is non-zero is not visible here.
323 if (rc >= 0 && Bsz > 8)
324 {
if (!pocWS) {pocWS = pocSV; rc =
fsync(pocFD);}
// A negative rc from either pwrite() or fsync() is reported as a "write" error.
328 if (rc < 0) {
eDest->
Emsg(
"reqWrite",errno,
"write", pocFN);
return false;}
// ReWrite fragment. Creation mode for the replacement file: read/write for
// the owner, read-only for group and others.
338 static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
// newFN holds the temporary file name; oldFN saves the original pocFN pointer.
339 char newFN[MAXPATHLEN], *oldFN;
// Offs starts at ReqOffs.
340 int newFD, oldFD, Offs =
ReqOffs;
// Build the temporary name "<pocFN>.new".
// NOTE(review): strcpy/strcat do no bounds checking and no length check on
// pocFN is visible; confirm strlen(pocFN) + 5 <= MAXPATHLEN is guaranteed.
345 strcpy(newFN, pocFN); strcat(newFN,
".new");
// Create (or truncate) the temporary file; on failure log and return false.
346 if ((newFD = XrdSysFD_Open(newFN, O_RDWR|O_CREAT|O_TRUNC,
Mode)) < 0)
347 {
eDest->
Emsg(
"ReWrite",errno,
"open",newFN);
return false;}
// Redirect the object's descriptor and file name to the new file, saving the
// previous values in oldFD/oldFN.
// NOTE(review): pocFN now points at the local array newFN; the statement that
// restores pocFN before returning is not visible in this excerpt -- confirm.
351 oldFD = pocFD; pocFD = newFD;
352 oldFN = pocFN; pocFN = newFN;
// Failure exit from a loop: clear aOK and leave the loop.
// NOTE(review): the loop and the failing condition are not visible here.
359 {aOK =
false;
break;}
// If everything succeeded so far, rename the new file over the old name; a
// rename failure is logged and clears aOK.
367 if (aOK &&
rename(newFN, oldFN) < 0)
368 {
eDest->
Emsg(
"ReWrite",errno,
"rename",newFN); aOK =
false;}
// Success: the old descriptor is no longer needed.
372 if (aOK)
close(oldFD);
// Failure: discard the new descriptor and go back to the old one.
373 else {
close(newFD); pocFD = oldFD;}
// VerOffset: takes a file name and a slot offset and returns bool; callers in
// this file treat a false result as an invalid offset (-EINVAL).
// NOTE(review): the actual validity test is not visible in this excerpt.
383 bool XrdOfsPoscq::VerOffset(
const char *Lfn,
int Offset)
// Build the "Invalid slot <Offset> for" message prefix.
// NOTE(review): sprintf is unbounded and buff's size is not visible -- confirm
// it can hold the text plus the widest possible int.
390 sprintf(buff,
"Invalid slot %d for", Offset);
static XrdSysError eDest(0,"crypto_")
ssize_t pwrite(int fildes, const void *buf, size_t nbyte, off_t offset)
int stat(const char *path, struct stat *buf)
int ftruncate(int fildes, off_t offset)
ssize_t pread(int fildes, void *buf, size_t nbyte, off_t offset)
int fstat(int fildes, struct stat *buf)
int rename(const char *oldpath, const char *newpath)
int Del(const char *Lfn, int Offset, int Unlink=0)
static recEnt * List(XrdSysError *Say, const char *theFN)
XrdOfsPoscq(XrdSysError *erp, XrdOss *oss, const char *fn, int sv=1)
int Commit(const char *Lfn, int Offset)
int Add(const char *Tident, const char *Lfn, bool isNew)
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
XRootDStatus Unlink(Davix::DavPosix &davix_client, const std::string &url, uint16_t timeout)