41 #define TRACE_IDENT pInfo.Link.ID
44 #if defined( __linux__ )
// Statistics hook that has nothing to report.
//
// buff     output buffer (never written to here)
// blen     size of buff in bytes (unused)
// do_sync  synchronization request flag (unused)
//
// Returns 0 unconditionally.
int Stats(char *buff, int blen, int do_sync=0)
{
   // The arguments are accepted only to satisfy the signature.
   (void)buff;
   (void)blen;
   (void)do_sync;

   return 0;
}
123 numAttached=numEnabled=numEvents=numInterrupts=0;
125 if (XrdSysFD_Pipe(fildes) == 0)
130 Log.
Emsg(
"Poll", errno,
"create poll pipe");
135 PipePoll.events = POLLIN | POLLRDNORM;
157 if (pp->numAttached > Pollers[i]->numAttached) pp = Pollers[i];
161 if (!pp->
Include(pInfo)) {doingAttach.UnLock();
return 0;}
167 doingAttach.UnLock();
168 TRACEI(POLL,
"FD " <<pInfo.
FD <<
" attached to poller " <<pp->
PID
169 <<
"; num=" <<pp->numAttached);
183 if (!(pp = pInfo.
Poller))
return;
192 if (!pp->numAttached)
193 {
Log.
Emsg(
"Poll",
"Underflow detaching", pInfo.
Link.
ID); abort();}
195 doingAttach.UnLock();
196 TRACEI(POLL,
"FD " <<pInfo.
FD <<
" detached from poller " <<pp->
PID
197 <<
"; num=" <<pp->numAttached);
211 {
TRACEI(POLL,
"Link " <<pInfo.
FD <<
" already terminating; "
212 <<(etxt ? etxt :
"") <<
" request ignored.");
219 if (!etxt) etxt =
"reason unknown";
221 TRACEI(POLL,
"Link " <<pInfo.
FD <<
" terminating: " <<etxt);
240 {PipeBuff = (
char *)&ReqBuff; PipeBlen =
sizeof(ReqBuff);}
245 do {rc = poll(&PipePoll, 1, 0);}
246 while(rc < 0 && (errno == EAGAIN || errno == EINTR));
247 if (rc < 1)
return 0;
252 do {rlen =
read(ReqFD, PipeBuff, PipeBlen);}
253 while(rlen < 0 && errno == EINTR);
255 {
if (rlen)
Log.
Emsg(
"Poll", errno,
"read from request pipe");
262 if (!(PipeBlen -= rlen))
return 1;
264 TRACE(POLL,
"Poller " <<PID <<
" still needs " <<PipeBlen <<
" req pipe bytes");
274 if (events & POLLERR)
return strdup(
"socket error");
276 if (events & POLLHUP)
return strdup(
"hangup");
278 if (events & POLLNVAL)
return strdup(
"socket closed");
281 sprintf(buff,
"unusual event (%.4x)", events);
304 {
if (!(Pollers[i] = newPoller(i, maxfd)))
return 0;
311 TRACE(POLL,
"Starting poller " <<i);
314 {
Log.
Emsg(
"Poll", retc,
"create poller thread");
return 0;}
315 Pollers[i]->TID = tid;
316 PArg.PollSync.Wait();
318 {
Log.
Emsg(
"Poll", PArg.retcode,
"start poller");
334 static const char statfmt[] =
"<stats id=\"poll\"><att>%d</att>"
335 "<en>%d</en><ev>%d</ev><int>%d</int></stats>";
336 int i, numatt = 0, numen = 0, numev = 0, numint = 0;
349 numatt += pp->numAttached;
357 return snprintf(buff, blen, statfmt, numatt, numen, numev, numint);
364 #if defined( __linux__ )
void * XrdStartPolling(void *parg)
int XrdPoll__Attach(XrdLink *lp)
ssize_t read(int fildes, void *buf, size_t nbyte)
#define XRDSYSTHREAD_BIND
XrdProtocol * setProtocol(XrdProtocol *pp, bool runit=false, bool push=false)
int setEtext(const char *text)
char * ID
Pointer to the client's link identity.
XrdProtocol * getProtocol()
Obtain current protocol object pointer.
int Stats(char *buff, int blen, int do_sync=0)
void Recycle(XrdLink *lp, int x, const char *y)
XrdProtocol * Match(XrdLink *lp)
static const char * TraceID
static XrdPoll * Pollers[XRD_NUMPOLLERS]
virtual int Include(XrdPollInfo &pInfo)=0
virtual void Start(XrdSysSemaphore *syncp, int &rc)=0
virtual void Exclude(XrdPollInfo &pInfo)=0
static char * Poll2Text(short events)
static int Finish(XrdPollInfo &pInfo, const char *etxt=0)
static void Detach(XrdPollInfo &pInfo)
static int Setup(int numfd)
static int Stats(char *buff, int blen, int do_sync=0)
static int Attach(XrdPollInfo &pInfo)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)