XRootD
XrdClPollerBuiltIn.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClLog.hh"
27 #include "XrdCl/XrdClDefaultEnv.hh"
28 #include "XrdCl/XrdClConstants.hh"
29 #include "XrdCl/XrdClSocket.hh"
30 #include "XrdCl/XrdClOptimizers.hh"
31 #include "XrdSys/XrdSysE2T.hh"
32 #include "XrdSys/XrdSysIOEvents.hh"
33 
34 namespace
35 {
36  //----------------------------------------------------------------------------
37  // A helper struct passed to the callback as a custom arg
38  //----------------------------------------------------------------------------
39  struct PollerHelper
40  {
41  PollerHelper():
42  channel(0), callBack(0), readEnabled(false), writeEnabled(false),
43  readTimeout(0), writeTimeout(0)
44  {}
47  bool readEnabled;
48  bool writeEnabled;
49  uint16_t readTimeout;
50  uint16_t writeTimeout;
51  };
52 
53  //----------------------------------------------------------------------------
54  // Call back implementation
55  //----------------------------------------------------------------------------
56  class SocketCallBack: public XrdSys::IOEvents::CallBack
57  {
58  public:
59  SocketCallBack( XrdCl::Socket *sock, XrdCl::SocketHandler *sh ):
60  pSocket( sock ), pHandler( sh ) {}
61  virtual ~SocketCallBack() {};
62 
63  virtual bool Event( XrdSys::IOEvents::Channel *chP,
64  void *cbArg,
65  int evFlags )
66  {
67  using namespace XrdCl;
68  uint8_t ev = 0;
69 
70  if( evFlags & ReadyToRead ) ev |= SocketHandler::ReadyToRead;
71  if( evFlags & ReadTimeOut ) ev |= SocketHandler::ReadTimeOut;
72  if( evFlags & ReadyToWrite ) ev |= SocketHandler::ReadyToWrite;
73  if( evFlags & WriteTimeOut ) ev |= SocketHandler::WriteTimeOut;
74 
75  Log *log = DefaultEnv::GetLog();
76  if( unlikely(log->GetLevel() >= Log::DumpMsg) )
77  {
78  log->Dump( PollerMsg, "%s Got an event: %s",
79  pSocket->GetName().c_str(),
80  SocketHandler::EventTypeToString( ev ).c_str() );
81  }
82 
83  pHandler->Event( ev, pSocket );
84  return true;
85  }
86  private:
87  XrdCl::Socket *pSocket;
88  XrdCl::SocketHandler *pHandler;
89  };
90 }
91 
92 
93 namespace XrdCl
94 {
95  //----------------------------------------------------------------------------
96  // Initialize the poller
97  //----------------------------------------------------------------------------
99  {
100  return true;
101  }
102 
103  //----------------------------------------------------------------------------
104  // Finalize the poller
105  //----------------------------------------------------------------------------
107  {
108  //--------------------------------------------------------------------------
109  // Clean up the channels
110  //--------------------------------------------------------------------------
111  SocketMap::iterator it;
112  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
113  {
114  PollerHelper *helper = (PollerHelper*)it->second;
115  if( helper->channel ) helper->channel->Delete();
116  delete helper->callBack;
117  delete helper;
118  }
119  pSocketMap.clear();
120 
121  return true;
122  }
123 
124  //------------------------------------------------------------------------
125  // Start polling
126  //------------------------------------------------------------------------
128  {
129  //--------------------------------------------------------------------------
130  // Start the poller
131  //--------------------------------------------------------------------------
132  using namespace XrdSys;
133 
134  Log *log = DefaultEnv::GetLog();
135  log->Debug( PollerMsg, "Creating and starting the built-in poller..." );
136  XrdSysMutexHelper scopedLock( pMutex );
137  int errNum = 0;
138  const char *errMsg = 0;
139 
140  for( int i = 0; i < pNbPoller; ++i )
141  {
142  XrdSys::IOEvents::Poller* poller = IOEvents::Poller::Create( errNum, &errMsg );
143  if( !poller )
144  {
145  log->Error( PollerMsg, "Unable to create the internal poller object: "
146  "%s (%s)", XrdSysE2T( errno ), errMsg );
147  return false;
148  }
149  pPollerPool.push_back( poller );
150  }
151 
152  pNext = pPollerPool.begin();
153 
154  log->Debug( PollerMsg, "Using %d poller threads", pNbPoller );
155 
156  //--------------------------------------------------------------------------
157  // Check if we have any descriptors to reinsert from the last time we
158  // were started
159  //--------------------------------------------------------------------------
160  SocketMap::iterator it;
161  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
162  {
163  PollerHelper *helper = (PollerHelper*)it->second;
164  Socket *socket = it->first;
165  helper->channel = new IOEvents::Channel( RegisterAndGetPoller( socket ), socket->GetFD(),
166  helper->callBack );
167  if( helper->readEnabled )
168  {
169  bool status = helper->channel->Enable( IOEvents::Channel::readEvents,
170  helper->readTimeout, &errMsg );
171  if( !status )
172  {
173  log->Error( PollerMsg, "Unable to enable read notifications "
174  "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
175 
176  return false;
177  }
178  }
179 
180  if( helper->writeEnabled )
181  {
182  bool status = helper->channel->Enable( IOEvents::Channel::writeEvents,
183  helper->writeTimeout, &errMsg );
184  if( !status )
185  {
186  log->Error( PollerMsg, "Unable to enable write notifications "
187  "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
188 
189  return false;
190  }
191  }
192  }
193  return true;
194  }
195 
196  //------------------------------------------------------------------------
197  // Stop polling
198  //------------------------------------------------------------------------
200  {
201  using namespace XrdSys::IOEvents;
202 
203  Log *log = DefaultEnv::GetLog();
204  log->Debug( PollerMsg, "Stopping the poller..." );
205 
206  XrdSysMutexHelper scopedLock( pMutex );
207 
208  if( pPollerPool.empty() )
209  {
210  log->Debug( PollerMsg, "Stopping a poller that has not been started" );
211  return true;
212  }
213 
214  while( !pPollerPool.empty() )
215  {
216  XrdSys::IOEvents::Poller *poller = pPollerPool.back();
217  pPollerPool.pop_back();
218 
219  if( !poller ) continue;
220 
221  scopedLock.UnLock();
222  poller->Stop();
223  delete poller;
224  scopedLock.Lock( &pMutex );
225  }
226  pNext = pPollerPool.end();
227  pPollerMap.clear();
228 
229  SocketMap::iterator it;
230  const char *errMsg = 0;
231 
232  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
233  {
234  PollerHelper *helper = (PollerHelper*)it->second;
235  if( !helper->channel ) continue;
236  bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
237  if( !status )
238  {
239  Socket *socket = it->first;
240  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
241  socket->GetName().c_str(), errMsg );
242  }
243  helper->channel->Delete();
244  helper->channel = 0;
245  }
246 
247  return true;
248  }
249 
250  //------------------------------------------------------------------------
251  // Add socket to the polling queue
252  //------------------------------------------------------------------------
254  SocketHandler *handler )
255  {
256  Log *log = DefaultEnv::GetLog();
257  XrdSysMutexHelper scopedLock( pMutex );
258 
259  if( !socket )
260  {
261  log->Error( PollerMsg, "Invalid socket, impossible to poll" );
262  return false;
263  }
264 
265  if( socket->GetStatus() != Socket::Connected &&
266  socket->GetStatus() != Socket::Connecting )
267  {
268  log->Error( PollerMsg, "Socket is not in a state valid for polling" );
269  return false;
270  }
271 
272  log->Debug( PollerMsg, "Adding socket %p to the poller", socket );
273 
274  //--------------------------------------------------------------------------
275  // Check if the socket is already registered
276  //--------------------------------------------------------------------------
277  SocketMap::const_iterator it = pSocketMap.find( socket );
278  if( it != pSocketMap.end() )
279  {
280  log->Warning( PollerMsg, "%s Already registered with this poller",
281  socket->GetName().c_str() );
282  return false;
283  }
284 
285  //--------------------------------------------------------------------------
286  // Create the socket helper
287  //--------------------------------------------------------------------------
288  XrdSys::IOEvents::Poller* poller = RegisterAndGetPoller( socket );
289 
290  PollerHelper *helper = new PollerHelper();
291  helper->callBack = new ::SocketCallBack( socket, handler );
292 
293  if( poller )
294  {
295  helper->channel = new XrdSys::IOEvents::Channel( poller,
296  socket->GetFD(),
297  helper->callBack );
298  }
299 
300  handler->Initialize( this );
301  pSocketMap[socket] = helper;
302  return true;
303  }
304 
305  //------------------------------------------------------------------------
306  // Remove the socket
307  //------------------------------------------------------------------------
309  {
310  using namespace XrdSys::IOEvents;
311  Log *log = DefaultEnv::GetLog();
312 
313  //--------------------------------------------------------------------------
314  // Find the right socket
315  //--------------------------------------------------------------------------
316  XrdSysMutexHelper scopedLock( pMutex );
317  SocketMap::iterator it = pSocketMap.find( socket );
318  if( it == pSocketMap.end() )
319  return true;
320 
321  log->Debug( PollerMsg, "%s Removing socket from the poller",
322  socket->GetName().c_str() );
323 
324  // unregister from the poller it's currently associated with
325  UnregisterFromPoller( socket );
326 
327  //--------------------------------------------------------------------------
328  // Remove the socket
329  //--------------------------------------------------------------------------
330  PollerHelper *helper = (PollerHelper*)it->second;
331  pSocketMap.erase( it );
332  scopedLock.UnLock();
333 
334  if( helper->channel )
335  {
336  const char *errMsg;
337  bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
338  if( !status )
339  {
340  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
341  socket->GetName().c_str(), errMsg );
342  return false;
343  }
344  helper->channel->Delete();
345  }
346  delete helper->callBack;
347  delete helper;
348  return true;
349  }
350 
351  //----------------------------------------------------------------------------
352  // Notify the handler about read events
353  //----------------------------------------------------------------------------
355  bool notify,
356  uint16_t timeout )
357  {
358  using namespace XrdSys::IOEvents;
359  Log *log = DefaultEnv::GetLog();
360 
361  if( !socket )
362  {
363  log->Error( PollerMsg, "Invalid socket, read events unavailable" );
364  return false;
365  }
366 
367  //--------------------------------------------------------------------------
368  // Check if the socket is registered
369  //--------------------------------------------------------------------------
370  XrdSysMutexHelper scopedLock( pMutex );
371  SocketMap::const_iterator it = pSocketMap.find( socket );
372  if( it == pSocketMap.end() )
373  {
374  log->Warning( PollerMsg, "%s Socket is not registered",
375  socket->GetName().c_str() );
376  return false;
377  }
378 
379  PollerHelper *helper = (PollerHelper*)it->second;
380  XrdSys::IOEvents::Poller *poller = GetPoller( socket );
381 
382  //--------------------------------------------------------------------------
383  // Enable read notifications
384  //--------------------------------------------------------------------------
385  if( notify )
386  {
387  if( helper->readEnabled )
388  return true;
389  helper->readTimeout = timeout;
390 
391  log->Dump( PollerMsg, "%s Enable read notifications, timeout: %d",
392  socket->GetName().c_str(), timeout );
393 
394  if( poller )
395  {
396  const char *errMsg;
397  bool status = helper->channel->Enable( Channel::readEvents, timeout,
398  &errMsg );
399  if( !status )
400  {
401  log->Error( PollerMsg, "%s Unable to enable read notifications: %s",
402  socket->GetName().c_str(), errMsg );
403  return false;
404  }
405  }
406  helper->readEnabled = true;
407  }
408 
409  //--------------------------------------------------------------------------
410  // Disable read notifications
411  //--------------------------------------------------------------------------
412  else
413  {
414  if( !helper->readEnabled )
415  return true;
416 
417  log->Dump( PollerMsg, "%s Disable read notifications",
418  socket->GetName().c_str() );
419 
420  if( poller )
421  {
422  const char *errMsg;
423  bool status = helper->channel->Disable( Channel::readEvents, &errMsg );
424  if( !status )
425  {
426  log->Error( PollerMsg, "%s Unable to disable read notifications: %s",
427  socket->GetName().c_str(), errMsg );
428  return false;
429  }
430  }
431  helper->readEnabled = false;
432  }
433  return true;
434  }
435 
436  //----------------------------------------------------------------------------
437  // Notify the handler about write events
438  //----------------------------------------------------------------------------
440  bool notify,
441  uint16_t timeout )
442  {
443  using namespace XrdSys::IOEvents;
444  Log *log = DefaultEnv::GetLog();
445 
446  if( !socket )
447  {
448  log->Error( PollerMsg, "Invalid socket, write events unavailable" );
449  return false;
450  }
451 
452  //--------------------------------------------------------------------------
453  // Check if the socket is registered
454  //--------------------------------------------------------------------------
455  XrdSysMutexHelper scopedLock( pMutex );
456  SocketMap::const_iterator it = pSocketMap.find( socket );
457  if( it == pSocketMap.end() )
458  {
459  log->Warning( PollerMsg, "%s Socket is not registered",
460  socket->GetName().c_str() );
461  return false;
462  }
463 
464  PollerHelper *helper = (PollerHelper*)it->second;
465  XrdSys::IOEvents::Poller *poller = GetPoller( socket );
466 
467  //--------------------------------------------------------------------------
468  // Enable write notifications
469  //--------------------------------------------------------------------------
470  if( notify )
471  {
472  if( helper->writeEnabled )
473  return true;
474 
475  helper->writeTimeout = timeout;
476 
477  log->Dump( PollerMsg, "%s Enable write notifications, timeout: %d",
478  socket->GetName().c_str(), timeout );
479 
480  if( poller )
481  {
482  const char *errMsg;
483  bool status = helper->channel->Enable( Channel::writeEvents, timeout,
484  &errMsg );
485  if( !status )
486  {
487  log->Error( PollerMsg, "%s Unable to enable write notifications: %s",
488  socket->GetName().c_str(), errMsg );
489  return false;
490  }
491  }
492  helper->writeEnabled = true;
493  }
494 
495  //--------------------------------------------------------------------------
496  // Disable read notifications
497  //--------------------------------------------------------------------------
498  else
499  {
500  if( !helper->writeEnabled )
501  return true;
502 
503  log->Dump( PollerMsg, "%s Disable write notifications",
504  socket->GetName().c_str() );
505  if( poller )
506  {
507  const char *errMsg;
508  bool status = helper->channel->Disable( Channel::writeEvents, &errMsg );
509  if( !status )
510  {
511  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
512  socket->GetName().c_str(), errMsg );
513  return false;
514  }
515  }
516  helper->writeEnabled = false;
517  }
518  return true;
519  }
520 
521  //----------------------------------------------------------------------------
522  // Check whether the socket is registered with the poller
523  //----------------------------------------------------------------------------
525  {
526  XrdSysMutexHelper scopedLock( pMutex );
527  SocketMap::iterator it = pSocketMap.find( socket );
528  return it != pSocketMap.end();
529  }
530 
531  //----------------------------------------------------------------------------
532  // Return poller threads in round-robin fashion
533  //----------------------------------------------------------------------------
534  XrdSys::IOEvents::Poller* PollerBuiltIn::GetNextPoller()
535  {
536  if( pPollerPool.empty() ) return 0;
537 
538  PollerPool::iterator ret = pNext;
539  ++pNext;
540  if( pNext == pPollerPool.end() )
541  pNext = pPollerPool.begin();
542  return *ret;
543  }
544 
545  //----------------------------------------------------------------------------
546  // Return the poller associated with the respective channel
547  //----------------------------------------------------------------------------
548  XrdSys::IOEvents::Poller* PollerBuiltIn::RegisterAndGetPoller(const Socket * socket)
549  {
550  PollerMap::iterator itr = pPollerMap.find( socket->GetChannelID() );
551  if( itr == pPollerMap.end() )
552  {
553  XrdSys::IOEvents::Poller* poller = GetNextPoller();
554  if( poller )
555  pPollerMap[socket->GetChannelID()] = std::make_pair( poller, size_t( 1 ) );
556  return poller;
557  }
558 
559  ++( itr->second.second );
560  return itr->second.first;
561  }
562 
563  void PollerBuiltIn::UnregisterFromPoller( const Socket *socket )
564  {
565  PollerMap::iterator itr = pPollerMap.find( socket->GetChannelID() );
566  if( itr == pPollerMap.end() ) return;
567  --itr->second.second;
568  if( itr->second.second == 0 )
569  pPollerMap.erase( itr );
570 
571  }
572 
573  XrdSys::IOEvents::Poller* PollerBuiltIn::GetPoller(const Socket * socket)
574  {
575  PollerMap::iterator itr = pPollerMap.find( socket->GetChannelID() );
576  if( itr == pPollerMap.end() ) return 0;
577  return itr->second.first;
578  }
579 
580  //----------------------------------------------------------------------------
581  // Get the initial value for pNbPoller
582  //----------------------------------------------------------------------------
583  int PollerBuiltIn::GetNbPollerInit()
584  {
585  Env * env = DefaultEnv::GetEnv();
587  env->GetInt("ParallelEvtLoop", ret);
588  return ret;
589  }
590 }
#define unlikely(x)
bool Create
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
LogLevel GetLevel() const
Get the log level.
Definition: XrdClLog.hh:258
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
virtual bool EnableWriteNotification(Socket *socket, bool notify, uint16_t timeout=60)
virtual bool AddSocket(Socket *socket, SocketHandler *handler)
virtual bool RemoveSocket(Socket *socket)
Remove the socket.
virtual bool EnableReadNotification(Socket *socket, bool notify, uint16_t timeout=60)
virtual bool Stop()
Stop polling.
virtual bool IsRegistered(Socket *socket)
Check whether the socket is registered with the poller.
virtual bool Finalize()
Finalize the poller.
virtual bool Initialize()
Initialize the poller.
virtual bool Start()
Start polling.
virtual void Initialize(Poller *)
Initializer.
Definition: XrdClPoller.hh:55
A network socket.
Definition: XrdClSocket.hh:43
std::string GetName() const
Get the string representation of the socket.
Definition: XrdClSocket.cc:672
const AnyObject * GetChannelID() const
Definition: XrdClSocket.hh:255
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
int GetFD()
Get the file descriptor.
Definition: XrdClSocket.hh:214
SocketStatus GetStatus() const
Get the socket status.
Definition: XrdClSocket.hh:125
void Lock(XrdSysMutex *Mutex)
virtual bool Event(Channel *chP, void *cbArg, int evFlags)=0
bool Enable(int events, int timeout=0, const char **eText=0)
const uint64_t PollerMsg
const int DefaultParallelEvtLoop