32 #include <sys/types.h>
34 #include <sys/epoll.h>
55 static int AllocMem(
void **memP,
int slots);
57 PollE(
struct epoll_event *ptab,
int numfd,
int pfd,
int pFD[2])
58 :
Poller(pFD[0], pFD[1]), pollTab(ptab), cbNow(0),
59 pollDfd(pfd), pollMax(numfd), pollNum(1), numPoll(0),
70 bool Include(
Channel *cP,
int &eNum,
const char **eTxt,
bool &isLocked);
72 bool Modify (
Channel *cP,
int &eNum,
const char **eTxt,
bool &isLocked);
77 int AllocPT(
int slots);
78 void Dispatch(
Channel *cP, uint32_t pollEv);
79 bool Process(
int curr);
81 struct epoll_event *pollTab;
90 void *PollE::deadChP = 0;
102 XrdSys::IOEvents::Poller::newPoller(
int pipeFD[2],
107 static const int allocFD = 1024;
108 struct epoll_event *pp, myEvent = {(EPOLLIN | EPOLLPRI), {0}};
113 #ifndef EPOLL_CLOEXEC
114 if ((pfd = epoll_create(allocFD)) >= 0)
fcntl(pfd, F_SETFD, FD_CLOEXEC);
117 if ((pfd = epoll_create1(EPOLL_CLOEXEC)) < 0)
120 if (eTxt) *eTxt =
"creating epoll device";
126 if (epoll_ctl(pfd, EPOLL_CTL_ADD, pipeFD[0], &myEvent))
128 *eTxt =
"adding communication pipe";
136 if (eTxt) *eTxt =
"creating epoll table";
143 return (
Poller *)
new PollE(pp, allocFD, pfd, pipeFD);
155 int rc, bytes, alignment, pagsz = getpagesize();
159 bytes = slots *
sizeof(
struct epoll_event);
160 alignment = (bytes < pagsz ? 1024 : pagsz);
161 if (!(rc = posix_memalign(memP, alignment, bytes))) memset(*memP, 0, bytes);
169 int XrdSys::IOEvents::PollE::AllocPT(
int slots)
171 struct epoll_event *pp;
175 if (pollMax >= slots) slots = pollMax + 256;
176 else slots = pollMax + (slots/256*256) + (slots%256 ? 256 : 0);
180 if (!AllocMem((
void **)&pp, slots))
199 int numpolled, pollN;
211 do {
do {numpolled = epoll_wait(pollDfd, pollTab, pollMax, TmoGet());}
212 while (numpolled < 0 && errno == EINTR);
215 if (numpolled == 0) CbkTMO();
216 else if (numpolled < 0)
223 if( rc == EBADF && parentPID != getpid() )
return;
224 std::cerr <<
"EPoll: "<<
XrdSysE2T(rc)<<
" polling for events "<<std::endl;
227 else for (
int i = 0; i < numpolled; i++)
228 {
if ((cP = (
Channel *)pollTab[i].data.ptr))
229 {cbCurr = i; Dispatch(cP, pollTab[i].events);}
230 else if (!Process(i))
return;
234 if (pollMax < pollN) AllocPT(pollN);
246 static const uint32_t pollER = EPOLLERR| EPOLLHUP;
247 static const uint32_t pollOK = EPOLLIN | EPOLLPRI | EPOLLOUT;
248 static const uint32_t pollRD = EPOLLIN | EPOLLPRI;
249 static const uint32_t pollWR = EPOLLOUT;
251 int eNum, events = 0;
252 bool isLocked =
false;
262 eNum = (pollEv & EPOLLERR ? EPIPE : ECONNRESET);
264 else if (pollEv & pollOK)
269 else {eTxt =
"polling"; eNum = EIO;}
274 if (!CbkXeq(cP, events, eNum, eTxt)) Exclude(cP, isLocked, 0);
283 bool &isLocked,
bool dover)
289 epoll_ctl(pollDfd, EPOLL_CTL_DEL, cP->
GetFD(), 0);
306 cmdbuff.
req = PipeData::RmFD;
310 if (cbNow && cbNow != cP)
311 for (
int i = cbCurr+1; i < numPoll; i++)
312 {
if (cP == (
Channel *)pollTab[i].data.ptr)
313 pollTab[i].data.ptr = &deadChP;
327 struct epoll_event myEvent = {0, {(
void *)cP}};
337 if (epoll_ctl(pollDfd, EPOLL_CTL_ADD, cP->
GetFD(), &myEvent))
339 if (eTxt) *eTxt =
"adding channel";
359 struct epoll_event myEvents = {0, {(
void *)cP}};
370 if (epoll_ctl(pollDfd, EPOLL_CTL_MOD, cP->
GetFD(), &myEvents))
372 if (eTxt) *eTxt =
"modifying poll events";
385 bool XrdSys::IOEvents::PollE::Process(
int curr)
390 {
if (reqBuff.req == PipeData::RmFD)
392 for (
int i = curr+1; i < numPoll; i++)
393 {
if ((cP = (
Channel *)pollTab[i].data.ptr)
395 && reqBuff.fd == cP->
GetFD()) pollTab[i].data.ptr=&deadChP;
397 reqBuff.theSem->Post();
399 else if (reqBuff.req == PipeData::Stop){reqBuff.theSem->Post();
423 if (pollTab) {free(pollTab); pollTab = 0;}
427 if (pollDfd >= 0) {
close(pollDfd); pollDfd = -1;}
int fcntl(int fd, int cmd,...)
#define CPP_ATOMIC_STORE(x, val, order)
const char * XrdSysE2T(int errcode)
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
PollE(struct epoll_event *ptab, int numfd, int pfd, int pFD[2])
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
static int AllocMem(void **memP, int slots)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)