XRootD
XrdCl::Stream Class Reference

Stream. More...

#include <XrdClStream.hh>

+ Collaboration diagram for XrdCl::Stream:

Public Types

enum  StreamStatus {
  Disconnected = 0 ,
  Connected = 1 ,
  Connecting = 2 ,
  Error = 3
}
 Status of the stream. More...
 

Public Member Functions

 Stream (const URL *url, const URL &prefer=URL())
 Constructor. More...
 
 ~Stream ()
 Destructor. More...
 
bool CanCollapse (const URL &url)
 
void DisableIfEmpty (uint16_t subStream)
 Disables respective uplink if empty. More...
 
void Disconnect (bool force=false)
 Disconnect the stream. More...
 
XRootDStatus EnableLink (PathID &path)
 
void ForceConnect ()
 Force connection. More...
 
void ForceError (XRootDStatus status)
 Force error. More...
 
const std::string & GetName () const
 Return stream name. More...
 
const URL * GetURL () const
 Get the URL. More...
 
XRootDStatus Initialize ()
 Initializer. More...
 
uint16_t InspectStatusRsp (uint16_t stream, MsgHandler *&incHandler)
 
MsgHandler * InstallIncHandler (std::shared_ptr< Message > &msg, uint16_t stream)
 
void OnConnect (uint16_t subStream)
 Call back when a substream has been connected. More...
 
void OnConnectError (uint16_t subStream, XRootDStatus status)
 On connect error. More...
 
void OnError (uint16_t subStream, XRootDStatus status)
 On error. More...
 
void OnIncoming (uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
 Call back when a message has been reconstructed. More...
 
void OnMessageSent (uint16_t subStream, Message *msg, uint32_t bytesSent)
 
bool OnReadTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On read timeout. More...
 
std::pair< Message *, MsgHandler * > OnReadyToWrite (uint16_t subStream)
 
bool OnWriteTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On write timeout. More...
 
Status Query (uint16_t query, AnyObject &result)
 Query the stream. More...
 
void RegisterEventHandler (ChannelEventHandler *handler)
 Register channel event handler. More...
 
void RemoveEventHandler (ChannelEventHandler *handler)
 Remove a channel event handler. More...
 
XRootDStatus Send (Message *msg, MsgHandler *handler, bool stateful, time_t expires)
 Queue the message for sending. More...
 
void SetChannelData (AnyObject *channelData)
 Set the channel data. More...
 
void SetIncomingQueue (InQueue *incomingQueue)
 Set the incoming queue. More...
 
void SetJobManager (JobManager *jobManager)
 Set job manager. More...
 
void SetOnDataConnectHandler (std::shared_ptr< Job > &onConnJob)
 Set the on-connect handler for data streams. More...
 
void SetPoller (Poller *poller)
 Set the poller. More...
 
void SetTaskManager (TaskManager *taskManager)
 Set task manager. More...
 
void SetTransport (TransportHandler *transport)
 Set the transport. More...
 
void Tick (time_t now)
 

Detailed Description

Stream.

Definition at line 51 of file XrdClStream.hh.

Member Enumeration Documentation

◆ StreamStatus

Status of the stream.

Enumerator
Disconnected 

Not connected.

Connected 

Connected.

Connecting 

In the process of being connected.

Error 

Broken.

Definition at line 57 of file XrdClStream.hh.

58  {
59  Disconnected = 0,
60  Connected = 1,
61  Connecting = 2,
62  Error = 3
63  };
@ Disconnected
Not connected.
Definition: XrdClStream.hh:59
@ Error
Broken.
Definition: XrdClStream.hh:62
@ Connected
Connected.
Definition: XrdClStream.hh:60
@ Connecting
In the process of being connected.
Definition: XrdClStream.hh:61

Constructor & Destructor Documentation

◆ Stream()

XrdCl::Stream::Stream ( const URL * url,
const URL & prefer = URL() 
)

Constructor.

Definition at line 96 of file XrdClStream.cc.

96  :
97  pUrl( url ),
98  pPrefer( prefer ),
99  pTransport( 0 ),
100  pPoller( 0 ),
101  pTaskManager( 0 ),
102  pJobManager( 0 ),
103  pIncomingQueue( 0 ),
104  pChannelData( 0 ),
105  pLastStreamError( 0 ),
106  pConnectionCount( 0 ),
107  pConnectionInitTime( 0 ),
108  pAddressType( Utils::IPAll ),
109  pSessionId( 0 ),
110  pBytesSent( 0 ),
111  pBytesReceived( 0 )
112  {
113  pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
114  pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
115 
116  std::ostringstream o;
117  o << pUrl->GetHostId();
118  pStreamName = o.str();
119 
120  pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
122  pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
124  pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
126 
127  std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
129 
130  pAddressType = Utils::String2AddressType( netStack );
131  if( pAddressType == Utils::AddressType::IPAuto )
132  {
133  XrdNetUtils::NetProt stacks = XrdNetUtils::NetConfig( XrdNetUtils::NetType::qryINIF );
134  if( !( stacks & XrdNetUtils::hasIP64 ) )
135  {
136  if( stacks & XrdNetUtils::hasIPv4 )
137  pAddressType = Utils::AddressType::IPv4;
138  else if( stacks & XrdNetUtils::hasIPv6 )
139  pAddressType = Utils::AddressType::IPv6;
140  }
141  }
142 
143  Log *log = DefaultEnv::GetLog();
144  log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
145  "Connection Window: %d, ConnectionRetry: %d, Stream Error "
146  "Window: %d", pStreamName.c_str(), netStack.c_str(),
147  pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
148  }
static Log * GetLog()
Get default log.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:94
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
Definition: XrdClUtils.cc:122
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:80
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:103
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
Definition: XrdNetUtils.cc:680
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
XrdSysError Log
Definition: XrdConfig.cc:111

References XrdCl::Log::Debug(), XrdCl::DefaultConnectionRetry, XrdCl::DefaultConnectionWindow, XrdCl::DefaultNetworkStack, XrdCl::DefaultStreamErrorWindow, XrdCl::URL::GetHostId(), XrdCl::Utils::GetIntParameter(), XrdCl::DefaultEnv::GetLog(), XrdCl::Utils::GetStringParameter(), XrdNetUtils::hasIP64, XrdNetUtils::hasIPv4, XrdNetUtils::hasIPv6, XrdNetUtils::NetConfig(), XrdCl::PostMasterMsg, and XrdCl::Utils::String2AddressType().

+ Here is the call graph for this function:

◆ ~Stream()

XrdCl::Stream::~Stream ( )

Destructor.

Definition at line 153 of file XrdClStream.cc.

154  {
155  Disconnect( true );
156 
157  Log *log = DefaultEnv::GetLog();
158  log->Debug( PostMasterMsg, "[%s] Destroying stream",
159  pStreamName.c_str() );
160 
161  MonitorDisconnection( XRootDStatus() );
162 
163  SubStreamList::iterator it;
164  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
165  delete *it;
166  }
void Disconnect(bool force=false)
Disconnect the stream.
Definition: XrdClStream.cc:363

References XrdCl::Log::Debug(), Disconnect(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

+ Here is the call graph for this function:

Member Function Documentation

◆ CanCollapse()

bool XrdCl::Stream::CanCollapse ( const URL & url)
Returns
true if this channel can be collapsed using this URL, false otherwise

Definition at line 1171 of file XrdClStream.cc.

1172  {
1173  Log *log = DefaultEnv::GetLog();
1174 
1175  //--------------------------------------------------------------------------
1176  // Resolve all the addresses of the host we're supposed to connect to
1177  //--------------------------------------------------------------------------
1178  std::vector<XrdNetAddr> prefaddrs;
1179  XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1180  if( !st.IsOK() )
1181  {
1182  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1183  , pStreamName.c_str(), url.GetHostName().c_str() );
1184  return false;
1185  }
1186 
1187  //--------------------------------------------------------------------------
1188  // Resolve all the addresses of the alias
1189  //--------------------------------------------------------------------------
1190  std::vector<XrdNetAddr> aliasaddrs;
1191  st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1192  if( !st.IsOK() )
1193  {
1194  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1195  , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1196  return false;
1197  }
1198 
1199  //--------------------------------------------------------------------------
1200  // Now check if the preferred host is part of the alias
1201  //--------------------------------------------------------------------------
1202  auto itr = prefaddrs.begin();
1203  for( ; itr != prefaddrs.end() ; ++itr )
1204  {
1205  auto itr2 = aliasaddrs.begin();
1206  for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1207  if( itr->Same( &*itr2 ) ) return true;
1208  }
1209 
1210  return false;
1211  }
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:165
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
Definition: XrdClUtils.cc:139

References XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::Channel::CanCollapse().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DisableIfEmpty()

void XrdCl::Stream::DisableIfEmpty ( uint16_t  subStream)

Disables respective uplink if empty.

Definition at line 568 of file XrdClStream.cc.

569  {
570  XrdSysMutexHelper scopedLock( pMutex );
571  Log *log = DefaultEnv::GetLog();
572 
573  if( pSubStreams[subStream]->outQueue->IsEmpty() )
574  {
575  log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
576  pSubStreams[subStream]->socket->GetStreamName().c_str() );
577  pSubStreams[subStream]->socket->DisableUplink();
578  }
579  }

References XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::AsyncSocketHandler::OnWrite().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Disconnect()

void XrdCl::Stream::Disconnect ( bool  force = false)

Disconnect the stream.

Definition at line 363 of file XrdClStream.cc.

364  {
365  XrdSysMutexHelper scopedLock( pMutex );
366  SubStreamList::iterator it;
367  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
368  {
369  (*it)->socket->Close();
370  (*it)->status = Socket::Disconnected;
371  }
372  }
@ Disconnected
The socket is disconnected.
Definition: XrdClSocket.hh:50

References XrdCl::Socket::Disconnected.

Referenced by ~Stream().

+ Here is the caller graph for this function:

◆ EnableLink()

XRootDStatus XrdCl::Stream::EnableLink ( PathID & path)

Connect if needed; otherwise make sure that the underlying socket handler gets write-readiness events. The path is updated to reflect what has actually been enabled.

Definition at line 187 of file XrdClStream.cc.

188  {
189  XrdSysMutexHelper scopedLock( pMutex );
190 
191  //--------------------------------------------------------------------------
192  // We are in the process of connecting the main stream, so we do nothing
193  // because when the main stream connection is established it will connect
194  // all the other streams
195  //--------------------------------------------------------------------------
196  if( pSubStreams[0]->status == Socket::Connecting )
197  return XRootDStatus();
198 
199  //--------------------------------------------------------------------------
200  // The main stream is connected, so we can verify whether we have
201  // the up and the down stream connected and ready to handle data.
202  // If anything is not right we fall back to stream 0.
203  //--------------------------------------------------------------------------
204  if( pSubStreams[0]->status == Socket::Connected )
205  {
206  if( pSubStreams[path.down]->status != Socket::Connected )
207  path.down = 0;
208 
209  if( pSubStreams[path.up]->status == Socket::Disconnected )
210  {
211  path.up = 0;
212  return pSubStreams[0]->socket->EnableUplink();
213  }
214 
215  if( pSubStreams[path.up]->status == Socket::Connected )
216  return pSubStreams[path.up]->socket->EnableUplink();
217 
218  return XRootDStatus();
219  }
220 
221  //--------------------------------------------------------------------------
222  // The main stream is not connected, we need to check whether enough time
223  // has passed since we last encountered an error (if any) so that we could
224  // re-attempt the connection
225  //--------------------------------------------------------------------------
226  Log *log = DefaultEnv::GetLog();
227  time_t now = ::time(0);
228 
229  if( now-pLastStreamError < pStreamErrorWindow )
230  return pLastFatalError;
231 
232  gettimeofday( &pConnectionStarted, 0 );
233  ++pConnectionCount;
234 
235  //--------------------------------------------------------------------------
236  // Resolve all the addresses of the host we're supposed to connect to
237  //--------------------------------------------------------------------------
238  XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
239  if( !st.IsOK() )
240  {
241  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
242  "the host", pStreamName.c_str() );
243  pLastStreamError = now;
244  st.status = stFatal;
245  pLastFatalError = st;
246  return st;
247  }
248 
249  if( pPrefer.IsValid() )
250  {
251  std::vector<XrdNetAddr> addrresses;
252  XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
253  if( !st.IsOK() )
254  {
255  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
256  pStreamName.c_str(), pPrefer.GetHostName().c_str() );
257  }
258  else
259  {
260  std::vector<XrdNetAddr> tmp;
261  tmp.reserve( pAddresses.size() );
262  // first add all remaining addresses
263  auto itr = pAddresses.begin();
264  for( ; itr != pAddresses.end() ; ++itr )
265  {
266  if( !HasNetAddr( *itr, addrresses ) )
267  tmp.push_back( *itr );
268  }
269  // then copy all 'preferred' addresses
270  std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
271  // and keep the result
272  pAddresses.swap( tmp );
273  }
274  }
275 
277  pAddresses );
278 
279  while( !pAddresses.empty() )
280  {
281  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
282  pAddresses.pop_back();
283  pConnectionInitTime = ::time( 0 );
284  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
285  if( st.IsOK() )
286  {
287  pSubStreams[0]->status = Socket::Connecting;
288  break;
289  }
290  }
291  return st;
292  }
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:438
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
Definition: XrdClUtils.cc:233
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostId(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), XrdCl::Utils::LogHostAddresses(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stFatal, and XrdCl::PathID::up.

Referenced by ForceConnect(), OnConnectError(), OnError(), and Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceConnect()

void XrdCl::Stream::ForceConnect ( )

Force connection.

Definition at line 347 of file XrdClStream.cc.

348  {
349  XrdSysMutexHelper scopedLock( pMutex );
350  if( pSubStreams[0]->status == Socket::Connecting )
351  {
352  pSubStreams[0]->status = Socket::Disconnected;
353  XrdCl::PathID path( 0, 0 );
354  XrdCl::XRootDStatus st = EnableLink( path );
355  if( !st.IsOK() )
356  OnConnectError( 0, st );
357  }
358  }
XRootDStatus EnableLink(PathID &path)
Definition: XrdClStream.cc:187
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
Definition: XrdClStream.cc:709
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124

References XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Status::IsOK(), and OnConnectError().

Referenced by XrdCl::Channel::ForceReconnect().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceError()

void XrdCl::Stream::ForceError ( XRootDStatus  status)

Force error.

Definition at line 913 of file XrdClStream.cc.

914  {
915  XrdSysMutexHelper scopedLock( pMutex );
916  Log *log = DefaultEnv::GetLog();
917  for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
918  {
919  if( pSubStreams[substream]->status != Socket::Connected ) continue;
920  pSubStreams[substream]->socket->Close();
921  pSubStreams[substream]->status = Socket::Disconnected;
922  log->Error( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
923  pStreamName.c_str(), status.ToString().c_str() );
924 
925  //--------------------------------------------------------------------
926  // Reinsert the stuff that we have failed to sent
927  //--------------------------------------------------------------------
928  if( pSubStreams[substream]->outMsgHelper.msg )
929  {
930  OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
931  pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
932  h.stateful );
933  pSubStreams[substream]->outMsgHelper.Reset();
934  }
935 
936  //--------------------------------------------------------------------
937  // Reinsert the receiving handler and reset any partially read partial
938  //--------------------------------------------------------------------
939  if( pSubStreams[substream]->inMsgHelper.handler )
940  {
941  InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
942  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
943  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
944  if( xrdHandler ) xrdHandler->PartialReceived();
945  h.Reset();
946  }
947  }
948 
949  pConnectionCount = 0;
950 
951  //------------------------------------------------------------------------
952  // We're done here, unlock the stream mutex to avoid deadlocks and
953  // report the disconnection event to the handlers
954  //------------------------------------------------------------------------
955  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
956  "message handlers.", pStreamName.c_str() );
957 
958  SubStreamList::iterator it;
959  OutQueue q;
960  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
961  q.GrabItems( *(*it)->outQueue );
962  scopedLock.UnLock();
963 
964  q.Report( status );
965 
966  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
967  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
968  }
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
@ Broken
The stream is broken.

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::Log::Error(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::InMessageHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabItems(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::InMessageHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::Channel::ForceDisconnect(), and XrdCl::AsyncSocketHandler::OnHeaderCorruption().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetName()

const std::string& XrdCl::Stream::GetName ( ) const
inline

Return stream name.

Definition at line 170 of file XrdClStream.hh.

171  {
172  return pStreamName;
173  }

◆ GetURL()

const URL* XrdCl::Stream::GetURL ( ) const
inline

Get the URL.

Definition at line 157 of file XrdClStream.hh.

158  {
159  return pUrl;
160  }

Referenced by XrdCl::AsyncSocketHandler::OnConnectionReturn().

+ Here is the caller graph for this function:

◆ Initialize()

XRootDStatus XrdCl::Stream::Initialize ( )

Initializer.

Definition at line 171 of file XrdClStream.cc.

172  {
173  if( !pTransport || !pPoller || !pChannelData )
174  return XRootDStatus( stError, errUninitialized );
175 
176  AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
177  pChannelData, 0, this );
178  pSubStreams.push_back( new SubStreamData() );
179  pSubStreams[0]->socket = s;
180  return XRootDStatus();
181  }
const uint16_t errUninitialized
Definition: XrdClStatus.hh:60
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32

References XrdCl::errUninitialized, and XrdCl::stError.

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ InspectStatusRsp()

uint16_t XrdCl::Stream::InspectStatusRsp ( uint16_t  stream,
MsgHandler *&  incHandler 
)

In case the message is a kXR_status response it needs further attention

Returns
a MsgHandler action code (Raw, Corrupted, More or None); incHandler is set to the handler in case we need to read out raw data

Definition at line 1140 of file XrdClStream.cc.

1142  {
1143  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1144  if( !mh.handler )
1146 
1147  uint16_t action = mh.handler->InspectStatusRsp();
1148  mh.action |= action;
1149 
1150  if( action & MsgHandler::RemoveHandler )
1151  pIncomingQueue->RemoveMessageHandler( mh.handler );
1152 
1153  if( action & MsgHandler::Raw )
1154  {
1155  incHandler = mh.handler;
1156  return MsgHandler::Raw;
1157  }
1158 
1159  if( action & MsgHandler::Corrupted )
1160  return MsgHandler::Corrupted;
1161 
1162  if( action & MsgHandler::More )
1163  return MsgHandler::More;
1164 
1165  return MsgHandler::None;
1166  }
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
@ More
there are more (non-raw) data to be read

References XrdCl::InMessageHelper::action, XrdCl::MsgHandler::Corrupted, XrdCl::InMessageHelper::handler, XrdCl::MsgHandler::InspectStatusRsp(), XrdCl::MsgHandler::More, XrdCl::MsgHandler::None, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, and XrdCl::InQueue::RemoveMessageHandler().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ InstallIncHandler()

MsgHandler * XrdCl::Stream::InstallIncHandler ( std::shared_ptr< Message > &  msg,
uint16_t  stream 
)

Install a message handler for the given message if there is one available, if the handler wants to be called in the raw mode it will be returned, the message ownership flag is returned in any case

Parameters
msg: message header
stream: stream concerned
Returns
the handler if it wants to be called in the raw mode, nullptr otherwise

Definition at line 1119 of file XrdClStream.cc.

1120  {
1121  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1122  if( !mh.handler )
1123  mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1124  mh.expires,
1125  mh.action );
1126 
1127  if( !mh.handler )
1128  return nullptr;
1129 
1130  if( mh.action & MsgHandler::Raw )
1131  return mh.handler;
1132  return nullptr;
1133  }
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
Definition: XrdClInQueue.cc:66

