53 PollPoll(
int &rc,
int numfd,
int pFD[2]);
62 bool Include(
Channel *cP,
int &eNum,
const char **eTxt,
bool &isLocked);
64 bool Modify (
Channel *cP,
int &eNum,
const char **eTxt,
bool &isLocked);
69 void Dispatch(
int ptent,
int pollEv);
70 void FDMod(
int ptnum,
int fd,
int events);
71 void FDRem(
int ptnum);
74 static const int disFD = 0x80000000;
77 struct pollfd *pollTab;
80 struct pollfd *pnewTab;
96 XrdSys::IOEvents::Poller::newPoller(
int pipeFD[2],
105 if (!(myPoller =
new PollPoll(eNum, 1024, pipeFD))) eNum = ENOMEM;
109 if (!eNum)
return (
Poller *)myPoller;
111 if (eTxt) *eTxt =
"creating poller";
129 if (!(pollTab = (
struct pollfd *)malloc(numfd*
sizeof(
struct pollfd))))
130 {rc = errno;
return;}
134 for (i = 1; i < numfd; i++)
135 {pollTab[i].fd = -1; pollTab[i].events = 0; pollTab[i].revents = 0;}
139 pollTab[0].fd = pFD[0];
140 pollTab[0].events = POLLIN | POLLRDNORM;
141 pollTab[0].revents = 0;
152 {rc = errno;
return;}
156 memset(chnlTab, 0, numfd*
sizeof(
Channel *));
173 int i, num2poll, numpolled;
185 do {num2poll = pollNum;
187 do {numpolled = poll(pollTab, num2poll, TmoGet());}
188 while(numpolled < 0 && (errno == EAGAIN || errno == EINTR));
193 {memcpy(pnewTab, pollTab, pollMax*
sizeof(
struct pollfd));
194 free(pollTab); pollTab = pnewTab; pnewTab = 0; pollMax = chnlMax;
197 if (numpolled == 0) CbkTMO();
198 else if (numpolled < 0)
205 if( rc == EBADF && parentPID != getpid() )
return;
206 std::cerr <<
"PPoll: "<<
XrdSysE2T(rc)<<
" polling for events"<<std::endl;
209 else{
if (pollTab[0].revents) numpolled--;
210 for (i = 1; i < num2poll && numpolled; i++)
211 {
if (pollTab[i].revents)
213 Dispatch(i, pollTab[i].revents);
216 if (pollTab[0].revents && !Process())
return;
225 void XrdSys::IOEvents::PollPoll::Dispatch(
int ptent,
int pollEv)
227 static const short pollER = POLLERR| POLLHUP | POLLNVAL;
228 static const short pollOK = POLLIN | POLLRDNORM | POLLPRI | POLLOUT;
229 static const short pollRD = POLLIN | POLLRDNORM | POLLPRI;
230 static const short pollWR = POLLOUT;
233 int eNum, events = 0;
237 if (!(cP = chnlTab[ptent])) {FDRem(ptent);
return;}
243 if (pollEv & POLLHUP) eNum = ECONNRESET;
244 else if (pollEv & POLLERR) eNum = EPIPE;
245 else if (pollEv & POLLNVAL)eNum = EBADF;
248 else if (pollEv & pollOK)
253 else {eTxt =
"polling"; eNum = EIO;}
257 if (!CbkXeq(cP, events, eNum, eTxt)) FDRem(ptent);
265 bool &isLocked,
bool dover)
271 ctnum = GetPollEnt(cP);
273 if (chnlTab[ctnum] != cP) {pollMutex.UnLock();
return;}
286 PipeData cmdbuff((
char)PipeData::RmFD,0,(
short)ctnum,cP->
GetFD());
287 if (isLocked) {isLocked =
false; UnLockChannel(cP);}
298 void XrdSys::IOEvents::PollPoll::FDMod(
int ptnum,
int fd,
int events)
305 {memcpy(pnewTab, pollTab, pollMax*
sizeof(
struct pollfd));
307 pollTab = pnewTab; pnewTab = 0; pollMax = chnlMax;
313 pollTab[ptnum].fd = fd;
314 pollTab[ptnum].events = 0;
315 pollTab[ptnum].revents = 0;
317 pollTab[ptnum].events = POLLIN | POLLRDNORM;
319 pollTab[ptnum].events |= POLLOUT;
321 pollTab[ptnum].fd |= disFD;
325 if (chnlNum >= pollNum) pollNum = chnlNum+1;
334 void XrdSys::IOEvents::PollPoll::FDRem(
int ptnum)
344 if (ctnum == chnlNum-1)
345 {
while(ctnum > 0 && !chnlTab[ctnum]) ctnum--;
351 pollTab[ptnum].fd = -1;
352 pollTab[ptnum].events = 0;
353 pollTab[ptnum].revents = 0;
357 if (ptnum == pollNum-1)
358 {
while(ptnum > 0 && pollTab[ptnum].fd == -1) ptnum--;
372 static const int incVal = 256;
373 static const int cpSz =
sizeof(
Channel *);
374 static const int ptSz =
sizeof(
struct pollfd);
382 if (eTxt) *eTxt =
"adding channel";
390 if (eTxt) *eTxt =
"adding channel";
398 while((ctnum < chnlMax) && (chnlTab[ctnum] != 0)) ctnum++;
404 if (ctnum >= chnlMax)
405 {
Channel **cnewTab = (
Channel **)realloc(chnlTab,(chnlMax+incVal)*cpSz);
406 if (pnewTab) free(pnewTab);
407 pnewTab = (
struct pollfd *)malloc((chnlMax+incVal)*ptSz);
408 if (!cnewTab || !pnewTab)
411 if (eTxt) *eTxt =
"adding channel";
412 if (cnewTab) free(cnewTab);
413 if (pnewTab) free(pnewTab);
416 memset(&cnewTab[ctnum], 0, incVal*cpSz);
417 memset(&pnewTab[ctnum],-1, incVal*ptSz);
418 chnlTab = cnewTab; chnlMax += incVal; chnlNum = ctnum+1;
419 }
else if (ctnum > chnlNum) chnlNum = ctnum;
424 SetPollEnt(cP, ctnum);
437 (
short)ctnum, fd, 0);
438 if (isLocked) {isLocked =
false; UnLockChannel(cP);}
466 (
short)GetPollEnt(cP), cP->
GetFD(), 0);
467 if (isLocked) {isLocked =
false; UnLockChannel(cP);}
480 bool XrdSys::IOEvents::PollPoll::Process()
486 {
case PipeData::MdFD: FDMod(reqBuff.ent, reqBuff.fd, reqBuff.evt);
488 case PipeData::MiFD: FDMod(reqBuff.ent, reqBuff.fd, reqBuff.evt);
489 reqBuff.theSem->Post();
491 case PipeData::RmFD: FDRem(reqBuff.ent);
492 reqBuff.theSem->Post();
494 case PipeData::NoOp:
break;
495 case PipeData::Post: reqBuff.theSem->Post();
497 case PipeData::Stop: reqBuff.theSem->Post();
523 if (pollTab) {free(pollTab); pollTab = 0;}
524 if (pnewTab) {free(pnewTab); pnewTab = 0;}
525 if (chnlTab) {free(chnlTab); chnlTab = 0;}
const char * XrdSysE2T(int errcode)
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ errorEvents
Error event that is not specific to reading or writing.
@ writeEvents
Write events and write timeouts.
@ readEvents
Read events and read timeouts.
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg)
PollPoll(int &rc, int numfd, int pFD[2])