XRootD
XrdPollE.icc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d P o l l E . i c c */
4 /* */
5 /* (c) 2008 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <fcntl.h>
32 #include <cstdlib>
33 #include <sys/types.h>
34 #include <sys/stat.h>
35 #include <sys/epoll.h>
36 #include <sys/eventfd.h>
37 
38 #include "Xrd/XrdPollE.hh"
39 #include "Xrd/XrdScheduler.hh"
40 
41 /******************************************************************************/
42 /* n e w P o l l e r */
43 /******************************************************************************/
44 
45 XrdPoll *XrdPoll::newPoller(int pollid, int maxfd)
46 {
47  int pfd, wfd, bytes, alignment, pagsz = getpagesize();
48  struct epoll_event *pp;
49 
50 // Open the /dev/poll driver
51 //
52 #ifndef EPOLL_CLOEXEC
53  if ((pfd = epoll_create(maxfd)) >= 0) fcntl(pfd, F_SETFD, FD_CLOEXEC);
54  else
55 #else
56  if ((pfd = epoll_create1(EPOLL_CLOEXEC)) < 0)
57 #endif
58  {Log.Emsg("Poll", errno, "create epoll device"); return 0;}
59 
60  if ((wfd = eventfd(0, EFD_CLOEXEC)) < 0)
61  {Log.Emsg("Poll", errno,
62  "create an eventfd as the wait-poller descriptor");
63  close(pfd);
64  return 0;
65  }
66 
67 // Calculate the size of the poll table and allocate it
68 //
69  bytes = maxfd * sizeof(struct epoll_event);
70  alignment = (bytes < pagsz ? 1024 : pagsz);
71  if (posix_memalign((void **)&pp, alignment, bytes))
72  {Log.Emsg("Poll", ENOMEM, "create poll table");
73  close(wfd);
74  close(pfd);
75  return 0;
76  }
77 
78 // Create new poll object
79 //
80  memset((void *)pp, 0, bytes);
81  return (XrdPoll *)new XrdPollE(pp, maxfd, pfd, wfd);
82 }
83 
84 /******************************************************************************/
85 /* D e s t r u c t o r */
86 /******************************************************************************/
87 
89 {
90  if (PollTab) free(PollTab);
91  if (WaitFd >= 0) close(WaitFd);
92  if (PollDfd >= 0) close(PollDfd);
93 }
94 
95 /******************************************************************************/
96 /* A d d W a i t F d */
97 /******************************************************************************/
98 
99 int XrdPollE::AddWaitFd()
100 {
101  const unsigned int myPollEvts = EPOLLIN;
102  struct epoll_event myEvent = {myPollEvts, {(void *)&WaitFd}};
103 
104 // Add the waitfd to the poll set
105 //
106  if (epoll_ctl(PollDfd, EPOLL_CTL_ADD, WaitFd, &myEvent) < 0)
107  {int rc = errno;
108  Log.Emsg("Poll", rc, "include the wait FD in the poll set");
109  return rc;
110  }
111 
112  return 0;
113 }
114 
115 /******************************************************************************/
116 /* D i s a b l e */
117 /******************************************************************************/
118 
119 void XrdPollE::Disable(XrdPollInfo &pInfo, const char *etxt)
120 {
121 
122 // Simply return if the link is already disabled
123 //
124  if (!pInfo.isEnabled) return;
125 
126 // If Linux 2.6.9 we use EPOLLONESHOT to automatically disable a polled fd.
127 // So, the Disable() method need not do anything. Prior kernels did not have
128 // this mechanism so we need to do this manually.
129 //
130 #ifndef EPOLLONESHOT
131  struct epoll_event myEvents = {0, (void *)&pInfo};
132 
133 // Disable this fd. Unlike solaris, epoll_ctl() does not block when the pollfd
134 // is being waited upon by another thread.
135 //
136  if (epoll_ctl(PollDfd, EPOLL_CTL_MOD, pInfo.FD, &myEvents))
137  {Log.Emsg("Poll", errno, "disable link", lp->ID); return;}
138 #endif
139 
140 // Trace this event
141 //
142  pInfo.isEnabled = false;
143  TRACEI(POLL, "Poller " <<PID <<" async disabling link " <<pInfo.FD);
144 
145 // Check if this link needs to be rescheduled. If so, the caller better have
146 // the link opMutex lock held for this to work!
147 //
148  if (etxt && Finish(pInfo, etxt)) Sched.Schedule((XrdJob *)&pInfo.Link);
149 }
150 
151 /******************************************************************************/
152 /* E n a b l e */
153 /******************************************************************************/
154 
156 {
157  struct epoll_event myEvents = {ePollEvents, {(void *)&pInfo}};
158 
159 // Simply return if the link is already enabled
160 //
161  if (pInfo.isEnabled) return 1;
162 
163 // Enable this fd. Unlike solaris, epoll_ctl() does not block when the pollfd
164 // is being waited upon by another thread.
165 //
166  pInfo.isEnabled = true;
167  if (epoll_ctl(PollDfd, EPOLL_CTL_MOD, pInfo.FD, &myEvents))
168  {Log.Emsg("Poll", errno, "enable link", pInfo.Link.ID);
169  pInfo.isEnabled = false;
170  return 0;
171  }
172 
173 // Do final processing
174 //
175  TRACE(POLL, "Poller " <<PID <<" enabled " <<pInfo.Link.ID);
176  numEnabled++;
177  return 1;
178 }
179 
180 /******************************************************************************/
181 /* E x c l u d e */
182 /******************************************************************************/
183 
185 {
186 
187 // Make sure this link is not enabled
188 //
189  if (pInfo.isEnabled)
190  {Log.Emsg("Poll", "Detach of enabled link", pInfo.Link.ID);
191  Disable(pInfo);
192  }
193 
194 // Forcibly remove the link from the poll set
195 //
196  remFD(pInfo, 0);
197 
198 // Wait to make sure the poll thread has completed handling the last set of
199 // events which may have included this Link. The handing includes examining the
200 // Link's PollInfo before deciding if to schedule the Link. After we return,
201 // the PollInfo may be reset and the Link could be subsequently reused.
202 //
203  Wait4Poller();
204 }
205 
206 /******************************************************************************/
207 /* H a n d l e W a i t F d */
208 /******************************************************************************/
209 
210 void XrdPollE::HandleWaitFd(const unsigned int events)
211 {
212 // Used by the polling thread to signal waiters that a polling loop has been
213 // completed.
214 //
215  eventfd_t ic, cnt;
216 
217 // We don't expect anything but EPOLLIN. But if we get an error (errors
218 // can be reported, despite not being selected events) abort rather than
219 // potentially keeping the poll thread busy repeatedly looping.
220 //
221  if (!(events & EPOLLIN) || (events & (EPOLLERR | EPOLLHUP)))
222  {char eBuff[64];
223  Log.Emsg("Poll", "wait-poller handler:", x2Text(events, eBuff));
224  if (events & (EPOLLERR | EPOLLHUP))
225  abort();
226  return;
227  }
228 
229  if (eventfd_read(WaitFd, &cnt) < 0)
230  {Log.Emsg("Poll", errno, "read from the wait-poller descriptor");
231  return;
232  }
233 
234  for (ic=0;ic<cnt;++ic) WaitFdSem.Post();
235 }
236 
237 /******************************************************************************/
238 /* I n c l u d e */
239 /******************************************************************************/
240 
242 {
243  struct epoll_event myEvent = {0, {(void *)&pInfo}};
244  int rc;
245 
246 // Add this fd to the poll set
247 //
248  if ((rc = epoll_ctl(PollDfd, EPOLL_CTL_ADD, pInfo.FD, &myEvent)) < 0)
249  Log.Emsg("Poll", errno, "include link", pInfo.Link.ID);
250 
251 // All done
252 //
253  return rc == 0;
254 }
255 
256 /******************************************************************************/
257 /* r e m F D */
258 /******************************************************************************/
259 
260 void XrdPollE::remFD(XrdPollInfo &pInfo, unsigned int events)
261 {
262  struct epoll_event myEvents = {0, {(void *)&pInfo}};
263 
264 // It works out that ONESHOT mode or even CTL_MOD requests do not necessarily
265 // prevent epoll_wait() from returning on an error event. So, we must manually
266 // remove the fd from the set and assume the logic was actually correct. If it
267 // wasn't then the client will eventually timeout and retry the request. This
268 // may cause a double remove; so we don't consider these an arror.
269 //
270  if (pInfo.FD > 0)
271  {TRACEI(POLL, "Poller " <<PID <<" removing FD " <<pInfo.FD);
272  if (epoll_ctl(PollDfd, EPOLL_CTL_DEL, pInfo.FD, &myEvents)
273  && (errno != ENOENT || events != 0))
274  {const char *why;
275  char buff[96];
276  if (events & (EPOLLHUP | EPOLLRDHUP)) why = "sever";
277  else if (events & EPOLLERR) why = "error";
278  else why = "disc";
279  snprintf(buff, sizeof(buff), "exclude fd %d during %s (%x) event; link",
280  pInfo.FD, why, events);
281  Log.Emsg("Poll", errno, buff, pInfo.Link.ID);
282  }
283  }
284 }
285 
286 /******************************************************************************/
287 /* S t a r t */
288 /******************************************************************************/
289 
290 void XrdPollE::Start(XrdSysSemaphore *syncsem, int &retcode)
291 {
292  char eBuff[64];
293  int rc, i, numpolled, num2sched;
294  unsigned int waitFdEvents;
295  bool haveWaiters;
296  XrdJob *jfirst, *jlast;
297  const short pollOK = EPOLLIN | EPOLLPRI;
298  XrdLink *lp;
299  XrdPollInfo *pInfo;
300 
301  if ((rc = AddWaitFd()))
302  {retcode = rc;
303  syncsem->Post();
304  return;
305  }
306 
307 // Indicate to the starting thread that all went well
308 //
309  retcode = 0;
310  syncsem->Post();
311 
312 // Now start dispatching links that are ready
313 //
314  do {do {numpolled = epoll_wait(PollDfd, PollTab, PollMax, -1);}
315  while (numpolled < 0 && errno == EINTR);
316  if (numpolled == 0) continue;
317  if (numpolled < 0)
318  {Log.Emsg("Poll", errno, "poll for events");
319  abort();
320  }
321  numEvents += numpolled;
322 
323  // Checkout which links must be dispatched (no need to lock)
324  //
325  jfirst = jlast = 0; num2sched = 0;
326  haveWaiters = false; waitFdEvents = 0;
327  for (i = 0; i < numpolled; i++)
328  {if (PollTab[i].data.ptr == &WaitFd)
329  {haveWaiters = true; waitFdEvents = PollTab[i].events;}
330  else if ((pInfo = (XrdPollInfo *)PollTab[i].data.ptr))
331  {if (!(pInfo->isEnabled) && pInfo->FD >= 0)
332  remFD(*pInfo, PollTab[i].events);
333  else {pInfo->isEnabled = 0;
334  if (!(PollTab[i].events & pollOK)
335  || (PollTab[i].events & POLLRDHUP))
336  Finish(*pInfo, x2Text(PollTab[i].events, eBuff));
337  lp = &(pInfo->Link);
338  lp->NextJob = jfirst; jfirst = (XrdJob *)lp;
339  if (!jlast) jlast=(XrdJob *)lp;
340  num2sched++;
341 #ifndef EPOLLONESHOT
342  PollTab[i].events = 0;
343  if (epoll_ctl(PollDfd,EPOLL_CTL_MOD,pInfo.FD,&PollTab[i]))
344  Log.Emsg("Poll",errno,"disable link",pInfo.Link.ID);
345 #endif
346  }
347  } else Log.Emsg("Poll", "null link event!!!!");
348  }
349 
350  // Schedule the polled links
351  //
352  if (num2sched == 1) Sched.Schedule(jfirst);
353  else if (num2sched) Sched.Schedule(num2sched, jfirst, jlast);
354 
355  if (haveWaiters) HandleWaitFd(waitFdEvents);
356  } while(1);
357 }
358 
359 /******************************************************************************/
360 /* W a i t 4 P o l l e r */
361 /******************************************************************************/
362 
363 void XrdPollE::Wait4Poller()
364 {
365 // Makes the caller wait for the polling thread to complete an event processing
366 // loop.
367 //
368  if (eventfd_write(WaitFd, 1) < 0)
369  {Log.Emsg("Poll", errno, "write to the wait-poller descriptor");
370  return;
371  }
372 
373  WaitFdSem.Wait();
374 }
375 
376 /******************************************************************************/
377 /* x 2 T e x t */
378 /******************************************************************************/
379 
380 const char *XrdPollE::x2Text(unsigned int events, char *buff)
381 {
382  if (events & EPOLLERR) return "socket error";
383 
384  if (events & (EPOLLHUP | EPOLLRDHUP)) return "hangup";
385 
386  sprintf(buff, "unusual event (%.4x)", events);
387  return buff;
388 }
#define EPOLLRDHUP
Definition: XrdPollE.hh:37
int fcntl(int fd, int cmd,...)
#define close(a)
Definition: XrdPosix.hh:43
#define TRACE(act, x)
Definition: XrdTrace.hh:63
#define TRACEI(act, x)
Definition: XrdTrace.hh:66
Definition: XrdJob.hh:43
XrdJob * NextJob
Definition: XrdJob.hh:46
const char * x2Text(unsigned int evf, char *buff)
Definition: XrdPollE.icc:380
void Exclude(XrdPollInfo &pInfo)
Definition: XrdPollE.icc:184
void Disable(XrdPollInfo &pInfo, const char *etxt=0)
Definition: XrdPollE.icc:119
void Start(XrdSysSemaphore *syncp, int &rc)
Definition: XrdPollE.icc:290
int Enable(XrdPollInfo &pInfo)
Definition: XrdPollE.icc:155
int Include(XrdPollInfo &pInfo)
Definition: XrdPollE.icc:241
~XrdPollE()
Definition: XrdPollE.icc:88
XrdLink & Link
Definition: XrdPollInfo.hh:41
bool isEnabled
Definition: XrdPollInfo.hh:46
int PID
Definition: XrdPoll.hh:82
int numEvents
Definition: XrdPoll.hh:133
static XrdPoll * newPoller(int pollid, int numfd)
Definition: XrdPollE.icc:45
static int Finish(XrdPollInfo &pInfo, const char *etxt=0)
Definition: XrdPoll.cc:204
int numEnabled
Definition: XrdPoll.hh:132
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSysError Log
Definition: XrdConfig.cc:112
XrdScheduler Sched
Definition: XrdLinkCtl.cc:54