XRootD
XrdOfsEvr.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d O f s E v r . c c */
4 /* */
5 /* (c) 2006 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdlib>
32 #include <cstdio>
33 #include <cstring>
34 
35 #include "XrdCms/XrdCmsClient.hh"
36 #include "XrdOfs/XrdOfsEvr.hh"
37 #include "XrdOfs/XrdOfsStats.hh"
38 #include "XrdOfs/XrdOfsTrace.hh"
39 #include "XrdSys/XrdSysError.hh"
40 #include "XrdSys/XrdSysTimer.hh"
41 #include "XrdOuc/XrdOucEnv.hh"
42 #include "XrdNet/XrdNetOpts.hh"
43 #include "XrdNet/XrdNetSocket.hh"
44 #include "XrdSys/XrdSysHeaders.hh"
45 
46 /******************************************************************************/
47 /* E x t e r n a l L i n k a g e s */
48 /******************************************************************************/
49 
50 extern XrdOfsStats OfsStats;
51 
52 extern XrdSysTrace OfsTrace;
53 
54 void *XrdOfsEvRecv(void *pp)
55 {
56  XrdOfsEvr *evr = (XrdOfsEvr *)pp;
57  evr->recvEvents();
58  return (void *)0;
59 }
60 
61 void *XrdOfsEvFlush(void *pp)
62 {
63  XrdOfsEvr *evr = (XrdOfsEvr *)pp;
64  evr->flushEvents();
65  return (void *)0;
66 }
67 
68 int XrdOfsScrubScan(const char *key, XrdOfsEvr::theEvent *cip, void *xargp)
69  {return 0;}
70 
71 /******************************************************************************/
72 /* D e s t r u c t o r */
73 /******************************************************************************/
74 
76 {
77 
78 // Close the FIFO. This will cause the reader to exit
79 //
80  myMutex.Lock();
81  eventFIFO.Close();
82  myMutex.UnLock();
83 }
84 
85 /******************************************************************************/
86 /* f l u s h E v e n t s */
87 /******************************************************************************/
88 
90 {
91  theClient *tp, *ntp;
92  int expWait, expClock;
93 
94 // Compute the hash flush interval
95 //
96  if ((expWait = maxLife/4) == 0) expWait = 60;
97  expClock = expWait;
98 
99 // We wait for the right period of time, unless there is a deferred event
100 //
101  do {myMutex.Lock();
102  if ((ntp = deferQ)) deferQ = 0;
103  else runQ = 0;
104  myMutex.UnLock();
105  while(ntp)
106  {XrdSysTimer::Wait(1000*60);
107  expClock -= 60;
108  myMutex.Lock();
109  while((tp = ntp))
110  {Events.Del(tp->Path);
111  ntp = tp->Next;
112  delete tp;
113  }
114  if ((ntp = deferQ)) deferQ = 0;
115  else runQ = 0;
116  myMutex.UnLock();
117  if (expClock <= 0)
118  {myMutex.Lock();
119  Events.Apply(XrdOfsScrubScan, (void *)0);
120  myMutex.UnLock();
121  expClock = expWait;
122  }
123  }
124  mySem.Wait();
125  } while(1);
126 }
127 
128 /******************************************************************************/
129 /* I n i t */
130 /******************************************************************************/
131 
132 int XrdOfsEvr::Init(XrdSysError *eobj) // Must be called 1st!
133 {
134  XrdNetSocket *msgSock;
135  char *p, path[2048];
136  int n;
137 
138 // Set he error object (need to do only once)
139 //
140  eDest = eobj;
141 
142 // Create path to the pipe we will creat
143 //
144  if (!(p = getenv("XRDADMINPATH")) || !*p)
145  {eobj->Emsg("Events", "XRDADMINPATH not defined");
146  return 0;
147  }
148  strcpy(path, p); n = strlen(p);
149  if (path[n-1] != '/') {path[n] = '/'; n++;}
150  strcpy(&path[n], "ofsEvents");
151  XrdOucEnv::Export("XRDOFSEVENTS", path);
152 
153 // Now create a socket to a path
154 //
155  if (!(msgSock = XrdNetSocket::Create(eobj,path,0,0660,XRDNET_FIFO)))
156  return 0;
157  msgFD = msgSock->Detach();
158  delete msgSock;
159 
160 // We succeeded and are now ready for the call to he second stage below
161 //
162  return 1;
163 }
164 
165 /******************************************************************************/
166 
168 {
169  pthread_t tid;
170  int rc;
171 
172 // Set the balancer pointers (err object set in 1st phase Init).
173 //
174  Balancer = trgp;
175 
176 // Now start a thread to get incoming messages
177 //
178  if ((rc = XrdSysThread::Run(&tid, XrdOfsEvRecv, static_cast<void *>(this),
179  0, "Event receiver")))
180  {eDest->Emsg("Evr", rc, "create event reader thread");
181  return 0;
182  }
183 
184 // Now start a thread to flush posted events
185 //
186  if ((rc = XrdSysThread::Run(&tid, XrdOfsEvFlush,static_cast<void *>(this),
187  0, "Event flusher")))
188  {eDest->Emsg("Evr", rc, "create event flush thread");
189  return 0;
190  }
191 
192 // All done
193 //
194  return 1;
195 }
196 
197 /******************************************************************************/
198 /* r e c v E v e n t s */
199 /******************************************************************************/
200 
202 {
203  EPNAME("recvEvent");
204  const char *tident = 0;
205  char *lp,*tp;
206 
207 // Attach the fifo FD to the stream
208 //
209  eventFIFO.Attach(msgFD);
210 
211 // Now just start reading the events until the FD is closed
212 //
213  while((lp = eventFIFO.GetLine()))
214  {DEBUG("-->" <<lp);
215  if ((tp = eventFIFO.GetToken()) && *tp)
216  {if (!strcmp(tp, "stage")) eventStage();
217  else eDest->Emsg("Evr", "Unknown event name -", tp);
218  }
219  }
220 }
221 
222 /******************************************************************************/
223 /* W a i t 4 E v e n t */
224 /******************************************************************************/
225 
226 void XrdOfsEvr::Wait4Event(const char *path, XrdOucErrInfo *einfo)
227 {
228 
229 // Replace original callback with our callback so we can queue this event
230 // after the wait request has been sent to the client. This avoids a race
231 // where the client might get the resume signal before the wait request.
232 //
233  einfo->setErrCB((XrdOucEICB *)new theClient(this, einfo, path));
234 }
235 
236 /******************************************************************************/
237 /* W o r k 4 E v e n t */
238 /******************************************************************************/
239 
241 {
242  struct theEvent *anEvent;
243  theClient *aClient = 0;
244 
245 // First ste is to see if this event was posted
246 //
247  myMutex.Lock();
248  if (!(anEvent = Events.Find(Client->Path)))
249  Events.Add(Client->Path, new theEvent(0, 0, Client), maxLife);
250  else {aClient = anEvent->aClient;
251  while(aClient)
252  {if (aClient->evtCB->Same(Client->evtCBarg,aClient->evtCBarg))
253  {aClient->evtCBarg = Client->evtCBarg;
254  break;
255  }
256  aClient = aClient->Next;
257  }
258  if (!aClient) {Client->Next = anEvent->aClient;
259  anEvent->aClient = Client;
260  }
261  if (anEvent->Happened) sendEvent(anEvent);
262  }
263  myMutex.UnLock();
264 
265 // Delete the Client object if we really don't need it
266 //
267  if (aClient) delete Client;
268 }
269 
270 /******************************************************************************/
271 /* P r i v a t e M e t h o d s */
272 /******************************************************************************/
273 /******************************************************************************/
274 /* e v e n t S t a g e */
275 /******************************************************************************/
276 
277 // stage {OK | ENOENT | BAD} <path> [<msg>] \n
278 
279 void XrdOfsEvr::eventStage()
280 {
281  int rc;
282  char *tp, *eMsg, *altMsg = 0;
283  struct theEvent *anEvent;
284 
285 // Get the status token and decode it
286 //
287  if (!(tp = eventFIFO.GetToken()))
288  {eDest->Emsg("Evr", "Missing stage event status"); return;}
289 
290  if (!strcmp(tp, "OK")) {rc = 0;
292  }
293  else if (!strcmp(tp, "ENOENT")) {rc = ENOENT;
294  altMsg = (char *)"file does not exist.";
295  }
296  else if (!strcmp(tp, "BAD")) {rc = -1;
298  altMsg = (char *)"Dynamic staging failed.";
299  }
300  else {rc = -1;
301  eDest->Emsg("Evr", "Invalid stage event status -", tp);
302  altMsg = (char *)"Dynamic staging malfunctioned.";
304  }
305 
306 // Get the path and optional message
307 //
308  if (!(tp = eventFIFO.GetToken(&eMsg)))
309  {eDest->Emsg("Evr", "Missing stage event path"); return;}
310  if (rc)
311  if (eMsg) {while(*eMsg == ' ') eMsg++;
312  if (!*eMsg) eMsg = altMsg;
313  } else eMsg = altMsg;
314  else eMsg = 0;
315 
316 // At this point if we have a balancer, tell it what happened
317 //
318  if (Balancer)
319  {if (rc == 0) Balancer->Added(tp);
320  else Balancer->Removed(tp);
321  }
322 
323 // Either people are waiting for this event or it is preposted event.
324 //
325  myMutex.Lock();
326  if (!(anEvent = Events.Find(tp)))
327  Events.Add(tp, new theEvent(rc, eMsg), maxLife);
328  else {if (anEvent->finalRC == 0)
329  {anEvent->finalRC = rc;
330  if (eMsg) anEvent->finalMsg = strdup(eMsg);
331  anEvent->Happened = 1;
332  }
333  if (anEvent->aClient) sendEvent(anEvent);
334  }
335  myMutex.UnLock();
336 }
337 
338 /******************************************************************************/
339 /* s e n d E v e n t */
340 /******************************************************************************/
341 
342 void XrdOfsEvr::sendEvent(theEvent *ep)
343 {
344  theClient *cp;
345  XrdOucErrInfo *einfo;
346  int doDel = 0, Result = (ep->finalRC ? SFS_ERROR : SFS_OK);
347 
348 // For each client, issue a call back sending the result back
349 // The event also goes in the deferred delete queue as we need to hold on
350 // to it just in case a client is in-transit
351 //
352  while((cp = ep->aClient))
353  {einfo = new XrdOucErrInfo(cp->User, (XrdOucEICB *)0, cp->evtCBarg);
354  einfo->setErrInfo(ep->finalRC, (ep->finalMsg ? ep->finalMsg : ""));
355  cp->evtCB->Done(Result, einfo);
356  ep->aClient = cp->Next;
357  if (doDel) delete cp;
358  else {cp->Next = deferQ; deferQ = cp; doDel = 1;}
359  }
360 
361 // Post the defer queue handler
362 //
363  if (!runQ) {runQ = 1; mySem.Post();}
364 }
#define tident
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
static XrdSysError eDest(0,"crypto_")
#define XRDNET_FIFO
Definition: XrdNetOpts.hh:83
XrdSysTrace OfsTrace
void * XrdOfsEvFlush(void *pp)
Definition: XrdOfsEvr.cc:61
XrdOfsStats OfsStats
Definition: XrdOfs.cc:113
int XrdOfsScrubScan(const char *key, XrdOfsEvr::theEvent *cip, void *xargp)
Definition: XrdOfsEvr.cc:68
void * XrdOfsEvRecv(void *pp)
Definition: XrdOfsEvr.cc:54
#define eMsg(x)
#define SFS_ERROR
#define SFS_OK
virtual void Added(const char *path, int Pend=0)
virtual void Removed(const char *path)
static XrdNetSocket * Create(XrdSysError *Say, const char *path, const char *fn, mode_t mode, int isudp=0)
XrdOucEICB * evtCB
Definition: XrdOfsEvr.hh:76
unsigned long long evtCBarg
Definition: XrdOfsEvr.hh:77
theClient * Next
Definition: XrdOfsEvr.hh:72
void Work4Event(theClient *Client)
Definition: XrdOfsEvr.cc:240
int Init(XrdSysError *eObj)
Definition: XrdOfsEvr.cc:132
void Wait4Event(const char *path, XrdOucErrInfo *einfo)
Definition: XrdOfsEvr.cc:226
void flushEvents()
Definition: XrdOfsEvr.cc:89
~XrdOfsEvr()
Definition: XrdOfsEvr.cc:75
void recvEvents()
Definition: XrdOfsEvr.cc:201
struct XrdOfsStats::StatsData Data
void Add(int &Cntr)
Definition: XrdOfsStats.hh:62
virtual int Same(unsigned long long arg1, unsigned long long arg2)=0
static int Export(const char *Var, const char *Val)
Definition: XrdOucEnv.cc:188
void setErrCB(XrdOucEICB *cb, unsigned long long cbarg=0)
int setErrInfo(int code, const char *emsg)
char * GetLine()
int Attach(int FileDescriptor, int bsz=2047)
void Close(int hold=0)
char * GetToken(int lowcase=0)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227
theClient * aClient
Definition: XrdOfsEvr.hh:89