XRootD
XrdSysIOEventsPollKQ.icc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d S y s I O E v e n t s P o l l K Q . i c c */
4 /* */
5 /* (c) 2014 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdlib>
32 #include <sys/types.h>
33 #include <sys/stat.h>
34 #include <sys/event.h>
35 #include <sys/time.h>
36 
37 #include "XrdSys/XrdSysAtomics.hh"
38 #include "XrdSys/XrdSysE2T.hh"
39 #ifndef Atomic
40 #define Atomic(x) x
41 #endif
42 
43 
44 /******************************************************************************/
45 /* C l a s s P o l l K Q */
46 /******************************************************************************/
47 
namespace XrdSys
{
namespace IOEvents
{
//-----------------------------------------------------------------------------
// PollKQ: the kqueue(2) flavour of the IOEvents Poller. Ready events are
// collected into pollTab by Begin() and handed to channels via Dispatch().
//-----------------------------------------------------------------------------

class PollKQ : public Poller
{
public:

// Allocate a zeroed, aligned table of 'slots' kevent entries into *memP.
// Returns 0 on success or the posix_memalign() error code.
//
static int AllocMem(void **memP, int slots);

       PollKQ(struct kevent *ptab, int numfd, int pfd, int pFD[2])
             : Poller(pFD[0], pFD[1]), pollTab(ptab), cbNext(0),
               pollDfd(pfd), pollMax(numfd), pollNum(1), numPoll(0)
             {// Pre-build the request used by Process() to re-arm the pipe
              EV_SET(&armPipe, reqFD, EVFILT_READ,
                     EV_ADD|EV_CLEAR|EV_ENABLE, 0, 0, 0);
             }
      ~PollKQ() {Stop();}

protected:

       void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg);

       void Exclude(Channel *cP, bool &isLocked, bool dover=1);

       bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked);

       bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked);

       void Shutdown();

private:
       int  AllocPT(int slots);
       void Dispatch(Channel *cP, int i);
       bool Process(int next);

struct kevent    *pollTab;   // event table filled by kevent() in Begin()
struct kevent     armPipe;   // request that re-enables the control pipe
       int        cbNext;    // index after the entry being dispatched; 0 if idle
       int        pollDfd;   // kqueue descriptor (-1 after Shutdown())
       int        pollMax;   // number of entries pollTab can hold
       Atomic(int) pollNum;  // registered count; starts at 1, presumably for
                             // the control pipe added in newPoller (confirm)
       int        numPoll;   // events returned by the most recent kevent()
static void      *deadChP;   // its address marks events of excluded channels

// Per-channel state bits kept via SetPollEnt()/GetPollEnt()
static const int  rEnabled = 1;  // read filter currently enabled
static const int  rFilterX = 2;  // read filter has been added to the kqueue
static const int  wEnabled = 4;  // write filter currently enabled
static const int  wFilterX = 8;  // write filter has been added to the kqueue
};

