XRootD
XrdOfsEvr Class Reference

#include <XrdOfsEvr.hh>

+ Collaboration diagram for XrdOfsEvr:

Classes

class  theClient
 
struct  theEvent
 

Public Member Functions

 XrdOfsEvr ()
 
 ~XrdOfsEvr ()
 
void flushEvents ()
 
int Init (XrdCmsClient *trg=0)
 
int Init (XrdSysError *eObj)
 
void recvEvents ()
 
void Wait4Event (const char *path, XrdOucErrInfo *einfo)
 
void Work4Event (theClient *Client)
 

Detailed Description

Definition at line 42 of file XrdOfsEvr.hh.

Constructor & Destructor Documentation

◆ XrdOfsEvr()

XrdOfsEvr::XrdOfsEvr ( )
inline

Definition at line 59 of file XrdOfsEvr.hh.

59 : mySem(0), eDest(0), Balancer(0) {runQ = 0; deferQ = 0;}

◆ ~XrdOfsEvr()

XrdOfsEvr::~XrdOfsEvr ( )

Definition at line 75 of file XrdOfsEvr.cc.

76 {
77 
78 // Close the FIFO. This will cause the reader to exit
79 //
80  myMutex.Lock();
81  eventFIFO.Close();
82  myMutex.UnLock();
83 }
void Close(int hold=0)

References XrdOucStream::Close(), XrdSysMutex::Lock(), and XrdSysMutex::UnLock().

+ Here is the call graph for this function:

Member Function Documentation

◆ flushEvents()

void XrdOfsEvr::flushEvents ( )

Definition at line 89 of file XrdOfsEvr.cc.

90 {
91  theClient *tp, *ntp;
92  int expWait, expClock;
93 
94 // Compute the hash flush interval
95 //
96  if ((expWait = maxLife/4) == 0) expWait = 60;
97  expClock = expWait;
98 
99 // We wait for the right period of time, unless there is a deferred event
100 //
101  do {myMutex.Lock();
102  if ((ntp = deferQ)) deferQ = 0;
103  else runQ = 0;
104  myMutex.UnLock();
105  while(ntp)
106  {XrdSysTimer::Wait(1000*60);
107  expClock -= 60;
108  myMutex.Lock();
109  while((tp = ntp))
110  {Events.Del(tp->Path);
111  ntp = tp->Next;
112  delete tp;
113  }
114  if ((ntp = deferQ)) deferQ = 0;
115  else runQ = 0;
116  myMutex.UnLock();
117  if (expClock <= 0)
118  {myMutex.Lock();
119  Events.Apply(XrdOfsScrubScan, (void *)0);
120  myMutex.UnLock();
121  expClock = expWait;
122  }
123  }
124  mySem.Wait();
125  } while(1);
126 }
int XrdOfsScrubScan(const char *key, XrdOfsEvr::theEvent *cip, void *xargp)
Definition: XrdOfsEvr.cc:68
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227

References XrdSysMutex::Lock(), XrdOfsEvr::theClient::Next, XrdOfsEvr::theClient::Path, XrdSysMutex::UnLock(), XrdSysSemaphore::Wait(), XrdSysTimer::Wait(), and XrdOfsScrubScan().

Referenced by XrdOfsEvFlush().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Init() [1/2]

int XrdOfsEvr::Init ( XrdCmsClient * trg = 0 )

Definition at line 167 of file XrdOfsEvr.cc.

168 {
169  pthread_t tid;
170  int rc;
171 
172 // Set the balancer pointers (err object set in 1st phase Init).
173 //
174  Balancer = trgp;
175 
176 // Now start a thread to get incoming messages
177 //
178  if ((rc = XrdSysThread::Run(&tid, XrdOfsEvRecv, static_cast<void *>(this),
179  0, "Event receiver")))
180  {eDest->Emsg("Evr", rc, "create event reader thread");
181  return 0;
182  }
183 
184 // Now start a thread to flush posted events
185 //
186  if ((rc = XrdSysThread::Run(&tid, XrdOfsEvFlush,static_cast<void *>(this),
187  0, "Event flusher")))
188  {eDest->Emsg("Evr", rc, "create event flush thread");
189  return 0;
190  }
191 
192 // All done
193 //
194  return 1;
195 }
void * XrdOfsEvFlush(void *pp)
Definition: XrdOfsEvr.cc:61
void * XrdOfsEvRecv(void *pp)
Definition: XrdOfsEvr.cc:54
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)

References XrdSysError::Emsg(), XrdSysThread::Run(), XrdOfsEvFlush(), and XrdOfsEvRecv().

+ Here is the call graph for this function:

◆ Init() [2/2]

int XrdOfsEvr::Init ( XrdSysError * eObj )

Definition at line 132 of file XrdOfsEvr.cc.

