XRootD
XrdSysIOEvents.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d S y s I O E v e n t s . c c */
4 /* */
5 /* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdio>
32 #include <cstdlib>
33 
34 #include "XrdSys/XrdSysE2T.hh"
35 #include "XrdSys/XrdSysFD.hh"
36 #include "XrdSys/XrdSysIOEvents.hh"
37 #include "XrdSys/XrdSysHeaders.hh"
38 #include "XrdSys/XrdSysPlatform.hh"
39 #include "XrdSys/XrdSysPthread.hh"
40 
41 /******************************************************************************/
42 /* L o c a l D a t a */
43 /******************************************************************************/
44 
namespace
{
// Printable names for the channel status codes. The array is indexed by the
// numeric value of the status, so the order must track:
// enum Status {isClear = 0, isCBMode, isDead};
// Both the strings and the table itself are immutable.
//
   const char *const statName[] = {"isClear", "isCBMode", "isDead"};
}
52 
53 /******************************************************************************/
54 /* L o c a l D e f i n e s */
55 /******************************************************************************/
56 
// Name of the status of the channel whose member function is executing.
#define STATUS statName[(int)chStat]

// Name of the status of the channel pointed to by x.
#define STATUSOF(x) statName[(int)((x)->chStat)]

// True when theitem is the only element of the circular list linked through
// member dlvar (i.e. it points to itself).
#define SINGLETON(dlvar, theitem)\
               ((theitem) ->dlvar .next == (theitem))

// Link newitem into the circular list immediately before curitem. This
// expands to several statements; enclose it in braces when it is the body of
// an if/else.
#define INSERT(dlvar, curitem, newitem) \
               newitem ->dlvar .next = curitem; \
               newitem ->dlvar .prev = curitem ->dlvar .prev; \
               curitem ->dlvar .prev-> dlvar .next = newitem; \
               curitem ->dlvar .prev = newitem

// Unlink curitem from the circular list anchored at dlbase, advancing (or
// zeroing) the anchor when it points at curitem, and leave curitem linked to
// itself. This expands to several statements; enclose it in braces when it is
// the body of an if/else.
#define REMOVE(dlbase, dlvar, curitem) \
               if (dlbase == curitem) dlbase = (SINGLETON(dlvar,curitem) \
                                             ? 0 : curitem ->dlvar .next);\
               curitem ->dlvar .prev-> dlvar .next = curitem ->dlvar .next;\
               curitem ->dlvar .next-> dlvar .prev = curitem ->dlvar .prev;\
               curitem ->dlvar .next = curitem;\
               curitem ->dlvar .prev = curitem

// Select the read or write bits of an event mask. The argument and the result
// are parenthesized so that any expression may be passed and the result may be
// combined with other operators without precedence surprises.
#define REVENTS(x) ((x) & Channel:: readEvents)

#define WEVENTS(x) ((x) & Channel::writeEvents)

// True when the calling thread is the thread recorded in pollTid.
#define ISPOLLER XrdSysThread::Same(XrdSysThread::ID(),pollTid)

#define BOOLNAME(x) ((x) ? "true" : "false")

// Write one trace line to std::cerr while holding the trace mutex. The first
// argument is stringified and used as the tag of the line.
#define DO_TRACE(x,fd,y) \
                {PollerInit::traceMTX.Lock(); \
                 std::cerr <<"IOE fd "<<fd<<' '<<#x <<": "<<y<<'\n'<< std::flush; \
                 PollerInit::traceMTX.UnLock();}

#define TRACING PollerInit::doTrace

#define IF_TRACE(x,fd,y) if (TRACING) DO_TRACE(x,fd,y)

// The next two macros refer to locals of the expansion site by name: a bool
// "isLocked" (TRACE_LOK, TRACE_MOD) and a bool "retval" (TRACE_MOD).
#define TRACE_LOK " channel now " <<(isLocked ? "locked" : "unlocked")

#define TRACE_MOD(x,fd,y) \
                IF_TRACE(x,fd,"Modify(" <<y <<") == " \
                         <<BOOLNAME(retval) <<TRACE_LOK)

#define TRACE_NOD(x,fd,y) \
                IF_TRACE(x,fd,"Modify(" <<y <<") skipped; no events changed")
