32 #include <sys/types.h>
36 #include
"XrdSys/XrdSysE2T.hh"
51 static int AllocMem(
void **memP,
int slots);
53 PollPort(port_event_t *ptab,
int numfd,
int pfd,
int pFD[2])
54 :
Poller(pFD[0], pFD[1]), pollTab(ptab),
55 pollDfd(pfd), pollMax(numfd)
// poll(2)-style event bit masks used by this poller.
// pollER: the error and hang-up bits (POLLERR, POLLHUP).
59 static const int pollER = POLLERR| POLLHUP;
// pollOK: all readable and writable bits; Dispatch() tests the reported
// events against this mask.
60 static const int pollOK = POLLIN | POLLRDNORM | POLLPRI | POLLOUT;
// pollRD: the readable bits only; this is the mask Process() passes to
// port_associate() for the request file descriptor (reqFD).
61 static const int pollRD = POLLIN | POLLRDNORM | POLLPRI;
68 timespec_t *
BegTO(timespec_t &theTO)
70 if (toval < 0)
return 0;
71 theTO.tv_sec = toval/1000;
78 bool Include(
Channel *cP,
int &eNum,
const char **eTxt,
bool &isLocked);
80 bool Modify (
Channel *cP,
int &eNum,
const char **eTxt,
bool &isLocked);
86 void Dispatch(
Channel *cP,
int pollEv);
87 bool Process(
int curr);
89 port_event_t *pollTab;
97 void *PollPort::deadChP = 0;
109 XrdSys::IOEvents::Poller::newPoller(
int pipeFD[2],
114 static const int allocFD = 170;
120 if ((pfd = port_create()) < 0)
122 if (eTxt) *eTxt =
"creating event port";
125 fcntl(pfd, F_SETFD, FD_CLOEXEC);
131 *eTxt =
"adding communication pipe";
138 {
if (eTxt) *eTxt =
"creating port event table";
145 return (
Poller *)
new PollPort(pp, allocFD, pfd, pipeFD);
157 int bytes, alignment, pagsz = getpagesize();
161 bytes = slots *
sizeof(port_event_t);
162 alignment = (bytes < pagsz ? 1024 : pagsz);
163 if (posix_memalign(memP, alignment, bytes))
return ENOMEM;
164 memset(*memP, 0, bytes);
176 unsigned int numpolled;
191 do {numpolled = 1; errno = 0;
192 do {rc = port_getn(pollDfd, pollTab, pollMax, &numpolled, BegTO(toVal));}
193 while (rc < 0 && errno == EINTR);
194 wakePend =
true; numPoll = numpolled;
196 {
if (errno ==
ETIME || !errno) CbkTMO();
197 else {
int rc = errno;
203 if( rc == EBADF && parentPID != getpid() )
return;
204 std::cerr <<
"PollP: " <<
XrdSysE2T(rc) <<
" polling for events" <<std::endl;
208 for (
int i = 0; i < (int)numpolled; i++)
209 if (pollTab[i].portev_source == PORT_SOURCE_FD)
210 {
if ((cP = (
Channel *)pollTab[i].portev_user))
211 {cbCurr = i; Dispatch(cP, pollTab[i].portev_events);}
212 else if (!Process(i))
return;
225 int eNum, events = 0;
226 bool isLocked =
false;
236 eNum = (pollEv & POLLERR ? EPIPE : ECONNRESET);
238 else if (pollEv & pollOK)
243 else {eTxt =
"polling"; eNum = EIO;}
248 if (!CbkXeq(cP, events, eNum, eTxt)) Exclude(cP, isLocked, 0);
249 else Modify(cP, eNum, &eTxt, isLocked);
258 bool &isLocked,
bool dover)
264 port_dissociate(pollDfd, PORT_SOURCE_FD, cP->
GetFD());
277 cmdbuff.
req = PipeData::RmFD;
281 if (cbNow && cbNow != cP)
282 for (
int i = cbCurr+1; i < numPoll; i++)
283 {
if (cP == (
Channel *)pollTab[i].portev_user)
284 pollTab[i].portev_user = &deadChP;
298 int pEvents = 0, events = cP->
GetEvents();
307 if (port_associate(pollDfd, PORT_SOURCE_FD, cP->
GetFD(), pEvents, cP))
309 if (eTxt) *eTxt =
"adding channel";
327 int pEvents = 0, events = cP->
GetEvents();
336 if (port_associate(pollDfd, PORT_SOURCE_FD, cP->
GetFD(), pEvents, cP))
338 if (eTxt) *eTxt =
"modifying poll events";
351 bool XrdSys::IOEvents::PollPort::Process(
int curr)
356 {
if (reqBuff.req == PipeData::RmFD)
358 for (
int i = curr+1; i < numPoll; i++)
359 {
if (reqBuff.fd == (
int)pollTab[i].portev_object)
360 pollTab[i].portev_user = &deadChP;
362 reqBuff.theSem->Post();
364 else if (reqBuff.req == PipeData::Stop){reqBuff.theSem->Post();
371 port_associate(pollDfd, PORT_SOURCE_FD, reqFD, pollRD, 0);
389 if (pollTab) {free(pollTab); pollTab = 0;}
393 if (pollDfd >= 0) {
close(pollDfd); pollDfd = -1;}
int fcntl(int fd, int cmd,...)
const char * XrdSysE2T(int errcode)
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.
PollPort(port_event_t *ptab, int numfd, int pfd, int pFD[2])
timespec_t * BegTO(timespec_t &theTO)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
static int AllocMem(void **memP, int slots)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)