51 #include "XrdVersion.hh"
53 #include <arpa/inet.h>
54 #include <sys/types.h>
88 std::pair< std::set<std::string>::iterator,
bool > ret =
protocols.insert( protocol );
145 strmqueues.resize( size - 1, 0 );
153 strmqueues.resize( size - 1, 0);
161 uint16_t
Select(
const std::vector<bool> &connected )
164 size_t minval = std::numeric_limits<size_t>::max();
166 for( uint16_t i = 0; i < connected.size() && i < strmqueues.size(); ++i )
168 if( !connected[i] )
continue;
170 if( strmqueues[i] < minval )
173 minval = strmqueues[i];
187 --strmqueues[substrm - 1];
192 std::vector<size_t> strmqueues;
198 bindprefs( std::move( bindprefs ) ), next( 0 )
202 inline const std::string&
Get()
204 std::string &ret = bindprefs[next];
206 if( next >= bindprefs.size() )
212 std::vector<std::string> bindprefs;
301 delete pSecUnloadHandler; pSecUnloadHandler = 0;
320 size_t leftToBeRead = 8 - message.
GetCursor();
321 while( leftToBeRead )
325 leftToBeRead, bytesRead );
329 leftToBeRead -= bytesRead;
334 uint32_t bodySize = *(uint32_t*)(message.
GetBuffer(4));
337 "body", &message, bodySize );
352 size_t leftToBeRead = 0;
353 uint32_t bodySize = 0;
355 bodySize = rsphdr->
dlen;
357 if( message.
GetSize() < bodySize + 8 )
360 leftToBeRead = bodySize-(message.
GetCursor()-8);
361 while( leftToBeRead )
369 leftToBeRead -= bytesRead;
392 uint32_t bodySize = rsphdr->
dlen;
395 "kXR_status: invalid message size." );
400 if( message.
GetSize() < bodySize + 8 )
403 size_t leftToBeRead = bodySize-(message.
GetCursor()-8);
404 while( leftToBeRead )
412 leftToBeRead -= bytesRead;
444 channelData.
Set( info );
448 env->
GetInt(
"SubStreamsPerChannel", streams );
449 if( streams < 1 ) streams = 1;
450 info->
stream.resize( streams );
471 channelData.
Get( info );
478 "[%s] Internal error: not enough substreams",
486 return HandShakeMain( handShakeData, channelData );
488 return HandShakeParallel( handShakeData, channelData );
498 channelData.
Get( info );
507 handShakeData->
out = GenerateInitialHSProtocol( handShakeData, info,
518 XRootDStatus st = ProcessServerHS( handShakeData, info );
532 XRootDStatus st = ProcessProtocolResp( handShakeData, info );
542 handShakeData->
out = GenerateProtocol( handShakeData, info,
548 handShakeData->
out = GenerateLogIn( handShakeData, info );
559 XRootDStatus st = ProcessLogInResp( handShakeData, info );
567 if( st.IsOK() && st.code ==
suDone )
576 handShakeData->
out = GenerateEndSession( handShakeData, info );
586 st = DoAuthentication( handShakeData, info );
599 XRootDStatus st = DoAuthentication( handShakeData, info );
607 if( st.IsOK() && st.code ==
suDone )
614 handShakeData->
out = GenerateEndSession( handShakeData, info );
632 XRootDStatus st = ProcessEndSessionResp( handShakeData, info );
634 if( st.IsOK() && st.code ==
suDone )
638 else if( !st.IsOK() )
652 XRootDStatus XRootDTransport::HandShakeParallel( HandShakeData *handShakeData,
653 AnyObject &channelData )
655 XRootDChannelInfo *info = 0;
656 channelData.Get( info );
658 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
666 handShakeData->out = GenerateInitialHSProtocol( handShakeData, info,
678 XRootDStatus st = ProcessServerHS( handShakeData, info );
692 XRootDStatus st = ProcessProtocolResp( handShakeData, info );
700 handShakeData->out = GenerateBind( handShakeData, info );
710 XRootDStatus st = ProcessBindResp( handShakeData, info );
718 return XRootDStatus();
720 return XRootDStatus();
731 channelData.
Get( info );
743 channelData.
Get( info );
754 env->
GetInt(
"DataServerTTL", ttl );
759 env->
GetInt(
"LoadBalancerTTL", ttl );
766 uint16_t allocatedSIDs = info->
sidManager->GetNumberOfAllocatedSIDs();
768 "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
769 info->
streamName.c_str(), (
long long) inactiveTime, ttl, allocatedSIDs,
772 if( info->
openFiles != 0 && info->
finstcnt.load( std::memory_order_relaxed ) != 0 )
775 if( !allocatedSIDs && inactiveTime > ttl )
789 channelData.
Get( info );
794 env->
GetInt(
"StreamTimeout", streamTimeout );
798 const time_t now = time(0);
800 info->
sidManager->IsAnySIDOldAs( now - streamTimeout );
803 "stream timeout: %d, any SID: %d, wait barrier: %s",
804 info->
streamName.c_str(), (
long long) inactiveTime, streamTimeout,
807 if( inactiveTime < streamTimeout )
810 if( now < info->waitBarrier )
835 channelData.
Get( info );
849 uint16_t upStream = 0;
850 uint16_t downStream = 0;
855 downStream = hint->
down;
860 std::vector<bool> connected;
861 connected.reserve( info->
stream.size() - 1 );
862 size_t nbConnected = 0;
863 for(
size_t i = 1; i < info->
stream.size(); ++i )
866 connected.push_back(
true );
870 connected.push_back(
false );
872 if( nbConnected == 0 )
878 if( upStream >= info->
stream.size() )
881 "[%s] Up link stream %d does not exist, using 0",
886 if( downStream >= info->
stream.size() )
889 "[%s] Down link stream %d does not exist, using 0",
913 memset( newBuf, 0, 8 );
987 return PathID( upStream, downStream );
998 channelData.
Get( info );
1011 uint16_t ret = info->
stream.size();
1015 env->
GetInt(
"TlsNoData", nodata );
1028 if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1029 ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1035 if( ret == 1 ) ++ret;
1038 if( ret > info->
stream.size() )
1040 info->
stream.resize( ret );
1139 uint16_t numChunks = (req->
readv.
dlen)/16;
1141 for(
size_t i = 0; i < numChunks; ++i )
1143 dataChunk[i].
rlen = htonl( dataChunk[i].rlen );
1144 dataChunk[i].
offset = htonll( dataChunk[i].offset );
1157 for(
size_t i = 0; i < numChunks; ++i )
1159 wrtList[i].
wlen = htonl( wrtList[i].wlen );
1160 wrtList[i].
offset = htonll( wrtList[i].offset );
1244 m->
body.protocol.pval = ntohl( m->
body.protocol.pval );
1245 m->
body.protocol.flags = ntohl( m->
body.protocol.flags );
1256 m->
body.error.errnum = ntohl( m->
body.error.errnum );
1266 m->
body.wait.seconds = htonl( m->
body.wait.seconds );
1276 m->
body.redirect.port = htonl( m->
body.redirect.port );
1286 m->
body.waitresp.seconds = htonl( m->
body.waitresp.seconds );
1296 m->
body.attn.actnum = htonl( m->
body.attn.actnum );
1332 "kXR_status: invalid message size." );
1360 "corrupted (crc32c integrity check failed)." );
1367 "(stream ID mismatch)." );
1375 "(request ID mismatch)." );
1399 "kXR_status: invalid message size." );
1413 if( crcval != cse->
cseCRC )
1416 "corrupted (crc32c integrity check failed)." );
1427 for(
size_t i = 0; i < pgcnt; ++i )
1428 pgoffs[i] = ntohll( pgoffs[i] );
1448 header->
dlen = ntohl( header->
dlen );
1458 char *errmsg =
new char[rsp->
hdr.
dlen-3]; errmsg[rsp->
hdr.
dlen-4] = 0;
1459 memcpy( errmsg, rsp->
body.error.errmsg, rsp->
hdr.
dlen-4 );
1461 rsp->
body.error.errnum, errmsg );
1471 channelData.
Get( info );
1474 uint16_t nbConnected = 0;
1475 for(
size_t i = 1; i < info->
stream.size(); ++i )
1486 uint16_t subStreamId )
1489 channelData.
Get( info );
1492 CleanUpProtection( info );
1494 if( !info->
stream.empty() )
1500 if( subStreamId == 0 )
1518 channelData.
Get( info );
1527 result.
Set( (
const char*)
"XRootD",
false );
1566 channelData.
Get( info );
1588 "response that we're no longer interested in (timed out)",
1600 uint16_t sid; memcpy( &sid, rsp->
hdr.
streamid, 2 );
1601 std::set<uint16_t>::iterator sidIt = info->
sentOpens.find( sid );
1613 uint32_t seconds = 0;
1615 seconds = ntohl( rsp->
body.wait.seconds ) + 5;
1619 seconds = ntohl( rsp->
body.waitresp.seconds );
1621 log->
Dump(
XRootDMsg,
"[%s] Got kXR_waitresp response of %u seconds, "
1622 "setting up wait barrier.",
1627 time_t barrier = time(0) + seconds;
1635 uint16_t sid; memcpy( &sid, rsp->
hdr.
streamid, 2 );
1636 std::set<uint16_t>::iterator sidIt = info->
sentOpens.find( sid );
1645 info->
finstcnt.fetch_add( 1, std::memory_order_relaxed );
1675 channelData.
Get( info );
1700 channelData.
Get( info );
1728 sign->
Grab(
reinterpret_cast<char*
>( newreq ), rc );
1740 channelData.
Get( info );
1741 if( info->
finstcnt.load( std::memory_order_relaxed ) > 0 )
1742 info->
finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1751 pSecUnloadHandler->
unloaded =
true;
1761 channelData.
Get( info );
1765 env->
GetInt(
"NoTlsOK", notlsok );
1839 channelData.
Get( info );
1856 "[%s] Sending out the initial hand shake + kXR_protocol",
1866 init->
fifth = htonl(2012);
1869 InitProtocolReq( proto, info, expect );
1877 Message *XRootDTransport::GenerateProtocol( HandShakeData *hsData,
1878 XRootDChannelInfo *info,
1883 "[%s] Sending out the kXR_protocol",
1884 hsData->streamName.c_str() );
1886 Message *msg =
new Message();
1891 InitProtocolReq( proto, info, expect );
1900 XRootDChannelInfo *info,
1913 env->
GetInt(
"NoTlsOK", notlsok );
1916 env->
GetInt(
"TlsNoData", tlsnodata );
1918 if (info->encrypted ||
InitTLS())
1921 if (info->encrypted && !(notlsok || tlsnodata))
1924 request->
expect = expect;
1937 XRootDStatus XRootDTransport::ProcessServerHS( HandShakeData *hsData,
1938 XRootDChannelInfo *info )
1942 Message *msg = hsData->in;
1949 hsData->streamName.c_str() );
1954 info->protocolVersion = ntohl(hs->
protover);
1960 "[%s] Got the server hand shake response (%s, protocol "
1962 hsData->streamName.c_str(),
1963 ServerFlagsToStr( info->serverFlags ).c_str(),
1964 info->protocolVersion );
1972 XRootDStatus XRootDTransport::ProcessProtocolResp( HandShakeData *hsData,
1973 XRootDChannelInfo *info )
1987 hsData->streamName.c_str() );
1994 env->
GetInt(
"NoTlsOK", notlsok );
2002 if( !notlsok )
return XRootDStatus(
stFatal,
errTlsError, ENOTSUP,
"TLS not supported" );
2009 "[%s] Falling back to unencrypted transmission, server does "
2010 "not support TLS encryption.",
2011 hsData->streamName.c_str() );
2012 info->encrypted =
false;
2015 if( rsp->
body.protocol.pval >= 0x297 )
2016 info->serverFlags = rsp->
body.protocol.flags;
2021 info->protRespBody->flags = rsp->
body.protocol.flags;
2022 info->protRespBody->pval = rsp->
body.protocol.pval;
2024 char* bodybuff =
reinterpret_cast<char*
>( &rsp->
body.protocol.secreq );
2025 size_t bodysize = rsp->
hdr.
dlen - 8;
2026 XRootDStatus st = ProcessProtocolBody( bodybuff, bodysize, info );
2032 "[%s] kXR_protocol successful (%s, protocol version %x)",
2033 hsData->streamName.c_str(),
2034 ServerFlagsToStr( info->serverFlags ).c_str(),
2035 info->protocolVersion );
2037 if( !( info->serverFlags &
kXR_haveTLS ) && info->encrypted )
2044 "Server was not configured to support encryption." );
2052 env->
GetInt(
"WantTlsOnNoPgrw", tlsOnNoPgrw );
2053 if( !( info->serverFlags &
kXR_suppgrw ) && tlsOnNoPgrw )
2059 if( info->encrypted )
2062 "[%s] Server does not support PgRead/PgWrite and"
2063 " WantTlsOnNoPgrw is on; enforcing encryption for data.",
2064 hsData->streamName.c_str() );
2074 info->encrypted =
true;
2082 XRootDStatus XRootDTransport::ProcessProtocolBody(
char *bodybuff,
2084 XRootDChannelInfo *info )
2095 if( bodysize < bifreq->bifILen )
2097 "protocol response." );
2098 std::string bindprefs_str( bodybuff, bifreq->
bifILen );
2099 std::vector<std::string> bindprefs;
2101 info->bindSelector.reset(
new BindPrefSelector( std::move( bindprefs ) ) );
2109 if( bodysize >= 6 && secreq->
theTag ==
'S' )
2111 memcpy( &info->protRespBody->secreq, secreq, bodysize );
2112 info->protRespSize = bodysize + 8 ;
2115 return XRootDStatus();
2121 Message *XRootDTransport::GenerateBind( HandShakeData *hsData,
2122 XRootDChannelInfo *info )
2127 "[%s] Sending out the bind request",
2128 hsData->streamName.c_str() );
2135 memcpy( bindReq->
sessid, info->sessionId, 16 );
2144 XRootDStatus XRootDTransport::ProcessBindResp( HandShakeData *hsData,
2145 XRootDChannelInfo *info )
2158 hsData->streamName.c_str() );
2162 info->stream[hsData->subStreamId].pathId = rsp->
body.bind.substreamid;
2164 hsData->streamName.c_str() );
2166 return XRootDStatus();
2172 Message *XRootDTransport::GenerateLogIn( HandShakeData *hsData,
2173 XRootDChannelInfo *info )
2184 char *cgiBuffer =
new char[1024 + info->logintoken.size()];
2185 std::string appName;
2186 std::string monInfo;
2187 env->GetString(
"AppName", appName );
2188 env->GetString(
"MonInfo", monInfo );
2189 if( info->logintoken.empty() )
2191 snprintf( cgiBuffer, 1024,
2192 "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2193 "xrd.hostname=%s&xrd.rn=%s", countryCode.c_str(), timeZone,
2194 appName.c_str(), monInfo.c_str(), hostName, XrdVERSION );
2198 snprintf( cgiBuffer, 1024,
2199 "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2200 "xrd.hostname=%s&xrd.rn=%s&%s", countryCode.c_str(), timeZone,
2201 appName.c_str(), monInfo.c_str(), hostName, XrdVERSION, info->logintoken.c_str() );
2203 uint16_t cgiLen = strlen( cgiBuffer );
2213 loginReq->
pid = ::getpid();
2215 loginReq->
dlen = cgiLen;
2221 int multiProtocol = 0;
2222 env->GetInt(
"MultiProtocol", multiProtocol );
2230 bool dualStack =
false;
2231 bool privateIPv6 =
false;
2232 bool privateIPv4 =
false;
2256 if( !dualStack && hsData->serverAddr )
2269 std::string buffer( 8, 0 );
2270 if( hsData->url->GetUserName().length() )
2271 buffer = hsData->url->GetUserName();
2274 char *name =
new char[1024];
2281 buffer.resize( 8, 0 );
2282 std::copy( buffer.begin(), buffer.end(), (
char*)loginReq->
username );
2284 msg->Append( cgiBuffer, cgiLen, 24 );
2287 "username: %s, cgi: %s, dual-stack: %s, private IPv4: %s, "
2288 "private IPv6: %s", hsData->streamName.c_str(),
2289 loginReq->
username, cgiBuffer, dualStack ?
"true" :
"false",
2290 privateIPv4 ?
"true" :
"false",
2291 privateIPv6 ?
"true" :
"false" );
2293 delete [] cgiBuffer;
2301 XRootDStatus XRootDTransport::ProcessLogInResp( HandShakeData *hsData,
2302 XRootDChannelInfo *info )
2315 hsData->streamName.c_str() );
2319 if( !info->firstLogIn )
2320 memcpy( info->oldSessionId, info->sessionId, 16 );
2322 if( rsp->
hdr.
dlen == 0 && info->protocolVersion <= 0x289 )
2329 memset( info->sessionId, 0, 16 );
2331 "[%s] Logged in, accepting empty login response.",
2332 hsData->streamName.c_str() );
2333 return XRootDStatus();
2339 memcpy( info->sessionId, rsp->
body.login.sessid, 16 );
2344 hsData->streamName.c_str(), sessId.c_str() );
2351 size_t len = rsp->
hdr.
dlen-16;
2352 info->authBuffer =
new char[len+1];
2353 info->authBuffer[len] = 0;
2354 memcpy( info->authBuffer, rsp->
body.login.sec, len );
2356 hsData->streamName.c_str(), info->authBuffer );
2361 return XRootDStatus();
2367 XRootDStatus XRootDTransport::DoAuthentication( HandShakeData *hsData,
2368 XRootDChannelInfo *info )
2374 XRootDStreamInfo &sInfo = info->stream[hsData->subStreamId];
2376 std::string protocolName;
2384 hsData->streamName.c_str() );
2390 info->authEnv->Put(
"sockname", hsData->clientName.c_str() );
2391 info->authEnv->Put(
"username", hsData->url->GetUserName().c_str() );
2392 info->authEnv->Put(
"password", hsData->url->GetPassword().c_str() );
2395 URL::ParamsMap::const_iterator it;
2396 for( it = urlParams.begin(); it != urlParams.end(); ++it )
2398 if( it->first.compare( 0, 4,
"xrd." ) == 0 ||
2399 it->first.compare( 0, 6,
"xrdcl." ) == 0 )
2400 info->authEnv->Put( it->first.c_str(), it->second.c_str() );
2406 size_t authBuffLen = strlen( info->authBuffer );
2407 char *pars = (
char *)malloc( authBuffLen + 1 );
2408 memcpy( pars, info->authBuffer, authBuffLen );
2411 delete [] info->authBuffer;
2412 info->authBuffer = 0;
2417 XRootDStatus st = GetCredentials( credentials, hsData, info );
2420 CleanUpAuthentication( info );
2423 protocolName = info->authProtocol->Entity.prot;
2432 protocolName = info->authProtocol->Entity.prot;
2440 "[%s] Sending more authentication data for %s",
2441 hsData->streamName.c_str(), protocolName.c_str() );
2444 char *secTokenData = (
char*)malloc( len );
2445 memcpy( secTokenData, rsp->
body.authmore.data, len );
2448 credentials = info->authProtocol->getCredentials( secToken, &ei );
2457 "[%s] Auth protocol handler for %s refuses to give "
2458 "us more credentials %s",
2459 hsData->streamName.c_str(), protocolName.c_str(),
2461 CleanUpAuthentication( info );
2471 info->authProtocolName = info->authProtocol->Entity.prot;
2476 if( info->protRespBody )
2478 int rc =
XrdSecGetProtection( info->protection, *info->authProtocol, *info->protRespBody, info->protRespSize );
2482 "[%s] XrdSecProtect loaded.", hsData->streamName.c_str() );
2487 "[%s] XrdSecProtect: no protection needed.",
2488 hsData->streamName.c_str() );
2493 "[%s] Failed to load XrdSecProtect: %s",
2494 hsData->streamName.c_str(),
XrdSysE2T( -rc ) );
2495 CleanUpAuthentication( info );
2501 if( !info->protection )
2502 CleanUpAuthentication( info );
2504 pSecUnloadHandler->
Register( info->authProtocolName );
2507 "[%s] Authenticated with %s.", hsData->streamName.c_str(),
2508 protocolName.c_str() );
2516 return XRootDStatus();
2523 char *errmsg =
new char[rsp->
hdr.
dlen-3]; errmsg[rsp->
hdr.
dlen-4] = 0;
2524 memcpy( errmsg, rsp->
body.error.errmsg, rsp->
hdr.
dlen-4 );
2526 "[%s] Authentication with %s failed: %s",
2527 hsData->streamName.c_str(), protocolName.c_str(),
2531 info->authProtocol->Delete();
2532 info->authProtocol = 0;
2537 XRootDStatus st = GetCredentials( credentials, hsData, info );
2540 CleanUpAuthentication( info );
2543 protocolName = info->authProtocol->Entity.prot;
2550 info->authProtocolName = info->authProtocol->Entity.prot;
2551 CleanUpAuthentication( info );
2554 "[%s] Authentication with %s failed: unexpected answer",
2555 hsData->streamName.c_str(), protocolName.c_str() );
2571 protocolName.length() > 4 ? 4 : protocolName.length() );
2573 memcpy( reqBuffer, credentials->
buffer, credentials->
size );
2591 HandShakeData *hsData,
2592 XRootDChannelInfo *info )
2608 char *secuidc = (ei.getEnv()) ? ei.getEnv()->Get(
"xrdcl.secuid") : 0;
2609 char *secgidc = (ei.getEnv()) ? ei.getEnv()->Get(
"xrdcl.secgid") : 0;
2614 if(secuidc) secuid = atoi(secuidc);
2615 if(secgidc) secgid = atoi(secgidc);
2618 ScopedFsUidSetter uidSetter(secuid, secgid, hsData->streamName);
2619 if(!uidSetter.IsOk()) {
2620 log->Error(
XRootDTransportMsg,
"[%s] Error while setting (fsuid, fsgid) to (%d, %d)",
2621 hsData->streamName.c_str(), secuid, secgid );
2625 if(secuid >= 0 || secgid >= 0) {
2626 log->Error(
XRootDTransportMsg,
"[%s] xrdcl.secuid and xrdcl.secgid only supported on Linux.",
2627 hsData->streamName.c_str() );
2629 " only supported on Linux" );
2638 srvAddrInfo.
SetTLS( info->encrypted );
2644 info->authProtocol = (*authHandler)( hsData->url->GetHostName().c_str(),
2648 if( !info->authProtocol )
2651 hsData->streamName.c_str() );
2655 std::string protocolName = info->authProtocol->Entity.prot;
2657 hsData->streamName.c_str(), protocolName.c_str() );
2662 credentials = info->authProtocol->getCredentials( 0, &ei );
2666 "[%s] Cannot get credentials for protocol %s: %s",
2667 hsData->streamName.c_str(), protocolName.c_str(),
2669 info->authProtocol->Delete();
2679 Status XRootDTransport::CleanUpAuthentication( XRootDChannelInfo *info )
2681 if( info->authProtocol )
2682 info->authProtocol->Delete();
2683 delete info->authParams;
2684 delete info->authEnv;
2685 info->authProtocol = 0;
2686 info->authParams = 0;
2695 Status XRootDTransport::CleanUpProtection( XRootDChannelInfo *info )
2700 if( info->protection )
2702 info->protection->Delete();
2703 info->protection = 0;
2705 CleanUpAuthentication( info );
2708 if( info->protRespBody )
2710 delete info->protRespBody;
2711 info->protRespBody = 0;
2712 info->protRespSize = 0;
2724 char errorBuff[1024];
2729 auto ret = authHandler.load( std::memory_order_relaxed );
2730 if( ret )
return ret;
2739 ret = authHandler.load( std::memory_order_relaxed );
2740 if( ret )
return ret;
2744 authHandler.store( ret, std::memory_order_relaxed );
2749 "Unable to get the security framework: %s", errorBuff );
2758 Message *XRootDTransport::GenerateEndSession( HandShakeData *hsData,
2759 XRootDChannelInfo *info )
2770 memcpy( endsessReq->
sessid, info->oldSessionId, 16 );
2774 " %s", hsData->streamName.c_str(), sessId.c_str() );
2785 uint32_t size = sign->GetSize();
2786 sign->ReAllocate( size + msg->GetSize() );
2787 char* buffer = sign->GetBuffer( size );
2788 memcpy( buffer, msg->GetBuffer(), msg->GetSize() );
2789 msg->Grab( sign->GetBuffer(), sign->GetSize() );
2798 Status XRootDTransport::ProcessEndSessionResp( HandShakeData *hsData,
2799 XRootDChannelInfo *info )
2821 std::string errorMsg( rsp->
body.error.errmsg, rsp->
hdr.
dlen - 4 );
2823 "kXR_endsess: %s", hsData->streamName.c_str(),
2831 std::string msg( rsp->
body.wait.infomsg, rsp->
hdr.
dlen - 4 );
2833 "kXR_endsess: %s", hsData->streamName.c_str(),
2835 hsData->out = GenerateEndSession( hsData, info );
2846 std::string XRootDTransport::ServerFlagsToStr( uint32_t flags )
2848 std::string repr =
"type: ";
2872 repr.erase( repr.length()-1, 1 );
2883 char *GetDataAsString(
char *msg )
2886 char *fn =
new char[req->
dlen+1];
2887 memcpy( fn, msg + 24, req->
dlen );
2914 char *fn = GetDataAsString( msg );
2915 o <<
"file: " << fn <<
", ";
2917 o <<
"mode: 0" << std::setbase(8) << sreq->
mode <<
", ";
2918 o << std::setbase(10);
2935 o <<
"kXR_open_apnd ";
2937 o <<
"kXR_open_read ";
2939 o <<
"kXR_open_updt ";
2943 o <<
"kXR_refresh ";
2945 o <<
"kXR_replica ";
2951 o <<
"kXR_retstat ";
2964 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
2978 char *fn = GetDataAsString( msg );;
2979 o <<
"path: " << fn <<
", ";
2984 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3006 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3007 o << std::setbase(10);
3009 o <<
"offset: " << sreq->
offset <<
", ";
3010 o <<
"size: " << sreq->
rlen <<
")";
3020 o <<
"kXR_pgread (";
3021 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3022 o << std::setbase(10);
3024 o <<
"offset: " << sreq->
offset <<
", ";
3025 o <<
"size: " << sreq->
rlen <<
")";
3036 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3037 o << std::setbase(10);
3039 o <<
"offset: " << sreq->
offset <<
", ";
3040 o <<
"size: " << sreq->
dlen <<
")";
3050 o <<
"kXR_pgwrite (";
3051 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3052 o << std::setbase(10);
3054 o <<
"offset: " << sreq->
offset <<
", ";
3055 o <<
"size: " << sreq->
dlen <<
")";
3066 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3077 o <<
"kXR_truncate (";
3079 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3082 char *fn = GetDataAsString( msg );
3083 o <<
"file: " << fn;
3086 o << std::setbase(10);
3088 o <<
"offset: " << sreq->
offset;
3098 unsigned char *fhandle = 0;
3103 fhandle = dataChunk[0].
fhandle;
3105 o << FileHandleToStr( fhandle );
3109 o << std::setbase(10);
3114 size += dataChunk[i].
rlen;
3115 o <<
"(offset: " << dataChunk[i].
offset;
3116 o <<
", size: " << dataChunk[i].
rlen <<
"); ";
3119 o <<
"total size: " << size <<
")";
3128 unsigned char *fhandle = 0;
3129 o <<
"kXR_writev (";
3134 uint32_t numChunks = 0;
3138 size += wrtList[i].
wlen;
3143 o << FileHandleToStr( fhandle );
3147 o << std::setbase(10);
3148 o <<
"chunks: " << numChunks <<
", ";
3149 o <<
"total size: " << size <<
")";
3159 char *fn = GetDataAsString( msg );;
3160 o <<
"kXR_locate (";
3161 o <<
"path: " << fn <<
", ";
3169 o <<
"kXR_refresh ";
3171 o <<
"kXR_prefname ";
3177 o <<
"kXR_compress ";
3193 o <<
"destination: ";
3215 case kXR_QPrep: o <<
"kXR_QPrep";
break;
3218 case kXR_Qvisa: o <<
"kXR_Qvisa";
break;
3220 default: o << sreq->
infotype;
break;
3226 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3230 o <<
"arg length: " << sreq->
dlen <<
")";
3240 char *fn = GetDataAsString( msg );;
3241 o <<
"path: " << fn <<
")";
3253 char *fn = GetDataAsString( msg );
3254 o <<
"path: " << fn <<
", ";
3256 o <<
"mode: 0" << std::setbase(8) << sreq->
mode <<
", ";
3257 o << std::setbase(10);
3264 o <<
"kXR_mkdirpath";
3276 char *fn = GetDataAsString( msg );
3277 o <<
"path: " << fn <<
")";
3289 char *fn = GetDataAsString( msg );
3290 o <<
"path: " << fn <<
", ";
3292 o <<
"mode: 0" << std::setbase(8) << sreq->
mode <<
")";
3311 o <<
"kXR_protocol (";
3312 o <<
"clientpv: 0x" << std::setbase(16) << sreq->
clientpv <<
")";
3321 o <<
"kXR_dirlist (";
3322 char *fn = GetDataAsString( msg );;
3323 o <<
"path: " << fn <<
")";
3334 char *fn = GetDataAsString( msg );;
3335 o <<
"data: " << fn <<
")";
3346 o <<
"kXR_prepare (";
3363 o <<
", priority: " << (int) sreq->
prty <<
", ";
3365 char *fn = GetDataAsString( msg );
3367 for( cursor = fn; *cursor; ++cursor )
3368 if( *cursor ==
'\n' ) *cursor =
' ';
3370 o <<
"paths: " << fn <<
")";
3378 o <<
"kXR_chkpoint (";
3386 o <<
"kXR_ckpXeq) ";
3400 o <<
"kXR_unknown (length: " << req->
dlen <<
")";
3409 std::string XRootDTransport::FileHandleToStr(
const unsigned char handle[4] )
3411 std::ostringstream o;
3413 for( uint8_t i = 0; i < 4; ++i )
3415 o << std::setbase(16) << std::setfill(
'0') << std::setw(2);
3416 o << (int)handle[i];
static const int kXR_ckpRollback
struct ClientTruncateRequest truncate
union ServerResponse::@0 body
ServerResponseStatus status
struct ClientPgReadRequest pgread
struct ClientMkdirRequest mkdir
struct ClientAuthRequest auth
static const int kXR_ckpXeq
struct ClientPgWriteRequest pgwrite
struct ClientReadVRequest readv
struct ClientOpenRequest open
struct ServerResponseBody_Status bdy
struct ClientRequestHdr header
struct ClientWriteVRequest writev
struct ClientLoginRequest login
struct ClientChmodRequest chmod
struct ClientQueryRequest query
struct ClientReadRequest read
struct ClientMvRequest mv
struct ClientChkPointRequest chkpoint
struct ServerResponseHeader hdr
#define kXR_PROTOCOLVERSION
static const int kXR_ckpCommit
struct ClientPrepareRequest prepare
static const int kXR_ckpQuery
struct ClientWriteRequest write
#define kXR_PROTTLSVERSION
struct ClientProtocolRequest protocol
struct ClientLocateRequest locate
static const int kXR_ckpBegin
XrdSecBuffer XrdSecParameters
XrdSecProtocol *(* XrdSecGetProt_t)(const char *hostname, XrdNetAddrInfo &endPoint, XrdSecParameters &sectoken, XrdOucErrInfo *einfo)
Typedef to simplify the encoding of methods returning XrdSecProtocol.
XrdSecGetProt_t XrdSecLoadSecFactory(char *eBuff, int eBlen, const char *seclib)
int XrdSecGetProtection(XrdSecProtect *&protP, XrdSecProtocol &aprot, ServerResponseBody_Protocol &resp, unsigned int resplen)
#define NEED2SECURE(protP)
This class implements the XRootD protocol security protection.
const char * XrdSysE2T(int errcode)
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
void Grab(char *buffer, uint32_t size)
Grab a buffer allocated outside.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
void ReAllocate(uint32_t size)
Reallocate the buffer to a new location of a given size.
void Allocate(uint32_t size)
Allocate the buffer.
uint32_t GetCursor() const
Get append cursor.
uint32_t GetSize() const
Get the size of the message.
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool PutInt(const std::string &key, int value)
bool GetInt(const std::string &key, int &value)
void Error(uint64_t topic, const char *format,...)
Report an error.
LogLevel GetLevel() const
Get the log level.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
The message representation used throughout the system.
void SetIsMarshalled(bool isMarshalled)
Set the marshalling status.
bool IsMarshalled() const
Check if the message is marshalled.
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
virtual XRootDStatus Read(char *buffer, size_t size, int &bytesRead)
static void ClearErrorQueue()
Clear the error queue for the calling thread.
Perform the handshake and the authentication for each physical stream.
@ RequestClose
Send a close request.
virtual void WaitBeforeExit()=0
Wait before exit.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
std::string GetChannelId() const
std::map< std::string, std::string > ParamsMap
bool IsSecure() const
Does the protocol indicate encryption.
bool IsTPC() const
Is the URL used in TPC context.
std::string GetLoginToken() const
Get the login token if present in the opaque info.
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
static std::string FQDNToCC(const std::string &fqdn)
Convert the fully qualified host name to country code.
static std::string Char2Hex(uint8_t *array, uint16_t size)
Print a char array as hex.
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
const std::string & GetErrorMessage() const
Get error message.
static uint16_t NbConnectedStrm(AnyObject &channelData)
Number of currently connected data streams.
virtual bool IsStreamTTLElapsed(time_t time, AnyObject &channelData)
Check if the stream should be disconnected.
virtual void Disconnect(AnyObject &channelData, uint16_t subStreamId)
The stream has been disconnected, do the cleanups.
XRootDTransport()
Constructor.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)
Check if the message invokes a stream action.
virtual void WaitBeforeExit()
Wait until the program can safely exit.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
virtual XRootDStatus GetBody(Message &message, Socket *socket)
virtual XRootDStatus GetHeader(Message &message, Socket *socket)
~XRootDTransport()
Destructor.
virtual uint16_t SubStreamNumber(AnyObject &channelData)
Return a number of substreams per stream that should be created.
virtual void FinalizeChannel(AnyObject &channelData)
Finalize channel.
virtual bool HandShakeDone(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)
Get signature for given message.
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
Notify the transport about a message having been sent.
virtual XRootDStatus HandShake(HandShakeData *handShakeData, AnyObject &channelData)
HandShake.
virtual XRootDStatus GetMore(Message &message, Socket *socket)
static void GenerateDescription(char *msg, std::ostringstream &o)
Get the description of a message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarchalStatusMore(Message &msg)
Unmarshall the correction-segment of the status response for pgwrite.
static void LogErrorResponse(const Message &msg)
Log server error response.
virtual void DecFileInstCnt(AnyObject &channelData)
Decrement file object instance count bound to this channel.
virtual PathID Multiplex(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual void InitializeChannel(const URL &url, AnyObject &channelData)
Initialize channel.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)
Query the channel.
static void UnMarshallHeader(Message &msg)
Unmarshall the header incoming message.
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)
Get bind preference for the next data stream.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual bool NeedEncryption(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)
static char * MyHostName(const char *eName="*unknown*", const char **eText=0)
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
static int UserName(uid_t uID, char *uName, int uNsz)
virtual int Secure(SecurityRequest *&newreq, ClientRequest &thereq, const char *thedata)
const uint16_t errQueryNotSupported
const int DefaultLoadBalancerTTL
const uint64_t XRootDTransportMsg
const uint16_t errTlsError
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errLoginFailed
const int DefaultWantTlsOnNoPgrw
const uint16_t errSocketTimeout
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const int DefaultSubStreamsPerChannel
const uint16_t errInvalidOp
const int DefaultDataServerTTL
const uint16_t errHandShakeFailed
const int DefaultStreamTimeout
const uint16_t suAlreadyDone
const uint16_t errNotSupported
const uint16_t suContinue
const int DefaultTlsNoData
const uint16_t errAuthFailed
const uint16_t errInvalidMessage
struct ServerResponseBifs_Protocol bifReqs
BindPrefSelector(std::vector< std::string > &&bindprefs)
const std::string & Get()
Data structure that carries the handshake information.
std::string streamName
Name of the stream.
uint16_t subStreamId
Sub-stream id.
Message * out
Message to be sent out.
static void UnloadHandler(const std::string &trProt)
void Register(const std::string &protocol)
static void UnloadHandler()
std::set< std::string > protocols
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
Selects the least loaded stream for read operations over multiple streams.
void AdjustQueues(uint16_t size)
StreamSelector(uint16_t size)
void MsgReceived(uint16_t substrm)
uint16_t Select(const std::vector< bool > &connected)
static const uint16_t Name
Transport name, returns const char *.
static const uint16_t Auth
Authentication protocol name, returns std::string *.
Information holder for xrootd channels.
std::vector< XRootDStreamInfo > StreamInfoVector
std::set< uint16_t > sentCloses
std::unique_ptr< StreamSelector > strmSelector
XrdSecParameters * authParams
XrdSecProtocol * authProtocol
XrdSecProtect * protection
std::unique_ptr< BindPrefSelector > bindSelector
std::string authProtocolName
std::atomic< uint32_t > finstcnt
unsigned int protRespSize
ServerResponseBody_Protocol * protRespBody
XRootDChannelInfo(const URL &url)
std::set< uint16_t > sentOpens
std::shared_ptr< SIDManager > sidManager
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version
static const uint16_t IsEncrypted
returns true if the channel is encrypted
Information holder for XRootDStreams.
Generic structure to pass security information back and forth.
char * buffer
Pointer to the buffer.
int size
Size of the buffer or length of data in the buffer.