XRootD
XrdSysIOEventsPollE.icc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d S y s I O E v e n t s P o l l E . i c c */
4 /* */
5 /* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdlib>
32 #include <sys/types.h>
33 #include <sys/stat.h>
34 #include <sys/epoll.h>
35 
36 #include "XrdSys/XrdSysAtomics.hh"
37 #include "XrdSys/XrdSysE2T.hh"
38 #ifndef Atomic
39 #define Atomic(x) x
40 #endif
41 
42 
43 /******************************************************************************/
44 /* C l a s s P o l l E */
45 /******************************************************************************/
46 
47 namespace XrdSys
48 {
49 namespace IOEvents
50 {
51 class PollE : public Poller
52 {
53 public:
54 
55 static int AllocMem(void **memP, int slots);
56 
57  PollE(struct epoll_event *ptab, int numfd, int pfd, int pFD[2])
58  : Poller(pFD[0], pFD[1]), pollTab(ptab), cbNow(0),
59  pollDfd(pfd), pollMax(numfd), pollNum(1), numPoll(0),
60  cbCurr(0)
61  {}
62  ~PollE() {Stop();}
63 
64 protected:
65 
66  void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg);
67 
68  void Exclude(Channel *cP, bool &isLocked, bool dover=1);
69 
70  bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked);
71 
72  bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked);
73 
74  void Shutdown();
75 
76 private:
77  int AllocPT(int slots);
78  void Dispatch(Channel *cP, uint32_t pollEv);
79  bool Process(int curr);
80 
81 struct epoll_event *pollTab;
82  Channel *cbNow;
83  int pollDfd;
84  int pollMax;
85  Atomic(int) pollNum;
86  int numPoll;
87  int cbCurr;
88 static void *deadChP;
89 };
90  void *PollE::deadChP = 0;
91 };
92 };
93 
94 /******************************************************************************/
95 /* C l a s s P o l l e r */
96 /******************************************************************************/
97 /******************************************************************************/
98 /* Static: n e w P o l l e r */
99 /******************************************************************************/
100 
102 XrdSys::IOEvents::Poller::newPoller(int pipeFD[2],
103  int &eNum,
104  const char **eTxt)
105 
106 {
107  static const int allocFD = 1024;
108  struct epoll_event *pp, myEvent = {(EPOLLIN | EPOLLPRI), {0}};
109  int pfd;
110 
111 // Open the /dev/poll driver
112 //
113 #ifndef EPOLL_CLOEXEC
114  if ((pfd = epoll_create(allocFD)) >= 0) fcntl(pfd, F_SETFD, FD_CLOEXEC);
115  else
116 #else
117  if ((pfd = epoll_create1(EPOLL_CLOEXEC)) < 0)
118 #endif
119  {eNum = errno;
120  if (eTxt) *eTxt = "creating epoll device";
121  return 0;
122  }
123 
124 // Add the request side of the pipe fd to the poll set (always fd[0])
125 //
126  if (epoll_ctl(pfd, EPOLL_CTL_ADD, pipeFD[0], &myEvent))
127  { eNum = errno;
128  *eTxt = "adding communication pipe";
129  return 0;
130  }
131 
132 // Allocate the poll table
133 //
134  if ((eNum = XrdSys::IOEvents::PollE::AllocMem((void **)&pp, allocFD)))
135  {eNum = ENOMEM;
136  if (eTxt) *eTxt = "creating epoll table";
137  close(pfd);
138  return 0;
139  }
140 
141 // Create new poll object
142 //
143  return (Poller *)new PollE(pp, allocFD, pfd, pipeFD);
144 }
145 
146 /******************************************************************************/
147 /* C l a s s P o l l E */
148 /******************************************************************************/
149 /******************************************************************************/
150 /* A l l o c M e m */
151 /******************************************************************************/
152 
153 int XrdSys::IOEvents::PollE::AllocMem(void **memP, int slots)
154 {
155  int rc, bytes, alignment, pagsz = getpagesize();
156 
157 // Calculate the size of the poll table and allocate it
158 //
159  bytes = slots * sizeof(struct epoll_event);
160  alignment = (bytes < pagsz ? 1024 : pagsz);
161  if (!(rc = posix_memalign(memP, alignment, bytes))) memset(*memP, 0, bytes);
162  return rc;
163 }
164 
165 /******************************************************************************/
166 /* Private: A l l o c P T */
167 /******************************************************************************/
168 
169 int XrdSys::IOEvents::PollE::AllocPT(int slots)
170 {
171  struct epoll_event *pp;
172 
173 // Calclulate new slots
174 //
175  if (pollMax >= slots) slots = pollMax + 256;
176  else slots = pollMax + (slots/256*256) + (slots%256 ? 256 : 0);
177 
178 // Allocate a new table and if successful, replace the old one
179 //
180  if (!AllocMem((void **)&pp, slots))
181  {free(pollTab);
182  pollTab = pp;
183  pollMax = slots;
184  }
185 
186 // All done
187 //
188  return 0;
189 }
190 
191 /******************************************************************************/
192 /* Protected: B e g i n */
193 /******************************************************************************/
194 
196  int &retcode,
197  const char **eTxt)
198 {
199  int numpolled, pollN;
200  Channel *cP;
201 
202 // Indicate to the starting thread that all went well
203 //
204  retcode = 0;
205  *eTxt = 0;
206  syncsem->Post();
207 
208 // Now start dispatching channels that are ready. We use the wakePend flag to
209 // keep the chatter down when we actually wakeup.
210 //
211  do {do {numpolled = epoll_wait(pollDfd, pollTab, pollMax, TmoGet());}
212  while (numpolled < 0 && errno == EINTR);
213  CPP_ATOMIC_STORE(wakePend, true, std::memory_order_release);
214  numPoll = numpolled;
215  if (numpolled == 0) CbkTMO();
216  else if (numpolled < 0)
217  {int rc = errno;
218  //--------------------------------------------------------------
219  // If we are in a child process and the epoll file descriptor
220  // has been closed, there is an immense chance the fork will be
221  // followed by an exec, in which case we don't want to abort
222  //--------------------------------------------------------------
223  if( rc == EBADF && parentPID != getpid() ) return;
224  std::cerr <<"EPoll: "<<XrdSysE2T(rc)<<" polling for events "<<std::endl;
225  abort();
226  }
227  else for (int i = 0; i < numpolled; i++)
228  {if ((cP = (Channel *)pollTab[i].data.ptr))
229  {cbCurr = i; Dispatch(cP, pollTab[i].events);}
230  else if (!Process(i)) return;
231  }
232 
233  pollN = AtomicGet(pollNum);
234  if (pollMax < pollN) AllocPT(pollN);
235 
236  } while(1);
237 }
238 
239 /******************************************************************************/
240 /* Private: D i s p a t c h */
241 /******************************************************************************/
242 
243 void XrdSys::IOEvents::PollE::Dispatch(XrdSys::IOEvents::Channel *cP,
244  uint32_t pollEv)
245 {
246  static const uint32_t pollER = EPOLLERR| EPOLLHUP;
247  static const uint32_t pollOK = EPOLLIN | EPOLLPRI | EPOLLOUT;
248  static const uint32_t pollRD = EPOLLIN | EPOLLPRI;
249  static const uint32_t pollWR = EPOLLOUT;
250  const char *eTxt;
251  int eNum, events = 0;
252  bool isLocked = false;
253 
254 // Make sure this not a dispatch to a dead channel (rare but true)
255 //
256  if (cP == (XrdSys::IOEvents::Channel *)&deadChP) return;
257 
258 // Translate the event to something reasonable
259 //
260  if (pollEv & pollER)
261  {eTxt = "polling";
262  eNum = (pollEv & EPOLLERR ? EPIPE : ECONNRESET); // Error or HUP
263  }
264  else if (pollEv & pollOK)
265  {if (pollEv & pollRD) events |= CallBack::ReadyToRead;
266  if (pollEv & pollWR) events |= CallBack::ReadyToWrite;
267  eNum = 0; eTxt = 0;
268  }
269  else {eTxt = "polling"; eNum = EIO;}
270 
271 // Execute the callback
272 //
273  cbNow = cP;
274  if (!CbkXeq(cP, events, eNum, eTxt)) Exclude(cP, isLocked, 0);
275  cbNow = 0;
276 }
277 
278 /******************************************************************************/
279 /* Protected: E x c l u d e */
280 /******************************************************************************/
281 
283  bool &isLocked, bool dover)
284 {
285 
286 // Remove this channel from the poll set. We ignore errors as the descriptor
287 // may have been closed prior to this call (though this shouldn't happen).
288 //
289  epoll_ctl(pollDfd, EPOLL_CTL_DEL, cP->GetFD(), 0);
290  AtomicDec(pollNum);
291 
292 // If we need to verify this action, sync with the poller thread (note that the
293 // poller thread will not ask for this action unless it wants to deadlock). We
294 // may actually deadlock anyway if the channel lock is held. We are allowed to
295 // release it if the caller locked it. This will prevent a deadlock. Otherwise,
296 // if we are in a callback and this channel is not the one that initiated the
297 // exclude then we must make sure that we cancel any pending callback to the
298 // excluded channel as it may have been deleted and we won't know that here.
299 //
300  if (dover)
301  {PipeData cmdbuff;
302  if (isLocked)
303  {isLocked = false;
304  UnLockChannel(cP);
305  }
306  cmdbuff.req = PipeData::RmFD;
307  cmdbuff.fd = cP->GetFD();
308  SendCmd(cmdbuff);
309  } else {
310  if (cbNow && cbNow != cP)
311  for (int i = cbCurr+1; i < numPoll; i++)
312  {if (cP == (Channel *)pollTab[i].data.ptr)
313  pollTab[i].data.ptr = &deadChP;
314  }
315  }
316 }
317 
318 /******************************************************************************/
319 /* Protected: I n c l u d e */
320 /******************************************************************************/
321 
323  int &eNum,
324  const char **eTxt,
325  bool &isLocked)
326 {
327  struct epoll_event myEvent = {0, {(void *)cP}};
328  int events = cP->GetEvents();
329 
330 // Establish new event mask
331 //
332  if (events & Channel:: readEvents) myEvent.events = EPOLLIN | EPOLLPRI;
333  if (events & Channel::writeEvents) myEvent.events |= EPOLLOUT;
334 
335 // Add this fd to the poll set
336 //
337  if (epoll_ctl(pollDfd, EPOLL_CTL_ADD, cP->GetFD(), &myEvent))
338  {eNum = errno;
339  if (eTxt) *eTxt = "adding channel";
340  return false;
341  }
342 
343 // All went well. Bump the number in the set. The poller thread will
344 // reallocate the poll table if need be.
345 //
346  AtomicInc(pollNum);
347  return true;
348 }
349 
350 /******************************************************************************/
351 /* Protected: M o d i f y */
352 /******************************************************************************/
353 
355  int &eNum,
356  const char **eTxt,
357  bool &isLocked)
358 {
359  struct epoll_event myEvents = {0, {(void *)cP}};
360  int events = cP->GetEvents();
361 
362 // Establish new event mask
363 //
364  if (events & Channel:: readEvents) myEvents.events |= EPOLLIN | EPOLLPRI;
365  if (events & Channel::writeEvents) myEvents.events |= EPOLLOUT;
366 
367 // Modify this fd. Unlike solaris, epoll_ctl() does not block when the pollfd
368 // is being waited upon by another thread.
369 //
370  if (epoll_ctl(pollDfd, EPOLL_CTL_MOD, cP->GetFD(), &myEvents))
371  {eNum = errno;
372  if (eTxt) *eTxt = "modifying poll events";
373  return false;
374  }
375 
376 // All done
377 //
378  return true;
379 }
380 
381 /******************************************************************************/
382 /* Private: P r o c e s s */
383 /******************************************************************************/
384 
385 bool XrdSys::IOEvents::PollE::Process(int curr)
386 {
387 // Get the pipe request and check out actions of interest.
388 //
389  if (GetRequest())
390  { if (reqBuff.req == PipeData::RmFD)
391  {Channel *cP;
392  for (int i = curr+1; i < numPoll; i++)
393  {if ((cP = (Channel *)pollTab[i].data.ptr)
394  && cP != (XrdSys::IOEvents::Channel *)&deadChP
395  && reqBuff.fd == cP->GetFD()) pollTab[i].data.ptr=&deadChP;
396  }
397  reqBuff.theSem->Post();
398  }
399  else if (reqBuff.req == PipeData::Stop){reqBuff.theSem->Post();
400  return false;
401  }
402  }
403 
404 // Return true
405 //
406  return true;
407 }
408 
409 /******************************************************************************/
410 /* Protected: S h u t d o w n */
411 /******************************************************************************/
412 
414 {
415  static XrdSysMutex shutMutex;
416 
417 // To avoid race conditions, we serialize this code
418 //
419  shutMutex.Lock();
420 
421 // Release the poll table
422 //
423  if (pollTab) {free(pollTab); pollTab = 0;}
424 
425 // Close the epoll file descriptor
426 //
427  if (pollDfd >= 0) {close(pollDfd); pollDfd = -1;}
428 
429 // All done
430 //
431  shutMutex.UnLock();
432 }
int fcntl(int fd, int cmd,...)
#define close(a)
Definition: XrdPosix.hh:43
#define eMsg(x)
#define AtomicInc(x)
#define CPP_ATOMIC_STORE(x, val, order)
#define AtomicDec(x)
#define AtomicGet(x)
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
#define Atomic(x)
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
PollE(struct epoll_event *ptab, int numfd, int pfd, int pFD[2])
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
static int AllocMem(void **memP, int slots)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
Poller(int cFD, int rFD)