XRootD
XrdPoll.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d P o l l . c c */
4 /* */
5 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* Produced by Andrew Hanushevsky for Stanford University under contract */
7 /* DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 #include <unistd.h>
31 #include <cstdio>
32 #include <cstdlib>
33 
34 #include "XrdSys/XrdSysError.hh"
35 #include "XrdSys/XrdSysFD.hh"
36 #include "XrdSys/XrdSysPlatform.hh"
37 #include "XrdSys/XrdSysPthread.hh"
38 #include "Xrd/XrdLink.hh"
39 #include "Xrd/XrdProtocol.hh"
40 
41 #define TRACE_IDENT pInfo.Link.ID
42 #include "Xrd/XrdTrace.hh"
43 
44 #if defined( __linux__ )
45 #include "Xrd/XrdPollE.hh"
46 //#include "Xrd/XrdPollPoll.hh"
47 #else
48 #include "Xrd/XrdPollPoll.hh"
49 #endif
50 
51 #include "Xrd/XrdPollInfo.hh"
52 
53 /******************************************************************************/
54 /* L o c a l C l a s s e s */
55 /******************************************************************************/
56 
57 class XrdPoll_End : public XrdProtocol
58 {
59 public:
60 
61 void DoIt() {}
62 
63 XrdProtocol *Match(XrdLink *lp) {return (XrdProtocol *)0;}
64 
65 int Process(XrdLink *lp) {return -1;}
66 
67 void Recycle(XrdLink *lp, int x, const char *y) {}
68 
69 int Stats(char *buff, int blen, int do_sync=0) {return 0;}
70 
71  XrdPoll_End() : XrdProtocol("link termination") {}
73 };
74 
75 /******************************************************************************/
76 /* G l o b a l D a t a */
77 /******************************************************************************/
78 
80 
81  XrdSysMutex XrdPoll::doingAttach;
82 
83  const char *XrdPoll::TraceID = "Poll";
84 
85 namespace XrdGlobal
86 {
87 extern XrdSysError Log;
88 extern XrdScheduler Sched;
89 }
90 
91 using namespace XrdGlobal;
92 
93 /******************************************************************************/
94 /* T h r e a d S t a r t u p I n t e r f a c e */
95 /******************************************************************************/
96 
97 struct XrdPollArg
99  int retcode;
101 
102  XrdPollArg() : PollSync(0, "poll sync") {}
104  };
105 
106 
107 void *XrdStartPolling(void *parg)
108 {
109  struct XrdPollArg *PArg = (struct XrdPollArg *)parg;
110  PArg->Poller->Start(&(PArg->PollSync), PArg->retcode);
111  return (void *)0;
112 }
113 
114 /******************************************************************************/
115 /* C o n s t r u c t o r */
116 /******************************************************************************/
117 
119 {
120  int fildes[2];
121 
122  TID=0;
123  numAttached=numEnabled=numEvents=numInterrupts=0;
124 
125  if (XrdSysFD_Pipe(fildes) == 0)
126  {CmdFD = fildes[1];
127  ReqFD = fildes[0];
128  } else {
129  CmdFD = ReqFD = -1;
130  Log.Emsg("Poll", errno, "create poll pipe");
131  }
132  PipeBuff = 0;
133  PipeBlen = 0;
134  PipePoll.fd = ReqFD;
135  PipePoll.events = POLLIN | POLLRDNORM;
136 }
137 
138 /******************************************************************************/
139 /* A t t a c h */
140 /******************************************************************************/
141 
142 int XrdPoll__Attach(XrdLink *lp) {return lp->Activate();}
143 
145 {
146  int i;
147  XrdPoll *pp;
148 
149 // We allow only one attach at a time to simplify the processing
150 //
151  doingAttach.Lock();
152 
153 // Find a poller with the smallest number of entries
154 //
155  pp = Pollers[0];
156  for (i = 1; i < XRD_NUMPOLLERS; i++)
157  if (pp->numAttached > Pollers[i]->numAttached) pp = Pollers[i];
158 
159 // Include this FD into the poll set of the poller
160 //
161  if (!pp->Include(pInfo)) {doingAttach.UnLock(); return 0;}
162 
163 // Complete the link setup
164 //
165  pInfo.Poller = pp;
166  pp->numAttached++;
167  doingAttach.UnLock();
168  TRACEI(POLL, "FD " <<pInfo.FD <<" attached to poller " <<pp->PID
169  <<"; num=" <<pp->numAttached);
170  return 1;
171 }
172 
173 /******************************************************************************/
174 /* D e t a c h */
175 /******************************************************************************/
176 
178 {
179  XrdPoll *pp;
180 
181 // If link is not attached, simply return
182 //
183  if (!(pp = pInfo.Poller)) return;
184 
185 // Exclude this link from the associated poll set
186 //
187  pp->Exclude(pInfo);
188 
189 // Make sure we are consistent
190 //
191  doingAttach.Lock();
192  if (!pp->numAttached)
193  {Log.Emsg("Poll","Underflow detaching", pInfo.Link.ID); abort();}
194  pp->numAttached--;
195  doingAttach.UnLock();
196  TRACEI(POLL, "FD " <<pInfo.FD <<" detached from poller " <<pp->PID
197  <<"; num=" <<pp->numAttached);
198 }
199 
200 /******************************************************************************/
201 /* F i n i s h */
202 /******************************************************************************/
203 
204 int XrdPoll::Finish(XrdPollInfo &pInfo, const char *etxt)
205 {
206  static XrdPoll_End LinkEnd;
207 
208 // If this link is already scheduled for termination, ignore this call.
209 //
210  if (pInfo.Link.getProtocol() == &LinkEnd)
211  {TRACEI(POLL, "Link " <<pInfo.FD <<" already terminating; "
212  <<(etxt ? etxt : "") <<" request ignored.");
213  return 0;
214  }
215 
216 // Set the protocol pointer to be link termination
217 //
218  pInfo.Link.setProtocol(&LinkEnd, false, true);
219  if (!etxt) etxt = "reason unknown";
220  pInfo.Link.setEtext(etxt);
221  TRACEI(POLL, "Link " <<pInfo.FD <<" terminating: " <<etxt);
222  return 1;
223 }
224 
225 /******************************************************************************/
226 /* g e t R e q u e s t */
227 /******************************************************************************/
228 
229 // Warning: This method runs unlocked. The caller must have exclusive use of
230 // the ReqBuff otherwise unpredictable results will occur.
231 
233 {
234  ssize_t rlen;
235  int rc;
236 
237 // See if we are to resume a read or start a fresh one
238 //
239  if (!PipeBlen)
240  {PipeBuff = (char *)&ReqBuff; PipeBlen = sizeof(ReqBuff);}
241 
242 // Wait for the next request. Some OS's (like Linux) don't support non-blocking
243 // pipes. So, we must front the read with a poll.
244 //
245  do {rc = poll(&PipePoll, 1, 0);}
246  while(rc < 0 && (errno == EAGAIN || errno == EINTR));
247  if (rc < 1) return 0;
248 
249 // Now we can put up a read without a delay. Normally a full command will be
250 // present. Under some heavy conditions, this may not be the case.
251 //
252  do {rlen = read(ReqFD, PipeBuff, PipeBlen);}
253  while(rlen < 0 && errno == EINTR);
254  if (rlen <= 0)
255  {if (rlen) Log.Emsg("Poll", errno, "read from request pipe");
256  return 0;
257  }
258 
259 // Check if all the data has arrived. If not all the data is present, defer
260 // this request until more data arrives.
261 //
262  if (!(PipeBlen -= rlen)) return 1;
263  PipeBuff += rlen;
264  TRACE(POLL, "Poller " <<PID <<" still needs " <<PipeBlen <<" req pipe bytes");
265  return 0;
266 }
267 
268 /******************************************************************************/
269 /* P o l l 2 T e x t */
270 /******************************************************************************/
271 
272 char *XrdPoll::Poll2Text(short events)
273 {
274  if (events & POLLERR) return strdup("socket error");
275 
276  if (events & POLLHUP) return strdup("hangup");
277 
278  if (events & POLLNVAL) return strdup("socket closed");
279 
280  {char buff[64];
281  sprintf(buff, "unusual event (%.4x)", events);
282  return strdup(buff);
283  }
284  return (char *)0;
285 }
286 
287 /******************************************************************************/
288 /* S e t u p */
289 /******************************************************************************/
290 
291 int XrdPoll::Setup(int numfd)
292 {
293  pthread_t tid;
294  int maxfd, retc, i;
295  struct XrdPollArg PArg;
296 
297 // Calculate the number of table entries per poller
298 //
299  maxfd = (numfd / XRD_NUMPOLLERS) + 16;
300 
301 // Verify that we initialized the poller table
302 //
303  for (i = 0; i < XRD_NUMPOLLERS; i++)
304  {if (!(Pollers[i] = newPoller(i, maxfd))) return 0;
305  Pollers[i]->PID = i;
306 
307  // Now start a thread to handle this poller object
308  //
309  PArg.Poller = Pollers[i];
310  PArg.retcode= 0;
311  TRACE(POLL, "Starting poller " <<i);
312  if ((retc = XrdSysThread::Run(&tid,XrdStartPolling,(void *)&PArg,
313  XRDSYSTHREAD_BIND, "Poller")))
314  {Log.Emsg("Poll", retc, "create poller thread"); return 0;}
315  Pollers[i]->TID = tid;
316  PArg.PollSync.Wait();
317  if (PArg.retcode)
318  {Log.Emsg("Poll", PArg.retcode, "start poller");
319  return 0;
320  }
321  }
322 
323 // All done
324 //
325  return 1;
326 }
327 
328 /******************************************************************************/
329 /* S t a t s */
330 /******************************************************************************/
331 
332 int XrdPoll::Stats(char *buff, int blen, int do_sync)
333 {
334  static const char statfmt[] = "<stats id=\"poll\"><att>%d</att>"
335  "<en>%d</en><ev>%d</ev><int>%d</int></stats>";
336  int i, numatt = 0, numen = 0, numev = 0, numint = 0;
337  XrdPoll *pp;
338 
339 // Return number of bytes if so wanted
340 //
341  if (!buff) return (sizeof(statfmt)+(4*16))*XRD_NUMPOLLERS;
342 
343 // Get statistics. While we wish we could honor do_sync, doing so would be
344 // costly and hardly worth it. So, we do not include code such as:
345 // x = pp->y; if (do_sync) while(x != pp->y) x = pp->y; tot += x;
346 //
347  for (i = 0; i < XRD_NUMPOLLERS; i++)
348  {pp = Pollers[i];
349  numatt += pp->numAttached;
350  numen += pp->numEnabled;
351  numev += pp->numEvents;
352  numint += pp->numInterrupts;
353  }
354 
355 // Format and return
356 //
357  return snprintf(buff, blen, statfmt, numatt, numen, numev, numint);
358 }
359 
360 /******************************************************************************/
361 /* I m p l e m e n t a t i o n S p e c i f i c s */
362 /******************************************************************************/
363 
364 #if defined( __linux__ )
365 #include "Xrd/XrdPollE.icc"
366 //#include "Xrd/XrdPollPoll.icc"
367 #else
368 #include "Xrd/XrdPollPoll.icc"
369 #endif
void * XrdStartPolling(void *parg)
Definition: XrdPoll.cc:107
int XrdPoll__Attach(XrdLink *lp)
Definition: XrdPoll.cc:142
#define XRD_NUMPOLLERS
Definition: XrdPoll.hh:35
ssize_t read(int fildes, void *buf, size_t nbyte)
#define XRDSYSTHREAD_BIND
#define TRACE(act, x)
Definition: XrdTrace.hh:63
#define TRACEI(act, x)
Definition: XrdTrace.hh:66
XrdLink & Link
Definition: XrdPollInfo.hh:41
XrdPoll * Poller
Definition: XrdPollInfo.hh:43
int Stats(char *buff, int blen, int do_sync=0)
Definition: XrdPoll.cc:69
void DoIt()
Definition: XrdPoll.cc:61
XrdPoll_End()
Definition: XrdPoll.cc:71
~XrdPoll_End()
Definition: XrdPoll.cc:72
void Recycle(XrdLink *lp, int x, const char *y)
Definition: XrdPoll.cc:67
XrdProtocol * Match(XrdLink *lp)
Definition: XrdPoll.cc:63
int Process(XrdLink *lp)
Definition: XrdPoll.cc:65
static const char * TraceID
Definition: XrdPoll.hh:94
int numInterrupts
Definition: XrdPoll.hh:134
XrdPoll()
Definition: XrdPoll.cc:118
static XrdPoll * Pollers[XRD_NUMPOLLERS]
Definition: XrdPoll.hh:87
int PID
Definition: XrdPoll.hh:82
virtual int Include(XrdPollInfo &pInfo)=0
virtual void Start(XrdSysSemaphore *syncp, int &rc)=0
virtual void Exclude(XrdPollInfo &pInfo)=0
int numEvents
Definition: XrdPoll.hh:133
int getRequest()
Definition: XrdPoll.cc:232
static char * Poll2Text(short events)
Definition: XrdPoll.cc:272
static int Finish(XrdPollInfo &pInfo, const char *etxt=0)
Definition: XrdPoll.cc:204
static void Detach(XrdPollInfo &pInfo)
Definition: XrdPoll.cc:177
int numEnabled
Definition: XrdPoll.hh:132
static int Setup(int numfd)
Definition: XrdPoll.cc:291
static int Stats(char *buff, int blen, int do_sync=0)
Definition: XrdPoll.cc:332
static int Attach(XrdPollInfo &pInfo)
Definition: XrdPoll.cc:144
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
XrdSysError Log
Definition: XrdConfig.cc:112
XrdScheduler Sched
Definition: XrdLinkCtl.cc:54
XrdPoll * Poller
Definition: XrdPoll.cc:98
int retcode
Definition: XrdPoll.cc:99
~XrdPollArg()
Definition: XrdPoll.cc:103
XrdPollArg()
Definition: XrdPoll.cc:102
XrdSysSemaphore PollSync
Definition: XrdPoll.cc:100