XRootD
XrdSysIOEventsPollPort.icc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d S y s I O E v e n t s P o l l P o r t . i c c */
4 /* */
5 /* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdlib>
32 #include <sys/types.h>
33 #include <sys/stat.h>
34 #include <port.h>
35 
36 #include "XrdSys/XrdSysE2T.hh"
37 
38 
39 /******************************************************************************/
40 /* C l a s s P o l l P o r t */
41 /******************************************************************************/
42 
43 namespace XrdSys
44 {
45 namespace IOEvents
46 {
47 class PollPort : public Poller
48 {
49 public:
50 
51 static int AllocMem(void **memP, int slots);
52 
53  PollPort(port_event_t *ptab, int numfd, int pfd, int pFD[2])
54  : Poller(pFD[0], pFD[1]), pollTab(ptab),
55  pollDfd(pfd), pollMax(numfd)
56  {}
57  ~PollPort() {Stop();}
58 
59 static const int pollER = POLLERR| POLLHUP;
60 static const int pollOK = POLLIN | POLLRDNORM | POLLPRI | POLLOUT;
61 static const int pollRD = POLLIN | POLLRDNORM | POLLPRI;
62 static const int pollWR = POLLOUT;
63 
64 protected:
65 
66  void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg);
67 
68 timespec_t *BegTO(timespec_t &theTO)
69  {int toval = TmoGet();
70  if (toval < 0) return 0;
71  theTO.tv_sec = toval/1000;
72  theTO.tv_nsec= 0;
73  return &theTO;
74  }
75 
76  void Exclude(Channel *cP, bool &isLocked, bool dover=1);
77 
78  bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked);
79 
80  bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked);
81 
82  void Shutdown();
83 
84 private:
85 
86  void Dispatch(Channel *cP, int pollEv);
87  bool Process(int curr);
88 
89  port_event_t *pollTab;
90  Channel *cbNow;
91  int cbCurr;
92  int pollDfd;
93  int pollMax;
94  unsigned int numPoll;
95 static void *deadChP;
96 };
97  void *PollPort::deadChP = 0;
98 };
99 };
100 
101 /******************************************************************************/
102 /* C l a s s P o l l e r */
103 /******************************************************************************/
104 /******************************************************************************/
105 /* Static: n e w P o l l e r */
106 /******************************************************************************/
107 
109 XrdSys::IOEvents::Poller::newPoller(int pipeFD[2],
110  int &eNum,
111  const char **eTxt)
112 
113 {
114  static const int allocFD = 170;
115  port_event_t *pp;
116  int pfd;
117 
118 // reate an event driver
119 //
120  if ((pfd = port_create()) < 0)
121  {eNum = errno;
122  if (eTxt) *eTxt = "creating event port";
123  return 0;
124  }
125  fcntl(pfd, F_SETFD, FD_CLOEXEC);
126 
127 // Add the request side of the pipe fd to the poll set (always fd[0])
128 //
129  if (port_associate(pfd, PORT_SOURCE_FD, pipeFD[0], PollPort::pollRD, 0))
130  { eNum = errno;
131  *eTxt = "adding communication pipe";
132  return 0;
133  }
134 
135 // Allocate the event table
136 //
137  if ((eNum = XrdSys::IOEvents::PollPort::AllocMem((void **)&pp, allocFD)))
138  {if (eTxt) *eTxt = "creating port event table";
139  close(pfd);
140  return 0;
141  }
142 
143 // Create new poll object
144 //
145  return (Poller *)new PollPort(pp, allocFD, pfd, pipeFD);
146 }
147 
148 /******************************************************************************/
149 /* C l a s s P o l l P o r t */
150 /******************************************************************************/
151 /******************************************************************************/
152 /* A l l o c M e m */
153 /******************************************************************************/
154 
155 int XrdSys::IOEvents::PollPort::AllocMem(void **memP, int slots)
156 {
157  int bytes, alignment, pagsz = getpagesize();
158 
159 // Calculate the size of the poll table and allocate it
160 //
161  bytes = slots * sizeof(port_event_t);
162  alignment = (bytes < pagsz ? 1024 : pagsz);
163  if (posix_memalign(memP, alignment, bytes)) return ENOMEM;
164  memset(*memP, 0, bytes);
165  return 0;
166 }
167 
168 /******************************************************************************/
169 /* Protected: B e g i n */
170 /******************************************************************************/
171 
173  int &retcode,
174  const char **eTxt)
175 {
176  unsigned int numpolled;
177  int rc;
178  timespec_t toVal;
179  Channel *cP;
180 
181 // Indicate to the starting thread that all went well
182 //
183  retcode = 0;
184  *eTxt = 0;
185  syncsem->Post();
186 
187 // Now start dispatching channels that are ready. We use the wakePend flag to
188 // keep the chatter down when we actually wakeup. There is also a "feature" of
189 // poll_getn() that can return an errno of zero upon a timeout, sigh.
190 //
191  do {numpolled = 1; errno = 0;
192  do {rc = port_getn(pollDfd, pollTab, pollMax, &numpolled, BegTO(toVal));}
193  while (rc < 0 && errno == EINTR);
194  wakePend = true; numPoll = numpolled;
195  if (rc)
196  {if (errno == ETIME || !errno) CbkTMO();
197  else {int rc = errno;
198  //--------------------------------------------------------------
199  // If we are in a child process and the poll file descriptor
200  // has been closed, there is an immense chance the fork will be
201  // followed by an exec, in which case we don't want to abort
202  //--------------------------------------------------------------
203  if( rc == EBADF && parentPID != getpid() ) return;
204  std::cerr <<"PollP: " <<XrdSysE2T(rc) <<" polling for events" <<std::endl;
205  abort();
206  }
207  }
208  for (int i = 0; i < (int)numpolled; i++)
209  if (pollTab[i].portev_source == PORT_SOURCE_FD)
210  {if ((cP = (Channel *)pollTab[i].portev_user))
211  {cbCurr = i; Dispatch(cP, pollTab[i].portev_events);}
212  else if (!Process(i)) return;
213  }
214  } while(1);
215 }
216 
217 /******************************************************************************/
218 /* Private: D i s p a t c h */
219 /******************************************************************************/
220 
221 void XrdSys::IOEvents::PollPort::Dispatch(XrdSys::IOEvents::Channel *cP,
222  int pollEv)
223 {
224  const char *eTxt;
225  int eNum, events = 0;
226  bool isLocked = false;
227 
228 // Make sure this not a dispatch to a dead channel (rare but true)
229 //
230  if (cP == (XrdSys::IOEvents::Channel *)&deadChP) return;
231 
232 // Translate the event to something reasonable
233 //
234  if (pollEv & pollER)
235  {eTxt = "polling";
236  eNum = (pollEv & POLLERR ? EPIPE : ECONNRESET); // Error or HUP
237  }
238  else if (pollEv & pollOK)
239  {if (pollEv & pollRD) events |= CallBack::ReadyToRead;
240  if (pollEv & pollWR) events |= CallBack::ReadyToWrite;
241  eNum = 0; eTxt = 0;
242  }
243  else {eTxt = "polling"; eNum = EIO;}
244 
245 // Execute the callback
246 //
247  cbNow = cP;
248  if (!CbkXeq(cP, events, eNum, eTxt)) Exclude(cP, isLocked, 0);
249  else Modify(cP, eNum, &eTxt, isLocked);
250  cbNow = 0;
251 }
252 
253 /******************************************************************************/
254 /* Protected: E x c l u d e */
255 /******************************************************************************/
256 
258  bool &isLocked, bool dover)
259 {
260 
261 // Remove this channel from the poll set. We ignore errors as the descriptor
262 // may have been closed prior to this call (though this shouldn't happen).
263 //
264  port_dissociate(pollDfd, PORT_SOURCE_FD, cP->GetFD());
265 
266 // If we need to verify this action, sync with the poller thread (note that the
267 // poller thread will not ask for this action unless it wants to deadlock). We
268 // may actually deadlock anyway if the channel lock is held. We are allowed to
269 // release it if the caller locked it. This will prevent a deadlock.
270 //
271  if (dover)
272  {PipeData cmdbuff;
273  if (isLocked)
274  {isLocked = false;
275  UnLockChannel(cP);
276  }
277  cmdbuff.req = PipeData::RmFD;
278  cmdbuff.fd = cP->GetFD();
279  SendCmd(cmdbuff);
280  } else {
281  if (cbNow && cbNow != cP)
282  for (int i = cbCurr+1; i < numPoll; i++)
283  {if (cP == (Channel *)pollTab[i].portev_user)
284  pollTab[i].portev_user = &deadChP;
285  }
286  }
287 }
288 
289 /******************************************************************************/
290 /* Protected: I n c l u d e */
291 /******************************************************************************/
292 
294  int &eNum,
295  const char **eTxt,
296  bool &isLocked)
297 {
298  int pEvents = 0, events = cP->GetEvents();
299 
300 // Establish new event mask
301 //
302  if (events & Channel:: readEvents) pEvents = pollRD;
303  if (events & Channel::writeEvents) pEvents |= pollWR;
304 
305 // Add this fd to the poll set
306 //
307  if (port_associate(pollDfd, PORT_SOURCE_FD, cP->GetFD(), pEvents, cP))
308  {eNum = errno;
309  if (eTxt) *eTxt = "adding channel";
310  return false;
311  }
312 
313 // All went well.
314 //
315  return true;
316 }
317 
318 /******************************************************************************/
319 /* Protected: M o d i f y */
320 /******************************************************************************/
321 
323  int &eNum,
324  const char **eTxt,
325  bool &isLocked)
326 {
327  int pEvents = 0, events = cP->GetEvents();
328 
329 // Establish new event mask
330 //
331  if (events & Channel:: readEvents) pEvents = pollRD;
332  if (events & Channel::writeEvents) pEvents |= pollWR;
333 
334 // Associate the fd to the poll set
335 //
336  if (port_associate(pollDfd, PORT_SOURCE_FD, cP->GetFD(), pEvents, cP))
337  {eNum = errno;
338  if (eTxt) *eTxt = "modifying poll events";
339  return false;
340  }
341 
342 // All done
343 //
344  return true;
345 }
346 
347 /******************************************************************************/
348 /* Private: P r o c e s s */
349 /******************************************************************************/
350 
351 bool XrdSys::IOEvents::PollPort::Process(int curr)
352 {
353 // Get the pipe request and check out actions of interest.
354 //
355  if (GetRequest())
356  { if (reqBuff.req == PipeData::RmFD)
357  {Channel *cP;
358  for (int i = curr+1; i < numPoll; i++)
359  {if (reqBuff.fd == (int)pollTab[i].portev_object)
360  pollTab[i].portev_user = &deadChP;
361  }
362  reqBuff.theSem->Post();
363  }
364  else if (reqBuff.req == PipeData::Stop){reqBuff.theSem->Post();
365  return false;
366  }
367  }
368 
369 // Associate the pipe again and return true
370 //
371  port_associate(pollDfd, PORT_SOURCE_FD, reqFD, pollRD, 0);
372  return true;
373 }
374 
375 /******************************************************************************/
376 /* Protected: S h u t d o w n */
377 /******************************************************************************/
378 
380 {
381  static XrdSysMutex shutMutex;
382 
383 // To avoid race conditions, we serialize this code
384 //
385  shutMutex.Lock();
386 
387 // Release the poll table
388 //
389  if (pollTab) {free(pollTab); pollTab = 0;}
390 
391 // Close the epoll file descriptor
392 //
393  if (pollDfd >= 0) {close(pollDfd); pollDfd = -1;}
394 
395 // All done
396 //
397  shutMutex.UnLock();
398 }
int fcntl(int fd, int cmd,...)
#define close(a)
Definition: XrdPosix.hh:43
#define eMsg(x)
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.
PollPort(port_event_t *ptab, int numfd, int pfd, int pFD[2])
timespec_t * BegTO(timespec_t &theTO)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
static int AllocMem(void **memP, int slots)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
Poller(int cFD, int rFD)