XRootD
XrdCl::PollerBuiltIn Class Reference

A poller implementation using the built-in XRootD poller. More...

#include <XrdClPollerBuiltIn.hh>

+ Inheritance diagram for XrdCl::PollerBuiltIn:
+ Collaboration diagram for XrdCl::PollerBuiltIn:

Public Member Functions

 PollerBuiltIn ()
 Constructor. More...
 
 ~PollerBuiltIn ()
 
virtual bool AddSocket (Socket *socket, SocketHandler *handler)
 
virtual bool EnableReadNotification (Socket *socket, bool notify, uint16_t timeout=60)
 
virtual bool EnableWriteNotification (Socket *socket, bool notify, uint16_t timeout=60)
 
virtual bool Finalize ()
 Finalize the poller. More...
 
virtual bool Initialize ()
 Initialize the poller. More...
 
virtual bool IsRegistered (Socket *socket)
 Check whether the socket is registered with the poller. More...
 
virtual bool IsRunning () const
 Is the event loop running? More...
 
virtual bool RemoveSocket (Socket *socket)
 Remove the socket. More...
 
virtual bool Start ()
 Start polling. More...
 
virtual bool Stop ()
 Stop polling. More...
 
- Public Member Functions inherited from XrdCl::Poller
virtual ~Poller ()
 Destructor. More...
 

Detailed Description

A poller implementation using the built-in XRootD poller.

Definition at line 40 of file XrdClPollerBuiltIn.hh.

Constructor & Destructor Documentation

◆ PollerBuiltIn()

XrdCl::PollerBuiltIn::PollerBuiltIn ( )
inline

Constructor.

Definition at line 46 of file XrdClPollerBuiltIn.hh.

46 : pNbPoller( GetNbPollerInit() ){}

◆ ~PollerBuiltIn()

XrdCl::PollerBuiltIn::~PollerBuiltIn ( )
inline

Definition at line 48 of file XrdClPollerBuiltIn.hh.

48 {}

Member Function Documentation

◆ AddSocket()

bool XrdCl::PollerBuiltIn::AddSocket ( Socket * socket,
SocketHandler * handler 
)
virtual

Add socket to the polling loop

Parameters
socket: the socket
handler: object handling the events

Implements XrdCl::Poller.

Definition at line 253 of file XrdClPollerBuiltIn.cc.

