XRootD
XrdClStream.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #include "XrdCl/XrdClStream.hh"
26 #include "XrdCl/XrdClSocket.hh"
27 #include "XrdCl/XrdClChannel.hh"
28 #include "XrdCl/XrdClConstants.hh"
29 #include "XrdCl/XrdClLog.hh"
30 #include "XrdCl/XrdClMessage.hh"
31 #include "XrdCl/XrdClDefaultEnv.hh"
32 #include "XrdCl/XrdClUtils.hh"
33 #include "XrdCl/XrdClOutQueue.hh"
34 #include "XrdCl/XrdClMonitor.hh"
39 
40 #include <sys/types.h>
41 #include <algorithm>
42 #include <sys/socket.h>
43 #include <sys/time.h>
44 
45 namespace XrdCl
46 {
47  //----------------------------------------------------------------------------
48  // Statics
49  //----------------------------------------------------------------------------
50  RAtomic_uint64_t Stream::sSessCntGen{0};
51 
52  //----------------------------------------------------------------------------
53  // Incoming message helper
54  //----------------------------------------------------------------------------
56  {
57  InMessageHelper( Message *message = 0,
58  MsgHandler *hndlr = 0,
59  time_t expir = 0,
60  uint16_t actio = 0 ):
61  msg( message ), handler( hndlr ), expires( expir ), action( actio ) {}
62  void Reset()
63  {
64  msg = 0; handler = 0; expires = 0; action = 0;
65  }
68  time_t expires;
69  uint16_t action;
70  };
71 
72  //----------------------------------------------------------------------------
73  // Sub stream helper
74  //----------------------------------------------------------------------------
76  {
77  SubStreamData(): socket( 0 ), status( Socket::Disconnected )
78  {
79  outQueue = new OutQueue();
80  }
82  {
83  delete socket;
84  delete outQueue;
85  }
91  };
92 
93  //----------------------------------------------------------------------------
94  // Constructor
95  //----------------------------------------------------------------------------
96 Stream::Stream( const URL *url, const URL &prefer ):
    // pUrl and pPrefer are taken from the arguments; the remaining members
    // listed here are initialised to 0 (pAddressType to Utils::IPAll).
97 pUrl( url ),
98 pPrefer( prefer ),
99 pTransport( 0 ),
100 pPoller( 0 ),
101 pTaskManager( 0 ),
102 pJobManager( 0 ),
103 pIncomingQueue( 0 ),
104 pChannelData( 0 ),
105 pLastStreamError( 0 ),
106 pConnectionCount( 0 ),
107 pConnectionInitTime( 0 ),
108 pAddressType( Utils::IPAll ),
109 pSessionId( 0 ),
110 pBytesSent( 0 ),
111 pBytesReceived( 0 )
112 {
    // Zero both connection timestamps.
113 pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
114 pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
115 
    // The stream name that prefixes the log messages is the URL's host id.
116 std::ostringstream o;
117 o << pUrl->GetHostId();
118 pStreamName = o.str();
119 
    // Per-URL tuning parameters are looked up through Utils.
    // NOTE(review): the continuation line of each of the four calls below
    // (presumably the default-value argument) is absent from this listing --
    // confirm against the upstream file.
120 pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
122 pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
124 pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
126 
127 std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
129 
    // Translate the configured network stack into an address type. For IPAuto
    // the stacks reported by XrdNetUtils::NetConfig decide: unless hasIP64 is
    // set, the type is pinned to IPv4 when hasIPv4 is set, otherwise to IPv6
    // when hasIPv6 is set; in every other case IPAuto is kept.
130 pAddressType = Utils::String2AddressType( netStack );
131 if( pAddressType == Utils::AddressType::IPAuto )
132 {
133 XrdNetUtils::NetProt stacks = XrdNetUtils::NetConfig( XrdNetUtils::NetType::qryINIF );
134 if( !( stacks & XrdNetUtils::hasIP64 ) )
135 {
136 if( stacks & XrdNetUtils::hasIPv4 )
137 pAddressType = Utils::AddressType::IPv4;
138 else if( stacks & XrdNetUtils::hasIPv6 )
139 pAddressType = Utils::AddressType::IPv6;
140 }
141 }
142 
    // Record the effective parameters in the debug log. Note that the network
    // stack printed is the configured string, not the resolved address type.
143 Log *log = DefaultEnv::GetLog();
144 log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
145 "Connection Window: %d, ConnectionRetry: %d, Stream Error "
146 "Window: %d", pStreamName.c_str(), netStack.c_str(),
147 pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
148 }
149 
150  //----------------------------------------------------------------------------
151  // Destructor
152  //----------------------------------------------------------------------------
154  {
155  Disconnect( true );
156 
157  Log *log = DefaultEnv::GetLog();
158  log->Debug( PostMasterMsg, "[%s] Destroying stream",
159  pStreamName.c_str() );
160 
161  MonitorDisconnection( XRootDStatus() );
162 
163  SubStreamList::iterator it;
164  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
165  delete *it;
166  }
167 
168  //----------------------------------------------------------------------------
169  // Initializer
170  //----------------------------------------------------------------------------
172  {
173  if( !pTransport || !pPoller || !pChannelData )
175 
176  AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
177  pChannelData, 0, this );
178  pSubStreams.push_back( new SubStreamData() );
179  pSubStreams[0]->socket = s;
180  return XRootDStatus();
181  }
182 
183  //------------------------------------------------------------------------
184  // Make sure that the underlying socket handler gets write readiness
185  // events
186  //------------------------------------------------------------------------
188  {
189  XrdSysMutexHelper scopedLock( pMutex );
190 
191  //--------------------------------------------------------------------------
192  // We are in the process of connecting the main stream, so we do nothing
193  // because when the main stream connection is established it will connect
194  // all the other streams
195  //--------------------------------------------------------------------------
196  if( pSubStreams[0]->status == Socket::Connecting )
197  return XRootDStatus();
198 
199  //--------------------------------------------------------------------------
200  // The main stream is connected, so we can verify whether we have
201  // the up and the down stream connected and ready to handle data.
202  // If anything is not right we fall back to stream 0.
203  //--------------------------------------------------------------------------
204  if( pSubStreams[0]->status == Socket::Connected )
205  {
206  if( pSubStreams[path.down]->status != Socket::Connected )
207  path.down = 0;
208 
209  if( pSubStreams[path.up]->status == Socket::Disconnected )
210  {
211  path.up = 0;
212  return pSubStreams[0]->socket->EnableUplink();
213  }
214 
215  if( pSubStreams[path.up]->status == Socket::Connected )
216  return pSubStreams[path.up]->socket->EnableUplink();
217 
218  return XRootDStatus();
219  }
220 
221  //--------------------------------------------------------------------------
222  // The main stream is not connected, we need to check whether enough time
223  // has passed since we last encountered an error (if any) so that we could
224  // re-attempt the connection
225  //--------------------------------------------------------------------------
226  Log *log = DefaultEnv::GetLog();
227  time_t now = ::time(0);
228 
229  if( now-pLastStreamError < pStreamErrorWindow )
230  return pLastFatalError;
231 
232  gettimeofday( &pConnectionStarted, 0 );
233  ++pConnectionCount;
234 
235  //--------------------------------------------------------------------------
236  // Resolve all the addresses of the host we're supposed to connect to
237  //--------------------------------------------------------------------------
238  XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
239  if( !st.IsOK() )
240  {
241  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
242  "the host", pStreamName.c_str() );
243  pLastStreamError = now;
244  st.status = stFatal;
245  pLastFatalError = st;
246  return st;
247  }
248 
249  if( pPrefer.IsValid() )
250  {
251  std::vector<XrdNetAddr> addrresses;
252  XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
253  if( !st.IsOK() )
254  {
255  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
256  pStreamName.c_str(), pPrefer.GetHostName().c_str() );
257  }
258  else
259  {
260  std::vector<XrdNetAddr> tmp;
261  tmp.reserve( pAddresses.size() );
262  // first add all remaining addresses
263  auto itr = pAddresses.begin();
264  for( ; itr != pAddresses.end() ; ++itr )
265  {
266  if( !HasNetAddr( *itr, addrresses ) )
267  tmp.push_back( *itr );
268  }
269  // then copy all 'preferred' addresses
270  std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
271  // and keep the result
272  pAddresses.swap( tmp );
273  }
274  }
275 
277  pAddresses );
278 
279  while( !pAddresses.empty() )
280  {
281  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
282  pAddresses.pop_back();
283  pConnectionInitTime = ::time( 0 );
284  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
285  if( st.IsOK() )
286  {
287  pSubStreams[0]->status = Socket::Connecting;
288  break;
289  }
290  }
291  return st;
292  }
293 
294  //----------------------------------------------------------------------------
295  // Queue the message for sending
296  //----------------------------------------------------------------------------
298  MsgHandler *handler,
299  bool stateful,
300  time_t expires )
301  {
302  XrdSysMutexHelper scopedLock( pMutex );
303  Log *log = DefaultEnv::GetLog();
304 
305  //--------------------------------------------------------------------------
306  // Check the session ID and bounce if needed
307  //--------------------------------------------------------------------------
308  if( msg->GetSessionId() &&
309  (pSubStreams[0]->status != Socket::Connected ||
310  pSessionId != msg->GetSessionId()) )
312 
313  //--------------------------------------------------------------------------
314  // Decide on the path to send the message
315  //--------------------------------------------------------------------------
316  PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
317  if( pSubStreams.size() <= path.up )
318  {
319  log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
320  "substream %d, using 0 instead", pStreamName.c_str(),
321  msg->GetObfuscatedDescription().c_str(), path.up );
322  path.up = 0;
323  }
324 
325  log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
326  "substream %d expecting answer at %d", pStreamName.c_str(),
327  msg->GetObfuscatedDescription().c_str(), msg, path.up, path.down );
328 
329  //--------------------------------------------------------------------------
330  // Enable *a* path and insert the message to the right queue
331  //--------------------------------------------------------------------------
332  XRootDStatus st = EnableLink( path );
333  if( st.IsOK() )
334  {
335  pTransport->MultiplexSubStream( msg, *pChannelData, &path );
336  pSubStreams[path.up]->outQueue->PushBack( msg, handler,
337  expires, stateful );
338  }
339  else
340  st.status = stFatal;
341  return st;
342  }
343 
344  //----------------------------------------------------------------------------
345  // Force connection
346  //----------------------------------------------------------------------------
348  {
349  XrdSysMutexHelper scopedLock( pMutex );
350  if( pSubStreams[0]->status == Socket::Connecting )
351  {
352  pSubStreams[0]->status = Socket::Disconnected;
353  XrdCl::PathID path( 0, 0 );
354  XrdCl::XRootDStatus st = EnableLink( path );
355  if( !st.IsOK() )
356  OnConnectError( 0, st );
357  }
358  }
359 
360  //----------------------------------------------------------------------------
361  // Disconnect the stream
362  //----------------------------------------------------------------------------
363  void Stream::Disconnect( bool /*force*/ )
364  {
365  XrdSysMutexHelper scopedLock( pMutex );
366  SubStreamList::iterator it;
367  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
368  {
369  (*it)->socket->Close();
370  (*it)->status = Socket::Disconnected;
371  }
372  }
373 
374  //----------------------------------------------------------------------------
375  // Handle a clock event
376  //----------------------------------------------------------------------------
377 void Stream::Tick( time_t now )
378 {
379 //--------------------------------------------------------------------------
380 // Check for timed-out requests and incoming handlers
381 //--------------------------------------------------------------------------
    // Entries that have expired as of 'now' are collected from the out-queue
    // of every sub-stream into the local queue 'q' while pMutex is held; the
    // mutex is released before the incoming queue is notified.
382 pMutex.Lock();
383 OutQueue q;
384 SubStreamList::iterator it;
385 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
386 q.GrabExpired( *(*it)->outQueue, now );
387 pMutex.UnLock();
388 
    // NOTE(review): listing line 389 is absent here (presumably the statement
    // that reports the expired entries held in 'q') -- confirm against the
    // upstream file.
390 pIncomingQueue->ReportTimeout( now );
391 }
392 }
393 
394 //------------------------------------------------------------------------------
395 // Handle message timeouts and reconnection in the future
396 //------------------------------------------------------------------------------
397 namespace
398 {
399 class StreamConnectorTask: public XrdCl::Task
400 {
    // Task registered by Stream::OnConnectError with the task manager to run
    // at the end of the current connection window. It keeps its own copy of
    // the stream URL.
401 public:
402 //------------------------------------------------------------------------
403 // Constructor
404 //
    // url - URL of the stream, copied into the task
    // n   - appended to "StreamConnectorTask for " to form the task name
402 //------------------------------------------------------------------------
405 StreamConnectorTask( const XrdCl::URL &url, const std::string &n ):
406 url( url )
407 {
408 std::string name = "StreamConnectorTask for ";
409 name += n;
410 SetName( name );
411 }
412 
413 //------------------------------------------------------------------------
414 // Run the task
    //
    // The time argument is ignored.
    // NOTE(review): listing line 418 is absent from the body (presumably the
    // call that triggers the reconnection for 'url'); the visible part only
    // returns 0 -- confirm both against the upstream file and XrdCl::Task.
415 //------------------------------------------------------------------------
416 time_t Run( time_t )
417 {
419 return 0;
420 }
421 
422 private:
423 XrdCl::URL url;
424 };
425 }
426 
427 namespace XrdCl
428 {
429 XRootDStatus Stream::RequestClose( Message &response )
430 {
    // Builds a kXR_close request whose file handle is the first 4 bytes of the
    // body of 'response' and sends it to pUrl with a NullResponseHandler.
    // Returns XRootDStatus( stError ) when the response body is shorter than
    // 4 bytes, otherwise whatever MessageUtils::SendMessage returns.
431 ServerResponse *rsp = reinterpret_cast<ServerResponse*>( response.GetBuffer() );
    // The body must hold at least the 4 bytes copied into fhandle below.
432 if( rsp->hdr.dlen < 4 ) return XRootDStatus( stError );
433 Message *msg;
434 ClientCloseRequest *req;
435 MessageUtils::CreateRequest( msg, req );
436 req->requestid = kXR_close;
437 memcpy( req->fhandle, reinterpret_cast<uint8_t*>( rsp->body.buffer.data ), 4 );
    // NOTE(review): listing line 438 is absent here -- confirm against the
    // upstream file what is done to the request before it is tagged with the
    // session id.
439 msg->SetSessionId( pSessionId );
440 NullResponseHandler *handler = new NullResponseHandler();
    // Send parameters: timeout 0, redirects are not followed, and the request
    // is marked stateful.
441 MessageSendParams params;
442 params.timeout = 0;
443 params.followRedirects = false;
444 params.stateful = true;
    // NOTE(review): listing line 445 is absent here (presumably a post-
    // processing step on 'params') -- confirm against the upstream file.
446 return MessageUtils::SendMessage( *pUrl, msg, handler, params, 0 );
447 }
448 
449  //------------------------------------------------------------------------
450  // Check if message is a partial response
451  //------------------------------------------------------------------------
452 bool Stream::IsPartial( Message &msg )
453 {
    // The start of the message buffer is interpreted as a response header;
    // a status of kXR_oksofar always counts as a partial response.
454 ServerResponseHeader *rsphdr = (ServerResponseHeader*)msg.GetBuffer();
455 if( rsphdr->status == kXR_oksofar )
456 return true;
457 
    // For kXR_status the buffer is re-read as a ServerResponseStatus.
458 if( rsphdr->status == kXR_status )
459 {
460 ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
    // NOTE(review): listing line 461 is absent here; the 'return true' below is
    // presumably guarded by a condition on 'rspst' -- confirm against the
    // upstream file.
462 return true;
463 }
464 
465 return false;
466 }
467 
468  //----------------------------------------------------------------------------
469  // Call back when a message has been reconstructed
470  //----------------------------------------------------------------------------
471 void Stream::OnIncoming( uint16_t subStream,
472 std::shared_ptr<Message> msg,
473 uint32_t bytesReceived )
474 {
    // subStream     - index of the sub-stream the message arrived on
    // msg           - the received message
    // bytesReceived - added to the stream's received-bytes counter
    //
    // The message is stamped with the current session id before anything else.
475 msg->SetSessionId( pSessionId );
476 pBytesReceived += bytesReceived;
477 
    // Take the handler and action recorded in the sub-stream's in-message
    // helper and clear the helper.
478 MsgHandler *handler = nullptr;
479 uint16_t action = 0;
480 {
481 InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
482 handler = mh.handler;
483 action = mh.action;
484 mh.Reset();
485 }
486 
    // Only non-partial messages are shown to the transport. If it flags
    // DigestMsg nothing more is done here; if it flags RequestClose a close
    // request is sent (its status is not checked) and processing stops.
487 if( !IsPartial( *msg ) )
488 {
489 uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
490 *pChannelData );
491 if( streamAction & TransportHandler::DigestMsg )
492 return;
493 
494 if( streamAction & TransportHandler::RequestClose )
495 {
496 RequestClose( *msg );
497 return;
498 }
499 }
500 
501 Log *log = DefaultEnv::GetLog();
502 
503 //--------------------------------------------------------------------------
504 // No handler, we discard the message ...
505 //--------------------------------------------------------------------------
506 if( !handler )
507 {
508 ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
509 log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
510 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511 pStreamName.c_str(), msg.get(), rsp->hdr.status,
512 rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
513 return;
514 }
515 
516 //--------------------------------------------------------------------------
517 // We have a handler, so we call the callback
518 //--------------------------------------------------------------------------
519 log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
520 pStreamName.c_str(), msg.get() );
521 
    // NOTE(review): listing line 522 is absent here; it presumably holds the
    // condition (on 'action') that opens the block below -- confirm against
    // the upstream file.
