XRootD
XrdCl::Stream Class Reference

Stream. More...

#include <XrdClStream.hh>

+ Collaboration diagram for XrdCl::Stream:

Public Types

enum  StreamStatus {
  Disconnected = 0 ,
  Connected = 1 ,
  Connecting = 2 ,
  Error = 3
}
 Status of the stream. More...
 

Public Member Functions

 Stream (const URL *url, const URL &prefer=URL())
 Constructor. More...
 
 ~Stream ()
 Destructor. More...
 
bool CanCollapse (const URL &url)
 
void DisableIfEmpty (uint16_t subStream)
 Disables respective uplink if empty. More...
 
void Disconnect (bool force=false)
 Disconnect the stream. More...
 
XRootDStatus EnableLink (PathID &path)
 
void ForceConnect ()
 Force connection. More...
 
void ForceError (XRootDStatus status, bool hush=false)
 Force error. More...
 
const std::string & GetName () const
 Return stream name. More...
 
const URL * GetURL () const
 Get the URL. More...
 
XRootDStatus Initialize ()
 Initializer. More...
 
uint16_t InspectStatusRsp (uint16_t stream, MsgHandler *&incHandler)
 
MsgHandler * InstallIncHandler (std::shared_ptr< Message > &msg, uint16_t stream)
 
void OnConnect (uint16_t subStream)
 Call back when a substream has been connected. More...
 
void OnConnectError (uint16_t subStream, XRootDStatus status)
 On connect error. More...
 
void OnError (uint16_t subStream, XRootDStatus status)
 On error. More...
 
void OnIncoming (uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
 Call back when a message has been reconstructed. More...
 
void OnMessageSent (uint16_t subStream, Message *msg, uint32_t bytesSent)
 
bool OnReadTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On read timeout. More...
 
std::pair< Message *, MsgHandler * > OnReadyToWrite (uint16_t subStream)
 
bool OnWriteTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On write timeout. More...
 
Status Query (uint16_t query, AnyObject &result)
 Query the stream. More...
 
void RegisterEventHandler (ChannelEventHandler *handler)
 Register channel event handler. More...
 
void RemoveEventHandler (ChannelEventHandler *handler)
 Remove a channel event handler. More...
 
XRootDStatus Send (Message *msg, MsgHandler *handler, bool stateful, time_t expires)
 Queue the message for sending. More...
 
void SetChannelData (AnyObject *channelData)
 Set the channel data. More...
 
void SetIncomingQueue (InQueue *incomingQueue)
 Set the incoming queue. More...
 
void SetJobManager (JobManager *jobManager)
 Set job manager. More...
 
void SetOnDataConnectHandler (std::shared_ptr< Job > &onConnJob)
 Set the on-connect handler for data streams. More...
 
void SetPoller (Poller *poller)
 Set the poller. More...
 
void SetTaskManager (TaskManager *taskManager)
 Set task manager. More...
 
void SetTransport (TransportHandler *transport)
 Set the transport. More...
 
void Tick (time_t now)
 

Detailed Description

Stream.

Definition at line 51 of file XrdClStream.hh.

Member Enumeration Documentation

◆ StreamStatus

Status of the stream.

Enumerator
Disconnected 

Not connected.

Connected 

Connected.

Connecting 

In the process of being connected.

Error 

Broken.

Definition at line 57 of file XrdClStream.hh.

58  {
59  Disconnected = 0,
60  Connected = 1,
61  Connecting = 2,
62  Error = 3
63  };
@ Disconnected
Not connected.
Definition: XrdClStream.hh:59
@ Error
Broken.
Definition: XrdClStream.hh:62
@ Connected
Connected.
Definition: XrdClStream.hh:60
@ Connecting
In the process of being connected.
Definition: XrdClStream.hh:61

Constructor & Destructor Documentation

◆ Stream()

XrdCl::Stream::Stream ( const URL * url,
const URL & prefer = URL() 
)

Constructor.

Definition at line 96 of file XrdClStream.cc.

96  :
97  pUrl( url ),
98  pPrefer( prefer ),
99  pTransport( 0 ),
100  pPoller( 0 ),
101  pTaskManager( 0 ),
102  pJobManager( 0 ),
103  pIncomingQueue( 0 ),
104  pChannelData( 0 ),
105  pLastStreamError( 0 ),
106  pConnectionCount( 0 ),
107  pConnectionInitTime( 0 ),
108  pAddressType( Utils::IPAll ),
109  pSessionId( 0 ),
110  pBytesSent( 0 ),
111  pBytesReceived( 0 )
112  {
113  pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
114  pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
115 
116  std::ostringstream o;
117  o << pUrl->GetHostId();
118  pStreamName = o.str();
119 
120  pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
121  DefaultConnectionWindow );
122  pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
123  DefaultConnectionRetry );
124  pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
125  DefaultStreamErrorWindow );
126 
127  std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
128  DefaultNetworkStack );
129 
130  pAddressType = Utils::String2AddressType( netStack );
131  if( pAddressType == Utils::AddressType::IPAuto )
132  {
133  XrdNetUtils::NetProt stacks = XrdNetUtils::NetConfig( XrdNetUtils::NetType::qryINIF );
134  if( !( stacks & XrdNetUtils::hasIP64 ) )
135  {
136  if( stacks & XrdNetUtils::hasIPv4 )
137  pAddressType = Utils::AddressType::IPv4;
138  else if( stacks & XrdNetUtils::hasIPv6 )
139  pAddressType = Utils::AddressType::IPv6;
140  }
141  }
142 
143  Log *log = DefaultEnv::GetLog();
144  log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
145  "Connection Window: %d, ConnectionRetry: %d, Stream Error "
146  "Window: %d", pStreamName.c_str(), netStack.c_str(),
147  pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
148  }
static Log * GetLog()
Get default log.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
Definition: XrdClUtils.cc:123
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:104
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
Definition: XrdNetUtils.cc:681
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
XrdSysError Log
Definition: XrdConfig.cc:112

References XrdCl::Log::Debug(), XrdCl::DefaultConnectionRetry, XrdCl::DefaultConnectionWindow, XrdCl::DefaultNetworkStack, XrdCl::DefaultStreamErrorWindow, XrdCl::URL::GetHostId(), XrdCl::Utils::GetIntParameter(), XrdCl::DefaultEnv::GetLog(), XrdCl::Utils::GetStringParameter(), XrdNetUtils::hasIP64, XrdNetUtils::hasIPv4, XrdNetUtils::hasIPv6, XrdNetUtils::NetConfig(), XrdCl::PostMasterMsg, and XrdCl::Utils::String2AddressType().

+ Here is the call graph for this function:

◆ ~Stream()

XrdCl::Stream::~Stream ( )

Destructor.

Definition at line 153 of file XrdClStream.cc.

154  {
155  Disconnect( true );
156 
157  Log *log = DefaultEnv::GetLog();
158  log->Debug( PostMasterMsg, "[%s] Destroying stream",
159  pStreamName.c_str() );
160 
161  MonitorDisconnection( XRootDStatus() );
162 
163  SubStreamList::iterator it;
164  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
165  delete *it;
166  }
void Disconnect(bool force=false)
Disconnect the stream.
Definition: XrdClStream.cc:363

References XrdCl::Log::Debug(), Disconnect(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

+ Here is the call graph for this function:

Member Function Documentation

◆ CanCollapse()

bool XrdCl::Stream::CanCollapse ( const URL & url)
Returns
: true if this channel can be collapsed using this URL, false otherwise

Definition at line 1173 of file XrdClStream.cc.

1174  {
1175  Log *log = DefaultEnv::GetLog();
1176 
1177  //--------------------------------------------------------------------------
1178  // Resolve all the addresses of the host we're supposed to connect to
1179  //--------------------------------------------------------------------------
1180  std::vector<XrdNetAddr> prefaddrs;
1181  XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1182  if( !st.IsOK() )
1183  {
1184  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1185  , pStreamName.c_str(), url.GetHostName().c_str() );
1186  return false;
1187  }
1188 
1189  //--------------------------------------------------------------------------
1190  // Resolve all the addresses of the alias
1191  //--------------------------------------------------------------------------
1192  std::vector<XrdNetAddr> aliasaddrs;
1193  st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1194  if( !st.IsOK() )
1195  {
1196  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1197  , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1198  return false;
1199  }
1200 
1201  //--------------------------------------------------------------------------
1202  // Now check if the preferred host is part of the alias
1203  //--------------------------------------------------------------------------
1204  auto itr = prefaddrs.begin();
1205  for( ; itr != prefaddrs.end() ; ++itr )
1206  {
1207  auto itr2 = aliasaddrs.begin();
1208  for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1209  if( itr->Same( &*itr2 ) ) return true;
1210  }
1211 
1212  return false;
1213  }
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
Definition: XrdClUtils.cc:140

References XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::Channel::CanCollapse().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DisableIfEmpty()

void XrdCl::Stream::DisableIfEmpty ( uint16_t  subStream)

Disables respective uplink if empty.

Definition at line 568 of file XrdClStream.cc.

569  {
570  XrdSysMutexHelper scopedLock( pMutex );
571  Log *log = DefaultEnv::GetLog();
572 
573  if( pSubStreams[subStream]->outQueue->IsEmpty() )
574  {
575  log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
576  pSubStreams[subStream]->socket->GetStreamName().c_str() );
577  pSubStreams[subStream]->socket->DisableUplink();
578  }
579  }

References XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::AsyncSocketHandler::OnWrite().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Disconnect()

void XrdCl::Stream::Disconnect ( bool  force = false)

Disconnect the stream.

Definition at line 363 of file XrdClStream.cc.

364  {
365  XrdSysMutexHelper scopedLock( pMutex );
366  SubStreamList::iterator it;
367  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
368  {
369  (*it)->socket->Close();
370  (*it)->status = Socket::Disconnected;
371  }
372  }
@ Disconnected
The socket is disconnected.
Definition: XrdClSocket.hh:50

References XrdCl::Socket::Disconnected.

Referenced by ~Stream().

+ Here is the caller graph for this function:

◆ EnableLink()

XRootDStatus XrdCl::Stream::EnableLink ( PathID & path)

Connect if needed; otherwise make sure that the underlying socket handler gets write-readiness events. The path is updated to reflect what has actually been enabled.

Definition at line 187 of file XrdClStream.cc.

188  {
189  XrdSysMutexHelper scopedLock( pMutex );
190 
191  //--------------------------------------------------------------------------
192  // We are in the process of connecting the main stream, so we do nothing
193  // because when the main stream connection is established it will connect
194  // all the other streams
195  //--------------------------------------------------------------------------
196  if( pSubStreams[0]->status == Socket::Connecting )
197  return XRootDStatus();
198 
199  //--------------------------------------------------------------------------
200  // The main stream is connected, so we can verify whether we have
201  // the up and the down stream connected and ready to handle data.
202  // If anything is not right we fall back to stream 0.
203  //--------------------------------------------------------------------------
204  if( pSubStreams[0]->status == Socket::Connected )
205  {
206  if( pSubStreams[path.down]->status != Socket::Connected )
207  path.down = 0;
208 
209  if( pSubStreams[path.up]->status == Socket::Disconnected )
210  {
211  path.up = 0;
212  return pSubStreams[0]->socket->EnableUplink();
213  }
214 
215  if( pSubStreams[path.up]->status == Socket::Connected )
216  return pSubStreams[path.up]->socket->EnableUplink();
217 
218  return XRootDStatus();
219  }
220 
221  //--------------------------------------------------------------------------
222  // The main stream is not connected, we need to check whether enough time
223  // has passed since we last encountered an error (if any) so that we could
224  // re-attempt the connection
225  //--------------------------------------------------------------------------
226  Log *log = DefaultEnv::GetLog();
227  time_t now = ::time(0);
228 
229  if( now-pLastStreamError < pStreamErrorWindow )
230  return pLastFatalError;
231 
232  gettimeofday( &pConnectionStarted, 0 );
233  ++pConnectionCount;
234 
235  //--------------------------------------------------------------------------
236  // Resolve all the addresses of the host we're supposed to connect to
237  //--------------------------------------------------------------------------
238  XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
239  if( !st.IsOK() )
240  {
241  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
242  "the host", pStreamName.c_str() );
243  pLastStreamError = now;
244  st.status = stFatal;
245  pLastFatalError = st;
246  return st;
247  }
248 
249  if( pPrefer.IsValid() )
250  {
251  std::vector<XrdNetAddr> addrresses;
252  XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
253  if( !st.IsOK() )
254  {
255  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
256  pStreamName.c_str(), pPrefer.GetHostName().c_str() );
257  }
258  else
259  {
260  std::vector<XrdNetAddr> tmp;
261  tmp.reserve( pAddresses.size() );
262  // first add all remaining addresses
263  auto itr = pAddresses.begin();
264  for( ; itr != pAddresses.end() ; ++itr )
265  {
266  if( !HasNetAddr( *itr, addrresses ) )
267  tmp.push_back( *itr );
268  }
269  // then copy all 'preferred' addresses
270  std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
271  // and keep the result
272  pAddresses.swap( tmp );
273  }
274  }
275 
276  Utils::LogHostAddresses( log, PostMasterMsg, pUrl->GetHostId(),
277  pAddresses );
278 
279  while( !pAddresses.empty() )
280  {
281  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
282  pAddresses.pop_back();
283  pConnectionInitTime = ::time( 0 );
284  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
285  if( st.IsOK() )
286  {
287  pSubStreams[0]->status = Socket::Connecting;
288  break;
289  }
290  }
291  return st;
292  }
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:445
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
Definition: XrdClUtils.cc:234
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostId(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), XrdCl::Utils::LogHostAddresses(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stFatal, and XrdCl::PathID::up.

Referenced by ForceConnect(), OnConnectError(), OnError(), and Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceConnect()

void XrdCl::Stream::ForceConnect ( )

Force connection.

Definition at line 347 of file XrdClStream.cc.

348  {
349  XrdSysMutexHelper scopedLock( pMutex );
350  if( pSubStreams[0]->status == Socket::Connecting )
351  {
352  pSubStreams[0]->status = Socket::Disconnected;
353  XrdCl::PathID path( 0, 0 );
354  XrdCl::XRootDStatus st = EnableLink( path );
355  if( !st.IsOK() )
356  OnConnectError( 0, st );
357  }
358  }
XRootDStatus EnableLink(PathID &path)
Definition: XrdClStream.cc:187
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
Definition: XrdClStream.cc:709
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124

References XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Status::IsOK(), and OnConnectError().

Referenced by XrdCl::Channel::ForceReconnect().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceError()

void XrdCl::Stream::ForceError ( XRootDStatus  status,
bool  hush = false 
)

Force error.

Definition at line 913 of file XrdClStream.cc.

914  {
915  XrdSysMutexHelper scopedLock( pMutex );
916  Log *log = DefaultEnv::GetLog();
917  for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
918  {
919  if( pSubStreams[substream]->status != Socket::Connected ) continue;
920  pSubStreams[substream]->socket->Close();
921  pSubStreams[substream]->status = Socket::Disconnected;
922 
923  if( !hush )
924  log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
925  pStreamName.c_str(), status.ToString().c_str() );
926 
927  //--------------------------------------------------------------------
928  // Reinsert the stuff that we have failed to sent
929  //--------------------------------------------------------------------
930  if( pSubStreams[substream]->outMsgHelper.msg )
931  {
932  OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
933  pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
934  h.stateful );
935  pSubStreams[substream]->outMsgHelper.Reset();
936  }
937 
938  //--------------------------------------------------------------------
939  // Reinsert the receiving handler and reset any partially read partial
940  //--------------------------------------------------------------------
941  if( pSubStreams[substream]->inMsgHelper.handler )
942  {
943  InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
944  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
945  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
946  if( xrdHandler ) xrdHandler->PartialReceived();
947  h.Reset();
948  }
949  }
950 
951  pConnectionCount = 0;
952 
953  //------------------------------------------------------------------------
954  // We're done here, unlock the stream mutex to avoid deadlocks and
955  // report the disconnection event to the handlers
956  //------------------------------------------------------------------------
957  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
958  "message handlers.", pStreamName.c_str() );
959 
960  SubStreamList::iterator it;
961  OutQueue q;
962  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
963  q.GrabItems( *(*it)->outQueue );
964  scopedLock.UnLock();
965 
966  q.Report( status );
967 
968  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
969  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
970  }
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
@ Broken
The stream is broken.

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::OutQueue::MsgHelper::expires, XrdCl::InMessageHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabItems(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::InMessageHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::Channel::ForceDisconnect(), and XrdCl::AsyncSocketHandler::OnHeaderCorruption().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetName()

const std::string& XrdCl::Stream::GetName ( ) const
inline

Return stream name.

Definition at line 170 of file XrdClStream.hh.

171  {
172  return pStreamName;
173  }

◆ GetURL()

const URL* XrdCl::Stream::GetURL ( ) const
inline

Get the URL.

Definition at line 157 of file XrdClStream.hh.

158  {
159  return pUrl;
160  }

Referenced by XrdCl::AsyncSocketHandler::OnConnectionReturn().

+ Here is the caller graph for this function:

◆ Initialize()

XRootDStatus XrdCl::Stream::Initialize ( )

Initializer.

Definition at line 171 of file XrdClStream.cc.

172  {
173  if( !pTransport || !pPoller || !pChannelData )
174  return XRootDStatus( stError, errUninitialized );
175 
176  AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
177  pChannelData, 0, this );
178  pSubStreams.push_back( new SubStreamData() );
179  pSubStreams[0]->socket = s;
180  return XRootDStatus();
181  }
const uint16_t errUninitialized
Definition: XrdClStatus.hh:60
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32

References XrdCl::errUninitialized, and XrdCl::stError.

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ InspectStatusRsp()

uint16_t XrdCl::Stream::InspectStatusRsp ( uint16_t  stream,
MsgHandler *&  incHandler 
)

In case the message is a kXR_status response it needs further attention

Returns
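: an action code (MsgHandler::Raw, MsgHandler::Corrupted, MsgHandler::More or MsgHandler::None); incHandler is set to the MsgHandler in case we need to read out raw data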

Definition at line 1142 of file XrdClStream.cc.

1144  {
1145  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1146  if( !mh.handler )
1148 
1149  uint16_t action = mh.handler->InspectStatusRsp();
1150  mh.action |= action;
1151 
1152  if( action & MsgHandler::RemoveHandler )
1153  pIncomingQueue->RemoveMessageHandler( mh.handler );
1154 
1155  if( action & MsgHandler::Raw )
1156  {
1157  incHandler = mh.handler;
1158  return MsgHandler::Raw;
1159  }
1160 
1161  if( action & MsgHandler::Corrupted )
1162  return MsgHandler::Corrupted;
1163 
1164  if( action & MsgHandler::More )
1165  return MsgHandler::More;
1166 
1167  return MsgHandler::None;
1168  }
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
@ More
there are more (non-raw) data to be read

References XrdCl::InMessageHelper::action, XrdCl::MsgHandler::Corrupted, XrdCl::InMessageHelper::handler, XrdCl::MsgHandler::InspectStatusRsp(), XrdCl::MsgHandler::More, XrdCl::MsgHandler::None, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, and XrdCl::InQueue::RemoveMessageHandler().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ InstallIncHandler()

MsgHandler * XrdCl::Stream::InstallIncHandler ( std::shared_ptr< Message > &  msg,
uint16_t  stream 
)

Install a message handler for the given message if there is one available. If the handler wants to be called in the raw mode it will be returned; the message ownership flag is returned in any case.

Parameters
msg: message header
stream: stream concerned
Returns
the handler if it wants to be called in the raw mode, nullptr otherwise

Definition at line 1121 of file XrdClStream.cc.

1122  {
1123  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1124  if( !mh.handler )
1125  mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1126  mh.expires,
1127  mh.action );
1128 
1129  if( !mh.handler )
1130  return nullptr;
1131 
1132  if( mh.action & MsgHandler::Raw )
1133  return mh.handler;
1134  return nullptr;
1135  }
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
Definition: XrdClInQueue.cc:66

References XrdCl::InMessageHelper::action, XrdCl::InMessageHelper::expires, XrdCl::InQueue::GetHandlerForMessage(), XrdCl::InMessageHelper::handler, and XrdCl::MsgHandler::Raw.

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnect()

void XrdCl::Stream::OnConnect ( uint16_t  subStream)

Call back when a substream has been connected.

Definition at line 610 of file XrdClStream.cc.

611  {
612  XrdSysMutexHelper scopedLock( pMutex );
613  pSubStreams[subStream]->status = Socket::Connected;
614 
615  std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
616  Log *log = DefaultEnv::GetLog();
617  log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
618  subStream, ipstack.c_str() );
619 
620  if( subStream == 0 )
621  {
622  pLastStreamError = 0;
623  pLastFatalError = XRootDStatus();
624  pConnectionCount = 0;
625  uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
626  pSessionId = ++sSessCntGen;
627 
628  //------------------------------------------------------------------------
629  // Create the streams if they don't exist yet
630  //------------------------------------------------------------------------
631  if( pSubStreams.size() == 1 && numSub > 1 )
632  {
633  for( uint16_t i = 1; i < numSub; ++i )
634  {
635  URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
636  AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
637  pChannelData, i, this );
638  pSubStreams.push_back( new SubStreamData() );
639  pSubStreams[i]->socket = s;
640  }
641  }
642 
643  //------------------------------------------------------------------------
644  // Connect the extra streams, if we fail we move all the outgoing items
645  // to stream 0, we don't need to enable the uplink here, because it
646  // should be already enabled after the handshaking process is completed.
647  //------------------------------------------------------------------------
648  if( pSubStreams.size() > 1 )
649  {
650  log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
651  pStreamName.c_str(), pSubStreams.size() - 1 );
652  for( size_t i = 1; i < pSubStreams.size(); ++i )
653  {
654  pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
655  XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
656  if( !st.IsOK() )
657  {
658  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
659  pSubStreams[i]->socket->Close();
660  }
661  else
662  {
663  pSubStreams[i]->status = Socket::Connecting;
664  }
665  }
666  }
667 
668  //------------------------------------------------------------------------
669  // Inform monitoring
670  //------------------------------------------------------------------------
671  pBytesSent = 0;
672  pBytesReceived = 0;
673  gettimeofday( &pConnectionDone, 0 );
674  Monitor *mon = DefaultEnv::GetMonitor();
675  if( mon )
676  {
677  Monitor::ConnectInfo i;
678  i.server = pUrl->GetHostId();
679  i.sTOD = pConnectionStarted;
680  i.eTOD = pConnectionDone;
681  i.streams = pSubStreams.size();
682 
683  AnyObject qryResult;
684  std::string *qryResponse = 0;
685  pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
686  qryResult.Get( qryResponse );
687  i.auth = *qryResponse;
688  delete qryResponse;
689  mon->Event( Monitor::EvConnect, &i );
690  }
691 
692  //------------------------------------------------------------------------
693  // For every connected control-stream call the global on-connect handler
694  //------------------------------------------------------------------------
695  DefaultEnv::GetPostMaster()->NotifyConnectHandler( *pUrl );
696  }
697  else if( pOnDataConnJob )
698  {
699  //------------------------------------------------------------------------
700  // For every connected data-stream call the on-connect handler
701  //------------------------------------------------------------------------
702  pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
703  }
704  }
static Monitor * GetMonitor()
Get the monitor object.
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
@ EvConnect
ConnectInfo: Login into a server.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
static const uint16_t Auth
Transport name, returns std::string *.

References XrdCl::Monitor::ConnectInfo::auth, XrdCl::TransportQuery::Auth, XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Monitor::ConnectInfo::eTOD, XrdCl::Monitor::EvConnect, XrdCl::Monitor::Event(), XrdCl::AnyObject::Get(), XrdCl::TransportHandler::GetBindPreference(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetMonitor(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnectHandler(), XrdCl::PostMasterMsg, XrdCl::TransportHandler::Query(), XrdCl::JobManager::QueueJob(), XrdCl::Monitor::ConnectInfo::server, XrdCl::Monitor::ConnectInfo::sTOD, XrdCl::Monitor::ConnectInfo::streams, and XrdCl::TransportHandler::SubStreamNumber().

Referenced by XrdCl::AsyncSocketHandler::HandShakeNextStep().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnectError()

void XrdCl::Stream::OnConnectError ( uint16_t  subStream,
XRootDStatus  status 
)

On connect error.

Definition at line 709 of file XrdClStream.cc.

710  {
711  XrdSysMutexHelper scopedLock( pMutex );
712  Log *log = DefaultEnv::GetLog();
713  pSubStreams[subStream]->socket->Close();
714  time_t now = ::time(0);
715 
716  //--------------------------------------------------------------------------
717  // For every connection error call the global connection error handler
718  //--------------------------------------------------------------------------
720 
721  //--------------------------------------------------------------------------
722  // If we connected subStream == 0 and cannot connect >0 then we just give
723  // up and move the outgoing messages to another queue
724  //--------------------------------------------------------------------------
725  if( subStream > 0 )
726  {
727  pSubStreams[subStream]->status = Socket::Disconnected;
728  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
729  if( pSubStreams[0]->status == Socket::Connected )
730  {
731  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
732  if( !st.IsOK() )
733  OnFatalError( 0, st, scopedLock );
734  return;
735  }
736 
737  if( pSubStreams[0]->status == Socket::Connecting )
738  return;
739 
740  OnFatalError( subStream, status, scopedLock );
741  return;
742  }
743 
744  //--------------------------------------------------------------------------
745  // Check if we still have time to try and do something in the current window
746  //--------------------------------------------------------------------------
747  time_t elapsed = now-pConnectionInitTime;
748  log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
749  pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
750 
751  //------------------------------------------------------------------------
752  // If we have some IP addresses left we try them
753  //------------------------------------------------------------------------
754  if( !pAddresses.empty() )
755  {
756  XRootDStatus st;
757  do
758  {
759  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
760  pAddresses.pop_back();
761  pConnectionInitTime = ::time( 0 );
762  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
763  }
764  while( !pAddresses.empty() && !st.IsOK() );
765 
766  if( !st.IsOK() )
767  OnFatalError( subStream, st, scopedLock );
768 
769  return;
770  }
771  //------------------------------------------------------------------------
772  // If we still can retry with the same host name, we sleep until the end
773  // of the connection window and try
774  //------------------------------------------------------------------------
775  else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
776  && !status.IsFatal() )
777  {
778  log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
779  pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
780 
781  Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
782  pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
783  return;
784  }
785  //--------------------------------------------------------------------------
786  // We are out of the connection window, the only thing we can do here
787  // is re-resolving the host name and retrying if we still can
788  //--------------------------------------------------------------------------
789  else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
790  {
791  pAddresses.clear();
792  pSubStreams[0]->status = Socket::Disconnected;
793  PathID path( 0, 0 );
794  XRootDStatus st = EnableLink( path );
795  if( !st.IsOK() )
796  OnFatalError( subStream, st, scopedLock );
797  return;
798  }
799 
800  //--------------------------------------------------------------------------
801  // Else, we fail
802  //--------------------------------------------------------------------------
803  OnFatalError( subStream, status, scopedLock );
804  }
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void RegisterTask(Task *task, time_t time, bool own=true)

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Log::Info(), XrdCl::Status::IsFatal(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnErrHandler(), XrdCl::PostMasterMsg, and XrdCl::TaskManager::RegisterTask().

Referenced by ForceConnect(), XrdCl::AsyncSocketHandler::OnConnectionReturn(), and XrdCl::AsyncSocketHandler::OnFaultWhileHandshaking().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnError()

void XrdCl::Stream::OnError ( uint16_t  subStream,
XRootDStatus  status 
)

On error.

Definition at line 809 of file XrdClStream.cc.

810  {
811  XrdSysMutexHelper scopedLock( pMutex );
812  Log *log = DefaultEnv::GetLog();
813  pSubStreams[subStream]->socket->Close();
814  pSubStreams[subStream]->status = Socket::Disconnected;
815 
816  log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
817  pStreamName.c_str(), subStream, status.ToString().c_str() );
818 
819  //--------------------------------------------------------------------------
820  // Reinsert the stuff that we have failed to send
821  //--------------------------------------------------------------------------
822  if( pSubStreams[subStream]->outMsgHelper.msg )
823  {
824  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
825  pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
826  h.stateful );
827  pSubStreams[subStream]->outMsgHelper.Reset();
828  }
829 
830  //--------------------------------------------------------------------------
831  // Reinsert the receiving handler and reset any partially read partial
832  //--------------------------------------------------------------------------
833  if( pSubStreams[subStream]->inMsgHelper.handler )
834  {
835  InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
836  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
837  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
838  if( xrdHandler ) xrdHandler->PartialReceived();
839  h.Reset();
840  }
841 
842  //--------------------------------------------------------------------------
843  // We are dealing with an error of a peripheral stream. If we don't have
844  // anything to send don't bother recovering. Otherwise move the requests
845  // to stream 0 if possible.
846  //--------------------------------------------------------------------------
847  if( subStream > 0 )
848  {
849  if( pSubStreams[subStream]->outQueue->IsEmpty() )
850  return;
851 
852  if( pSubStreams[0]->status != Socket::Disconnected )
853  {
854  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
855  if( pSubStreams[0]->status == Socket::Connected )
856  {
857  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
858  if( !st.IsOK() )
859  OnFatalError( 0, st, scopedLock );
860  return;
861  }
862  }
863  OnFatalError( subStream, status, scopedLock );
864  return;
865  }
866 
867  //--------------------------------------------------------------------------
868  // If we lost the stream 0 we have lost the session, we re-enable the
869  // stream if we still have things in one of the outgoing queues, otherwise
870  // there is no point in recovering at this point.
871  //--------------------------------------------------------------------------
872  if( subStream == 0 )
873  {
874  MonitorDisconnection( status );
875 
876  SubStreamList::iterator it;
877  size_t outstanding = 0;
878  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
879  outstanding += (*it)->outQueue->GetSizeStateless();
880 
881  if( outstanding )
882  {
883  PathID path( 0, 0 );
884  XRootDStatus st = EnableLink( path );
885  if( !st.IsOK() )
886  {
887  OnFatalError( 0, st, scopedLock );
888  return;
889  }
890  }
891 
892  //------------------------------------------------------------------------
893  // We're done here, unlock the stream mutex to avoid deadlocks and
894  // report the disconnection event to the handlers
895  //------------------------------------------------------------------------
896  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
897  "message handlers.", pStreamName.c_str() );
898  OutQueue q;
899  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
900  q.GrabStateful( *(*it)->outQueue );
901  scopedLock.UnLock();
902 
903  q.Report( status );
904  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
905  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
906  return;
907  }
908  }

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, EnableLink(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::InMessageHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabStateful(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::InMessageHelper::handler, XrdCl::Status::IsOK(), XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnFault(), and OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnIncoming()

void XrdCl::Stream::OnIncoming ( uint16_t  subStream,
std::shared_ptr< Message > msg,
uint32_t  bytesReceived 
)

Call back when a message has been reconstructed.

Definition at line 471 of file XrdClStream.cc.

474  {
475  msg->SetSessionId( pSessionId );
476  pBytesReceived += bytesReceived;
477 
478  MsgHandler *handler = nullptr;
479  uint16_t action = 0;
480  {
481  InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
482  handler = mh.handler;
483  action = mh.action;
484  mh.Reset();
485  }
486 
487  if( !IsPartial( *msg ) )
488  {
489  uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
490  *pChannelData );
491  if( streamAction & TransportHandler::DigestMsg )
492  return;
493 
494  if( streamAction & TransportHandler::RequestClose )
495  {
496  RequestClose( *msg );
497  return;
498  }
499  }
500 
501  Log *log = DefaultEnv::GetLog();
502 
503  //--------------------------------------------------------------------------
504  // No handler, we discard the message ...
505  //--------------------------------------------------------------------------
506  if( !handler )
507  {
508  ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
509  log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
510  "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511  pStreamName.c_str(), msg.get(), rsp->hdr.status,
512  rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
513  return;
514  }
515 
516  //--------------------------------------------------------------------------
517  // We have a handler, so we call the callback
518  //--------------------------------------------------------------------------
519  log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
520  pStreamName.c_str(), msg.get() );
521 
523  {
524  log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
525  pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
526 
527  // if we are handling partial response we have to take down the timeout fence
528  if( IsPartial( *msg ) )
529  {
530  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
531  if( xrdHandler ) xrdHandler->PartialReceived();
532  }
533 
534  return;
535  }
536 
537  Job *job = new HandleIncMsgJob( handler );
538  pJobManager->QueueJob( job );
539  }
kXR_char streamid[2]
Definition: XProtocol.hh:914
ServerResponseHeader hdr
Definition: XProtocol.hh:1287
@ Ignore
Ignore the message.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.

References XrdCl::InMessageHelper::action, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::InMessageHelper::handler, ServerResponse::hdr, XrdCl::MsgHandler::Ignore, XrdCl::TransportHandler::MessageReceived(), XrdCl::MsgHandler::NoProcess, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::JobManager::QueueJob(), XrdCl::TransportHandler::RequestClose, XrdCl::InMessageHelper::Reset(), ServerResponseHeader::status, ServerResponseHeader::streamid, and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnMessageSent()

void XrdCl::Stream::OnMessageSent ( uint16_t  subStream,
Message msg,
uint32_t  bytesSent 
)

Definition at line 584 of file XrdClStream.cc.

587  {
588  pTransport->MessageSent( msg, subStream, bytesSent,
589  *pChannelData );
590  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
591  pBytesSent += bytesSent;
592  if( h.handler )
593  {
594  h.handler->OnStatusReady( msg, XRootDStatus() );
595  bool rmMsg = false;
596  pIncomingQueue->AddMessageHandler( h.handler, h.handler->GetExpiration(), rmMsg );
597  if( rmMsg )
598  {
599  Log *log = DefaultEnv::GetLog();
600  log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
601  pStreamName.c_str() );
602  }
603  }
604  pSubStreams[subStream]->outMsgHelper.Reset();
605  }
void AddMessageHandler(MsgHandler *handler, time_t expires, bool &rmMsg)
Definition: XrdClInQueue.cc:54
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.

References XrdCl::InQueue::AddMessageHandler(), XrdCl::MsgHandler::GetExpiration(), XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::TransportHandler::MessageSent(), XrdCl::MsgHandler::OnStatusReady(), XrdCl::PostMasterMsg, and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadTimeout()

bool XrdCl::Stream::OnReadTimeout ( uint16_t  subStream)

On read timeout.

Definition at line 1029 of file XrdClStream.cc.

1030  {
1031  //--------------------------------------------------------------------------
1032  // We only take the main stream into account
1033  //--------------------------------------------------------------------------
1034  if( substream != 0 )
1035  return true;
1036 
1037  //--------------------------------------------------------------------------
1038  // Check if there are no outgoing messages and if the stream TTL has elapsed.
1039  // It is assumed that the underlying transport makes sure that there are no
1040  // pending requests that are not answered, i.e. all possible virtual streams
1041  // are de-allocated
1042  //--------------------------------------------------------------------------
1043  Log *log = DefaultEnv::GetLog();
1044  SubStreamList::iterator it;
1045  time_t now = time(0);
1046 
1047  XrdSysMutexHelper scopedLock( pMutex );
1048  uint32_t outgoingMessages = 0;
1049  time_t lastActivity = 0;
1050  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1051  {
1052  outgoingMessages += (*it)->outQueue->GetSize();
1053  time_t sockLastActivity = (*it)->socket->GetLastActivity();
1054  if( lastActivity < sockLastActivity )
1055  lastActivity = sockLastActivity;
1056  }
1057 
1058  if( !outgoingMessages )
1059  {
1060  bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1061  *pChannelData );
1062  if( disconnect )
1063  {
1064  log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1065  pStreamName.c_str() );
1066  scopedLock.UnLock();
1067  //----------------------------------------------------------------------
1068  // Important note!
1069  //
1070  // This destroys the Stream object itself, the underlying
1071  // AsyncSocketHandler object (that called this method) and the Channel
1072  // object that aggregates this Stream.
1073  //----------------------------------------------------------------------
1075  return false;
1076  }
1077  }
1078 
1079  //--------------------------------------------------------------------------
1080  // Check if the stream is broken
1081  //--------------------------------------------------------------------------
1082  XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1083  *pChannelData );
1084  if( !st.IsOK() )
1085  {
1086  scopedLock.UnLock();
1087  OnError( substream, st );
1088  return false;
1089  }
1090  return true;
1091  }
Status ForceDisconnect(const URL &url)
Shut down a channel.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
Definition: XrdClStream.cc:809
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0

References XrdCl::Log::Debug(), XrdCl::PostMaster::ForceDisconnect(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::IsStreamBroken(), XrdCl::TransportHandler::IsStreamTTLElapsed(), OnError(), XrdCl::PostMasterMsg, and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadyToWrite()

std::pair< Message *, MsgHandler * > XrdCl::Stream::OnReadyToWrite ( uint16_t  subStream)

Definition at line 545 of file XrdClStream.cc.

546  {
547  XrdSysMutexHelper scopedLock( pMutex );
548  Log *log = DefaultEnv::GetLog();
549  if( pSubStreams[subStream]->outQueue->IsEmpty() )
550  {
551  log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
552  pSubStreams[subStream]->socket->GetStreamName().c_str() );
553 
554  pSubStreams[subStream]->socket->DisableUplink();
555  return std::make_pair( (Message *)0, (MsgHandler *)0 );
556  }
557 
558  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
559  h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
560  h.expires,
561  h.stateful );
562  scopedLock.UnLock();
563  if( h.handler )
564  h.handler->OnReadyToSend( h.msg );
565  return std::make_pair( h.msg, h.handler );
566  }

References XrdCl::Log::Dump(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::MsgHandler::OnReadyToSend(), XrdCl::PostMasterMsg, XrdCl::OutQueue::MsgHelper::stateful, and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnWriteTimeout()

bool XrdCl::Stream::OnWriteTimeout ( uint16_t  subStream)

On write timeout.

Definition at line 1096 of file XrdClStream.cc.

1097  {
1098  return true;
1099  }

Referenced by XrdCl::AsyncSocketHandler::OnWriteTimeout().

+ Here is the caller graph for this function:

◆ Query()

Status XrdCl::Stream::Query ( uint16_t  query,
AnyObject result 
)

Query the stream.

Definition at line 1218 of file XrdClStream.cc.

1219  {
1220  switch( query )
1221  {
1222  case StreamQuery::IpAddr:
1223  {
1224  result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1225  return Status();
1226  }
1227 
1228  case StreamQuery::IpStack:
1229  {
1230  result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1231  return Status();
1232  }
1233 
1234  case StreamQuery::HostName:
1235  {
1236  result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1237  return Status();
1238  }
1239 
1240  default:
1241  return Status( stError, errQueryNotSupported );
1242  }
1243  }
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack

References XrdCl::errQueryNotSupported, XrdCl::StreamQuery::HostName, XrdCl::StreamQuery::IpAddr, XrdCl::StreamQuery::IpStack, XrdCl::AnyObject::Set(), and XrdCl::stError.

Referenced by XrdCl::Channel::QueryTransport().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RegisterEventHandler()

void XrdCl::Stream::RegisterEventHandler ( ChannelEventHandler handler)

Register channel event handler.

Definition at line 1104 of file XrdClStream.cc.

1105  {
1106  pChannelEvHandlers.AddHandler( handler );
1107  }
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.

References XrdCl::ChannelHandlerList::AddHandler().

Referenced by XrdCl::Channel::RegisterEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RemoveEventHandler()

void XrdCl::Stream::RemoveEventHandler ( ChannelEventHandler handler)

Remove a channel event handler.

Definition at line 1112 of file XrdClStream.cc.

1113  {
1114  pChannelEvHandlers.RemoveHandler( handler );
1115  }
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.

References XrdCl::ChannelHandlerList::RemoveHandler().

Referenced by XrdCl::Channel::RemoveEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Send()

XRootDStatus XrdCl::Stream::Send ( Message msg,
MsgHandler handler,
bool  stateful,
time_t  expires 
)

Queue the message for sending.

Definition at line 297 of file XrdClStream.cc.

301  {
302  XrdSysMutexHelper scopedLock( pMutex );
303  Log *log = DefaultEnv::GetLog();
304 
305  //--------------------------------------------------------------------------
306  // Check the session ID and bounce if needed
307  //--------------------------------------------------------------------------
308  if( msg->GetSessionId() &&
309  (pSubStreams[0]->status != Socket::Connected ||
310  pSessionId != msg->GetSessionId()) )
311  return XRootDStatus( stError, errInvalidSession );
312 
313  //--------------------------------------------------------------------------
314  // Decide on the path to send the message
315  //--------------------------------------------------------------------------
316  PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
317  if( pSubStreams.size() <= path.up )
318  {
319  log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
320  "substream %d, using 0 instead", pStreamName.c_str(),
321  msg->GetObfuscatedDescription().c_str(), path.up );
322  path.up = 0;
323  }
324 
325  log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
326  "substream %d expecting answer at %d", pStreamName.c_str(),
327  msg->GetObfuscatedDescription().c_str(), msg, path.up, path.down );
328 
329  //--------------------------------------------------------------------------
330  // Enable *a* path and insert the message to the right queue
331  //--------------------------------------------------------------------------
332  XRootDStatus st = EnableLink( path );
333  if( st.IsOK() )
334  {
335  pTransport->MultiplexSubStream( msg, *pChannelData, &path );
336  pSubStreams[path.up]->outQueue->PushBack( msg, handler,
337  expires, stateful );
338  }
339  else
340  st.status = stFatal;
341  return st;
342  }
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79

References XrdCl::Socket::Connected, XrdCl::PathID::down, XrdCl::Log::Dump(), EnableLink(), XrdCl::errInvalidSession, XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Message::GetSessionId(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::MultiplexSubStream(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stError, XrdCl::stFatal, XrdCl::PathID::up, and XrdCl::Log::Warning().

Referenced by XrdCl::Channel::Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetChannelData()

void XrdCl::Stream::SetChannelData ( AnyObject channelData)
inline

Set the channel data.

Definition at line 115 of file XrdClStream.hh.

116  {
117  pChannelData = channelData;
118  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetIncomingQueue()

void XrdCl::Stream::SetIncomingQueue ( InQueue incomingQueue)
inline

Set the incoming queue.

Definition at line 107 of file XrdClStream.hh.

108  {
109  pIncomingQueue = incomingQueue;
110  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetJobManager()

void XrdCl::Stream::SetJobManager ( JobManager jobManager)
inline

Set job manager.

Definition at line 131 of file XrdClStream.hh.

132  {
133  pJobManager = jobManager;
134  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetOnDataConnectHandler()

void XrdCl::Stream::SetOnDataConnectHandler ( std::shared_ptr< Job > &  onConnJob)
inline

Set the on-connect handler for data streams.

Definition at line 263 of file XrdClStream.hh.

264  {
265  XrdSysMutexHelper scopedLock( pMutex );
266  pOnDataConnJob = onConnJob;
267  }

Referenced by XrdCl::Channel::SetOnDataConnectHandler().

+ Here is the caller graph for this function:

◆ SetPoller()

void XrdCl::Stream::SetPoller ( Poller poller)
inline

Set the poller.

Definition at line 99 of file XrdClStream.hh.

100  {
101  pPoller = poller;
102  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTaskManager()

void XrdCl::Stream::SetTaskManager ( TaskManager taskManager)
inline

Set task manager.

Definition at line 123 of file XrdClStream.hh.

124  {
125  pTaskManager = taskManager;
126  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTransport()

void XrdCl::Stream::SetTransport ( TransportHandler transport)
inline

Set the transport.

Definition at line 91 of file XrdClStream.hh.

92  {
93  pTransport = transport;
94  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ Tick()

void XrdCl::Stream::Tick ( time_t  now)

Handle a clock event generated either by a socket timeout or by a task manager event.

Definition at line 377 of file XrdClStream.cc.

378  {
379  //--------------------------------------------------------------------------
380  // Check for timed-out requests and incoming handlers
381  //--------------------------------------------------------------------------
382  pMutex.Lock();
383  OutQueue q;
384  SubStreamList::iterator it;
385  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
386  q.GrabExpired( *(*it)->outQueue, now );
387  pMutex.UnLock();
388 
389  q.Report( XRootDStatus( stError, errOperationExpired ) );
390  pIncomingQueue->ReportTimeout( now );
391  }
void ReportTimeout(time_t now=0)
Timeout handlers.
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90

References XrdCl::errOperationExpired, XrdCl::OutQueue::GrabExpired(), XrdSysMutex::Lock(), XrdCl::OutQueue::Report(), XrdCl::InQueue::ReportTimeout(), XrdCl::stError, and XrdSysMutex::UnLock().

Referenced by XrdCl::Channel::Tick().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: