42 channel(0), callBack(0), readEnabled(false), writeEnabled(false),
43 readTimeout(0), writeTimeout(0)
50 uint16_t writeTimeout;
60 pSocket( sock ), pHandler( sh ) {}
61 virtual ~SocketCallBack() {};
67 using namespace XrdCl;
70 if( evFlags & ReadyToRead ) ev |= SocketHandler::ReadyToRead;
71 if( evFlags & ReadTimeOut ) ev |= SocketHandler::ReadTimeOut;
72 if( evFlags & ReadyToWrite ) ev |= SocketHandler::ReadyToWrite;
73 if( evFlags & WriteTimeOut ) ev |= SocketHandler::WriteTimeOut;
75 Log *log = DefaultEnv::GetLog();
79 pSocket->GetName().c_str(),
80 SocketHandler::EventTypeToString( ev ).c_str() );
83 pHandler->Event( ev, pSocket );
111 SocketMap::iterator it;
112 for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
114 PollerHelper *helper = (PollerHelper*)it->second;
115 if( helper->channel ) helper->channel->Delete();
116 delete helper->callBack;
135 log->
Debug(
PollerMsg,
"Creating and starting the built-in poller..." );
138 const char *errMsg = 0;
140 for(
int i = 0; i < pNbPoller; ++i )
145 log->
Error(
PollerMsg,
"Unable to create the internal poller object: "
149 pPollerPool.push_back( poller );
152 pNext = pPollerPool.begin();
160 SocketMap::iterator it;
161 for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
163 PollerHelper *helper = (PollerHelper*)it->second;
164 Socket *socket = it->first;
165 helper->channel =
new IOEvents::Channel( RegisterAndGetPoller( socket ), socket->GetFD(),
167 if( helper->readEnabled )
169 bool status = helper->channel->
Enable( IOEvents::Channel::readEvents,
170 helper->readTimeout, &errMsg );
174 "while re-starting %s (%s)",
XrdSysE2T( errno ), errMsg );
180 if( helper->writeEnabled )
182 bool status = helper->channel->Enable( IOEvents::Channel::writeEvents,
183 helper->writeTimeout, &errMsg );
187 "while re-starting %s (%s)",
XrdSysE2T( errno ), errMsg );
208 if( pPollerPool.empty() )
210 log->
Debug(
PollerMsg,
"Stopping a poller that has not been started" );
214 while( !pPollerPool.empty() )
217 pPollerPool.pop_back();
219 if( !poller )
continue;
224 scopedLock.
Lock( &pMutex );
226 pNext = pPollerPool.end();
229 SocketMap::iterator it;
230 const char *errMsg = 0;
232 for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
234 PollerHelper *helper = (PollerHelper*)it->second;
235 if( !helper->channel )
continue;
236 bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
239 Socket *socket = it->first;
240 log->
Error(
PollerMsg,
"%s Unable to disable write notifications: %s",
241 socket->
GetName().c_str(), errMsg );
243 helper->channel->Delete();
268 log->
Error(
PollerMsg,
"Socket is not in a state valid for polling" );
272 log->
Debug(
PollerMsg,
"Adding socket %p to the poller", socket );
277 SocketMap::const_iterator it = pSocketMap.find( socket );
278 if( it != pSocketMap.end() )
290 PollerHelper *helper =
new PollerHelper();
291 helper->callBack = new ::SocketCallBack( socket, handler );
301 pSocketMap[socket] = helper;
317 SocketMap::iterator it = pSocketMap.find( socket );
318 if( it == pSocketMap.end() )
325 UnregisterFromPoller( socket );
330 PollerHelper *helper = (PollerHelper*)it->second;
331 pSocketMap.erase( it );
334 if( helper->channel )
337 bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
340 log->
Error(
PollerMsg,
"%s Unable to disable write notifications: %s",
341 socket->
GetName().c_str(), errMsg );
344 helper->channel->Delete();
346 delete helper->callBack;
363 log->
Error(
PollerMsg,
"Invalid socket, read events unavailable" );
371 SocketMap::const_iterator it = pSocketMap.find( socket );
372 if( it == pSocketMap.end() )
379 PollerHelper *helper = (PollerHelper*)it->second;
387 if( helper->readEnabled )
389 helper->readTimeout = timeout;
391 log->
Dump(
PollerMsg,
"%s Enable read notifications, timeout: %d",
392 socket->
GetName().c_str(), timeout );
397 bool status = helper->channel->Enable( Channel::readEvents, timeout,
401 log->
Error(
PollerMsg,
"%s Unable to enable read notifications: %s",
402 socket->
GetName().c_str(), errMsg );
406 helper->readEnabled =
true;
414 if( !helper->readEnabled )
423 bool status = helper->channel->Disable( Channel::readEvents, &errMsg );
426 log->
Error(
PollerMsg,
"%s Unable to disable read notifications: %s",
427 socket->
GetName().c_str(), errMsg );
431 helper->readEnabled =
false;
448 log->
Error(
PollerMsg,
"Invalid socket, write events unavailable" );
456 SocketMap::const_iterator it = pSocketMap.find( socket );
457 if( it == pSocketMap.end() )
464 PollerHelper *helper = (PollerHelper*)it->second;
472 if( helper->writeEnabled )
475 helper->writeTimeout = timeout;
477 log->
Dump(
PollerMsg,
"%s Enable write notifications, timeout: %d",
478 socket->
GetName().c_str(), timeout );
483 bool status = helper->channel->Enable( Channel::writeEvents, timeout,
487 log->
Error(
PollerMsg,
"%s Unable to enable write notifications: %s",
488 socket->
GetName().c_str(), errMsg );
492 helper->writeEnabled =
true;
500 if( !helper->writeEnabled )
508 bool status = helper->channel->Disable( Channel::writeEvents, &errMsg );
511 log->
Error(
PollerMsg,
"%s Unable to disable write notifications: %s",
512 socket->
GetName().c_str(), errMsg );
516 helper->writeEnabled =
false;
527 SocketMap::iterator it = pSocketMap.find( socket );
528 return it != pSocketMap.end();
536 if( pPollerPool.empty() )
return 0;
538 PollerPool::iterator ret = pNext;
540 if( pNext == pPollerPool.end() )
541 pNext = pPollerPool.begin();
550 PollerMap::iterator itr = pPollerMap.find( socket->
GetChannelID() );
551 if( itr == pPollerMap.end() )
555 pPollerMap[socket->
GetChannelID()] = std::make_pair( poller,
size_t( 1 ) );
559 ++( itr->second.second );
560 return itr->second.first;
563 void PollerBuiltIn::UnregisterFromPoller(
const Socket *socket )
565 PollerMap::iterator itr = pPollerMap.find( socket->
GetChannelID() );
566 if( itr == pPollerMap.end() )
return;
567 --itr->second.second;
568 if( itr->second.second == 0 )
569 pPollerMap.erase( itr );
575 PollerMap::iterator itr = pPollerMap.find( socket->
GetChannelID() );
576 if( itr == pPollerMap.end() )
return 0;
577 return itr->second.first;
583 int PollerBuiltIn::GetNbPollerInit()
587 env->
GetInt(
"ParallelEvtLoop", ret);
const char * XrdSysE2T(int errcode)
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
void Error(uint64_t topic, const char *format,...)
Report an error.
LogLevel GetLevel() const
Get the log level.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
virtual bool EnableWriteNotification(Socket *socket, bool notify, uint16_t timeout=60)
virtual bool AddSocket(Socket *socket, SocketHandler *handler)
virtual bool RemoveSocket(Socket *socket)
Remove the socket.
virtual bool EnableReadNotification(Socket *socket, bool notify, uint16_t timeout=60)
virtual bool Stop()
Stop polling.
virtual bool IsRegistered(Socket *socket)
Check whether the socket is registered with the poller.
virtual bool Finalize()
Finalize the poller.
virtual bool Initialize()
Initialize the poller.
virtual bool Start()
Start polling.
virtual void Initialize(Poller *)
Initializer.
std::string GetName() const
Get the string representation of the socket.
const AnyObject * GetChannelID() const
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
int GetFD()
Get the file descriptor.
SocketStatus GetStatus() const
Get the socket status.
void Lock(XrdSysMutex *Mutex)
virtual bool Event(Channel *chP, void *cbArg, int evFlags)=0
bool Enable(int events, int timeout=0, const char **eText=0)
const int DefaultParallelEvtLoop