XRootD
XrdPollPoll.icc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d P o l l P o l l . i c c */
4 /* */
5 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* Produced by Andrew Hanushevsky for Stanford University under contract */
7 /* DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 #include <signal.h>
31 #include <cstdlib>
32 
33 #include "Xrd/XrdLinkCtl.hh"
34 #include "Xrd/XrdPollPoll.hh"
35 #include "Xrd/XrdScheduler.hh"
36 
37 #include <vector>
38 
39 /******************************************************************************/
40 /* n e w P o l l e r */
41 /******************************************************************************/
42 
43 XrdPoll *XrdPoll::newPoller(int pollid, int maxfd)
44 {
45  int bytes, alignment, pagsz = getpagesize();
46  struct pollfd *pp;
47 
48 // Calculate the size of the poll table and allocate it
49 //
50  bytes = maxfd * sizeof(struct pollfd);
51  alignment = (bytes < pagsz ? 1024 : pagsz);
52  if (posix_memalign((void **)&pp, alignment, bytes))
53  {Log.Emsg("Poll", ENOMEM, "create poll table");
54  return 0;
55  }
56 
57 // Create new poll object
58 //
59  memset((void *)pp, 0, bytes);
60  return (XrdPoll *)new XrdPollPoll(pp, maxfd);
61 }
62 
63 /******************************************************************************/
64 /* C o n s t r c u t o r */
65 /******************************************************************************/
66 
67 XrdPollPoll::XrdPollPoll(struct pollfd *pp, int numfd)
68 {
69 
70 // Initialize the standard stuff
71 //
72  PollTab = pp;
73  PollTNum= 0;
74  PollQ = 0;
75  maxent = numfd;
76 }
77 
78 /******************************************************************************/
79 /* D e s t r u c t o r */
80 /******************************************************************************/
81 
83 {
84  if (PollTab) free(PollTab);
85 }
86 
87 /******************************************************************************/
88 /* I n c l u d e */
89 /******************************************************************************/
90 
92 {
93  struct pollfd *pfd;
94  int ptnum;
95 
96 // Lock down the poll data structure
97 //
98  PollMutex.Lock();
99 
100 // Get the next entry to be used
101 //
102  ptnum = 0;
103  while((ptnum < PollTNum) && (PollTab[ptnum].fd != -1)) ptnum++;
104 
105 // Make sure we have enough table entries to add this link
106 //
107  if (ptnum > maxent)
108  {Log.Emsg("Attach","Attach",pInfo.Link.ID,"failed; poll table overflow.");
109  PollMutex.UnLock();
110  return 0;
111  }
112 
113 // Initialize the polltable entry
114 //
115  pfd = &(PollTab[ptnum]);
116  pfd->fd = -pInfo.FD;
117  pfd->events = POLLIN | POLLRDNORM;
118  pfd->revents = 0;
119 
120 // Record relevant information in the link
121 //
122  pInfo.PollEnt = pfd;
123  if (ptnum == PollTNum) PollTNum++;
124 
125 // All done
126 //
127  PollMutex.UnLock();
128  return 1;
129 }
130 
131 /******************************************************************************/
132 /* D i s a b l e */
133 /******************************************************************************/
134 
135 void XrdPollPoll::Disable(XrdPollInfo &pInfo, const char *etxt)
136 {
137  XrdSysSemaphore mySem(0);
138  PipeData cmdbuff[2];
139  int myerrno = 0;
140 
141 // Check if this link is in the pollQ. If so, remove it.
142 //
143  if (pInfo.inQ) dqLink(&pInfo);
144 
145 // Simply return if the link is already disabled
146 //
147  if (!pInfo.isEnabled) return;
148 
149 // Trace this event
150 //
151  TRACEI(POLL, "Poller " <<PID <<" async disabling link FD " <<pInfo.FD);
152 
153 // Send a disable request to the poller thread handling this link. We need to
154 // wait until the operation is actually completed before returning.
155 //
156  memset(&cmdbuff, 0, sizeof(cmdbuff));
157  cmdbuff[0].req = PipeData::DiFD;
158  cmdbuff[0].Parms.Arg.fd = pInfo.FD;
159  cmdbuff[0].Parms.Arg.ent = pInfo.PollEnt - PollTab;
160  cmdbuff[1].req = PipeData::Post;
161  cmdbuff[1].Parms.theSem = &mySem;
162  PollPipe.Lock();
163  if (write(CmdFD, &cmdbuff, sizeof(cmdbuff)) < 0) myerrno = errno;
164  PollPipe.UnLock();
165 
166 // Verify that all went well and if termination wanted, terminate the link
167 //
168  if (myerrno) Log.Emsg("Poll", myerrno, "disable link", pInfo.Link.ID);
169  else {mySem.Wait();
170  if (etxt && Finish(pInfo, etxt))
171  Sched.Schedule((XrdJob *)&pInfo.Link);
172  }
173 }
174 
175 /******************************************************************************/
176 /* E n a b l e */
177 /******************************************************************************/
178 
180 {
181  PipeData cmdbuff;
182  int myerrno = 0;
183 
184 // Simply return if the link is already enabled
185 //
186  if (pInfo.isEnabled) return 1;
187 
188 // Add this link element to the queue
189 //
190  PollMutex.Lock();
191  pInfo.Next = PollQ;
192  PollQ = &pInfo;
193  pInfo.inQ = true;
194  PollMutex.UnLock();
195 
196 // Send an enable request to the poller thread handling this link
197 //
198  TRACEI(POLL, "sending poller " <<PID <<" enable for link " <<pInfo.FD);
199  cmdbuff.req = PipeData::EnFD;
200  cmdbuff.Parms.Arg.fd = pInfo.FD;
201  cmdbuff.Parms.Arg.ent = pInfo.PollEnt - PollTab;
202  PollPipe.Lock();
203  if (write(CmdFD, &cmdbuff, sizeof(cmdbuff)) < 0) myerrno = errno;
204  PollPipe.UnLock();
205 
206 // Verify that all went well. Note that the link stays in the pollQ.
207 //
208  if (myerrno)
209  {Log.Emsg("Poll", myerrno, "enable link", pInfo.Link.ID); return 0;}
210 
211 // All done
212 //
213  return 1;
214 }
215 
216 /******************************************************************************/
217 /* E x c l u d e */
218 /******************************************************************************/
219 
221 {
222  XrdSysSemaphore mySem(0);
223  PipeData cmdbuff[2];
224  int myerrno = 0;
225 
226 // Make sure this link is not enabled
227 //
228  if (pInfo.isEnabled)
229  {Log.Emsg("Poll", "Detach of enabled link", pInfo.Link.ID);
230  Disable(pInfo);
231  }
232  else if (pInfo.inQ) dqLink(&pInfo);
233 
234 // Send a deatch request to the poller thread handling this link
235 //
236  TRACEI(POLL, "sending poller " <<PID <<" detach for link " <<pInfo.FD);
237  cmdbuff[0].req = PipeData::RmFD;
238  cmdbuff[0].Parms.Arg.fd = pInfo.FD;
239  cmdbuff[0].Parms.Arg.ent = pInfo.PollEnt - PollTab;
240  cmdbuff[1].req = PipeData::Post;
241  cmdbuff[1].Parms.theSem = &mySem;
242  PollPipe.Lock();
243  if (write(CmdFD, &cmdbuff, sizeof(cmdbuff)) < 0) myerrno = errno;
244  PollPipe.UnLock();
245 
246 // Verify that all went well and if termination wanted, terminate the link
247 //
248  if (myerrno) Log.Emsg("Poll", myerrno, "detach link", pInfo.Link.ID);
249  else mySem.Wait();
250 }
251 
252 /******************************************************************************/
253 /* S t a r t */
254 /******************************************************************************/
255 
256 void XrdPollPoll::Start(XrdSysSemaphore *syncsem, int &retcode)
257 {
258  int numpolled, num2sched;
259  XrdJob *jfirst, *jlast;
260  XrdPollInfo *plp, *nlp, *pInfo;
261  XrdLink *lp;
262  short pollevents;
263  const short pollOK = POLLIN | POLLRDNORM;
264 
265 // Set up he first entry in the poll table to be our communications port
266 //
267  PollTab[0].fd = ReqFD;
268  PollTab[0].events = pollOK;
269  PollTab[0].revents = 0;
270  PollTNum = 1;
271 
272 // Signal the caller to continue
273 //
274  retcode = 0;
275  syncsem->Post();
276 
277 // Now do the main poll loop
278 //
279  std::vector<struct pollfd> PollTabCopy;
280  do {// Duplicate the polling table so we don't need to hold the PollMutex
281  // while we are sleeping in the poll()
282  PollMutex.Lock();
283  PollTabCopy.resize(PollTNum);
284  memcpy(PollTabCopy.data(), PollTab, sizeof(struct pollfd) * PollTNum);
285  PollMutex.UnLock();
286 
287  do {numpolled = poll(PollTabCopy.data(), PollTabCopy.size(), -1);}
288  while(numpolled < 0 && (errno == EAGAIN || errno == EINTR));
289 
290  // Check if we had a polling error
291  //
292  if (numpolled < 0)
293  {if (errno != EINTR) Restart(errno);
294  else numInterrupts++;
295  continue;
296  }
297  numEvents += numpolled;
298 
299  // Note this thread is the only one that writes directly to the poll
300  // table (everything else is a read). Hence, it's OK to assume that
301  // the table after the poll() is unchanged and we can write back the
302  // revents field.
303  PollMutex.Lock();
304  for (size_t idx=0; idx<PollTabCopy.size(); idx++)
305  PollTab[idx].revents = PollTabCopy[idx].revents;
306 
307  // Check out base poll table entry, we can do this without a lock
308  //
309  if (PollTab[0].revents & pollOK)
310  {PollMutex.UnLock();
311  doRequests(numpolled);
312  if (--numpolled <= 0) continue;
313  PollMutex.Lock();
314  }
315 
316  // Checkout which links must be dispatched (do this locked)
317  //
318  plp = 0; nlp = PollQ; jfirst = jlast = 0; num2sched = 0;
319  while ((pInfo = nlp) && numpolled > 0)
320  {if ((pollevents = pInfo->PollEnt->revents))
321  {pInfo->PollEnt->fd = -pInfo->PollEnt->fd;
322  if (plp) nlp = plp->Next = pInfo->Next;
323  else nlp = PollQ = pInfo->Next;
324  numpolled--; pInfo->inQ = false;
325  if (!(pollevents & pollOK))
326  Finish(*pInfo, Poll2Text(pollevents));
327  lp = &(pInfo->Link);
328  if (!(pInfo->isEnabled))
329  Log.Emsg("Poll", "Disabled event occurred for", lp->ID);
330  else {pInfo->isEnabled = false;
331  lp->NextJob = jfirst; jfirst = (XrdJob *)lp;
332  if (!jlast) jlast=(XrdJob *)lp;
333  num2sched++;
334  continue;
335  }
336  }
337  plp = pInfo; nlp = pInfo->Next;
338  }
339  if (numpolled) Recover(numpolled);
340  PollMutex.UnLock();
341 
342  // Schedule the polled links
343  //
344  if (num2sched == 1) Sched.Schedule(jfirst);
345  else if (num2sched) Sched.Schedule(num2sched, jfirst, jlast);
346  } while(1);
347 }
348 
349 /******************************************************************************/
350 /* P r i v a t e M e t h o d s */
351 /******************************************************************************/
352 /******************************************************************************/
353 /* d o D e t a c h */
354 /******************************************************************************/
355 // Detach a given offset in the poll table, `pti`, from the PollTab.
356 //
357 // This method must be called with the PollMutex held.
359 {
360  int lastent;
361 
362 // Get some starting values
363 //
364  if ((lastent = PollTNum-1) < 0)
365  {Log.Emsg("Poll","Underflow during detach"); abort();}
366 
367  if (pti == lastent)
368  do {PollTNum--;} while(PollTNum && PollTab[PollTNum-1].fd == -1);
369 }
370 
371 /******************************************************************************/
372 /* d o R e q u e s t s */
373 /******************************************************************************/
374 // This must be called with the PollMutex unlocked
375 void XrdPollPoll::doRequests(int maxreq)
376 {
377  const char *act;
378  int pti, ptfd, num2do;
379  XrdPollInfo *piP;
380 
381 // To keep ourselves from being swamped, base request read-aheads on the number
382 // of pending poll events.
383 //
384  num2do = (maxreq < 3 ? -1 : maxreq);
385 
386 // Now process all poll table manipulation requests
387 //
388  while(num2do-- && getRequest())
389  {XrdSysMutexHelper PollGuard(PollMutex);
390  if (ReqBuff.req == PipeData::Post)
391  {ReqBuff.Parms.theSem->Post();
392  continue;
393  }
394  pti = ReqBuff.Parms.Arg.ent;
395  if ((ptfd = abs(PollTab[pti].fd)) != ReqBuff.Parms.Arg.fd)
396  {auto fd = PollTab[pti].fd;
397  PollGuard.UnLock();
398  LogEvent(ReqBuff.req, fd, ReqBuff.Parms.Arg.fd);
399  continue;
400  }
401  if (!(piP = XrdLinkCtl::fd2PollInfo(ptfd)))
402  {PollGuard.UnLock();
403  LogEvent(ReqBuff.req, -1, ptfd);
404  continue;
405  }
406  if (ReqBuff.req == PipeData::EnFD)
407  {PollTab[pti].events = POLLIN | POLLRDNORM;
408  PollTab[pti].fd = ptfd;
409  piP->isEnabled = true; numEnabled++;
410  act = " enabled fd ";
411  }
412  else if (ReqBuff.req == PipeData::DiFD)
413  {PollTab[pti].fd = -ptfd;
414  act = " disabled fd ";
415  piP->isEnabled = false;
416  }
417  else if (ReqBuff.req == PipeData::RmFD)
418  {PollTab[pti].fd = -1;
419  doDetach(pti);
420  act = " detached fd ";
421  piP->isEnabled = false;
422  }
423  else {PollGuard.UnLock();
424  Log.Emsg("Poll", "Received an invalid poll pipe request");
425  continue;
426  }
427  PollGuard.UnLock();
428  TRACE(POLL, "Poller " <<PID <<act <<ReqBuff.Parms.Arg.fd
429  <<" entry " <<pti <<" now at " <<PollTNum);
430  }
431 }
432 
433 /******************************************************************************/
434 /* d q L i n k */
435 /******************************************************************************/
436 
437 void XrdPollPoll::dqLink(XrdPollInfo *pInfo)
438 {
439  XrdPollInfo *plp, *nlp;
440 
441 // Find matching link in the queue
442 //
443  PollMutex.Lock();
444  pInfo->inQ = false;
445  plp = 0; nlp = PollQ;
446  while (nlp && (pInfo != nlp)) {plp=nlp; nlp = nlp->Next;}
447 
448 // If we found the link, remove it. Otherwise complain
449 //
450  if (nlp) {if (plp) plp->Next = nlp->Next;
451  else PollQ = nlp->Next;
452  PollMutex.UnLock();
453  }
454  else {PollMutex.UnLock();
455  Log.Emsg("dqLink", "Link not found in Q", pInfo->Link.ID);
456  }
457 }
458 
459 /******************************************************************************/
460 /* L o g E v e n t */
461 /******************************************************************************/
462 
463 void XrdPollPoll::LogEvent(int req, int pollfd, int cmdfd)
464 {
465  const char *opn, *id1, *id2;
466  char buff[4096];
467  XrdLink *lp;
468 
469  if (ReqBuff.req == PipeData::EnFD) opn = "enable";
470  else if (ReqBuff.req == PipeData::DiFD) opn = "disable";
471  else if (ReqBuff.req == PipeData::RmFD) opn = "detach";
472  else opn = "???";
473 
474  if (pollfd < 0)
475  {sprintf(buff, "poll %d failed; FD %d", PID, cmdfd);
476  Log.Emsg("Poll", opn, buff, "does not map to a link");
477  return;
478  }
479 
480  if ((lp = XrdLinkCtl::fd2link(pollfd))) id1 = lp->ID;
481  else id1 = "unknown";
482  if ((lp = XrdLinkCtl::fd2link(cmdfd))) id2 = lp->ID;
483  else id2 = "unknown";
484  snprintf(buff, sizeof(buff)-1,
485  "%d poll fd=%d (%s) not equal %s cmd fd=%d (%s).",
486  PID, pollfd, id1, opn, cmdfd, id2);
487 
488  Log.Emsg("Poll", "cmd/poll mismatch:", buff);
489 }
490 
491 /******************************************************************************/
492 /* R e c o v e r */
493 /******************************************************************************/
494 // This must be called with PollMutex locked.
495 void XrdPollPoll::Recover(int numleft)
496 {
497  int i;
498  XrdPollInfo *piP;
499 
500 // Turn off any unaccounted links
501 //
502  for (i = 1; i < PollTNum; i++)
503  if (PollTab[i].revents)
504  {if (!(piP = XrdLinkCtl::fd2PollInfo(PollTab[i].fd)))
505  PollTab[i].fd = -1;
506  else {piP->isEnabled = false;
507  PollTab[i].fd = -PollTab[i].fd;
508  Log.Emsg("Poll","Improper poll event for",piP->Link.ID);
509  }
510  }
511 }
512 
513 /******************************************************************************/
514 /* R e s t a r t */
515 /******************************************************************************/
516 // This must be called with the PollMutex unlocked
517 void XrdPollPoll::Restart(int ecode)
518 {
519  XrdPollInfo *pInfo;
520 
521 // Issue error message
522 //
523  TRACE(POLL, PID <<'-' <<TID <<" Poll error " <<ecode);
524  Log.Emsg("Poll", errno, "poll");
525 
526 // For any outstanding link here, close the link and detach it
527 //
528  PollMutex.Lock();
529  while((pInfo = PollQ))
530  {PollQ = pInfo->Next;
531  pInfo->PollEnt->fd = -1;
532  Finish(*pInfo, "Unexpected polling error");
533  Sched.Schedule((XrdJob *)&(pInfo->Link));
534  }
535  PollMutex.UnLock();
536 }
ssize_t write(int fildes, const void *buf, size_t nbyte)
#define TRACE(act, x)
Definition: XrdTrace.hh:63
#define TRACEI(act, x)
Definition: XrdTrace.hh:66
Definition: XrdJob.hh:43
XrdJob * NextJob
Definition: XrdJob.hh:46
static XrdLink * fd2link(int fd)
Definition: XrdLinkCtl.hh:72
static XrdPollInfo * fd2PollInfo(int fd)
Definition: XrdLinkCtl.hh:103
struct pollfd * PollEnt
Definition: XrdPollInfo.hh:42
XrdPollInfo * Next
Definition: XrdPollInfo.hh:40
XrdLink & Link
Definition: XrdPollInfo.hh:41
bool isEnabled
Definition: XrdPollInfo.hh:46
int Include(XrdPollInfo &pInfo)
Definition: XrdPollPoll.icc:91
XrdPollPoll(struct pollfd *pp, int numfd)
Definition: XrdPollPoll.icc:67
void doDetach(int pti)
void Start(XrdSysSemaphore *syncp, int &rc)
void Disable(XrdPollInfo &pInfo, const char *etxt=0)
void Exclude(XrdPollInfo &pInfo)
int Enable(XrdPollInfo &pInfo)
int numInterrupts
Definition: XrdPoll.hh:134
pthread_t TID
Definition: XrdPoll.hh:83
int PID
Definition: XrdPoll.hh:82
XrdSysMutex PollPipe
Definition: XrdPoll.hh:115
int ReqFD
Definition: XrdPoll.hh:118
int numEvents
Definition: XrdPoll.hh:133
int getRequest()
Definition: XrdPoll.cc:232
PipeData ReqBuff
Definition: XrdPoll.hh:126
static char * Poll2Text(short events)
Definition: XrdPoll.cc:272
static XrdPoll * newPoller(int pollid, int numfd)
Definition: XrdPollE.icc:45
static int Finish(XrdPollInfo &pInfo, const char *etxt=0)
Definition: XrdPoll.cc:204
int numEnabled
Definition: XrdPoll.hh:132
int CmdFD
Definition: XrdPoll.hh:117
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSysError Log
Definition: XrdConfig.cc:112
XrdScheduler Sched
Definition: XrdLinkCtl.cc:54
union XrdPoll::PipeData::@18 Parms