References XrdCl::InMessageHelper::action, XrdCl::InMessageHelper::expires, XrdCl::InQueue::GetHandlerForMessage(), XrdCl::InMessageHelper::handler, and XrdCl::MsgHandler::Raw.

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnect()

void XrdCl::Stream::OnConnect ( uint16_t  subStream)

Call back when a substream has been connected.

Definition at line 610 of file XrdClStream.cc.

611  {
612  XrdSysMutexHelper scopedLock( pMutex );
613  pSubStreams[subStream]->status = Socket::Connected;
614 
615  std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
616  Log *log = DefaultEnv::GetLog();
617  log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
618  subStream, ipstack.c_str() );
619 
620  if( subStream == 0 )
621  {
622  pLastStreamError = 0;
623  pLastFatalError = XRootDStatus();
624  pConnectionCount = 0;
625  uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
626  pSessionId = ++sSessCntGen;
627 
628  //------------------------------------------------------------------------
629  // Create the streams if they don't exist yet
630  //------------------------------------------------------------------------
631  if( pSubStreams.size() == 1 && numSub > 1 )
632  {
633  for( uint16_t i = 1; i < numSub; ++i )
634  {
635  URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
636  AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
637  pChannelData, i, this );
638  pSubStreams.push_back( new SubStreamData() );
639  pSubStreams[i]->socket = s;
640  }
641  }
642 
643  //------------------------------------------------------------------------
644  // Connect the extra streams, if we fail we move all the outgoing items
645  // to stream 0, we don't need to enable the uplink here, because it
646  // should be already enabled after the handshaking process is completed.
647  //------------------------------------------------------------------------
648  if( pSubStreams.size() > 1 )
649  {
650  log->Debug( PostMasterMsg, "[%s] Attempting to connect %d additional "
651  "streams.", pStreamName.c_str(), pSubStreams.size()-1 );
652  for( size_t i = 1; i < pSubStreams.size(); ++i )
653  {
654  pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
655  XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
656  if( !st.IsOK() )
657  {
658  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
659  pSubStreams[i]->socket->Close();
660  }
661  else
662  {
663  pSubStreams[i]->status = Socket::Connecting;
664  }
665  }
666  }
667 
668  //------------------------------------------------------------------------
669  // Inform monitoring
670  //------------------------------------------------------------------------
671  pBytesSent = 0;
672  pBytesReceived = 0;
673  gettimeofday( &pConnectionDone, 0 );
674  Monitor *mon = DefaultEnv::GetMonitor();
675  if( mon )
676  {
677  Monitor::ConnectInfo i;
678  i.server = pUrl->GetHostId();
679  i.sTOD = pConnectionStarted;
680  i.eTOD = pConnectionDone;
681  i.streams = pSubStreams.size();
682 
683  AnyObject qryResult;
684  std::string *qryResponse = 0;
685  pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
686  qryResult.Get( qryResponse );
687  i.auth = *qryResponse;
688  delete qryResponse;
689  mon->Event( Monitor::EvConnect, &i );
690  }
691 
692  //------------------------------------------------------------------------
693  // For every connected control-stream call the global on-connect handler
694  //------------------------------------------------------------------------
696  }
697  else if( pOnDataConnJob )
698  {
699  //------------------------------------------------------------------------
700  // For every connected data-stream call the on-connect handler
701  //------------------------------------------------------------------------
702  pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
703  }
704  }
static Monitor * GetMonitor()
Get the monitor object.
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
@ EvConnect
ConnectInfo: Login into a server.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
static const uint16_t Auth
Transport name, returns std::string *.

