XRootD
XrdSysIOEventsPollPoll.icc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d P o l l P o l l . i c c */
4 /* */
5 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* Produced by Andrew Hanushevsky for Stanford University under contract */
7 /* DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 #include <unistd.h>
31 #include <cstdlib>
32 #include <signal.h>
33 
34 #include "XrdSys/XrdSysE2T.hh"
35 #include "XrdSys/XrdSysError.hh"
36 #include "Xrd/XrdPollPoll.hh"
37 #include "Xrd/XrdScheduler.hh"
38 
39 
40 
41 /******************************************************************************/
42 /* C l a s s P o l l P o l l */
43 /******************************************************************************/
44 
45 namespace XrdSys
46 {
47 namespace IOEvents
48 {
49 class PollPoll : public Poller
50 {
51 public:
52 
53  PollPoll(int &rc, int numfd, int pFD[2]);
54  ~PollPoll() {Stop();}
55 
56 protected:
57 
58  void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg);
59 
60  void Exclude(Channel *cP, bool &isLocked, bool dover=1);
61 
62  bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked);
63 
64  bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked);
65 
66  void Shutdown();
67 
68 private:
69  void Dispatch(int ptent, int pollEv);
70  void FDMod(int ptnum, int fd, int events);
71  void FDRem(int ptnum);
72  bool Process();
73 
74 static const int disFD = 0x80000000;
75 
76 XrdSysRecMutex pollMutex;
77 struct pollfd *pollTab;
78  int pollMax;
79  int pollNum;
80 struct pollfd *pnewTab;
81  Channel **chnlTab;
82  int chnlMax;
83  int chnlNum;
84 };
85 };
86 };
87 
88 /******************************************************************************/
89 /* C l a s s P o l l e r */
90 /******************************************************************************/
91 /******************************************************************************/
92 /* Static: n e w P o l l e r */
93 /******************************************************************************/
94 
96 XrdSys::IOEvents::Poller::newPoller(int pipeFD[2],
97  int &eNum,
98  const char **eTxt)
99 
100 {
101  PollPoll *myPoller;
102 
103 // Allocate new poller
104 //
105  if (!(myPoller = new PollPoll(eNum, 1024, pipeFD))) eNum = ENOMEM;
106 
107 // Check if all went ell
108 //
109  if (!eNum) return (Poller *)myPoller;
110  delete myPoller;
111  if (eTxt) *eTxt = "creating poller";
112  return 0;
113 }
114 
115 /******************************************************************************/
116 /* C l a s s P o l l P o l l */
117 /******************************************************************************/
118 /******************************************************************************/
119 /* C o n s t r c u t o r */
120 /******************************************************************************/
121 
122 XrdSys::IOEvents::PollPoll::PollPoll(int &rc, int numfd, int pFD[2])
123  : Poller(pFD[0], pFD[1])
124 {
125  int i;
126 
127 // Allocate initial poll table
128 //
129  if (!(pollTab = (struct pollfd *)malloc(numfd*sizeof(struct pollfd))))
130  {rc = errno; return;}
131 
132 // Initialize it
133 //
134  for (i = 1; i < numfd; i++)
135  {pollTab[i].fd = -1; pollTab[i].events = 0; pollTab[i].revents = 0;}
136 
137 // The first element of the poll tab is the communications pipe
138 //
139  pollTab[0].fd = pFD[0];
140  pollTab[0].events = POLLIN | POLLRDNORM;
141  pollTab[0].revents = 0;
142 
143 // Initialize remaining poll data
144 //
145  pollNum = 1;
146  pollMax = numfd;
147  pnewTab = 0;
148 
149 // Allocate initial channel table
150 //
151  if (!(chnlTab = (Channel **)malloc(numfd*sizeof(Channel *))))
152  {rc = errno; return;}
153 
154 // Initialize it
155 //
156  memset(chnlTab, 0, numfd*sizeof(Channel *));
157  chnlMax = numfd;
158  chnlNum = 1;
159 
160 // All done
161 //
162  rc = 0;
163 }
164 
165 /******************************************************************************/
166 /* Protected: B e g i n */
167 /******************************************************************************/
168 
170  int &retcode,
171  const char **eTxt)
172 {
173  int i, num2poll, numpolled;
174 
175 // Indicate to the starting thread that all went well
176 //
177  retcode = 0;
178  *eTxt = 0;
179  syncsem->Post();
180 
181 // Now start dispatching channels that are ready. We use the wakePend flag to
182 // keep the chatter down when we actually wakeup.
183 //
184  pollMutex.Lock();
185  do {num2poll = pollNum;
186  pollMutex.UnLock();
187  do {numpolled = poll(pollTab, num2poll, TmoGet());}
188  while(numpolled < 0 && (errno == EAGAIN || errno == EINTR));
189  pollMutex.Lock();
190  wakePend = true;
191 
192  if (pnewTab)
193  {memcpy(pnewTab, pollTab, pollMax*sizeof(struct pollfd));
194  free(pollTab); pollTab = pnewTab; pnewTab = 0; pollMax = chnlMax;
195  }
196 
197  if (numpolled == 0) CbkTMO();
198  else if (numpolled < 0)
199  {int rc = errno;
200  //--------------------------------------------------------------
201  // If we are in a child process and the poll file descriptor
202  // has been closed, there is an immense chance the fork will be
203  // followed by an exec, in which case we don't want to abort
204  //--------------------------------------------------------------
205  if( rc == EBADF && parentPID != getpid() ) return;
206  std::cerr <<"PPoll: "<<XrdSysE2T(rc)<<" polling for events"<<std::endl;
207  abort();
208  }
209  else{if (pollTab[0].revents) numpolled--;
210  for (i = 1; i < num2poll && numpolled; i++)
211  {if (pollTab[i].revents)
212  {numpolled--;
213  Dispatch(i, pollTab[i].revents);
214  }
215  }
216  if (pollTab[0].revents && !Process()) return;
217  }
218  } while(1);
219 }
220 
221 /******************************************************************************/
222 /* Private: D i s p a t c h */
223 /******************************************************************************/
224 
225 void XrdSys::IOEvents::PollPoll::Dispatch(int ptent, int pollEv)
226 {
227  static const short pollER = POLLERR| POLLHUP | POLLNVAL;
228  static const short pollOK = POLLIN | POLLRDNORM | POLLPRI | POLLOUT;
229  static const short pollRD = POLLIN | POLLRDNORM | POLLPRI;
230  static const short pollWR = POLLOUT;
231  Channel *cP;
232  const char *eTxt;
233  int eNum, events = 0;
234 
235 // Check if we really have a channel here
236 //
237  if (!(cP = chnlTab[ptent])) {FDRem(ptent); return;}
238 
239 // Translate the event to something reasonable
240 //
241  if (pollEv & pollER)
242  {eTxt = "polling";
243  if (pollEv & POLLHUP) eNum = ECONNRESET;
244  else if (pollEv & POLLERR) eNum = EPIPE;
245  else if (pollEv & POLLNVAL)eNum = EBADF;
246  else eNum = EIO;
247  }
248  else if (pollEv & pollOK)
249  {if (pollEv & pollRD) events |= CallBack::ReadyToRead;
250  if (pollEv & pollWR) events |= CallBack::ReadyToWrite;
251  eNum = 0; eTxt = 0;
252  }
253  else {eTxt = "polling"; eNum = EIO;}
254 
255 // Execute the callback
256 //
257  if (!CbkXeq(cP, events, eNum, eTxt)) FDRem(ptent);
258 }
259 
260 /******************************************************************************/
261 /* Protected: E x c l u d e */
262 /******************************************************************************/
263 
265  bool &isLocked, bool dover)
266 {
267  int ctnum;
268 
269 // Verify that this channel is assigned.
270 //
271  ctnum = GetPollEnt(cP);
272  pollMutex.Lock();
273  if (chnlTab[ctnum] != cP) {pollMutex.UnLock(); return;}
274  pollMutex.UnLock();
275 
276 // If we are the poller thread then we can remove this now. Note that we will
277 // still have the poll mutex because the caller would have it as well.
278 // Otherwise, send a message to the poller to do this. We will need to release
279 // the channel lock to prevent deadlocks. The caller will relock as needed.
280 // This message always synchronizes with the poller.
281 //
282  if (ISPOLLER)
283  {FDRem(ctnum);
284  return;
285  } else {
286  PipeData cmdbuff((char)PipeData::RmFD,0,(short)ctnum,cP->GetFD());
287  if (isLocked) {isLocked = false; UnLockChannel(cP);}
288  SendCmd(cmdbuff);
289  }
290 }
291 
292 /******************************************************************************/
293 /* Private: F D M o d */
294 /******************************************************************************/
295 
296 // pollMutex must be locked
297 //
298 void XrdSys::IOEvents::PollPoll::FDMod(int ptnum, int fd, int events)
299 {
300  XrdSysMutexHelper mHelper(pollMutex);
301 
302 // First step is to see if we need to swap to a new poll table
303 //
304  if (pnewTab)
305  {memcpy(pnewTab, pollTab, pollMax*sizeof(struct pollfd));
306  free(pollTab);
307  pollTab = pnewTab; pnewTab = 0; pollMax = chnlMax;
308  }
309 
310 
311 // Initialize poll table entry
312 //
313  pollTab[ptnum].fd = fd;
314  pollTab[ptnum].events = 0;
315  pollTab[ptnum].revents = 0;
316  if (events & Channel:: readEvents)
317  pollTab[ptnum].events = POLLIN | POLLRDNORM;
318  if (events & Channel::writeEvents)
319  pollTab[ptnum].events |= POLLOUT;
320  if (!pollTab[ptnum].events && !(events & Channel::errorEvents))
321  pollTab[ptnum].fd |= disFD;
322 
323 // Reset poll marker, as needed
324 //
325  if (chnlNum >= pollNum) pollNum = chnlNum+1;
326 }
327 
328 /******************************************************************************/
329 /* Private: F D R e m */
330 /******************************************************************************/
331 
332 // pollMutex must be locked
333 //
334 void XrdSys::IOEvents::PollPoll::FDRem(int ptnum)
335 {
336  int ctnum = ptnum;
337 
338 // Free up the channel
339 //
340  chnlTab[ctnum] = 0;
341 
342 // See if we need to adjust the channel count
343 //
344  if (ctnum == chnlNum-1)
345  {while(ctnum > 0 && !chnlTab[ctnum]) ctnum--;
346  chnlNum = ctnum+1;
347  }
348 
349 // Free up this entry
350 //
351  pollTab[ptnum].fd = -1;
352  pollTab[ptnum].events = 0;
353  pollTab[ptnum].revents = 0;
354 
355 // Now see if we need to adjust our poll count
356 //
357  if (ptnum == pollNum-1)
358  {while(ptnum > 0 && pollTab[ptnum].fd == -1) ptnum--;
359  pollNum = ptnum+1;
360  }
361 }
362 
363 /******************************************************************************/
364 /* I n c l u d e */
365 /******************************************************************************/
366 
368  int &eNum,
369  const char **eTxt,
370  bool &isLocked)
371 {
372  static const int incVal = 256;
373  static const int cpSz = sizeof(Channel *);
374  static const int ptSz = sizeof(struct pollfd);
375  int fd, ctnum;
376 
377 // Validate the file descriptor
378 //
379  fd = cP->GetFD();
380  if (fd & 0xffff0000)
381  {eNum = EBADF;
382  if (eTxt) *eTxt = "adding channel";
383  return false;
384  }
385 
386 // Make sure this channel is not already assigned to this poller
387 //
388  if (GetPollEnt(cP))
389  {eNum = EEXIST;
390  if (eTxt) *eTxt = "adding channel";
391  return false;
392  }
393 
394 // Get the next channel table entry to be used
395 //
396  pollMutex.Lock();
397  ctnum = 1;
398  while((ctnum < chnlMax) && (chnlTab[ctnum] != 0)) ctnum++;
399 
400 // Reallocate channel table if we don't have enough space. We also pre-allocate
401 // a new poll table so that we can reflect failure to the caller as the poller
402 // can't do that. The poller will swap the new one for the old one.
403 //
404  if (ctnum >= chnlMax)
405  {Channel **cnewTab = (Channel **)realloc(chnlTab,(chnlMax+incVal)*cpSz);
406  if (pnewTab) free(pnewTab);
407  pnewTab = (struct pollfd *)malloc((chnlMax+incVal)*ptSz);
408  if (!cnewTab || !pnewTab)
409  {pollMutex.UnLock();
410  eNum = ENOMEM;
411  if (eTxt) *eTxt = "adding channel";
412  if (cnewTab) free(cnewTab);
413  if (pnewTab) free(pnewTab);
414  return false;
415  }
416  memset(&cnewTab[ctnum], 0, incVal*cpSz);
417  memset(&pnewTab[ctnum],-1, incVal*ptSz);
418  chnlTab = cnewTab; chnlMax += incVal; chnlNum = ctnum+1;
419  } else if (ctnum > chnlNum) chnlNum = ctnum;
420 
421 // Record the poll table entry in the channel
422 //
423  chnlTab[ctnum] = cP;
424  SetPollEnt(cP, ctnum);
425  pollMutex.UnLock();
426 
427 // If we are the poller thread, then enable the poll entry in-line. Note that
428 // we will still be holding the poll mutex because the caller also has it.
429 // Otherwise, send a message to the poller to do this. We will need to release
430 // the channel lock to prevent deadlocks. The caller will relock as needed.
431 //
432  if (ISPOLLER)
433  {FDMod(ctnum, fd, cP->GetEvents());
434  return true;
435  } else {
436  PipeData cmdbuff((char)PipeData::MiFD, (char)cP->GetEvents(),
437  (short)ctnum, fd, 0);
438  if (isLocked) {isLocked = false; UnLockChannel(cP);}
439  SendCmd(cmdbuff);
440  }
441 
442 // All done
443 //
444  return true;
445 }
446 
447 /******************************************************************************/
448 /* Protected: M o d i f y */
449 /******************************************************************************/
450 
452  int &eNum,
453  const char **eTxt,
454  bool &isLocked)
455 {
456 
457 // If we are the poller thread, then modify the poll entry in-line. Otherwise,
458 // send a modification message to the poller. This requires that we unlock the
459 // channel to prevent any deadlocks. The caller will relock it as needed.
460 //
461  if (ISPOLLER)
462  {FDMod(GetPollEnt(cP), cP->GetFD(), cP->GetEvents());
463  return true;
464  } else {
465  PipeData cmdbuff((char)PipeData::MdFD, (char)cP->GetEvents(),
466  (short)GetPollEnt(cP), cP->GetFD(), 0);
467  if (isLocked) {isLocked = false; UnLockChannel(cP);}
468  SendCmd(cmdbuff);
469  }
470 
471 // All done
472 //
473  return true;
474 }
475 
476 /******************************************************************************/
477 /* Private: P r o c e s s */
478 /******************************************************************************/
479 
480 bool XrdSys::IOEvents::PollPoll::Process()
481 {
482 // Get the pipe request and check out actions of interest.
483 //
484  while(GetRequest())
485  {switch(reqBuff.req)
486  {case PipeData::MdFD: FDMod(reqBuff.ent, reqBuff.fd, reqBuff.evt);
487  break;
488  case PipeData::MiFD: FDMod(reqBuff.ent, reqBuff.fd, reqBuff.evt);
489  reqBuff.theSem->Post();
490  break;
491  case PipeData::RmFD: FDRem(reqBuff.ent);
492  reqBuff.theSem->Post();
493  break;
494  case PipeData::NoOp: break;
495  case PipeData::Post: reqBuff.theSem->Post();
496  break;
497  case PipeData::Stop: reqBuff.theSem->Post();
498  return false;
499  break;
500  default: break;
501  }
502  }
503 
504 // Return true
505 //
506  return true;
507 }
508 
509 /******************************************************************************/
510 /* Protected: S h u t d o w n */
511 /******************************************************************************/
512 
514 {
515  static XrdSysMutex shutMutex;
516 
517 // To avoid race conditions, we serialize this code
518 //
519  shutMutex.Lock();
520 
521 // Release the appendages
522 //
523  if (pollTab) {free(pollTab); pollTab = 0;}
524  if (pnewTab) {free(pnewTab); pnewTab = 0;}
525  if (chnlTab) {free(chnlTab); chnlTab = 0;}
526 
527 // All done
528 //
529  shutMutex.UnLock();
530 }
#define eMsg(x)
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
#define ISPOLLER
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ errorEvents
Error event non-r/w specific.
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg)
PollPoll(int &rc, int numfd, int pFD[2])
Poller(int cFD, int rFD)