45 int bytes, alignment, pagsz = getpagesize();
50 bytes = maxfd *
sizeof(
struct pollfd);
51 alignment = (bytes < pagsz ? 1024 : pagsz);
52 if (posix_memalign((
void **)&pp, alignment, bytes))
53 {
Log.
Emsg(
"Poll", ENOMEM,
"create poll table");
59 memset((
void *)pp, 0, bytes);
84 if (PollTab) free(PollTab);
103 while((ptnum < PollTNum) && (PollTab[ptnum].fd != -1)) ptnum++;
108 {
Log.
Emsg(
"Attach",
"Attach",pInfo.
Link.
ID,
"failed; poll table overflow.");
115 pfd = &(PollTab[ptnum]);
117 pfd->events = POLLIN | POLLRDNORM;
123 if (ptnum == PollTNum) PollTNum++;
143 if (pInfo.
inQ) dqLink(&pInfo);
151 TRACEI(POLL,
"Poller " <<
PID <<
" async disabling link FD " <<pInfo.
FD);
156 memset(&cmdbuff, 0,
sizeof(cmdbuff));
158 cmdbuff[0].
Parms.Arg.fd = pInfo.
FD;
161 cmdbuff[1].
Parms.theSem = &mySem;
163 if (
write(
CmdFD, &cmdbuff,
sizeof(cmdbuff)) < 0) myerrno = errno;
168 if (myerrno)
Log.
Emsg(
"Poll", myerrno,
"disable link", pInfo.
Link.
ID);
170 if (etxt &&
Finish(pInfo, etxt))
198 TRACEI(POLL,
"sending poller " <<
PID <<
" enable for link " <<pInfo.
FD);
200 cmdbuff.
Parms.Arg.fd = pInfo.
FD;
203 if (
write(
CmdFD, &cmdbuff,
sizeof(cmdbuff)) < 0) myerrno = errno;
209 {
Log.
Emsg(
"Poll", myerrno,
"enable link", pInfo.
Link.
ID);
return 0;}
232 else if (pInfo.
inQ) dqLink(&pInfo);
236 TRACEI(POLL,
"sending poller " <<
PID <<
" detach for link " <<pInfo.
FD);
238 cmdbuff[0].
Parms.Arg.fd = pInfo.
FD;
241 cmdbuff[1].
Parms.theSem = &mySem;
243 if (
write(
CmdFD, &cmdbuff,
sizeof(cmdbuff)) < 0) myerrno = errno;
248 if (myerrno)
Log.
Emsg(
"Poll", myerrno,
"detach link", pInfo.
Link.
ID);
258 int numpolled, num2sched;
263 const short pollOK = POLLIN | POLLRDNORM;
267 PollTab[0].fd =
ReqFD;
268 PollTab[0].events = pollOK;
269 PollTab[0].revents = 0;
279 std::vector<struct pollfd> PollTabCopy;
283 PollTabCopy.resize(PollTNum);
284 memcpy(PollTabCopy.data(), PollTab,
sizeof(
struct pollfd) * PollTNum);
287 do {numpolled = poll(PollTabCopy.data(), PollTabCopy.size(), -1);}
288 while(numpolled < 0 && (errno == EAGAIN || errno == EINTR));
293 {
if (errno != EINTR) Restart(errno);
304 for (
size_t idx=0; idx<PollTabCopy.size(); idx++)
305 PollTab[idx].revents = PollTabCopy[idx].revents;
309 if (PollTab[0].revents & pollOK)
311 doRequests(numpolled);
312 if (--numpolled <= 0)
continue;
318 plp = 0; nlp = PollQ; jfirst = jlast = 0; num2sched = 0;
319 while ((pInfo = nlp) && numpolled > 0)
320 {
if ((pollevents = pInfo->
PollEnt->revents))
322 if (plp) nlp = plp->
Next = pInfo->
Next;
323 else nlp = PollQ = pInfo->
Next;
324 numpolled--; pInfo->
inQ =
false;
325 if (!(pollevents & pollOK))
329 Log.
Emsg(
"Poll",
"Disabled event occurred for", lp->
ID);
332 if (!jlast) jlast=(
XrdJob *)lp;
337 plp = pInfo; nlp = pInfo->
Next;
339 if (numpolled) Recover(numpolled);
345 else if (num2sched)
Sched.
Schedule(num2sched, jfirst, jlast);
364 if ((lastent = PollTNum-1) < 0)
365 {
Log.
Emsg(
"Poll",
"Underflow during detach"); abort();}
368 do {PollTNum--;}
while(PollTNum && PollTab[PollTNum-1].fd == -1);
375 void XrdPollPoll::doRequests(
int maxreq)
378 int pti, ptfd, num2do;
384 num2do = (maxreq < 3 ? -1 : maxreq);
395 if ((ptfd = abs(PollTab[pti].fd)) !=
ReqBuff.
Parms.Arg.fd)
396 {
auto fd = PollTab[pti].fd;
407 {PollTab[pti].events = POLLIN | POLLRDNORM;
408 PollTab[pti].fd = ptfd;
410 act =
" enabled fd ";
413 {PollTab[pti].fd = -ptfd;
414 act =
" disabled fd ";
418 {PollTab[pti].fd = -1;
420 act =
" detached fd ";
423 else {PollGuard.UnLock();
424 Log.
Emsg(
"Poll",
"Received an invalid poll pipe request");
429 <<
" entry " <<pti <<
" now at " <<PollTNum);
445 plp = 0; nlp = PollQ;
446 while (nlp && (pInfo != nlp)) {plp=nlp; nlp = nlp->
Next;}
450 if (nlp) {
if (plp) plp->
Next = nlp->
Next;
451 else PollQ = nlp->
Next;
463 void XrdPollPoll::LogEvent(
int req,
int pollfd,
int cmdfd)
465 const char *opn, *id1, *id2;
475 {sprintf(buff,
"poll %d failed; FD %d",
PID, cmdfd);
476 Log.
Emsg(
"Poll", opn, buff,
"does not map to a link");
481 else id1 =
"unknown";
483 else id2 =
"unknown";
484 snprintf(buff,
sizeof(buff)-1,
485 "%d poll fd=%d (%s) not equal %s cmd fd=%d (%s).",
486 PID, pollfd, id1, opn, cmdfd, id2);
488 Log.
Emsg(
"Poll",
"cmd/poll mismatch:", buff);
495 void XrdPollPoll::Recover(
int numleft)
502 for (i = 1; i < PollTNum; i++)
503 if (PollTab[i].revents)
507 PollTab[i].fd = -PollTab[i].fd;
517 void XrdPollPoll::Restart(
int ecode)
523 TRACE(POLL,
PID <<
'-' <<
TID <<
" Poll error " <<ecode);
524 Log.
Emsg(
"Poll", errno,
"poll");
529 while((pInfo = PollQ))
530 {PollQ = pInfo->
Next;
532 Finish(*pInfo,
"Unexpected polling error");
ssize_t write(int fildes, const void *buf, size_t nbyte)
static XrdLink * fd2link(int fd)
static XrdPollInfo * fd2PollInfo(int fd)
char * ID
Pointer to the client's link identity.
int Include(XrdPollInfo &pInfo)
XrdPollPoll(struct pollfd *pp, int numfd)
void Start(XrdSysSemaphore *syncp, int &rc)
void Disable(XrdPollInfo &pInfo, const char *etxt=0)
void Exclude(XrdPollInfo &pInfo)
int Enable(XrdPollInfo &pInfo)
static char * Poll2Text(short events)
static XrdPoll * newPoller(int pollid, int numfd)
static int Finish(XrdPollInfo &pInfo, const char *etxt=0)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
union XrdPoll::PipeData::@18 Parms