255  {
256  Log *log = DefaultEnv::GetLog();
257  XrdSysMutexHelper scopedLock( pMutex );
258 
259  if( !socket )
260  {
261  log->Error( PollerMsg, "Invalid socket, impossible to poll" );
262  return false;
263  }
264 
265  if( socket->GetStatus() != Socket::Connected &&
266  socket->GetStatus() != Socket::Connecting )
267  {
268  log->Error( PollerMsg, "Socket is not in a state valid for polling" );
269  return false;
270  }
271 
272  log->Debug( PollerMsg, "Adding socket %p to the poller", socket );
273 
274  //--------------------------------------------------------------------------
275  // Check if the socket is already registered
276  //--------------------------------------------------------------------------
277  SocketMap::const_iterator it = pSocketMap.find( socket );
278  if( it != pSocketMap.end() )
279  {
280  log->Warning( PollerMsg, "%s Already registered with this poller",
281  socket->GetName().c_str() );
282  return false;
283  }
284 
285  //--------------------------------------------------------------------------
286  // Create the socket helper
287  //--------------------------------------------------------------------------
288  XrdSys::IOEvents::Poller* poller = RegisterAndGetPoller( socket );
289 
290  PollerHelper *helper = new PollerHelper();
291  helper->callBack = new ::SocketCallBack( socket, handler );
292 
293  if( poller )
294  {
295  helper->channel = new XrdSys::IOEvents::Channel( poller,
296  socket->GetFD(),
297  helper->callBack );
298  }
299 
300  handler->Initialize( this );
301  pSocketMap[socket] = helper;
302  return true;
303  }
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
virtual void Initialize(Poller *)
Initializer.
Definition: XrdClPoller.hh:55
std::string GetName() const
Get the string representation of the socket.
Definition: XrdClSocket.cc:672
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
int GetFD()
Get the file descriptor.
Definition: XrdClSocket.hh:214
SocketStatus GetStatus() const
Get the socket status.
Definition: XrdClSocket.hh:125
const uint64_t PollerMsg

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Log::Error(), XrdCl::Socket::GetFD(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::GetName(), XrdCl::Socket::GetStatus(), XrdCl::SocketHandler::Initialize(), XrdCl::PollerMsg, and XrdCl::Log::Warning().

+ Here is the call graph for this function:

◆ EnableReadNotification()

bool XrdCl::PollerBuiltIn::EnableReadNotification ( Socket * socket,
bool  notify,
uint16_t  timeout = 60 
)
virtual

Notify the handler about read events

Parameters
socket: the socket
notify: specify if the handler should be notified
timeout: if no read event occurred after this time a timeout event will be generated

Implements XrdCl::Poller.

Definition at line 354 of file XrdClPollerBuiltIn.cc.

357  {
358  using namespace XrdSys::IOEvents;
359  Log *log = DefaultEnv::GetLog();
360 
361  if( !socket )
362  {
363  log->Error( PollerMsg, "Invalid socket, read events unavailable" );
364  return false;
365  }
366 
367  //--------------------------------------------------------------------------
368  // Check if the socket is registered
369  //--------------------------------------------------------------------------
370  XrdSysMutexHelper scopedLock( pMutex );
371  SocketMap::const_iterator it = pSocketMap.find( socket );
372  if( it == pSocketMap.end() )
373  {
374  log->Warning( PollerMsg, "%s Socket is not registered",
375  socket->GetName().c_str() );
376  return false;
377  }
378 
379  PollerHelper *helper = (PollerHelper*)it->second;
380  XrdSys::IOEvents::Poller *poller = GetPoller( socket );
381 
382  //--------------------------------------------------------------------------
383  // Enable read notifications
384  //--------------------------------------------------------------------------
385  if( notify )
386  {
387  if( helper->readEnabled )
388  return true;
389  helper->readTimeout = timeout;
390 
391  log->Dump( PollerMsg, "%s Enable read notifications, timeout: %d",
392  socket->GetName().c_str(), timeout );
393 
394  if( poller )
395  {
396  const char *errMsg;
397  bool status = helper->channel->Enable( Channel::readEvents, timeout,
398  &errMsg );
399  if( !status )
400  {
401  log->Error( PollerMsg, "%s Unable to enable read notifications: %s",
402  socket->GetName().c_str(), errMsg );
403  return false;
404  }
405  }
406  helper->readEnabled = true;
407  }
408 
409  //--------------------------------------------------------------------------
410  // Disable read notifications
411  //--------------------------------------------------------------------------
412  else
413  {
414  if( !helper->readEnabled )
415  return true;
416 
417  log->Dump( PollerMsg, "%s Disable read notifications",
418  socket->GetName().c_str() );
419 
420  if( poller )
421  {
422  const char *errMsg;
423  bool status = helper->channel->Disable( Channel::readEvents, &errMsg );
424  if( !status )
425  {
426  log->Error( PollerMsg, "%s Unable to disable read notifications: %s",
427  socket->GetName().c_str(), errMsg );
428  return false;
429  }
430  }
431  helper->readEnabled = false;
432  }
433  return true;
434  }
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299

References XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::GetName(), XrdCl::PollerMsg, and XrdCl::Log::Warning().

+ Here is the call graph for this function:

◆ EnableWriteNotification()

bool XrdCl::PollerBuiltIn::EnableWriteNotification ( Socket * socket,
bool  notify,
uint16_t  timeout = 60 
)
virtual

Notify the handler about write events

Parameters
socket: the socket
notify: specify if the handler should be notified
timeout: if no write event occurred after this time a timeout event will be generated

Implements XrdCl::Poller.

Definition at line 439 of file XrdClPollerBuiltIn.cc.

442  {
443  using namespace XrdSys::IOEvents;
444  Log *log = DefaultEnv::GetLog();
445 
446  if( !socket )
447  {
448  log->Error( PollerMsg, "Invalid socket, write events unavailable" );
449  return false;
450  }
451 
452  //--------------------------------------------------------------------------
453  // Check if the socket is registered
454  //--------------------------------------------------------------------------
455  XrdSysMutexHelper scopedLock( pMutex );
456  SocketMap::const_iterator it = pSocketMap.find( socket );
457  if( it == pSocketMap.end() )
458  {
459  log->Warning( PollerMsg, "%s Socket is not registered",
460  socket->GetName().c_str() );
461  return false;
462  }
463 
464  PollerHelper *helper = (PollerHelper*)it->second;
465  XrdSys::IOEvents::Poller *poller = GetPoller( socket );
466 
467  //--------------------------------------------------------------------------
468  // Enable write notifications
469  //--------------------------------------------------------------------------
470  if( notify )
471  {
472  if( helper->writeEnabled )
473  return true;
474 
475  helper->writeTimeout = timeout;
476 
477  log->Dump( PollerMsg, "%s Enable write notifications, timeout: %d",
478  socket->GetName().c_str(), timeout );
479 
480  if( poller )
481  {
482  const char *errMsg;
483  bool status = helper->channel->Enable( Channel::writeEvents, timeout,
484  &errMsg );
485  if( !status )
486  {
487  log->Error( PollerMsg, "%s Unable to enable write notifications: %s",
488  socket->GetName().c_str(), errMsg );
489  return false;
490  }
491  }
492  helper->writeEnabled = true;
493  }
494 
495  //--------------------------------------------------------------------------
496  // Disable read notifications
497  //--------------------------------------------------------------------------
498  else
499  {
500  if( !helper->writeEnabled )
501  return true;
502 
503  log->Dump( PollerMsg, "%s Disable write notifications",
504  socket->GetName().c_str() );
505  if( poller )
506  {
507  const char *errMsg;
508  bool status = helper->channel->Disable( Channel::writeEvents, &errMsg );
509  if( !status )
510  {
511  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
512  socket->GetName().c_str(), errMsg );
513  return false;
514  }
515  }
516  helper->writeEnabled = false;
517  }
518  return true;
519  }

References XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::GetName(), XrdCl::PollerMsg, and XrdCl::Log::Warning().

+ Here is the call graph for this function:

◆ Finalize()

bool XrdCl::PollerBuiltIn::Finalize ( )
virtual

Finalize the poller.

Implements XrdCl::Poller.

Definition at line 106 of file XrdClPollerBuiltIn.cc.

107  {
108  //--------------------------------------------------------------------------
109  // Clean up the channels
110  //--------------------------------------------------------------------------
111  SocketMap::iterator it;
112  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
113  {
114  PollerHelper *helper = (PollerHelper*)it->second;
115  if( helper->channel ) helper->channel->Delete();
116  delete helper->callBack;
117  delete helper;
118  }
119  pSocketMap.clear();
120 
121  return true;
122  }

◆ Initialize()

bool XrdCl::PollerBuiltIn::Initialize ( )
virtual

Initialize the poller.

Implements XrdCl::Poller.

Definition at line 98 of file XrdClPollerBuiltIn.cc.

99  {
100  return true;
101  }

◆ IsRegistered()

bool XrdCl::PollerBuiltIn::IsRegistered ( Socket * socket )
virtual

Check whether the socket is registered with the poller.

Implements XrdCl::Poller.

Definition at line 524 of file XrdClPollerBuiltIn.cc.

525  {
526  XrdSysMutexHelper scopedLock( pMutex );
527  SocketMap::iterator it = pSocketMap.find( socket );
528  return it != pSocketMap.end();
529  }

◆ IsRunning()

virtual bool XrdCl::PollerBuiltIn::IsRunning ( ) const
inline virtual

Is the event loop running?

Implements XrdCl::Poller.

Definition at line 117 of file XrdClPollerBuiltIn.hh.

118  {
119  return !pPollerPool.empty();
120  }

◆ RemoveSocket()

bool XrdCl::PollerBuiltIn::RemoveSocket ( Socket * socket )
virtual

Remove the socket.

Implements XrdCl::Poller.

Definition at line 308 of file XrdClPollerBuiltIn.cc.

309  {
310  using namespace XrdSys::IOEvents;
311  Log *log = DefaultEnv::GetLog();
312 
313  //--------------------------------------------------------------------------
314  // Find the right socket
315  //--------------------------------------------------------------------------
316  XrdSysMutexHelper scopedLock( pMutex );
317  SocketMap::iterator it = pSocketMap.find( socket );
318  if( it == pSocketMap.end() )
319  return true;
320 
321  log->Debug( PollerMsg, "%s Removing socket from the poller",
322  socket->GetName().c_str() );
323 
324  // unregister from the poller it's currently associated with
325  UnregisterFromPoller( socket );
326 
327  //--------------------------------------------------------------------------
328  // Remove the socket
329  //--------------------------------------------------------------------------
330  PollerHelper *helper = (PollerHelper*)it->second;
331  pSocketMap.erase( it );
332  scopedLock.UnLock();
333 
334  if( helper->channel )
335  {
336  const char *errMsg;
337  bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
338  if( !status )
339  {
340  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
341  socket->GetName().c_str(), errMsg );
342  return false;
343  }
344  helper->channel->Delete();
345  }
346  delete helper->callBack;
347  delete helper;
348  return true;
349  }

