XRootD
XrdFrcReqFile.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d F r c R e q F i l e . c c */
4 /* */
5 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
#include <cstring>
#include <cstdlib>
#include <strings.h>
#include <cstdio>
#include <fcntl.h>
#include <unistd.h>
#include <cerrno>
#include <sys/param.h>
#include <sys/types.h>
#include <sys/stat.h>
40 
41 #include "XrdFrc/XrdFrcCID.hh"
42 #include "XrdFrc/XrdFrcReqFile.hh"
43 #include "XrdFrc/XrdFrcTrace.hh"
44 #include "XrdSys/XrdSysError.hh"
45 #include "XrdSys/XrdSysFD.hh"
46 #include "XrdSys/XrdSysPlatform.hh"
47 
48 using namespace XrdFrc;
49 
50 /******************************************************************************/
51 /* S t a t i c V a r i a b l e s */
52 /******************************************************************************/
53 
// Out-of-class definition of the static mutex member of the nested rqMonitor
// class. NOTE(review): presumably the one mutex every rqMonitor instance
// works with - what it protects is decided in the header; confirm there.
54 XrdSysMutex XrdFrcReqFile::rqMonitor::rqMutex;
55 
56 /******************************************************************************/
57 /* C o n s t r u c t o r */
58 /******************************************************************************/
59 
60 XrdFrcReqFile::XrdFrcReqFile(const char *fn, int aVal)
61 {
62  char buff[1200];
63 
64  memset((void *)&HdrData, 0, sizeof(HdrData));
65  reqFN = strdup(fn);
66  strcpy(buff, fn); strcat(buff, ".lock");
67  lokFN = strdup(buff);
68  lokFD = reqFD = -1;
69  isAgent = aVal;
70 }
71 
72 /******************************************************************************/
73 /* A d d */
74 /******************************************************************************/
75 
// Add: store the request described by *rP in the request file.
//
// NOTE(review): the definition header and the line holding the `if` condition
// in front of the first chaining branch are not visible in this listing; the
// notes here describe only the statements that are shown.
//
// Visible flow: take the file lock, choose a slot offset (head of the free
// chain in HdrData.Free, otherwise the current end of the file), link that
// offset into the First/Last chain kept in HdrData, record it in rP->This and
// write the record. On the error exits FailAdd() is asked to unlock (second
// argument 1) only where the lock is already held.
77 {
78  rqMonitor rqMon(isAgent);
79  XrdFrcRequest tmpReq;
80  int fP;
81 
82 // Lock the file
83 //
84  if (!FileLock()) {FailAdd(rP->LFN, 0); return;}
85 
86 // Obtain a free slot
87 //
// A non-zero HdrData.Free is the offset of a reusable record whose Next field
// holds the following free offset; with no free record the new one goes at
// the end of the file (st_size).
88  if ((fP = HdrData.Free))
89  {if (!reqRead((void *)&tmpReq, fP)) {FailAdd(rP->LFN, 1); return;}
90  HdrData.Free = tmpReq.Next;
91  } else {
92  struct stat buf;
93  if (fstat(reqFD, &buf))
94  {Say.Emsg("Add",errno,"stat",reqFN); FailAdd(rP->LFN, 1); return;}
95  fP = buf.st_size;
96  }
97 
98 // Chain in the request (registration requests go fifo)
99 //
// First branch (its condition is not visible): the request becomes the new
// head of the chain; if the chain was empty it is also the tail.
101  {if (!(rP->Next = HdrData.First)) HdrData.Last = fP;
102  HdrData.First = fP;
103  } else {
// Second branch: the request becomes the new tail. When a chain exists the
// current tail record is re-read, pointed at the new slot and written back
// without touching the header (third reqWrite argument 0).
104  if (HdrData.First && HdrData.Last)
105  {if (!reqRead((void *)&tmpReq, HdrData.Last))
106  {FailAdd(rP->LFN, 1); return;}
107  tmpReq.Next = fP;
108  if (!reqWrite((void *)&tmpReq, HdrData.Last, 0))
109  {FailAdd(rP->LFN, 1); return;}
110  } else HdrData.First = fP;
111  HdrData.Last = fP; rP->Next = 0;
112  }
113 
114 // Write out the file
115 //
// FailAdd() is told not to unlock here because the unlock on the next line
// runs on both the success and the failure path.
116  rP->This = fP;
117  if (!reqWrite(rP, fP)) FailAdd(rP->LFN, 0);
118  FileLock(lkNone);
119 }
120 
121 /******************************************************************************/
122 /* C a n */
123 /******************************************************************************/
124 
126 {
127  rqMonitor rqMon(isAgent);
128  XrdFrcRequest tmpReq;
129  int Offs, numCan = 0, numBad = 0;
130  struct stat buf;
131  char txt[128];
132 
133 // Lock the file and get its size
134 //
135  if (!FileLock() || fstat(reqFD, &buf)) {FailCan(rP->ID, 0); return;}
136 
137 // Run through all of the file entries removing matching requests
138 //
139  for (Offs = ReqSize; Offs < buf.st_size; Offs += ReqSize)
140  {if (!reqRead((void *)&tmpReq, Offs)) return FailCan(rP->ID);
141  if (!strcmp(tmpReq.ID, rP->ID))
142  {tmpReq.LFN[0] = '\0';
143  if (!reqWrite((void *)&tmpReq, Offs, 0)) numBad++;
144  else numCan++;
145  }
146  }
147 
148 // Make sure this is written to disk
149 //
150  if (numCan) fsync(reqFD);
151 
152 // Document the action
153 //
154  if (numCan || numBad)
155  {sprintf(txt, "has %d entries; %d removed (%d failures).",
156  numCan+numBad, numCan, numBad);
157  Say.Emsg("Can", rP->ID, txt);
158  }
159  FileLock(lkNone);
160 }
161 
162 /******************************************************************************/
163 /* D e l */
164 /******************************************************************************/
165 
167 {
168  rqMonitor rqMon(isAgent);
169  XrdFrcRequest tmpReq;
170 
171 // Lock the file
172 //
173  if (!FileLock()) {FailDel(rP->LFN, 0); return;}
174 
175 // Put entry on the free chain
176 //
177  memset(&tmpReq, 0, sizeof(tmpReq));
178  tmpReq.Next = HdrData.Free;
179  HdrData.Free = rP->This;
180  if (!reqWrite((void *)&tmpReq, rP->This)) FailDel(rP->LFN, 0);
181  FileLock(lkNone);
182 }
183 
184 /******************************************************************************/
185 /* G e t */
186 /******************************************************************************/
187 
189 {
190  int fP, rc;
191 
192 // Lock the file
193 //
194  if (!FileLock()) return 0;
195 
196 // Get the next request
197 //
198  while((fP = HdrData.First))
199  {if (!reqRead((void *)rP, fP)) {FileLock(lkNone); return 0;}
200  HdrData.First= rP->Next;
201  if (*(rP->LFN)) {reqWrite(0,0,1); break;}
202  rP->Next = HdrData.Free;
203  HdrData.Free = fP;
204  if (!reqWrite(rP, fP)) {fP = 0; break;}
205  }
206  if (fP) rc = (HdrData.First ? 1 : -1);
207  else rc = 0;
208  FileLock(lkNone);
209  return rc;
210 }
211 
212 /******************************************************************************/
213 /* I n i t */
214 /******************************************************************************/
215 
217 {
218  EPNAME("Init");
219  static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
220  XrdFrcRequest tmpReq;
221  struct stat buf;
222  recEnt *RegList = 0, *First = 0, *rP, *pP, *tP;
223  int Offs, rc, numreq = 0;
224 
225 // Open the lock file first in r/w mode
226 //
227  if ((lokFD = XrdSysFD_Open(lokFN, O_RDWR|O_CREAT, Mode)) < 0)
228  {Say.Emsg("Init",errno,"open",lokFN); return 0;}
229 
230 // Obtain a lock
231 //
232  if (!FileLock(lkInit)) return 0;
233 
234 // Open the file first in r/w mode
235 //
236  if ((reqFD = XrdSysFD_Open(reqFN, O_RDWR|O_CREAT, Mode)) < 0)
237  {FileLock(lkNone);
238  Say.Emsg("Init",errno,"open",reqFN);
239  return 0;
240  }
241 
242 // Check for a new file here
243 //
244  if (fstat(reqFD, &buf)) return FailIni("stat");
245  if (buf.st_size < ReqSize)
246  {memset(&tmpReq, 0, sizeof(tmpReq));
247  HdrData.Free = ReqSize;
248  if (!reqWrite((void *)&tmpReq, ReqSize)) return FailIni("init file");
249  FileLock(lkNone);
250  return 1;
251  }
252 
253 // We are done if this is a agent
254 //
255  if (isAgent)
256  {FileLock(lkNone);
257  return 1;
258  }
259 
260 // Read the full file
261 //
262  for (Offs = ReqSize; Offs < buf.st_size; Offs += ReqSize)
263  {if (!reqRead((void *)&tmpReq, Offs)) return FailIni("read file");
264  if (*tmpReq.LFN == '\0' || !tmpReq.addTOD
265  || tmpReq.Opaque >= int(sizeof(tmpReq.LFN))) continue;
266  pP = 0; rP = First; tP = new recEnt(tmpReq); numreq++;
267  if (tmpReq.Options & XrdFrcRequest::Register)
268  {tP->Next = RegList; RegList = tP;
269  } else {
270  while(rP && rP->reqData.addTOD < tmpReq.addTOD) {pP=rP;rP=rP->Next;}
271  if (pP) pP->Next = tP;
272  else First = tP;
273  tP->Next = rP;
274  }
275  }
276 
277 // Plase registration requests in the front
278 //
279  while((rP = RegList))
280  {RegList = rP->Next;
281  rP->Next = First;
282  First = rP;
283  }
284 
285 // Now write out the file
286 //
287  DEBUG(numreq <<" request(s) recovered from " <<reqFN);
288  rc = ReWrite(First);
289 
290 // Delete all the entries in memory while referencing known instance names
291 //
292  while((tP = First))
293  {First = tP->Next;
294  CID.Ref(tP->reqData.iName);
295  delete tP;
296  }
297 
298 // All done
299 //
300  FileLock(lkNone);
301  return rc;
302 }
303 
304 /******************************************************************************/
305 /* L i s t */
306 /******************************************************************************/
307 
308 char *XrdFrcReqFile::List(char *Buff, int bsz, int &Offs,
309  XrdFrcRequest::Item *ITList, int ITNum)
310 {
311  rqMonitor rqMon(isAgent);
312  XrdFrcRequest tmpReq;
313  int rc;
314 
315 // Set Offs argument
316 //
317  if (Offs < ReqSize) Offs = ReqSize;
318 
319 // Lock the file
320 //
321  if (!FileLock(lkShare)) return 0;
322 
323 // Return next valid filename
324 //
325  do{do {rc = pread(reqFD, (void *)&tmpReq, ReqSize, Offs);}
326  while(rc < 0 && errno == EINTR);
327  if (rc == ReqSize)
328  {Offs += ReqSize;
329  if (*tmpReq.LFN == '\0' || !tmpReq.addTOD
330  || tmpReq.Opaque >= int(sizeof(tmpReq.LFN))
331  || tmpReq.Options & XrdFrcRequest::Register) continue;
332  FileLock(lkNone);
333  if (!ITNum || !ITList) strlcpy(Buff, tmpReq.LFN, bsz);
334  else ListL(tmpReq, Buff, bsz, ITList, ITNum);
335  return Buff;
336  }
337  } while(rc == ReqSize);
338 
339 // Diagnose ending condition
340 //
341  if (rc < 0) Say.Emsg("List",errno,"read",reqFN);
342 
343 // Return end of list
344 //
345  FileLock(lkNone);
346  return 0;
347 }
348 
349 /******************************************************************************/
350 /* L i s t L */
351 /******************************************************************************/
352 
// ListL: format the items named in ITList[0..ITNum) for request tmpReq into
// Buff, separating the items with a single blank and ending with a null byte.
//
// tmpReq - request to format (modified temporarily by the '?' handling below).
// Buff   - output buffer; bsz is its size, of which bsz-2 bytes are budgeted.
// ITList - items to emit, in order; ITNum is the number of entries.
//
// NOTE(review): most `case` labels of the switch are not visible in this
// listing, so the item each unlabeled group belongs to is not stated here.
// NOTE(review): n is the full source length even when strlcpy() had to
// truncate, and Buff is advanced by n whenever bln was still positive; with
// n > bln the pointer ends up beyond the budgeted space before the final
// `*Buff = '\0'` store - looks like a possible out-of-bounds write; confirm.
353 void XrdFrcReqFile::ListL(XrdFrcRequest &tmpReq, char *Buff, int bsz,
354  XrdFrcRequest::Item *ITList, int ITNum)
355 {
356  char What, tbuf[32];
357  long long tval;
358  int i, k, n, bln = bsz-2, Lfo;
359 
// Stop early once the space budget is used up.
360  for (i = 0; i < ITNum && bln > 0; i++)
361  {Lfo = tmpReq.LFO;
362  switch(ITList[i])
363  {case XrdFrcRequest::getOBJ:
// Whole LFN field, starting at offset 0.
364  Lfo = 0;
365  n = strlen(tmpReq.LFN);
366  strlcpy(Buff, tmpReq.LFN, bln);
367  break;
368 
369  case XrdFrcRequest::getLFN:
// LFN field starting at offset tmpReq.LFO.
370  n = strlen(tmpReq.LFN+Lfo);
371  strlcpy(Buff, tmpReq.LFN+Lfo, bln);
372  break;
373 
// The null byte ending the name is swapped for '?' while copying, so the copy
// runs on into whatever follows it in LFN (presumably the opaque/CGI data -
// confirm); with a zero Opaque the string is ended right after the '?'. The
// null byte is put back afterwards. This group copies from offset 0.
375  Lfo = 0;
376  n = strlen(tmpReq.LFN); tmpReq.LFN[n] = '?';
377  if (!tmpReq.Opaque) tmpReq.LFN[n+1] = '\0';
378  strlcpy(Buff, tmpReq.LFN, bln);
379  k = strlen(tmpReq.LFN);
380  tmpReq.LFN[n] = '\0'; n = k;
381  break;
382 
// Same '?' handling as the previous group, but copying from offset LFO.
384  n = strlen(tmpReq.LFN); tmpReq.LFN[n] = '?';
385  if (!tmpReq.Opaque) tmpReq.LFN[n+1] = '\0';
386  strlcpy(Buff, tmpReq.LFN+Lfo, bln);
387  k = strlen(tmpReq.LFN+Lfo);
388  tmpReq.LFN[n] = '\0'; n = k;
389  break;
390 
// One to three flag letters: 'w' or 'r' from the makeRW option, then 'f' for
// msgFail and 'n' for msgSucc when those options are set and space remains.
392  n = 0;
393  What = (tmpReq.Options & XrdFrcRequest::makeRW
394  ? 'w' : 'r');
395  if (bln) {Buff[n] = What; n++;}
396  if (tmpReq.Options & XrdFrcRequest::msgFail)
397  if (bln-n > 0) {Buff[n] = 'f'; n++;}
398  if (tmpReq.Options & XrdFrcRequest::msgSucc)
399  if (bln-n > 0) {Buff[n] = 'n'; n++;}
400  break;
401 
// Notify field.
403  n = strlen(tmpReq.Notify);
404  strlcpy(Buff, tmpReq.Notify, bln);
405  break;
406 
// Single character taken from tmpReq.OPc.
408  *Buff = tmpReq.OPc;
409  n = 1;
410  break;
411 
// Single digit for Prty: '2', '1', or '0' for any other value.
413  if (tmpReq.Prty == 2) What = '2';
414  else if (tmpReq.Prty == 1) What = '1';
415  else What = '0';
416  n = 1;
417  if (bln) *Buff = What;
418  break;
419 
// addTOD as a decimal number; for getQWT the value is first turned into the
// difference between the current time and addTOD.
422  tval = tmpReq.addTOD;
423  if (ITList[i] == XrdFrcRequest::getQWT) tval = time(0)-tval;
424  if ((n = sprintf(tbuf, "%lld", tval)) >= 0)
425  strlcpy(Buff, tbuf, bln);
426  break;
427 
// ID field.
429  n = strlen(tmpReq.ID);
430  strlcpy(Buff, tmpReq.ID, bln);
431  break;
432 
// User field.
434  n = strlen(tmpReq.User);
435  strlcpy(Buff, tmpReq.User, bln);
436  break;
437 
438  default: n = 0; break;
439  }
// Step over the item just produced, then add the separating blank if space
// is still left.
440  if (bln > 0) {bln -= n; Buff += n;}
441  if (bln > 0) {*Buff++ = ' '; bln--;}
442  }
443  *Buff = '\0';
444 }
445 
446 /******************************************************************************/
447 /* F a i l A d d */
448 /******************************************************************************/
449 
450 void XrdFrcReqFile::FailAdd(char *lfn, int unlk)
451 {
452  Say.Emsg("Add", lfn, "not added to prestage queue.");
453  if (unlk) FileLock(lkNone);
454 }
455 
456 /******************************************************************************/
457 /* F a i l C a n */
458 /******************************************************************************/
459 
460 void XrdFrcReqFile::FailCan(char *rid, int unlk)
461 {
462  Say.Emsg("Can", rid, "request not removed from prestage queue.");
463  if (unlk) FileLock(lkNone);
464 }
465 
466 /******************************************************************************/
467 /* F a i l D e l */
468 /******************************************************************************/
469 
470 void XrdFrcReqFile::FailDel(char *lfn, int unlk)
471 {
472  Say.Emsg("Del", lfn, "not removed from prestage queue.");
473  if (unlk) FileLock(lkNone);
474 }
475 
476 /******************************************************************************/
477 /* F a i l I n i */
478 /******************************************************************************/
479 
480 int XrdFrcReqFile::FailIni(const char *txt)
481 {
482  Say.Emsg("Init", errno, txt, reqFN);
483  FileLock(lkNone);
484  return 0;
485 }
486 
487 /******************************************************************************/
488 /* F i l e L o c k */
489 /******************************************************************************/
490 
491 int XrdFrcReqFile::FileLock(LockType lktype)
492 {
493  FLOCK_t lock_args;
494  const char *What;
495  int rc;
496 
497 // Establish locking options
498 //
499  memset(&lock_args, 0, sizeof(lock_args));
500  lock_args.l_whence = SEEK_SET;
501  if (lktype == lkNone)
502  {lock_args.l_type = F_UNLCK; What = "unlock";
503  if (isAgent && reqFD >= 0) {close(reqFD); reqFD = -1;}
504  }
505  else {lock_args.l_type = (lktype == lkShare ? F_RDLCK : F_WRLCK);
506  What = "lock";
507  flMutex.Lock();
508  }
509 
510 // Perform action.
511 //
512  do {rc = fcntl(lokFD,F_SETLKW,&lock_args);}
513  while(rc < 0 && errno == EINTR);
514  if (rc < 0) {Say.Emsg("FileLock", errno, What , lokFN); return 0;}
515 
516 // Refresh the header
517 //
518  if (lktype == lkExcl || lktype == lkShare)
519  {if (reqFD < 0 && (reqFD = XrdSysFD_Open(reqFN, O_RDWR)) < 0)
520  {Say.Emsg("FileLock",errno,"open",reqFN);
521  FileLock(lkNone);
522  return 0;
523  }
524  do {rc = pread(reqFD, (void *)&HdrData, sizeof(HdrData), 0);}
525  while(rc < 0 && errno == EINTR);
526  if (rc < 0) {Say.Emsg("reqRead",errno,"refresh hdr from", reqFN);
527  FileLock(lkNone); return 0;
528  }
529  } else if (lktype == lkNone) flMutex.UnLock();
530 
531 // All done
532 //
533  return 1;
534 }
535 
536 /******************************************************************************/
537 /* r e q R e a d */
538 /******************************************************************************/
539 
540 int XrdFrcReqFile::reqRead(void *Buff, int Offs)
541 {
542  int rc;
543 
544  do {rc = pread(reqFD, Buff, ReqSize, Offs);} while(rc < 0 && errno == EINTR);
545  if (rc < 0) {Say.Emsg("reqRead",errno,"read",reqFN); return 0;}
546  return 1;
547 }
548 
549 /******************************************************************************/
550 /* r e q W r i t e */
551 /******************************************************************************/
552 
553 int XrdFrcReqFile::reqWrite(void *Buff, int Offs, int updthdr)
554 {
555  int rc = 0;
556 
557  if (Buff && Offs) do {rc = pwrite(reqFD, Buff, ReqSize, Offs);}
558  while(rc < 0 && errno == EINTR);
559  if (rc >= 0 && updthdr){do {rc = pwrite(reqFD,&HdrData, sizeof(HdrData), 0);}
560  while(rc < 0 && errno == EINTR);
561  if (rc >= 0) rc = fsync(reqFD);
562  }
563  if (rc < 0) {Say.Emsg("reqWrite",errno,"write", reqFN); return 0;}
564  return 1;
565 }
566 
567 /******************************************************************************/
568 /* R e W r i t e */
569 /******************************************************************************/
570 
571 int XrdFrcReqFile::ReWrite(XrdFrcReqFile::recEnt *rP)
572 {
573  static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
574  char newFN[MAXPATHLEN], *oldFN;
575  int newFD, oldFD, Offs = ReqSize, aOK = 1;
576 
577 // Construct new file and open it
578 //
579  strcpy(newFN, reqFN); strcat(newFN, ".new");
580  if ((newFD = XrdSysFD_Open(newFN, O_RDWR|O_CREAT|O_TRUNC, Mode)) < 0)
581  {Say.Emsg("ReWrite",errno,"open",newFN); FileLock(lkNone); return 0;}
582 
583 // Setup to write/swap the file
584 //
585  oldFD = reqFD; reqFD = newFD;
586  oldFN = reqFN; reqFN = newFN;
587 
588 // Rewrite all records if we have any
589 //
590  if (rP)
591  {HdrData.First = Offs;
592  while(rP)
593  {rP->reqData.This = Offs;
594  rP->reqData.Next = (rP->Next ? Offs+ReqSize : 0);
595  if (!reqWrite((void *)&rP->reqData, Offs, 0)) {aOK = 0; break;}
596  Offs += ReqSize;
597  rP = rP->Next;
598  }
599  HdrData.Last = Offs - ReqSize;
600  } else {
601  HdrData.First = HdrData.Last = 0;
602  if (ftruncate(newFD, ReqSize) < 0)
603  {Say.Emsg("ReWrite",errno,"trunc",newFN); aOK = 0;}
604  }
605 
606 // Update the header
607 //
608  HdrData.Free = 0;
609  if (aOK && !(aOK = reqWrite(0, 0)))
610  Say.Emsg("ReWrite",errno,"write header",newFN);
611 
612 // If all went well, rename the file
613 //
614  if (aOK && rename(newFN, oldFN) < 0)
615  {Say.Emsg("ReWrite",errno,"rename",newFN); aOK = 0;}
616 
617 // Perform post processing
618 //
619  if (aOK) close(oldFD);
620  else {close(newFD); reqFD = oldFD;}
621  reqFN = oldFN;
622  return aOK;
623 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
ssize_t pwrite(int fildes, const void *buf, size_t nbyte, off_t offset)
int stat(const char *path, struct stat *buf)
int ftruncate(int fildes, off_t offset)
ssize_t pread(int fildes, void *buf, size_t nbyte, off_t offset)
int fstat(int fildes, struct stat *buf)
int fcntl(int fd, int cmd,...)
int rename(const char *oldpath, const char *newpath)
int fsync(int fildes)
#define close(a)
Definition: XrdPosix.hh:43
int Mode
size_t strlcpy(char *dst, const char *src, size_t sz)
#define FLOCK_t
void Ref(const char *iName)
Definition: XrdFrcCID.cc:248
int Get(XrdFrcRequest *rP)
void Del(XrdFrcRequest *rP)
char * List(char *Buff, int bsz, int &Offs, XrdFrcRequest::Item *ITList=0, int ITNum=0)
void Add(XrdFrcRequest *rP)
XrdFrcReqFile(const char *fn, int aVal)
void ListL(XrdFrcRequest &tmpReq, char *Buff, int bsz, XrdFrcRequest::Item *ITList, int ITNum)
void Can(XrdFrcRequest *rP)
char LFN[3072]
static const int msgFail
char User[256]
static const int makeRW
static const int msgSucc
static const int Register
signed char Prty
char Notify[512]
long long addTOD
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSysError Say
XrdFrcCID CID
Definition: XrdFrcCID.cc:56