References XrdCl::Monitor::ConnectInfo::auth, XrdCl::TransportQuery::Auth, XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Monitor::ConnectInfo::eTOD, XrdCl::Monitor::EvConnect, XrdCl::Monitor::Event(), XrdCl::AnyObject::Get(), XrdCl::TransportHandler::GetBindPreference(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetMonitor(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnectHandler(), XrdCl::PostMasterMsg, XrdCl::TransportHandler::Query(), XrdCl::JobManager::QueueJob(), XrdCl::Monitor::ConnectInfo::server, XrdCl::Monitor::ConnectInfo::sTOD, XrdCl::Monitor::ConnectInfo::streams, and XrdCl::TransportHandler::SubStreamNumber().

Referenced by XrdCl::AsyncSocketHandler::HandShakeNextStep().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnectError()

void XrdCl::Stream::OnConnectError ( uint16_t  subStream,
XRootDStatus  status 
)

On connect error.

Definition at line 709 of file XrdClStream.cc.

710  {
711  XrdSysMutexHelper scopedLock( pMutex );
712  Log *log = DefaultEnv::GetLog();
713  pSubStreams[subStream]->socket->Close();
714  time_t now = ::time(0);
715 
716  //--------------------------------------------------------------------------
717  // For every connection error call the global connection error handler
718  //--------------------------------------------------------------------------
719  DefaultEnv::GetPostMaster()->NotifyConnErrHandler( *pUrl, status );
720 
721  //--------------------------------------------------------------------------
722  // If we connected subStream == 0 and cannot connect >0 then we just give
723  // up and move the outgoing messages to another queue
724  //--------------------------------------------------------------------------
725  if( subStream > 0 )
726  {
727  pSubStreams[subStream]->status = Socket::Disconnected;
728  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
729  if( pSubStreams[0]->status == Socket::Connected )
730  {
731  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
732  if( !st.IsOK() )
733  OnFatalError( 0, st, scopedLock );
734  return;
735  }
736 
737  if( pSubStreams[0]->status == Socket::Connecting )
738  return;
739 
740  OnFatalError( subStream, status, scopedLock );
741  return;
742  }
743 
744  //--------------------------------------------------------------------------
745  // Check if we still have time to try and do something in the current window
746  //--------------------------------------------------------------------------
747  time_t elapsed = now-pConnectionInitTime;
748  log->Error( PostMasterMsg, "[%s] elapsed = %d, pConnectionWindow = %d "
749  "seconds.", pStreamName.c_str(), elapsed, pConnectionWindow );
750 
751  //------------------------------------------------------------------------
752  // If we have some IP addresses left we try them
753  //------------------------------------------------------------------------
754  if( !pAddresses.empty() )
755  {
756  XRootDStatus st;
757  do
758  {
759  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
760  pAddresses.pop_back();
761  pConnectionInitTime = ::time( 0 );
762  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
763  }
764  while( !pAddresses.empty() && !st.IsOK() );
765 
766  if( !st.IsOK() )
767  OnFatalError( subStream, st, scopedLock );
768 
769  return;
770  }
771  //------------------------------------------------------------------------
772  // If we still can retry with the same host name, we sleep until the end
773  // of the connection window and try
774  //------------------------------------------------------------------------
775  else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
776  && !status.IsFatal() )
777  {
778  log->Info( PostMasterMsg, "[%s] Attempting reconnection in %d "
779  "seconds.", pStreamName.c_str(), pConnectionWindow-elapsed );
780 
781  Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
782  pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
783  return;
784  }
785  //--------------------------------------------------------------------------
786  // We are out of the connection window, the only thing we can do here
787  // is re-resolving the host name and retrying if we still can
788  //--------------------------------------------------------------------------
789  else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
790  {
791  pAddresses.clear();
792  pSubStreams[0]->status = Socket::Disconnected;
793  PathID path( 0, 0 );
794  XRootDStatus st = EnableLink( path );
795  if( !st.IsOK() )
796  OnFatalError( subStream, st, scopedLock );
797  return;
798  }
799 
800  //--------------------------------------------------------------------------
801  // Else, we fail
802  //--------------------------------------------------------------------------
803  OnFatalError( subStream, status, scopedLock );
804  }
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void RegisterTask(Task *task, time_t time, bool own=true)

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Log::Info(), XrdCl::Status::IsFatal(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnErrHandler(), XrdCl::PostMasterMsg, and XrdCl::TaskManager::RegisterTask().

Referenced by ForceConnect(), XrdCl::AsyncSocketHandler::OnConnectionReturn(), and XrdCl::AsyncSocketHandler::OnFaultWhileHandshaking().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnError()

void XrdCl::Stream::OnError ( uint16_t  subStream,
XRootDStatus  status 
)

On error.

Definition at line 809 of file XrdClStream.cc.

810  {
811  XrdSysMutexHelper scopedLock( pMutex );
812  Log *log = DefaultEnv::GetLog();
813  pSubStreams[subStream]->socket->Close();
814  pSubStreams[subStream]->status = Socket::Disconnected;
815 
816  log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
817  pStreamName.c_str(), subStream, status.ToString().c_str() );
818 
819  //--------------------------------------------------------------------------
820  // Reinsert the stuff that we have failed to send
821  //--------------------------------------------------------------------------
822  if( pSubStreams[subStream]->outMsgHelper.msg )
823  {
824  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
825  pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
826  h.stateful );
827  pSubStreams[subStream]->outMsgHelper.Reset();
828  }
829 
830  //--------------------------------------------------------------------------
831  // Reinsert the receiving handler and reset any partially read response
832  //--------------------------------------------------------------------------
833  if( pSubStreams[subStream]->inMsgHelper.handler )
834  {
835  InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
836  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
837  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
838  if( xrdHandler ) xrdHandler->PartialReceived();
839  h.Reset();
840  }
841 
842  //--------------------------------------------------------------------------
843  // We are dealing with an error of a peripheral stream. If we don't have
844  // anything to send don't bother recovering. Otherwise move the requests
845  // to stream 0 if possible.
846  //--------------------------------------------------------------------------
847  if( subStream > 0 )
848  {
849  if( pSubStreams[subStream]->outQueue->IsEmpty() )
850  return;
851 
852  if( pSubStreams[0]->status != Socket::Disconnected )
853  {
854  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
855  if( pSubStreams[0]->status == Socket::Connected )
856  {
857  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
858  if( !st.IsOK() )
859  OnFatalError( 0, st, scopedLock );
860  return;
861  }
862  }
863  OnFatalError( subStream, status, scopedLock );
864  return;
865  }
866 
867  //--------------------------------------------------------------------------
868  // If we lost the stream 0 we have lost the session, we re-enable the
869  // stream if we still have things in one of the outgoing queues, otherwise
870  // there is no point in recovering at this point.
871  //--------------------------------------------------------------------------
872  if( subStream == 0 )
873  {
874  MonitorDisconnection( status );
875 
876  SubStreamList::iterator it;
877  size_t outstanding = 0;
878  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
879  outstanding += (*it)->outQueue->GetSizeStateless();
880 
881  if( outstanding )
882  {
883  PathID path( 0, 0 );
884  XRootDStatus st = EnableLink( path );
885  if( !st.IsOK() )
886  {
887  OnFatalError( 0, st, scopedLock );
888  return;
889  }
890  }
891 
892  //------------------------------------------------------------------------
893  // We're done here, unlock the stream mutex to avoid deadlocks and
894  // report the disconnection event to the handlers
895  //------------------------------------------------------------------------
896  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
897  "message handlers.", pStreamName.c_str() );
898  OutQueue q;
899  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
900  q.GrabStateful( *(*it)->outQueue );
901  scopedLock.UnLock();
902 
903  q.Report( status );
904  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
905  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
906  return;
907  }
908  }

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, EnableLink(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::InMessageHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabStateful(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::InMessageHelper::handler, XrdCl::Status::IsOK(), XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnFault(), and OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnIncoming()

void XrdCl::Stream::OnIncoming ( uint16_t  subStream,
std::shared_ptr< Message > msg,
uint32_t  bytesReceived 
)

Call back when a message has been reconstructed.

Definition at line 471 of file XrdClStream.cc.

474  {
475  msg->SetSessionId( pSessionId );
476  pBytesReceived += bytesReceived;
477 
478  MsgHandler *handler = nullptr;
479  uint16_t action = 0;
480  {
481  InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
482  handler = mh.handler;
483  action = mh.action;
484  mh.Reset();
485  }
486 
487  if( !IsPartial( *msg ) )
488  {
489  uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
490  *pChannelData );
491  if( streamAction & TransportHandler::DigestMsg )
492  return;
493 
494  if( streamAction & TransportHandler::RequestClose )
495  {
496  RequestClose( *msg );
497  return;
498  }
499  }
500 
501  Log *log = DefaultEnv::GetLog();
502 
503  //--------------------------------------------------------------------------
504  // No handler, we discard the message ...
505  //--------------------------------------------------------------------------
506  if( !handler )
507  {
508  ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
509  log->Warning( PostMasterMsg, "[%s] Discarding received message: 0x%x "
510  "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511  pStreamName.c_str(), msg.get(), rsp->hdr.status,
512  rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
513  return;
514  }
515 
516  //--------------------------------------------------------------------------
517  // We have a handler, so we call the callback
518  //--------------------------------------------------------------------------
519  log->Dump( PostMasterMsg, "[%s] Handling received message: 0x%x.",
520  pStreamName.c_str(), msg.get() );
521 
522  if( action & (MsgHandler::NoProcess|MsgHandler::Ignore) )
523  {
524  log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: 0x%x.",
525  pStreamName.c_str(), msg->GetDescription().c_str() );
526 
527  // if we are handling partial response we have to take down the timeout fence
528  if( IsPartial( *msg ) )
529  {
530  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
531  if( xrdHandler ) xrdHandler->PartialReceived();
532  }
533 
534  return;
535  }
536 
537  Job *job = new HandleIncMsgJob( handler );
538  pJobManager->QueueJob( job );
539  }
kXR_char streamid[2]
Definition: XProtocol.hh:912
ServerResponseHeader hdr
Definition: XProtocol.hh:1284
@ Ignore
Ignore the message.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.

References XrdCl::InMessageHelper::action, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::InMessageHelper::handler, ServerResponse::hdr, XrdCl::MsgHandler::Ignore, XrdCl::TransportHandler::MessageReceived(), XrdCl::MsgHandler::NoProcess, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::JobManager::QueueJob(), XrdCl::TransportHandler::RequestClose, XrdCl::InMessageHelper::Reset(), ServerResponseHeader::status, ServerResponseHeader::streamid, and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnMessageSent()

void XrdCl::Stream::OnMessageSent ( uint16_t  subStream,
Message * msg,
uint32_t  bytesSent 
)

Definition at line 584 of file XrdClStream.cc.

587  {
588  pTransport->MessageSent( msg, subStream, bytesSent,
589  *pChannelData );
590  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
591  pBytesSent += bytesSent;
592  if( h.handler )
593  {
594  h.handler->OnStatusReady( msg, XRootDStatus() );
595  bool rmMsg = false;
596  pIncomingQueue->AddMessageHandler( h.handler, h.handler->GetExpiration(), rmMsg );
597  if( rmMsg )
598  {
599  Log *log = DefaultEnv::GetLog();
600  log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
601  pStreamName.c_str(), subStream );
602  }
603  }
604  pSubStreams[subStream]->outMsgHelper.Reset();
605  }
void AddMessageHandler(MsgHandler *handler, time_t expires, bool &rmMsg)
Definition: XrdClInQueue.cc:54
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.

