XRootD
XrdCmsPrepare.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d C m s P r e p a r e . c c */
4 /* */
5 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <fcntl.h>
32 #include <cstdlib>
33 #include <unistd.h>
34 #include <sys/types.h>
35 #include <sys/stat.h>
36 
37 #include "XrdCms/XrdCmsConfig.hh"
38 #include "XrdCms/XrdCmsPrepare.hh"
39 #include "XrdCms/XrdCmsTrace.hh"
40 #include "XrdFrc/XrdFrcProxy.hh"
41 #include "XrdNet/XrdNetMsg.hh"
42 #include "XrdOss/XrdOss.hh"
43 #include "XrdOuc/XrdOucEnv.hh"
44 #include "XrdOuc/XrdOucMsubs.hh"
45 #include "XrdOuc/XrdOucTList.hh"
46 #include "XrdSys/XrdSysError.hh"
47 
48 using namespace XrdCms;
49 
50 /******************************************************************************/
51 /* S t a t i c O b j e c t s */
52 /******************************************************************************/
53 
55 
56 /******************************************************************************/
57 /* G l o b a l s & E x t e r n a l F u n c t i o n s */
58 /******************************************************************************/
59 
60 // This function is applied to all prepare queue entries. It checks if the file
61 // in online and if so, returns a -1 to delete the entry from the queue. O/W
62 // it returns a zero which keeps the entry in the queue. The key is the LFN.
63 //
64 int XrdCmsScrubScan(const char *key, char *cip, void *xargp)
65 {
66  struct stat buf;
67 
68 // Use oss interface to determine whether the file exists or not
69 //
70  return (Config.ossFS->Stat(key, &buf, XRDOSS_resonly) ? 0 : -1);
71 }
72 
73 /******************************************************************************/
74 /* C o n s t r u c t o r */
75 /******************************************************************************/
76 
77 XrdCmsPrepare::XrdCmsPrepare() : XrdJob("Prep cache scrubber"),
78  prepSched(&Say)
79 {prepif = 0;
80  preppid = 0;
81  resetcnt = scrub2rst = 3;
82  scrubtime= 20*60;
83  NumFiles = 0;
84  lastemsg = time(0);
85  Relay = 0; // This will be initialized via Init()!
86  PrepFrm = 0;
87  prepOK = 0;
88  N2N = 0;
89 }
90 
91 /******************************************************************************/
92 /* A d d */
93 /******************************************************************************/
94 
96 {
97  char *pdata[XrdOucMsubs::maxElem+2], prtybuff[8], *pP=prtybuff;
98  int rc, pdlen[XrdOucMsubs::maxElem + 2];
99 
100 // Check if we are using the built-in mechanism
101 //
102  if (PrepFrm)
103  {rc = PrepFrm->Add('+',pargs.path, pargs.opaque,pargs.Ident,pargs.reqid,
104  pargs.notify,pargs.mode,atoi(pargs.prty));
105  if (rc) Say.Emsg("Add", rc, "prepare", pargs.path);
106  else {PTMutex.Lock();
107  if (!PTable.Add(pargs.path, 0, 0, Hash_data_is_key)) NumFiles++;
108  PTMutex.UnLock();
109  }
110  return rc == 0;
111  }
112 
113 // Restart the scheduler if need be
114 //
115  PTMutex.Lock();
116  if (!prepif || !prepSched.isAlive())
117  {Say.Emsg("Add","No prepare manager; prepare",pargs.reqid,"ignored.");
118  PTMutex.UnLock();
119  return 0;
120  }
121 
122 // Write out the header line
123 //
124  if (!prepMsg)
125  {*pP++ = pargs.prty[0]; *pP = '\0';
126  pdata[0] = (char *)"+ "; pdlen[0] = 2;
127  pdata[1] = pargs.reqid; pdlen[1] = strlen(pargs.reqid);
128  pdata[2] = (char *)" "; pdlen[2] = 1;
129  pdata[3] = pargs.notify; pdlen[3] = strlen(pargs.notify);
130  pdata[4] = (char *)" "; pdlen[4] = 1;
131  pdata[5] = prtybuff; pdlen[5] = strlen(prtybuff);
132  pdata[6] = (char *)" "; pdlen[6] = 1;
133  pdata[7] = pargs.mode; pdlen[7] = strlen(pargs.mode);
134  pdata[8] = (char *)" "; pdlen[8] = 1;
135  pdata[9] = pargs.path; pdlen[9] = strlen(pargs.path);
136  pdata[10] = (char *)"\n"; pdlen[10] = 1;
137  pdata[11]= 0; pdlen[11]= 0;
138  if (!(rc = prepSched.Put((const char **)pdata, (const int *)pdlen)))
139  if (!PTable.Add(pargs.path, 0, 0, Hash_data_is_key)) NumFiles++;
140  } else {
141  int Oflag = (index(pargs.mode, (int)'w') ? O_RDWR : 0);
142  mode_t Prty = atoi(pargs.prty);
143  XrdOucEnv Env(pargs.opaque);
144  XrdOucMsubsInfo Info(pargs.Ident, &Env, N2N, pargs.path,
145  pargs.notify, Prty, Oflag, pargs.mode, pargs.reqid);
146  int k = prepMsg->Subs(Info, pdata, pdlen);
147  pdata[k] = (char *)"\n"; pdlen[k++] = 1;
148  pdata[k] = 0; pdlen[k] = 0;
149  if (!(rc = prepSched.Put((const char **)pdata, (const int *)pdlen)))
150  if (!PTable.Add(pargs.path, 0, 0, Hash_data_is_key)) NumFiles++;
151  }
152 
153 // All done
154 //
155  PTMutex.UnLock();
156  return rc == 0;
157 }
158 
159 /******************************************************************************/
160 /* D e l */
161 /******************************************************************************/
162 
163 int XrdCmsPrepare::Del(char *reqid)
164 {
165  char *pdata[4];
166  int rc, pdlen[4];
167 
168 // Use our built-in mechanism if so wanted
169 //
170  if (PrepFrm)
171  {if ((rc = PrepFrm->Del('-', reqid)))
172  Say.Emsg("Del", rc, "unprepare", reqid);
173  return rc == 0;
174  }
175 
176 // Restart the scheduler if need be
177 //
178  PTMutex.Lock();
179  if (!prepif || !prepSched.isAlive())
180  {Say.Emsg("Del","No prepare manager; unprepare",reqid,"ignored.");
181  PTMutex.UnLock();
182  return 0;
183  }
184 
185 // Write out the delete request
186 //
187  pdata[0] = (char *)"- ";
188  pdlen[0] = 2;
189  pdata[1] = reqid;
190  pdlen[1] = strlen(reqid);
191  pdata[2] = (char *)"\n";
192  pdlen[2] = 1;
193  pdata[3] = (char *)0;
194  pdlen[3] = 0;
195  rc = prepSched.Put((const char **)pdata, (const int *)pdlen);
196  PTMutex.UnLock();
197  return rc == 0;
198 }
199 
200 /******************************************************************************/
201 /* D o I t */
202 /******************************************************************************/
203 
205 {
206 // Simply scrub the cache
207 //
208  Scrub();
209  Sched->Schedule((XrdJob *)this,scrubtime+time(0));
210 }
211 
212 /******************************************************************************/
213 /* E x i s t s */
214 /******************************************************************************/
215 
216 int XrdCmsPrepare::Exists(char *path)
217 {
218  int Found;
219 
220 // Lock the hash table
221 //
222  PTMutex.Lock();
223 
224 // Look up the entry
225 //
226  Found = (NumFiles ? PTable.Find(path) != 0 : 0);
227 
228 // All done
229 //
230  PTMutex.UnLock();
231  return Found;
232 }
233 
234 /******************************************************************************/
235 /* G o n e */
236 /******************************************************************************/
237 
238 void XrdCmsPrepare::Gone(char *path)
239 {
240 
241 // Lock the hash table
242 //
243  PTMutex.Lock();
244 
245 // Delete the entry
246 //
247  if (NumFiles > 0 && PTable.Del(path) == 0) NumFiles--;
248 
249 // All done
250 //
251  PTMutex.UnLock();
252 }
253 
254 /******************************************************************************/
255 /* I n f o r m */
256 /******************************************************************************/
257 
258 void XrdCmsPrepare::Inform(const char *cmd, XrdCmsPrepArgs *pargs)
259 {
260  EPNAME("Inform")
261  struct iovec Msg[8];
262  char *mdest, *minfo;
263 
264 // See if requestor wants a response
265 //
266  if (!index(pargs->mode, (int)'n')
267  || strncmp("udp://", pargs->notify, 6)
268  || !Relay)
269  {DEBUG(pargs->Ident <<' ' <<cmd <<' ' <<pargs->reqid <<" not sent to "
270  <<pargs->notify);
271  return;
272  }
273 
274 // Extract out destination and argument
275 //
276  mdest = pargs->notify+6;
277  if ((minfo = index(mdest, (int)'/')))
278  {*minfo = '\0'; minfo++;}
279  if (!minfo || !*minfo) minfo = (char *)"*";
280  DEBUG("Sending " <<mdest <<": " <<cmd <<' '<<pargs->reqid <<' ' <<minfo);
281 
282 // Create message to be sent
283 //
284  Msg[0].iov_base = (char *)cmd; Msg[0].iov_len = strlen(cmd);
285  Msg[1].iov_base = (char *)" "; Msg[1].iov_len = 1;
286  Msg[2].iov_base = pargs->reqid; Msg[2].iov_len = strlen(pargs->reqid);
287  Msg[3].iov_base = (char *)" "; Msg[3].iov_len = 1;
288  Msg[4].iov_base = minfo; Msg[4].iov_len = strlen(minfo);
289  Msg[5].iov_base = (char *)" "; Msg[5].iov_len = 1;
290  Msg[6].iov_base = pargs->path; Msg[6].iov_len = (pargs->pathlen)-1;
291  Msg[7].iov_base = (char *)"\n"; Msg[7].iov_len = 1;
292 
293 // Send the message and return
294 //
295  Relay->Send(Msg, 8, mdest);
296 }
297 
298 /******************************************************************************/
299 /* I n i t */
300 /******************************************************************************/
301 
303 {
304 // Obtain a msg object. We need to do it outside of a global constructor!
305 //
306  Relay = new XrdNetMsg(&Say);
307 }
308 
309 /******************************************************************************/
310 /* P r e p a r e */
311 /******************************************************************************/
312 
314 {
315  EPNAME("Prepare");
316  int rc;
317 
318 // Check if this file is not online, prepare it
319 //
320  if (!(rc = isOnline(pargs->path)))
321  {DEBUG("Preparing " <<pargs->reqid <<' ' <<pargs->notify <<' '
322  <<pargs->prty <<' ' <<pargs->mode <<' ' <<pargs->path);
323  if (!Config.DiskSS) Say.Emsg("Prepare","staging disallowed; ignoring prep",
324  pargs->Ident, pargs->reqid);
325  else Add(*pargs);
326  return;
327  }
328 
329 // If the file is really online, inform the requestor
330 //
331  if (rc > 0) Inform("avail", pargs);
332 }
333 
334 /******************************************************************************/
335 /* R e s e t */
336 /******************************************************************************/
337 
338 void XrdCmsPrepare::Reset(const char *iName, const char *aPath, int aMode)
339 {
340  EPNAME("Reset");
341  char baseAP[1024], *Slash;
342 
343 // This is a call from the configurator. No need to do anything if we have
344 // no interface to initialize.
345 //
346  if (!prepif) return;
347 
348 // If this is a built-in mechanism, then allocate the prepare interface
349 // and initialize it. This is a one-time thing and it better work right away.
350 // In any case, do a standard reset.
351 //
352  if (!*prepif)
353  {PrepFrm = new XrdFrcProxy(Say.logger(), iName);
354  DEBUG("Initializing internal FRM prepare interface.");
355  strcpy(baseAP, aPath); baseAP[strlen(baseAP)-1] = '\0';
356  if ((Slash = rindex(baseAP, '/'))) *Slash = '\0';
357  if (!(prepOK = PrepFrm->Init(XrdFrcProxy::opStg, baseAP, aMode)))
358  {Say.Emsg("Reset", "Built-in prepare init failed; prepare disabled.");
359  return;
360  }
361  }
362 
363 // Reset the interface and schedule a scrub
364 //
365  Reset();
366  if (scrubtime) Sched->Schedule((XrdJob *)this,scrubtime+time(0));
367 
368 }
369 
370 /******************************************************************************/
371 /* s e t P a r m s */
372 /******************************************************************************/
373 
374 int XrdCmsPrepare::setParms(int rcnt, int stime, int deco)
375 {if (rcnt > 0) resetcnt = scrub2rst = rcnt;
376  if (stime > 0) scrubtime = stime;
377  doEcho = deco;
378  return 0;
379 }
380 
381 int XrdCmsPrepare::setParms(const char *ifpgm, char *ifmsg)
382 {if (ifpgm)
383  {const char *Slash = rindex(ifpgm, '/');
384  if (prepif) free(prepif);
385  if (Slash && !strcmp(Slash+1, "frm_xfragent")) ifpgm = "";
386  prepif = strdup(ifpgm);
387  }
388  if (ifmsg)
389  {if (prepMsg) delete prepMsg;
390  prepMsg = new XrdOucMsubs(&Say);
391  if (!(prepMsg->Parse("prepmsg", ifmsg)))
392  {delete prepMsg; prepMsg = 0; return 1;}
393  }
394  return 0;
395 }
396 
397 /******************************************************************************/
398 /* P r i v a t e M e t h o d s */
399 /******************************************************************************/
400 /******************************************************************************/
401 /* i s O n l i n e */
402 /******************************************************************************/
403 
404 int XrdCmsPrepare::isOnline(char *path)
405 {
406  static const int Sopts = XRDOSS_resonly | XRDOSS_updtatm;
407  struct stat buf;
408 
409 // Issue the stat() via oss plugin. If it indicates the file is not there is
410 // still might be logically here because it's in a staging queue.
411 //
412  if (Config.ossFS->Stat(path, &buf, Sopts))
413  {if (Config.DiskSS && Exists(path)) return -1;
414  else return 0;
415  }
416  return 1;
417 }
418 
419 /******************************************************************************/
420 /* R e s e t */
421 /******************************************************************************/
422 
423 void XrdCmsPrepare::Reset() // Must be called with PTMutex locked!
424 {
425  char *lp, *pdata[] = {(char *)"?\n", 0};
426  int pdlen[] = {2, 0};
427 
428 // Hanlde via built-in mechanism
429 //
430  if (PrepFrm)
432  char Buff[1024];
433  if (prepOK)
434  {PTable.Purge(); NumFiles = 0;
435  while(PrepFrm->List(State, Buff, sizeof(Buff)))
436  {PTable.Add(Buff, 0, 0, Hash_data_is_key); NumFiles++;
437  if (doEcho) Say.Emsg("Reset","Prepare pending for",Buff);
438  }
439  }
440  return;
441  }
442 
443 // Check if we really have an interface to reset
444 //
445  if (!prepif)
446  {Say.Emsg("Reset", "Prepare program not specified; prepare disabled.");
447  return;
448  }
449 
450 // Do it the slow external way
451 //
452  if (!prepSched.isAlive() && !startIF()) return;
453  if (prepSched.Put((const char **)pdata, (const int *)pdlen))
454  {Say.Emsg("Prepare", prepSched.LastError(), "write to", prepif);
455  prepSched.Drain(); prepOK = 0;
456  }
457  else {PTable.Purge(); NumFiles = 0;
458  while((lp = prepSched.GetLine()) && *lp)
459  {PTable.Add(lp, 0, 0, Hash_data_is_key); NumFiles++;
460  if (doEcho) Say.Emsg("Reset","Prepare pending for",lp);
461  }
462  }
463 }
464 
465 /******************************************************************************/
466 /* S c r u b */
467 /******************************************************************************/
468 
469 void XrdCmsPrepare::Scrub()
470 {
471  PTMutex.Lock();
472  if (scrub2rst <= 0)
473  {Reset();
474  scrub2rst = resetcnt;
475  }
476  else {PTable.Apply(XrdCmsScrubScan, (void *)0);
477  scrub2rst--;
478  }
479  if (!PrepFrm && !prepSched.isAlive()) startIF();
480  PTMutex.UnLock();
481 }
482 
483 /******************************************************************************/
484 /* s t a r t I F */
485 /******************************************************************************/
486 
487 int XrdCmsPrepare::startIF() // Must be called with PTMutex locked!
488 {
489  EPNAME("startIF")
490 
491 // If we are using a local interface then there is nothing to start.
492 //
493  if (PrepFrm) return prepOK;
494 
495 // Complain if there is no external prepare program
496 //
497  if (!prepif)
498  {Say.Emsg("startIF","Prepare program not specified; prepare disabled.");
499  return (prepOK = 0);
500  }
501 
502 // Setup the external program
503 //
504  DEBUG("Prepare: Starting " <<prepif);
505  if (prepSched.Exec(prepif, 1))
506  {time_t eNow = time(0);
507  prepOK = 0;
508  if ((eNow - lastemsg) >= 60)
509  {lastemsg = eNow;
510  Say.Emsg("Prepare", prepSched.LastError(), "start", prepif);
511  }
512  } else prepOK = 1;
513 
514 // All done
515 //
516  return prepOK;
517 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
int XrdCmsScrubScan(const char *key, char *cip, void *xargp)
@ Info
#define XRDOSS_resonly
Definition: XrdOss.hh:486
#define XRDOSS_updtatm
Definition: XrdOss.hh:487
@ Hash_data_is_key
Definition: XrdOucHash.hh:52
int stat(const char *path, struct stat *buf)
if(Avsz)
XrdOss * ossFS
void Prepare(XrdCmsPrepArgs *pargs)
int setParms(int rcnt, int stime, int deco=0)
void Gone(char *path)
int Exists(char *path)
void Inform(const char *cmd, XrdCmsPrepArgs *pargs)
int Add(XrdCmsPrepArgs &pargs)
int Del(char *reqid)
void Reset(const char *iName, const char *aPath, int aMode)
int Add(char Opc, const char *Lfn, const char *Opq, const char *Usr, const char *Rid, const char *Nop, const char *Pop, int Prty=1)
Definition: XrdFrcProxy.cc:95
int Init(int opX, const char *aPath, int aMode, const char *qPath=0)
Definition: XrdFrcProxy.cc:229
int List(Queues &State, char *Buff, int Bsz)
Definition: XrdFrcProxy.cc:174
static const int opStg
Definition: XrdFrcProxy.hh:52
int Del(char Opc, const char *Rid)
Definition: XrdFrcProxy.cc:150
Definition: XrdJob.hh:43
int Send(const char *buff, int blen=0, const char *dest=0, int tmo=-1)
Definition: XrdNetMsg.cc:70
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
int Del(const char *KeyVal, XrdOucHash_Options opt=Hash_default)
Definition: XrdOucHash.icc:136
void Purge()
Definition: XrdOucHash.icc:193
T * Apply(int(*func)(const char *, T *, void *), void *Arg)
Definition: XrdOucHash.icc:102
T * Add(const char *KeyVal, T *KeyData, const int LifeTime=0, XrdOucHash_Options opt=Hash_default)
Definition: XrdOucHash.icc:61
T * Find(const char *KeyVal, time_t *KeyTime=0)
Definition: XrdOucHash.icc:160
int Parse(const char *oname, char *msg)
Definition: XrdOucMsubs.cc:100
int Subs(XrdOucMsubsInfo &Info, char **Data, int *Dlen)
Definition: XrdOucMsubs.cc:146
static const int maxElem
Definition: XrdOucMsubs.hh:94
char * GetLine()
int Put(const char *data, const int dlen)
int Exec(const char *, int inrd=0, int efd=0)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSysLogger * logger(XrdSysLogger *lp=0)
Definition: XrdSysError.hh:141
XrdSysError Say
XrdCmsPrepare PrepQ
XrdCmsConfig Config
XrdScheduler Sched
Definition: XrdLinkCtl.cc:54