XRootD
XrdSys::IOEvents::PollKQ Class Reference
+ Inheritance diagram for XrdSys::IOEvents::PollKQ:
+ Collaboration diagram for XrdSys::IOEvents::PollKQ:

Public Member Functions

 PollKQ (struct kevent *ptab, int numfd, int pfd, int pFD[2])
 
 ~PollKQ ()
 
- Public Member Functions inherited from XrdSys::IOEvents::Poller
 Poller (int cFD, int rFD)
 
virtual ~Poller ()
 Destructor. Stop() is effectively called when this object is deleted. More...
 
void Stop ()
 

Static Public Member Functions

static int AllocMem (void **memP, int slots)
 
- Static Public Member Functions inherited from XrdSys::IOEvents::Poller
static Poller * Create (int &eNum, const char **eTxt=0, int crOpts=0)
 

Protected Member Functions

void Begin (XrdSysSemaphore *syncp, int &rc, const char **eMsg)
 
void Exclude (Channel *cP, bool &isLocked, bool dover=1)
 
bool Include (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
 
bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
 
void Shutdown ()
 
- Protected Member Functions inherited from XrdSys::IOEvents::Poller
void CbkTMO ()
 
bool CbkXeq (Channel *cP, int events, int eNum, const char *eTxt)
 
 CPP_ATOMIC_TYPE (bool) wakePend
 
int GetFault (Channel *cP)
 
int GetPollEnt (Channel *cP)
 
int GetRequest ()
 
bool Init (Channel *cP, int &eNum, const char **eTxt, bool &isLockd)
 
void LockChannel (Channel *cP)
 
int Poll2Enum (short events)
 
int SendCmd (PipeData &cmd)
 
void SetPollEnt (Channel *cP, int ptEnt)
 
bool TmoAdd (Channel *cP, int tmoSet)
 
void TmoDel (Channel *cP)
 
int TmoGet ()
 
void UnLockChannel (Channel *cP)
 

Additional Inherited Members

- Public Types inherited from XrdSys::IOEvents::Poller
enum  CreateOpts { optTOM }
 
- Protected Attributes inherited from XrdSys::IOEvents::Poller
Channel * attBase
 
bool chDead
 
int cmdFD
 
int pipeBlen
 
char * pipeBuff
 
struct pollfd pipePoll
 
pthread_t pollTid
 
PipeData reqBuff
 
int reqFD
 
Channel * tmoBase
 
unsigned char tmoMask
 
- Static Protected Attributes inherited from XrdSys::IOEvents::Poller
static time_t maxTime = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff)
 
static pid_t parentPID = getpid()
 

Detailed Description

Definition at line 52 of file XrdSysIOEventsPollKQ.icc.

Constructor & Destructor Documentation

◆ PollKQ()

XrdSys::IOEvents::PollKQ::PollKQ ( struct kevent *  ptab,
int  numfd,
int  pfd,
int  pFD[2] 
)
inline

Definition at line 58 of file XrdSysIOEventsPollKQ.icc.

59  : Poller(pFD[0], pFD[1]), pollTab(ptab), cbNext(0),
60  pollDfd(pfd), pollMax(numfd), pollNum(1), numPoll(0)
61  {EV_SET(&armPipe, reqFD, EVFILT_READ,
62  EV_ADD|EV_CLEAR|EV_ENABLE, 0, 0, 0);
63  }
Poller(int cFD, int rFD)

References XrdSys::IOEvents::Poller::reqFD.

◆ ~PollKQ()

XrdSys::IOEvents::PollKQ::~PollKQ ( )
inline

Definition at line 64 of file XrdSysIOEventsPollKQ.icc.

References XrdSys::IOEvents::Poller::Stop().

+ Here is the call graph for this function:

Member Function Documentation

◆ AllocMem()

int XrdSys::IOEvents::PollKQ::AllocMem ( void **  memP,
int  slots 
)
static

Definition at line 156 of file XrdSysIOEventsPollKQ.icc.

157 {
158  int rc, bytes, alignment, pagsz = getpagesize();
159 
160 // Calculate the size of the poll table and allocate it
161 //
162  bytes = slots * sizeof(struct kevent);
163  alignment = (bytes < pagsz ? 1024 : pagsz);
164  if (!(rc = posix_memalign(memP, alignment, bytes))) memset(*memP, 0, bytes);
165  return rc;
166 }

◆ Begin()

void XrdSys::IOEvents::PollKQ::Begin ( XrdSysSemaphore *  syncp,
int &  rc,
const char **  eTxt 
)
protected virtual

Start the polling event loop. An implementation must be supplied. Begin() is called via the internal BootStrap class from a new thread.

Implements XrdSys::IOEvents::Poller.

