XRootD
XrdClAsyncSocketHandler.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #include "XrdCl/XrdClStream.hh"
20 #include "XrdCl/XrdClConstants.hh"
21 #include "XrdCl/XrdClLog.hh"
22 #include "XrdCl/XrdClMessage.hh"
26 #include "XrdCl/XrdClOptimizers.hh"
27 #include "XrdSys/XrdSysE2T.hh"
28 #include <netinet/tcp.h>
29 
30 namespace XrdCl
31 {
32  //----------------------------------------------------------------------------
33  // Constructor
34  //----------------------------------------------------------------------------
36  Poller *poller,
37  TransportHandler *transport,
38  AnyObject *channelData,
39  uint16_t subStreamNum,
40  Stream *strm ):
41  pPoller( poller ),
42  pTransport( transport ),
43  pChannelData( channelData ),
44  pSubStreamNum( subStreamNum ),
45  pStream( strm ),
46  pStreamName( ToStreamName( url, subStreamNum ) ),
47  pSocket( new Socket() ),
48  pHandShakeDone( false ),
49  pConnectionStarted( 0 ),
50  pConnectionTimeout( 0 ),
51  pHSWaitStarted( 0 ),
52  pHSWaitSeconds( 0 ),
53  pUrl( url ),
54  pTlsHandShakeOngoing( false )
55  {
56  Env *env = DefaultEnv::GetEnv();
57 
58  int timeoutResolution = DefaultTimeoutResolution;
59  env->GetInt( "TimeoutResolution", timeoutResolution );
60  pTimeoutResolution = timeoutResolution;
61 
63  pLastActivity = time(0);
64  }
65 
66  //----------------------------------------------------------------------------
67  // Destructor
68  //----------------------------------------------------------------------------
70  {
71  Close();
72  delete pSocket;
73  }
74 
75  //----------------------------------------------------------------------------
76  // Connect to given address
77  //----------------------------------------------------------------------------
79  {
80  Log *log = DefaultEnv::GetLog();
81  pLastActivity = pConnectionStarted = ::time(0);
82  pConnectionTimeout = timeout;
83 
84  //--------------------------------------------------------------------------
85  // Initialize the socket
86  //--------------------------------------------------------------------------
88  if( !st.IsOK() )
89  {
90  log->Error( AsyncSockMsg, "[%s] Unable to initialize socket: %s",
91  pStreamName.c_str(), st.ToString().c_str() );
92  st.status = stFatal;
93  return st;
94  }
95 
96  //--------------------------------------------------------------------------
97  // Set the keep-alive up
98  //--------------------------------------------------------------------------
99  Env *env = DefaultEnv::GetEnv();
100 
101  int keepAlive = DefaultTCPKeepAlive;
102  env->GetInt( "TCPKeepAlive", keepAlive );
103  if( keepAlive )
104  {
105  int param = 1;
106  XRootDStatus st = pSocket->SetSockOpt( SOL_SOCKET, SO_KEEPALIVE, &param,
107  sizeof(param) );
108  if( !st.IsOK() )
109  log->Error( AsyncSockMsg, "[%s] Unable to turn on keepalive: %s",
110  pStreamName.c_str(), st.ToString().c_str() );
111 
112 #if ( defined(__linux__) || defined(__GNU__) ) && defined( TCP_KEEPIDLE ) && \
113  defined( TCP_KEEPINTVL ) && defined( TCP_KEEPCNT )
114 
115  param = DefaultTCPKeepAliveTime;
116  env->GetInt( "TCPKeepAliveTime", param );
117  st = pSocket->SetSockOpt(SOL_TCP, TCP_KEEPIDLE, &param, sizeof(param));
118  if( !st.IsOK() )
119  log->Error( AsyncSockMsg, "[%s] Unable to set keepalive time: %s",
120  pStreamName.c_str(), st.ToString().c_str() );
121 
123  env->GetInt( "TCPKeepAliveInterval", param );
124  st = pSocket->SetSockOpt(SOL_TCP, TCP_KEEPINTVL, &param, sizeof(param));
125  if( !st.IsOK() )
126  log->Error( AsyncSockMsg, "[%s] Unable to set keepalive interval: %s",
127  pStreamName.c_str(), st.ToString().c_str() );
128 
130  env->GetInt( "TCPKeepAliveProbes", param );
131  st = pSocket->SetSockOpt(SOL_TCP, TCP_KEEPCNT, &param, sizeof(param));
132  if( !st.IsOK() )
133  log->Error( AsyncSockMsg, "[%s] Unable to set keepalive probes: %s",
134  pStreamName.c_str(), st.ToString().c_str() );
135 #endif
136  }
137 
138  pHandShakeDone = false;
139  pTlsHandShakeOngoing = false;
140  pHSWaitStarted = 0;
141  pHSWaitSeconds = 0;
142 
143  //--------------------------------------------------------------------------
144  // Initiate async connection to the address
145  //--------------------------------------------------------------------------
146  char nameBuff[256];
147  pSockAddr.Format( nameBuff, sizeof(nameBuff), XrdNetAddrInfo::fmtAdv6 );
148  log->Debug( AsyncSockMsg, "[%s] Attempting connection to %s",
149  pStreamName.c_str(), nameBuff );
150 
151  st = pSocket->ConnectToAddress( pSockAddr, 0 );
152  if( !st.IsOK() )
153  {
154  log->Error( AsyncSockMsg, "[%s] Unable to initiate the connection: %s",
155  pStreamName.c_str(), st.ToString().c_str() );
156  return st;
157  }
158 
160 
161  //--------------------------------------------------------------------------
162  // We should get the ready to write event once we're really connected
163  // so we need to listen to it
164  //--------------------------------------------------------------------------
165  if( !pPoller->AddSocket( pSocket, this ) )
166  {
168  pSocket->Close();
169  return st;
170  }
171 
173  {
176  pSocket->Close();
177  return st;
178  }
179 
180  return XRootDStatus();
181  }
182 
183  //----------------------------------------------------------------------------
184  // Close the connection
185  //----------------------------------------------------------------------------
187  {
188  Log *log = DefaultEnv::GetLog();
189  log->Debug( AsyncSockMsg, "[%s] Closing the socket", pStreamName.c_str() );
190 
192  pSubStreamNum );
193 
195  pSocket->Close();
196  return XRootDStatus();
197  }
198 
199  std::string AsyncSocketHandler::ToStreamName( const URL &url, uint16_t strmnb )
200  {
201  std::ostringstream o;
202  o << url.GetHostId();
203  o << "." << strmnb;
204  return o.str();
205  }
206 
207  //----------------------------------------------------------------------------
208  // Handler a socket event
209  //----------------------------------------------------------------------------
210  void AsyncSocketHandler::Event( uint8_t type, XrdCl::Socket */*socket*/ )
211  {
212  //--------------------------------------------------------------------------
213  // First check if the socket itself wants to apply some mapping on the
214  // event. E.g. in case of TLS socket it might want to map read events to
215  // write events and vice-versa.
216  //--------------------------------------------------------------------------
217  type = pSocket->MapEvent( type );
218 
219  //--------------------------------------------------------------------------
220  // Handle any read or write events. If any of the handlers indicate an error
221  // we will have been disconnected. A disconnection may cause the current
222  // object to be asynchronously reused or deleted, so we return immediately.
223  //--------------------------------------------------------------------------
224  if( !EventRead( type ) )
225  return;
226 
227  if( !EventWrite( type ) )
228  return;
229  }
230 
231  //----------------------------------------------------------------------------
232  // Handler for read related socket events
233  //----------------------------------------------------------------------------
234  bool AsyncSocketHandler::EventRead( uint8_t type )
235  {
236  //--------------------------------------------------------------------------
237  // Read event
238  //--------------------------------------------------------------------------
239  if( type & ReadyToRead )
240  {
241  pLastActivity = time(0);
243  return OnTLSHandShake();
244 
245  if( likely( pHandShakeDone ) )
246  return OnRead();
247 
248  return OnReadWhileHandshaking();
249  }
250 
251  //--------------------------------------------------------------------------
252  // Read timeout
253  //--------------------------------------------------------------------------
254  else if( type & ReadTimeOut )
255  {
256  if( pHSWaitSeconds )
257  {
258  if( !CheckHSWait() )
259  return false;
260  }
261 
262  if( likely( pHandShakeDone ) )
263  return OnReadTimeout();
264 
265  return OnTimeoutWhileHandshaking();
266  }
267 
268  return true;
269  }
270 
271  //----------------------------------------------------------------------------
272  // Handler for write related socket events
273  //----------------------------------------------------------------------------
274  bool AsyncSocketHandler::EventWrite( uint8_t type )
275  {
276  //--------------------------------------------------------------------------
277  // Write event
278  //--------------------------------------------------------------------------
279  if( type & ReadyToWrite )
280  {
281  pLastActivity = time(0);
283  return OnConnectionReturn();
284 
285  //------------------------------------------------------------------------
286  // Make sure we are not writing anything if we have been told to wait.
287  //------------------------------------------------------------------------
288  if( pHSWaitSeconds != 0 )
289  return true;
290 
292  return OnTLSHandShake();
293 
294  if( likely( pHandShakeDone ) )
295  return OnWrite();
296 
297  return OnWriteWhileHandshaking();
298  }
299 
300  //--------------------------------------------------------------------------
301  // Write timeout
302  //--------------------------------------------------------------------------
303  else if( type & WriteTimeOut )
304  {
305  if( likely( pHandShakeDone ) )
306  return OnWriteTimeout();
307 
308  return OnTimeoutWhileHandshaking();
309  }
310 
311  return true;
312  }
313 
314  //----------------------------------------------------------------------------
315  // Connect returned
316  //----------------------------------------------------------------------------
318  {
319  //--------------------------------------------------------------------------
320  // Check whether we were able to connect
321  //--------------------------------------------------------------------------
322  Log *log = DefaultEnv::GetLog();
323  log->Debug( AsyncSockMsg, "[%s] Async connection call returned",
324  pStreamName.c_str() );
325 
326  int errorCode = 0;
327  socklen_t optSize = sizeof( errorCode );
328  XRootDStatus st = pSocket->GetSockOpt( SOL_SOCKET, SO_ERROR, &errorCode,
329  &optSize );
330 
331  //--------------------------------------------------------------------------
332  // This is an internal error really (either logic or system fault),
333  // so we call it a day and don't retry
334  //--------------------------------------------------------------------------
335  if( !st.IsOK() )
336  {
337  log->Error( AsyncSockMsg, "[%s] Unable to get the status of the "
338  "connect operation: %s", pStreamName.c_str(),
339  XrdSysE2T( errno ) );
342  return false;
343  }
344 
345  //--------------------------------------------------------------------------
346  // We were unable to connect
347  //--------------------------------------------------------------------------
348  if( errorCode )
349  {
350  log->Error( AsyncSockMsg, "[%s] Unable to connect: %s",
351  pStreamName.c_str(), XrdSysE2T( errorCode ) );
354  return false;
355  }
357 
358  //--------------------------------------------------------------------------
359  // Cork the socket
360  //--------------------------------------------------------------------------
361  st = pSocket->Cork();
362  if( !st.IsOK() )
363  {
365  return false;
366  }
367 
368  //--------------------------------------------------------------------------
369  // Initialize the handshake
370  //--------------------------------------------------------------------------
372  pSubStreamNum ) );
373  pHandShakeData->serverAddr = pSocket->GetServerAddress();
374  pHandShakeData->clientName = pSocket->GetSockName();
375  pHandShakeData->streamName = pStreamName;
376 
378  if( !st.IsOK() )
379  {
380  log->Error( AsyncSockMsg, "[%s] Connection negotiation failed",
381  pStreamName.c_str() );
383  return false;
384  }
385 
386  if( st.code != suRetry )
387  ++pHandShakeData->step;
388 
389  //--------------------------------------------------------------------------
390  // Initialize the hand-shake reader and writer
391  //--------------------------------------------------------------------------
392  hswriter.reset( new AsyncHSWriter( *pSocket, pStreamName ) );
394 
395  //--------------------------------------------------------------------------
396  // Transport has given us something to send
397  //--------------------------------------------------------------------------
398  if( pHandShakeData->out )
399  {
400  hswriter->Reset( pHandShakeData->out );
401  pHandShakeData->out = nullptr;
402  }
403 
404  //--------------------------------------------------------------------------
405  // Listen to what the server has to say
406  //--------------------------------------------------------------------------
408  {
411  return false;
412  }
413  return true;
414  }
415 
416  //----------------------------------------------------------------------------
417  // Got a write readiness event
418  //----------------------------------------------------------------------------
420  {
421  if( !reqwriter )
422  {
423  OnFault( XRootDStatus( stError, errInternal, 0, "Request writer is null." ) );
424  return false;
425  }
426  //--------------------------------------------------------------------------
427  // Let's do the writing ...
428  //--------------------------------------------------------------------------
429  XRootDStatus st = reqwriter->Write();
430  if( !st.IsOK() )
431  {
432  //------------------------------------------------------------------------
433  // We failed
434  //------------------------------------------------------------------------
435  OnFault( st );
436  return false;
437  }
438  //--------------------------------------------------------------------------
439  // We are not done yet
440  //--------------------------------------------------------------------------
441  if( st.code == suRetry) return true;
442  //--------------------------------------------------------------------------
443  // Disable the respective substream if empty
444  //--------------------------------------------------------------------------
445  reqwriter->Reset();
447  return true;
448  }
449 
450  //----------------------------------------------------------------------------
451  // Got a write readiness event while handshaking
452  //----------------------------------------------------------------------------
454  {
455  XRootDStatus st;
456  if( !hswriter || !hswriter->HasMsg() )
457  {
458  if( !(st = DisableUplink()).IsOK() )
459  {
461  return false;
462  }
463  return true;
464  }
465  //--------------------------------------------------------------------------
466  // Let's do the writing ...
467  //--------------------------------------------------------------------------
468  st = hswriter->Write();
469  if( !st.IsOK() )
470  {
471  //------------------------------------------------------------------------
472  // We failed
473  //------------------------------------------------------------------------
475  return false;
476  }
477  //--------------------------------------------------------------------------
478  // We are not done yet
479  //--------------------------------------------------------------------------
480  if( st.code == suRetry ) return true;
481  //--------------------------------------------------------------------------
482  // Disable the uplink
483  // Note: at this point we don't deallocate the HS message as we might need
484  // to re-send it in case of a kXR_wait response
485  //--------------------------------------------------------------------------
486  if( !(st = DisableUplink()).IsOK() )
487  {
489  return false;
490  }
491  return true;
492  }
493 
494  //----------------------------------------------------------------------------
495  // Got a read readiness event
496  //----------------------------------------------------------------------------
498  {
499  //--------------------------------------------------------------------------
500  // Make sure the response reader object exists
501  //--------------------------------------------------------------------------
502  if( !rspreader )
503  {
504  OnFault( XRootDStatus( stError, errInternal, 0, "Response reader is null." ) );
505  return false;
506  }
507 
508  //--------------------------------------------------------------------------
509  // Readout the data from the socket
510  //--------------------------------------------------------------------------
511  XRootDStatus st = rspreader->Read();
512 
513  //--------------------------------------------------------------------------
514  // Handler header corruption
515  //--------------------------------------------------------------------------
516  if( !st.IsOK() && st.code == errCorruptedHeader )
517  {
519  return false;
520  }
521 
522  //--------------------------------------------------------------------------
523  // Handler other errors
524  //--------------------------------------------------------------------------
525  if( !st.IsOK() )
526  {
527  OnFault( st );
528  return false;
529  }
530 
531  //--------------------------------------------------------------------------
532  // We are not done yet
533  //--------------------------------------------------------------------------
534  if( st.code == suRetry ) return true;
535 
536  //--------------------------------------------------------------------------
537  // We are done, reset the response reader so we can read out next message
538  //--------------------------------------------------------------------------
539  rspreader->Reset();
540  return true;
541  }
542 
543  //----------------------------------------------------------------------------
544  // Got a read readiness event while handshaking
545  //----------------------------------------------------------------------------
547  {
548  //--------------------------------------------------------------------------
549  // Make sure the response reader object exists
550  //--------------------------------------------------------------------------
551  if( !hsreader )
552  {
553  OnFault( XRootDStatus( stError, errInternal, 0, "Hand-shake reader is null." ) );
554  return false;
555  }
556 
557  //--------------------------------------------------------------------------
558  // Read the message and let the transport handler look at it when
559  // reading has finished
560  //--------------------------------------------------------------------------
561  XRootDStatus st = hsreader->Read();
562  if( !st.IsOK() )
563  {
565  return false;
566  }
567 
568  if( st.code != suDone )
569  return true;
570 
571  return HandleHandShake( hsreader->ReleaseMsg() );
572  }
573 
574  //------------------------------------------------------------------------
575  // Handle the handshake message
576  //------------------------------------------------------------------------
577  bool AsyncSocketHandler::HandleHandShake( std::unique_ptr<Message> msg )
578  {
579  //--------------------------------------------------------------------------
580  // OK, we have a new message, let's deal with it;
581  //--------------------------------------------------------------------------
582  pHandShakeData->in = msg.release();
584 
585  //--------------------------------------------------------------------------
586  // Deal with wait responses
587  //--------------------------------------------------------------------------
588  kXR_int32 waitSeconds = HandleWaitRsp( pHandShakeData->in );
589 
590  delete pHandShakeData->in;
591  pHandShakeData->in = 0;
592 
593  if( !st.IsOK() )
594  {
596  return false;
597  }
598 
599  if( st.code == suRetry )
600  {
601  //------------------------------------------------------------------------
602  // We are handling a wait response and the transport handler told
603  // as to retry the request
604  //------------------------------------------------------------------------
605  if( waitSeconds >=0 )
606  {
607  time_t resendTime = ::time( 0 ) + waitSeconds;
608  if( resendTime > pConnectionStarted + pConnectionTimeout )
609  {
610  Log *log = DefaultEnv::GetLog();
611  log->Error( AsyncSockMsg,
612  "[%s] Won't retry kXR_endsess request because would"
613  "reach connection timeout.",
614  pStreamName.c_str() );
615 
617  return false;
618  }
619  else
620  {
621  //--------------------------------------------------------------------
622  // We need to wait before replaying the request
623  //--------------------------------------------------------------------
624  Log *log = DefaultEnv::GetLog();
625  log->Debug( AsyncSockMsg, "[%s] Received a wait response to endsess request, "
626  "will wait for %d seconds before replaying the endsess request",
627  pStreamName.c_str(), waitSeconds );
628  pHSWaitStarted = time( 0 );
629  pHSWaitSeconds = waitSeconds;
630  }
631  return true;
632  }
633  //------------------------------------------------------------------------
634  // We are re-sending a protocol request
635  //------------------------------------------------------------------------
636  else if( pHandShakeData->out )
637  {
638  return SendHSMsg();
639  }
640  }
641 
642  //--------------------------------------------------------------------------
643  // If now is the time to enable encryption
644  //--------------------------------------------------------------------------
645  if( !pSocket->IsEncrypted() &&
647  {
649  if( !st.IsOK() )
650  return false;
651  if ( st.code == suRetry )
652  return true;
653  }
654 
655  //--------------------------------------------------------------------------
656  // Now prepare the next step of the hand-shake procedure
657  //--------------------------------------------------------------------------
658  return HandShakeNextStep( st.IsOK() && st.code == suDone );
659  }
660 
661  //------------------------------------------------------------------------
662  // Prepare the next step of the hand-shake procedure
663  //------------------------------------------------------------------------
665  {
666  //--------------------------------------------------------------------------
667  // We successfully proceeded to the next step
668  //--------------------------------------------------------------------------
669  ++pHandShakeData->step;
670 
671  //--------------------------------------------------------------------------
672  // The hand shake process is done
673  //--------------------------------------------------------------------------
674  if( done )
675  {
676  pHandShakeData.reset();
677  hswriter.reset();
678  hsreader.reset();
679  //------------------------------------------------------------------------
680  // Initialize the request writer & reader
681  //------------------------------------------------------------------------
684  XRootDStatus st;
685  if( !(st = EnableUplink()).IsOK() )
686  {
688  return false;
689  }
690  pHandShakeDone = true;
692  }
693  //--------------------------------------------------------------------------
694  // The transport handler gave us something to write
695  //--------------------------------------------------------------------------
696  else if( pHandShakeData->out )
697  {
698  return SendHSMsg();
699  }
700  return true;
701  }
702 
703  //----------------------------------------------------------------------------
704  // Handle fault
705  //----------------------------------------------------------------------------
707  {
708  Log *log = DefaultEnv::GetLog();
709  log->Error( AsyncSockMsg, "[%s] Socket error encountered: %s",
710  pStreamName.c_str(), st.ToString().c_str() );
711 
712  pStream->OnError( pSubStreamNum, st );
713  }
714 
715  //----------------------------------------------------------------------------
716  // Handle fault while handshaking
717  //----------------------------------------------------------------------------
719  {
720  Log *log = DefaultEnv::GetLog();
721  log->Error( AsyncSockMsg, "[%s] Socket error while handshaking: %s",
722  pStreamName.c_str(), st.ToString().c_str() );
723 
725  }
726 
727  //----------------------------------------------------------------------------
728  // Handle write timeout
729  //----------------------------------------------------------------------------
731  {
733  }
734 
735  //----------------------------------------------------------------------------
736  // Handler read timeout
737  //----------------------------------------------------------------------------
739  {
741  }
742 
743  //----------------------------------------------------------------------------
744  // Handle timeout while handshaking
745  //----------------------------------------------------------------------------
747  {
748  time_t now = time(0);
750  {
752  return false;
753  }
754  return true;
755  }
756 
757  //----------------------------------------------------------------------------
758  // Handle header corruption in case of kXR_status response
759  //----------------------------------------------------------------------------
761  {
762  //--------------------------------------------------------------------------
763  // We need to force a socket error so this is handled in a similar way as
764  // a stream t/o and all requests are retried
765  //--------------------------------------------------------------------------
767  }
768 
769  //----------------------------------------------------------------------------
770  // Carry out the TLS hand-shake
771  //----------------------------------------------------------------------------
773  {
774  Log *log = DefaultEnv::GetLog();
775  log->Debug( AsyncSockMsg, "[%s] TLS hand-shake exchange.", pStreamName.c_str() );
776 
777  XRootDStatus st;
778  if( !( st = pSocket->TlsHandShake( this, pUrl.GetHostName() ) ).IsOK() )
779  {
780  pTlsHandShakeOngoing = false;
782  return st;
783  }
784 
785  if( st.code == suRetry )
786  {
787  pTlsHandShakeOngoing = true;
788  return st;
789  }
790 
791  pTlsHandShakeOngoing = false;
792  log->Info( AsyncSockMsg, "[%s] TLS hand-shake done.", pStreamName.c_str() );
793 
794  return st;
795  }
796 
797  //----------------------------------------------------------------------------
798  // Handle read/write event if we are in the middle of a TLS hand-shake
799  //----------------------------------------------------------------------------
801  {
803  if( !st.IsOK() )
804  return false;
805  if ( st.code == suRetry )
806  return true;
807 
809  *pChannelData ) );
810  }
811 
812  //----------------------------------------------------------------------------
813  // Prepare a HS writer for sending and enable uplink
814  //----------------------------------------------------------------------------
816  {
817  if( !hswriter )
818  {
820  "HS writer object missing!" ) );
821  return false;
822  }
823  //--------------------------------------------------------------------------
824  // We only set a new HS message if this is not a replay due to kXR_wait
825  //--------------------------------------------------------------------------
826  if( !pHSWaitSeconds )
827  {
828  hswriter->Reset( pHandShakeData->out );
829  pHandShakeData->out = nullptr;
830  }
831  //--------------------------------------------------------------------------
832  // otherwise we replay the kXR_endsess request
833  //--------------------------------------------------------------------------
834  else
835  hswriter->Replay();
836  //--------------------------------------------------------------------------
837  // Enable writing so we can replay the HS message
838  //--------------------------------------------------------------------------
839  XRootDStatus st;
840  if( !(st = EnableUplink()).IsOK() )
841  {
843  return false;
844  }
845  return true;
846  }
847 
849  {
850  // It would be more coherent if this could be done in the
851  // transport layer, unfortunately the API does not allow it.
852  kXR_int32 waitSeconds = -1;
853  ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
854  if( rsp->hdr.status == kXR_wait )
855  waitSeconds = rsp->body.wait.seconds;
856  return waitSeconds;
857  }
858 
859  //----------------------------------------------------------------------------
860  // Check if HS wait time elapsed
861  //----------------------------------------------------------------------------
863  {
864  time_t now = time( 0 );
865  if( now - pHSWaitStarted >= pHSWaitSeconds )
866  {
867  Log *log = DefaultEnv::GetLog();
868  log->Debug( AsyncSockMsg, "[%s] The hand-shake wait time elapsed, will "
869  "replay the endsess request.", pStreamName.c_str() );
870  if( !SendHSMsg() )
871  return false;
872  //------------------------------------------------------------------------
873  // Make sure the wait state is reset
874  //------------------------------------------------------------------------
875  pHSWaitSeconds = 0;
876  pHSWaitStarted = 0;
877  }
878  return true;
879  }
880 
881  //------------------------------------------------------------------------
882  // Get the IP stack
883  //------------------------------------------------------------------------
884  std::string AsyncSocketHandler::GetIpStack() const
885  {
886  std::string ipstack( ( pSockAddr.isIPType( XrdNetAddr::IPType::IPv6 ) &&
887  !pSockAddr.isMapped() ) ? "IPv6" : "IPv4" );
888  return ipstack;
889  }
890 
891  //------------------------------------------------------------------------
892  // Get IP address
893  //------------------------------------------------------------------------
895  {
896  char nameBuff[256];
897  pSockAddr.Format( nameBuff, sizeof(nameBuff), XrdNetAddrInfo::fmtAddr, XrdNetAddrInfo::noPort );
898  return nameBuff;
899  }
900 
901  //------------------------------------------------------------------------
903  //------------------------------------------------------------------------
905  {
906  const char *cstr = pSockAddr.Name();
907  if( !cstr )
908  return std::string();
909  return cstr;
910  }
911 }
union ServerResponse::@0 body
@ kXR_wait
Definition: XProtocol.hh:905
ServerResponseHeader hdr
Definition: XProtocol.hh:1287
int kXR_int32
Definition: XPtypes.hh:89
#define likely(x)
#define unlikely(x)
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
Utility class encapsulating reading hand-shake response logic.
Utility class encapsulating writing hand-shake request logic.
Utility class encapsulating reading response message logic.
Utility class encapsulating writing request logic.
static std::string ToStreamName(const URL &url, uint16_t strmnb)
Convert Stream object and sub-stream number to stream name.
bool OnReadTimeout() XRD_WARN_UNUSED_RESULT
std::unique_ptr< AsyncHSWriter > hswriter
virtual bool OnConnectionReturn() XRD_WARN_UNUSED_RESULT
bool OnWriteTimeout() XRD_WARN_UNUSED_RESULT
bool OnWrite() XRD_WARN_UNUSED_RESULT
bool OnTimeoutWhileHandshaking() XRD_WARN_UNUSED_RESULT
bool CheckHSWait() XRD_WARN_UNUSED_RESULT
bool EventRead(uint8_t type) XRD_WARN_UNUSED_RESULT
std::unique_ptr< AsyncHSReader > hsreader
bool HandleHandShake(std::unique_ptr< Message > msg) XRD_WARN_UNUSED_RESULT
bool OnWriteWhileHandshaking() XRD_WARN_UNUSED_RESULT
XRootDStatus Close()
Close the connection.
void OnFaultWhileHandshaking(XRootDStatus st)
virtual void Event(uint8_t type, XrdCl::Socket *)
Handle a socket event.
kXR_int32 HandleWaitRsp(Message *rsp)
bool HandShakeNextStep(bool done) XRD_WARN_UNUSED_RESULT
bool OnReadWhileHandshaking() XRD_WARN_UNUSED_RESULT
std::unique_ptr< AsyncMsgWriter > reqwriter
XRootDStatus EnableUplink()
Enable uplink.
std::string GetIpStack() const
Get the IP stack.
bool EventWrite(uint8_t type) XRD_WARN_UNUSED_RESULT
std::string GetHostName()
Get hostname.
std::unique_ptr< AsyncMsgReader > rspreader
XRootDStatus DisableUplink()
Disable uplink.
std::unique_ptr< HandShakeData > pHandShakeData
bool OnRead() XRD_WARN_UNUSED_RESULT
XRootDStatus Connect(time_t timeout)
Connect to the currently set address.
AsyncSocketHandler(const URL &url, Poller *poller, TransportHandler *transport, AnyObject *channelData, uint16_t subStreamNum, Stream *strm)
Constructor.
bool OnTLSHandShake() XRD_WARN_UNUSED_RESULT
bool SendHSMsg() XRD_WARN_UNUSED_RESULT
std::string GetIpAddr()
Get IP address.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
Interface for socket pollers.
Definition: XrdClPoller.hh:87
virtual bool EnableReadNotification(Socket *socket, bool notify, uint16_t timeout=60)=0
virtual bool EnableWriteNotification(Socket *socket, bool notify, uint16_t timeout=60)=0
virtual bool AddSocket(Socket *socket, SocketHandler *handler)=0
virtual bool RemoveSocket(Socket *socket)=0
Remove the socket.
@ ReadTimeOut
Read timeout.
Definition: XrdClPoller.hh:42
@ ReadyToWrite
Writing won't block.
Definition: XrdClPoller.hh:43
@ WriteTimeOut
Write timeout.
Definition: XrdClPoller.hh:44
@ ReadyToRead
New data has arrived.
Definition: XrdClPoller.hh:41
A network socket.
Definition: XrdClSocket.hh:43
std::string GetSockName() const
Get the name of the socket.
Definition: XrdClSocket.cc:632
XRootDStatus Initialize(int family=AF_INET)
Initialize the socket.
Definition: XrdClSocket.cc:63
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
bool IsEncrypted()
Definition: XrdClSocket.cc:867
void SetChannelID(AnyObject *channelID)
Definition: XrdClSocket.hh:246
XRootDStatus ConnectToAddress(const XrdNetAddr &addr, uint16_t timout=10)
Definition: XrdClSocket.cc:212
void Close()
Disconnect.
Definition: XrdClSocket.cc:262
XRootDStatus SetSockOpt(int level, int optname, const void *optval, socklen_t optlen)
Set socket options.
Definition: XrdClSocket.cc:167
XRootDStatus TlsHandShake(AsyncSocketHandler *socketHandler, const std::string &thehost=std::string())
Definition: XrdClSocket.cc:844
uint8_t MapEvent(uint8_t event)
Definition: XrdClSocket.cc:835
const XrdNetAddr * GetServerAddress() const
Get the server address.
Definition: XrdClSocket.hh:237
XRootDStatus Cork()
Definition: XrdClSocket.cc:782
XRootDStatus GetSockOpt(int level, int optname, void *optval, socklen_t *optlen)
Get socket options.
Definition: XrdClSocket.cc:152
SocketStatus GetStatus() const
Get the socket status.
Definition: XrdClSocket.hh:125
void SetStatus(SocketStatus status)
Set socket status - do not use unless you know what you're doing.
Definition: XrdClSocket.hh:133
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
void ForceError(XRootDStatus status, bool hush=false)
Force error.
Definition: XrdClStream.cc:913
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
Definition: XrdClStream.cc:610
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
Definition: XrdClStream.cc:709
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
Definition: XrdClStream.cc:568
void OnError(uint16_t subStream, XRootDStatus status)
On error.
Definition: XrdClStream.cc:809
const URL * GetURL() const
Get the URL.
Definition: XrdClStream.hh:157
Perform the handshake and the authentication for each physical stream.
virtual bool NeedEncryption(HandShakeData *handShakeData, AnyObject &channelData)=0
virtual void Disconnect(AnyObject &channelData, uint16_t subStreamId)=0
The stream has been disconnected, do the cleanups.
virtual bool HandShakeDone(HandShakeData *handShakeData, AnyObject &channelData)=0
virtual XRootDStatus HandShake(HandShakeData *handShakeData, AnyObject &channelData)=0
Handshake.
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
static const int noPort
Do not add port number.
bool isMapped() const
bool isIPType(IPType ipType) const
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
const char * Name(const char *eName=0, const char **eText=0)
int Family() const
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t errSocketOptError
Definition: XrdClStatus.hh:76
const int DefaultTCPKeepAliveProbes
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t errPollerError
Definition: XrdClStatus.hh:75
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errSocketTimeout
Definition: XrdClStatus.hh:73
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const int DefaultTimeoutResolution
const uint64_t AsyncSockMsg
const int DefaultTCPKeepAliveInterval
const int DefaultTCPKeepAlive
const uint16_t errConnectionError
Definition: XrdClStatus.hh:78
const int DefaultTCPKeepAliveTime
const uint16_t errSocketError
Definition: XrdClStatus.hh:72
const uint16_t errCorruptedHeader
Definition: XrdClStatus.hh:103
const uint16_t suDone
Definition: XrdClStatus.hh:38
Data structure that carries the handshake information.
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
uint16_t status
Status of the execution.
Definition: XrdClStatus.hh:146
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97