133 {
134  XrdNetSocket *msgSock;
135  char *p, path[2048];
136  int n;
137 
138 // Set the error object (need to do only once)
139 //
140  eDest = eobj;
141 
142 // Create path to the pipe we will create
143 //
144  if (!(p = getenv("XRDADMINPATH")) || !*p)
145  {eobj->Emsg("Events", "XRDADMINPATH not defined");
146  return 0;
147  }
148  strcpy(path, p); n = strlen(p);
149  if (path[n-1] != '/') {path[n] = '/'; n++;}
150  strcpy(&path[n], "ofsEvents");
151  XrdOucEnv::Export("XRDOFSEVENTS", path);
152 
153 // Now create a socket to a path
154 //
155  if (!(msgSock = XrdNetSocket::Create(eobj,path,0,0660,XRDNET_FIFO)))
156  return 0;
157  msgFD = msgSock->Detach();
158  delete msgSock;
159 
160 // We succeeded and are now ready for the call to the second stage below
161 //
162  return 1;
163 }
#define XRDNET_FIFO
Definition: XrdNetOpts.hh:83
static XrdNetSocket * Create(XrdSysError *Say, const char *path, const char *fn, mode_t mode, int isudp=0)
static int Export(const char *Var, const char *Val)
Definition: XrdOucEnv.cc:188

References XrdNetSocket::Create(), XrdNetSocket::Detach(), XrdSysError::Emsg(), XrdOucEnv::Export(), and XRDNET_FIFO.

Referenced by XrdOfs::Configure().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ recvEvents()

void XrdOfsEvr::recvEvents ( )

Definition at line 201 of file XrdOfsEvr.cc.

202 {
203  EPNAME("recvEvent");
204  const char *tident = 0;
205  char *lp,*tp;
206 
207 // Attach the fifo FD to the stream
208 //
209  eventFIFO.Attach(msgFD);
210 
211 // Now just start reading the events until the FD is closed
212 //
213  while((lp = eventFIFO.GetLine()))
214  {DEBUG("-->" <<lp);
215  if ((tp = eventFIFO.GetToken()) && *tp)
216  {if (!strcmp(tp, "stage")) eventStage();
217  else eDest->Emsg("Evr", "Unknown event name -", tp);
218  }
219  }
220 }
#define tident
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
char * GetLine()
int Attach(int FileDescriptor, int bsz=2047)
char * GetToken(int lowcase=0)

References XrdOucStream::Attach(), DEBUG, XrdSysError::Emsg(), EPNAME, XrdOucStream::GetLine(), XrdOucStream::GetToken(), and tident.

Referenced by XrdOfsEvRecv().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Wait4Event()

void XrdOfsEvr::Wait4Event ( const char *  path,
XrdOucErrInfo *  einfo 
)

Definition at line 226 of file XrdOfsEvr.cc.

227 {
228 
229 // Replace original callback with our callback so we can queue this event
230 // after the wait request has been sent to the client. This avoids a race
231 // where the client might get the resume signal before the wait request.
232 //
233  einfo->setErrCB((XrdOucEICB *)new theClient(this, einfo, path));
234 }
void setErrCB(XrdOucEICB *cb, unsigned long long cbarg=0)

References XrdOucErrInfo::setErrCB().

Referenced by XrdOfsFile::open().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Work4Event()

void XrdOfsEvr::Work4Event ( theClient * Client )

Definition at line 240 of file XrdOfsEvr.cc.

241 {
242  struct theEvent *anEvent;
243  theClient *aClient = 0;
244 
245 // First step is to see if this event was posted
246 //
247  myMutex.Lock();
248  if (!(anEvent = Events.Find(Client->Path)))
249  Events.Add(Client->Path, new theEvent(0, 0, Client), maxLife);
250  else {aClient = anEvent->aClient;
251  while(aClient)
252  {if (aClient->evtCB->Same(Client->evtCBarg,aClient->evtCBarg))
253  {aClient->evtCBarg = Client->evtCBarg;
254  break;
255  }
256  aClient = aClient->Next;
257  }
258  if (!aClient) {Client->Next = anEvent->aClient;
259  anEvent->aClient = Client;
260  }
261  if (anEvent->Happened) sendEvent(anEvent);
262  }
263  myMutex.UnLock();
264 
265 // Delete the Client object if we really don't need it
266 //
267  if (aClient) delete Client;
268 }

References XrdOfsEvr::theEvent::aClient, XrdOfsEvr::theClient::evtCB, XrdOfsEvr::theClient::evtCBarg, XrdOfsEvr::theEvent::Happened, XrdSysMutex::Lock(), XrdOfsEvr::theClient::Next, XrdOfsEvr::theClient::Path, XrdOucEICB::Same(), and XrdSysMutex::UnLock().

Referenced by XrdOfsEvr::theClient::Done().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: