33 #include <sys/types.h>
35 #include <sys/epoll.h>
36 #include <sys/eventfd.h>
47 int pfd, wfd, bytes, alignment, pagsz = getpagesize();
48 struct epoll_event *pp;
53 if ((pfd = epoll_create(maxfd)) >= 0)
fcntl(pfd, F_SETFD, FD_CLOEXEC);
56 if ((pfd = epoll_create1(EPOLL_CLOEXEC)) < 0)
58 {
Log.
Emsg(
"Poll", errno,
"create epoll device");
return 0;}
60 if ((wfd = eventfd(0, EFD_CLOEXEC)) < 0)
62 "create an eventfd as the wait-poller descriptor");
69 bytes = maxfd *
sizeof(
struct epoll_event);
70 alignment = (bytes < pagsz ? 1024 : pagsz);
71 if (posix_memalign((
void **)&pp, alignment, bytes))
72 {
Log.
Emsg(
"Poll", ENOMEM,
"create poll table");
80 memset((
void *)pp, 0, bytes);
90 if (PollTab) free(PollTab);
91 if (WaitFd >= 0)
close(WaitFd);
92 if (PollDfd >= 0)
close(PollDfd);
99 int XrdPollE::AddWaitFd()
101 const unsigned int myPollEvts = EPOLLIN;
102 struct epoll_event myEvent = {myPollEvts, {(
void *)&WaitFd}};
106 if (epoll_ctl(PollDfd, EPOLL_CTL_ADD, WaitFd, &myEvent) < 0)
108 Log.
Emsg(
"Poll", rc,
"include the wait FD in the poll set");
131 struct epoll_event myEvents = {0, (
void *)&pInfo};
136 if (epoll_ctl(PollDfd, EPOLL_CTL_MOD, pInfo.
FD, &myEvents))
137 {
Log.
Emsg(
"Poll", errno,
"disable link", lp->ID);
return;}
143 TRACEI(POLL,
"Poller " <<
PID <<
" async disabling link " <<pInfo.
FD);
157 struct epoll_event myEvents = {ePollEvents, {(
void *)&pInfo}};
167 if (epoll_ctl(PollDfd, EPOLL_CTL_MOD, pInfo.
FD, &myEvents))
210 void XrdPollE::HandleWaitFd(
const unsigned int events)
221 if (!(events & EPOLLIN) || (events & (EPOLLERR | EPOLLHUP)))
223 Log.
Emsg(
"Poll",
"wait-poller handler:",
x2Text(events, eBuff));
224 if (events & (EPOLLERR | EPOLLHUP))
229 if (eventfd_read(WaitFd, &cnt) < 0)
230 {
Log.
Emsg(
"Poll", errno,
"read from the wait-poller descriptor");
234 for (ic=0;ic<cnt;++ic) WaitFdSem.
Post();
243 struct epoll_event myEvent = {0, {(
void *)&pInfo}};
248 if ((rc = epoll_ctl(PollDfd, EPOLL_CTL_ADD, pInfo.
FD, &myEvent)) < 0)
260 void XrdPollE::remFD(
XrdPollInfo &pInfo,
unsigned int events)
262 struct epoll_event myEvents = {0, {(
void *)&pInfo}};
271 {
TRACEI(POLL,
"Poller " <<
PID <<
" removing FD " <<pInfo.
FD);
272 if (epoll_ctl(PollDfd, EPOLL_CTL_DEL, pInfo.
FD, &myEvents)
273 && (errno != ENOENT || events != 0))
276 if (events & (EPOLLHUP |
EPOLLRDHUP)) why =
"sever";
277 else if (events & EPOLLERR) why =
"error";
279 snprintf(buff,
sizeof(buff),
"exclude fd %d during %s (%x) event; link",
280 pInfo.
FD, why, events);
293 int rc, i, numpolled, num2sched;
294 unsigned int waitFdEvents;
297 const short pollOK = EPOLLIN | EPOLLPRI;
301 if ((rc = AddWaitFd()))
314 do {
do {numpolled = epoll_wait(PollDfd, PollTab, PollMax, -1);}
315 while (numpolled < 0 && errno == EINTR);
316 if (numpolled == 0)
continue;
318 {
Log.
Emsg(
"Poll", errno,
"poll for events");
325 jfirst = jlast = 0; num2sched = 0;
326 haveWaiters =
false; waitFdEvents = 0;
327 for (i = 0; i < numpolled; i++)
328 {
if (PollTab[i].data.ptr == &WaitFd)
329 {haveWaiters =
true; waitFdEvents = PollTab[i].events;}
330 else if ((pInfo = (
XrdPollInfo *)PollTab[i].data.ptr))
332 remFD(*pInfo, PollTab[i].events);
334 if (!(PollTab[i].events & pollOK)
335 || (PollTab[i].events & POLLRDHUP))
339 if (!jlast) jlast=(
XrdJob *)lp;
342 PollTab[i].events = 0;
343 if (epoll_ctl(PollDfd,EPOLL_CTL_MOD,pInfo.
FD,&PollTab[i]))
347 }
else Log.
Emsg(
"Poll",
"null link event!!!!");
353 else if (num2sched)
Sched.
Schedule(num2sched, jfirst, jlast);
355 if (haveWaiters) HandleWaitFd(waitFdEvents);
363 void XrdPollE::Wait4Poller()
368 if (eventfd_write(WaitFd, 1) < 0)
369 {
Log.
Emsg(
"Poll", errno,
"write to the wait-poller descriptor");
382 if (events & EPOLLERR)
return "socket error";
384 if (events & (EPOLLHUP |
EPOLLRDHUP))
return "hangup";
386 sprintf(buff,
"unusual event (%.4x)", events);
int fcntl(int fd, int cmd,...)
char * ID
Pointer to the client's link identity.
const char * x2Text(unsigned int evf, char *buff)
void Exclude(XrdPollInfo &pInfo)
void Disable(XrdPollInfo &pInfo, const char *etxt=0)
void Start(XrdSysSemaphore *syncp, int &rc)
int Enable(XrdPollInfo &pInfo)
int Include(XrdPollInfo &pInfo)
static XrdPoll * newPoller(int pollid, int numfd)
static int Finish(XrdPollInfo &pInfo, const char *etxt=0)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)