XRootD
XrdOfsPrepGPI.cc
Go to the documentation of this file.
1 
2 #include <string>
3 #include <vector>
4 
5 #include <stdio.h>
6 #include <unistd.h>
7 
8 #include "Xrd/XrdJob.hh"
9 #include "Xrd/XrdScheduler.hh"
10 
11 #include "XrdOfs/XrdOfsPrepare.hh"
12 
13 #include "XrdOss/XrdOss.hh"
14 
15 #include "XrdOuc/XrdOuca2x.hh"
16 #include "XrdOuc/XrdOucBuffer.hh"
17 #include "XrdOuc/XrdOucEnv.hh"
18 #include "XrdOuc/XrdOucErrInfo.hh"
20 #include "XrdOuc/XrdOucProg.hh"
21 #include "XrdOuc/XrdOucStream.hh"
22 #include "XrdOuc/XrdOucString.hh"
23 #include "XrdOuc/XrdOucTList.hh"
24 
25 #include "XrdSec/XrdSecEntity.hh"
26 
28 
29 #include "XrdSys/XrdSysError.hh"
30 #include "XrdSys/XrdSysE2T.hh"
31 #include "XrdSys/XrdSysPthread.hh"
32 #include "XrdSys/XrdSysTrace.hh"
33 
34 #include "XrdVersion.hh"
35 
36 /******************************************************************************/
37 /* M a c r o s */
38 /******************************************************************************/
39 
40 #define DEBUG(usr,x) if (Debug) SYSTRACE(SysTrace.,usr,EPName,0,x)
41 
42 #define EPNAME(x) const char *EPName=x
43 
44 /******************************************************************************/
45 /* X r d O f s P r e p G P I R e a l */
46 /******************************************************************************/
47 /******************************************************************************/
48 /* L o c a l S t a t i c s */
49 /******************************************************************************/
50 
52 {
54 
56 
57 XrdOss *ossP = 0;
61 
62 XrdSysCondVar qryCond(0, "prepG query");
63 int qryAllow = 8; // Protected by the condition variable above
64 int qryWait = 0; // Ditto
65 static const int qryMaxWT = 33; // Maximum wait time
66 
67 int maxFiles = 48;
69 
70 bool addCGI = false;
71 bool Debug = false;
72 bool usePFN = false;
73 char okReq = 0;
74 
75 static const int okCancel = 0x01;
76 static const int okEvict = 0x02;
77 static const int okPrep = 0x04;
78 static const int okQuery = 0x08;
79 static const int okStage = 0x10;
80 static const int okAll = 0x1f;
81 
82 XrdSysTrace SysTrace("PrepGPI");
83 }
84 
85 using namespace XrdOfsPrepGPIReal;
86 
87 /******************************************************************************/
88 /* P r e p R e q u e s t */
89 /******************************************************************************/
90 
91 namespace XrdOfsPrepGPIReal
92 {
95  static PrepRequest *First;
96  static PrepRequest *Last;
97 
98 
99  static const int envSZ = 4;
100  static const int argSZ = 12;
101 
102  const char *argVec[argSZ];
103  int argCnt;
104  int envCnt;
105  const char *envVec[envSZ];
106 
107  char *reqID;
108  const char *reqName;
109  const char *tID;
110 
111  std::vector<std::string> argMem;
112  std::vector<std::string> envMem;
113 
114  const char *Info(char *bP, int bL)
115  {snprintf(bP, bL, "%s %s %s", tID, reqName, reqID);
116  return bP;
117  }
118 
119  PrepRequest() : next(0), argCnt(0), envCnt(0), reqID(0),
120  reqName("?"), tID("anon") {}
121  ~PrepRequest() {if (reqID) free(reqID);}
122 };
123 
126 
127 }
128 
129 /******************************************************************************/
130 /* P r e p G R u n */
131 /******************************************************************************/
132 
133 namespace XrdOfsPrepGPIReal
134 {
135 class PrepGRun : public XrdJob
136 {
137 public:
138 
139 void DoIt() override;
140 
141 int Run(PrepRequest &req, char *bP=0, int bL=0);
142 
143 void Sched(PrepRequest *rP) {reqP = rP;
144  schedP->Schedule(this);
145  }
146 
147  PrepGRun(XrdOucProg &pgm) : prepProg(pgm) {}
148 
150 static PrepGRun *Q;
151 
152 private:
153  ~PrepGRun() {} // Never gets deleted
154 
155 int Capture(PrepRequest &req, XrdOucStream &cmd, char *bP, int bL);
156 void makeArgs(PrepRequest &req, const char *argVec[]);
157 
158 PrepRequest *reqP;
159 XrdOucProg &prepProg;
160 };
161 
162 PrepGRun *PrepGRun::Q = 0;
163 }
164 
165 /******************************************************************************/
166 /* Private: P r e p G R u n : : C a p t u r e */
167 /******************************************************************************/
168 
169 namespace XrdOfsPrepGPIReal
170 {
171 int PrepGRun::Capture(PrepRequest &req, XrdOucStream &cmd, char *bP, int bL)
172 {
173  EPNAME("Capture");
174  static const int bReserve = 40;
175  char *lp, *bPNow = bP, *bPEnd = bP+bL-bReserve;
176  int len;
177  bool isTrunc = false;
178 
179 // Make sure the buffer length is minimum we need
180 //
181  if (bL < 256)
182  {char ib[512];
183  eLog->Emsg("PrepGRun","Prep exec for",req.Info(ib,sizeof(ib)),
184  "failed; invalid buffer size.");
185  return -1;
186  }
187 
188 // Place all lines that will fit into the suplied buffer
189 //
190  while((lp = cmd.GetLine()))
191  {len = strlen(lp) + 1;
192  if (bPNow + len >= bPEnd) {isTrunc = true; break;}
193  if (len > 1)
194  {strcpy(bPNow, lp);
195  bPNow[len-1] = '\n';
196  bPNow += len;
197  DEBUG(req.tID, " +=> " <<lp);
198  }
199  }
200 
201 // Take care of overflow lines
202 //
203  while(lp)
204  {DEBUG(req.tID, " -=> " <<lp);
205  lp = cmd.GetLine();
206  }
207 
208 // Change last line to end with a null byte and compute total length
209 //
210  if (bPNow == bP) len = snprintf(bP, bL, "No information available.") + 1;
211  else {if (isTrunc) bPNow += snprintf(bPNow, bReserve,
212  "***response has been truncated***");
213  else *(bPNow-1) = 0;
214  len = bPNow - bP + 1;
215  }
216 
217 // Return number of bytes in buffer
218 //
219  return len;
220 }
221 }
222 
223 /******************************************************************************/
224 /* P r e p G R u n : : D o I t */
225 /******************************************************************************/
226 
227 namespace XrdOfsPrepGPIReal
228 {
230 {
231 
232 // Run as many requests as we can
233 //
234 do{Run(*reqP);
235  delete reqP;
236  gpiMutex.Lock();
237  if ((reqP = PrepRequest::First))
241  } else {
242  next = Q;
243  Q = this;
244  }
245  gpiMutex.UnLock();
246  } while(reqP);
247 }
248 }
249 
250 /******************************************************************************/
251 /* Private: P r e p G R u n : : m a k e A r g s */
252 /******************************************************************************/
253 
254 namespace XrdOfsPrepGPIReal
255 {
256 void PrepGRun::makeArgs(PrepRequest &req, const char *argVec[])
257 {
258 
259 // Copy front arguments
260 //
261  memcpy(argVec, req.argVec, req.argCnt*sizeof(char*));
262 
263 // Copy over all the allocated arguments
264 //
265  int j = req.argCnt;
266  for (int i = 0; i < (int)req.argMem.size(); i++)
267  argVec[j++] = req.argMem[i].c_str();
268 }
269 }
270 
271 /******************************************************************************/
272 /* P r e p G R u n : : R u n */
273 /******************************************************************************/
274 
275 namespace XrdOfsPrepGPIReal
276 {
277 int PrepGRun::Run(PrepRequest &req, char *bP, int bL)
278 {
279  EPNAME("Run");
280  XrdOucStream cmd;
281  char *lp;
282  int rc, bytes = 0;
283 
284 // Allocate a arg vector of appropriate size
285 //
286  int n = req.argCnt + req.argMem.size();
287  const char **argVec = (const char **)alloca((n+2) * sizeof(char*));
288 
289 // Fill the vector
290 //
291  makeArgs(req, argVec);
292 
293 // Do some debugging
294 //
295  DEBUG(req.tID,"Starting prep for "<<req.reqName<<' '<<req.reqID);
296 
297 // Invoke the program
298 //
299  rc = prepProg.Run(&cmd, argVec, n, req.envVec);
300 
301 // Drain or capture any output
302 //
303  if (!rc)
304  {if (Debug)
305  {DEBUG(req.tID, req.reqName<<' '<<req.reqID<<" output:");}
306  if (!bP) while((lp = cmd.GetLine())) {DEBUG(req.tID," ==> "<<lp);}
307  else bytes = Capture(req, cmd, bP, bL);
308  rc = prepProg.RunDone(cmd);
309  }
310 
311 // Document unsuccessful end
312 //
313  if (rc)
314  {char ib[512];
315  eLog->Emsg("PrepGRun","Prep exec for",req.Info(ib,sizeof(ib)),"failed.");
316  }
317 
318 // Return the error, success or number of bytes
319 //
320  if (bP) return bytes;
321  return (rc ? -1 : 0);
322 }
323 }
324 
325 /******************************************************************************/
326 /* P r e p G P I */
327 /******************************************************************************/
328 
329 namespace XrdOfsPrepGPIReal
330 {
331 class PrepGPI : public XrdOfsPrepare
332 {
333 public:
334 
335 int begin( XrdSfsPrep &pargs,
336  XrdOucErrInfo &eInfo,
337  const XrdSecEntity *client = 0) override;
338 
339 int cancel( XrdSfsPrep &pargs,
340  XrdOucErrInfo &eInfo,
341  const XrdSecEntity *client = 0) override;
342 
343 int query( XrdSfsPrep &pargs,
344  XrdOucErrInfo &eInfo,
345  const XrdSecEntity *client = 0) override;
346 
347  PrepGPI(PrepGRun &gRun) : qryRunner(gRun) {}
348 
349 virtual ~PrepGPI() {}
350 
351 private:
352 const char *ApplyN2N(const char *tid, const char *path, char *buff, int blen);
353 PrepRequest *Assemble(int &rc, const char *tid, const char *reqName,
354  XrdSfsPrep &pargs, const char *xOpt);
355 bool reqFind(const char *reqid, PrepRequest *&rPP, PrepRequest *&rP,
356  bool del=false, bool locked=false);
357 int RetErr(XrdOucErrInfo &eInfo, int rc, const char *txt1,
358  const char *txt2);
359 int Xeq(PrepRequest *rP);
360 
361 PrepGRun &qryRunner;
362 };
363 }
364 
365 /******************************************************************************/
366 /* Private: P r e p G P I : : A p p l y N 2 N */
367 /******************************************************************************/
368 
369 namespace XrdOfsPrepGPIReal
370 {
371 const char *PrepGPI::ApplyN2N(const char *tid, const char *path, char *buff,
372  int blen)
373 {
374  int rc;
375 
376 // Translate this path
377 //
378  const char *pfn = ossP->Lfn2Pfn(path, buff, blen, rc);
379 
380 // Diagnose any errors
381 //
382  if (rc)
383  {char buff[1024];
384  snprintf(buff, sizeof(buff), "handle %s path", tid);
385  eLog->Emsg("PrepGPI", rc, buff, path);
386  pfn = 0;
387  }
388 
389 // Return result
390 //
391  return pfn;
392 }
393 }
394 
395 /******************************************************************************/
396 /* Private: P r e p G P I : : A s s e m b l e */
397 /******************************************************************************/
398 
399 namespace XrdOfsPrepGPIReal
400 {
401 PrepRequest *PrepGPI::Assemble(int &rc, const char *tid, const char *reqName,
402  XrdSfsPrep &pargs, const char *xOpt)
403 {
404  PrepRequest *rP = new PrepRequest;
405  int n;
406  char buff[1024];
407 
408 // Count the number of paths and size the vector to hold all of them
409 //
410  XrdOucTList *pP = pargs.paths;
411  n = 0;
412  while(pP) {n++; pP = pP->next;}
413 
414 // Make sure we don't have too many files here
415 //
416  if (n > maxFiles) {rc = E2BIG; return 0;}
417  rc = 0;
418 
419 // Size the vector to accomodate the file arguments
420 //
421  rP->argMem.reserve(n);
422 
423 // Add trace ID to the environment, this is always the first element.
424 // 0123456789012
425  snprintf(buff, sizeof(buff), "XRDPREP_TID=%s", tid);
426  rP->envMem.emplace_back(buff);
427 
428 // Insert options and envars for request:
429 // -a -C -n {fin | rdy} -p <n> -w -- <rid> <reqname> [<file> [...]]
430 // 1 2 3 4 5 6 7 8 9 10
431 //
432 // Envars: XRDPREP_COLOC XRDPREP_NOTIFY XRDPREP_TID
433 //
434  while(*xOpt)
435  {switch(*xOpt)
436  {case 'a': if (pargs.opts & Prep_FRESH)
437  rP->argVec[rP->argCnt++] = "-a";
438  break;
439  case 'C': if (!(pargs.opts & Prep_COLOC)
440  || !pargs.paths || !pargs.paths->next) break;
441  snprintf(buff, sizeof(buff), "XRDPREP_COLOC=%s",
442  pargs.paths->text);
443  rP->envMem.emplace_back(buff);
444  rP->argVec[rP->argCnt++] = "-C";
445  break;
446  case 'n': if (!pargs.notify || !*pargs.notify) break;
447  snprintf(buff, sizeof(buff), "XRDPREP_NOTIFY=%s",
448  pargs.notify);
449  rP->envMem.emplace_back(buff);
450  rP->argVec[rP->argCnt++] = "-n";
451  rP->argVec[rP->argCnt++] = (pargs.opts & Prep_SENDERR ?
452  "fin" : "rdy");
453  break;
454  case 'p': n = pargs.opts & Prep_PMASK;
455  rP->argVec[rP->argCnt++] = "-p";
456  if (n == 0) rP->argVec[rP->argCnt++] = "0";
457  else if (n == 1) rP->argVec[rP->argCnt++] = "1";
458  else if (n == 2) rP->argVec[rP->argCnt++] = "2";
459  else rP->argVec[rP->argCnt++] = "3";
460  break;
461  case 'w': if (pargs.opts & Prep_WMODE)
462  rP->argVec[rP->argCnt++] = "-w";
463  break;
464  default: break;
465  }
466  xOpt++;
467  }
468 
469 // Complete the envVec as now no objects will be relocated in envMem
470 //
471  for (n = 0; n < (int)rP->envMem.size(); n++)
472  rP->envVec[n] = rP->envMem[n].c_str();
473  rP->envVec[n] = 0;
474 
475 // Extract out the trace ID
476 //
477  rP->tID = rP->envMem[0].c_str() + 12;
478 
479 // Add request id to the argument list
480 //
481  rP->argVec[rP->argCnt++] = "--";
482  rP->reqID = strdup(pargs.reqid);
483  rP->argVec[rP->argCnt++] = rP->reqID;
484 
485 // Add request name in the argument list
486 //
487  rP->reqName = reqName;
488  rP->argVec[rP->argCnt++] = reqName;
489 
490 // Handle the path list, we have two generic cases with cgi or no cgi
491 //
492  if ((pP = pargs.paths))
493  {const char *path;
494  if (addCGI)
495  {XrdOucTList *cP = pargs.oinfo;
496  char pBuff[8192];
497  do {path = (usePFN ? ApplyN2N(tid,pP->text,buff,sizeof(buff)):pP->text);
498  if (!path) continue;
499  if (cP->text && *cP->text)
500  {snprintf(pBuff, sizeof(pBuff), "%s?%s", path, cP->text);
501  path = pBuff;
502  }
503  rP->argMem.emplace_back(path);
504  pP = pP->next;
505  } while(pP);
506  } else {
507  while(pP)
508  do {path = (usePFN ? ApplyN2N(tid,pP->text,buff,sizeof(buff)):pP->text);
509  if (!path) continue;
510  rP->argMem.emplace_back(path);
511  pP = pP->next;
512  } while(pP);
513  }
514  }
515 
516 // All done return the request object
517 //
518  return rP;
519 }
520 }
521 
522 /******************************************************************************/
523 /* P r e p G P I : : b e g i n */
524 /******************************************************************************/
525 
526 namespace XrdOfsPrepGPIReal
527 {
529  XrdOucErrInfo &eInfo,
530  const XrdSecEntity *client)
531 {
532  const char *reqName, *reqOpts, *tid = (client ? client->tident : "anon");
533  int rc;
534  bool ignore;
535 
536 // Establish the actual request
537 //
538  if (pargs.opts & Prep_EVICT)
539  {reqName = "evict";
540  reqOpts = "";
541  ignore = (okReq & okEvict) == 0;
542  }
543  else if (pargs.opts & Prep_STAGE)
544  {reqName = "stage";
545  reqOpts = "Cnpw";
546  ignore = (okReq & okStage) == 0;
547  }
548  else {reqName = "prep";
549  reqOpts = "Cnpw";
550  ignore = (okReq & okPrep) == 0;
551  }
552 
553 // Check if this operation is supported
554 //
555  if (ignore) return RetErr(eInfo, ENOTSUP, "process", reqName);
556 
557 // Get a request request object
558 //
559  PrepRequest *rP = Assemble(rc, tid, reqName, pargs, reqOpts);
560 
561 // If we didn't get one or if there are no paths selected, complain
562 //
563  if (!rP || rP->argMem.size() == 0)
564  return RetErr(eInfo, (rc ? rc : EINVAL), reqName, "files");
565 
566 // Either run or queue this request and return
567 //
568  return Xeq(rP);
569 }
570 }
571 
572 /******************************************************************************/
573 /* P r e p G P I : : c a n c e l */
574 /******************************************************************************/
575 
576 namespace XrdOfsPrepGPIReal
577 {
579  XrdOucErrInfo &eInfo,
580  const XrdSecEntity *client)
581 {
582  const char *tid = (client ? client->tident : "anon");
583  int rc;
584 
585 // If the attached program does no know how to handle cancel, do the minimal
586 // thing and remove the request from the waiting queue if it is there.
587 //
588  if (!(okReq & okCancel))
589  {PrepRequest *rPP, *rP;
590  int bL;
591  char *bP = eInfo.getMsgBuff(bL);
592  if (reqFind(pargs.reqid, rPP, rP, true))
593  {bL = snprintf(bP, bL, "Request %s cancelled.", pargs.reqid);
594  } else {
595  bL = snprintf(bP, bL, "Request %s not cancellable.", pargs.reqid);
596  }
597  eInfo.setErrCode(bL);
598  return SFS_DATA;
599  }
600 
601 // Get a request request object
602 //
603  PrepRequest *rP = Assemble(rc, tid, "cancel", pargs, "n");
604 
605 // If we didn't get one or if there are no paths selected, complain
606 //
607  if (!rP) return RetErr(eInfo, (rc ? rc : EINVAL), "cancel", "files");
608 
609 // Either run or queue this request and return
610 //
611  return Xeq(rP);
612 }
613 }
614 
615 /******************************************************************************/
616 /* P r e p G P I : : q u e r y */
617 /******************************************************************************/
618 
619 namespace XrdOfsPrepGPIReal
620 {
622  XrdOucErrInfo &eInfo,
623  const XrdSecEntity *client)
624 {
625  EPNAME("Query");
626  struct OucBuffer {XrdOucBuffer *pBuff;
627  OucBuffer() : pBuff(0) {}
628  ~OucBuffer() {if (pBuff) pBuff->Recycle();}
629  } OucBuff;
630  const char *tid = (client ? client->tident : "anon");
631  int rc, bL;
632  char *bP = eInfo.getMsgBuff(bL);
633 
634 // If the attached program does no know how to handle cancel, do the minimal
635 // thing and remove the request from the waiting queue if it is there.
636 //
637  if (!(okReq & okQuery))
638  {PrepRequest *rPP, *rP;
639  if (reqFind(pargs.reqid, rPP, rP))
640  {bL = snprintf(bP, bL, "Request %s queued.", pargs.reqid)+1;
641  } else {
642  bL = snprintf(bP, bL, "Request %s not queued.", pargs.reqid)+1;
643  }
644  eInfo.setErrCode(bL);
645  return SFS_DATA;
646  }
647 
648 // Allocate a buffer if need be
649 //
650  if (bPool)
651  {OucBuff.pBuff = bPool->Alloc(maxResp);
652  if (OucBuff.pBuff)
653  {bP = OucBuff.pBuff->Buffer();
654  bL = maxResp;
655  }
656  }
657 
658 // Get a request request object
659 //
660  PrepRequest *rP = Assemble(rc, tid, "query", pargs, "");
661 
662 // If we didn't get one or if there are no paths selected, complain
663 //
664  if (!rP) return RetErr(eInfo, (rc ? rc : EINVAL), "query", "request");
665 
666 // Wait for our turn if need be. This is sloppy and spurious wakeups may
667 // cause us to exceed the allowed limit.
668 //
669  qryCond.Lock();
670  if (qryAllow) qryAllow--;
671  else {qryWait++;
672  DEBUG(tid, "Waiting to launch query "<<rP->reqID);
673  rc = qryCond.Wait(qryMaxWT);
674  qryWait--;
675  if (!rc) qryAllow--;
676  else {qryCond.UnLock();
677  return RetErr(eInfo, ETIMEDOUT, "query", "request");
678  }
679  }
680  qryCond.UnLock();
681 
682 // Run the query
683 //
684  *bP = 0;
685  rc = qryRunner.Run(*rP, bP, bL);
686 
687 // Let the next query run
688 //
689  qryCond.Lock();
690  qryAllow++;
691  if (qryWait) qryCond.Signal();
692  qryCond.UnLock();
693 
694 // See if this ended in an error
695 //
696  if (rc <= 0) return RetErr(eInfo, ECANCELED, "query", "request");
697 
698 // Return response
699 //
700  if (!OucBuff.pBuff) eInfo.setErrCode(rc);
701  else {OucBuff.pBuff->SetLen(rc);
702  eInfo.setErrInfo(rc, OucBuff.pBuff);
703  OucBuff.pBuff = 0;
704  }
705  return SFS_DATA;
706 }
707 }
708 
709 /******************************************************************************/
710 /* Private: P r e p G P I : : r e q F i n d */
711 /******************************************************************************/
712 
713 namespace XrdOfsPrepGPIReal
714 {
715 bool PrepGPI::reqFind(const char *reqid, PrepRequest *&rPP, PrepRequest *&rP,
716  bool del, bool locked)
717 {
718 // Even thougth "*' requestid's can be queued they cannot be subject to find
719 //
720  if (!strcmp("*", reqid)) return false;
721 
722 // Handle locking
723 //
724  if (!locked) gpiMutex.Lock();
725 
726 // Find the element
727 //
728  rPP = 0;
729  rP = PrepRequest::First;
730  while(rP && strcmp(reqid, rP->reqID)) {rPP = rP; rP = rP->next;}
731 
732 // Check if we found the element and if we must delete it
733 //
734  if (rP && del)
735  {if (rPP) rPP->next = rP->next;
736  else PrepRequest::First = rP->next;
737  if (rP == PrepRequest::Last) PrepRequest::Last = rPP;
738  delete rP;
739  }
740 
741 // Return result
742 //
743  if (!locked) gpiMutex.UnLock();
744  return rP != 0;
745 }
746 }
747 
748 /******************************************************************************/
749 /* Private: P r e p G P I : : R e t E r r */
750 /******************************************************************************/
751 
752 namespace XrdOfsPrepGPIReal
753 {
754 int PrepGPI::RetErr(XrdOucErrInfo &eInfo, int rc, const char *txt1,
755  const char *txt2)
756 {
757  int bL;
758  char *bP = eInfo.getMsgBuff(bL);
759 
760 // Format messages
761 //
762  snprintf(bP, bL, "Unable to %s %s; %s", txt1, txt2, XrdSysE2T(rc));
763  eInfo.setErrCode(bL);
764  return SFS_ERROR;
765 }
766 }
767 
768 /******************************************************************************/
769 /* Private: P r e p G P I : : X e q */
770 /******************************************************************************/
771 
772 namespace XrdOfsPrepGPIReal
773 {
774 int PrepGPI::Xeq(PrepRequest *rP)
775 {
776  EPNAME("Xeq");
777  PrepGRun *grP;
778  const char *tid = rP->tID, *reqName = rP->reqName;
779  char reqID[64];
780 
781 // If we are debugging we need to copy some stuff before it escapes
782 //
783  if (Debug) snprintf(reqID, sizeof(reqID), "%s", rP->reqID);
784  else *reqID = 0;
785 
786 // Run or queue this request
787 //
788  gpiMutex.Lock();
789  if ((grP = PrepGRun::Q))
791  grP->Sched(rP);
792  } else {
794  else PrepRequest::First = rP;
795  PrepRequest::Last = rP;
796  }
797  gpiMutex.UnLock();
798 
799 // Do some debugging
800 //
801  DEBUG(tid, reqName<<" request "<<reqID<<(grP ? " scheduled" : " queued"));
802 
803 // All Done
804 //
805  return SFS_OK;
806 }
807 }
808 
809 /******************************************************************************/
810 /* X r d O f s g e t P r e p a r e */
811 /******************************************************************************/
812 
813 // Parameters: -admit <reqlist> [-cgi] [-maxfiles <n> [-maxreq <n>]
814 // [-maxquery <n>] [-maxresp <sz>] [-pfn] -run <pgm>
815 //
816 // <request>: cancel | evict | prep | query | stage
817 // <reqlist>: <request>[,<request>]
818 
819 extern "C"
820 {
822 {
823  XrdOucGatherConf gpiConf("prepgpi.parms", eDest);
824  XrdOucString RunPgm, Token;
825  char *tokP;
826  int maxReq = 4;
827 
828 // Save some of the arguments that we may need later
829 //
830  eLog = eDest;
831  ossP = theOss;
832  schedP = (XrdScheduler *)(envP->GetPtr("XrdScheduler*"));
833 
834 // If parameters specified on the preplib directive, use them. Otherwise,
835 // get them from the config file.
836 //
837  if (!gpiConf.useData(parms)
838  && gpiConf.Gather(confg, XrdOucGatherConf::only_body) < 0) return 0;
839 
840 // Verify we actually have parameters (there is only one line of them).
841 //
842  if (!(tokP = gpiConf.GetLine()) || !*tokP)
843  {eLog->Emsg("PrepGPI", "Parameters not specified.");
844  return 0;
845  }
846 
847 // Parse the parameters, they are space delimited
848 //
849  while((tokP = gpiConf.GetToken()))
850  {Token = tokP;
851  if (Token == "-admit")
852  {if (!(tokP = gpiConf.GetToken()) || *tokP == '-')
853  {eLog->Emsg("PrepGPI", "-admit argument not specified.");
854  return 0;
855  }
856  XrdOucString Args(tokP);
857  int argPos = 0;
858  bool argOK = false;
859  while((argPos = Args.tokenize(Token, argPos, ',')) != -1)
860  { if (Token == "cancel") okReq |= okCancel;
861  else if (Token == "evict") okReq |= okEvict;
862  else if (Token == "prep") okReq |= okPrep;
863  else if (Token == "query") okReq |= okQuery;
864  else if (Token == "stage") okReq |= okStage;
865  else if (Token == "all") okReq |= okAll;
866  else {eLog->Emsg("PrepGPI", "Invalid -admit request -",
867  Token.c_str());
868  return 0;
869  }
870  argOK = true;
871  }
872  if (!argOK)
873  {eLog->Emsg("PrepGPI", "invalid -admit request list");
874  return 0;
875  }
876  }
877  else if (Token == "-cgi") addCGI= true;
878  else if (Token == "-debug") Debug = true;
879  else if (Token == "-maxfiles")
880  {if (!(tokP = gpiConf.GetToken()) || *tokP == '-')
881  {eLog->Emsg("PrepGPI", "-maxfiles argument not specified.");
882  return 0;
883  }
884  if (XrdOuca2x::a2i(*eLog, "PrepPGI -maxfiles", tokP,
885  &maxFiles, 1, 1024)) return 0;
886  }
887  else if (Token == "-maxquery")
888  {if (!(tokP = gpiConf.GetToken()) || *tokP == '-')
889  {eLog->Emsg("PrepGPI", "-maxquery argument not specified.");
890  return 0;
891  }
892  if (XrdOuca2x::a2i(*eLog, "PrepPGI -maxquery", tokP,
893  &qryAllow, 1, 64)) return 0;
894  }
895  else if (Token == "-maxreq")
896  {if (!(tokP = gpiConf.GetToken()) || *tokP == '-')
897  {eLog->Emsg("PrepGPI", "-maxreq argument not specified.");
898  return 0;
899  }
900  if (XrdOuca2x::a2i(*eLog, "PrepPGI -maxreq", tokP,
901  &maxReq, 1, 64)) return 0;
902  }
903  else if (Token == "-maxresp")
904  {if (!(tokP = gpiConf.GetToken()) || *tokP == '-')
905  {eLog->Emsg("PrepGPI", "-maxresp argument not specified.");
906  return 0;
907  }
908  long long rspsz;
909  if (XrdOuca2x::a2sz(*eLog, "PrepPGI -maxresp", tokP,
910  &rspsz, 2048, 16777216)) return 0;
911  maxResp = static_cast<int>(rspsz);
912  }
913  else if (Token == "-pfn") usePFN = true;
914  else if (Token == "-run")
915  {if (!(tokP = gpiConf.GetToken()) || *tokP == '-')
916  {eLog->Emsg("PrepGPI", "-run argument not specified.");
917  return 0;
918  }
919  RunPgm = tokP;
920  }
921  else {eLog->Emsg("PrepGPI", "Invalid option -", Token.c_str());
922  return 0;
923  }
924  }
925 
926 // Make sure at least one request was enabled
927 //
928  if (!(okReq & okAll))
929  {eLog->Emsg("PrepGPI", "'-admit' was not specified.");
930  return 0;
931  }
932 
933 // Make sure the prepare program was specified
934 //
935  if (!RunPgm.length())
936  {eLog->Emsg("PrepGPI", "prepare program not specified.");
937  return 0;
938  }
939 
940 // Create a buffer pool for query responses if we need to
941 //
942  if (maxResp > (int)XrdOucEI::Max_Error_Len)
944 
945 // Set final debug flags
946 //
947  if (!Debug) Debug = getenv("XRDDEBUG") != 0;
949 
950 // Obtain an instance of the program object for this command. Note that
951 // all grun object will share this program as it's thread safe in the
952 // context in which we will use it (i.e. read/only).
953 //
954  pgmObj = new XrdOucProg(eLog, 0); // EFD????
955  if (pgmObj->Setup(RunPgm.c_str()))
956  {delete pgmObj;
957  eLog->Emsg("PrepGPI", "Unable to use prepare program", RunPgm.c_str());
958  return 0;
959  }
960 
961 // Create as many run object as we need
962 //
963  PrepGRun *gRun;
964  while(maxReq--)
965  {gRun = new PrepGRun(*pgmObj);
966  gRun->next = PrepGRun::Q;
967  PrepGRun::Q = gRun;
968  }
969 
970 // Create one additional such object for queries to pass to the plugin
971 //
972  gRun = new PrepGRun(*pgmObj);
973 
974 // Return an instance of the prepare plugin
975 //
976  return new PrepGPI(*gRun);
977 }
978 }
static XrdSysError eDest(0,"crypto_")
XrdOfsPrepare * XrdOfsgetPrepare(XrdOfsgetPrepareArguments)
#define DEBUG(usr, x)
#define EPNAME(x)
XrdVERSIONINFO(XrdOfsgetPrepare, PrepGPI)
#define XrdOfsgetPrepareArguments
#define Prep_EVICT
#define Prep_FRESH
#define SFS_DATA
#define Prep_SENDERR
char * notify
Notification path or 0.
XrdOucTList * paths
List of paths.
XrdOucTList * oinfo
1-to-1 correspondence of opaque info
#define SFS_ERROR
#define Prep_WMODE
#define Prep_COLOC
#define Prep_STAGE
char * reqid
Request ID.
#define Prep_PMASK
#define SFS_OK
int opts
Prep_xxx.
< Prepare parameters
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
Definition: XrdJob.hh:43
int query(XrdSfsPrep &pargs, XrdOucErrInfo &eInfo, const XrdSecEntity *client=0) override
int begin(XrdSfsPrep &pargs, XrdOucErrInfo &eInfo, const XrdSecEntity *client=0) override
int cancel(XrdSfsPrep &pargs, XrdOucErrInfo &eInfo, const XrdSecEntity *client=0) override
int Run(PrepRequest &req, char *bP=0, int bL=0)
void Sched(PrepRequest *rP)
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition: XrdOss.hh:873
XrdOucBuffer * Alloc(int sz)
char * Buffer() const
void Recycle()
Recycle the buffer. The buffer may be reused in the future.
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:281
int setErrInfo(int code, const char *emsg)
char * getMsgBuff(int &mblen)
int setErrCode(int code)
char * GetToken(char **rest=0, int lowcase=0)
int Gather(const char *cfname, Level lvl, const char *parms=0)
bool useData(const char *data)
@ only_body
Only directive bodies as a string blob.
int RunDone(XrdOucStream &cmd) const
Definition: XrdOucProg.cc:257
int Run(XrdOucStream *Sp, const char *argV[], int argc=0, const char *envV[]=0) const
Definition: XrdOucProg.cc:108
int Setup(const char *prog, XrdSysError *errP=0, int(*Proc)(XrdOucStream *, char **, int)=0)
Definition: XrdOucProg.cc:296
char * GetLine()
const char * c_str() const
int length() const
int tokenize(XrdOucString &tok, int from, char del=':')
XrdOucTList * next
Definition: XrdOucTList.hh:45
char * text
Definition: XrdOucTList.hh:46
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition: XrdOuca2x.cc:45
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:257
void Schedule(XrdJob *jp)
const char * tident
Trace identifier always preset.
Definition: XrdSecEntity.hh:81
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSysLogger * logger(XrdSysLogger *lp=0)
Definition: XrdSysError.hh:141
void SetLogger(XrdSysLogger *logp)
Definition: XrdSysTrace.cc:65
static const int qryMaxWT
XrdOucBuffPool * bPool
XrdSysTrace SysTrace("PrepGPI")
static const int okCancel
XrdSysMutex gpiMutex
XrdOucProg * pgmObj
XrdSysError * eLog
static const int okEvict
XrdScheduler * schedP
static const int okQuery
static const int okStage
static const int okAll
XrdSysCondVar qryCond(0, "prepG query")
static const int okPrep
XrdOucEnv * envP
Definition: XrdPss.cc:109
std::vector< std::string > argMem
static PrepRequest * First
const char * Info(char *bP, int bL)
static PrepRequest * Last
std::vector< std::string > envMem
static const size_t Max_Error_Len