References XrdCl::InQueue::AddMessageHandler(), XrdCl::MsgHandler::GetExpiration(), XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::TransportHandler::MessageSent(), XrdCl::MsgHandler::OnStatusReady(), XrdCl::PostMasterMsg, and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadTimeout()

bool XrdCl::Stream::OnReadTimeout ( uint16_t  subStream)

On read timeout.

Definition at line 1027 of file XrdClStream.cc.

1028  {
1029  //--------------------------------------------------------------------------
1030  // We only take the main stream into account
1031  //--------------------------------------------------------------------------
1032  if( substream != 0 )
1033  return true;
1034 
1035  //--------------------------------------------------------------------------
1036  // Check if there are no outgoing messages and if the stream TTL has elapsed.
1037  // It is assumed that the underlying transport makes sure that there are no
1038  // pending requests that are not answered, i.e. all possible virtual streams
1039  // are de-allocated
1040  //--------------------------------------------------------------------------
1041  Log *log = DefaultEnv::GetLog();
1042  SubStreamList::iterator it;
1043  time_t now = time(0);
1044 
1045  XrdSysMutexHelper scopedLock( pMutex );
1046  uint32_t outgoingMessages = 0;
1047  time_t lastActivity = 0;
1048  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1049  {
1050  outgoingMessages += (*it)->outQueue->GetSize();
1051  time_t sockLastActivity = (*it)->socket->GetLastActivity();
1052  if( lastActivity < sockLastActivity )
1053  lastActivity = sockLastActivity;
1054  }
1055 
1056  if( !outgoingMessages )
1057  {
1058  bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1059  *pChannelData );
1060  if( disconnect )
1061  {
1062  log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1063  pStreamName.c_str() );
1064  scopedLock.UnLock();
1065  //----------------------------------------------------------------------
1066  // Important note!
1067  //
1068  // This destroys the Stream object itself, the underlying
1069  // AsyncSocketHandler object (that called this method) and the Channel
1070  // object that aggregates this Stream.
1071  //----------------------------------------------------------------------
1072  DefaultEnv::GetPostMaster()->ForceDisconnect( *pUrl );
1073  return false;
1074  }
1075  }
1076 
1077  //--------------------------------------------------------------------------
1078  // Check if the stream is broken
1079  //--------------------------------------------------------------------------
1080  XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1081  *pChannelData );
1082  if( !st.IsOK() )
1083  {
1084  scopedLock.UnLock();
1085  OnError( substream, st );
1086  return false;
1087  }
1088  return true;
1089  }
Status ForceDisconnect(const URL &url)
Shut down a channel.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
Definition: XrdClStream.cc:809
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0

