XRootD
XrdOfsEvs.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d O f s E v s . c c */
4 /* */
5 /* (c) 2005 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /* Based on code developed by Derek Feichtinger, CERN. */
30 /******************************************************************************/
31 
32 #include <cctype>
33 #include <cstdarg>
34 #include <stddef.h>
35 #include <cstdlib>
36 #include <cstdio>
37 #include <cstring>
38 #include <sys/stat.h>
39 
40 #include "XrdOfs/XrdOfsEvs.hh"
41 #include "XrdSys/XrdSysError.hh"
42 #include "XrdOuc/XrdOucProg.hh"
43 #include "XrdOuc/XrdOucStream.hh"
44 #include "XrdNet/XrdNetOpts.hh"
45 #include "XrdNet/XrdNetSocket.hh"
46 #include "XrdSys/XrdSysPlatform.hh"
47 
48 /******************************************************************************/
49 /* L o c a l C l a s s e s */
50 /******************************************************************************/
51 
53 {
54 public:
55 
57 char *text;
58 int tlen;
59 int isBig;
60 
61  XrdOfsEvsMsg(char *tval=0, int big=0)
62  {text = tval; tlen=0; isBig = big; next=0;}
63 
64  ~XrdOfsEvsMsg() {if (text) free(text);}
65 };
66 
67 /******************************************************************************/
68 /* E x t e r n a l L i n k a g e s */
69 /******************************************************************************/
70 
71 void *XrdOfsEvsSend(void *pp)
72 {
73  XrdOfsEvs *evs = (XrdOfsEvs *)pp;
74  evs->sendEvents();
75  return (void *)0;
76 }
77 
78 /******************************************************************************/
79 /* S t a t i c D e f i n i t i o n s */
80 /******************************************************************************/
81 
82 XrdOfsEvsFormat XrdOfsEvs::MsgFmt[XrdOfsEvs::nCount];
83 
84 const int XrdOfsEvs::minMsgSize;
85 const int XrdOfsEvs::maxMsgSize;
86 
87 /******************************************************************************/
88 /* X r d E v s F o r m a t : : D e f */
89 /******************************************************************************/
90 
91 void XrdOfsEvsFormat::Def(evFlags theFlags, const char *Fmt, ...)
92 {
93  va_list ap;
94  int theVal, i = 0;
95 
96 // Return if already defined
97 //
98  if (Format) return;
99 
100 // Set flags and format. Prepare the arg vector
101 //
102  Flags = theFlags;
103  Format = Fmt;
104  memset(Args, 0, sizeof(Args));
105 
106 // Pick up all arguments
107 //
108  va_start(ap, Fmt);
109  while((theVal = va_arg(ap, int)) >= 0)
110  Args[i++] = static_cast<XrdOfsEvsInfo::evArg>(theVal);
111  va_end(ap);
112 }
113 
114 /******************************************************************************/
115 /* C o n s t r u c t o r */
116 /******************************************************************************/
117 
118 XrdOfsEvs::XrdOfsEvs(Event theEvents, const char *Target, int minq, int maxq)
119 {
120 
121 // Set common variables
122 //
123  enEvents = static_cast<Event>(theEvents & enMask);
124  endIT = 0;
125  theTarget = strdup(Target);
126  eDest = 0;
127  theProg = 0;
128  maxMin = minq; maxMax = maxq;
129  msgFirst = msgLast = msgFreeMax = msgFreeMin = 0;
130  numMax = numMin = 0;
131  tid = 0;
132  msgFD = -1;
133 
134 // Initialize all static format entries that have not been initialized yet.
135 // Note that format may be specified prior to this object being created!
136 //
137 // <tid> chmod <mode> <path>
138 //
139  MsgFmt[Chmod & Mask].Def(XrdOfsEvsFormat::cvtMode, "%s chmod %s %s\n",
142 // <tid> closer <path>
143 //
144  MsgFmt[Closer & Mask].Def(XrdOfsEvsFormat::Null, "%s closer %s\n",
146 
147 // <tid> closew <path>
148 //
149  MsgFmt[Closew & Mask].Def(XrdOfsEvsFormat::Null, "%s closew %s\n",
151 
152 // <tid> create <mode> <path>
153 //
154  MsgFmt[Create & Mask].Def(XrdOfsEvsFormat::cvtMode, "%s create %s %s\n",
157 // <tid> mkdir <mode> <path>
158 //
159  MsgFmt[Mkdir & Mask].Def(XrdOfsEvsFormat::cvtMode, "%s mkdir %s %s\n",
162 // <tid> mv <path> <path>
163 //
164  MsgFmt[Mv & Mask].Def(XrdOfsEvsFormat::Null, "%s mv %s %s\n",
167 // <tid> openr <path>
168 //
169  MsgFmt[Openr & Mask].Def(XrdOfsEvsFormat::Null, "%s openr %s\n",
171 
172 // <tid> openw <path>
173 //
174  MsgFmt[Openw & Mask].Def(XrdOfsEvsFormat::Null, "%s openw %s\n",
176 
177 // <tid> rm <path>
178 //
179  MsgFmt[Rm & Mask].Def(XrdOfsEvsFormat::Null, "%s rm %s\n",
181 
182 // <tid> rmdir <path>
183 //
184  MsgFmt[Rmdir & Mask].Def(XrdOfsEvsFormat::Null, "%s rmdir %s\n",
186 
187 // <tid> trunc <size>
188 //
189  MsgFmt[Trunc & Mask].Def(XrdOfsEvsFormat::cvtFSize,"%s trunc %s\n",
191 
192 // <tid> fwrite <path>
193 //
194  MsgFmt[Fwrite & Mask].Def(XrdOfsEvsFormat::Null, "%s fwrite %s\n",
196 }
197 
198 /******************************************************************************/
199 /* D e s t r u c t o r */
200 /******************************************************************************/
201 
203 {
204  XrdOfsEvsMsg *tp;
205 
206 // Kill the notification thread. This may cause a msg block to be orphaned
207 // but, in practice, this object does not really get deleted after being
208 // started. So, the problem is moot.
209 //
210  endIT = 1;
211  if (tid) XrdSysThread::Kill(tid);
212 
213 // Release all queued message blocks
214 //
215  qMut.Lock();
216  while ((tp = msgFirst)) {msgFirst = tp->next; delete tp;}
217  if (theTarget) free(theTarget);
218  if (msgFD >= 0)close(msgFD);
219  if (theProg) delete theProg;
220  qMut.UnLock();
221 
222 // Release all free message blocks
223 //
224  fMut.Lock();
225  while ((tp = msgFreeMax)) {msgFreeMax = tp->next; delete tp;}
226  while ((tp = msgFreeMin)) {msgFreeMin = tp->next; delete tp;}
227  fMut.UnLock();
228 }
229 
230 /******************************************************************************/
231 /* N o t i f y */
232 /******************************************************************************/
233 
235 {
236  static int warnings = 0;
237  XrdOfsEvsFormat *fP;
238  XrdOfsEvsMsg *tp;
239  char modebuff[8], sizebuff[16];
240  int eNum, isBig = (eID & Mv), msgSize = (isBig ? maxMsgSize : minMsgSize);
241 
242 // Validate event number and set event name
243 //
244  eNum = eID & Mask;
245  if (eNum < 0 || eNum >= nCount) return;
246 
247 // Check if we need to do any conversions
248 //
249  fP = &MsgFmt[eNum];
251  {sprintf(modebuff, "%o", static_cast<int>((Info.FMode() & S_IAMB)));
252  Info.Set(XrdOfsEvsInfo::evFMODE, modebuff);
253  } else Info.Set(XrdOfsEvsInfo::evFMODE, "$FMODE");
255  {sprintf(sizebuff, "%lld", Info.FSize());
256  Info.Set(XrdOfsEvsInfo::evFSIZE, sizebuff);
257  } else Info.Set(XrdOfsEvsInfo::evFSIZE, "$FSIZE");
258 
259 // Get a message block
260 //
261  if (!(tp = getMsg(isBig)))
262  {if ((++warnings & 0xff) == 1)
263  {eDest->Emsg("Notify", "Ran out of message objects;", eName(eNum),
264  "event notification not sent.");
265  }
266  return;
267  }
268 
269 // Format the message
270 //
271  tp->tlen = fP->SNP(Info, tp->text, msgSize);
272 
273 // Put the message on the queue and return
274 //
275  tp->next = 0;
276  qMut.Lock();
277  if (msgLast) {msgLast->next = tp; msgLast = tp;}
278  else msgFirst = msgLast = tp;
279  qMut.UnLock();
280  qSem.Post();
281 }
282 
283 /******************************************************************************/
284 /* P a r s e */
285 /******************************************************************************/
286 
287 int XrdOfsEvs::Parse(XrdSysError &Eroute, XrdOfsEvs::Event eNum, char *mText)
288 {
289  static struct valVar {const char *vname;
292  Vars[] = {
302  };
303  int numvars = sizeof(Vars)/sizeof(struct valVar);
304  char parms[1024], *pP = parms;
305  char *pE = parms+sizeof(parms)-((XrdOfsEvsInfo::evARGS*2)-8);
306  char varbuff[16], *bVar, *eVar;
307  int i, j, aNum = 0, Args[XrdOfsEvsInfo::evARGS] = {0};
309 
310 // Parse the text
311 //
312  parms[0] = '\0';
313  while(*mText && pP < pE)
314  {if (*mText == '\\' && *(mText+1) == '$')
315  {*pP++ = '$'; mText += 2; continue;}
316  else if (*mText != '$') {*pP++ = *mText++; continue;}
317  bVar = mText+1;
318  if (*mText == '{') {eVar = index(mText, '}'); j = 1;}
319  else if (*mText == '[') {eVar = index(mText, ']'); j = 1;}
320  else {eVar = bVar; while(isalpha(*eVar)) eVar++; j = 0;}
321  i = eVar - bVar;
322  if (i < 1 || i >= (int)sizeof(varbuff))
323  {Eroute.Emsg("Parse","Invalid notifymsg variable starting at",mText);
324  return 1;
325  }
326  strncpy(varbuff, bVar, i); varbuff[i] = '\0';
327  for (i = 0; i < numvars; i++)
328  if (!strcmp(varbuff, Vars[i].vname)) break;
329  if (i >= numvars)
330  {Eroute.Emsg("Parse", "Unknown notifymsg variable -",varbuff);
331  return 1;
332  }
333  if (aNum >= XrdOfsEvsInfo::evARGS)
334  {Eroute.Say("Parse", "Too many notifymsg variables"); return 1;}
335  strcpy(pP, "%s"); pP += 2;
336  Args[aNum++] = Vars[i].vnum;
337  ArgOpts = static_cast<XrdOfsEvsFormat::evFlags>(ArgOpts|Vars[i].vopt);
338  mText = eVar+j;
339  }
340 
341 // Check if we overran the buffer or didn't have any text
342 //
343  if (pP >= pE)
344  {Eroute.Emsg("Parse","notifymsg text too long");return 1;}
345  if (!parms[0])
346  {Eroute.Emsg("Parse","notifymsg text not specified");return 1;}
347 
348 // Set the format
349 //
350  strcpy(pP, "\n");
351  eNum = static_cast<Event>(eNum & Mask);
352  MsgFmt[eNum].Set(ArgOpts, strdup(parms), Args);
353 
354 // All done
355 //
356  return 0;
357 }
358 
359 /******************************************************************************/
360 /* s e n d E v e n t s */
361 /******************************************************************************/
362 
364 {
365  XrdOfsEvsMsg *tp;
366  const char *theData[2] = {0,0};
367  int theDlen[2] = {0,0};
368 
369 // This is an endless loop that just gets things off the event queue and
370 // send them out. This allows us to only hang a single thread should the
371 // receiver get blocked, instead of the whole process.
372 //
373  while(1)
374  {qSem.Wait();
375  qMut.Lock();
376  if (endIT) break;
377  if ((tp = msgFirst) && !(msgFirst = tp->next)) msgLast = 0;
378  qMut.UnLock();
379  if (tp)
380  {if (!theProg) Feed(tp->text, tp->tlen);
381  else {theData[0] = tp->text; theDlen[0] = tp->tlen;
382  theProg->Feed(theData, theDlen);
383  }
384  retMsg(tp);
385  }
386  }
387  qMut.UnLock();
388 }
389 
390 /******************************************************************************/
391 /* S t a r t */
392 /******************************************************************************/
393 
395 {
396  int rc;
397 
398 // Set the error object pointer
399 //
400  eDest = eobj;
401 
402 // Check if we need to create a socket to a path
403 //
404  if (*theTarget == '>')
405  {XrdNetSocket *msgSock;
406  if (!(msgSock = XrdNetSocket::Create(eobj,theTarget+1,0,0660,XRDNET_FIFO)))
407  return -1;
408  msgFD = msgSock->Detach();
409  delete msgSock;
410 
411  } else {
412 
413  // Allocate a new program object if we don't have one
414  //
415  if (theProg) return 0;
416  theProg = new XrdOucProg(eobj);
417 
418  // Setup the program
419  //
420  if (theProg->Setup(theTarget, eobj)) return -1;
421  if ((rc = theProg->Start()))
422  {eobj->Emsg("Evs", rc, "start event collector"); return -1;}
423  }
424 
425 // Now start a thread to get messages and send them to the collector
426 //
427  if ((rc = XrdSysThread::Run(&tid, XrdOfsEvsSend, static_cast<void *>(this),
428  0, "Event notification sender")))
429  {eobj->Emsg("Evs", rc, "create event notification thread");
430  return -1;
431  }
432 
433 // All done
434 //
435  return 0;
436 }
437 
438 /******************************************************************************/
439 /* P r i v a t e M e t h o d s */
440 /******************************************************************************/
441 /******************************************************************************/
442 /* e N a m e */
443 /******************************************************************************/
444 
445 const char *XrdOfsEvs::eName(int eNum)
446 {
447  static const char *eventName[] = {"Chmod", "closer", "closew", "create",
448  "fwrite", "mkdir", "mv", "openr",
449  "opnw", "rm", "rmdir", "trunc"};
450 
451  eNum = (eNum & Mask);
452  return (eNum < 0 || eNum >= nCount ? "?" : eventName[eNum]);
453 }
454 
455 /******************************************************************************/
456 /* F e e d */
457 /******************************************************************************/
458 
459 int XrdOfsEvs::Feed(const char *data, int dlen)
460 {
461  int retc;
462 
463 // Write the data. ince this is a udp socket all the data goes or none does
464 //
465  do { retc = write(msgFD, (const void *)data, (size_t)dlen);}
466  while (retc < 0 && errno == EINTR);
467  if (retc < 0)
468  {eDest->Emsg("EvsFeed", errno, "write to event socket", theTarget);
469  return -1;
470  }
471 
472 // All done
473 //
474  return 0;
475 }
476 
477 /******************************************************************************/
478 /* g e t M s g */
479 /******************************************************************************/
480 
481 XrdOfsEvsMsg *XrdOfsEvs::getMsg(int bigmsg)
482 {
483  XrdOfsEvsMsg *tp;
484  int msz = 0;
485 
486 // Lock the free queue
487 //
488  fMut.Lock();
489 
490 // Get a free element from the big or small queue, as needed
491 //
492  if (bigmsg)
493  if ((tp = msgFreeMax)) msgFreeMax = tp->next;
494  else msz = maxMsgSize;
495  else if ((tp = msgFreeMin)) msgFreeMin = tp->next;
496  else msz = minMsgSize;
497 
498 // Check if we have to allocate a new item
499 //
500  if (!tp && (numMax + numMin) < (maxMax + maxMin))
501  {if ((tp = new XrdOfsEvsMsg((char *)malloc(msz), bigmsg)))
502  {if (!(tp->text)) {delete tp; tp = 0;}
503  else if (bigmsg) numMax++;
504  else numMin++;
505  }
506  }
507 
508 // Unlock and return result
509 //
510  fMut.UnLock();
511  return tp;
512 }
513 
514 /******************************************************************************/
515 /* r e t M s g */
516 /******************************************************************************/
517 
518 void XrdOfsEvs::retMsg(XrdOfsEvsMsg *tp)
519 {
520 
521 // Lock the free queue
522 //
523  fMut.Lock();
524 
525 // Check if we exceeded the hold quotax
526 //
527  if (tp->isBig)
528  if (numMax > maxMax) {delete tp; numMax--;}
529  else {tp->next = msgFreeMax; msgFreeMax = tp;}
530  else
531  if (numMin > maxMin) {delete tp; numMin--;}
532  else {tp->next = msgFreeMin; msgFreeMin = tp;}
533 
534 // Unlock and return
535 //
536  fMut.UnLock();
537 }
#define S_IAMB
Definition: XrdConfig.cc:159
#define XRDNET_FIFO
Definition: XrdNetOpts.hh:83
void * XrdOfsEvsSend(void *pp)
Definition: XrdOfsEvs.cc:71
@ Info
ssize_t write(int fildes, const void *buf, size_t nbyte)
#define close(a)
Definition: XrdPosix.hh:43
static XrdNetSocket * Create(XrdSysError *Say, const char *path, const char *fn, mode_t mode, int isudp=0)
int SNP(XrdOfsEvsInfo &Info, char *buff, int blen)
Definition: XrdOfsEvs.hh:97
XrdOfsEvsInfo::evArg Args[XrdOfsEvsInfo::evARGS]
Definition: XrdOfsEvs.hh:95
void Def(evFlags theFlags, const char *Fmt,...)
Definition: XrdOfsEvs.cc:91
const char * Format
Definition: XrdOfsEvs.hh:93
evFlags Flags
Definition: XrdOfsEvs.hh:94
void Set(evFlags theFlags, const char *Fmt, int *fullArgs)
Definition: XrdOfsEvs.hh:106
XrdOfsEvsMsg * next
Definition: XrdOfsEvs.cc:56
XrdOfsEvsMsg(char *tval=0, int big=0)
Definition: XrdOfsEvs.cc:61
char * text
Definition: XrdOfsEvs.cc:57
static int Parse(XrdSysError &Eroute, Event eNum, char *mText)
Definition: XrdOfsEvs.cc:287
XrdOfsEvs(Event theEvents, const char *Target, int minq=90, int maxq=10)
Definition: XrdOfsEvs.cc:118
static const int maxMsgSize
Definition: XrdOfsEvs.hh:137
void sendEvents(void)
Definition: XrdOfsEvs.cc:363
int Start(XrdSysError *eobj)
Definition: XrdOfsEvs.cc:394
static const int minMsgSize
Definition: XrdOfsEvs.hh:136
void Notify(Event eNum, XrdOfsEvsInfo &Info)
Definition: XrdOfsEvs.cc:234
int Start(void)
Definition: XrdOucProg.cc:349
int Feed(const char *data[], const int dlen[])
Definition: XrdOucProg.cc:63
int Setup(const char *prog, XrdSysError *errP=0, int(*Proc)(XrdOucStream *, char **, int)=0)
Definition: XrdOucProg.cc:296
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static int Kill(pthread_t tid)