Definition at line 198 of file XrdSysIOEventsPollKQ.icc.

201 {
202  struct timespec *tmP, tmOut;
203  Channel *cP;
204  long long tmVal;
205  int numpolled, pollN;
206 
207 // Indicate to the starting thread that all went well
208 //
209  retcode = 0;
210  *eTxt = 0;
211  syncsem->Post();
212  tmOut.tv_nsec = 0;
213 
214 // Now start dispatching channels that are ready. We use the wakePend flag to
215 // keep the chatter down when we actually wakeup.
216 //
217  do {if ((tmVal = TmoGet()) < 0) tmP = 0;
218  else {tmOut.tv_sec = tmVal / 1000; tmP = &tmOut;}
219  do {numpolled = kevent(pollDfd, 0, 0, pollTab, pollMax, tmP);}
220  while (numpolled < 0 && errno == EINTR);
221  wakePend = true; numPoll = numpolled;
222  if (numpolled == 0) CbkTMO();
223  else if (numpolled < 0)
224  {int rc = errno;
225  //--------------------------------------------------------------
226  // If we are in a child process and the poll file descriptor
227  // has been closed, there is an immense chance the fork will be
228  // followed by an exec, in which case we don't want to abort
229  //--------------------------------------------------------------
230  if( rc == EBADF && parentPID != getpid() ) return;
231  std::cerr <<"KQ: " <<XrdSysE2T(rc) <<" polling for events" <<std::endl;
232  abort();
233  }
234  else for (int i = 0; i < numpolled; i++)
235  {if ((cP = (Channel *)pollTab[i].udata)) Dispatch(cP, i);
236  else if (!Process(i+1)) return;
237  }
238 
239  pollN = AtomicGet(pollNum);
240  if (pollMax < pollN) AllocPT(pollN);
241 
242  } while(1);
243 }
#define AtomicGet(x)
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104

References AtomicGet, XrdSysSemaphore::Post(), and XrdSysE2T().

+ Here is the call graph for this function:

◆ Exclude()

void XrdSys::IOEvents::PollKQ::Exclude ( Channel *  cP,
bool &  isLocked,
bool  dover = 1 
)
protected virtual

Remove a channel from the poll set. An implementation must be supplied. The channel is locked when this method is called; if the method sends a command to the poller thread, it must first unlock the channel and set isLocked to false.

Implements XrdSys::IOEvents::Poller.

Definition at line 283 of file XrdSysIOEventsPollKQ.icc.

285 {
286  struct kevent chlist[2];
287  int i = 0, theFD = cP->GetFD(), kqStatus = GetPollEnt(cP);
288 
289 // Setup the removal elements.
290 // may have been closed prior to this call (though this shouldn't happen).
291 //
292  if (kqStatus & rFilterX)
293  {EV_SET(&chlist[i], theFD, EVFILT_READ, EV_DELETE, 0, 0, cP);}
294  if (kqStatus & wFilterX)
295  {EV_SET(&chlist[i], theFD, EVFILT_WRITE, EV_DELETE, 0, 0, cP);}
296 
297 // Remove this channel from the poll set. We ignore errors as the descriptor
298 // may have been closed prior to this call (though this shouldn't happen).
299 //
300  if (i) kevent(pollDfd, chlist, i, 0, 0, 0);
301  SetPollEnt(cP, 0);
302  AtomicDec(pollNum);
303 
304 // If we need to verify this action, sync with the poller thread (note that the
305 // poller thread will not ask for this action unless it wants to deadlock). We
306 // may actually deadlock anyway if the channel lock is held. We are allowed to
307 // release it if the caller locked it. This will prevent a deadlock. Otherwise,
308 // if we are in a callback and this channel is not the one that initiated the
309 // exclude then we must make sure that we cancel any pending callback to the
310 // excluded channel as it may have been deleted and we won't know that here.
311 //
312  if (dover)
313  {PipeData cmdbuff;
314  if (isLocked)
315  {isLocked = false;
316  UnLockChannel(cP);
317  }
318  cmdbuff.req = PipeData::RmFD;
319  cmdbuff.fd = theFD;
320  SendCmd(cmdbuff);
321  } else {
322  if (cbNext)
323  for (int i = cbNext; i < numPoll; i++)
324  {if (cP == (Channel *)pollTab[i].udata)
325  pollTab[i].udata = &deadChP;
326  }
327  }
328 }
#define AtomicDec(x)
int GetPollEnt(Channel *cP)
int SendCmd(PipeData &cmd)
void UnLockChannel(Channel *cP)
void SetPollEnt(Channel *cP, int ptEnt)

References AtomicDec, XrdSys::IOEvents::Poller::PipeData::fd, XrdSys::IOEvents::Channel::GetFD(), and XrdSys::IOEvents::Poller::PipeData::req.

