28 #include <netinet/tcp.h>
39 uint16_t subStreamNum,
42 pTransport( transport ),
43 pChannelData( channelData ),
44 pSubStreamNum( subStreamNum ),
46 pStreamName( ToStreamName( url, subStreamNum ) ),
48 pHandShakeDone( false ),
49 pConnectionStarted( 0 ),
50 pConnectionTimeout( 0 ),
54 pTlsHandShakeOngoing( false )
59 env->
GetInt(
"TimeoutResolution", timeoutResolution );
102 env->
GetInt(
"TCPKeepAlive", keepAlive );
112 #if ( defined(__linux__) || defined(__GNU__) ) && defined( TCP_KEEPIDLE ) && \
113 defined( TCP_KEEPINTVL ) && defined( TCP_KEEPCNT )
116 env->
GetInt(
"TCPKeepAliveTime", param );
123 env->
GetInt(
"TCPKeepAliveInterval", param );
130 env->
GetInt(
"TCPKeepAliveProbes", param );
201 std::ostringstream o;
327 socklen_t optSize =
sizeof( errorCode );
605 if( waitSeconds >=0 )
607 time_t resendTime = ::time( 0 ) + waitSeconds;
612 "[%s] Won't retry kXR_endsess request because would"
613 "reach connection timeout.",
626 "will wait for %d seconds before replaying the endsess request",
748 time_t now = time(0);
820 "HS writer object missing!" ) );
855 waitSeconds = rsp->
body.wait.seconds;
864 time_t now = time( 0 );
869 "replay the endsess request.",
pStreamName.c_str() );
908 return std::string();
union ServerResponse::@0 body
const char * XrdSysE2T(int errcode)
Utility class encapsulating reading hand-shake response logic.
Utility class encapsulating writing hand-shake request logic.
Utility class encapsulating reading response message logic.
Utility class encapsulating writing request logic.
static std::string ToStreamName(const URL &url, uint16_t strmnb)
Convert URL and sub-stream number to stream name.
~AsyncSocketHandler()
Destructor.
bool OnReadTimeout() XRD_WARN_UNUSED_RESULT
std::unique_ptr< AsyncHSWriter > hswriter
virtual bool OnConnectionReturn() XRD_WARN_UNUSED_RESULT
bool OnWriteTimeout() XRD_WARN_UNUSED_RESULT
TransportHandler * pTransport
bool OnWrite() XRD_WARN_UNUSED_RESULT
bool OnTimeoutWhileHandshaking() XRD_WARN_UNUSED_RESULT
uint16_t pTimeoutResolution
bool CheckHSWait() XRD_WARN_UNUSED_RESULT
bool EventRead(uint8_t type) XRD_WARN_UNUSED_RESULT
std::unique_ptr< AsyncHSReader > hsreader
XRootDStatus DoTlsHandShake()
void OnFault(XRootDStatus st)
bool HandleHandShake(std::unique_ptr< Message > msg) XRD_WARN_UNUSED_RESULT
bool OnWriteWhileHandshaking() XRD_WARN_UNUSED_RESULT
XRootDStatus Close()
Close the connection.
time_t pConnectionTimeout
void OnFaultWhileHandshaking(XRootDStatus st)
virtual void Event(uint8_t type, XrdCl::Socket *)
Handle a socket event.
kXR_int32 HandleWaitRsp(Message *rsp)
bool HandShakeNextStep(bool done) XRD_WARN_UNUSED_RESULT
bool OnReadWhileHandshaking() XRD_WARN_UNUSED_RESULT
std::unique_ptr< AsyncMsgWriter > reqwriter
XRootDStatus EnableUplink()
Enable uplink.
std::string GetIpStack() const
Get the IP stack.
bool EventWrite(uint8_t type) XRD_WARN_UNUSED_RESULT
std::string GetHostName()
Get hostname.
std::unique_ptr< AsyncMsgReader > rspreader
XRootDStatus DisableUplink()
Disable uplink.
std::unique_ptr< HandShakeData > pHandShakeData
bool OnRead() XRD_WARN_UNUSED_RESULT
XRootDStatus Connect(time_t timeout)
Connect to the currently set address.
bool pTlsHandShakeOngoing
AsyncSocketHandler(const URL &url, Poller *poller, TransportHandler *transport, AnyObject *channelData, uint16_t subStreamNum, Stream *strm)
Constructor.
bool OnTLSHandShake() XRD_WARN_UNUSED_RESULT
time_t pConnectionStarted
bool SendHSMsg() XRD_WARN_UNUSED_RESULT
std::string GetIpAddr()
Get IP address.
void OnHeaderCorruption()
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
void Error(uint64_t topic, const char *format,...)
Report an error.
void Info(uint64_t topic, const char *format,...)
Print an info.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
The message representation used throughout the system.
Interface for socket pollers.
virtual bool EnableReadNotification(Socket *socket, bool notify, uint16_t timeout=60)=0
virtual bool EnableWriteNotification(Socket *socket, bool notify, uint16_t timeout=60)=0
virtual bool AddSocket(Socket *socket, SocketHandler *handler)=0
virtual bool RemoveSocket(Socket *socket)=0
Remove the socket.
@ ReadTimeOut
Read timeout.
@ ReadyToWrite
Writing won't block.
@ WriteTimeOut
Write timeout.
@ ReadyToRead
New data has arrived.
std::string GetSockName() const
Get the name of the socket.
XRootDStatus Initialize(int family=AF_INET)
Initialize the socket.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
void SetChannelID(AnyObject *channelID)
XRootDStatus ConnectToAddress(const XrdNetAddr &addr, uint16_t timout=10)
XRootDStatus SetSockOpt(int level, int optname, const void *optval, socklen_t optlen)
Set socket options.
XRootDStatus TlsHandShake(AsyncSocketHandler *socketHandler, const std::string &thehost=std::string())
uint8_t MapEvent(uint8_t event)
const XrdNetAddr * GetServerAddress() const
Get the server address.
XRootDStatus GetSockOpt(int level, int optname, void *optval, socklen_t *optlen)
Get socket options.
SocketStatus GetStatus() const
Get the socket status.
void SetStatus(SocketStatus status)
Set socket status - do not use unless you know what you're doing.
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
void ForceError(XRootDStatus status, bool hush=false)
Force error.
void OnConnect(uint16_t subStream)
Call back when the sub-stream has been connected.
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
const URL * GetURL() const
Get the URL.
Perform the handshake and the authentication for each physical stream.
virtual bool NeedEncryption(HandShakeData *handShakeData, AnyObject &channelData)=0
virtual void Disconnect(AnyObject &channelData, uint16_t subStreamId)=0
The stream has been disconnected, do the cleanups.
virtual bool HandShakeDone(HandShakeData *handShakeData, AnyObject &channelData)=0
virtual XRootDStatus HandShake(HandShakeData *handShakeData, AnyObject &channelData)=0
HandShake.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
const std::string & GetHostName() const
Get the name of the target host.
static const int noPort
Do not add port number.
bool isIPType(IPType ipType) const
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
const char * Name(const char *eName=0, const char **eText=0)
const uint16_t errSocketOptError
const int DefaultTCPKeepAliveProbes
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t errPollerError
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errSocketTimeout
const uint16_t errInternal
Internal error.
const int DefaultTimeoutResolution
const uint64_t AsyncSockMsg
const int DefaultTCPKeepAliveInterval
const int DefaultTCPKeepAlive
const uint16_t errConnectionError
const int DefaultTCPKeepAliveTime
const uint16_t errSocketError
const uint16_t errCorruptedHeader
Data structure that carries the handshake information.
uint16_t code
Error type, or additional hints on what to do.
uint16_t status
Status of the execution.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.