92 int expWait, expClock;
96 if ((expWait = maxLife/4) == 0) expWait = 60;
102 if ((ntp = deferQ)) deferQ = 0;
110 {Events.Del(tp->
Path);
114 if ((ntp = deferQ)) deferQ = 0;
144 if (!(p = getenv(
"XRDADMINPATH")) || !*p)
145 {eobj->
Emsg(
"Events",
"XRDADMINPATH not defined");
148 strcpy(path, p); n = strlen(p);
149 if (path[n-1] !=
'/') {path[n] =
'/'; n++;}
150 strcpy(&path[n],
"ofsEvents");
157 msgFD = msgSock->
Detach();
179 0,
"Event receiver")))
180 {eDest->
Emsg(
"Evr", rc,
"create event reader thread");
187 0,
"Event flusher")))
188 {eDest->
Emsg(
"Evr", rc,
"create event flush thread");
213 while((lp = eventFIFO.
GetLine()))
215 if ((tp = eventFIFO.
GetToken()) && *tp)
216 {
if (!strcmp(tp,
"stage")) eventStage();
217 else eDest->
Emsg(
"Evr",
"Unknown event name -", tp);
248 if (!(anEvent = Events.Find(Client->
Path)))
249 Events.Add(Client->
Path,
new theEvent(0, 0, Client), maxLife);
250 else {aClient = anEvent->
aClient;
256 aClient = aClient->
Next;
261 if (anEvent->
Happened) sendEvent(anEvent);
267 if (aClient)
delete Client;
279 void XrdOfsEvr::eventStage()
282 char *tp, *
eMsg, *altMsg = 0;
283 struct theEvent *anEvent;
288 {
eDest->
Emsg(
"Evr",
"Missing stage event status");
return;}
290 if (!strcmp(tp,
"OK")) {rc = 0;
293 else if (!strcmp(tp,
"ENOENT")) {rc = ENOENT;
294 altMsg = (
char *)
"file does not exist.";
296 else if (!strcmp(tp,
"BAD")) {rc = -1;
298 altMsg = (
char *)
"Dynamic staging failed.";
301 eDest->
Emsg(
"Evr",
"Invalid stage event status -", tp);
302 altMsg = (
char *)
"Dynamic staging malfunctioned.";
309 {
eDest->
Emsg(
"Evr",
"Missing stage event path");
return;}
313 }
else eMsg = altMsg;
319 {
if (rc == 0) Balancer->
Added(tp);
326 if (!(anEvent = Events.Find(tp)))
327 Events.Add(tp,
new theEvent(rc,
eMsg), maxLife);
328 else {
if (anEvent->finalRC == 0)
329 {anEvent->finalRC = rc;
330 if (
eMsg) anEvent->finalMsg = strdup(
eMsg);
331 anEvent->Happened = 1;
333 if (anEvent->aClient) sendEvent(anEvent);
342 void XrdOfsEvr::sendEvent(theEvent *ep)
352 while((cp = ep->aClient))
354 einfo->
setErrInfo(ep->finalRC, (ep->finalMsg ? ep->finalMsg :
""));
355 cp->evtCB->Done(Result, einfo);
356 ep->aClient = cp->Next;
357 if (doDel)
delete cp;
358 else {cp->Next = deferQ; deferQ = cp; doDel = 1;}
363 if (!runQ) {runQ = 1; mySem.
Post();}
static XrdSysError eDest(0,"crypto_")
void * XrdOfsEvFlush(void *pp)
int XrdOfsScrubScan(const char *key, XrdOfsEvr::theEvent *cip, void *xargp)
void * XrdOfsEvRecv(void *pp)
virtual void Added(const char *path, int Pend=0)
virtual void Removed(const char *path)
static XrdNetSocket * Create(XrdSysError *Say, const char *path, const char *fn, mode_t mode, int isudp=0)
unsigned long long evtCBarg
void Work4Event(theClient *Client)
int Init(XrdSysError *eObj)
void Wait4Event(const char *path, XrdOucErrInfo *einfo)
struct XrdOfsStats::StatsData Data
virtual int Same(unsigned long long arg1, unsigned long long arg2)=0
static int Export(const char *Var, const char *Val)
void setErrCB(XrdOucEICB *cb, unsigned long long cbarg=0)
int setErrInfo(int code, const char *emsg)
int Attach(int FileDescriptor, int bsz=2047)
char * GetToken(int lowcase=0)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)