50 const char *statName[] = {
"isClear",
"isCBMode",
"isDead"};
57 #define STATUS statName[(int)chStat]
59 #define STATUSOF(x) statName[(int)(x->chStat)]
61 #define SINGLETON(dlvar, theitem)\
62 theitem ->dlvar .next == theitem
64 #define INSERT(dlvar, curitem, newitem) \
65 newitem ->dlvar .next = curitem; \
66 newitem ->dlvar .prev = curitem ->dlvar .prev; \
67 curitem ->dlvar .prev-> dlvar .next = newitem; \
68 curitem ->dlvar .prev = newitem
70 #define REMOVE(dlbase, dlvar, curitem) \
71 if (dlbase == curitem) dlbase = (SINGLETON(dlvar,curitem) \
72 ? 0 : curitem ->dlvar .next);\
73 curitem ->dlvar .prev-> dlvar .next = curitem ->dlvar .next;\
74 curitem ->dlvar .next-> dlvar .prev = curitem ->dlvar .prev;\
75 curitem ->dlvar .next = curitem;\
76 curitem ->dlvar .prev = curitem
78 #define REVENTS(x) x & Channel:: readEvents
80 #define WEVENTS(x) x & Channel::writeEvents
82 #define ISPOLLER XrdSysThread::Same(XrdSysThread::ID(),pollTid)
84 #define BOOLNAME(x) (x ? "true" : "false")
86 #define DO_TRACE(x,fd,y) \
87 {PollerInit::traceMTX.Lock(); \
88 std::cerr <<"IOE fd "<<fd<<' '<<#x <<": "<<y<<'\n'<< std::flush; \
89 PollerInit::traceMTX.UnLock();}
91 #define TRACING PollerInit::doTrace
93 #define IF_TRACE(x,fd,y) if (TRACING) DO_TRACE(x,fd,y)
95 #define TRACE_LOK " channel now " <<(isLocked ? "locked" : "unlocked")
97 #define TRACE_MOD(x,fd,y) \
98 IF_TRACE(x,fd,"Modify(" <<y <<") == " \
99 <<BOOLNAME(retval) <<TRACE_LOK)
101 #define TRACE_NOD(x,fd,y) \
102 IF_TRACE(x,fd,"Modify(" <<y <<") skipped; no events changed")
109 = (
sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff);
139 static void *
Start(
void *parg);
175 {(void)syncp; (void)rc; (void)eTxt;}
178 {(void)cP; (void)isLocked; (void)dover;}
182 if (!(eNum =
GetFault(cP))) eNum = EPROTO;
183 if (eTxt) *eTxt =
"initializing channel";
189 if (!(eNum =
GetFault(cP))) eNum = EPROTO;
190 if (eTxt) *eTxt =
"modifying channel";
223 if (eTxt) *eTxt =
"initializing channel";
228 {
bool rc =
Init(cP, eNum, eTxt, isLocked);
262 if (eTxt) *eTxt =
"initializing channel";
267 {
return Init(cP, eNum, eTxt, isLocked);}
287 : chPollXQ(pollP), chCB(cbP), chCBA(cbArg)
289 attList.next = attList.prev =
this;
290 tmoList.next = tmoList.prev =
this;
306 bool isLocked =
true;
317 if (!chPollXQ || chPollXQ == &
pollErr1)
326 chPollXQ->Detach(
this,isLocked,
false);
327 if (!isLocked) chMutex.Lock();
339 IF_TRACE(Delete,chFD,
"waiting for callback");
341 chCBA = (
void *)&cbDone;
358 int eNum = 0, newev, curev;
359 bool retval =
true, isLocked =
true;
367 if (chPoller == &
pollWait) curev =
static_cast<int>(reMod);
368 else curev =
static_cast<int>(chEvents);
372 IF_TRACE(Disable,chFD,
"->Disable(" <<events <<
") chev=" <<curev);
377 newev = curev & ~events;
384 retval = chPoller->Modify(
this, eNum, eText, isLocked);
389 if (isLocked) chMutex.UnLock();
393 if (!retval) errno = eNum;
404 int eNum = 0, newev, curev, tmoSet = 0;
405 bool retval, setTO, isLocked =
true;
413 if (chPoller == &
pollWait) curev =
static_cast<int>(reMod);
414 else curev =
static_cast<int>(chEvents);
418 IF_TRACE(Enable,chFD,
"->Enable("<<events<<
','<<timeout<<
") chev="<<curev);
423 newev = (curev ^ events) & events;
424 chEvents = curev | events;
429 {
if (timeout > 0) chRTO = timeout;
430 else if (timeout < 0) chRTO = 0;
435 {
if (timeout > 0) chWTO = timeout;
436 else if (timeout < 0) chWTO = 0;
442 if (tmoSet && chPoller != &
pollErr1)
443 setTO = chPollXQ->TmoAdd(
this, tmoSet);
453 {retval = chPoller->Modify(
this, eNum, eText, isLocked);
466 if (isLocked) chMutex.UnLock();
467 bool isWakePend =
CPP_ATOMIC_LOAD(chPollXQ->wakePend, std::memory_order_consume);
468 if (retval && !isWakePend && setTO && isLocked) chPollXQ->WakeUp();
472 if (!retval) errno = eNum;
495 chPoller = thePoller;
520 if (chStat != isDead)
533 bool isLocked =
true;
541 if (chStat == isDead)
554 chPollXQ->Detach(
this, isLocked,
true);
555 if (!isLocked) chMutex.Lock();
584 pipePoll.events = POLLIN | POLLRDNORM;
602 if ((pcP = attBase)) {
INSERT(attList, pcP, cP);}
623 while((cP = tmoBase) && cP->deadLine <= time(0))
624 {
int dlType = cP->dlType;
626 CbkXeq(cP, dlType, 0, 0);
637 int eNum,
const char *eTxt)
641 bool cbok, retval, isRead, isWrite, isLocked =
true;
646 {
const char *cbtype = (cP->chPoller == cP->chPollXQ ?
"norm" :
647 (cP->chPoller == &
pollInit ?
"init" :
648 (cP->chPoller == &
pollWait ?
"wait" :
"err")));
649 DO_TRACE(CbkXeq,cP->chFD,
"callback events=" <<events
650 <<
" chev=" <<
static_cast<int>(cP->chEvents)
651 <<
" toq=" <<(cP->inTOQ != 0) <<
" erc=" <<eNum
652 <<
" callback " <<(cP->chCB ?
"present" :
"missing")
653 <<
" poller=" <<cbtype);
663 if (isRead) cP->rdDL = maxTime;
665 if (isWrite) cP->wrDL = maxTime;
668 isRead = isWrite =
false;
676 if (!(cP->chCB) || cP->chPoller != cP->chPollXQ)
678 {cP->chPoller = &
pollErr1; cP->chFault = eNum;
682 oldEvents = cP->chEvents;
684 retval = cP->chPoller->
Modify(cP, eNum, 0, isLocked);
686 if (!isLocked) cP->chMutex.
Lock();
687 cP->chEvents = oldEvents;
697 {cP->chPoller = &
pollErr1; cP->chFault = eNum;
698 cP->chStat = Channel::isCBMode;
701 cP->chCB->
Fatal(cP,cP->chCBA, eNum, eTxt);
702 if (chDead)
return true;
703 cbkMHelp.
Lock(&(cP->chMutex));
709 else {cP->chPoller = &
pollErr1; cP->chFault = eNum; cP->inPSet = 0;
717 cP->chStat = Channel::isCBMode;
720 IF_TRACE(CbkXeq,cP->chFD,
"invoking callback; events=" <<events);
721 cbok = cP->chCB->
Event(cP,cP->chCBA, events);
727 if (chDead)
return true;
728 cbkMHelp.
Lock(&(cP->chMutex));
733 if (cP->chStat != Channel::isCBMode)
734 {
if (cP->chStat == Channel::isDead)
738 cP->chStat = Channel::isClear;
743 if (!cbok) Detach(cP,isLocked,
false);
744 else if ((isRead || isWrite) && !(cP->inTOQ) && (cP->chRTO || cP->chWTO))
750 if (!isLocked) cP->chMutex.
Lock();
768 if (XrdSysFD_Pipe(fildes))
770 if (eTxt) *eTxt =
"creating poll pipe";
776 if (!(pArg.
pollP = newPoller(fildes, eNum, eTxt)))
786 {
if (eTxt) *eTxt =
"creating poller thread";
return 0;}
798 {
if (eTxt) *eTxt = (pArg.
retMsg ? pArg.
retMsg :
"starting poller");
812 if (eTxt) *eTxt =
"";
821 bool &isLocked,
bool keep)
825 bool detFD = (cP->inPSet != 0);
831 REMOVE(tmoBase, tmoList, cP);
846 if (cP->attList.next != cP) {
REMOVE(attBase, attList, cP);}
847 else if (attBase == cP) attBase = 0;
855 if (cmdFD >= 0) Exclude(cP, isLocked, !
ISPOLLER);
874 {pipeBuff = (
char *)&reqBuff; pipeBlen =
sizeof(reqBuff);}
879 do {rc = poll(&pipePoll, 1, 0);}
880 while(rc < 0 && (errno == EAGAIN || errno == EINTR));
881 if (rc < 1)
return 0;
886 do {rlen =
read(reqFD, pipeBuff, pipeBlen);}
887 while(rlen < 0 && errno == EINTR);
889 {std::cerr <<
"Poll: "<<
XrdSysE2T(errno)<<
" reading from request pipe\n"<< std::flush;
896 if (!(pipeBlen -= rlen))
return 1;
906 const char **eTxt,
bool &isLocked)
917 {cP->reMod = cP->chEvents;
919 IF_TRACE(Init,cP->chFD,
"defer events=" <<cP->reMod);
925 IF_TRACE(Init,cP->chFD,
"begin events=" <<
int(cP->chEvents));
929 if (!(cP->chEvents))
return true;
934 {eNum = EDESTADDRREQ;
935 if (eTxt) *eTxt =
"enabling without a callback";
942 cP->chPoller = &
pollWait; cP->reMod = cP->chEvents; cP->chEvents = 0;
943 retval = cP->chPollXQ->
Include(cP, eNum, eTxt, isLocked);
945 if (!isLocked) {cP->chMutex.
Lock(); isLocked =
true;}
951 if (!retval) {cP->chPoller = &
pollErr1; cP->chFault = eNum;}
952 else {cP->chPoller = cP->chPollXQ;
955 {cP->chEvents = cP->reMod;
956 retval = cP->chPoller->
Modify(cP, eNum, eTxt, isLocked);
958 if (!isLocked) {cP->chMutex.
Lock(); isLocked =
true;}
976 if (events & POLLERR)
return EPIPE;
978 if (events & POLLHUP)
return ECONNRESET;
980 if (events & POLLNVAL)
return EBADF;
997 if (cmd.
req >= PipeData::Post)
1001 while (wlen < 0 && errno == EINTR);
1002 if (wlen > 0) mySem.
Wait();
1005 while (wlen < 0 && errno == EINTR);
1010 return (wlen >= 0 ? 0 : errno);
1036 memset(
static_cast<void*
>( &cmdbuff ), 0,
sizeof(cmdbuff));
1037 cmdbuff.
req = PipeData::Stop;
1045 if (cmdFD == -1) {adMutex.UnLock();
return;}
1055 close(cmdFD); cmdFD = -1;
1056 close(reqFD); reqFD = -1;
1061 while((cP = attBase))
1062 {
REMOVE(attBase, attList, cP);
1066 if (cP->inTOQ) TmoDel(cP);
1067 cP->Reset(&
pollErr1, cP->chFD, EIDRM);
1070 {cP->chStat = Channel::isClear;
1071 theCB = cP->chCB; cbArg = cP->chCBA;
1073 theCB->
Stop(cP, cbArg);
1074 }
else cP->chMutex.
UnLock();
1093 bool setRTO, setWTO;
1103 {
REMOVE(tmoBase, tmoList, cP);
1109 tmoSet|= cP->dlType >> 4;
1116 if (setRTO &&
REVENTS(cP->chEvents) && cP->chRTO)
1117 cP->rdDL = cP->chRTO + tNow;
1118 if (setWTO &&
WEVENTS(cP->chEvents) && cP->chWTO)
1119 cP->wrDL = cP->chWTO + tNow;
1123 if (cP->rdDL < cP->wrDL)
1129 IF_TRACE(TmoAdd, cP->chFD,
"t=" <<tNow <<
" rdDL=" <<setRTO <<
' ' <<cP->rdDL
1130 <<
" wrDL=" <<setWTO <<
' ' <<cP->wrDL);
1134 if (cP->deadLine == maxTime)
return false;
1138 if ((ncP = tmoBase))
1139 {
do {
if (cP->deadLine < ncP->deadLine)
break;
1140 ncP = ncP->tmoList.next;
1141 }
while(ncP != tmoBase);
1142 INSERT(tmoList, ncP, cP);
1143 if (cP->deadLine < tmoBase->deadLine) tmoBase = cP;
1144 }
else tmoBase = cP;
1149 return (tmoBase == cP);
1167 REMOVE(tmoBase, tmoList, cP);
1187 do {
if (!tmoBase) {wtval = -1;
break;}
1188 wtval = (tmoBase->deadLine - time(0)) * 1000;
1189 if (wtval > 0)
break;
1206 void XrdSys::IOEvents::Poller::WakeUp()
1208 static PipeData cmdbuff(PipeData::NoOp);
1223 bool isWakePend =
CPP_ATOMIC_LOAD(wakePend, std::memory_order_consume);
1224 if (isWakePend) {toMutex.UnLock();}
1235 #if defined( __solaris__ )
1237 #elif defined( __linux__ )
1239 #elif defined(__APPLE__)
ssize_t write(int fildes, const void *buf, size_t nbyte)
ssize_t read(int fildes, void *buf, size_t nbyte)
#define CPP_ATOMIC_LOAD(x, order)
#define CPP_ATOMIC_STORE(x, val, order)
const char * XrdSysE2T(int errcode)
#define IF_TRACE(x, fd, y)
#define REMOVE(dlbase, dlvar, curitem)
#define TRACE_NOD(x, fd, y)
#define DO_TRACE(x, fd, y)
#define TRACE_MOD(x, fd, y)
#define INSERT(dlvar, curitem, newitem)
#define XRDSYSTHREAD_BIND
void Lock(XrdSysMutex *Mutex)
static int Same(pthread_t t1, pthread_t t2)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static pthread_t ID(void)
static void * Start(void *parg)
virtual void Fatal(Channel *chP, void *cbArg, int eNum, const char *eTxt)
virtual bool Event(Channel *chP, void *cbArg, int evFlags)=0
virtual void Stop(Channel *chP, void *cbArg)
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ ReadTimeOut
Read timeout.
@ WriteTimeOut
Write timeout.
@ ValidEvents
Mask to test for valid events.
void SetCallBack(CallBack *cbP, void *cbArg=0)
void GetCallBack(CallBack **cbP, void **cbArg)
@ errorEvents
Error event non-r/w specific.
@ stopEvent
Poller stop event.
bool Enable(int events, int timeout=0, const char **eText=0)
Channel(Poller *pollP, int fd, CallBack *cbP=0, void *cbArg=0)
bool Disable(int events, const char **eText=0)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
static XrdSysMutex traceMTX
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
virtual bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
virtual bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
int GetFault(Channel *cP)
static Poller * Create(int &eNum, const char **eTxt=0, int crOpts=0)
virtual void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)=0
bool CbkXeq(Channel *cP, int events, int eNum, const char *eTxt)
int SendCmd(PipeData &cmd)
int Poll2Enum(short events)
bool TmoAdd(Channel *cP, int tmoSet)
void SetPollEnt(Channel *cP, int ptEnt)
bool Init(Channel *cP, int &eNum, const char **eTxt, bool &isLockd)
XrdSysSemaphore * pollSync