void *PollKQ::deadChP = 0;
}
}
100 
101 /******************************************************************************/
102 /* C l a s s P o l l e r */
103 /******************************************************************************/
104 /******************************************************************************/
105 /* Static: n e w P o l l e r */
106 /******************************************************************************/
107 
XrdSys::IOEvents::Poller *
XrdSys::IOEvents::Poller::newPoller(int          pipeFD[2],
                                    int         &eNum,
                                    const char **eTxt)
{
// Create a kqueue-based poller. On failure, returns 0 with eNum set to the
// error number and *eTxt (when eTxt is non-null) naming the failing step.
//
   static const int allocFD = 1024;
   struct kevent *pp, chlist;
   int pfd;

// Open the kqueue
//
   if ((pfd = kqueue()) < 0)
      {eNum = errno;
       if (eTxt) *eTxt = "creating kqueue";
       return 0;
      }

// Add the request side of the pipe fd to the poll set (always fd[0]). On
// failure the kqueue descriptor must be closed or it would leak.
//
   EV_SET(&chlist,pipeFD[0],EVFILT_READ,EV_ADD|EV_ONESHOT|EV_ENABLE,0,0,0);
   if (kevent(pfd, &chlist, 1, 0, 0, 0) < 0)
      {eNum = errno;                         // capture before close() runs
       if (eTxt) *eTxt = "adding communication pipe";
       close(pfd);
       return 0;
      }

// Allocate the event table
//
   if (XrdSys::IOEvents::PollKQ::AllocMem((void **)&pp, allocFD))
      {eNum = ENOMEM;
       if (eTxt) *eTxt = "creating kqueue table";
       close(pfd);
       return 0;
      }

// Create new poll object
//
   return (Poller *)new PollKQ(pp, allocFD, pfd, pipeFD);
}
148 
149 /******************************************************************************/
150 /* C l a s s P o l l K Q */
151 /******************************************************************************/
152 /******************************************************************************/
153 /* A l l o c M e m */
154 /******************************************************************************/
155 
int XrdSys::IOEvents::PollKQ::AllocMem(void **memP, int slots)
{
// Allocate room for 'slots' kevent entries. Tables smaller than a page are
// 1024-byte aligned, larger ones page aligned. The memory is zeroed on
// success. Returns 0 or the posix_memalign() error code.
//
   int pageSize = getpagesize();
   int nBytes   = slots * sizeof(struct kevent);
   int alignTo  = (nBytes < pageSize) ? 1024 : pageSize;

   int status = posix_memalign(memP, alignTo, nBytes);
   if (status == 0) memset(*memP, 0, nBytes);
   return status;
}
167 
168 /******************************************************************************/
169 /* Private: A l l o c P T */
170 /******************************************************************************/
171 
int XrdSys::IOEvents::PollKQ::AllocPT(int slots)
{
// Grow the poll table so it can hold at least 'slots' entries.
//
   struct kevent *newTab;
   int newMax;

// Calculate the new size. If the request does not exceed the current size,
// grow by a fixed 256 entries. Otherwise grow by the request rounded up to a
// multiple of 256.
//
   if (slots <= pollMax) newMax = pollMax + 256;
      else {int chunks = slots / 256;
            if (slots % 256) chunks++;
            newMax = pollMax + chunks * 256;
           }

// Swap in the larger table only when the allocation succeeded. On failure
// the existing table is kept as is.
//
   if (AllocMem((void **)&newTab, newMax) == 0)
      {free(pollTab);
       pollTab = newTab;
       pollMax = newMax;
      }

// Allocation failures are not reported to the caller
//
   return 0;
}
193 
194 /******************************************************************************/
195 /* Protected: B e g i n */
196 /******************************************************************************/
197 
void XrdSys::IOEvents::PollKQ::Begin(XrdSysSemaphore *syncsem,
                                     int             &retcode,
                                     const char     **eTxt)
{
// Poller thread main loop: wait on the kqueue, dispatch ready channels,
// handle control-pipe requests and grow the event table as needed. It
// returns only on a Stop request, or when kevent() fails with EBADF in a
// forked child.
//
   struct timespec *tmP, tmOut;
   Channel *cP;
   long long tmVal;
   int numpolled, pollN;

// Indicate to the starting thread that all went well
//
   retcode = 0;
   if (eTxt) *eTxt = 0;
   syncsem->Post();

// Now start dispatching channels that are ready. We use the wakePend flag to
// keep the chatter down when we actually wakeup.
//
// TmoGet() is treated as milliseconds; a negative value means wait forever.
// The sub-second remainder must go into tv_nsec. Dropping it would turn any
// timeout below one second into a zero wait, so kevent() would return at
// once and the loop would keep re-polling until the deadline passed.
//
   do {if ((tmVal = TmoGet()) < 0) tmP = 0;
          else {tmOut.tv_sec  = tmVal / 1000;
                tmOut.tv_nsec = (tmVal % 1000) * 1000000;
                tmP = &tmOut;
               }
       do {numpolled = kevent(pollDfd, 0, 0, pollTab, pollMax, tmP);}
          while (numpolled < 0 && errno == EINTR);
       wakePend = true; numPoll = numpolled;
       if (numpolled == 0) CbkTMO();
          else if (numpolled < 0)
                  {int rc = errno;
                   //--------------------------------------------------------------
                   // If we are in a child process and the poll file descriptor
                   // has been closed, there is an immense chance the fork will be
                   // followed by an exec, in which case we don't want to abort
                   //--------------------------------------------------------------
                   if( rc == EBADF && parentPID != getpid() ) return;
                   std::cerr <<"KQ: " <<XrdSysE2T(rc) <<" polling for events" <<std::endl;
                   abort();
                  }
          else for (int i = 0; i < numpolled; i++)
                   {// A null udata marks the control pipe (see newPoller)
                    if ((cP = (Channel *)pollTab[i].udata)) Dispatch(cP, i);
                       else if (!Process(i+1)) return;
                   }

    // Grow the event table if more channels are registered than it holds
    //
       pollN = AtomicGet(pollNum);
       if (pollMax < pollN) AllocPT(pollN);

      } while(1);
}
244 
245 /******************************************************************************/
246 /* Private: D i s p a t c h */
247 /******************************************************************************/
248 
void XrdSys::IOEvents::PollKQ::Dispatch(XrdSys::IOEvents::Channel *cP, int i)
{
// Translate pollTab[i] into a callback for channel cP. If the callback
// reports it no longer wants events, remove the channel from the poll set.
//
   static const uint16_t pollER = EV_EOF | EV_ERROR;
   const struct kevent &kev = pollTab[i];
   const char *eTxt = 0;
   int eNum = 0, events = 0;
   bool isLocked = false;

// Entries whose channel was excluded earlier in this pass carry the sentinel
//
   if (cP == (XrdSys::IOEvents::Channel *)&deadChP) return;

// Map the kqueue event onto callback terms. EOF/error conditions become an
// error using fflags, or ECONNRESET when fflags is zero.
// NOTE(review): kqueue can report EV_EOF on a read filter while unread data
// is still pending (kev.data > 0); that data is not offered to the channel
// here. Confirm this matches the epoll implementation's behaviour.
//
   if (kev.flags & pollER)
      {eNum = (kev.fflags ? (int)kev.fflags : ECONNRESET);
       eTxt = "polling";
      }
      else events = (kev.filter == EVFILT_READ ? CallBack::ReadyToRead
                                                : CallBack::ReadyToWrite);

// Run the callback. cbNext lets Exclude() neutralize any later entries for
// this channel in the current batch.
//
   cbNext = i + 1;
   bool keepIt = CbkXeq(cP, events, eNum, eTxt);
   if (!keepIt) Exclude(cP, isLocked, 0);
   cbNext = 0;
}
278 
279 /******************************************************************************/
280 /* Protected: E x c l u d e */
281 /******************************************************************************/
282 
void XrdSys::IOEvents::PollKQ::Exclude(XrdSys::IOEvents::Channel *cP,
                                       bool &isLocked, bool dover)
{
// Remove channel cP from the kqueue and from the poll count. When dover is
// set, synchronize with the poller thread via the control pipe. Otherwise
// (called during a callback), neutralize any still-pending events for cP in
// the current batch.
//
   struct kevent chlist[2];
   int n = 0, theFD = cP->GetFD(), kqStatus = GetPollEnt(cP);

// Set up a delete request for each filter Modify() has added to the kqueue
// (tracked by rFilterX / wFilterX). EV_SET is a macro, so the index is
// advanced in a separate statement rather than inside its argument list.
// Without the increment the deletes would never be submitted.
//
   if (kqStatus & rFilterX)
      {EV_SET(&chlist[n], theFD, EVFILT_READ,  EV_DELETE, 0, 0, cP); n++;}
   if (kqStatus & wFilterX)
      {EV_SET(&chlist[n], theFD, EVFILT_WRITE, EV_DELETE, 0, 0, cP); n++;}

// Remove this channel from the poll set. We ignore errors as the descriptor
// may have been closed prior to this call (though this shouldn't happen).
//
   if (n) kevent(pollDfd, chlist, n, 0, 0, 0);
   SetPollEnt(cP, 0);
   AtomicDec(pollNum);

// If we need to verify this action, sync with the poller thread (note that the
// poller thread will not ask for this action unless it wants to deadlock). We
// may actually deadlock anyway if the channel lock is held. We are allowed to
// release it if the caller locked it. This will prevent a deadlock. Otherwise,
// if we are in a callback and this channel is not the one that initiated the
// exclude then we must make sure that we cancel any pending callback to the
// excluded channel as it may have been deleted and we won't know that here.
//
   if (dover)
      {PipeData cmdbuff;
       if (isLocked)
          {isLocked = false;
           UnLockChannel(cP);
          }
       cmdbuff.req = PipeData::RmFD;
       cmdbuff.fd  = theFD;
       SendCmd(cmdbuff);
      } else {
       if (cbNext)
          for (int j = cbNext; j < numPoll; j++)
              {if (cP == (Channel *)pollTab[j].udata)
                  pollTab[j].udata = &deadChP;
              }
      }
}
329 
330 /******************************************************************************/
331 /* Protected: I n c l u d e */
332 /******************************************************************************/
333 
335  int &eNum,
336  const char **eTxt,
337  bool &isLocked)
338 {
339 
340 // We simply call modify as this will add events to the kqueue as needed
341 //
342  if (!Modify(cP, eNum, eTxt, isLocked))
343  {if (eTxt) *eTxt = "adding channel";
344  return false;
345  }
346 
347 // All went well. Bump the number in the set. The poller thread will
348 // reallocate the poll table if need be.
349 //
350  AtomicInc(pollNum);
351  return true;
352 }
353 
354 /******************************************************************************/
355 /* Protected: M o d i f y */
356 /******************************************************************************/
357 
bool XrdSys::IOEvents::PollKQ::Modify(XrdSys::IOEvents::Channel *cP,
                                      int                       &eNum,
                                      const char               **eTxt,
                                      bool                      &isLocked)
{
// Bring cP's kqueue read/write filters in line with its current event mask.
// Filters are added and enabled when wanted, and disabled (not deleted) when
// not. Returns false with eNum/*eTxt set if kevent() rejects the change.
//
   struct kevent chlist[2];
   int  events   = cP->GetEvents();
   int  theFD    = cP->GetFD();
   int  kqStatus = GetPollEnt(cP);
   int  nChg     = 0;
   bool wantRd   = (events & Channel::readEvents)  != 0;
   bool wantWr   = (events & Channel::writeEvents) != 0;
   (void)isLocked;

// Reconcile the read filter with the requested mask
//
   if (wantRd != ((kqStatus & rEnabled) != 0))
      {if (wantRd)
          {EV_SET(&chlist[nChg], theFD, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, cP);
           kqStatus |= rEnabled | rFilterX;
          } else {
           EV_SET(&chlist[nChg], theFD, EVFILT_READ, EV_DISABLE, 0, 0, cP);
           kqStatus &= ~rEnabled;
          }
       nChg++;
      }

// Reconcile the write filter with the requested mask
//
   if (wantWr != ((kqStatus & wEnabled) != 0))
      {if (wantWr)
          {EV_SET(&chlist[nChg], theFD, EVFILT_WRITE, EV_ADD|EV_ENABLE, 0, 0, cP);
           kqStatus |= wEnabled | wFilterX;
          } else {
           EV_SET(&chlist[nChg], theFD, EVFILT_WRITE, EV_DISABLE, 0, 0, cP);
           kqStatus &= ~wEnabled;
          }
       nChg++;
      }

// Nothing to submit when the filters already match the mask
//
   if (nChg == 0) return true;

   if (kevent(pollDfd, chlist, nChg, 0, 0, 0) < 0)
      {eNum = errno;
       if (eTxt) *eTxt = "modifying poll events";
       return false;
      }
   SetPollEnt(cP, kqStatus);
   return true;
}
416 
417 /******************************************************************************/
418 /* Private: P r o c e s s */
419 /******************************************************************************/
420 
bool XrdSys::IOEvents::PollKQ::Process(int next)
{
// Handle one control-pipe request. 'next' is the first pollTab index not yet
// dispatched in the current batch. Returns false only for a Stop request.
//
   if (GetRequest())
      {if (reqBuff.req == PipeData::RmFD)
          {// Neutralize later events in this batch that belong to the
           // descriptor being removed, then release the requester.
           for (int j = next; j < numPoll; j++)
               {Channel *chP = (Channel *)pollTab[j].udata;
                if (!chP || chP == (XrdSys::IOEvents::Channel *)&deadChP)
                   continue;
                if (reqBuff.fd == (int)pollTab[j].ident)
                   pollTab[j].udata = &deadChP;
               }
           reqBuff.theSem->Post();
          }
          else if (reqBuff.req == PipeData::Stop)
                  {reqBuff.theSem->Post();
                   return false;
                  }
      }

// kqueue stops reporting the pipe after a read-out, so re-arm it
//
   kevent(pollDfd, &armPipe, 1, 0, 0, 0);
   return true;
}
450 
451 /******************************************************************************/
452 /* Protected: S h u t d o w n */
453 /******************************************************************************/
454 
456 {
457  static XrdSysMutex shutMutex;
458 
459 // To avoid race conditions, we serialize this code
460 //
461  shutMutex.Lock();
462 
463 // Release the poll table
464 //
465  if (pollTab) {free(pollTab); pollTab = 0;}
466 
467 // Close the kqueue file descriptor
468 //
469  if (pollDfd >= 0) {close(pollDfd); pollDfd = -1;}
470 
471 // All done
472 //
473  shutMutex.UnLock();
474 }
#define close(a)
Definition: XrdPosix.hh:43
#define eMsg(x)
#define AtomicInc(x)
#define AtomicDec(x)
#define AtomicGet(x)
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
#define Atomic(x)
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
PollKQ(struct kevent *ptab, int numfd, int pfd, int pFD[2])
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg)
static int AllocMem(void **memP, int slots)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
Poller(int cFD, int rFD)