References XrdCl::Log::Debug(), XrdCl::PostMaster::ForceDisconnect(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::IsStreamBroken(), XrdCl::TransportHandler::IsStreamTTLElapsed(), OnError(), XrdCl::PostMasterMsg, and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadyToWrite()

std::pair< Message *, MsgHandler * > XrdCl::Stream::OnReadyToWrite ( uint16_t  subStream)

Definition at line 545 of file XrdClStream.cc.

546  {
547  XrdSysMutexHelper scopedLock( pMutex );
548  Log *log = DefaultEnv::GetLog();
549  if( pSubStreams[subStream]->outQueue->IsEmpty() )
550  {
551  log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
552  pSubStreams[subStream]->socket->GetStreamName().c_str() );
553 
554  pSubStreams[subStream]->socket->DisableUplink();
555  return std::make_pair( (Message *)0, (MsgHandler *)0 );
556  }
557 
558  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
559  h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
560  h.expires,
561  h.stateful );
562  scopedLock.UnLock();
563  if( h.handler )
564  h.handler->OnReadyToSend( h.msg );
565  return std::make_pair( h.msg, h.handler );
566  }

References XrdCl::Log::Dump(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::MsgHandler::OnReadyToSend(), XrdCl::PostMasterMsg, XrdCl::OutQueue::MsgHelper::stateful, and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnWriteTimeout()

bool XrdCl::Stream::OnWriteTimeout ( uint16_t  subStream)

On write timeout.

Definition at line 1094 of file XrdClStream.cc.

1095  {
1096  return true;
1097  }

Referenced by XrdCl::AsyncSocketHandler::OnWriteTimeout().

+ Here is the caller graph for this function:

◆ Query()

Status XrdCl::Stream::Query ( uint16_t  query,
AnyObject & result 
)

Query the stream.

Definition at line 1216 of file XrdClStream.cc.

1217  {
1218  switch( query )
1219  {
1220  case StreamQuery::IpAddr:
1221  {
1222  result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1223  return Status();
1224  }
1225 
1226  case StreamQuery::IpStack:
1227  {
1228  result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1229  return Status();
1230  }
1231 
1232  case StreamQuery::HostName:
1233  {
1234  result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1235  return Status();
1236  }
1237 
1238  default:
1239  return Status( stError, errQueryNotSupported );
1240  }
1241  }
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack

References XrdCl::errQueryNotSupported, XrdCl::StreamQuery::HostName, XrdCl::StreamQuery::IpAddr, XrdCl::StreamQuery::IpStack, XrdCl::AnyObject::Set(), and XrdCl::stError.

Referenced by XrdCl::Channel::QueryTransport().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RegisterEventHandler()

void XrdCl::Stream::RegisterEventHandler ( ChannelEventHandler * handler)

Register channel event handler.

Definition at line 1102 of file XrdClStream.cc.

1103  {
1104  pChannelEvHandlers.AddHandler( handler );
1105  }
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.

References XrdCl::ChannelHandlerList::AddHandler().

Referenced by XrdCl::Channel::RegisterEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RemoveEventHandler()

void XrdCl::Stream::RemoveEventHandler ( ChannelEventHandler * handler)

Remove a channel event handler.

Definition at line 1110 of file XrdClStream.cc.

1111  {
1112  pChannelEvHandlers.RemoveHandler( handler );
1113  }
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.

References XrdCl::ChannelHandlerList::RemoveHandler().

Referenced by XrdCl::Channel::RemoveEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Send()

XRootDStatus XrdCl::Stream::Send ( Message * msg,
MsgHandler * handler,
bool  stateful,
time_t  expires 
)

Queue the message for sending.

Definition at line 297 of file XrdClStream.cc.

301  {
302  XrdSysMutexHelper scopedLock( pMutex );
303  Log *log = DefaultEnv::GetLog();
304 
305  //--------------------------------------------------------------------------
306  // Check the session ID and bounce if needed
307  //--------------------------------------------------------------------------
308  if( msg->GetSessionId() &&
309  (pSubStreams[0]->status != Socket::Connected ||
310  pSessionId != msg->GetSessionId()) )
311  return XRootDStatus( stError, errInvalidSession );
312 
313  //--------------------------------------------------------------------------
314  // Decide on the path to send the message
315  //--------------------------------------------------------------------------
316  PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
317  if( pSubStreams.size() <= path.up )
318  {
319  log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
320  "substream %d, using 0 instead", pStreamName.c_str(),
321  msg->GetDescription().c_str(), path.up );
322  path.up = 0;
323  }
324 
325  log->Dump( PostMasterMsg, "[%s] Sending message %s (0x%x) through "
326  "substream %d expecting answer at %d", pStreamName.c_str(),
327  msg->GetDescription().c_str(), msg, path.up, path.down );
328 
329  //--------------------------------------------------------------------------
330  // Enable *a* path and insert the message to the right queue
331  //--------------------------------------------------------------------------
332  XRootDStatus st = EnableLink( path );
333  if( st.IsOK() )
334  {
335  pTransport->MultiplexSubStream( msg, *pChannelData, &path );
336  pSubStreams[path.up]->outQueue->PushBack( msg, handler,
337  expires, stateful );
338  }
339  else
340  st.status = stFatal;
341  return st;
342  }
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79