103 
104 /******************************************************************************/
105 /* G l o b a l D a t a */
106 /******************************************************************************/
107 
109  = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff);
110 
111  pid_t XrdSys::IOEvents::Poller::parentPID = getpid();
112 
113 /******************************************************************************/
114 /* L o c a l C l a s s e s */
115 /******************************************************************************/
116 /******************************************************************************/
117 /* T h r e a d S t a r t u p I n t e r f a c e */
118 /******************************************************************************/
119 
120 namespace XrdSys
121 {
122 namespace IOEvents
123 {
124 struct pollArg
126  const char *retMsg;
127  int retCode;
129 
130  pollArg() : retMsg(0), retCode(0)
131  {pollSync = new XrdSysSemaphore(0, "poll sync");}
132  ~pollArg() {}
133  };
134 
136 {
137 public:
138 
139 static void *Start(void *parg);
140 };
141 
142 void *BootStrap::Start(void *parg)
143 {
144  struct pollArg *pollArg = (struct pollArg *)parg;
145  Poller *thePoller = pollArg->pollP;
146  XrdSysSemaphore *theSem = pollArg->pollSync;
147  thePoller->pollTid = XrdSysThread::ID();
148 
149  thePoller->Begin(theSem, pollArg->retCode, &(pollArg->retMsg));
150  delete theSem;
151 
152  return (void *)0;
153 }
154 
155 /******************************************************************************/
156 /* P o l l e r E r r 1 */
157 /******************************************************************************/
158 
159 // This class is set in the channel when an error occurs but cannot be reflected
160 // immediately because either there is no callback function or all events are
161 // disabled. We need to do this because error events can be physically presented
162 // by the kernel even when logical events are disabled. Note that the error
163 // number and text will have been set and remain set as the channel was actually
164 // disabled preventing any new operation on the channel.
165 //
166 class PollerErr1 : public Poller
167 {
168 public:
169 
170  PollerErr1() : Poller(-1, -1) {}
172 
173 protected:
174 void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
175  {(void)syncp; (void)rc; (void)eTxt;}
176 
177 void Exclude(Channel *cP, bool &isLocked, bool dover=1)
178  {(void)cP; (void)isLocked; (void)dover;}
179 
180 bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
181  {(void)isLocked;
182  if (!(eNum = GetFault(cP))) eNum = EPROTO;
183  if (eTxt) *eTxt = "initializing channel";
184  return false;
185  }
186 
187 bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
188  {(void)isLocked;
189  if (!(eNum = GetFault(cP))) eNum = EPROTO;
190  if (eTxt) *eTxt = "modifying channel";
191  return false;
192  }
193 
194 void Shutdown() {}
195 };
196 
197 /******************************************************************************/
198 /* P o l l e r I n i t */
199 /******************************************************************************/
200 
201 // This class is used as the initial poller on a channel. It is responsible
202 // for adding the file descriptor to the poll set upon the initial enable. This
203 // avoids enabling a channel prior to it receiving a call back function.
204 //
205 class PollerInit : public Poller
206 {
207 public:
208 
209  PollerInit() : Poller(-1, -1) {}
211 
213 static bool doTrace;
214 
215 protected:
216 
217 void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt) {}
218 
219 void Exclude(Channel *cP, bool &isLocked, bool dover=1) {}
220 
221 bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
222  {eNum = EPROTO;
223  if (eTxt) *eTxt = "initializing channel";
224  return false;
225  }
226 
227 bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
228  {bool rc = Init(cP, eNum, eTxt, isLocked);
229  IF_TRACE(Modify,cP->GetFD(), "Init() returned " <<BOOLNAME(rc));
230  return rc;
231  }
232 
233 void Shutdown() {}
234 };
235 
236 bool PollerInit::doTrace = (getenv("XrdSysIOE_TRACE") != 0);
238 
239 /******************************************************************************/
240 /* P o l l e r W a i t */
241 /******************************************************************************/
242 
243 // This class is set in the channel when we need to serialize access to the
244 // channel. Channel methods (as some others) check for this to see if they need
245 // to defer the current operation. We need to do this because some poller
246 // implementations must release the channel lock to avoid a deadlock.
247 //
248 class PollerWait : public Poller
249 {
250 public:
251 
252  PollerWait() : Poller(-1, -1) {}
254 
255 protected:
256 void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt) {}
257 
258 void Exclude(Channel *cP, bool &isLocked, bool dover=1) {}
259 
260 bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
261  {eNum = EIDRM;
262  if (eTxt) *eTxt = "initializing channel";
263  return false;
264  }
265 
266 bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
267  {return Init(cP, eNum, eTxt, isLocked);}
268 
269 void Shutdown() {}
270 };
271 
275 };
276 };
277 
278 /******************************************************************************/
279 /* C l a s s C h a n n e l M e t h o d s */
280 /******************************************************************************/
281 /******************************************************************************/
282 /* C o n s t r u c t o r */
283 /******************************************************************************/
284 
286  CallBack *cbP, void *cbArg)
287  : chPollXQ(pollP), chCB(cbP), chCBA(cbArg)
288 {
289  attList.next = attList.prev = this;
290  tmoList.next = tmoList.prev = this;
291  inTOQ = 0;
292  pollEnt = 0;
293  chStat = isClear;
294  Reset(&pollInit, fd);
295 
296  pollP->Attach(this);
297 }
298 
299 /******************************************************************************/
300 /* D e l e t e */
301 /******************************************************************************/
302 
304 {
305  Poller *myPoller;
306  bool isLocked = true;
307 
308 // Do some tracing
309 //
310  IF_TRACE(Delete,chFD,"status="<<STATUS);
311 
312 // Lock ourselves during the delete process. If the channel is disassociated
313 // or the real poller is set to the error poller then this channel is clean
314 // and can be deleted (i.e. the channel ran through Detach()).
315 //
316  chMutex.Lock();
317  if (!chPollXQ || chPollXQ == &pollErr1)
318  {chMutex.UnLock();
319  delete this;
320  return;
321  }
322 
323 // Disable and remove ourselves from all queues
324 //
325  myPoller = chPollXQ;
326  chPollXQ->Detach(this,isLocked,false);
327  if (!isLocked) chMutex.Lock();
328 
329 // If we are in callback mode then we will need to delay the destruction until
330 // after the callback completes unless this is the poller thread. In that case,
331 // we need to tell the poller that we have been destroyed in a shelf-stable way.
332 //
333  if (chStat)
334  {if (XrdSysThread::Same(XrdSysThread::ID(),myPoller->pollTid))
335  {myPoller->chDead = true;
336  chMutex.UnLock();
337  } else {
338  XrdSysSemaphore cbDone(0);
339  IF_TRACE(Delete,chFD,"waiting for callback");
340  chStat = isDead;
341  chCBA = (void *)&cbDone;
342  chMutex.UnLock();
343  cbDone.Wait();
344  }
345  }
346 // It is now safe to release the storage
347 //
348  IF_TRACE(Delete,chFD,"chan="<< std::hex<<(void *)this<< std::dec);
349  delete this;
350 }
351 
352 /******************************************************************************/
353 /* D i s a b l e */
354 /******************************************************************************/
355 
356 bool XrdSys::IOEvents::Channel::Disable(int events, const char **eText)
357 {
358  int eNum = 0, newev, curev;
359  bool retval = true, isLocked = true;
360 
361 // Lock this channel
362 //
363  chMutex.Lock();
364 
365 // Get correct current events; depending on the state of the channel
366 //
367  if (chPoller == &pollWait) curev = static_cast<int>(reMod);
368  else curev = static_cast<int>(chEvents);
369 
370 // Trace this entry
371 //
372  IF_TRACE(Disable,chFD,"->Disable(" <<events <<") chev=" <<curev);
373 
374 // Calculate new event mask
375 //
376  events &= allEvents;
377  newev = curev & ~events;
378 
379 // If something has changed, then modify the event mask in the poller. The
380 // poller may or may not unlock this channel during the process.
381 //
382  if (newev != curev)
383  {chEvents = newev;
384  retval = chPoller->Modify(this, eNum, eText, isLocked);
385  TRACE_MOD(Disable,chFD,newev);
386  } else {
387  TRACE_NOD(Disable,chFD,newev);
388  }
389  if (isLocked) chMutex.UnLock();
390 
391 // All done
392 //
393  if (!retval) errno = eNum;
394  return retval;
395 }
396 
397 /******************************************************************************/
398 /* E n a b l e */
399 /******************************************************************************/
400 
401 bool XrdSys::IOEvents::Channel::Enable(int events, int timeout,
402  const char **eText)
403 {
404  int eNum = 0, newev, curev, tmoSet = 0;
405  bool retval, setTO, isLocked = true;
406 
407 // Lock ourselves against any changes (this is a recursive mutex)
408 //
409  chMutex.Lock();
410 
411 // Get correct current events; depending on the state of the channel
412 //
413  if (chPoller == &pollWait) curev = static_cast<int>(reMod);
414  else curev = static_cast<int>(chEvents);
415 
416 // Trace this entry
417 //
418  IF_TRACE(Enable,chFD,"->Enable("<<events<<','<<timeout<<") chev="<<curev);
419 
420 // Establish events that should be enabled
421 //
422  events &= allEvents;
423  newev = (curev ^ events) & events;
424  chEvents = curev | events;
425 
426 // Handle timeout changes now
427 //
428  if (REVENTS(events))
429  { if (timeout > 0) chRTO = timeout;
430  else if (timeout < 0) chRTO = 0;
431  if (rdDL != Poller::maxTime || chRTO) tmoSet |= CallBack::ReadyToRead;
432  }
433 
434  if (WEVENTS(events))
435  { if (timeout > 0) chWTO = timeout;
436  else if (timeout < 0) chWTO = 0;
437  if (wrDL != Poller::maxTime || chWTO) tmoSet |= CallBack::ReadyToWrite;
438  }
439 
440 // Check if we have to reset the timeout. We need to hold the channel lock here.
441 //
442  if (tmoSet && chPoller != &pollErr1)
443  setTO = chPollXQ->TmoAdd(this, tmoSet);
444  else setTO = false;
445 
446 // Check if any modifcations needed here. If so, invoke the modifier. Note that
447 // the modify will unlock the channel if the operation causes a wait. So,
448 // we cannot depend on the channel being locked upon return. The reason we do
449 // not unlock here is because we must ensure the channel doesn't change while
450 // we call modify. We let modify determine what to do.
451 //
452  if (newev)
453  {retval = chPoller->Modify(this, eNum, eText, isLocked);
454  TRACE_MOD(Enable,chFD,(curev | events));
455  } else {
456  retval = true;
457  TRACE_NOD(Enable,chFD,(curev | events));
458  }
459 
460 // We need to notify the poller thread if the added deadline is the first in the
461 // queue and the poller is waiting. We also optimize for the case where the
462 // poller thread is always woken up to perform an action in which case it
463 // doesn't need a separate wakeup. We only do this if the enable succeeed. Note
464 // that we cannot hold the channel mutex for this call because it may wait.
465 //
466  if (isLocked) chMutex.UnLock();
467  bool isWakePend = CPP_ATOMIC_LOAD(chPollXQ->wakePend, std::memory_order_consume);
468  if (retval && !isWakePend && setTO && isLocked) chPollXQ->WakeUp();
469 
470 // All done
471 //
472  if (!retval) errno = eNum;
473  return retval;
474 }
475 
476 /******************************************************************************/
477 /* G e t C a l l B a c k */
478 /******************************************************************************/
479 
481 {
482  chMutex.Lock();
483  *cbP = chCB;
484  *cbArg = chCBA;
485  chMutex.UnLock();
486 }
487 
488 /******************************************************************************/
489 /* Private: R e s e t */
490 /******************************************************************************/
491 
492 void XrdSys::IOEvents::Channel::Reset(XrdSys::IOEvents::Poller *thePoller,
493  int fd, int eNum)
494 {
495  chPoller = thePoller;
496  chFD = fd;
497  chFault = eNum;
498  chRTO = 0;
499  chWTO = 0;
500  chEvents = 0;
501  dlType = 0;
502  inPSet = 0;
503  reMod = 0;
504  rdDL = Poller::maxTime;
505  wrDL = Poller::maxTime;
506  deadLine = Poller::maxTime;
507 }
508 
509 /******************************************************************************/
510 /* S e t C a l l B a c k */
511 /******************************************************************************/
512 
514 {
515 
516 // We only need to have the channel lock to set the callback. If the object
517 // is in the process of being destroyed, we do nothing.
518 //
519  chMutex.Lock();
520  if (chStat != isDead)
521  {chCB = cbP;
522  chCBA = cbArg;
523  }
524  chMutex.UnLock();
525 }
526 
527 /******************************************************************************/
528 /* S e t F D */
529 /******************************************************************************/
530 
532 {
533  bool isLocked = true;
534 
535 // Obtain the channel lock. If the object is in callback mode we have some
536 // extra work to do. If normal callback then indicate the channel transitioned
537 // to prevent it being automatically re-enabled. If it's being destroyed, then
538 // do nothing. Otherwise, this is a stupid double setFD call.
539 //
540  chMutex.Lock();
541  if (chStat == isDead)
542  {chMutex.UnLock();
543  return;
544  }
545 
546 // This is a tricky deal here because we need to protect ourselves from other
547 // threads as well as the poller trying to do a callback. We first, set the
548 // poller target. This means the channel is no longer ready and callbacks will
549 // be skipped. We then remove the current file descriptor. This may unlock the
550 // channel but at this point that's ok.
551 //
552  if (inPSet)
553  {chPoller = &pollWait;
554  chPollXQ->Detach(this, isLocked, true);
555  if (!isLocked) chMutex.Lock();
556  }
557 
558 // Indicate channel needs to be re-enabled then unlock the channel
559 //
560  Reset(&pollInit, fd);
561  chMutex.UnLock();
562 }
563 
564 /******************************************************************************/
565 /* C l a s s P o l l e r */
566 /******************************************************************************/
567 /******************************************************************************/
568 /* C o n s t r u c t o r */
569 /******************************************************************************/
570 
572 {
573 
574 // Now initialize local class members
575 //
576  attBase = 0;
577  tmoBase = 0;
578  cmdFD = cFD;
579  reqFD = rFD;
580  wakePend = false;
581  pipeBuff = 0;
582  pipeBlen = 0;
583  pipePoll.fd = rFD;
584  pipePoll.events = POLLIN | POLLRDNORM;
585  tmoMask = 255;
586 }
587 
588 /******************************************************************************/
589 /* A t t a c h */
590 /******************************************************************************/
591 
592 void XrdSys::IOEvents::Poller::Attach(XrdSys::IOEvents::Channel *cP)
593 {
594  Channel *pcP;
595 
596 // We allow only one attach at a time to simplify the processing
597 //
598  adMutex.Lock();
599 
600 // Chain this channel into the list of attached channels
601 //
602  if ((pcP = attBase)) {INSERT(attList, pcP, cP);}
603  else attBase = cP;
604 
605 // All done
606 //
607  adMutex.UnLock();
608 }
609 
610 /******************************************************************************/
611 /* C b k T M O */
612 /******************************************************************************/
613 
615 {
616  Channel *cP;
617 
618 // Process each element in the timeout queue, calling the callback function
619 // if the timeout has passed. As this method can be called with a lock on the
620 // channel mutex, we need to drop it prior to calling the callback.
621 //
622  toMutex.Lock();
623  while((cP = tmoBase) && cP->deadLine <= time(0))
624  {int dlType = cP->dlType;
625  toMutex.UnLock();
626  CbkXeq(cP, dlType, 0, 0);
627  toMutex.Lock();
628  }
629  toMutex.UnLock();
630 }
631 
632 /******************************************************************************/
633 /* C b k X e q */
634 /******************************************************************************/
635 
637  int eNum, const char *eTxt)
638 {
639  XrdSysMutexHelper cbkMHelp(cP->chMutex);
640  char oldEvents;
641  bool cbok, retval, isRead, isWrite, isLocked = true;
642 
643 // Perform any required tracing
644 //
645  if (TRACING)
646  {const char *cbtype = (cP->chPoller == cP->chPollXQ ? "norm" :
647  (cP->chPoller == &pollInit ? "init" :
648  (cP->chPoller == &pollWait ? "wait" : "err")));
649  DO_TRACE(CbkXeq,cP->chFD,"callback events=" <<events
650  <<" chev=" <<static_cast<int>(cP->chEvents)
651  <<" toq=" <<(cP->inTOQ != 0) <<" erc=" <<eNum
652  <<" callback " <<(cP->chCB ? "present" : "missing")
653  <<" poller=" <<cbtype);
654  }
655 
656 // Remove this from the timeout queue if there and reset the deadlines based
657 // on the event we are reflecting. This separates read and write deadlines
658 //
659  if (cP->inTOQ)
660  {TmoDel(cP);
661  cP->dlType |= (events & CallBack::ValidEvents) << 4;
662  isRead = events & (CallBack::ReadyToRead | CallBack:: ReadTimeOut);
663  if (isRead) cP->rdDL = maxTime;
664  isWrite= events & (CallBack::ReadyToWrite | CallBack::WriteTimeOut);
665  if (isWrite) cP->wrDL = maxTime;
666  } else {
667  cP->dlType &= CallBack::ValidEvents;
668  isRead = isWrite = false;
669  }
670 
671 // Verify that there is a callback here and the channel is ready. If not,
672 // disable this channel for the events being reflected unless the event is a
673 // fatal error. In this case we need to abandon the channel since error events
674 // may continue to be generated as we can't always disable them.
675 //
676  if (!(cP->chCB) || cP->chPoller != cP->chPollXQ)
677  {if (eNum)
678  {cP->chPoller = &pollErr1; cP->chFault = eNum;
679  cP->inPSet = 0;
680  return false;
681  }
682  oldEvents = cP->chEvents;
683  cP->chEvents = 0;
684  retval = cP->chPoller->Modify(cP, eNum, 0, isLocked);
685  TRACE_MOD(CbkXeq,cP->chFD,0);
686  if (!isLocked) cP->chMutex.Lock();
687  cP->chEvents = oldEvents;
688  return true;
689  }
690 
691 // Resolve the problem where we get an error event but the channel wants them
692 // presented as a read or write event. If neither is possible then defer the
693 // error until the channel is enabled again.
694 //
695  if (eNum)
696  {if (cP->chEvents & Channel::errorEvents)
697  {cP->chPoller = &pollErr1; cP->chFault = eNum;
698  cP->chStat = Channel::isCBMode;
699  chDead = false;
700  cbkMHelp.UnLock();
701  cP->chCB->Fatal(cP,cP->chCBA, eNum, eTxt);
702  if (chDead) return true;
703  cbkMHelp.Lock(&(cP->chMutex));
704  cP->inPSet = 0;
705  return false;
706  }
707  if (REVENTS(cP->chEvents)) events = CallBack::ReadyToRead;
708  else if (WEVENTS(cP->chEvents)) events = CallBack::ReadyToWrite;
709  else {cP->chPoller = &pollErr1; cP->chFault = eNum; cP->inPSet = 0;
710  return false;
711  }
712  }
713 
714 // Indicate that we are in callback mode then drop the channel lock and effect
715 // the callback. This allows the callback to freely manage locks.
716 //
717  cP->chStat = Channel::isCBMode;
718  chDead = false;
719  cbkMHelp.UnLock();
720  IF_TRACE(CbkXeq,cP->chFD,"invoking callback; events=" <<events);
721  cbok = cP->chCB->Event(cP,cP->chCBA, events);
722  IF_TRACE(CbkXeq,cP->chFD,"callback returned " <<BOOLNAME(cbok));
723 
724 // If channel destroyed by the callback, bail really fast. Otherwise, regain
725 // the channel lock.
726 //
727  if (chDead) return true;
728  cbkMHelp.Lock(&(cP->chMutex));
729 
730 // If the channel is being destroyed; then another thread must have done so.
731 // Tell it the callback has finished and just return.
732 //
733  if (cP->chStat != Channel::isCBMode)
734  {if (cP->chStat == Channel::isDead)
735  ((XrdSysSemaphore *)cP->chCBA)->Post();
736  return true;
737  }
738  cP->chStat = Channel::isClear;
739 
740 // Handle enable or disable here. If we keep the channel enabled then reset
741 // the timeout if it hasn't been handled via a call from the callback.
742 //
743  if (!cbok) Detach(cP,isLocked,false);
744  else if ((isRead || isWrite) && !(cP->inTOQ) && (cP->chRTO || cP->chWTO))
745  TmoAdd(cP, 0);
746 
747 // All done. While the mutex should not have been unlocked, we relock it if
748 // it has to keep the mutex helper from croaking.
749 //
750  if (!isLocked) cP->chMutex.Lock();
751  return true;
752 }
753 
754 /******************************************************************************/
755 /* Static: C r e a t e */
756 /******************************************************************************/
757 
759  const char **eTxt,
760  int crOpts)
761 {
762  int fildes[2];
763  struct pollArg pArg;
764  pthread_t tid;
765 
766 // Create a pipe used to break the poll wait loop
767 //
768  if (XrdSysFD_Pipe(fildes))
769  {eNum = errno;
770  if (eTxt) *eTxt = "creating poll pipe";
771  return 0;
772  }
773 
774 // Create an actual implementation of a poller
775 //
776  if (!(pArg.pollP = newPoller(fildes, eNum, eTxt)))
777  {close(fildes[0]);
778  close(fildes[1]);
779  return 0;
780  }
781 
782 // Now start a thread to handle this poller object
783 //
785  (void *)&pArg, XRDSYSTHREAD_BIND, "Poller")))
786  {if (eTxt) *eTxt = "creating poller thread"; return 0;}
787 
788 // Now wait for the thread to finish initializing before we allow use
789 // Note that the bootstrap takes ownership of the semaphore and will delete it
790 // once the thread posting the semaphore actually ends. This is to avoid
791 // semaphore bugs present in certain (e.g. Linux) kernels.
792 //
793  pArg.pollSync->Wait();
794 
795 // Check if all went well
796 //
797  if (pArg.retCode)
798  {if (eTxt) *eTxt = (pArg.retMsg ? pArg.retMsg : "starting poller");
799  eNum = pArg.retCode;
800  delete pArg.pollP;
801  return 0;
802  }
803 
804 // Set creation options in the new poller
805 //
806  if (crOpts & optTOM)
808 
809 // All done
810 //
811  eNum = 0;
812  if (eTxt) *eTxt = "";
813  return pArg.pollP;
814 }
815 
816 /******************************************************************************/
817 /* D e t a c h */
818 /******************************************************************************/
819 
820 void XrdSys::IOEvents::Poller::Detach(XrdSys::IOEvents::Channel *cP,
821  bool &isLocked, bool keep)
822 {
823 // The caller must hold the channel lock!
824 //
825  bool detFD = (cP->inPSet != 0);
826 
827 // First remove the channel from the timeout queue
828 //
829  if (cP->inTOQ)
830  {toMutex.Lock();
831  REMOVE(tmoBase, tmoList, cP);
832  toMutex.UnLock();
833  }
834 
835 // Allow only one detach at a time
836 //
837  adMutex.Lock();
838 
839 // Preset channel to prohibit callback if we are not keeping this channel
840 //
841  if (!keep)
842  {cP->Reset(&pollErr1, cP->chFD);
843  cP->chPollXQ = &pollErr1;
844  cP->chCB = 0;
845  cP->chCBA = 0;
846  if (cP->attList.next != cP) {REMOVE(attBase, attList, cP);}
847  else if (attBase == cP) attBase = 0;
848  }
849 
850 // Exclude this channel from the associated poll set, don't hold the ad lock
851 //
852  adMutex.UnLock();
853  if (detFD)
854  {cP->inPSet = 0;
855  if (cmdFD >= 0) Exclude(cP, isLocked, !ISPOLLER);
856  }
857 }
858 
859 /******************************************************************************/
860 /* Protected: G e t R e q u e s t */
861 /******************************************************************************/
862 
863 // Warning: This method runs unlocked. The caller must have exclusive use of
864 // the reqBuff otherwise unpredictable results will occur.
865 
867 {
868  ssize_t rlen;
869  int rc;
870 
871 // See if we are to resume a read or start a fresh one
872 //
873  if (!pipeBlen)
874  {pipeBuff = (char *)&reqBuff; pipeBlen = sizeof(reqBuff);}
875 
876 // Wait for the next request. Some OS's (like Linux) don't support non-blocking
877 // pipes. So, we must front the read with a poll.
878 //
879  do {rc = poll(&pipePoll, 1, 0);}
880  while(rc < 0 && (errno == EAGAIN || errno == EINTR));
881  if (rc < 1) return 0;
882 
883 // Now we can put up a read without a delay. Normally a full command will be
884 // present. Under some heavy conditions, this may not be the case.
885 //
886  do {rlen = read(reqFD, pipeBuff, pipeBlen);}
887  while(rlen < 0 && errno == EINTR);
888  if (rlen <= 0)
889  {std::cerr <<"Poll: "<<XrdSysE2T(errno)<<" reading from request pipe\n"<< std::flush;
890  return 0;
891  }
892 
893 // Check if all the data has arrived. If not all the data is present, defer
894 // this request until more data arrives.
895 //
896  if (!(pipeBlen -= rlen)) return 1;
897  pipeBuff += rlen;
898  return 0;
899 }
900 
901 /******************************************************************************/
902 /* Protected: I n i t */
903 /******************************************************************************/
904 
906  const char **eTxt, bool &isLocked)
907 {
908 // The channel must be locked upon entry!
909 //
910  bool retval;
911 
912 
913 // If we are already in progress then simply update the shadow events and
914 // resuppress all current events.
915 //
916  if (cP->chPoller == &pollWait)
917  {cP->reMod = cP->chEvents;
918  cP->chEvents = 0;
919  IF_TRACE(Init,cP->chFD,"defer events=" <<cP->reMod);
920  return true;
921  }
922 
923 // Trace this entry
924 //
925  IF_TRACE(Init,cP->chFD,"begin events=" <<int(cP->chEvents));
926 
927 // If no events are enabled at this point, just return
928 //
929  if (!(cP->chEvents)) return true;
930 
931 // Refuse to enable a channel without a callback function
932 //
933  if (!(cP->chCB))
934  {eNum = EDESTADDRREQ;
935  if (eTxt) *eTxt = "enabling without a callback";
936  return false;
937  }
938 
939 // So, now we can include the channel in the poll set. We will include it
940 // with no events enabled to prevent callbacks prior to completion here.
941 //
942  cP->chPoller = &pollWait; cP->reMod = cP->chEvents; cP->chEvents = 0;
943  retval = cP->chPollXQ->Include(cP, eNum, eTxt, isLocked);
944  IF_TRACE(Init,cP->chFD,"Include() returned " <<BOOLNAME(retval) <<TRACE_LOK);
945  if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
946 
947 // Determine what future poller to use. If we can use the regular poller then
948 // set the correct event mask for the channel. Note that we could have lost
949 // control but the correct events will be reflected in the "reMod" member.
950 //
951  if (!retval) {cP->chPoller = &pollErr1; cP->chFault = eNum;}
952  else {cP->chPoller = cP->chPollXQ;
953  cP->inPSet = 1;
954  if (cP->reMod)
955  {cP->chEvents = cP->reMod;
956  retval = cP->chPoller->Modify(cP, eNum, eTxt, isLocked);
957  TRACE_MOD(Init,cP->chFD,int(cP->reMod));
958  if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
959  } else {
960  TRACE_NOD(Init,cP->chFD,0);
961  }
962  }
963 
964 // All done
965 //
966  cP->reMod = 0;
967  return retval;
968 }
969 
970 /******************************************************************************/
971 /* P o l l 2 E n u m */
972 /******************************************************************************/
973 
975 {
976  if (events & POLLERR) return EPIPE;
977 
978  if (events & POLLHUP) return ECONNRESET;
979 
980  if (events & POLLNVAL) return EBADF;
981 
982  return EOPNOTSUPP;
983 }
984 
985 /******************************************************************************/
986 /* S e n d C m d */
987 /******************************************************************************/
988 
990 {
991  int wlen;
992 
993 // Pipe writes are atomic so we don't need locks. Some commands require
994 // confirmation. We handle that here based on the command. Note that pipes
995 // gaurantee that all of the data will be written or we will block.
996 //
997  if (cmd.req >= PipeData::Post)
998  {XrdSysSemaphore mySem(0);
999  cmd.theSem = &mySem;
1000  do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1001  while (wlen < 0 && errno == EINTR);
1002  if (wlen > 0) mySem.Wait();
1003  } else {
1004  do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1005  while (wlen < 0 && errno == EINTR);
1006  }
1007 
1008 // All done
1009 //
1010  return (wlen >= 0 ? 0 : errno);
1011 }
1012 
1013 /******************************************************************************/
1014 /* Protected: S e t P o l l E n t */
1015 /******************************************************************************/
1016 
1018 {
1019  cP->pollEnt = pe;
1020 }
1021 
1022 /******************************************************************************/
1023 /* S t o p */
1024 /******************************************************************************/
1025 
1027 {
1028  PipeData cmdbuff;
1029  CallBack *theCB;
1030  Channel *cP;
1031  void *cbArg;
1032  int doCB;
1033 
1034 // Initialize the pipdata structure
1035 //
1036  memset(static_cast<void*>( &cmdbuff ), 0, sizeof(cmdbuff));
1037  cmdbuff.req = PipeData::Stop;
1038 
1039 // Lock all of this
1040 //
1041  adMutex.Lock();
1042 
1043 // If we are already shutdown then we are done
1044 //
1045  if (cmdFD == -1) {adMutex.UnLock(); return;}
1046 
1047 // First we must stop the poller thread in an orderly fashion.
1048 //
1049  adMutex.UnLock();
1050  SendCmd(cmdbuff);
1051  adMutex.Lock();
1052 
1053 // Close the pipe communication mechanism
1054 //
1055  close(cmdFD); cmdFD = -1;
1056  close(reqFD); reqFD = -1;
1057 
1058 // Run through cleaning up the channels. While there should not be any other
1059 // operations happening on this poller, we take the conservative approach.
1060 //
1061  while((cP = attBase))
1062  {REMOVE(attBase, attList, cP);
1063  adMutex.UnLock();
1064  cP->chMutex.Lock();
1065  doCB = cP->chCB != 0 && (cP->chEvents & Channel::stopEvent);
1066  if (cP->inTOQ) TmoDel(cP);
1067  cP->Reset(&pollErr1, cP->chFD, EIDRM);
1068  cP->chPollXQ = &pollErr1;
1069  if (doCB)
1070  {cP->chStat = Channel::isClear;
1071  theCB = cP->chCB; cbArg = cP->chCBA;
1072  cP->chMutex.UnLock();
1073  theCB->Stop(cP, cbArg);
1074  } else cP->chMutex.UnLock();
1075  adMutex.Lock();
1076  }
1077 
1078 // Now invoke the poller specific shutdown
1079 //
1080  Shutdown();
1081  adMutex.UnLock();
1082 }
1083 
1084 /******************************************************************************/
1085 /* T m o A d d */
1086 /******************************************************************************/
1087 
1089 {
1090  XrdSysMutexHelper mHelper(toMutex);
1091  time_t tNow;
1092  Channel *ncP;
1093  bool setRTO, setWTO;
1094 
1095 // Do some tracing
1096 //
1097  IF_TRACE(TmoAdd,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1098  <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1099 
1100 // Remove element from timeout queue if it is there
1101 //
1102  if (cP->inTOQ)
1103  {REMOVE(tmoBase, tmoList, cP);
1104  cP->inTOQ = 0;
1105  }
1106 
1107 // Determine which timeouts need to be reset
1108 //
1109  tmoSet|= cP->dlType >> 4;
1110  setRTO = (tmoSet&tmoMask) & (CallBack::ReadyToRead |CallBack:: ReadTimeOut);
1111  setWTO = (tmoSet&tmoMask) & (CallBack::ReadyToWrite|CallBack::WriteTimeOut);
1112 
1113 // Reset the required deadlines
1114 //
1115  tNow = time(0);
1116  if (setRTO && REVENTS(cP->chEvents) && cP->chRTO)
1117  cP->rdDL = cP->chRTO + tNow;
1118  if (setWTO && WEVENTS(cP->chEvents) && cP->chWTO)
1119  cP->wrDL = cP->chWTO + tNow;
1120 
1121 // Calculate the closest enabled deadline
1122 //
1123  if (cP->rdDL < cP->wrDL)
1124  {cP->deadLine = cP->rdDL; cP->dlType = CallBack:: ReadTimeOut;
1125  } else {
1126  cP->deadLine = cP->wrDL; cP->dlType = CallBack::WriteTimeOut;
1127  if (cP->rdDL == cP->wrDL) cP->dlType |= CallBack:: ReadTimeOut;
1128  }
1129  IF_TRACE(TmoAdd, cP->chFD, "t=" <<tNow <<" rdDL=" <<setRTO <<' ' <<cP->rdDL
1130  <<" wrDL=" <<setWTO <<' ' <<cP->wrDL);
1131 
1132 // If no timeout really applies, we are done
1133 //
1134  if (cP->deadLine == maxTime) return false;
1135 
1136 // Add the channel to the timeout queue in correct deadline position.
1137 //
1138  if ((ncP = tmoBase))
1139  {do {if (cP->deadLine < ncP->deadLine) break;
1140  ncP = ncP->tmoList.next;
1141  } while(ncP != tmoBase);
1142  INSERT(tmoList, ncP, cP);
1143  if (cP->deadLine < tmoBase->deadLine) tmoBase = cP;
1144  } else tmoBase = cP;
1145  cP->inTOQ = 1;
1146 
1147 // Indicate to the caller whether or not a wakeup is required
1148 //
1149  return (tmoBase == cP);
1150 }
1151 
1152 /******************************************************************************/
1153 /* T m o D e l */
1154 /******************************************************************************/
1155 
1157 {
1158 
1159 // Do some tracing
1160 //
1161  IF_TRACE(TmoDel,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1162  <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1163 
1164 // Get the timeout queue lock and remove the channel from the queue
1165 //
1166  toMutex.Lock();
1167  REMOVE(tmoBase, tmoList, cP);
1168  cP->inTOQ = 0;
1169  toMutex.UnLock();
1170 }
1171 
1172 /******************************************************************************/
1173 /* T m o G e t */
1174 /******************************************************************************/
1175 
1177 {
1178  int wtval;
1179 
1180 // Lock the timeout queue
1181 //
1182  toMutex.Lock();
1183 
1184 // Calculate wait time. If the deadline passed, invoke the timeout callback.
1185 // we will need to drop the timeout lock as we don't have the channel lock.
1186 //
1187  do {if (!tmoBase) {wtval = -1; break;}
1188  wtval = (tmoBase->deadLine - time(0)) * 1000;
1189  if (wtval > 0) break;
1190  toMutex.UnLock();
1191  CbkTMO();
1192  toMutex.Lock();
1193  } while(1);
1194 
1195 // Return the value
1196 //
1197  CPP_ATOMIC_STORE(wakePend, false, std::memory_order_release);
1198  toMutex.UnLock();
1199  return wtval;
1200 }
1201 
1202 /******************************************************************************/
1203 /* W a k e U p */
1204 /******************************************************************************/
1205 
1206 void XrdSys::IOEvents::Poller::WakeUp()
1207 {
1208  static PipeData cmdbuff(PipeData::NoOp);
1209 
1210 // Send it off to wakeup the poller thread, but only if here is no wakeup in
1211 // progress.
1212 //
1213 // We use a mutex here because we want to produce a synchronization point - all
1214 // threads that might be interested timeouts and wakeups are going to incur a
1215 // cache bounce for the page where wakePend resides; they will see a consistent
1216 // view of the wakePend flag. For those threads, this is equivalent to
1217 // an atomic with memory_order std::memory_order_seq_cst (the strongest ordering).
1218 // However, the threads that are not interested in timeouts will not get a flush
1219 // for their copy of the wakePend page. They will still have the weaker memory
1220 // ordering of consume/release (which is guaranteed anyway on all current architectures
1221 // except for DEC Alpha).
1222  toMutex.Lock();
1223  bool isWakePend = CPP_ATOMIC_LOAD(wakePend, std::memory_order_consume);
1224  if (isWakePend) {toMutex.UnLock();}
1225  else {CPP_ATOMIC_STORE(wakePend, true, std::memory_order_release);
1226  toMutex.UnLock();
1227  SendCmd(cmdbuff);
1228  }
1229 }
1230 
1231 /******************************************************************************/
1232 /* I m p l e m e n t a t i o n S p e c i f i c s */
1233 /******************************************************************************/
1234 
1235 #if defined( __solaris__ )
1237 #elif defined( __linux__ )
1239 #elif defined(__APPLE__)
1241 #else
1243 #endif
ssize_t write(int fildes, const void *buf, size_t nbyte)
ssize_t read(int fildes, void *buf, size_t nbyte)
#define close(a)
Definition: XrdPosix.hh:43
#define CPP_ATOMIC_LOAD(x, order)
#define CPP_ATOMIC_STORE(x, val, order)
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
#define IF_TRACE(x, fd, y)
#define TRACE_LOK
#define TRACING
#define REMOVE(dlbase, dlvar, curitem)
#define STATUS
#define TRACE_NOD(x, fd, y)
#define STATUSOF(x)
#define DO_TRACE(x, fd, y)
#define REVENTS(x)
#define BOOLNAME(x)
#define TRACE_MOD(x, fd, y)
#define ISPOLLER
#define INSERT(dlvar, curitem, newitem)
#define WEVENTS(x)
#define XRDSYSTHREAD_BIND
void Lock(XrdSysMutex *Mutex)
static int Same(pthread_t t1, pthread_t t2)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static pthread_t ID(void)
static void * Start(void *parg)
virtual void Fatal(Channel *chP, void *cbArg, int eNum, const char *eTxt)
virtual bool Event(Channel *chP, void *cbArg, int evFlags)=0
virtual void Stop(Channel *chP, void *cbArg)
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ WriteTimeOut
Write timeout.
@ ValidEvents
Mask to test for valid events.
void SetCallBack(CallBack *cbP, void *cbArg=0)
void GetCallBack(CallBack **cbP, void **cbArg)
@ errorEvents
Error event non-r/w specific.
@ stopEvent
Poller stop event.
bool Enable(int events, int timeout=0, const char **eText=0)
Channel(Poller *pollP, int fd, CallBack *cbP=0, void *cbArg=0)
bool Disable(int events, const char **eText=0)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
virtual bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
virtual bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
int GetFault(Channel *cP)
Poller(int cFD, int rFD)
static Poller * Create(int &eNum, const char **eTxt=0, int crOpts=0)
void TmoDel(Channel *cP)
virtual void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)=0
bool CbkXeq(Channel *cP, int events, int eNum, const char *eTxt)
int SendCmd(PipeData &cmd)
int Poll2Enum(short events)
bool TmoAdd(Channel *cP, int tmoSet)
void SetPollEnt(Channel *cP, int ptEnt)
bool Init(Channel *cP, int &eNum, const char **eTxt, bool &isLockd)
@ dec
Definition: XrdSysTrace.hh:42
@ hex
Definition: XrdSysTrace.hh:42
XrdSysSemaphore * pollSync