+ Here is the call graph for this function:

◆ Include()

bool XrdSys::IOEvents::PollKQ::Include ( Channel *  cP,
int &  eNum,
const char **  eTxt,
bool &  isLocked 
)
protected virtual

Add a channel to the poll set. An implementation must be supplied. The channel is locked when this method is called; if the method sends a command to the poller thread, it must first unlock the channel and set isLocked to false.

Implements XrdSys::IOEvents::Poller.

Definition at line 334 of file XrdSysIOEventsPollKQ.icc.

338 {
339 
340 // We simply call modify as this will add events to the kqueue as needed
341 //
342  if (!Modify(cP, eNum, eTxt, isLocked))
343  {if (eTxt) *eTxt = "adding channel";
344  return false;
345  }
346 
347 // All went well. Bump the number in the set. The poller thread will
348 // reallocate the poll table if need be.
349 //
350  AtomicInc(pollNum);
351  return true;
352 }
#define AtomicInc(x)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)

References AtomicInc.

◆ Modify()

bool XrdSys::IOEvents::PollKQ::Modify ( Channel *  cP,
int &  eNum,
const char **  eTxt,
bool &  isLocked 
)
protected virtual

Modify the event status of a channel. An implementation must be supplied. The channel is locked when this method is called; if the method sends a command to the poller thread, it must first unlock the channel and set isLocked to false.

Implements XrdSys::IOEvents::Poller.

Definition at line 358 of file XrdSysIOEventsPollKQ.icc.

362 {
363  (void)isLocked;
364  struct kevent chlist[2];
365  int i = 0;
366  int events = cP->GetEvents(), theFD = cP->GetFD();
367  int kqStatus = GetPollEnt(cP);
368 
369 // Establish new read event mask
370 //
371  if (events & Channel:: readEvents)
372  {if (!(kqStatus & rEnabled))
373  {EV_SET(&chlist[i], theFD, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, cP);
374  kqStatus |= rEnabled | rFilterX;
375  i++;
376  }
377  } else {
378  if (kqStatus & rEnabled)
379  {EV_SET(&chlist[i], theFD, EVFILT_READ, EV_DISABLE, 0, 0, cP);
380  kqStatus &= ~rEnabled;
381  i++;
382  }
383  }
384 
385 // Establish new write event mask
386 //
387  if (events & Channel::writeEvents)
388  {if (!(kqStatus & wEnabled))
389  {EV_SET(&chlist[i], theFD, EVFILT_WRITE, EV_ADD|EV_ENABLE, 0, 0, cP);
390  kqStatus |= wEnabled | wFilterX;
391  i++;
392  }
393  } else {
394  if (kqStatus & wEnabled)
395  {EV_SET(&chlist[i], theFD, EVFILT_WRITE, EV_DISABLE, 0, 0, cP);
396  kqStatus &= ~wEnabled;
397  i++;
398  }
399  }
400 
401 // Modify this fd if anything changed
402 //
403  if (i)
404  {if (kevent(pollDfd, chlist, i, 0, 0, 0) < 0)
405  {eNum = errno;
406  if (eTxt) *eTxt = "modifying poll events";
407  return false;
408  }
409  SetPollEnt(cP, kqStatus);
410  }
411 
412 // All done
413 //
414  return true;
415 }
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.

References XrdSys::IOEvents::Channel::GetEvents(), XrdSys::IOEvents::Channel::GetFD(), XrdSys::IOEvents::Channel::readEvents, and XrdSys::IOEvents::Channel::writeEvents.

+ Here is the call graph for this function:

◆ Shutdown()

void XrdSys::IOEvents::PollKQ::Shutdown ( )
protected virtual

Shutdown the poller. An implementation must be supplied. The shutdown method must release any allocated storage and close private file descriptors. The polling thread will have already been terminated and x-thread pipe closed. Warning: the derived destructor must call Stop() and do nothing else!

Implements XrdSys::IOEvents::Poller.

Definition at line 455 of file XrdSysIOEventsPollKQ.icc.

456 {
457  static XrdSysMutex shutMutex;
458 
459 // To avoid race conditions, we serialize this code
460 //
461  shutMutex.Lock();
462 
463 // Release the poll table
464 //
465  if (pollTab) {free(pollTab); pollTab = 0;}
466 
467 // Close the kqueue file descriptor
468 //
469  if (pollDfd >= 0) {close(pollDfd); pollDfd = -1;}
470 
471 // All done
472 //
473  shutMutex.UnLock();
474 }
#define close(a)
Definition: XrdPosix.hh:43

References close, XrdSysMutex::Lock(), and XrdSysMutex::UnLock().

+ Here is the call graph for this function:

The documentation for this class was generated from the following file: