34 #include <sys/types.h>
// NOTE(review): the leading integers on these lines ("72", "78", ...) look like
// line numbers fused in from a source listing, and the gaps between them suggest
// that statements are missing from this view -- confirm against the real file
// before changing any code here.
//
// Bypass(): visible behaviour is limited to the following.
//   - Returns 1 straight away when theQ.rLimit is zero.
//   - When theQ.rLeft is zero and theQ.pqFirst is null, reloads theQ.rLeft from
//     theQ.rAgain and prints a "BYPASS" trace line.
//   - Decrements theQ.rLeft and releases theQ.Mutex.
72 int XrdCmsBaseFS::Bypass()
// theQ.rLimit == 0: return 1 without touching the counters.
78 if (!theQ.rLimit)
return 1;
// Allowance used up (rLeft == 0) and the pq list is empty.
88 if (!theQ.rLeft && !theQ.pqFirst)
89 {
unsigned long Interval = 0;
// Reload the allowance from theQ.rAgain.
92 {theQ.rLeft = theQ.rAgain;
// NOTE(review): this writes directly to std::cerr (with a flushing std::endl)
// while other fragments in this file use DEBUG(...) / Say.Emsg(...) -- looks
// like leftover diagnostics; confirm whether it is intentional.
94 std::cerr <<
"BYPASS " <<Interval <<
"ms left=" <<theQ.rLeft <<std::endl;
// Use up one unit of the allowance, then unlock theQ.Mutex. The matching
// Lock() call is not visible in this fragment.
101 {theQ.rLeft--; theQ.Mutex.UnLock();
// Fragment of an existence check that works on Arg / Who / noLim (presumably
// the Exists(XrdCmsRRData&, XrdCmsPInfo&, int noLim) overload -- TODO confirm;
// the function header is not visible here).
//
// aOK is true when no rate limit is set, when the caller passed noLim, or when
// limiting is not Fixed and Bypass() returns non-zero.
123 {aOK = (!theQ.rLimit || noLim || (!Fixed && Bypass()));
132 {
// Walk backwards from index Arg.PathLen-2 until a '/' is found; fnPos is left
// at that index, or at -1 when no '/' precedes the start position.
for (fnPos=Arg.
PathLen-2;fnPos >= 0 && Arg.
Path[fnPos] !=
'/';fnPos--) {}
// Only consult hasDir() when the '/' is not the first character; a false
// result ends the check with -1.
133 if (fnPos > 0 && !hasDir(Arg.
Path, fnPos))
return -1;
// Same gate as for aOK above, except that Bypass() is consulted only when
// Fixed is set.
140 if (!theQ.rLimit || noLim || (Fixed && Bypass()))
// Queue the request, passing 1 as the last argument, when Who.rovec is non-zero.
145 if (Who.
rovec) Queue(Arg, Who, fnPos, 1);
// Fragment of a path-based existence/stat routine (function header not visible).
//
// Static state: two dMoP records initialised with {0} and {1} (the code below
// reads xVal->Present, so these presumably stand for "directory missing" and
// "directory present" -- confirm against the dMoP declaration), plus two
// counters of failed stat attempts.
154 static struct dMoP dirMiss = {0}, dirPres = {1};
155 static int badDStat = 0;
156 static int badFStat = 0;
// A negative fnPos is remapped to -(fnPos+1), after which the code scans
// backwards to the preceding '/'. This is done only when dmLife is non-zero.
164 if (fnPos < 0 && dmLife)
165 {
for (fnPos = -(fnPos+1); fnPos >= 0 &&
Path[fnPos] !=
'/'; fnPos--) {}
// hasDir() returning false ends the routine with -1.
166 if (fnPos > 0 && !hasDir(
Path, fnPos))
return -1;
172 {
// True when the stat result describes a regular file.
if ((buf.st_mode & S_IFMT) == S_IFREG)
188 if (fnPos > 0 && dmLife)
189 {
// Start with the dirMiss record ...
struct dMoP *xVal = &dirMiss;
// ... and switch to dirPres together with the dpLife lifetime (the condition
// guarding this branch is not visible here).
193 {xLife = dpLife; xVal = &dirPres;}
// dRC is non-zero and not -ENOENT: bump badDStat under fsMutex.
194 if (dRC && dRC != -ENOENT)
195 {fsMutex.Lock(); eCnt = badDStat++; fsMutex.UnLock();
// eCnt holds the pre-increment value, hence eCnt+1 in the message.
198 snprintf(buff,
sizeof(buff),
"to stat dir (events=%d)", eCnt+1);
// Trace which record (okdir / nodir) is being added and with what lifetime.
206 DEBUG(
"add " <<xLife <<(xVal->Present ?
" okdir ":
" nodir ") <<
Path);
// Same error accounting as above, for fRC and the badFStat counter.
210 if (fRC && fRC != -ENOENT)
211 {fsMutex.Lock(); eCnt = badFStat++; fsMutex.UnLock();
214 snprintf(buff,
sizeof(buff),
"to stat file (events=%d)", eCnt+1);
// hasDir(Path, fnPos): only the lookup itself is visible in this fragment.
// Path is looked up in fsDirMP; if an entry is found its Present field becomes
// the answer, otherwise Have defaults to 1.
// NOTE(review): the use of fnPos and the return statement fall in lines that
// are not visible here.
226 int XrdCmsBaseFS::hasDir(
char *
Path,
int fnPos)
235 Have = ((dP = fsDirMP.Find(
Path)) ? dP->Present : 1);
// Fragment of option decoding (presumably Init(int Opts, int DMlife,
// int DPLife) -- TODO confirm; the function header is not visible).
//
// dpLife takes DPLife when that is non-zero, otherwise ten times DMLife.
251 dpLife = DPLife ? DPLife : DMLife * 10;
// Server is set when the Servr bit is present in Opts.
252 Server = (
Opts & Servr) != 0;
// lclStat is set when the Cntrl bit is present or Server is set.
253 lclStat = (
Opts & Cntrl) != 0 || Server;
// preSel is set when the Immed bit is ABSENT (note the inverted test).
254 preSel = (
Opts & Immed) == 0;
// dfsSys is set when the DFSys bit is present.
255 dfsSys = (
Opts & DFSys) != 0;
// Fragment that configures the rate limit and queue bound from rLim / Qmax
// (function header not visible).
//
// Negative rLim: rAgain and rLeft are set to -1, rLim is made positive and
// Fixed is set to 1.
267 if (rLim < 0) {theQ.rAgain=theQ.rLeft = -1; rLim = -rLim; Fixed = 1;}
// Otherwise: rAgain and rLeft start at rLim/2 (1 when rLim <= 1) and Fixed is 0.
268 else {theQ.rAgain = theQ.rLeft = (rLim > 1 ? rLim/2 : 1); Fixed = 0;}
// A limit above 1000 is stored as 0, which other fragments in this file treat
// as "no limit" (see the !theQ.rLimit tests).
269 theQ.rLimit = (rLim <= 1000 ? rLim : 0);
// qMax: an explicit positive Qmax wins; otherwise 2.5 x rLimit computed with
// integer arithmetic, bumped to 1 if that yields 0.
270 if (Qmax > 0) theQ.qMax = Qmax;
271 else if (!(theQ.qMax = theQ.rLimit*2 + theQ.rLimit/2)) theQ.qMax = 1;
// Fragment of the loop that moves requests from the pq list to the rq list
// (presumably the pacer thread body -- TODO confirm; header not visible).
//
// NOTE(review): rqRate divides by theQ.rLimit. Other fragments store 0 in
// rLimit to mean "no limit", which would make this a division by zero --
// confirm that this code is only reached with a non-zero limit.
281 int inQ, rqRate = 1000/theQ.rLimit;
// Block until pqAvail is posted, then work on the list under theQ.Mutex.
285 do{theQ.pqAvail.Wait();
286 theQ.Mutex.Lock(); inQ = 1;
287 while((rP = theQ.pqFirst))
288 {
// Unlink the head element; when that empties the list, clear pqLast and
// record inQ = 0.
if (!(theQ.pqFirst = rP->
Next)) {theQ.pqLast = 0; inQ = 0;}
// Discard path: the request is deleted and the loop continues (the condition
// guarding this branch is not visible here).
291 {
delete rP;
continue;}
// Append rP to the rq list; rqAvail is posted only when the rq list was empty.
293 if (theQ.rqFirst) {theQ.rqLast->
Next = rP; theQ.rqLast = rP;}
294 else {theQ.rqFirst = theQ.rqLast = rP; theQ.rqAvail.Post();}
// NOTE(review): the unlock depends on inQ; presumably the inQ == 0 path
// releases the mutex in a line not visible here -- verify lock pairing.
300 if (inQ) theQ.Mutex.UnLock();
// Fragment of the request-queueing routine (called elsewhere in this file as
// Queue(Arg, Who, fnPos, 1)); only the last two parameters are visible.
309 int fnpos,
int Force)
// Static flag: starts at 1 and is read by the overrun check below.
312 static int noMsg = 1;
// Trace the current queue depth and the path being paced.
327 DEBUG(
"inq " <<theQ.qNum <<
" pace " <<Arg.
Path);
// Bump the queued-request count and remember the previous high-water mark.
333 n = ++theQ.qNum; prevHWM = theQ.qHWM;
// Msg is non-zero when this enqueue sets a new high-water mark.
334 if ((Msg = (n > prevHWM))) theQ.qHWM = n;
// Append rP to the pq list; pqAvail is posted only when the list was empty.
335 if (theQ.pqFirst) {theQ.pqLast->Next = rP; theQ.pqLast = rP;}
336 else {theQ.pqFirst = theQ.pqLast = rP; theQ.pqAvail.Post();}
// Report an overrun only on a new high-water mark above qMax, and then only
// if the mark rose by more than 3 or noMsg is still set.
341 if (n > theQ.qMax && Msg && (n-prevHWM > 3 || noMsg))
342 {
// NOTE(review): Pct is the integer ratio n/qMax (1, 2, ...), yet it is printed
// with "%d%%" as though it were a percentage -- likely meant n*100/theQ.qMax;
// confirm intent before changing.
int Pct = n/theQ.qMax;
// NOTE(review): unbounded sprintf; Buff's declaration is not visible here.
// Other messages in this file use snprintf(buff, sizeof(buff), ...).
345 sprintf(Buff,
"Queue overrun %d%%; %d requests now queued.", Pct, n);
// Fragment of the loop that drains the rq list (presumably the runner thread
// body -- TODO confirm; header and per-request processing are not visible).
//
// Block until rqAvail is posted, then work on the list under theQ.Mutex.
361 do{theQ.rqAvail.Wait();
362 theQ.Mutex.Lock(); inQ = 1;
363 while((rP = theQ.rqFirst))
364 {
// Unlink the head element; when that empties the list, clear rqLast and
// record inQ = 0.
if (!(theQ.rqFirst = rP->
Next)) {theQ.rqLast = 0; inQ = 0;}
// NOTE(review): the unlock depends on inQ; the release for the inQ == 0 path
// is not visible in this fragment -- verify lock pairing.
371 if (inQ) theQ.Mutex.UnLock();
// Fragment of a start-up routine (function header not visible).
//
// 'this' as an untyped pointer; presumably handed to a thread-creation call as
// its argument -- the call itself is not visible here.
382 void *Me = (
void *)
this;
// Trace the configuration flags and the dmLife / dpLife values.
387 DEBUG(
"Srv=" <<
int(Server) <<
" dfs=" <<
int(dfsSys) <<
" lcl=" <<
int(lclStat)
388 <<
" Pre=" <<
int(preSel) <<
" dmLife=" <<dmLife <<
' ' <<dpLife);
// Trace the limiter settings (rLimit, rAgain, Fixed) and the queue bound.
389 DEBUG(
"Lim=" <<theQ.rLimit <<
' ' <<theQ.rAgain <<
" fix=" <<
int(Fixed)
390 <<
" Qmax=" <<theQ.qMax);
// Punt is set only when there is no rate limit and lclStat is off.
394 Punt = (!theQ.rLimit && !lclStat);
// Error path: report errno with the text "start baseFS queue handler" (the
// failing call that leads here is not visible).
402 {
Say.
Emsg(
"cmsd", errno,
"start baseFS queue handler");
// Fragment of request execution (function header and branch conditions are not
// visible). In every visible branch the callback cBack is invoked only when it
// is non-null.
419 {
// Branch that reports 0 to the callback.
if (cBack) (*cBack)(rP, 0);
426 {
// Branch that reports -1 to the callback.
if (cBack) (*cBack)(rP, -1);
// More requests are queued than qMax allows: log and ignore this lookup.
433 if (theQ.qNum > theQ.qMax)
434 {
Say.
Emsg(
"Xeq",
"Queue limit exceeded; ignoring lkup for", rP->
Path);
// General case: pass the computed rc on to the callback.
441 if (cBack) (*cBack)(rP, rc);
void * XrdCmsBaseRunner(void *carg)
void * XrdCmsBasePacer(void *carg)
int stat(const char *path, struct stat *buf)
int Exists(XrdCmsRRData &Arg, XrdCmsPInfo &Who, int noLim=0)
void Init(int Opts, int DMlife, int DPLife)
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
unsigned long Report(double &)
static void Wait(int milliseconds)