XRootD
XrdCmsBaseFS.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d C m s B a s e F S . c c */
4 /* */
5 /* (c) 2011 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cerrno>
32 #include <ctime>
33 #include <sys/time.h>
34 #include <sys/types.h>
35 #include <cstdio>
36 
37 #include "XProtocol/YProtocol.hh"
38 #include "XProtocol/XPtypes.hh"
39 
40 #include "XrdCms/XrdCmsBaseFS.hh"
41 #include "XrdCms/XrdCmsConfig.hh"
42 #include "XrdCms/XrdCmsPrepare.hh"
43 #include "XrdCms/XrdCmsTrace.hh"
44 
45 #include "XrdOss/XrdOss.hh"
46 
47 #include "XrdSfs/XrdSfsFlags.hh"
48 
49 #include "XrdSys/XrdSysError.hh"
50 #include "XrdSys/XrdSysTimer.hh"
51 
52 using namespace XrdCms;
53 
54 /******************************************************************************/
55 /* E x t e r n a l T h r e a d I n t e r f a c e s */
56 /******************************************************************************/
57 
58 void *XrdCmsBasePacer(void *carg)
59  {((XrdCmsBaseFS *)carg)->Pacer();
60  return (void *)0;
61  }
62 
63 void *XrdCmsBaseRunner(void *carg)
64  {((XrdCmsBaseFS *)carg)->Runner();
65  return (void *)0;
66  }
67 
68 /******************************************************************************/
69 /* Private: B y p a s s */
70 /******************************************************************************/
71 
72 int XrdCmsBaseFS::Bypass()
73 {
74  static XrdSysTimer Window;
75 
76 // If we are not timing requests, we can bypass (typically checked beforehand)
77 //
78  if (!theQ.rLimit) return 1;
79 
80 // If this is a fixed rate queue then we cannot bypass
81 //
82  if (Fixed) return 0;
83 
84 // Check if we can reset the number of requests that can be issued inline. We
85 // do this to bypass the queue unless until we get flooded by requests.
86 //
87  theQ.Mutex.Lock();
88  if (!theQ.rLeft && !theQ.pqFirst)
89  {unsigned long Interval = 0;
90  Window.Report(Interval);
91  if (Interval >= 450)
92  {theQ.rLeft = theQ.rAgain;
93  Window.Reset();
94  std::cerr <<"BYPASS " <<Interval <<"ms left=" <<theQ.rLeft <<std::endl;
95  }
96  }
97 
98 // At this point we may or may not have freebies left
99 //
100  if (theQ.rLeft > 0)
101  {theQ.rLeft--; theQ.Mutex.UnLock();
102  return 1;
103  }
104 
105 // This request must be queued
106 //
107  theQ.Mutex.UnLock();
108  return 0;
109 }
110 
111 /******************************************************************************/
112 /* Public: E x i s t s */
113 /******************************************************************************/
114 
116 {
117  int aOK, fnPos;
118 
119 // If we cannot do this locally, then we need to forward the request but only
120 // if we have a route. Otherwise, just indicate that queueing is necessary.
121 //
122  if (!lclStat)
123  {aOK = (!theQ.rLimit || noLim || (!Fixed && Bypass()));
124  if (Who.rovec) Queue(Arg, Who, -(Arg.PathLen-1), !aOK);
125  return 0;
126  }
127 
128 // If directory checking is enabled, find where the directory component ends
129 // and then check if we even have this directory.
130 //
131  if (dmLife)
132  {for (fnPos=Arg.PathLen-2;fnPos >= 0 && Arg.Path[fnPos] != '/';fnPos--) {}
133  if (fnPos > 0 && !hasDir(Arg.Path, fnPos)) return -1;
134  } else fnPos = 0;
135 
136 // If we are not limiting requests, or not limiting everyone and this is not
137 // a meta-manager, or we are not timing requests and can skip the queue; then
138 // issue the fstat() inline and report back the result.
139 //
140  if (!theQ.rLimit || noLim || (Fixed && Bypass()))
141  return Exists(Arg.Path, fnPos);
142 
143 // We can't do this now, so forcibly queue the request
144 //
145  if (Who.rovec) Queue(Arg, Who, fnPos, 1);
146  return 0;
147 }
148 
149 /******************************************************************************/
150 
151 int XrdCmsBaseFS::Exists(char *Path, int fnPos, int UpAT)
152 {
153  EPNAME("Exists");
154  static struct dMoP dirMiss = {0}, dirPres = {1};
155  static int badDStat = 0;
156  static int badFStat = 0;
157  struct stat buf;
158  int eCnt, fRC, dRC;
160 
161 // If directory checking is enabled, find where the directory component ends
162 // if so requested.
163 //
164  if (fnPos < 0 && dmLife)
165  {for (fnPos = -(fnPos+1); fnPos >= 0 && Path[fnPos] != '/'; fnPos--) {}
166  if (fnPos > 0 && !hasDir(Path, fnPos)) return -1;
167  }
168 
169 // Issue stat() via oss plugin. If it succeeds, return result.
170 //
171  if (!(fRC = Config.ossFS->Stat(Path, &buf, Opts)))
172  {if ((buf.st_mode & S_IFMT) == S_IFREG)
173  return (buf.st_mode & XRDSFS_POSCPEND ? CmsHaveRequest::Pending
175 
176  return (buf.st_mode & S_IFMT) == S_IFDIR ? CmsHaveRequest::Online : -1;
177  }
178 
179 // The entry does not exist but if we are a staging server then it may be in
180 // the prepare queue in which case we must say that it is pending arrival.
181 //
183 
184 // The entry does not exist. Check if the directory exists and if not, put it
185 // in our directory missing table so we don't keep hitting this directory.
186 // This is disabled by default and enabled by the cms.dfs directive.
187 //
188  if (fnPos > 0 && dmLife)
189  {struct dMoP *xVal = &dirMiss;
190  int xLife = dmLife;
191  Path[fnPos] = '\0';
192  if (!(dRC = Config.ossFS->Stat(Path, &buf, XRDOSS_resonly)))
193  {xLife = dpLife; xVal = &dirPres;}
194  if (dRC && dRC != -ENOENT)
195  {fsMutex.Lock(); eCnt = badDStat++; fsMutex.UnLock();
196  if (!(eCnt & 0xff))
197  {char buff[80];
198  snprintf(buff, sizeof(buff), "to stat dir (events=%d)", eCnt+1);
199  Say.Emsg("Exists", dRC, buff, Path);
200  Path[fnPos] = '/';
201  }
202  } else {
203  fsMutex.Lock();
204  fsDirMP.Rep(Path, xVal, xLife, Hash_keepdata);
205  fsMutex.UnLock();
206  DEBUG("add " <<xLife <<(xVal->Present ? " okdir ":" nodir ") <<Path);
207  Path[fnPos] = '/';
208  }
209  } else {
210  if (fRC && fRC != -ENOENT)
211  {fsMutex.Lock(); eCnt = badFStat++; fsMutex.UnLock();
212  if (!(eCnt & 0xff))
213  {char buff[80];
214  snprintf(buff, sizeof(buff), "to stat file (events=%d)", eCnt+1);
215  Say.Emsg("Exists", fRC, buff, Path);
216  }
217  }
218  }
219  return -1;
220 }
221 
222 /******************************************************************************/
223 /* Private: h a s D i r */
224 /******************************************************************************/
225 
226 int XrdCmsBaseFS::hasDir(char *Path, int fnPos)
227 {
228  struct dMoP *dP;
229  int Have;
230 
231 // Strip to directory and check if we have it
232 //
233  Path[fnPos] = '\0';
234  fsMutex.Lock();
235  Have = ((dP = fsDirMP.Find(Path)) ? dP->Present : 1);
236  fsMutex.UnLock();
237  Path[fnPos] = '/';
238  return Have;
239 }
240 
241 /******************************************************************************/
242 /* I n i t */
243 /******************************************************************************/
244 
245 void XrdCmsBaseFS::Init(int Opts, int DMLife, int DPLife)
246 {
247 
248 // Set values.
249 //
250  dmLife = DMLife;
251  dpLife = DPLife ? DPLife : DMLife * 10;
252  Server = (Opts & Servr) != 0;
253  lclStat = (Opts & Cntrl) != 0 || Server;
254  preSel = (Opts & Immed) == 0;
255  dfsSys = (Opts & DFSys) != 0;
256 }
257 
258 /******************************************************************************/
259 /* L i m i t */
260 /******************************************************************************/
261 
262 void XrdCmsBaseFS::Limit(int rLim, int Qmax)
263 {
264 
265 // Establish the limits
266 //
267  if (rLim < 0) {theQ.rAgain=theQ.rLeft = -1; rLim = -rLim; Fixed = 1;}
268  else {theQ.rAgain = theQ.rLeft = (rLim > 1 ? rLim/2 : 1); Fixed = 0;}
269  theQ.rLimit = (rLim <= 1000 ? rLim : 0);
270  if (Qmax > 0) theQ.qMax = Qmax;
271  else if (!(theQ.qMax = theQ.rLimit*2 + theQ.rLimit/2)) theQ.qMax = 1;
272 }
273 
274 /******************************************************************************/
275 /* P a c e r */
276 /******************************************************************************/
277 
279 {
280  XrdCmsBaseFR *rP;
281  int inQ, rqRate = 1000/theQ.rLimit;
282 
283 // Process requests at the given rate
284 //
285 do{theQ.pqAvail.Wait();
286  theQ.Mutex.Lock(); inQ = 1;
287  while((rP = theQ.pqFirst))
288  {if (!(theQ.pqFirst = rP->Next)) {theQ.pqLast = 0; inQ = 0;}
289  theQ.Mutex.UnLock();
290  if (rP->PDirLen > 0 && !hasDir(rP->Path, rP->PDirLen))
291  {delete rP; continue;}
292  theQ.Mutex.Lock();
293  if (theQ.rqFirst) {theQ.rqLast->Next = rP; theQ.rqLast = rP;}
294  else {theQ.rqFirst = theQ.rqLast = rP; theQ.rqAvail.Post();}
295  theQ.Mutex.UnLock();
296  XrdSysTimer::Wait(rqRate);
297  if (!inQ) break;
298  theQ.Mutex.Lock();
299  }
300  if (inQ) theQ.Mutex.UnLock();
301  } while(1);
302 }
303 
304 /******************************************************************************/
305 /* Q u e u e */
306 /******************************************************************************/
307 
308 void XrdCmsBaseFS::Queue(XrdCmsRRData &Arg, XrdCmsPInfo &Who,
309  int fnpos, int Force)
310 {
311  EPNAME("Queue");
312  static int noMsg = 1;
313  XrdCmsBaseFR *rP;
314  int Msg, n, prevHWM;
315 
316 // If we can bypass the queue and execute this now. Avoid the grabbing the buff.
317 //
318  if (!Force)
319  {XrdCmsBaseFR myReq(&Arg, Who, fnpos);
320  Xeq(&myReq);
321  return;
322  }
323 
324 // Queue this request for callback after an appropriate time.
325 // We will also steal the underlying data buffer from the Arg.
326 //
327  DEBUG("inq " <<theQ.qNum <<" pace " <<Arg.Path);
328  rP = new XrdCmsBaseFR(Arg, Who, fnpos);
329 
330 // Add the element to the queue
331 //
332  theQ.Mutex.Lock();
333  n = ++theQ.qNum; prevHWM = theQ.qHWM;
334  if ((Msg = (n > prevHWM))) theQ.qHWM = n;
335  if (theQ.pqFirst) {theQ.pqLast->Next = rP; theQ.pqLast = rP;}
336  else {theQ.pqFirst = theQ.pqLast = rP; theQ.pqAvail.Post();}
337  theQ.Mutex.UnLock();
338 
339 // Issue a warning message if we have an excessive number of requests queued
340 //
341  if (n > theQ.qMax && Msg && (n-prevHWM > 3 || noMsg))
342  {int Pct = n/theQ.qMax;
343  char Buff[80];
344  noMsg = 0;
345  sprintf(Buff, "Queue overrun %d%%; %d requests now queued.", Pct, n);
346  Say.Emsg("Pacer", Buff);
347  }
348 }
349 
350 /******************************************************************************/
351 /* R u n n e r */
352 /******************************************************************************/
353 
355 {
356  XrdCmsBaseFR *rP;
357  int inQ;
358 
359 // Process requests at the given rate
360 //
361 do{theQ.rqAvail.Wait();
362  theQ.Mutex.Lock(); inQ = 1;
363  while((rP = theQ.rqFirst))
364  {if (!(theQ.rqFirst = rP->Next)) {theQ.rqLast = 0; inQ = 0;}
365  theQ.qNum--;
366  theQ.Mutex.UnLock();
367  Xeq(rP); delete rP;
368  if (!inQ) break;
369  theQ.Mutex.Lock();
370  }
371  if (inQ) theQ.Mutex.UnLock();
372  } while(1);
373 }
374 
375 /******************************************************************************/
376 /* S t a r t */
377 /******************************************************************************/
378 
380 {
381  EPNAME("Start");
382  void *Me = (void *)this;
383  pthread_t tid;
384 
385 // Issue some debugging here so we know how we are starting up
386 //
387  DEBUG("Srv=" <<int(Server) <<" dfs=" <<int(dfsSys) <<" lcl=" <<int(lclStat)
388  <<" Pre=" <<int(preSel) <<" dmLife=" <<dmLife <<' ' <<dpLife);
389  DEBUG("Lim=" <<theQ.rLimit <<' ' <<theQ.rAgain <<" fix=" <<int(Fixed)
390  <<" Qmax=" <<theQ.qMax);
391 
392 // Set the passthru option if we can't do this locally and have no limit
393 //
394  Punt = (!theQ.rLimit && !lclStat);
395 
396 // If we need to throttle we will need two threads for the queue. The first is
397 // the pacer thread that feeds the runner thread at a fixed rate.
398 //
399  if (theQ.rLimit)
400  {if (XrdSysThread::Run(&tid, XrdCmsBasePacer, Me, 0, "fsQ pacer")
401  || XrdSysThread::Run(&tid, XrdCmsBaseRunner, Me, 0, "fsQ runner"))
402  {Say.Emsg("cmsd", errno, "start baseFS queue handler");
403  theQ.rLimit = 0;
404  }
405  }
406 }
407 
408 /******************************************************************************/
409 /* Private: X e q */
410 /******************************************************************************/
411 
412 void XrdCmsBaseFS::Xeq(XrdCmsBaseFR *rP)
413 {
414  int rc;
415 
416 // If we are not doing local stat calls, callback indicating a forward is needed
417 //
418  if (!lclStat)
419  {if (cBack) (*cBack)(rP, 0);
420  return;
421  }
422 
423 // Check if we can avoid doing a stat()
424 //
425  if (dmLife && rP->PDirLen > 0 && !hasDir(rP->Path, rP->PDirLen))
426  {if (cBack) (*cBack)(rP, -1);
427  return;
428  }
429 
430 // If we have exceeded the queue limit and this is a meta-manager request
431 // then just deep-six it. Local requests must complete
432 //
433  if (theQ.qNum > theQ.qMax)
434  {Say.Emsg("Xeq", "Queue limit exceeded; ignoring lkup for", rP->Path);
435  return;
436  }
437 
438 // Perform a local stat() and if we don't have the file
439 //
440  rc = Exists(rP->Path, rP->PDirLen);
441  if (cBack) (*cBack)(rP, rc);
442 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
void * XrdCmsBaseRunner(void *carg)
Definition: XrdCmsBaseFS.cc:63
void * XrdCmsBasePacer(void *carg)
Definition: XrdCmsBaseFS.cc:58
#define XRDOSS_resonly
Definition: XrdOss.hh:486
#define XRDOSS_updtatm
Definition: XrdOss.hh:487
@ Hash_keepdata
Definition: XrdOucHash.hh:57
int stat(const char *path, struct stat *buf)
bool Exists
XrdOucString Path
bool Force
#define XRDSFS_POSCPEND
Definition: XrdSfsFlags.hh:89
XrdCmsBaseFR * Next
Definition: XrdCmsBaseFS.hh:54
int Exists(XrdCmsRRData &Arg, XrdCmsPInfo &Who, int noLim=0)
void Init(int Opts, int DMlife, int DPLife)
XrdOss * ossFS
SMask_t rovec
Definition: XrdCmsPList.hh:47
int Exists(char *path)
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
void Reset()
Definition: XrdSysTimer.hh:61
unsigned long Report(double &)
Definition: XrdSysTimer.cc:100
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227
XrdSysError Say
XrdCmsPrepare PrepQ
XrdCmsConfig Config
int Opts
Definition: XrdMpxStats.cc:58