523 {
524 log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
525 pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
526 
527 // if we are handling partial response we have to take down the timeout fence
528 if( IsPartial( *msg ) )
529 {
530 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
531 if( xrdHandler ) xrdHandler->PartialReceived();
532 }
533 
534 return;
535 }
536 
    // Otherwise the handler is wrapped in a job and queued on the job manager.
537 Job *job = new HandleIncMsgJob( handler );
538 pJobManager->QueueJob( job );
539 }
540 
541  //----------------------------------------------------------------------------
542  // Call when one of the sockets is ready to accept a new message
543  //----------------------------------------------------------------------------
544  std::pair<Message *, MsgHandler *>
545  Stream::OnReadyToWrite( uint16_t subStream )
546  {
547  XrdSysMutexHelper scopedLock( pMutex );
548  Log *log = DefaultEnv::GetLog();
549  if( pSubStreams[subStream]->outQueue->IsEmpty() )
550  {
551  log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
552  pSubStreams[subStream]->socket->GetStreamName().c_str() );
553 
554  pSubStreams[subStream]->socket->DisableUplink();
555  return std::make_pair( (Message *)0, (MsgHandler *)0 );
556  }
557 
558  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
559  h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
560  h.expires,
561  h.stateful );
562  scopedLock.UnLock();
563  if( h.handler )
564  h.handler->OnReadyToSend( h.msg );
565  return std::make_pair( h.msg, h.handler );
566  }
567 
568  void Stream::DisableIfEmpty( uint16_t subStream )
569  {
570  XrdSysMutexHelper scopedLock( pMutex );
571  Log *log = DefaultEnv::GetLog();
572 
573  if( pSubStreams[subStream]->outQueue->IsEmpty() )
574  {
575  log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
576  pSubStreams[subStream]->socket->GetStreamName().c_str() );
577  pSubStreams[subStream]->socket->DisableUplink();
578  }
579  }
580 
581  //----------------------------------------------------------------------------
582  // Call when a message is written to the socket
583  //----------------------------------------------------------------------------
584  void Stream::OnMessageSent( uint16_t subStream,
585  Message *msg,
586  uint32_t bytesSent )
587  {
588  pTransport->MessageSent( msg, subStream, bytesSent,
589  *pChannelData );
590  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
591  pBytesSent += bytesSent;
592  if( h.handler )
593  {
594  h.handler->OnStatusReady( msg, XRootDStatus() );
595  bool rmMsg = false;
596  pIncomingQueue->AddMessageHandler( h.handler, h.handler->GetExpiration(), rmMsg );
597  if( rmMsg )
598  {
599  Log *log = DefaultEnv::GetLog();
600  log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
601  pStreamName.c_str() );
602  }
603  }
604  pSubStreams[subStream]->outMsgHelper.Reset();
605  }
606 
607  //----------------------------------------------------------------------------
608 // Call back when a sub-stream connection has been established
609  //----------------------------------------------------------------------------
610 void Stream::OnConnect( uint16_t subStream )
611 {
    // Marks the given sub-stream as connected. For sub-stream 0 it also resets
    // the error bookkeeping, starts a new session, creates and connects the
    // additional sub-streams and informs monitoring; for any other sub-stream
    // it queues pOnDataConnJob when one is set.
612 XrdSysMutexHelper scopedLock( pMutex );
613 pSubStreams[subStream]->status = Socket::Connected;
614 
    // The IP stack logged is always the one of sub-stream 0's socket,
    // whichever sub-stream has just connected.
615 std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
616 Log *log = DefaultEnv::GetLog();
617 log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
618 subStream, ipstack.c_str() );
619 
620 if( subStream == 0 )
621 {
    // Clear the last error, reset the connection attempt counter and draw a
    // new session id from the shared generator.
622 pLastStreamError = 0;
623 pLastFatalError = XRootDStatus();
624 pConnectionCount = 0;
625 uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
626 pSessionId = ++sSessCntGen;
627 
628 //------------------------------------------------------------------------
629 // Create the streams if they don't exist yet
630 //------------------------------------------------------------------------
631 if( pSubStreams.size() == 1 && numSub > 1 )
632 {
633 for( uint16_t i = 1; i < numSub; ++i )
634 {
635 URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
636 AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
637 pChannelData, i, this );
638 pSubStreams.push_back( new SubStreamData() );
639 pSubStreams[i]->socket = s;
640 }
641 }
642 
643 //------------------------------------------------------------------------
644 // Connect the extra streams, if we fail we move all the outgoing items
645 // to stream 0, we don't need to enable the uplink here, because it
646 // should be already enabled after the handshaking process is completed.
647 //------------------------------------------------------------------------
648 if( pSubStreams.size() > 1 )
649 {
650 log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
651 pStreamName.c_str(), pSubStreams.size() - 1 );
652 for( size_t i = 1; i < pSubStreams.size(); ++i )
653 {
    // Every extra sub-stream connects to the address sub-stream 0 is using.
654 pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
655 XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
656 if( !st.IsOK() )
657 {
658 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
659 pSubStreams[i]->socket->Close();
660 }
661 else
662 {
663 pSubStreams[i]->status = Socket::Connecting;
664 }
665 }
666 }
667 
668 //------------------------------------------------------------------------
669 // Inform monitoring
670 //------------------------------------------------------------------------
    // The byte counters restart from zero and the completion time is recorded.