References XrdCl::Log::Debug(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::GetName(), XrdCl::PollerMsg, and XrdSysMutexHelper::UnLock().

+ Here is the call graph for this function:

◆ Start()

bool XrdCl::PollerBuiltIn::Start ( )
virtual

Start polling.

Implements XrdCl::Poller.

Definition at line 127 of file XrdClPollerBuiltIn.cc.

128  {
129  //--------------------------------------------------------------------------
130  // Start the poller
131  //--------------------------------------------------------------------------
132  using namespace XrdSys;
133 
134  Log *log = DefaultEnv::GetLog();
135  log->Debug( PollerMsg, "Creating and starting the built-in poller..." );
136  XrdSysMutexHelper scopedLock( pMutex );
137  int errNum = 0;
138  const char *errMsg = 0;
139 
140  for( int i = 0; i < pNbPoller; ++i )
141  {
142  XrdSys::IOEvents::Poller* poller = IOEvents::Poller::Create( errNum, &errMsg );
143  if( !poller )
144  {
145  log->Error( PollerMsg, "Unable to create the internal poller object: "
146  "%s (%s)", XrdSysE2T( errno ), errMsg );
147  return false;
148  }
149  pPollerPool.push_back( poller );
150  }
151 
152  pNext = pPollerPool.begin();
153 
154  log->Debug( PollerMsg, "Using %d poller threads", pNbPoller );
155 
156  //--------------------------------------------------------------------------
157  // Check if we have any descriptors to reinsert from the last time we
158  // were started
159  //--------------------------------------------------------------------------
160  SocketMap::iterator it;
161  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
162  {
163  PollerHelper *helper = (PollerHelper*)it->second;
164  Socket *socket = it->first;
165  helper->channel = new IOEvents::Channel( RegisterAndGetPoller( socket ), socket->GetFD(),
166  helper->callBack );
167  if( helper->readEnabled )
168  {
169  bool status = helper->channel->Enable( IOEvents::Channel::readEvents,
170  helper->readTimeout, &errMsg );
171  if( !status )
172  {
173  log->Error( PollerMsg, "Unable to enable read notifications "
174  "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
175 
176  return false;
177  }
178  }
179 
180  if( helper->writeEnabled )
181  {
182  bool status = helper->channel->Enable( IOEvents::Channel::writeEvents,
183  helper->writeTimeout, &errMsg );
184  if( !status )
185  {
186  log->Error( PollerMsg, "Unable to enable write notifications "
187  "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
188 
189  return false;
190  }
191  }
192  }
193  return true;
194  }
bool Create
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
A network socket.
Definition: XrdClSocket.hh:43
bool Enable(int events, int timeout=0, const char **eText=0)

References Create, XrdCl::Log::Debug(), XrdSys::IOEvents::Channel::Enable(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::PollerMsg, and XrdSysE2T().

+ Here is the call graph for this function:

◆ Stop()

bool XrdCl::PollerBuiltIn::Stop ( )
virtual

Stop polling.

Implements XrdCl::Poller.

Definition at line 199 of file XrdClPollerBuiltIn.cc.

200  {
201  using namespace XrdSys::IOEvents;
202 
203  Log *log = DefaultEnv::GetLog();
204  log->Debug( PollerMsg, "Stopping the poller..." );
205 
206  XrdSysMutexHelper scopedLock( pMutex );
207 
208  if( pPollerPool.empty() )
209  {
210  log->Debug( PollerMsg, "Stopping a poller that has not been started" );
211  return true;
212  }
213 
214  while( !pPollerPool.empty() )
215  {
216  XrdSys::IOEvents::Poller *poller = pPollerPool.back();
217  pPollerPool.pop_back();
218 
219  if( !poller ) continue;
220 
221  scopedLock.UnLock();
222  poller->Stop();
223  delete poller;
224  scopedLock.Lock( &pMutex );
225  }
226  pNext = pPollerPool.end();
227  pPollerMap.clear();
228 
229  SocketMap::iterator it;
230  const char *errMsg = 0;
231 
232  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
233  {
234  PollerHelper *helper = (PollerHelper*)it->second;
235  if( !helper->channel ) continue;
236  bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
237  if( !status )
238  {
239  Socket *socket = it->first;
240  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
241  socket->GetName().c_str(), errMsg );
242  }
243  helper->channel->Delete();
244  helper->channel = 0;
245  }
246 
247  return true;
248  }

References XrdCl::Log::Debug(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::GetName(), XrdSysMutexHelper::Lock(), XrdCl::PollerMsg, XrdSys::IOEvents::Poller::Stop(), and XrdSysMutexHelper::UnLock().

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: