40 #include <sys/types.h>
42 #include <sys/socket.h>
105 pLastStreamError( 0 ),
106 pConnectionCount( 0 ),
107 pConnectionInitTime( 0 ),
108 pAddressType(
Utils::IPAll ),
113 pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
114 pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
116 std::ostringstream o;
118 pStreamName = o.str();
131 if( pAddressType == Utils::AddressType::IPAuto )
137 pAddressType = Utils::AddressType::IPv4;
139 pAddressType = Utils::AddressType::IPv6;
145 "Connection Window: %d, ConnectionRetry: %d, Stream Error "
146 "Window: %d", pStreamName.c_str(), netStack.c_str(),
147 pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
159 pStreamName.c_str() );
163 SubStreamList::iterator it;
164 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
173 if( !pTransport || !pPoller || !pChannelData )
177 pChannelData, 0,
this );
179 pSubStreams[0]->socket = s;
212 return pSubStreams[0]->socket->EnableUplink();
216 return pSubStreams[path.
up]->socket->EnableUplink();
227 time_t now = ::time(0);
229 if( now-pLastStreamError < pStreamErrorWindow )
230 return pLastFatalError;
232 gettimeofday( &pConnectionStarted, 0 );
242 "the host", pStreamName.c_str() );
243 pLastStreamError = now;
245 pLastFatalError = st;
251 std::vector<XrdNetAddr> addrresses;
256 pStreamName.c_str(), pPrefer.
GetHostName().c_str() );
260 std::vector<XrdNetAddr> tmp;
261 tmp.reserve( pAddresses.size() );
263 auto itr = pAddresses.begin();
264 for( ; itr != pAddresses.end() ; ++itr )
266 if( !HasNetAddr( *itr, addrresses ) )
267 tmp.push_back( *itr );
270 std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
272 pAddresses.swap( tmp );
279 while( !pAddresses.empty() )
281 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
282 pAddresses.pop_back();
283 pConnectionInitTime = ::time( 0 );
284 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
317 if( pSubStreams.size() <= path.
up )
320 "substream %d, using 0 instead", pStreamName.c_str(),
326 "substream %d expecting answer at %d", pStreamName.c_str(),
336 pSubStreams[path.
up]->outQueue->PushBack( msg, handler,
366 SubStreamList::iterator it;
367 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
369 (*it)->socket->Close();
384 SubStreamList::iterator it;
385 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
405 StreamConnectorTask(
const XrdCl::URL &url,
const std::string &n ):
408 std::string name =
"StreamConnectorTask for ";
429 XRootDStatus Stream::RequestClose( Message &response )
437 memcpy( req->
fhandle,
reinterpret_cast<uint8_t*
>( rsp->
body.buffer.data ), 4 );
439 msg->SetSessionId( pSessionId );
440 NullResponseHandler *handler =
new NullResponseHandler();
441 MessageSendParams params;
443 params.followRedirects =
false;
444 params.stateful =
true;
452 bool Stream::IsPartial( Message &msg )
472 std::shared_ptr<Message> msg,
473 uint32_t bytesReceived )
475 msg->SetSessionId( pSessionId );
476 pBytesReceived += bytesReceived;
487 if( !IsPartial( *msg ) )
496 RequestClose( *msg );
510 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511 pStreamName.c_str(), msg.get(), rsp->
hdr.
status,
520 pStreamName.c_str(), msg.get() );
525 pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
528 if( IsPartial( *msg ) )
537 Job *job =
new HandleIncMsgJob( handler );
544 std::pair<Message *, MsgHandler *>
549 if( pSubStreams[subStream]->outQueue->IsEmpty() )
552 pSubStreams[subStream]->socket->GetStreamName().c_str() );
554 pSubStreams[subStream]->socket->DisableUplink();
559 h.
msg = pSubStreams[subStream]->outQueue->PopMessage( h.
handler,
573 if( pSubStreams[subStream]->outQueue->IsEmpty() )
576 pSubStreams[subStream]->socket->GetStreamName().c_str() );
577 pSubStreams[subStream]->socket->DisableUplink();
588 pTransport->
MessageSent( msg, subStream, bytesSent,
591 pBytesSent += bytesSent;
601 pStreamName.c_str() );
604 pSubStreams[subStream]->outMsgHelper.Reset();
615 std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
618 subStream, ipstack.c_str() );
622 pLastStreamError = 0;
624 pConnectionCount = 0;
626 pSessionId = ++sSessCntGen;
631 if( pSubStreams.size() == 1 && numSub > 1 )
633 for( uint16_t i = 1; i < numSub; ++i )
637 pChannelData, i,
this );
639 pSubStreams[i]->socket = s;
648 if( pSubStreams.size() > 1 )
651 pStreamName.c_str(), pSubStreams.size() - 1 );
652 for(
size_t i = 1; i < pSubStreams.size(); ++i )
654 pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
655 XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
658 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
659 pSubStreams[i]->socket->Close();
673 gettimeofday( &pConnectionDone, 0 );
679 i.
sTOD = pConnectionStarted;
680 i.
eTOD = pConnectionDone;
681 i.
streams = pSubStreams.size();
684 std::string *qryResponse = 0;
686 qryResult.
Get( qryResponse );
687 i.
auth = *qryResponse;
697 else if( pOnDataConnJob )
702 pJobManager->
QueueJob( pOnDataConnJob.get(), 0 );
713 pSubStreams[subStream]->socket->Close();
714 time_t now = ::time(0);
728 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
731 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
733 OnFatalError( 0, st, scopedLock );
740 OnFatalError( subStream, status, scopedLock );
747 time_t elapsed = now-pConnectionInitTime;
749 pStreamName.c_str(), (
long long) elapsed, pConnectionWindow );
754 if( !pAddresses.empty() )
759 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
760 pAddresses.pop_back();
761 pConnectionInitTime = ::time( 0 );
762 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
764 while( !pAddresses.empty() && !st.
IsOK() );
767 OnFatalError( subStream, st, scopedLock );
775 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
779 pStreamName.c_str(), (
long long) (pConnectionWindow - elapsed) );
781 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
782 pTaskManager->
RegisterTask( task, pConnectionInitTime+pConnectionWindow );
789 else if( pConnectionCount < pConnectionRetry && !status.
IsFatal() )
796 OnFatalError( subStream, st, scopedLock );
803 OnFatalError( subStream, status, scopedLock );
813 pSubStreams[subStream]->socket->Close();
817 pStreamName.c_str(), subStream, status.
ToString().c_str() );
822 if( pSubStreams[subStream]->outMsgHelper.msg )
827 pSubStreams[subStream]->outMsgHelper.Reset();
833 if( pSubStreams[subStream]->inMsgHelper.handler )
849 if( pSubStreams[subStream]->outQueue->IsEmpty() )
854 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
857 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
859 OnFatalError( 0, st, scopedLock );
863 OnFatalError( subStream, status, scopedLock );
874 MonitorDisconnection( status );
876 SubStreamList::iterator it;
877 size_t outstanding = 0;
878 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
879 outstanding += (*it)->outQueue->GetSizeStateless();
887 OnFatalError( 0, st, scopedLock );
897 "message handlers.", pStreamName.c_str() );
899 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
917 for(
size_t substream = 0; substream < pSubStreams.size(); ++substream )
920 pSubStreams[substream]->socket->Close();
925 pStreamName.c_str(), status.
ToString().c_str() );
930 if( pSubStreams[substream]->outMsgHelper.msg )
935 pSubStreams[substream]->outMsgHelper.Reset();
941 if( pSubStreams[substream]->inMsgHelper.handler )
951 pConnectionCount = 0;
958 "message handlers.", pStreamName.c_str() );
960 SubStreamList::iterator it;
962 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
975 void Stream::OnFatalError( uint16_t subStream,
982 pStreamName.c_str(), status.
ToString().c_str() );
990 pConnectionCount = 0;
991 pLastStreamError = ::time(0);
992 pLastFatalError = status;
995 SubStreamList::iterator it;
997 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
998 q.GrabItems( *(*it)->outQueue );
1011 void Stream::MonitorDisconnection( XRootDStatus status )
1016 Monitor::DisconnectInfo i;
1018 i.rBytes = pBytesReceived;
1019 i.sBytes = pBytesSent;
1020 i.cTime = ::time(0) - pConnectionDone.tv_sec;
1034 if( substream != 0 )
1044 SubStreamList::iterator it;
1045 time_t now = time(0);
1048 uint32_t outgoingMessages = 0;
1049 time_t lastActivity = 0;
1050 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1052 outgoingMessages += (*it)->outQueue->GetSize();
1053 time_t sockLastActivity = (*it)->socket->GetLastActivity();
1054 if( lastActivity < sockLastActivity )
1055 lastActivity = sockLastActivity;
1058 if( !outgoingMessages )
1065 pStreamName.c_str() );
1180 std::vector<XrdNetAddr> prefaddrs;
1185 , pStreamName.c_str(), url.
GetHostName().c_str() );
1192 std::vector<XrdNetAddr> aliasaddrs;
1197 , pStreamName.c_str(), pUrl->
GetHostName().c_str() );
1204 auto itr = prefaddrs.begin();
1205 for( ; itr != prefaddrs.end() ; ++itr )
1207 auto itr2 = aliasaddrs.begin();
1208 for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1209 if( itr->Same( &*itr2 ) )
return true;
1224 result.
Set(
new std::string( pSubStreams[0]->socket->GetIpAddr() ),
false );
1230 result.
Set(
new std::string( pSubStreams[0]->socket->GetIpStack() ),
false );
1236 result.
Set(
new std::string( pSubStreams[0]->socket->GetHostName() ),
false );
union ServerResponse::@0 body
struct ServerResponseBody_Status bdy
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
@ StreamBroken
The stream is broken.
@ FatalError
Stream has been broken and won't be recovered.
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void ReportTimeout(time_t now=0)
Timeout handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
void AddMessageHandler(MsgHandler *handler, time_t expires, bool &rmMsg)
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Interface for a job to be run by the job manager.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Info(uint64_t topic, const char *format,...)
Print an info.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
An abstract class to describe the client-side monitoring plugin interface.
@ EvDisconnect
DisconnectInfo: Logout from a server.
@ EvConnect
ConnectInfo: Login into a server.
virtual void Event(EventCode evCode, void *evData)=0
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
virtual void OnReadyToSend(Message *msg)
virtual time_t GetExpiration()=0
@ FatalError
Stream has been broken and won't be recovered.
@ Broken
The stream is broken.
virtual uint16_t InspectStatusRsp()=0
virtual void OnStatusReady(const Message *message, XRootDStatus status)=0
The requested action has been performed and the status is available.
A synchronized queue for the outgoing data.
void GrabStateful(OutQueue &queue)
void GrabExpired(OutQueue &queue, time_t exp=0)
void GrabItems(OutQueue &queue)
void Report(XRootDStatus status)
Report status to all the handlers.
Status ForceReconnect(const URL &url)
Reconnect the channel.
Status ForceDisconnect(const URL &url)
Shut down a channel.
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
SocketStatus
Status of the socket.
@ Disconnected
The socket is disconnected.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void ForceConnect()
Force connection.
void ForceError(XRootDStatus status, bool hush=false)
Force error.
Status Query(uint16_t query, AnyObject &result)
Query the stream.
void Disconnect(bool force=false)
Disconnect the stream.
XRootDStatus EnableLink(PathID &path)
Stream(const URL *url, const URL &prefer=URL())
Constructor.
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
virtual time_t Run(time_t now)=0
void SetName(const std::string &name)
Set name of the task.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
const std::string & GetHostName() const
Get the name of the target host.
bool IsValid() const
Is the url valid.
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Handle/Process/Forward XRootD messages.
static void SetDescription(Message *msg)
Get the description of a message.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
const uint16_t errQueryNotSupported
const uint16_t errUninitialized
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
const uint16_t errInvalidSession
const uint16_t errAuthFailed
InMessageHelper(Message *message=0, MsgHandler *hndlr=0, time_t expir=0, uint16_t actio=0)
Describe a server login event.
std::string server
"user@host:port"
uint16_t streams
Number of streams.
timeval sTOD
gettimeofday() when login started
timeval eTOD
gettimeofday() when login ended
std::string auth
authentication protocol used or empty if none
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
uint16_t status
Status of the execution.
bool IsOK() const
We're fine.
bool IsFatal() const
Fatal error.
std::string ToString() const
Create a string representation.
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack
InMessageHelper inMsgHelper
AsyncSocketHandler * socket
OutQueue::MsgHelper outMsgHelper
Socket::SocketStatus status
static const uint16_t Auth
Transport name, returns std::string *.