XRootD
XrdClChannel.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #include "XrdCl/XrdClChannel.hh"
26 #include "XrdCl/XrdClDefaultEnv.hh"
27 #include "XrdCl/XrdClStream.hh"
28 #include "XrdCl/XrdClSocket.hh"
29 #include "XrdCl/XrdClConstants.hh"
30 #include "XrdCl/XrdClLog.hh"
33 #include "XrdSys/XrdSysPthread.hh"
34 
35 #include <ctime>
36 
37 namespace XrdCl
38 {
40  {
41  public:
42  //------------------------------------------------------------------------
43  // Constructor
44  //------------------------------------------------------------------------
45  TickGeneratorTask( XrdCl::Channel *channel, const std::string &hostId ):
46  pChannel( channel )
47  {
48  std::string name = "TickGeneratorTask for: ";
49  name += hostId;
50  SetName( name );
51  }
52 
53  //------------------------------------------------------------------------
54  // Run the task
55  //------------------------------------------------------------------------
56  time_t Run( time_t now )
57  {
58  XrdSysMutexHelper lck( pMtx );
59  if( !pChannel ) return 0;
60 
61  using namespace XrdCl;
62  pChannel->Tick( now );
63 
64  Env *env = DefaultEnv::GetEnv();
65  int timeoutResolution = DefaultTimeoutResolution;
66  env->GetInt( "TimeoutResolution", timeoutResolution );
67  return now+timeoutResolution;
68  }
69 
70  void Invalidate()
71  {
72  XrdSysMutexHelper lck( pMtx );
73  pChannel = 0;
74  }
75 
76  private:
77  XrdCl::Channel *pChannel;
78  XrdSysMutex pMtx;
79  };
80 
81  //----------------------------------------------------------------------------
82  // Constructor
83  //----------------------------------------------------------------------------
84  Channel::Channel( const URL &url,
85  Poller *poller,
86  TransportHandler *transport,
87  TaskManager *taskManager,
88  JobManager *jobManager,
89  const URL &prefurl ):
90  pUrl( url.GetHostId() ),
91  pPoller( poller ),
92  pTransport( transport ),
93  pTaskManager( taskManager ),
94  pTickGenerator( 0 ),
95  pJobManager( jobManager )
96  {
97  Env *env = DefaultEnv::GetEnv();
98  Log *log = DefaultEnv::GetLog();
99 
100  int timeoutResolution = DefaultTimeoutResolution;
101  env->GetInt( "TimeoutResolution", timeoutResolution );
102 
103  pTransport->InitializeChannel( url, pChannelData );
104  log->Debug( PostMasterMsg, "Creating new channel to: %s",
105  url.GetChannelId().c_str() );
106 
107  pUrl.SetParams( url.GetParams() );
108  pUrl.SetProtocol( url.GetProtocol() );
109 
110  //--------------------------------------------------------------------------
111  // Create the stream
112  //--------------------------------------------------------------------------
113  pStream = new Stream( &pUrl, prefurl );
114  pStream->SetTransport( transport );
115  pStream->SetPoller( poller );
116  pStream->SetIncomingQueue( &pIncoming );
117  pStream->SetTaskManager( taskManager );
118  pStream->SetJobManager( jobManager );
119  pStream->SetChannelData( &pChannelData );
120  pStream->Initialize();
121 
122  //--------------------------------------------------------------------------
123  // Register the task generating timeout events
124  //--------------------------------------------------------------------------
125  pTickGenerator = new TickGeneratorTask( this, pUrl.GetChannelId() );
126  pTaskManager->RegisterTask( pTickGenerator, ::time(0)+timeoutResolution );
127  }
128 
129  //----------------------------------------------------------------------------
130  // Destructor
131  //----------------------------------------------------------------------------
133  {
134  pTickGenerator->Invalidate();
135  delete pStream;
136  pTransport->FinalizeChannel( pChannelData );
137  }
138 
139  //----------------------------------------------------------------------------
140  // Send the message asynchronously
141  //----------------------------------------------------------------------------
143  MsgHandler *handler,
144  bool stateful,
145  time_t expires )
146 
147  {
148  return pStream->Send( msg, handler, stateful, expires );
149  }
150 
151  //----------------------------------------------------------------------------
152  // Handle a time event
153  //----------------------------------------------------------------------------
154  void Channel::Tick( time_t now )
155  {
156  pStream->Tick( now );
157  }
158 
159  //----------------------------------------------------------------------------
160  // Force disconnect of all streams
161  //----------------------------------------------------------------------------
163  {
164  return ForceDisconnect(false);
165  }
166 
167  //----------------------------------------------------------------------------
168  // Force disconnect of all streams
169  //----------------------------------------------------------------------------
171  {
172  //--------------------------------------------------------------------------
173  // Disconnect and recreate the streams
174  //--------------------------------------------------------------------------
175  pStream->ForceError( Status( stError, errOperationInterrupted ), hush );
176 
177  return Status();
178  }
179 
180  //----------------------------------------------------------------------------
181  // Force reconnect
182  //----------------------------------------------------------------------------
184  {
185  pStream->ForceConnect();
186  return Status();
187  }
188 
189  //------------------------------------------------------------------------
190  // Get the number of connected data streams
191  //------------------------------------------------------------------------
193  {
194  return XRootDTransport::NbConnectedStrm( pChannelData );
195  }
196 
197  //------------------------------------------------------------------------
198  // Set the on-connect handler for data streams
199  //------------------------------------------------------------------------
200  void Channel::SetOnDataConnectHandler( std::shared_ptr<Job> &onConnJob )
201  {
202  pStream->SetOnDataConnectHandler( onConnJob );
203  }
204 
205  //------------------------------------------------------------------------
206  // Check if channel can be collapsed using given URL
207  //------------------------------------------------------------------------
208  bool Channel::CanCollapse( const URL &url )
209  {
210  return pStream->CanCollapse( url );
211  }
212 
213  //------------------------------------------------------------------------
214  // Decrement file object instance count bound to this channel
215  //------------------------------------------------------------------------
217  {
218  pTransport->DecFileInstCnt( pChannelData );
219  }
220 
221  //----------------------------------------------------------------------------
222  // Query the transport handler
223  //----------------------------------------------------------------------------
224  Status Channel::QueryTransport( uint16_t query, AnyObject &result )
225  {
226  if( query < 2000 )
227  return pTransport->Query( query, result, pChannelData );
228  return pStream->Query( query, result );
229  }
230 
231  //----------------------------------------------------------------------------
232  // Register channel event handler
233  //----------------------------------------------------------------------------
235  {
236  pStream->RegisterEventHandler( handler );
237  }
238 
239  //------------------------------------------------------------------------
240  // Remove a channel event handler
241  //------------------------------------------------------------------------
243  {
244  pStream->RemoveEventHandler( handler );
245  }
246 }
A communication channel between the client and the server.
Definition: XrdClChannel.hh:49
uint16_t NbConnectedStrm()
Get the number of connected data streams.
Status ForceReconnect()
Force reconnect.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
~Channel()
Destructor.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void DecFileInstCnt()
Decrement file object instance count bound to this channel.
Status ForceDisconnect()
Force disconnect of all streams.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
void Tick(time_t now)
Handle a time event.
bool CanCollapse(const URL &url)
Status QueryTransport(uint16_t query, AnyObject &result)
Channel(const URL &url, Poller *poller, TransportHandler *transport, TaskManager *taskManager, JobManager *jobManager, const URL &prefurl=URL())
Definition: XrdClChannel.cc:84
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
A synchronized queue.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
Interface for socket pollers.
Definition: XrdClPoller.hh:87
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
Definition: XrdClStream.cc:297
void SetTransport(TransportHandler *transport)
Set the transport.
Definition: XrdClStream.hh:91
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
Definition: XrdClStream.hh:107
bool CanCollapse(const URL &url)
void SetPoller(Poller *poller)
Set the poller.
Definition: XrdClStream.hh:99
void ForceConnect()
Force connection.
Definition: XrdClStream.cc:347
void ForceError(XRootDStatus status, bool hush=false)
Force error.
Definition: XrdClStream.cc:913
void SetTaskManager(TaskManager *taskManager)
Set task manager.
Definition: XrdClStream.hh:123
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
Definition: XrdClStream.hh:263
void SetJobManager(JobManager *jobManager)
Set job manager.
Definition: XrdClStream.hh:131
Status Query(uint16_t query, AnyObject &result)
Query the stream.
void Tick(time_t now)
Definition: XrdClStream.cc:377
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void SetChannelData(AnyObject *channelData)
Set the channel data.
Definition: XrdClStream.hh:115
XRootDStatus Initialize()
Initializer.
Definition: XrdClStream.cc:171
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
void SetName(const std::string &name)
Set name of the task.
time_t Run(time_t now)
Definition: XrdClChannel.cc:56
TickGeneratorTask(XrdCl::Channel *channel, const std::string &hostId)
Definition: XrdClChannel.cc:45
Perform the handshake and the authentication for each physical stream.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual void FinalizeChannel(AnyObject &channelData)=0
Finalize channel.
virtual void InitializeChannel(const URL &url, AnyObject &channelData)=0
Initialize channel.
virtual void DecFileInstCnt(AnyObject &channelData)=0
Decrement file object instance count bound to this channel.
URL representation.
Definition: XrdClURL.hh:31
std::string GetChannelId() const
Definition: XrdClURL.cc:505
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:395
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
void SetProtocol(const std::string &protocol)
Set protocol.
Definition: XrdClURL.hh:126
static uint16_t NbConnectedStrm(AnyObject &channelData)
Number of currently connected data streams.
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t PostMasterMsg
const int DefaultTimeoutResolution
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
Procedure execution status.
Definition: XrdClStatus.hh:115