32 #include <sys/types.h>
34 #include <sys/event.h>
56 static int AllocMem(
void **memP,
int slots);
58 PollKQ(
struct kevent *ptab,
int numfd,
int pfd,
int pFD[2])
59 :
Poller(pFD[0], pFD[1]), pollTab(ptab), cbNext(0),
60 pollDfd(pfd), pollMax(numfd), pollNum(1), numPoll(0)
61 {EV_SET(&armPipe,
reqFD, EVFILT_READ,
62 EV_ADD|EV_CLEAR|EV_ENABLE, 0, 0, 0);
72 bool Include(
Channel *cP,
int &eNum,
const char **eTxt,
bool &isLocked);
74 bool Modify (
Channel *cP,
int &eNum,
const char **eTxt,
bool &isLocked);
79 int AllocPT(
int slots);
80 void Dispatch(
Channel *cP,
int i);
81 bool Process(
int next);
83 struct kevent *pollTab;
84 struct kevent armPipe;
92 static const int rEnabled = 1;
93 static const int rFilterX = 2;
94 static const int wEnabled = 4;
95 static const int wFilterX = 8;
97 void *PollKQ::deadChP = 0;
109 XrdSys::IOEvents::Poller::newPoller(
int pipeFD[2],
114 static const int allocFD = 1024;
115 struct kevent *pp, chlist;
120 if ((pfd = kqueue()) < 0)
122 if (eTxt) *eTxt =
"creating kqueue";
128 EV_SET(&chlist,pipeFD[0],EVFILT_READ,EV_ADD|EV_ONESHOT|EV_ENABLE,0,0,0);
129 if (kevent(pfd, &chlist, 1, 0, 0, 0) < 0)
131 *eTxt =
"adding communication pipe";
139 if (eTxt) *eTxt =
"creating kqueue table";
146 return (
Poller *)
new PollKQ(pp, allocFD, pfd, pipeFD);
158 int rc, bytes, alignment, pagsz = getpagesize();
162 bytes = slots *
sizeof(
struct kevent);
163 alignment = (bytes < pagsz ? 1024 : pagsz);
164 if (!(rc = posix_memalign(memP, alignment, bytes))) memset(*memP, 0, bytes);
172 int XrdSys::IOEvents::PollKQ::AllocPT(
int slots)
178 if (pollMax >= slots) slots = pollMax + 256;
179 else slots = pollMax + (slots/256*256) + (slots%256 ? 256 : 0);
183 if (!AllocMem((
void **)&pp, slots))
202 struct timespec *tmP, tmOut;
205 int numpolled, pollN;
217 do {
if ((tmVal = TmoGet()) < 0) tmP = 0;
218 else {tmOut.tv_sec = tmVal / 1000; tmP = &tmOut;}
219 do {numpolled = kevent(pollDfd, 0, 0, pollTab, pollMax, tmP);}
220 while (numpolled < 0 && errno == EINTR);
221 wakePend =
true; numPoll = numpolled;
222 if (numpolled == 0) CbkTMO();
223 else if (numpolled < 0)
230 if( rc == EBADF && parentPID != getpid() )
return;
231 std::cerr <<
"KQ: " <<
XrdSysE2T(rc) <<
" polling for events" <<std::endl;
234 else for (
int i = 0; i < numpolled; i++)
235 {
if ((cP = (
Channel *)pollTab[i].udata)) Dispatch(cP, i);
236 else if (!Process(i+1))
return;
240 if (pollMax < pollN) AllocPT(pollN);
251 static const uint16_t pollER = EV_EOF | EV_ERROR;
254 bool isLocked =
false;
262 if (!(pollTab[i].flags & pollER))
267 if (pollTab[i].fflags) eNum = pollTab[i].fflags;
268 else eNum = ECONNRESET;
269 eTxt =
"polling"; events = 0;
275 if (!CbkXeq(cP, events, eNum, eTxt)) Exclude(cP, isLocked, 0);
284 bool &isLocked,
bool dover)
286 struct kevent chlist[2];
287 int i = 0, theFD = cP->
GetFD(), kqStatus = GetPollEnt(cP);
292 if (kqStatus & rFilterX)
293 {EV_SET(&chlist[i], theFD, EVFILT_READ, EV_DELETE, 0, 0, cP);}
294 if (kqStatus & wFilterX)
295 {EV_SET(&chlist[i], theFD, EVFILT_WRITE, EV_DELETE, 0, 0, cP);}
300 if (i) kevent(pollDfd, chlist, i, 0, 0, 0);
318 cmdbuff.
req = PipeData::RmFD;
323 for (
int i = cbNext; i < numPoll; i++)
324 {
if (cP == (
Channel *)pollTab[i].udata)
325 pollTab[i].udata = &deadChP;
342 if (!Modify(cP, eNum, eTxt, isLocked))
343 {
if (eTxt) *eTxt =
"adding channel";
364 struct kevent chlist[2];
367 int kqStatus = GetPollEnt(cP);
372 {
if (!(kqStatus & rEnabled))
373 {EV_SET(&chlist[i], theFD, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, cP);
374 kqStatus |= rEnabled | rFilterX;
378 if (kqStatus & rEnabled)
379 {EV_SET(&chlist[i], theFD, EVFILT_READ, EV_DISABLE, 0, 0, cP);
380 kqStatus &= ~rEnabled;
388 {
if (!(kqStatus & wEnabled))
389 {EV_SET(&chlist[i], theFD, EVFILT_WRITE, EV_ADD|EV_ENABLE, 0, 0, cP);
390 kqStatus |= wEnabled | wFilterX;
394 if (kqStatus & wEnabled)
395 {EV_SET(&chlist[i], theFD, EVFILT_WRITE, EV_DISABLE, 0, 0, cP);
396 kqStatus &= ~wEnabled;
404 {
if (kevent(pollDfd, chlist, i, 0, 0, 0) < 0)
406 if (eTxt) *eTxt =
"modifying poll events";
409 SetPollEnt(cP, kqStatus);
421 bool XrdSys::IOEvents::PollKQ::Process(
int next)
427 {
if (reqBuff.req == PipeData::RmFD)
429 for (
int i = next; i < numPoll; i++)
430 {
if ((cP = (
Channel *)pollTab[i].udata)
432 && reqBuff.fd == (
int)pollTab[i].ident)
433 pollTab[i].udata = &deadChP;
435 reqBuff.theSem->Post();
437 else if (reqBuff.req == PipeData::Stop){reqBuff.theSem->Post();
444 kevent(pollDfd, &armPipe, 1, 0, 0, 0);
465 if (pollTab) {free(pollTab); pollTab = 0;}
469 if (pollDfd >= 0) {
close(pollDfd); pollDfd = -1;}
const char * XrdSysE2T(int errcode)
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
PollKQ(struct kevent *ptab, int numfd, int pfd, int pFD[2])
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg)
static int AllocMem(void **memP, int slots)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)