References XrdCl::Socket::Connected, XrdCl::PathID::down, XrdCl::Log::Dump(), EnableLink(), XrdCl::errInvalidSession, XrdCl::Message::GetDescription(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetSessionId(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::MultiplexSubStream(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stError, XrdCl::stFatal, XrdCl::PathID::up, and XrdCl::Log::Warning().

Referenced by XrdCl::Channel::Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetChannelData()

void XrdCl::Stream::SetChannelData ( AnyObject * channelData)
inline

Set the channel data.

Definition at line 115 of file XrdClStream.hh.

116  {
117  pChannelData = channelData;
118  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetIncomingQueue()

void XrdCl::Stream::SetIncomingQueue ( InQueue * incomingQueue)
inline

Set the incoming queue.

Definition at line 107 of file XrdClStream.hh.

108  {
109  pIncomingQueue = incomingQueue;
110  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetJobManager()

void XrdCl::Stream::SetJobManager ( JobManager * jobManager)
inline

Set job manager.

Definition at line 131 of file XrdClStream.hh.

132  {
133  pJobManager = jobManager;
134  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetOnDataConnectHandler()

void XrdCl::Stream::SetOnDataConnectHandler ( std::shared_ptr< Job > &  onConnJob)
inline

Set the on-connect handler for data streams.

Definition at line 263 of file XrdClStream.hh.

264  {
265  XrdSysMutexHelper scopedLock( pMutex );
266  pOnDataConnJob = onConnJob;
267  }

Referenced by XrdCl::Channel::SetOnDataConnectHandler().

+ Here is the caller graph for this function:

◆ SetPoller()

void XrdCl::Stream::SetPoller ( Poller poller)
inline

Set the poller.

Definition at line 99 of file XrdClStream.hh.

100  {
101  pPoller = poller;
102  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTaskManager()

void XrdCl::Stream::SetTaskManager ( TaskManager * taskManager)
inline

Set task manager.

Definition at line 123 of file XrdClStream.hh.

124  {
125  pTaskManager = taskManager;
126  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTransport()

void XrdCl::Stream::SetTransport ( TransportHandler * transport)
inline

Set the transport.

Definition at line 91 of file XrdClStream.hh.

92  {
93  pTransport = transport;
94  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ Tick()

void XrdCl::Stream::Tick ( time_t  now)

Handle a clock event generated either by a socket timeout or by a task manager event.

Definition at line 377 of file XrdClStream.cc.

378  {
379  //--------------------------------------------------------------------------
380  // Check for timed-out requests and incoming handlers
381  //--------------------------------------------------------------------------
382  pMutex.Lock();
383  OutQueue q;
384  SubStreamList::iterator it;
385  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
386  q.GrabExpired( *(*it)->outQueue, now );
387  pMutex.UnLock();
388 
389  q.Report( XRootDStatus( stError, errOperationExpired ) );
390  pIncomingQueue->ReportTimeout( now );
391  }
void ReportTimeout(time_t now=0)
Timeout handlers.
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90

References XrdCl::errOperationExpired, XrdCl::OutQueue::GrabExpired(), XrdSysMutex::Lock(), XrdCl::OutQueue::Report(), XrdCl::InQueue::ReportTimeout(), XrdCl::stError, and XrdSysMutex::UnLock().

Referenced by XrdCl::Channel::Tick().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: