64 env->
GetInt(
"WorkerThreads", workerThreads );
115 env->
GetString(
"PollerPreference", pollerPref );
119 if( !pImpl->pPoller )
122 bool st = pImpl->pPoller->Initialize();
126 delete pImpl->pPoller;
130 pImpl->pJobManager->Initialize();
131 pImpl->pInitialized =
true;
143 if( !pImpl->pInitialized )
146 pImpl->pInitialized =
false;
147 pImpl->pJobManager->Finalize();
148 PostMasterImpl::ChannelMap::iterator it;
150 for( it = pImpl->pChannelMap.begin(); it != pImpl->pChannelMap.end(); ++it )
153 pImpl->pChannelMap.clear();
154 return pImpl->pPoller->Finalize();
162 if( !pImpl->pInitialized )
165 if( !pImpl->pPoller->Start() )
168 if( !pImpl->pTaskManager->Start() )
170 pImpl->pPoller->Stop();
174 if( !pImpl->pJobManager->Start() )
176 pImpl->pPoller->Stop();
177 pImpl->pTaskManager->Stop();
181 pImpl->pRunning =
true;
190 if( !pImpl->pInitialized || !pImpl->pRunning )
193 if( !pImpl->pJobManager->Stop() )
195 if( !pImpl->pPoller->Stop() )
197 if( !pImpl->pTaskManager->Stop() )
199 pImpl->pRunning =
false;
221 Channel *channel = GetChannel( url );
226 return channel->
Send( msg, handler, stateful, expires );
251 PostMasterImpl::ChannelMap::iterator it =
253 if( it == pImpl->pChannelMap.end() )
255 channel = it->second;
271 Channel *channel = GetChannel( url );
287 Channel *channel = GetChannel( url );
301 return pImpl->pTaskManager;
309 return pImpl->pJobManager;
326 PostMasterImpl::ChannelMap::iterator it =
329 if( it == pImpl->pChannelMap.end() )
332 it->second->ForceDisconnect( hush );
334 pImpl->pChannelMap.erase( it );
342 PostMasterImpl::ChannelMap::iterator it =
345 if( it == pImpl->pChannelMap.end() )
348 it->second->ForceReconnect();
358 Channel *channel = GetChannel( url );
359 if( !channel )
return 0;
367 std::shared_ptr<Job> onConnJob )
370 Channel *channel = GetChannel( url );
371 if( !channel )
return;
381 pImpl->pOnConnJob = std::move( onConnJob );
390 pImpl->pOnConnErrCB = std::move( handler );
399 if( pImpl->pOnConnJob )
401 URL *ptr =
new URL( url );
402 pImpl->pJobManager->QueueJob( pImpl->pOnConnJob.get(), ptr );
412 if( pImpl->pOnConnErrCB )
415 pImpl->pJobManager->QueueJob( job,
nullptr );
429 PostMasterImpl::ChannelMap::iterator it =
432 if( it != pImpl->pChannelMap.end() )
433 passive = it->second;
463 pImpl->pTaskManager, pImpl->pJobManager, url );
477 Channel *channel = GetChannel( url );
479 if( !channel )
return;
489 return pImpl->pRunning;
495 Channel *PostMaster::GetChannel(
const URL &url )
499 PostMasterImpl::ChannelMap::iterator it = pImpl->pChannelMap.find( url.
GetChannelId() );
501 if( it == pImpl->pChannelMap.end() )
514 channel =
new Channel( url, pImpl->pPoller, trHandler, pImpl->pTaskManager,
515 pImpl->pJobManager );
519 channel = it->second;
A communication channel between the client and the server.
uint16_t NbConnectedStrm()
Get the number of connected data streams.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void DecFileInstCnt()
Decrement file object instance count bound to this channel.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
bool CanCollapse(const URL &url)
Status QueryTransport(uint16_t query, AnyObject &result)
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
bool GetInt(const std::string &key, int &value)
Interface for a job to be run by the job manager.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Info(uint64_t topic, const char *format,...)
Print an info.
The message representation used throughout the system.
static Poller * CreatePoller(const std::string &preference)
Interface for socket pollers.
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
bool Start()
Start the post master.
bool Finalize()
Finalizer.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Status ForceReconnect(const URL &url)
Reconnect the channel.
bool Stop()
Stop the postmaster.
bool Reinitialize()
Reinitialize after fork.
TaskManager * GetTaskManager()
Get the task manager object used by the post master.
uint16_t NbConnectedStrm(const URL &url)
Get the number of connected data streams.
void SetOnConnectHandler(std::unique_ptr< Job > onConnJob)
Set the global on-connect handler.
Status RemoveEventHandler(const URL &url, ChannelEventHandler *handler)
Remove a channel event handler.
virtual ~PostMaster()
Destructor.
void SetConnectionErrorHandler(std::function< void(const URL &, const XRootDStatus &)> handler)
Set the global on-error on-connect handler for control streams.
Status ForceDisconnect(const URL &url)
Shut down a channel.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global connection error handler.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
Status RegisterEventHandler(const URL &url, ChannelEventHandler *handler)
Register channel event handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
JobManager * GetJobManager()
Get the job manager object used by the post master.
bool Initialize()
Initializer.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Perform the handshake and the authentication for each physical stream.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
std::string GetChannelId() const
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
const std::string & GetProtocol() const
Get the protocol.
An interface for metadata redirectors.
virtual XRootDStatus HandleRequest(const Message *msg, MsgHandler *handler)=0
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const uint16_t errInvalidOp
const char *const DefaultPollerPreference
const uint16_t errNotSupported
const int DefaultWorkerThreads
void Run(void *arg)
The job logic.
ConnErrJob(const URL &url, const XRootDStatus &status, std::function< void(const URL &, const XRootDStatus &)> handler)
std::function< void(const URL &, const XRootDStatus &)> handler
TaskManager * pTaskManager
std::map< std::string, Channel * > ChannelMap
std::unique_ptr< Job > pOnConnJob
XrdSysRWLock pDisconnectLock
XrdSysMutex pChannelMapMutex
std::function< void(const URL &, const XRootDStatus &)> pOnConnErrCB
Procedure execution status.