671 pBytesSent = 0;
672 pBytesReceived = 0;
673 gettimeofday( &pConnectionDone, 0 );
    // NOTE(review): listing lines 674 and 677 are absent here (presumably the
    // declarations of 'mon' and of the event-info object 'i') -- confirm
    // against the upstream file.
675 if( mon )
676 {
678 i.server = pUrl->GetHostId();
679 i.sTOD = pConnectionStarted;
680 i.eTOD = pConnectionDone;
681 i.streams = pSubStreams.size();
682 
    // NOTE(review): qryResponse is dereferenced and deleted below without a
    // null check -- confirm that the Auth query always yields a string.
683 AnyObject qryResult;
684 std::string *qryResponse = 0;
685 pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
686 qryResult.Get( qryResponse );
687 i.auth = *qryResponse;
688 delete qryResponse;
689 mon->Event( Monitor::EvConnect, &i );
690 }
691 
692 //------------------------------------------------------------------------
693 // For every connected control-stream call the global on-connect handler
694 //------------------------------------------------------------------------
    // NOTE(review): listing line 695 (the call announced by the banner above)
    // is absent from this listing -- confirm against the upstream file.
696 }
697 else if( pOnDataConnJob )
698 {
699 //------------------------------------------------------------------------
700 // For every connected data-stream call the on-connect handler
701 //------------------------------------------------------------------------
702 pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
703 }
704 }
705 
706  //----------------------------------------------------------------------------
707  // On connect error
708  //----------------------------------------------------------------------------
709 void Stream::OnConnectError( uint16_t subStream, XRootDStatus status )
710 {
    // Handles a failed connection attempt on the given sub-stream. The socket
    // is closed first. For sub-stream 0 the recovery steps are tried in this
    // order: the next resolved address, a delayed retry inside the current
    // connection window, a fresh EnableLink, and finally OnFatalError.
711 XrdSysMutexHelper scopedLock( pMutex );
712 Log *log = DefaultEnv::GetLog();
713 pSubStreams[subStream]->socket->Close();
714 time_t now = ::time(0);
715 
716 //--------------------------------------------------------------------------
717 // For every connection error call the global connection error handler
718 //--------------------------------------------------------------------------
    // NOTE(review): listing line 719 (the call announced by the banner above)
    // is absent from this listing -- confirm against the upstream file.
720 
721 //--------------------------------------------------------------------------
722 // If we connected subStream == 0 and cannot connect >0 then we just give
723 // up and move the outgoing messages to another queue
724 //--------------------------------------------------------------------------
725 if( subStream > 0 )
726 {
727 pSubStreams[subStream]->status = Socket::Disconnected;
728 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
    // Stream 0 is up: make sure it gets write events for the moved messages.
729 if( pSubStreams[0]->status == Socket::Connected )
730 {
731 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
732 if( !st.IsOK() )
733 OnFatalError( 0, st, scopedLock );
734 return;
735 }
736 
    // Stream 0 is still connecting: nothing more to do for now.
737 if( pSubStreams[0]->status == Socket::Connecting )
738 return;
739 
740 OnFatalError( subStream, status, scopedLock );
741 return;
742 }
743 
744 //--------------------------------------------------------------------------
745 // Check if we still have time to try and do something in the current window
746 //--------------------------------------------------------------------------
    // 'elapsed' is measured from the start of the most recent connect attempt.
    // Note that it is reported at Error level.
747 time_t elapsed = now-pConnectionInitTime;
748 log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
749 pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
750 
751 //------------------------------------------------------------------------
752 // If we have some IP addresses left we try them
753 //------------------------------------------------------------------------
754 if( !pAddresses.empty() )
755 {
    // Addresses are consumed from the back of the list until one connect call
    // succeeds or the list is exhausted.
756 XRootDStatus st;
757 do
758 {
759 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
760 pAddresses.pop_back();
761 pConnectionInitTime = ::time( 0 );
762 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
763 }
764 while( !pAddresses.empty() && !st.IsOK() );
765 
766 if( !st.IsOK() )
767 OnFatalError( subStream, st, scopedLock );
768 
769 return;
770 }
771 //------------------------------------------------------------------------
772 // If we still can retry with the same host name, we schedule a connector
773 // task for the end of the connection window
774 //------------------------------------------------------------------------
775 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
776 && !status.IsFatal() )
777 {
778 log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
779 pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
780 
781 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
782 pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
783 return;
784 }
785 //--------------------------------------------------------------------------
786 // We are out of the connection window, the only thing we can do here
787 // is re-resolving the host name and retrying if we still can
788 //--------------------------------------------------------------------------
789 else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
790 {
791 pAddresses.clear();
792 pSubStreams[0]->status = Socket::Disconnected;
793 PathID path( 0, 0 );
794 XRootDStatus st = EnableLink( path );
795 if( !st.IsOK() )
796 OnFatalError( subStream, st, scopedLock );
797 return;
798 }
799 
800 //--------------------------------------------------------------------------
801 // Else, we fail
802 //--------------------------------------------------------------------------
803 OnFatalError( subStream, status, scopedLock );
804 }
805 
806  //----------------------------------------------------------------------------
807  // Call back when an error has occurred
808  //----------------------------------------------------------------------------
809  void Stream::OnError( uint16_t subStream, XRootDStatus status )
810  {
811  XrdSysMutexHelper scopedLock( pMutex );
812  Log *log = DefaultEnv::GetLog();
813  pSubStreams[subStream]->socket->Close();
814  pSubStreams[subStream]->status = Socket::Disconnected;
815 
816  log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
817  pStreamName.c_str(), subStream, status.ToString().c_str() );
818 
819  //--------------------------------------------------------------------------
820  // Reinsert the stuff that we have failed to sent
821  //--------------------------------------------------------------------------
822  if( pSubStreams[subStream]->outMsgHelper.msg )
823  {
824  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
825  pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
826  h.stateful );
827  pSubStreams[subStream]->outMsgHelper.Reset();
828  }
829 
830  //--------------------------------------------------------------------------
831  // Reinsert the receiving handler and reset any partially read partial
832  //--------------------------------------------------------------------------
833  if( pSubStreams[subStream]->inMsgHelper.handler )
834  {
835  InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
836  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
837  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
838  if( xrdHandler ) xrdHandler->PartialReceived();
839  h.Reset();
840  }
841 
842  //--------------------------------------------------------------------------
843  // We are dealing with an error of a peripheral stream. If we don't have
844  // anything to send don't bother recovering. Otherwise move the requests
845  // to stream 0 if possible.
846  //--------------------------------------------------------------------------
847  if( subStream > 0 )
848  {
849  if( pSubStreams[subStream]->outQueue->IsEmpty() )
850  return;
851 
852  if( pSubStreams[0]->status != Socket::Disconnected )
853  {
854  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
855  if( pSubStreams[0]->status == Socket::Connected )
856  {
857  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
858  if( !st.IsOK() )
859  OnFatalError( 0, st, scopedLock );
860  return;
861  }
862  }
863  OnFatalError( subStream, status, scopedLock );
864  return;
865  }
866 
867  //--------------------------------------------------------------------------
868  // If we lost the stream 0 we have lost the session, we re-enable the
869  // stream if we still have things in one of the outgoing queues, otherwise
870  // there is not point to recover at this point.
871  //--------------------------------------------------------------------------
872  if( subStream == 0 )
873  {
874  MonitorDisconnection( status );
875 
876  SubStreamList::iterator it;
877  size_t outstanding = 0;
878  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
879  outstanding += (*it)->outQueue->GetSizeStateless();
880 
881  if( outstanding )
882  {
883  PathID path( 0, 0 );
884  XRootDStatus st = EnableLink( path );
885  if( !st.IsOK() )
886  {
887  OnFatalError( 0, st, scopedLock );
888  return;
889  }
890  }
891 
892  //------------------------------------------------------------------------
893  // We're done here, unlock the stream mutex to avoid deadlocks and
894  // report the disconnection event to the handlers
895  //------------------------------------------------------------------------
896  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
897  "message handlers.", pStreamName.c_str() );
898  OutQueue q;
899  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
900  q.GrabStateful( *(*it)->outQueue );
901  scopedLock.UnLock();
902 
903  q.Report( status );
904  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
905  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
906  return;
907  }
908  }
909 
910  //------------------------------------------------------------------------
911  // Force error
912  //------------------------------------------------------------------------
913  void Stream::ForceError( XRootDStatus status, bool hush )
914  {
915  XrdSysMutexHelper scopedLock( pMutex );
916  Log *log = DefaultEnv::GetLog();
917  for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
918  {
919  if( pSubStreams[substream]->status != Socket::Connected ) continue;
920  pSubStreams[substream]->socket->Close();
921  pSubStreams[substream]->status = Socket::Disconnected;
922 
923  if( !hush )
924  log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
925  pStreamName.c_str(), status.ToString().c_str() );
926 
927  //--------------------------------------------------------------------
928  // Reinsert the stuff that we have failed to sent
929  //--------------------------------------------------------------------
930  if( pSubStreams[substream]->outMsgHelper.msg )
931  {
932  OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
933  pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
934  h.stateful );
935  pSubStreams[substream]->outMsgHelper.Reset();
936  }
937 
938  //--------------------------------------------------------------------
939  // Reinsert the receiving handler and reset any partially read partial
940  //--------------------------------------------------------------------
941  if( pSubStreams[substream]->inMsgHelper.handler )
942  {
943  InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
944  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
945  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
946  if( xrdHandler ) xrdHandler->PartialReceived();
947  h.Reset();
948  }
949  }
950 
951  pConnectionCount = 0;
952 
953  //------------------------------------------------------------------------
954  // We're done here, unlock the stream mutex to avoid deadlocks and
955  // report the disconnection event to the handlers
956  //------------------------------------------------------------------------
957  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
958  "message handlers.", pStreamName.c_str() );
959 
960  SubStreamList::iterator it;
961  OutQueue q;
962  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
963  q.GrabItems( *(*it)->outQueue );
964  scopedLock.UnLock();
965 
966  q.Report( status );
967 
968  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
969  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
970  }
971 
972  //----------------------------------------------------------------------------
973  // On fatal error
974  //----------------------------------------------------------------------------
975  void Stream::OnFatalError( uint16_t subStream,
976  XRootDStatus status,
977  XrdSysMutexHelper &lock )
978  {
979  Log *log = DefaultEnv::GetLog();
980  pSubStreams[subStream]->status = Socket::Disconnected;
981  log->Error( PostMasterMsg, "[%s] Unable to recover: %s.",
982  pStreamName.c_str(), status.ToString().c_str() );
983 
984  //--------------------------------------------------------------------------
985  // Don't set the stream error windows for authentication errors as the user
986  // may refresh his credential at any time
987  //--------------------------------------------------------------------------
988  if( status.code != errAuthFailed )
989  {
990  pConnectionCount = 0;
991  pLastStreamError = ::time(0);
992  pLastFatalError = status;
993  }
994 
995  SubStreamList::iterator it;
996  OutQueue q;
997  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
998  q.GrabItems( *(*it)->outQueue );
999  lock.UnLock();
1000 
1001  status.status = stFatal;
1002  q.Report( status );
1003  pIncomingQueue->ReportStreamEvent( MsgHandler::FatalError, status );
1004  pChannelEvHandlers.ReportEvent( ChannelEventHandler::FatalError, status );
1005 
1006  }
1007 
1008  //----------------------------------------------------------------------------
1009  // Inform monitoring about disconnection
1010  //----------------------------------------------------------------------------
1011  void Stream::MonitorDisconnection( XRootDStatus status )
1012  {
1013  Monitor *mon = DefaultEnv::GetMonitor();
1014  if( mon )
1015  {
1016  Monitor::DisconnectInfo i;
1017  i.server = pUrl->GetHostId();
1018  i.rBytes = pBytesReceived;
1019  i.sBytes = pBytesSent;
1020  i.cTime = ::time(0) - pConnectionDone.tv_sec;
1021  i.status = status;
1022  mon->Event( Monitor::EvDisconnect, &i );
1023  }
1024  }
1025 
1026  //----------------------------------------------------------------------------
1027  // On read timeout
1028  //----------------------------------------------------------------------------
1029  bool Stream::OnReadTimeout( uint16_t substream )
1030  {
1031  //--------------------------------------------------------------------------
1032  // We only take the main stream into account
1033  //--------------------------------------------------------------------------
1034  if( substream != 0 )
1035  return true;
1036 
1037  //--------------------------------------------------------------------------
1038  // Check if there is no outgoing messages and if the stream TTL is elapesed.
1039  // It is assumed that the underlying transport makes sure that there is no
1040  // pending requests that are not answered, ie. all possible virtual streams
1041  // are de-allocated
1042  //--------------------------------------------------------------------------
1043  Log *log = DefaultEnv::GetLog();
1044  SubStreamList::iterator it;
1045  time_t now = time(0);
1046 
1047  XrdSysMutexHelper scopedLock( pMutex );
1048  uint32_t outgoingMessages = 0;
1049  time_t lastActivity = 0;
1050  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1051  {
1052  outgoingMessages += (*it)->outQueue->GetSize();
1053  time_t sockLastActivity = (*it)->socket->GetLastActivity();
1054  if( lastActivity < sockLastActivity )
1055  lastActivity = sockLastActivity;
1056  }
1057 
1058  if( !outgoingMessages )
1059  {
1060  bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1061  *pChannelData );
1062  if( disconnect )
1063  {
1064  log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1065  pStreamName.c_str() );
1066  scopedLock.UnLock();
1067  //----------------------------------------------------------------------
1068  // Important note!
1069  //
1070  // This destroys the Stream object itself, the underlined
1071  // AsyncSocketHandler object (that called this method) and the Channel
1072  // object that aggregates this Stream.
1073  //----------------------------------------------------------------------
1075  return false;
1076  }
1077  }
1078 
1079  //--------------------------------------------------------------------------
1080  // Check if the stream is broken
1081  //--------------------------------------------------------------------------
1082  XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1083  *pChannelData );
1084  if( !st.IsOK() )
1085  {
1086  scopedLock.UnLock();
1087  OnError( substream, st );
1088  return false;
1089  }
1090  return true;
1091  }
1092 
1093  //----------------------------------------------------------------------------
1094  // On write timeout
1095  //----------------------------------------------------------------------------
1096  bool Stream::OnWriteTimeout( uint16_t /*substream*/ )
1097  {
1098  return true;
1099  }
1100 
1101  //----------------------------------------------------------------------------
1102  // Register channel event handler
1103  //----------------------------------------------------------------------------
1105  {
1106  pChannelEvHandlers.AddHandler( handler );
1107  }
1108 
1109  //----------------------------------------------------------------------------
1110  // Remove a channel event handler
1111  //----------------------------------------------------------------------------
1113  {
1114  pChannelEvHandlers.RemoveHandler( handler );
1115  }
1116 
1117  //----------------------------------------------------------------------------
1118  // Install a incoming message handler
1119  //----------------------------------------------------------------------------
1120  MsgHandler*
1121  Stream::InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream )
1122  {
1123  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1124  if( !mh.handler )
1125  mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1126  mh.expires,
1127  mh.action );
1128 
1129  if( !mh.handler )
1130  return nullptr;
1131 
1132  if( mh.action & MsgHandler::Raw )
1133  return mh.handler;
1134  return nullptr;
1135  }
1136 
1137  //----------------------------------------------------------------------------
1141  //----------------------------------------------------------------------------
1142  uint16_t Stream::InspectStatusRsp( uint16_t stream,
1143  MsgHandler *&incHandler )
1144  {
1145  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1146  if( !mh.handler )
1148 
1149  uint16_t action = mh.handler->InspectStatusRsp();
1150  mh.action |= action;
1151 
1152  if( action & MsgHandler::RemoveHandler )
1153  pIncomingQueue->RemoveMessageHandler( mh.handler );
1154 
1155  if( action & MsgHandler::Raw )
1156  {
1157  incHandler = mh.handler;
1158  return MsgHandler::Raw;
1159  }
1160 
1161  if( action & MsgHandler::Corrupted )
1162  return MsgHandler::Corrupted;
1163 
1164  if( action & MsgHandler::More )
1165  return MsgHandler::More;
1166 
1167  return MsgHandler::None;
1168  }
1169 
1170  //----------------------------------------------------------------------------
1171  // Check if channel can be collapsed using given URL
1172  //----------------------------------------------------------------------------
1173  bool Stream::CanCollapse( const URL &url )
1174  {
1175  Log *log = DefaultEnv::GetLog();
1176 
1177  //--------------------------------------------------------------------------
1178  // Resolve all the addresses of the host we're supposed to connect to
1179  //--------------------------------------------------------------------------
1180  std::vector<XrdNetAddr> prefaddrs;
1181  XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1182  if( !st.IsOK() )
1183  {
1184  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1185  , pStreamName.c_str(), url.GetHostName().c_str() );
1186  return false;
1187  }
1188 
1189  //--------------------------------------------------------------------------
1190  // Resolve all the addresses of the alias
1191  //--------------------------------------------------------------------------
1192  std::vector<XrdNetAddr> aliasaddrs;
1193  st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1194  if( !st.IsOK() )
1195  {
1196  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1197  , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1198  return false;
1199  }
1200 
1201  //--------------------------------------------------------------------------
1202  // Now check if the preferred host is part of the alias
1203  //--------------------------------------------------------------------------
1204  auto itr = prefaddrs.begin();
1205  for( ; itr != prefaddrs.end() ; ++itr )
1206  {
1207  auto itr2 = aliasaddrs.begin();
1208  for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1209  if( itr->Same( &*itr2 ) ) return true;
1210  }
1211 
1212  return false;
1213  }
1214 
1215  //------------------------------------------------------------------------
1216  // Query the stream
1217  //------------------------------------------------------------------------
1218  Status Stream::Query( uint16_t query, AnyObject &result )
1219  {
1220  switch( query )
1221  {
1222  case StreamQuery::IpAddr:
1223  {
1224  result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1225  return Status();
1226  }
1227 
1228  case StreamQuery::IpStack:
1229  {
1230  result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1231  return Status();
1232  }
1233 
1234  case StreamQuery::HostName:
1235  {
1236  result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1237  return Status();
1238  }
1239 
1240  default:
1241  return Status( stError, errQueryNotSupported );
1242  }
1243  }
1244 
1245 }
union ServerResponse::@0 body
kXR_char streamid[2]
Definition: XProtocol.hh:914
kXR_unt16 requestid
Definition: XProtocol.hh:228
@ kXR_oksofar
Definition: XProtocol.hh:900
@ kXR_status
Definition: XProtocol.hh:907
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1261
kXR_char fhandle[4]
Definition: XProtocol.hh:229
@ kXR_close
Definition: XProtocol.hh:115
ServerResponseHeader hdr
Definition: XProtocol.hh:1287
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
@ FatalError
Stream has been broken and won't be recovered.
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void ReportTimeout(time_t now=0)
Timeout handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
Definition: XrdClInQueue.cc:66
void AddMessageHandler(MsgHandler *handler, time_t expires, bool &rmMsg)
Definition: XrdClInQueue.cc:54
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
An abstract class to describe the client-side monitoring plugin interface.
Definition: XrdClMonitor.hh:56
@ EvDisconnect
DisconnectInfo: Logout from a server.
@ EvConnect
ConnectInfo: Login into a server.
virtual void Event(EventCode evCode, void *evData)=0
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
virtual void OnReadyToSend(Message *msg)
virtual time_t GetExpiration()=0
@ FatalError
Stream has been broken and won't be recovered.
@ Broken
The stream is broken.
virtual uint16_t InspectStatusRsp()=0
virtual void OnStatusReady(const Message *message, XRootDStatus status)=0
The requested action has been performed and the status is available.
A synchronized queue for the outgoing data.
void GrabStateful(OutQueue &queue)
void GrabExpired(OutQueue &queue, time_t exp=0)
void GrabItems(OutQueue &queue)
void Report(XRootDStatus status)
Report status to all the handlers.
Status ForceReconnect(const URL &url)
Reconnect the channel.
Status ForceDisconnect(const URL &url)
Shut down a channel.
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
A network socket.
Definition: XrdClSocket.hh:43
SocketStatus
Status of the socket.
Definition: XrdClSocket.hh:49
@ Disconnected
The socket is disconnected.
Definition: XrdClSocket.hh:50
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
Definition: XrdClStream.cc:297
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void ForceConnect()
Force connection.
Definition: XrdClStream.cc:347
void ForceError(XRootDStatus status, bool hush=false)
Force error.
Definition: XrdClStream.cc:913
Status Query(uint16_t query, AnyObject &result)
Query the stream.
void Disconnect(bool force=false)
Disconnect the stream.
Definition: XrdClStream.cc:363
XRootDStatus EnableLink(PathID &path)
Definition: XrdClStream.cc:187
Stream(const URL *url, const URL &prefer=URL())
Constructor.
Definition: XrdClStream.cc:96
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
Definition: XrdClStream.cc:610
void Tick(time_t now)
Definition: XrdClStream.cc:377
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
Definition: XrdClStream.cc:709
~Stream()
Destructor.
Definition: XrdClStream.cc:153
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
Definition: XrdClStream.cc:568
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
Definition: XrdClStream.cc:584
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
Definition: XrdClStream.cc:471
void OnError(uint16_t subStream, XRootDStatus status)
On error.
Definition: XrdClStream.cc:809
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
Definition: XrdClStream.cc:545
XRootDStatus Initialize()
Initializer.
Definition: XrdClStream.cc:171
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
virtual time_t Run(time_t now)=0
void SetName(const std::string &name)
Set name of the task.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:445
Random utilities.
Definition: XrdClUtils.hh:50
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
Definition: XrdClUtils.cc:234
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
Definition: XrdClUtils.cc:140
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
Definition: XrdClUtils.cc:123
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:104
Handle/Process/Forward XRootD messages.
static void SetDescription(Message *msg)
Get the description of a message.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
Definition: XrdNetUtils.cc:681
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
const uint16_t errUninitialized
Definition: XrdClStatus.hh:60
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79
const uint16_t errAuthFailed
Definition: XrdClStatus.hh:88
@ kXR_PartialResult
Definition: XProtocol.hh:1250
MsgHandler * handler
Definition: XrdClStream.cc:67
InMessageHelper(Message *message=0, MsgHandler *hndlr=0, time_t expir=0, uint16_t actio=0)
Definition: XrdClStream.cc:57
Describe a server login event.
Definition: XrdClMonitor.hh:72
std::string server
"user@host:port"
Definition: XrdClMonitor.hh:78
uint16_t streams
Number of streams.
Definition: XrdClMonitor.hh:82
timeval sTOD
gettimeofday() when login started
Definition: XrdClMonitor.hh:80
timeval eTOD
gettimeofday() when login ended
Definition: XrdClMonitor.hh:81
std::string auth
authentication protocol used or empty if none
Definition: XrdClMonitor.hh:79
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
uint16_t status
Status of the execution.
Definition: XrdClStatus.hh:146
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
bool IsFatal() const
Fatal error.
Definition: XrdClStatus.hh:123
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack
InMessageHelper inMsgHelper
Definition: XrdClStream.cc:89
AsyncSocketHandler * socket
Definition: XrdClStream.cc:86
OutQueue::MsgHelper outMsgHelper
Definition: XrdClStream.cc:88
Socket::SocketStatus status
Definition: XrdClStream.cc:90
static const uint16_t Auth
Transport name, returns std::string *.