XRootD
XrdCmsFinder.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d C m s F i n d e r . c c */
4 /* */
5 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdlib>
32 #include <cstdio>
33 #include <cerrno>
34 #include <fcntl.h>
35 #include <limits.h>
36 #include <strings.h>
37 #include <ctime>
38 #include <unistd.h>
39 #include <sys/stat.h>
40 #include <sys/times.h>
41 #include <sys/types.h>
42 #include <sys/uio.h>
43 #include <sys/wait.h>
44 #include <netinet/in.h>
45 #include <cinttypes>
46 
47 #include "XrdVersion.hh"
48 
49 #include "XProtocol/YProtocol.hh"
50 
54 
55 #include "XrdCms/XrdCmsFinder.hh"
56 #include "XrdCms/XrdCmsParser.hh"
57 #include "XrdCms/XrdCmsResp.hh"
58 #include "XrdCms/XrdCmsRRData.hh"
59 #include "XrdCms/XrdCmsSecurity.hh"
60 #include "XrdCms/XrdCmsTrace.hh"
61 
62 #include "XrdOss/XrdOss.hh"
63 
64 #include "XrdOuc/XrdOucBuffer.hh"
65 #include "XrdOuc/XrdOucEnv.hh"
66 #include "XrdOuc/XrdOucErrInfo.hh"
67 #include "XrdOuc/XrdOucReqID.hh"
68 #include "XrdOuc/XrdOucStream.hh"
69 #include "XrdOuc/XrdOucUtils.hh"
70 #include "XrdNet/XrdNetOpts.hh"
71 #include "XrdNet/XrdNetSocket.hh"
73 
74 #include "XrdSys/XrdSysError.hh"
75 #include "XrdSys/XrdSysTimer.hh"
76 #include "XrdSys/XrdSysPlatform.hh"
77 #include "XrdSys/XrdSysPlugin.hh"
78 
79 using namespace XrdCms;
80 
81 class XrdInet;
82 
83 /******************************************************************************/
84 /* G l o b a l s */
85 /******************************************************************************/
86 
87 namespace XrdCms
88 {
89 XrdSysError Say(0, "cms_");
90 
91 XrdSysTrace Trace("cms");
92 
93 XrdVERSIONINFODEF(myVersion,cmsclient,XrdVNUMBER,XrdVERSION);
94 };
95 
96 /******************************************************************************/
97 /* R e m o t e F i n d e r */
98 /******************************************************************************/
99 /******************************************************************************/
100 /* C o n s t r u c t o r */
101 /******************************************************************************/
102 
104  : XrdCmsClient(XrdCmsClient::amRemote)
105 {
106  myManagers = 0;
107  myManCount = 0;
108  myManList = 0;
109  myPort = Port;
110  SMode = 0;
111  sendID = 0;
112  isMeta = whoami & IsMeta;
113  isProxy = whoami & IsProxy;
114  isTarget = whoami & IsTarget;
115  savePath = 0;
116  Say.logger(lp);
117  Trace.SetLogger(lp);
118 }
119 
120 /******************************************************************************/
121 /* D e s t r u c t o r */
122 /******************************************************************************/
123 
125 {
126  XrdCmsClientMan *mp, *nmp = myManagers;
127  XrdOucTList *tp, *tpp = myManList;
128 
129  while((mp = nmp)) {nmp = mp->nextManager(); delete mp;}
130 
131  while((tp = tpp)) {tpp = tp->next; delete tp;}
132 }
133 
134 /******************************************************************************/
135 /* C o n f i g u r e */
136 /******************************************************************************/
137 
138 int XrdCmsFinderRMT::Configure(const char *cfn, char *Args, XrdOucEnv *envP)
139 {
140  XrdCmsClientConfig config;
143  XrdInet *netP;
144  int Topts = IsRedir;
145 
146 // Establish what we will be configuring
147 //
148  if (isProxy)
149  {How = XrdCmsClientConfig::configProxy; Topts |= IsProxy;}
150  else if (isMeta) How = XrdCmsClientConfig::configMeta;
152  What = (isTarget ? XrdCmsClientConfig::configSuper
154 
155 // Establish the network interface that the caller must provide
156 //
157  if (!envP || !(netP = (XrdInet *)envP->GetPtr("XrdInet*")))
158  {Say.Emsg("Finder", "Network not defined; unable to connect to cmsd.");
159  return 0;
160  }
163  XrdCmsSecurity::setSecFunc(envP->GetPtr("XrdSecGetProtocol*"));
164 
165 // Now call the configuration object
166 //
167  if (config.Configure(cfn, What, How)) return 0;
168 
169 // Set configured values and start the managers
170 //
171  CMSPath = config.CMSPath;
172  RepDelay = config.RepDelay;
173  RepNone = config.RepNone;
174  RepWait = config.RepWait;
175  ConWait = config.ConWait;
176  FwdWait = config.FwdWait;
177  PrepWait = config.PrepWait;
178  if (isProxy)
179  {SMode = config.SModeP;
180  StartManagers(config.PanList);
181  config.PanList = 0;
182  }
183  else {SMode = config.SMode;
184  StartManagers(config.ManList);
185  config.ManList = 0;
186  }
187 
188 // If we are tracing or if redirect monitoring is enabled, we will need
189 // to save path information.
190 //
191  if (QTRACE(Redirect) || getenv("XRDMONRDR")) savePath = 1;
192 
193 // If we are a plain manager but have a meta manager then we must start
194 // a responder (that we will hide) to pass through the port number.
195 //
196  if (!isMeta && !isTarget && config.haveMeta)
197  {XrdCmsFinderTRG *Rsp = new XrdCmsFinderTRG(Say.logger(),Topts,myPort);
198  return Rsp->RunAdmin(CMSPath, config.myVNID);
199  }
200 
201 // All done
202 //
203  return 1;
204 }
205 
206 /******************************************************************************/
207 /* F o r w a r d */
208 /******************************************************************************/
209 
210 int XrdCmsFinderRMT::Forward(XrdOucErrInfo &Resp, const char *cmd,
211  const char *arg1, const char *arg2,
212  XrdOucEnv *Env1, XrdOucEnv *Env2)
213 {
214  static XrdSysMutex fwdMutex;
215  static struct timeval fwdClk = {time(0),0};
216  static const int xNum = 12;
217 
218  XrdCmsClientMan *Manp;
219  XrdCmsRRData Data;
220  unsigned int iMan;
221  int iovcnt, is2way, doAll = 0, opQ1Len = 0, opQ2Len = 0;
222  char Work[xNum*12];
223  struct iovec xmsg[xNum];
224 
225 // Encode the request as a redirector command
226 //
227  if ((is2way = (*cmd == '+'))) cmd++;
228 
229  if (!strcmp("chmod", cmd)) Data.Request.rrCode = kYR_chmod;
230  else if (!strcmp("mkdir", cmd)) Data.Request.rrCode = kYR_mkdir;
231  else if (!strcmp("mkpath",cmd)) Data.Request.rrCode = kYR_mkpath;
232  else if (!strcmp("mv", cmd)){Data.Request.rrCode = kYR_mv; doAll=1;}
233  else if (!strcmp("rm", cmd)){Data.Request.rrCode = kYR_rm; doAll=1;}
234  else if (!strcmp("rmdir", cmd)){Data.Request.rrCode = kYR_rmdir; doAll=1;}
235  else if (!strcmp("trunc", cmd)) Data.Request.rrCode = kYR_trunc;
236  else {Say.Emsg("Finder", "Unable to forward '", cmd, "'.");
237  Resp.setErrInfo(EINVAL, "Internal error processing file.");
238  return SFS_ERROR;
239  }
240 
241 // Fill out the RR data structure
242 //
243  Data.Ident = (char *)(XrdCmsClientMan::doDebug ? Resp.getErrUser() : "");
244  Data.Path = (char *)arg1;
245  Data.Mode = (char *)arg2;
246  Data.Path2 = (char *)arg2;
247  Data.Opaque = (Env1 ? Env1->EnvTidy(opQ1Len) : 0);
248  Data.Opaque2 = (Env2 ? Env2->EnvTidy(opQ2Len) : 0);
249 
250 // Pack the arguments
251 //
252  if (!(iovcnt = Parser.Pack(int(Data.Request.rrCode), &xmsg[1], &xmsg[xNum],
253  (char *)&Data, Work)))
254  {Resp.setErrInfo(EINVAL, "Internal error processing file.");
255  return SFS_ERROR;
256  }
257 
258 // Insert the header into the stream
259 //
260  Data.Request.streamid = 0;
261  Data.Request.modifier = 0;
262  xmsg[0].iov_base = (char *)&Data.Request;
263  xmsg[0].iov_len = sizeof(Data.Request);
264 
265 // This may be a 2way message. If so, use the longer path.
266 //
267  if (is2way) return send2Man(Resp, (arg1 ? arg1 : "/"), xmsg, iovcnt+1);
268 
269 // Check if we have exceeded the maximum rate for requests. If we have, we
270 // will wait to repace the stream so as to not burden the cmsd.
271 //
272  if (doAll && FwdWait)
273  {struct timeval nowClk;
274  time_t Window;
275  fwdMutex.Lock();
276  gettimeofday(&nowClk, 0);
277  fwdClk.tv_sec = nowClk.tv_sec - fwdClk.tv_sec;
278  fwdClk.tv_usec = nowClk.tv_usec - fwdClk.tv_usec;
279  if (fwdClk.tv_usec < 0) {fwdClk.tv_sec--; fwdClk.tv_usec += 1000000;}
280  Window = fwdClk.tv_sec*1000 + fwdClk.tv_usec/1000;
281  if (Window < FwdWait) XrdSysTimer::Wait(FwdWait - Window);
282  fwdClk = nowClk;
283  fwdMutex.UnLock();
284  }
285 
286 // Select the right manager for this request
287 //
288  if (!(Manp = SelectManager(Resp, (arg1 ? arg1 : "/")))) return ConWait;
289 
290 // Send message and simply wait for the reply
291 //
292  if (Manp->Send(iMan, xmsg, iovcnt+1))
293  {if (doAll)
294  {Data.Request.modifier |= kYR_dnf;
295  Inform(Manp, xmsg, iovcnt+1);
296  }
297  return 0;
298  }
299 
300 // Indicate client should retry later
301 //
302  Resp.setErrInfo(RepDelay, "");
303  return RepDelay;
304 }
305 
306 /******************************************************************************/
307 /* I n f o r m */
308 /******************************************************************************/
309 
310 void XrdCmsFinderRMT::Inform(XrdCmsClientMan *xman,
311  struct iovec xmsg[], int xnum)
312 {
313  XrdCmsClientMan *Womp, *Manp;
314  unsigned int iMan;
315 
316 // Make sure we are configured
317 //
318  if (!myManagers)
319  {Say.Emsg("Finder", "SelectManager() called prior to Configure().");
320  return;
321  }
322 
323 // Start at the beginning (we will avoid the previously selected one)
324 //
325  Womp = Manp = myManagers;
326  do {if (Manp != xman && Manp->isActive()) Manp->Send(iMan, xmsg, xnum);
327  } while((Manp = Manp->nextManager()) != Womp);
328 }
329 
330 /******************************************************************************/
331 /* L o c a t e */
332 /******************************************************************************/
333 
334 int XrdCmsFinderRMT::Locate(XrdOucErrInfo &Resp, const char *path, int flags,
335  XrdOucEnv *Env)
336 {
337  static const int xNum = 12;
338 
339  XrdCmsRRData Data;
340  int n, iovcnt;
341  char Work[xNum*12];
342  struct iovec xmsg[xNum];
343  char *triedRC, *affmode;
344 
345 // Fill out the RR data structure
346 //
347  Data.Ident = (char *)(XrdCmsClientMan::doDebug ? Resp.getErrUser() : "");
348  Data.Path = (char *)path;
349  Data.Opaque = (Env ? Env->EnvTidy(n) : 0);
350  Data.Avoid = (Env ? Env->Get("tried") : 0);
351 
352 // Set options and command
353 //
354  if (flags & SFS_O_LOCATE)
355  {bool doAll = (flags & SFS_O_FORCE) != 0;
356  if (flags & SFS_O_LOCAL) return LocLocal(Resp, Env);
357  Data.Request.rrCode = kYR_locate;
358  Data.Opts = (flags & SFS_O_NOWAIT ? CmsLocateRequest::kYR_asap : 0)
359  | (flags & SFS_O_RESET ? CmsLocateRequest::kYR_refresh : 0);
360  if (Resp.getUCap() & XrdOucEI::uPrip)
361  Data.Opts |= CmsLocateRequest::kYR_prvtnet;
362  if (Resp.getUCap() & XrdOucEI::uIPv4)
363  {Data.Opts |= (Resp.getUCap() & XrdOucEI::uIPv64 || doAll
364  ? CmsLocateRequest::kYR_retipv46 : 0);
365  } else {
366  Data.Opts |= (Resp.getUCap() & XrdOucEI::uIPv64 || doAll
367  ? CmsLocateRequest::kYR_retipv64 :
368  CmsLocateRequest::kYR_retipv6);
369  }
370  if (flags & SFS_O_HNAME) Data.Opts |= CmsLocateRequest::kYR_retname;
371  if (flags & SFS_O_RAWIO) Data.Opts |= CmsLocateRequest::kYR_retuniq;
372  if (doAll) Data.Opts |= CmsLocateRequest::kYR_listall;
373  } else
374  { Data.Request.rrCode = kYR_select;
375  if (flags & SFS_O_TRUNC) Data.Opts = CmsSelectRequest::kYR_trunc;
376  else if (flags & SFS_O_CREAT)
377  { Data.Opts = CmsSelectRequest::kYR_create;
378  if (flags & SFS_O_REPLICA)
379  Data.Opts|= CmsSelectRequest::kYR_replica;
380  }
381  else if (flags & SFS_O_STAT) Data.Opts = CmsSelectRequest::kYR_stat;
382  else Data.Opts = 0;
383 
384  Data.Opts |= (flags & (SFS_O_WRONLY | SFS_O_RDWR)
385  ? CmsSelectRequest::kYR_write : CmsSelectRequest::kYR_read);
386 
387  if (flags & SFS_O_META) Data.Opts |= CmsSelectRequest::kYR_metaop;
388 
389  if (flags & SFS_O_NOWAIT) Data.Opts |= CmsSelectRequest::kYR_online;
390 
391  if (flags & SFS_O_RESET) Data.Opts |= CmsSelectRequest::kYR_refresh;
392 
393  if (flags & SFS_O_MULTIW) Data.Opts |= CmsSelectRequest::kYR_mwfiles;
394 
395  if (Env && (affmode = Env->Get("cms.aff")))
396  { if (*affmode == 'n') Data.Opts |= CmsSelectRequest::kYR_aNone;
397  else if (*affmode == 'S') Data.Opts |= CmsSelectRequest::kYR_aStrict;
398  else if (*affmode == 's') Data.Opts |= CmsSelectRequest::kYR_aStrong;
399  else if (*affmode == 'w') Data.Opts |= CmsSelectRequest::kYR_aWeak;
400  }
401 
402  if (Resp.getUCap() & XrdOucEI::uPrip)
403  Data.Opts |= CmsSelectRequest::kYR_prvtnet;
404  if (Resp.getUCap() & XrdOucEI::uIPv4)
405  {Data.Opts |= (Resp.getUCap() & XrdOucEI::uIPv64
406  ? CmsSelectRequest::kYR_retipv46 : 0);
407  } else {
408  Data.Opts |= (Resp.getUCap() & XrdOucEI::uIPv64
409  ? CmsSelectRequest::kYR_retipv64 :
410  CmsSelectRequest::kYR_retipv6);
411  }
412 
413  if (Data.Avoid && Env && (triedRC = Env->Get("triedrc")))
414  {char *comma = rindex(triedRC, ',');
415  if (comma) triedRC = comma+1;
416  if (!strcmp(triedRC, "enoent"))
417  Data.Opts |= CmsSelectRequest::kYR_tryMISS;
418  else if (!strcmp(triedRC, "ioerr"))
419  Data.Opts |= CmsSelectRequest::kYR_tryIOER;
420  else if (!strcmp(triedRC, "fserr"))
421  Data.Opts |= CmsSelectRequest::kYR_tryFSER;
422  else if (!strcmp(triedRC, "srverr"))
423  Data.Opts |= CmsSelectRequest::kYR_trySVER;
424  else if (!strcmp(triedRC, "resel"))
425  Data.Opts |= CmsSelectRequest::kYR_tryRSEL;
426  else if (!strcmp(triedRC, "reseg"))
427  Data.Opts |= CmsSelectRequest::kYR_tryRSEG;
428  }
429  }
430 
431 // Pack the arguments
432 //
433  if (!(iovcnt = Parser.Pack(int(Data.Request.rrCode), &xmsg[1], &xmsg[xNum],
434  (char *)&Data, Work)))
435  {Resp.setErrInfo(EINVAL, "Internal error processing file.");
436  return SFS_ERROR;
437  }
438 
439 // Insert the header into the stream
440 //
441  Data.Request.streamid = 0;
442  Data.Request.modifier = 0;
443  xmsg[0].iov_base = (char *)&Data.Request;
444  xmsg[0].iov_len = sizeof(Data.Request);
445 
446 // Send the 2way message
447 //
448  return send2Man(Resp, path, xmsg, iovcnt+1);
449 }
450 
451 /******************************************************************************/
452 /* L o c L o c a l */
453 /******************************************************************************/
454 
455 int XrdCmsFinderRMT::LocLocal(XrdOucErrInfo &Resp, XrdOucEnv *Env)
456 {
457  XrdCmsClientMan *Womp, *Manp;
458  XrdOucBuffer *xBuff = 0;
459  char *mBeg, *mBuff, mStat;
460  int mBlen, n;
461 
462 // If we have no managers or no role, we are not clustered
463 //
464  if (!myManagers)
465  {Resp.setErrInfo(0, "");
466  return SFS_DATA;
467  }
468 
469 // Get where to start and where to put the information
470 //
471  Womp = Manp = myManagers;
472  mBeg = mBuff = Resp.getMsgBuff(mBlen);
473 
474 // Check if we can use the internal buffer or need to get an external buffer
475 //
476  n = 8 + (myManCount * (256+6+2));
477  if (n > mBlen)
478  {mBeg = mBuff = (char *)malloc(n);
479  if (!mBeg)
480  {Resp.setErrInfo(ENOMEM, "Insufficient memory.");
481  return SFS_ERROR;
482  }
483  xBuff = new XrdOucBuffer(mBeg, n);
484  mBlen = n;
485  }
486 
487 // Make sure we have enough space to continue
488 //
489  if (mBlen < 1024)
490  {Resp.setErrInfo(EINVAL, "Invalid role.");
491  return SFS_ERROR;
492  }
493 
494 // List the status of each manager
495 //
496  do {if (Manp->isActive()) mStat = (Manp->Suspended() ? 's' : 'c');
497  else mStat = 'd';
498  n = snprintf(mBuff, mBlen, "%s:%d/%c ",
499  Manp->Name(), Manp->manPort(), mStat);
500  mBuff += n; mBlen -= n;
501  } while((Manp = Manp->nextManager()) != Womp && mBlen > 0);
502 
503 // We should not have overrun the buffer; if we did declare failure
504 //
505  if (mBlen < 0)
506  {Resp.setErrInfo(EINVAL, "Internal processing error.");
507  if (xBuff) xBuff->Recycle();
508  return SFS_ERROR;
509  }
510 
511 // Set the final result
512 //
513  n = mBuff - mBeg;
514  if (!xBuff) Resp.setErrCode(n);
515  else {xBuff->SetLen(n);
516  Resp.setErrInfo(n, xBuff);
517  }
518 
519 // All done
520 //
521  return SFS_DATA;
522 }
523 
524 /******************************************************************************/
525 /* P r e p a r e */
526 /******************************************************************************/
527 
529  XrdOucEnv *envP)
530 {
531  EPNAME("Prepare")
532  static const int xNum = 16;
533  static XrdSysMutex prepMutex;
534 
535  XrdCmsRRData Data;
536  XrdOucTList *tp, *op;
537  XrdCmsClientMan *Manp = 0;
538 
539  unsigned int iMan;
540  int iovcnt = 0, NoteLen, n;
541  char Prty[1032], *NoteNum = 0, *colocp = 0;
542  char Work[xNum*12];
543  struct iovec xmsg[xNum];
544 
545 // Prefill the RR data structure and iovec
546 //
547  Data.Ident = (char *)(XrdCmsClientMan::doDebug ? Resp.getErrUser() : "");
548  Data.Reqid = pargs.reqid;
549  Data.Request.streamid = 0;
550  Data.Request.modifier = 0;
551  xmsg[0].iov_base = (char *)&Data.Request;
552  xmsg[0].iov_len = sizeof(Data.Request);
553 
554 // Check for a cancel request
555 //
556  if (!(tp = pargs.paths))
557  {Data.Request.rrCode = kYR_prepdel;
558  if (!(iovcnt = Parser.Pack(kYR_prepdel, &xmsg[1], &xmsg[xNum],
559  (char *)&Data, Work)))
560  {Resp.setErrInfo(EINVAL, "Internal error processing file.");
561  return SFS_ERROR;
562  }
563  if (!(Manp = SelectManager(Resp, 0))) return ConWait;
564  if (Manp->Send(iMan, (const struct iovec *)&xmsg, iovcnt+1)) return 0;
565  DEBUG("Finder: Failed to send prepare cancel to "
566  <<Manp->Name() <<" reqid=" <<pargs.reqid);
567  Resp.setErrInfo(RepDelay, "");
568  return RepDelay;
569  }
570 
571 // Set prepadd options
572 //
573  Data.Request.modifier =
574  (pargs.opts & Prep_STAGE ? CmsPrepAddRequest::kYR_stage : 0)
575  | (pargs.opts & Prep_WMODE ? CmsPrepAddRequest::kYR_write : 0)
576  | (pargs.opts & Prep_FRESH ? CmsPrepAddRequest::kYR_fresh : 0);
577 
578 // Set coloc information if staging wanted and there are at least two paths
579 //
580 
581 // Set the prepadd mode
582 //
583  if (!pargs.notify || !(pargs.opts & Prep_SENDACK))
584  {Data.Mode = (char *)(pargs.opts & Prep_WMODE ? "wq" : "rq");
585  Data.Notify = (char *)"*";
586  NoteNum = 0;
587  } else {
588  NoteLen = strlen(pargs.notify);
589  Data.Notify = (char *)malloc(NoteLen+16);
590  strcpy(Data.Notify, pargs.notify);
591  NoteNum = Data.Notify+NoteLen; *NoteNum++ = '-';
592  if (pargs.opts & Prep_SENDERR)
593  Data.Mode = (char *)(pargs.opts & Prep_WMODE ? "wn" : "rn");
594  else Data.Mode = (char *)(pargs.opts & Prep_WMODE ? "wnq" : "rnq");
595  }
596 
597 // Set the priority (if co-locate, add in the co-locate path)
598 //
599  n = sprintf(Prty, "%d", (pargs.opts & Prep_PMASK));
600  if (pargs.opts & Prep_STAGE && pargs.opts & Prep_COLOC
601  && pargs.paths && pargs.paths->next)
602  {colocp = Prty + n;
603  strlcpy(colocp+1, pargs.paths->text, sizeof(Prty)-n-1);
604  }
605  Data.Prty = Prty;
606 
607 // Distribute out paths to the various managers
608 //
609  Data.Request.rrCode = kYR_prepadd;
610  op = pargs.oinfo;
611  while(tp)
612  {if (NoteNum) sprintf(NoteNum, "%d", tp->val);
613  Data.Path = tp->text;
614  if (op) {Data.Opaque = op->text; op = op->next;}
615  else Data.Opaque = 0;
616  if (!(iovcnt = Parser.Pack(kYR_prepadd, &xmsg[1], &xmsg[xNum],
617  (char *)&Data, Work))) break;
618  if (!(Manp = SelectManager(Resp, tp->text))) break;
619  DEBUG("Finder: Sending " <<Manp->Name() <<' ' <<Data.Reqid
620  <<' ' <<Data.Path);
621  if (!Manp->Send(iMan, (const struct iovec *)&xmsg, iovcnt+1)) break;
622  if ((tp = tp->next))
623  {prepMutex.Lock(); XrdSysTimer::Wait(PrepWait); prepMutex.UnLock();}
624  if (colocp) {Data.Request.modifier |= CmsPrepAddRequest::kYR_coloc;
625  *colocp = ' '; colocp = 0;
626  }
627  }
628 
629 // Check if all went well
630 //
631  if (NoteNum) free(Data.Notify);
632  if (!tp) return 0;
633 
634 // Decode the error condition
635 //
636  if (!Manp) return ConWait;
637 
638  if (!iovcnt)
639  {Say.Emsg("Finder", "Unable to send prepadd; too much data.");
640  Resp.setErrInfo(EINVAL, "Internal error processing file.");
641  return SFS_ERROR;
642  }
643 
644  Resp.setErrInfo(RepDelay, "");
645  DEBUG("Finder: Failed to send prepare to " <<(Manp ? Manp->Name() : "?")
646  <<" reqid=" <<pargs.reqid);
647  return RepDelay;
648 }
649 
650 /******************************************************************************/
651 /* S e l e c t M a n a g e r */
652 /******************************************************************************/
653 
654 XrdCmsClientMan *XrdCmsFinderRMT::SelectManager(XrdOucErrInfo &Resp,
655  const char *path)
656 {
657  XrdCmsClientMan *Womp, *Manp;
658 
659 // Make sure we are configured
660 //
661  if (!myManagers)
662  {Say.Emsg("Finder", "SelectManager() called prior to Configure().");
663  Resp.setErrInfo(ConWait, "");
664  return (XrdCmsClientMan *)0;
665  }
666 
667 // Get where to start
668 //
669  if (SMode != XrdCmsClientConfig::RoundRob || !path) Womp = Manp = myManagers;
670  else Womp = Manp = myManTable[XrdOucReqID::Index(myManCount, path)];
671 
672 // Find the next active server
673 //
674  do {if (Manp->isActive()) return (Manp->Suspended() ? 0 : Manp);
675  } while((Manp = Manp->nextManager()) != Womp);
676 
677 // All managers are dead
678 //
679  SelectManFail(Resp);
680  return (XrdCmsClientMan *)0;
681 }
682 
683 /******************************************************************************/
684 /* S e l e c t M a n F a i l */
685 /******************************************************************************/
686 
687 void XrdCmsFinderRMT::SelectManFail(XrdOucErrInfo &Resp)
688 {
689  EPNAME("SelectManFail")
690  static time_t nextMsg = 0;
691  time_t now;
692 
693 // All servers are dead, indicate so every minute
694 //
695  now = time(0);
696  myData.Lock();
697  if (nextMsg < now)
698  {nextMsg = now + 60;
699  myData.UnLock();
700  Say.Emsg("Finder", "All managers are dysfunctional.");
701  } else myData.UnLock();
702  Resp.setErrInfo(ConWait, "");
703  TRACE(Redirect, "user=" <<Resp.getErrUser() <<" No managers available; wait " <<ConWait);
704 }
705 
706 /******************************************************************************/
707 /* s e n d 2 M a n */
708 /******************************************************************************/
709 
710 int XrdCmsFinderRMT::send2Man(XrdOucErrInfo &Resp, const char *path,
711  struct iovec *xmsg, int xnum)
712 {
713  EPNAME("send2Man")
714  unsigned int iMan;
715  int retc;
716  XrdCmsClientMsg *mp;
717  XrdCmsClientMan *Manp;
718 
719 // Select the right manager for this request
720 //
721  if (!(Manp = SelectManager(Resp, path)) || Manp->Suspended())
722  return ConWait;
723 
724 // Allocate a message object. There is only a fixed number of these and if
725 // all of them are in use, th client has to wait to prevent over-runs.
726 //
727  if (!(mp = XrdCmsClientMsg::Alloc(&Resp)))
728  {Resp.setErrInfo(RepDelay, "");
729  TRACE(Redirect, Resp.getErrUser() <<" no more msg objects; path=" <<path);
730  return RepDelay;
731  }
732 
733 // Insert the message number into the header
734 //
735  ((CmsRRHdr *)(xmsg[0].iov_base))->streamid = mp->ID();
736  if (savePath) Resp.setErrData(path);
737  else Resp.setErrData(0);
738 
739 // Send message and simply wait for the reply (msg object is locked via Alloc)
740 //
741  if (!Manp->Send(iMan, xmsg, xnum) || (mp->Wait4Reply(Manp->waitTime())))
742  {mp->Recycle();
743  retc = Manp->whatsUp(Resp.getErrUser(), path, iMan);
744  Resp.setErrInfo(retc, "");
745  return retc;
746  }
747 
748 // A reply was received; process as appropriate
749 //
750  retc = mp->getResult();
751  if (retc == SFS_STARTED) retc = Manp->delayResp(Resp);
752  else if (retc == SFS_STALL) retc = Resp.getErrInfo();
753 
754 // All done
755 //
756  mp->Recycle();
757  return retc;
758 }
759 
760 /******************************************************************************/
761 /* S t a r t M a n a g e r s */
762 /******************************************************************************/
763 
764 void *XrdCmsStartManager(void *carg)
765  {XrdCmsClientMan *mp = (XrdCmsClientMan *)carg;
766  return mp->Start();
767  }
768 
769 void *XrdCmsStartResp(void *carg)
771  return (void *)0;
772  }
773 
774 int XrdCmsFinderRMT::StartManagers(XrdOucTList *theManList)
775 {
776  XrdOucTList *tp;
777  XrdCmsClientMan *mp, *firstone = 0;
778  int i = 0;
779  pthread_t tid;
780  char buff[128];
781 
782 // Save the proper manager list for later reporting
783 //
784  myManList = theManList;
785 
786 // Clear manager table
787 //
788  memset((void *)myManTable, 0, sizeof(myManTable));
789 
790 // For each manager, start a thread to handle it
791 //
792  tp = theManList;
793  while(tp && i < MaxMan)
794  {mp = new XrdCmsClientMan(tp->text,tp->val,ConWait,RepNone,RepWait,RepDelay);
795  myManTable[i] = mp;
796  if (myManagers) mp->setNext(myManagers);
797  else firstone = mp;
798  myManagers = mp;
799  if (XrdSysThread::Run(&tid,XrdCmsStartManager,(void *)mp,0,mp->Name()))
800  Say.Emsg("Finder", errno, "start manager");
801  tp = tp->next; i++;
802  }
803 
804 // Check if we exceeded maximum manager count
805 //
806  if (tp)
807  while(tp)
808  {Say.Emsg("Config warning: too many managers;",tp->text,"ignored.");
809  tp = tp->next;
810  }
811 
812 // Make this a circular chain
813 //
814  if (firstone) firstone->setNext(myManagers);
815 
816 // Indicate how many managers have been started
817 //
818  sprintf(buff, "%d manager(s) started.", i);
819  Say.Say("Config ", buff);
820  myManCount = i;
821 
822 // Now Start that many callback threads
823 //
824  while(i--)
825  if (XrdSysThread::Run(&tid,XrdCmsStartResp,(void *)0,0,"async callback"))
826  Say.Emsg("Finder", errno, "start callback manager");
827 
828 // All done
829 //
830  return 0;
831 }
832 
833 /******************************************************************************/
834 /* S p a c e */
835 /******************************************************************************/
836 
837 int XrdCmsFinderRMT::Space(XrdOucErrInfo &Resp, const char *path, XrdOucEnv *eP)
838 {
839  static const int xNum = 4;
840 
841  XrdCmsRRData Data;
842  int iovcnt;
843  char Work[xNum*12];
844  struct iovec xmsg[xNum];
845 
846 // Fill out the RR data structure
847 //
848  Data.Ident = (char *)(XrdCmsClientMan::doDebug ? Resp.getErrUser() : "");
849  Data.Path = (char *)path;
850 
851 // Pack the arguments
852 //
853  if (!(iovcnt = Parser.Pack(kYR_statfs, &xmsg[1], &xmsg[xNum],
854  (char *)&Data, Work)))
855  {Resp.setErrInfo(EINVAL, "Internal error processing file.");
856  return SFS_ERROR;
857  }
858 
859 // Insert the header into the stream
860 //
861  Data.Request.rrCode = kYR_statfs;
862  Data.Request.streamid = 0;
863  Data.Request.modifier = (eP && eP->Get("cms.qvfs")
864  ? CmsStatfsRequest::kYR_qvfs : 0);
865  xmsg[0].iov_base = (char *)&Data.Request;
866  xmsg[0].iov_len = sizeof(Data.Request);
867 
868 // Send the 2way message
869 //
870  return send2Man(Resp, path, xmsg, iovcnt+1);
871 }
872 
873 /******************************************************************************/
874 /* V C h e c k */
875 /******************************************************************************/
876 
877 bool XrdCmsFinderRMT::VCheck(XrdVersionInfo &urVersion)
878 {
879  return XrdSysPlugin::VerCmp(urVersion, myVersion);
880 }
881 
882 /******************************************************************************/
883 /* T a r g e t F i n d e r */
884 /******************************************************************************/
885 /******************************************************************************/
886 /* C o n s t r u c t o r */
887 /******************************************************************************/
888 
890  XrdOss *theSS)
891  : XrdCmsClient(XrdCmsClient::amTarget)
892 {
893  isRedir = whoami & IsRedir;
894  isProxy = whoami & IsProxy;
895  SS = theSS;
896  CMSPath = 0;
897  Login = 0;
898  myManList = 0;
899  CMSp = new XrdOucStream(&Say);
900  Active = 0;
901  myPort = port;
902  resMax = -1;
903  resCur = 0;
904  Say.logger(lp);
905 }
906 
907 /******************************************************************************/
908 /* D e s t r u c t o r */
909 /******************************************************************************/
910 
912 {
913  XrdOucTList *tp, *tpp = myManList;
914 
915  if (CMSp) delete CMSp;
916  if (Login) free(Login);
917 
918  while((tp = tpp)) {tpp = tp->next; delete tp;}
919 }
920 
921 /******************************************************************************/
922 /* A d d e d */
923 /******************************************************************************/
924 
925 void XrdCmsFinderTRG::Added(const char *path, int Pend)
926 {
927  char *data[4];
928  int dlen[4];
929 
930 // Set up to notify the cluster that a file has been added
931 //
932  data[0] = (char *)"newfn "; dlen[0] = 6;
933  data[1] = (char *)path; dlen[1] = strlen(path);
934  if (Pend)
935  {data[2] = (char *)" p\n"; dlen[2] = 3;}
936  else
937  {data[2] = (char *)"\n"; dlen[2] = 1;}
938  data[3] = 0; dlen[3] = 0;
939 
940 // Now send the notification
941 //
942  myData.Lock();
943  if (Active && CMSp->Put((const char **)data, (const int *)dlen))
944  {CMSp->Close(); Active = 0;}
945  myData.UnLock();
946 }
947 
948 /******************************************************************************/
949 /* C o n f i g u r e */
950 /******************************************************************************/
951 
952 namespace
953 {
954 void *StartPM(void *carg)
955  {XrdCmsFinderTRG *mp = (XrdCmsFinderTRG *)carg;
956  return mp->RunPM();
957  }
958 
959 void *StartRsp(void *carg)
960  {XrdCmsFinderTRG *mp = (XrdCmsFinderTRG *)carg;
961  return mp->Start();
962  }
963 }
964 
965 int XrdCmsFinderTRG::Configure(const char *cfn, char *Ags, XrdOucEnv *envP)
966 {
967  XrdCmsClientConfig config(this);
969 
970 // Establish what we will be configuring
971 //
972  What = (isRedir ? XrdCmsClientConfig::configSuper
974 
975 // Steal the manlist as we might have to report it
976 //
977  if (isProxy) {myManList = config.PanList; config.PanList = 0;}
978  else {myManList = config.ManList; config.ManList = 0;}
979 
980 // Set the error dest and simply call the configuration object
981 //
982  if (config.Configure(cfn, What, XrdCmsClientConfig::configNorm)) return 0;
983 
984 // Run the Admin thread. Note that unlike FinderRMT, we do not extract the
985 // security function pointer or the network object pointer from the
986 // environment as we don't need these at all.
987 //
988  if (RunAdmin(config.CMSPath, config.myVNID)
989  && config.perfMon && config.perfInt)
990  {pthread_t tid;
991  perfMon = config.perfMon;
992  perfInt = config.perfInt;
993  if (XrdSysThread::Run(&tid, StartPM, (void *)this, 0, "perfmon"))
994 // if (XrdSysThread::Run(&tid, StartRsp, (void *)this, 0, "cms i/f"))
995  {Say.Emsg("Config", errno, "start performance monitor."); return 0;}
996  }
997 
998 // Record the address of this cms client
999 //
1001  envP->PutPtr("XrdCmsClientT*", (XrdCmsClient*)this);
1002 
1003 // All done
1004 //
1005  return 1;
1006 }
1007 
1008 /******************************************************************************/
1009 /* L o c a t e */
1010 /******************************************************************************/
1011 
1012 int XrdCmsFinderTRG::Locate(XrdOucErrInfo &Resp, const char *path, int flags,
1013  XrdOucEnv *Env)
1014 {
1015  char *mBuff;
1016  int mBlen, n;
1017 
1018 // We only support locate on the local configuration
1019 //
1020  if (!(flags & SFS_O_LOCATE) || !(flags & SFS_O_LOCAL))
1021  {Resp.setErrInfo(EINVAL, "Invalid locate option for target config.");
1022  return SFS_ERROR;
1023  }
1024 
1025 // Get the buffer for the result
1026 //
1027  mBuff = Resp.getMsgBuff(mBlen);
1028 
1029 // Return information
1030 //
1031  n = snprintf(mBuff, mBlen, "localhost:0/%c", (Active ? 'a' : 'd'));
1032  Resp.setErrCode(n);
1033  return SFS_DATA;
1034 }
1035 
1036 
1037 /******************************************************************************/
1038 /* P u t I n f o */
1039 /******************************************************************************/
1040 
1042 {
1043  char buff[256];
1044  char *data[2] = {buff, 0};
1045  int dlen[2];
1046  uint32_t cpu_load, mem_load, net_load, pag_load, xeq_load;
1047 
1048  cpu_load = (perfInfo.cpu_load <= 100 ? perfInfo.cpu_load : 100);
1049  mem_load = (perfInfo.mem_load <= 100 ? perfInfo.mem_load : 100);
1050  net_load = (perfInfo.net_load <= 100 ? perfInfo.net_load : 100);
1051  pag_load = (perfInfo.pag_load <= 100 ? perfInfo.pag_load : 100);
1052  xeq_load = (perfInfo.xeq_load <= 100 ? perfInfo.xeq_load : 100);
1053 
1054  dlen[0] = snprintf(buff, sizeof(buff), "%s %u %u %u %u %u\n",
1055  (alert ? "PERF" : "perf"),
1056  xeq_load, cpu_load, mem_load, pag_load, net_load);
1057  dlen[1] = 0;
1058 
1059 // Now send the notification
1060 //
1061  myData.Lock();
1062  if (Active && CMSp->Put((const char **)data, (const int *)dlen))
1063  {CMSp->Close(); Active = 0;}
1064  myData.UnLock();
1065 }
1066 
1067 /******************************************************************************/
1068 /* R e l e a s e */
1069 /******************************************************************************/
1070 
1072 {
1073  int resOld;
1074 
1075 // Lock the variables of interest
1076 //
1077  rrMutex.Lock();
1078  resOld = resCur;
1079 
1080 // If reserve/release not enabled or we have a non-positive value, return
1081 //
1082  if (resMax < 0 || rNum <= 0) {rrMutex.UnLock(); return resOld;}
1083 
1084 // Adjust resource and check if we can resume
1085 //
1086  resCur += rNum;
1087  if (resCur > resMax) resCur = resMax;
1088  if (resOld < 1 && resCur > 0) Resume(0);
1089 
1090 // All done
1091 //
1092  resOld = resCur;
1093  rrMutex.UnLock();
1094  return resOld;
1095 }
1096 
1097 /******************************************************************************/
1098 /* R e m o v e d */
1099 /******************************************************************************/
1100 
1101 void XrdCmsFinderTRG::Removed(const char *path)
1102 {
1103  char *data[4];
1104  int dlen[4];
1105 
1106 // Set up to notify the cluster that a file has been removed
1107 //
1108  data[0] = (char *)"rmdid "; dlen[0] = 6;
1109  data[1] = (char *)path; dlen[1] = strlen(path);
1110  data[2] = (char *)"\n"; dlen[2] = 1;
1111  data[3] = 0; dlen[3] = 0;
1112 
1113 // Now send the notification
1114 //
1115  myData.Lock();
1116  if (Active && CMSp->Put((const char **)data, (const int *)dlen))
1117  {CMSp->Close(); Active = 0;}
1118  myData.UnLock();
1119 }
1120 
1121 /******************************************************************************/
1122 /* R e s e r v e */
1123 /******************************************************************************/
1124 
1126 {
1127  int resOld;
1128 
1129 // Lock the variables of interest
1130 //
1131  rrMutex.Lock();
1132  resOld = resCur;
1133 
1134 // If reserve/release not enabled or we have a non-positive value, return
1135 //
1136  if (resMax < 0 || rNum <= 0) {rrMutex.UnLock(); return resOld;}
1137 
1138 // Adjust resource and check if we can suspend
1139 //
1140  resCur -= rNum;
1141  if (resOld > 0 && resCur < 1) Suspend(0);
1142 
1143 // All done
1144 //
1145  resOld = resCur;
1146  rrMutex.UnLock();
1147  return resOld;
1148 }
1149 
1150 /******************************************************************************/
1151 /* R e s o u r c e */
1152 /******************************************************************************/
1153 
1155 {
1156  int resOld;
1157 
1158 // Lock the variables of interest
1159 //
1160  rrMutex.Lock();
1161  resOld = (resMax < 0 ? 0 : resMax);
1162 
1163 // If we have a non-positive value, return
1164 //
1165  if (rNum <= 0) {rrMutex.UnLock(); return resOld;}
1166 
1167 // Set the resource and adjust the current value as needed
1168 //
1169  resMax = rNum;
1170  if (resCur > resMax) resCur = resMax;
1171 
1172 // All done
1173 //
1174  rrMutex.UnLock();
1175  return resOld;
1176 }
1177 
1178 /******************************************************************************/
1179 /* R e s u m e */
1180 /******************************************************************************/
1181 
1183 { // 1234567890
1184  static const char *rPerm[2] = {"resume\n", 0};
1185  static const char *rTemp[2] = {"resume t\n", 0};
1186  static int lPerm[2] = { 7, 0};
1187  static int lTemp[2] = { 9, 0};
1188 
1189 // Now send the notification
1190 //
1191  myData.Lock();
1192  if (Active && CMSp->Put((const char **)(Perm ? rPerm : rTemp),
1193  (const int *) (Perm ? lPerm : lTemp)))
1194  {CMSp->Close(); Active = 0;}
1195  myData.UnLock();
1196 }
1197 
1198 /******************************************************************************/
1199 /* S u s p e n d */
1200 /******************************************************************************/
1201 
1203 { // 1234567890
1204  static const char *sPerm[2] = {"suspend\n", 0};
1205  static const char *sTemp[2] = {"suspend t\n", 0};
1206  static int lPerm[2] = { 8, 0};
1207  static int lTemp[2] = {10, 0};
1208 
1209 // Now send the notification
1210 //
1211  if (Active && CMSp->Put((const char **)(Perm ? sPerm : sTemp),
1212  (const int *) (Perm ? lPerm : lTemp)))
1213  {CMSp->Close(); Active = 0;}
1214  myData.UnLock();
1215 }
1216 
1217 /******************************************************************************/
1218 /* R u n A d m i n */
1219 /******************************************************************************/
1220 
1221 int XrdCmsFinderTRG::RunAdmin(char *Path, const char *vnid)
1222 {
1223  const char *lFmt;
1224  pthread_t tid;
1225  char buff [512];
1226 
1227 // Make sure we have a path to the cmsd
1228 //
1229  if (!(CMSPath = Path))
1230  {Say.Emsg("Config", "Unable to determine cms admin path"); return 0;}
1231 
1232 // Construct the login line
1233 //
1234  lFmt = (vnid ? "login %c %d port %d vnid %s\n" : "login %c %d port %d\n");
1235  snprintf(buff, sizeof(buff), lFmt, (isProxy ? 'P' : 'p'),
1236  static_cast<int>(getpid()), myPort, vnid);
1237  Login = strdup(buff);
1238 
1239 // Start a thread to connect with the local cmsd
1240 //
1241  if (XrdSysThread::Run(&tid, StartRsp, (void *)this, 0, "cms i/f"))
1242  {Say.Emsg("Config", errno, "start cmsd interface"); return 0;}
1243 
1244  return 1;
1245 }
1246 
1247 /******************************************************************************/
1248 /* R u n P M */
1249 /******************************************************************************/
1250 
1252 {
1253  XrdCmsPerfMon::PerfInfo perfInfo;
1254 
1255 // Keep asking the plugin for statistics.
1256 //
1257  while(1)
1258  {perfMon->GetInfo(perfInfo);
1259  PutInfo(perfInfo);
1260  perfInfo.Clear();
1261  XrdSysTimer::Snooze(perfInt);
1262  }
1263  return (void *)0;
1264 }
1265 
1266 /******************************************************************************/
1267 /* S t a r t */
1268 /******************************************************************************/
1269 
1271 {
1272  XrdCmsRRData Data;
1273 
1274 // First step is to connect to the local cmsd. We also establish a binary
1275 // read stream (old olbd's never used it) to get requests that can only be
1276 // executed by the xrootd (e.g., rm and mv).
1277 //
1278  while(1)
1279  {do {Hookup();
1280 
1281  // Login to cmsd
1282  //
1283  myData.Lock();
1284  CMSp->Put(Login);
1285  myData.UnLock();
1286 
1287  // Get the FD for this connection
1288  //
1289  Data.Routing = CMSp->FDNum();
1290 
1291  // Put up a read to process local requests. Sould the cmsd die,
1292  // we will notice and try to reconnect.
1293  //
1294  while(recv(Data.Routing, &Data.Request, sizeof(Data.Request),
1295  MSG_WAITALL) > 0 && Process(Data)) {}
1296  break;
1297  } while(1);
1298 
1299  // The cmsd went away
1300  //
1301  myData.Lock();
1302  CMSp->Close();
1303  Active = 0;
1304  myData.UnLock();
1305  Say.Emsg("Finder", "Lost contact with cmsd via", CMSPath);
1306  XrdSysTimer::Wait(10*1000);
1307  }
1308 
1309 // We should never get here
1310 //
1311  return (void *)0;
1312 }
1313 
1314 /******************************************************************************/
1315 /* U t i l i z a t i o n */
1316 /******************************************************************************/
1317 
1318 void XrdCmsFinderTRG::Utilization(unsigned int util, bool alert)
1319 {
1320  XrdCmsPerfMon::PerfInfo perfInfo;
1321 
1322 // Make sure value is in range
1323 //
1324  if (util > 100) util = 100;
1325 
1326 // Send this out as a performance figure
1327 //
1328  perfInfo.cpu_load = util;
1329  perfInfo.mem_load = util;
1330  perfInfo.net_load = util;
1331  perfInfo.pag_load = util;
1332  perfInfo.xeq_load = util;
1333  PutInfo(perfInfo, alert);
1334 }
1335 
1336 /******************************************************************************/
1337 /* V C h e c k */
1338 /******************************************************************************/
1339 
1340 bool XrdCmsFinderTRG::VCheck(XrdVersionInfo &urVersion)
1341 {
1342  return XrdSysPlugin::VerCmp(urVersion, myVersion);
1343 }
1344 
1345 /******************************************************************************/
1346 /* P r i v a t e M e t h o d s */
1347 /******************************************************************************/
1348 /******************************************************************************/
1349 /* H o o k u p */
1350 /******************************************************************************/
1351 
1352 void XrdCmsFinderTRG::Hookup()
1353 {
1354  struct stat buf;
1355  XrdNetSocket Sock(&Say);
1356  int opts = 0, tries = 6;
1357 
1358 // Wait for the cmsd path to be created
1359 //
1360  while(stat(CMSPath, &buf))
1361  {if (!tries--)
1362  {Say.Emsg("Finder", "Waiting for cms path", CMSPath); tries=6;}
1363  XrdSysTimer::Wait(10*1000);
1364  }
1365 
1366 // We can now try to connect
1367 //
1368  tries = 0;
1369  while(Sock.Open(CMSPath, -1, opts) < 0)
1370  {if (!tries--)
1371  {opts = XRDNET_NOEMSG;
1372  tries = 6;
1373  } else if (!tries) opts = 0;
1374  XrdSysTimer::Wait(10*1000);
1375  };
1376 
1377 // Transfer the socket FD to a stream
1378 //
1379  myData.Lock();
1380  Active = 1;
1381  CMSp->Attach(Sock.Detach());
1382  myData.UnLock();
1383 
1384 // Tell the world
1385 //
1386  Say.Emsg("Finder", "Connected to cmsd via", CMSPath);
1387 }
1388 
1389 /******************************************************************************/
1390 /* P r o c e s s */
1391 /******************************************************************************/
1392 
1393 int XrdCmsFinderTRG::Process(XrdCmsRRData &Data)
1394 {
1395  EPNAME("Process")
1396  static const int maxReqSize = 16384;
1397  static int Wmsg = 255;
1398  const char *myArgs, *myArgt, *Act;
1399  char buff[16];
1400  int rc;
1401 
1402 // Decode the length and get the rest of the data
1403 //
1404  Data.Dlen = static_cast<int>(ntohs(Data.Request.datalen));
1405  if (!(Data.Dlen)) {myArgs = myArgt = 0;}
1406  else {if (Data.Dlen > maxReqSize)
1407  {Say.Emsg("Finder","Request args too long from local cmsd");
1408  return 0;
1409  }
1410  if ((!Data.Buff || Data.Blen < Data.Dlen)
1411  && !Data.getBuff(Data.Dlen))
1412  {Say.Emsg("Finder", "No buffers to serve local cmsd");
1413  return 0;
1414  }
1415  if (recv(Data.Routing,Data.Buff,Data.Dlen,MSG_WAITALL) != Data.Dlen)
1416  return 0;
1417  myArgs = Data.Buff; myArgt = Data.Buff + Data.Dlen;
1418  }
1419 
1420 // Process the request as needed. We ignore opaque information for now.
1421 // If the request is not valid is could be that we lost sync on the connection.
1422 // The only way to recover is to tear it down and start over.
1423 //
1424  switch(Data.Request.rrCode)
1425  {case kYR_mv: Act = "mv"; break;
1426  case kYR_rm: Act = "rm"; Data.Path2 = (char *)""; break;
1427  case kYR_rmdir: Act = "rmdir"; Data.Path2 = (char *)""; break;
1428  default: sprintf(buff, "%d", Data.Request.rrCode);
1429  Say.Emsg("Finder","Local cmsd sent an invalid request -",buff);
1430  return 0;
1431  }
1432 
1433 // Parse the arguments
1434 //
1435  if (!myArgs || !Parser.Parse(int(Data.Request.rrCode),myArgs,myArgt,&Data))
1436  {Say.Emsg("Finder", "Local cmsd sent a badly formed",Act,"request");
1437  return 1;
1438  }
1439  DEBUG("cmsd requested " <<Act <<" " <<Data.Path <<' ' <<Data.Path2);
1440 
1441 // If we have no storage system then issue a warning but otherwise
1442 // ignore this operation (this may happen in proxy mode).
1443 //
1444  if (SS == 0)
1445  {Wmsg++;
1446  if (!(Wmsg & 255)) Say.Emsg("Finder", "Local cmsd request",Act,
1447  "ignored; no storage system provided.");
1448  return 1;
1449  }
1450 
1451 // Perform the request
1452 //
1453  switch(Data.Request.rrCode)
1454  {case kYR_mv: rc = SS->Rename(Data.Path, Data.Path2); break;
1455  case kYR_rm: rc = SS->Unlink(Data.Path); break;
1456  case kYR_rmdir: rc = SS->Remdir(Data.Path); break;
1457  default: rc = 0; break;
1458  }
1459  if (rc) Say.Emsg("Finder", rc, Act, Data.Path);
1460 
1461 // All Done
1462 //
1463  return 1;
1464 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
void * XrdCmsStartResp(void *carg)
void * XrdCmsStartManager(void *carg)
#define QTRACE(act)
Definition: XrdCmsTrace.hh:49
#define XRDNET_NOEMSG
Definition: XrdNetOpts.hh:71
int stat(const char *path, struct stat *buf)
XrdOucString Path
struct myOpts opts
#define SFS_O_HNAME
#define Prep_FRESH
#define SFS_DATA
#define Prep_SENDERR
#define SFS_O_RESET
#define SFS_O_STAT
char * notify
Notification path or 0.
XrdOucTList * paths
List of paths.
XrdOucTList * oinfo
1-to-1 correspondence of opaque info
#define SFS_O_META
#define SFS_ERROR
#define Prep_WMODE
#define SFS_O_FORCE
#define SFS_STALL
#define SFS_STARTED
#define Prep_COLOC
#define SFS_O_MULTIW
#define Prep_STAGE
char * reqid
Request ID.
#define SFS_O_WRONLY
#define SFS_O_CREAT
#define Prep_PMASK
#define SFS_O_LOCATE
#define SFS_O_RAWIO
#define SFS_O_RDWR
#define Prep_SENDACK
#define SFS_O_LOCAL
#define SFS_O_NOWAIT
int opts
Prep_xxx.
#define SFS_O_REPLICA
#define SFS_O_TRUNC
< Prepare parameters
if(Avsz)
size_t strlcpy(char *dst, const char *src, size_t sz)
#define TRACE(act, x)
Definition: XrdTrace.hh:63
XrdCmsPerfMon * perfMon
int Configure(const char *cfn, configWhat What, configHow How)
void setNext(XrdCmsClientMan *np)
XrdCmsClientMan * nextManager()
int Send(unsigned int &iMan, char *msg, int mlen=0)
int delayResp(XrdOucErrInfo &Resp)
static void setConfig(const char *cfn)
static void setNetwork(XrdInet *nP)
int whatsUp(const char *user, const char *path, unsigned int iMan)
static char doDebug
int Wait4Reply(int wtime)
int Forward(XrdOucErrInfo &Resp, const char *cmd, const char *arg1=0, const char *arg2=0, XrdOucEnv *Env1=0, XrdOucEnv *Env2=0)
static const int MaxMan
Definition: XrdCmsFinder.hh:83
int Locate(XrdOucErrInfo &Resp, const char *path, int flags, XrdOucEnv *Info=0)
int Prepare(XrdOucErrInfo &Resp, XrdSfsPrep &pargs, XrdOucEnv *Info=0)
int Space(XrdOucErrInfo &Resp, const char *path, XrdOucEnv *Info=0)
int Configure(const char *cfn, char *Args, XrdOucEnv *EnvInfo)
XrdCmsFinderRMT(XrdSysLogger *lp, int whoami=0, int Port=0)
static bool VCheck(XrdVersionInfo &urVersion)
XrdCmsFinderTRG(XrdSysLogger *, int, int, XrdOss *theSS=0)
void Resume(int Perm=1)
int Release(int n)
int Locate(XrdOucErrInfo &Resp, const char *path, int flags, XrdOucEnv *Info=0)
void PutInfo(XrdCmsPerfMon::PerfInfo &perfInfo, bool alert=false)
void Utilization(unsigned int util, bool alert=false)
void Removed(const char *path)
void Suspend(int Perm=1)
int Reserve(int n)
int RunAdmin(char *Path, const char *vnid)
void Added(const char *path, int Pend=0)
int Resource(int n)
static bool VCheck(XrdVersionInfo &urVersion)
int Configure(const char *cfn, char *Args, XrdOucEnv *EnvInfo)
static int Pack(int rnum, struct iovec *iovP, struct iovec *iovE, char *Base, char *Work)
int Parse(XrdCms::CmsLoginData *Data, const char *Aps, const char *Apt)
Definition: XrdCmsParser.hh:59
virtual void GetInfo(PerfInfo &info)
unsigned int Opts
Definition: XrdCmsRRData.hh:65
char * Opaque2
Definition: XrdCmsRRData.hh:58
int getBuff(size_t bsz)
Definition: XrdCmsRRData.cc:44
XrdCms::CmsRRHdr Request
Definition: XrdCmsRRData.hh:54
static void Reply()
Definition: XrdCmsResp.cc:164
static void setSecFunc(void *secfP)
virtual int Remdir(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
virtual int Rename(const char *oPath, const char *nPath, XrdOucEnv *oEnvP=0, XrdOucEnv *nEnvP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void Recycle()
Recycle the buffer. The buffer may be reused in the future.
void SetLen(int dataL, int dataO=0)
char * EnvTidy(int &envlen)
Definition: XrdOucEnv.cc:160
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:281
char * Get(const char *varname)
Definition: XrdOucEnv.hh:69
void PutPtr(const char *varname, void *value)
Definition: XrdOucEnv.cc:316
int setErrInfo(int code, const char *emsg)
char * getMsgBuff(int &mblen)
const char * getErrUser()
int setErrCode(int code)
static int Index(int KeyMax, const char *KeyVal, int KeyLen=0)
Definition: XrdOucReqID.cc:159
int Attach(int FileDescriptor, int bsz=2047)
int Put(const char *data, const int dlen)
void Close(int hold=0)
XrdOucTList * next
Definition: XrdOucTList.hh:45
char * text
Definition: XrdOucTList.hh:46
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
XrdSysLogger * logger(XrdSysLogger *lp=0)
Definition: XrdSysError.hh:141
static bool VerCmp(XrdVersionInfo &vInf1, XrdVersionInfo &vInf2, bool noMsg=false)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Snooze(int seconds)
Definition: XrdSysTimer.cc:168
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227
void SetLogger(XrdSysLogger *logp)
Definition: XrdSysTrace.cc:65
XrdVersionInfo myVersion
XrdVERSIONINFODEF(myVersion, cmsclient, XrdVNUMBER, XrdVERSION)
kXR_char modifier
Definition: YProtocol.hh:85
@ IsTarget
The role is server and will be a redirection target.
@ IsProxy
The role is proxy {plus one or more of the below}.
@ IsRedir
The role is manager and will redirect users.
@ IsMeta
The role is meta {plus one or more of the above}.
XrdCmsParser Parser
XrdSysError Say
kXR_char rrCode
Definition: YProtocol.hh:84
XrdSysTrace Trace("cms")
kXR_unt32 streamid
Definition: YProtocol.hh:83
XrdSysError Say(0, "cms_")
@ kYR_dnf
Definition: YProtocol.hh:133
@ kYR_select
Definition: YProtocol.hh:100
@ kYR_mkpath
Definition: YProtocol.hh:94
@ kYR_prepdel
Definition: YProtocol.hh:97
@ kYR_statfs
Definition: YProtocol.hh:111
@ kYR_prepadd
Definition: YProtocol.hh:96
@ kYR_chmod
Definition: YProtocol.hh:91
@ kYR_rmdir
Definition: YProtocol.hh:99
@ kYR_mkdir
Definition: YProtocol.hh:93
@ kYR_locate
Definition: YProtocol.hh:92
@ kYR_rm
Definition: YProtocol.hh:98
@ kYR_trunc
Definition: YProtocol.hh:113
@ kYR_mv
Definition: YProtocol.hh:95
XrdOucEnv * envP
Definition: XrdPss.cc:109
Structure used for reporting performance metrics.
unsigned char pag_load
Paging 0 to 100 utilization.
unsigned char xeq_load
Other 0 to 100 utilization (arbitrary)
unsigned char cpu_load
CPU 0 to 100 utilization.
unsigned char mem_load
Memory 0 to 100 utilization.
unsigned char net_load
Network 0 to 100 utilization.
static const int uIPv64
ucap: Supports only IPv4 info
static const int uIPv4
ucap: Supports read redirects
static const int uPrip