XRootD
XrdClPostMaster.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #include "XrdCl/XrdClPostMaster.hh"
22 #include "XrdCl/XrdClMessage.hh"
23 #include "XrdCl/XrdClConstants.hh"
24 #include "XrdCl/XrdClDefaultEnv.hh"
25 #include "XrdCl/XrdClPoller.hh"
27 #include "XrdCl/XrdClJobManager.hh"
29 #include "XrdCl/XrdClChannel.hh"
30 #include "XrdCl/XrdClConstants.hh"
31 #include "XrdCl/XrdClLog.hh"
33 
34 #include "XrdSys/XrdSysPthread.hh"
35 
36 namespace XrdCl
37 {
38  struct ConnErrJob : public Job
39  {
41  std::function<void( const URL&, const XRootDStatus& )> handler) : url( url ),
42  status( status ),
43  handler( handler )
44  {
45  }
46 
47  void Run( void *arg )
48  {
49  handler( url, status );
50  delete this;
51  }
52 
55  std::function<void( const URL&, const XRootDStatus& )> handler;
56  };
57 
59  {
60  PostMasterImpl() : pPoller( 0 ), pInitialized( false ), pRunning( false )
61  {
62  Env *env = DefaultEnv::GetEnv();
63  int workerThreads = DefaultWorkerThreads;
64  env->GetInt( "WorkerThreads", workerThreads );
65 
66  pTaskManager = new TaskManager();
67  pJobManager = new JobManager(workerThreads);
68  }
69 
71  {
72  delete pPoller;
73  delete pTaskManager;
74  delete pJobManager;
75  }
76 
77  typedef std::map<std::string, Channel*> ChannelMap;
78 
84  bool pRunning;
86 
88  std::unique_ptr<Job> pOnConnJob;
89  std::function<void( const URL&, const XRootDStatus& )> pOnConnErrCB;
90 
92  };
93 
94  //----------------------------------------------------------------------------
95  // Constructor
96  //----------------------------------------------------------------------------
98  {
99  }
100 
101  //----------------------------------------------------------------------------
102  // Destructor
103  //----------------------------------------------------------------------------
105  {
106  }
107 
108  //----------------------------------------------------------------------------
109  // Initializer
110  //----------------------------------------------------------------------------
112  {
113  Env *env = DefaultEnv::GetEnv();
114  std::string pollerPref = DefaultPollerPreference;
115  env->GetString( "PollerPreference", pollerPref );
116 
117  pImpl->pPoller = PollerFactory::CreatePoller( pollerPref );
118 
119  if( !pImpl->pPoller )
120  return false;
121 
122  bool st = pImpl->pPoller->Initialize();
123 
124  if( !st )
125  {
126  delete pImpl->pPoller;
127  return false;
128  }
129 
130  pImpl->pJobManager->Initialize();
131  pImpl->pInitialized = true;
132  return true;
133  }
134 
135  //----------------------------------------------------------------------------
136  // Finalizer
137  //----------------------------------------------------------------------------
139  {
140  //--------------------------------------------------------------------------
141  // Clean up the channels
142  //--------------------------------------------------------------------------
143  if( !pImpl->pInitialized )
144  return true;
145 
146  pImpl->pInitialized = false;
147  pImpl->pJobManager->Finalize();
148  PostMasterImpl::ChannelMap::iterator it;
149 
150  for( it = pImpl->pChannelMap.begin(); it != pImpl->pChannelMap.end(); ++it )
151  delete it->second;
152 
153  pImpl->pChannelMap.clear();
154  return pImpl->pPoller->Finalize();
155  }
156 
157  //----------------------------------------------------------------------------
158  // Start the post master
159  //----------------------------------------------------------------------------
161  {
162  if( !pImpl->pInitialized )
163  return false;
164 
165  if( !pImpl->pPoller->Start() )
166  return false;
167 
168  if( !pImpl->pTaskManager->Start() )
169  {
170  pImpl->pPoller->Stop();
171  return false;
172  }
173 
174  if( !pImpl->pJobManager->Start() )
175  {
176  pImpl->pPoller->Stop();
177  pImpl->pTaskManager->Stop();
178  return false;
179  }
180 
181  pImpl->pRunning = true;
182  return true;
183  }
184 
185  //----------------------------------------------------------------------------
186  // Stop the postmaster
187  //----------------------------------------------------------------------------
189  {
190  if( !pImpl->pInitialized || !pImpl->pRunning )
191  return true;
192 
193  if( !pImpl->pJobManager->Stop() )
194  return false;
195  if( !pImpl->pPoller->Stop() )
196  return false;
197  if( !pImpl->pTaskManager->Stop() )
198  return false;
199  pImpl->pRunning = false;
200  return true;
201  }
202 
203  //----------------------------------------------------------------------------
204  // Reinitialize after fork
205  //----------------------------------------------------------------------------
207  {
208  return true;
209  }
210 
211  //----------------------------------------------------------------------------
212  // Send the message asynchronously
213  //----------------------------------------------------------------------------
215  Message *msg,
216  MsgHandler *handler,
217  bool stateful,
218  time_t expires )
219  {
220  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
221  Channel *channel = GetChannel( url );
222 
223  if( !channel )
225 
226  return channel->Send( msg, handler, stateful, expires );
227  }
228 
230  Message *msg,
231  MsgHandler *inHandler )
232  {
234  VirtualRedirector *redirector = registry.Get( url );
235  if( !redirector )
236  return Status( stError, errInvalidOp );
237  return redirector->HandleRequest( msg, inHandler );
238  }
239 
240  //----------------------------------------------------------------------------
241  // Query the transport handler
242  //----------------------------------------------------------------------------
244  uint16_t query,
245  AnyObject &result )
246  {
247  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
248  Channel *channel = 0;
249  {
250  XrdSysMutexHelper scopedLock2( pImpl->pChannelMapMutex );
251  PostMasterImpl::ChannelMap::iterator it =
252  pImpl->pChannelMap.find( url.GetChannelId() );
253  if( it == pImpl->pChannelMap.end() )
254  return Status( stError, errInvalidOp );
255  channel = it->second;
256  }
257 
258  if( !channel )
259  return Status( stError, errNotSupported );
260 
261  return channel->QueryTransport( query, result );
262  }
263 
264  //----------------------------------------------------------------------------
265  // Register channel event handler
266  //----------------------------------------------------------------------------
268  ChannelEventHandler *handler )
269  {
270  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
271  Channel *channel = GetChannel( url );
272 
273  if( !channel )
274  return Status( stError, errNotSupported );
275 
276  channel->RegisterEventHandler( handler );
277  return Status();
278  }
279 
280  //----------------------------------------------------------------------------
281  // Remove a channel event handler
282  //----------------------------------------------------------------------------
284  ChannelEventHandler *handler )
285  {
286  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
287  Channel *channel = GetChannel( url );
288 
289  if( !channel )
290  return Status( stError, errNotSupported );
291 
292  channel->RemoveEventHandler( handler );
293  return Status();
294  }
295 
296  //------------------------------------------------------------------------
297  // Get the task manager object user by the post master
298  //------------------------------------------------------------------------
300  {
301  return pImpl->pTaskManager;
302  }
303 
304  //------------------------------------------------------------------------
305  // Get the job manager object user by the post master
306  //------------------------------------------------------------------------
308  {
309  return pImpl->pJobManager;
310  }
311 
312  //------------------------------------------------------------------------
313  // Shut down a channel
314  //------------------------------------------------------------------------
316  {
317  return ForceDisconnect(url, false);
318  }
319 
320  //------------------------------------------------------------------------
321  // Shut down a channel
322  //------------------------------------------------------------------------
323  Status PostMaster::ForceDisconnect( const URL &url, bool hush )
324  {
325  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock, false );
326  PostMasterImpl::ChannelMap::iterator it =
327  pImpl->pChannelMap.find( url.GetChannelId() );
328 
329  if( it == pImpl->pChannelMap.end() )
330  return Status( stError, errInvalidOp );
331 
332  it->second->ForceDisconnect( hush );
333  delete it->second;
334  pImpl->pChannelMap.erase( it );
335 
336  return Status();
337  }
338 
340  {
341  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock, false );
342  PostMasterImpl::ChannelMap::iterator it =
343  pImpl->pChannelMap.find( url.GetChannelId() );
344 
345  if( it == pImpl->pChannelMap.end() )
346  return Status( stError, errInvalidOp );
347 
348  it->second->ForceReconnect();
349  return Status();
350  }
351 
352  //------------------------------------------------------------------------
353  // Get the number of connected data streams
354  //------------------------------------------------------------------------
355  uint16_t PostMaster::NbConnectedStrm( const URL &url )
356  {
357  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
358  Channel *channel = GetChannel( url );
359  if( !channel ) return 0;
360  return channel->NbConnectedStrm();
361  }
362 
363  //------------------------------------------------------------------------
365  //------------------------------------------------------------------------
367  std::shared_ptr<Job> onConnJob )
368  {
369  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
370  Channel *channel = GetChannel( url );
371  if( !channel ) return;
372  channel->SetOnDataConnectHandler( onConnJob );
373  }
374 
375  //------------------------------------------------------------------------
377  //------------------------------------------------------------------------
378  void PostMaster::SetOnConnectHandler( std::unique_ptr<Job> onConnJob )
379  {
380  XrdSysMutexHelper lck( pImpl->pMtx );
381  pImpl->pOnConnJob = std::move( onConnJob );
382  }
383 
384  //------------------------------------------------------------------------
385  // Set the global connection error handler
386  //------------------------------------------------------------------------
387  void PostMaster::SetConnectionErrorHandler( std::function<void( const URL&, const XRootDStatus& )> handler )
388  {
389  XrdSysMutexHelper lck( pImpl->pMtx );
390  pImpl->pOnConnErrCB = std::move( handler );
391  }
392 
393  //------------------------------------------------------------------------
394  // Notify the global on-connect handler
395  //------------------------------------------------------------------------
397  {
398  XrdSysMutexHelper lck( pImpl->pMtx );
399  if( pImpl->pOnConnJob )
400  {
401  URL *ptr = new URL( url );
402  pImpl->pJobManager->QueueJob( pImpl->pOnConnJob.get(), ptr );
403  }
404  }
405 
406  //------------------------------------------------------------------------
407  // Notify the global error connection handler
408  //------------------------------------------------------------------------
409  void PostMaster::NotifyConnErrHandler( const URL &url, const XRootDStatus &status )
410  {
411  XrdSysMutexHelper lck( pImpl->pMtx );
412  if( pImpl->pOnConnErrCB )
413  {
414  ConnErrJob *job = new ConnErrJob( url, status, pImpl->pOnConnErrCB );
415  pImpl->pJobManager->QueueJob( job, nullptr );
416  }
417  }
418 
419  //----------------------------------------------------------------------------
421  //----------------------------------------------------------------------------
422  void PostMaster::CollapseRedirect( const URL &alias, const URL &url )
423  {
424  XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
425 
426  //--------------------------------------------------------------------------
427  // Get the passive channel
428  //--------------------------------------------------------------------------
429  PostMasterImpl::ChannelMap::iterator it =
430  pImpl->pChannelMap.find( alias.GetChannelId() );
431  Channel *passive = 0;
432  if( it != pImpl->pChannelMap.end() )
433  passive = it->second;
434  //--------------------------------------------------------------------------
435  // If the channel does not exist there's nothing to do
436  //--------------------------------------------------------------------------
437  else return;
438 
439  //--------------------------------------------------------------------------
440  // Check if this URL is eligible for collapsing
441  //--------------------------------------------------------------------------
442  if( !passive->CanCollapse( url ) ) return;
443 
444  //--------------------------------------------------------------------------
445  // Create the active channel
446  //--------------------------------------------------------------------------
448  TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
449 
450  if( !trHandler )
451  {
452  Log *log = DefaultEnv::GetLog();
453  log->Error( PostMasterMsg, "Unable to get transport handler for %s "
454  "protocol", url.GetProtocol().c_str() );
455  return;
456  }
457 
458  Log *log = DefaultEnv::GetLog();
459  log->Info( PostMasterMsg, "Label channel %s with alias %s.",
460  url.GetHostId().c_str(), alias.GetHostId().c_str() );
461 
462  Channel *active = new Channel( alias, pImpl->pPoller, trHandler,
463  pImpl->pTaskManager, pImpl->pJobManager, url );
464  pImpl->pChannelMap[alias.GetChannelId()] = active;
465 
466  //--------------------------------------------------------------------------
467  // The passive channel will be deallocated by TTL
468  //--------------------------------------------------------------------------
469  }
470 
471  //------------------------------------------------------------------------
472  // Decrement file object instance count bound to this channel
473  //------------------------------------------------------------------------
474  void PostMaster::DecFileInstCnt( const URL &url )
475  {
476  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
477  Channel *channel = GetChannel( url );
478 
479  if( !channel ) return;
480 
481  return channel->DecFileInstCnt();
482  }
483 
484  //------------------------------------------------------------------------
485  //true if underlying threads are running, false otherwise
486  //------------------------------------------------------------------------
488  {
489  return pImpl->pRunning;
490  }
491 
492  //----------------------------------------------------------------------------
493  // Get the channel
494  //----------------------------------------------------------------------------
495  Channel *PostMaster::GetChannel( const URL &url )
496  {
497  XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
498  Channel *channel = 0;
499  PostMasterImpl::ChannelMap::iterator it = pImpl->pChannelMap.find( url.GetChannelId() );
500 
501  if( it == pImpl->pChannelMap.end() )
502  {
504  TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
505 
506  if( !trHandler )
507  {
508  Log *log = DefaultEnv::GetLog();
509  log->Error( PostMasterMsg, "Unable to get transport handler for %s "
510  "protocol", url.GetProtocol().c_str() );
511  return 0;
512  }
513 
514  channel = new Channel( url, pImpl->pPoller, trHandler, pImpl->pTaskManager,
515  pImpl->pJobManager );
516  pImpl->pChannelMap[url.GetChannelId()] = channel;
517  }
518  else
519  channel = it->second;
520  return channel;
521  }
522 }
A communication channel between the client and the server.
Definition: XrdClChannel.hh:49
uint16_t NbConnectedStrm()
Get the number of connected data streams.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void DecFileInstCnt()
Decrement file object instance count bound to this channel.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
bool CanCollapse(const URL &url)
Status QueryTransport(uint16_t query, AnyObject &result)
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
A synchronized queue.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
static Poller * CreatePoller(const std::string &preference)
Interface for socket pollers.
Definition: XrdClPoller.hh:87
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
bool Start()
Start the post master.
bool Finalize()
Finalizer.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Status ForceReconnect(const URL &url)
Reconnect the channel.
bool Stop()
Stop the postmaster.
bool Reinitialize()
Reinitialize after fork.
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
uint16_t NbConnectedStrm(const URL &url)
Get the number of connected data streams.
void SetOnConnectHandler(std::unique_ptr< Job > onConnJob)
Set the global connection error handler.
Status RemoveEventHandler(const URL &url, ChannelEventHandler *handler)
Remove a channel event handler.
virtual ~PostMaster()
Destructor.
PostMaster()
Constructor.
void SetConnectionErrorHandler(std::function< void(const URL &, const XRootDStatus &)> handler)
Set the global on-error on-connect handler for control streams.
Status ForceDisconnect(const URL &url)
Shut down a channel.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
Status RegisterEventHandler(const URL &url, ChannelEventHandler *handler)
Register channel event handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
JobManager * GetJobManager()
Get the job manager object user by the post master.
bool Initialize()
Initializer.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Perform the handshake and the authentication for each physical stream.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
URL representation.
Definition: XrdClURL.hh:31
std::string GetChannelId() const
Definition: XrdClURL.cc:505
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
An interface for metadata redirectors.
virtual XRootDStatus HandleRequest(const Message *msg, MsgHandler *handler)=0
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t PostMasterMsg
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const char *const DefaultPollerPreference
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
const int DefaultWorkerThreads
void Run(void *arg)
The job logic.
XRootDStatus status
ConnErrJob(const URL &url, const XRootDStatus &status, std::function< void(const URL &, const XRootDStatus &)> handler)
std::function< void(const URL &, const XRootDStatus &)> handler
TaskManager * pTaskManager
std::map< std::string, Channel * > ChannelMap
std::unique_ptr< Job > pOnConnJob
XrdSysRWLock pDisconnectLock
XrdSysMutex pChannelMapMutex
std::function< void(const URL &, const XRootDStatus &)> pOnConnErrCB
Procedure execution status.
Definition: XrdClStatus.hh:115