XRootD
XrdCl::XRootDTransport Class Reference

XRootD transport handler. More...

#include <XrdClXRootDTransport.hh>

+ Inheritance diagram for XrdCl::XRootDTransport:
+ Collaboration diagram for XrdCl::XRootDTransport:

Public Member Functions

 XRootDTransport ()
 Constructor. More...
 
 ~XRootDTransport ()
 Destructor. More...
 
virtual void DecFileInstCnt (AnyObject &channelData)
 Decrement file object instance count bound to this channel. More...
 
virtual void Disconnect (AnyObject &channelData, uint16_t subStreamId)
 The stream has been disconnected, do the cleanups. More...
 
virtual void FinalizeChannel (AnyObject &channelData)
 Finalize channel. More...
 
virtual URL GetBindPreference (const URL &url, AnyObject &channelData)
 Get bind preference for the next data stream. More...
 
virtual XRootDStatus GetBody (Message &message, Socket *socket)
 
virtual XRootDStatus GetHeader (Message &message, Socket *socket)
 
virtual XRootDStatus GetMore (Message &message, Socket *socket)
 
virtual Status GetSignature (Message *toSign, Message *&sign, AnyObject &channelData)
 Get signature for given message. More...
 
virtual Status GetSignature (Message *toSign, Message *&sign, XRootDChannelInfo *info)
 Get signature for given message. More...
 
virtual XRootDStatus HandShake (HandShakeData *handShakeData, AnyObject &channelData)
 HandShake. More...
 
virtual bool HandShakeDone (HandShakeData *handShakeData, AnyObject &channelData)
 
virtual void InitializeChannel (const URL &url, AnyObject &channelData)
 Initialize channel. More...
 
virtual Status IsStreamBroken (time_t inactiveTime, AnyObject &channelData)
 
virtual bool IsStreamTTLElapsed (time_t time, AnyObject &channelData)
 Check if the stream should be disconnected. More...
 
virtual uint32_t MessageReceived (Message &msg, uint16_t subStream, AnyObject &channelData)
 Check if the message invokes a stream action. More...
 
virtual void MessageSent (Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
 Notify the transport about a message having been sent. More...
 
virtual PathID Multiplex (Message *msg, AnyObject &channelData, PathID *hint=0)
 
virtual PathID MultiplexSubStream (Message *msg, AnyObject &channelData, PathID *hint=0)
 
virtual bool NeedControlConnection ()
 
virtual bool NeedEncryption (HandShakeData *handShakeData, AnyObject &channelData)
 
virtual Status Query (uint16_t query, AnyObject &result, AnyObject &channelData)
 Query the channel. More...
 
virtual uint16_t SubStreamNumber (AnyObject &channelData)
 Return a number of substreams per stream that should be created. More...
 
virtual void WaitBeforeExit ()
 Wait until the program can safely exit. More...
 
- Public Member Functions inherited from XrdCl::TransportHandler
virtual ~TransportHandler ()
 

Static Public Member Functions

static void GenerateDescription (char *msg, std::ostringstream &o)
 Get the description of a message. More...
 
static void LogErrorResponse (const Message &msg)
 Log server error response. More...
 
static XRootDStatus MarshallRequest (char *msg)
 Marshal the outgoing message. More...
 
static XRootDStatus MarshallRequest (Message *msg)
 Marshal the outgoing message. More...
 
static uint16_t NbConnectedStrm (AnyObject &channelData)
 Number of currently connected data streams. More...
 
static void SetDescription (Message *msg)
 Get the description of a message. More...
 
static XRootDStatus UnMarchalStatusMore (Message &msg)
 Unmarshall the correction-segment of the status response for pgwrite. More...
 
static XRootDStatus UnMarshallBody (Message *msg, uint16_t reqType)
 Unmarshall the body of the incoming message. More...
 
static void UnMarshallHeader (Message &msg)
 Unmarshall the header incoming message. More...
 
static XRootDStatus UnMarshallRequest (Message *msg)
 
static XRootDStatus UnMarshalStatusBody (Message &msg, uint16_t reqType)
 Unmarshall the body of the status response. More...
 

Friends

struct PluginUnloadHandler
 

Additional Inherited Members

- Public Types inherited from XrdCl::TransportHandler
enum  StreamAction {
  NoAction = 0x0000 ,
  DigestMsg = 0x0001 ,
  AbortStream = 0x0002 ,
  CloseStream = 0x0004 ,
  ResumeStream = 0x0008 ,
  HoldStream = 0x0010 ,
  RequestClose = 0x0020
}
 Stream actions that may be triggered by incoming control messages. More...
 

Detailed Description

XRootD transport handler.

Definition at line 47 of file XrdClXRootDTransport.hh.

Constructor & Destructor Documentation

◆ XRootDTransport()

XrdCl::XRootDTransport::XRootDTransport ( )

Constructor.

Definition at line 291 of file XrdClXRootDTransport.cc.

291  :
292  pSecUnloadHandler( new PluginUnloadHandler() )
293  {
294  }

◆ ~XRootDTransport()

XrdCl::XRootDTransport::~XRootDTransport ( )

Destructor.

Definition at line 299 of file XrdClXRootDTransport.cc.

300  {
301  delete pSecUnloadHandler; pSecUnloadHandler = 0;
302  }

Member Function Documentation

◆ DecFileInstCnt()

void XrdCl::XRootDTransport::DecFileInstCnt ( AnyObject channelData)
virtual

Decrement file object instance count bound to this channel.

Implements XrdCl::TransportHandler.

Definition at line 1737 of file XrdClXRootDTransport.cc.

1738  {
1739  XRootDChannelInfo *info = 0;
1740  channelData.Get( info );
1741  if( info->finstcnt.load( std::memory_order_relaxed ) > 0 )
1742  info->finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1743  }

References XrdCl::XRootDChannelInfo::finstcnt, and XrdCl::AnyObject::Get().

+ Here is the call graph for this function:

◆ Disconnect()

void XrdCl::XRootDTransport::Disconnect ( AnyObject channelData,
uint16_t  subStreamId 
)
virtual

The stream has been disconnected, do the cleanups.

Implements XrdCl::TransportHandler.

Definition at line 1485 of file XrdClXRootDTransport.cc.

1487  {
1488  XRootDChannelInfo *info = 0;
1489  channelData.Get( info );
1490  XrdSysMutexHelper scopedLock( info->mutex );
1491 
1492  CleanUpProtection( info );
1493 
1494  if( !info->stream.empty() )
1495  {
1496  XRootDStreamInfo &sInfo = info->stream[subStreamId];
1497  sInfo.status = XRootDStreamInfo::Disconnected;
1498  }
1499 
1500  if( subStreamId == 0 )
1501  {
1502  info->sidManager->ReleaseAllTimedOut();
1503  info->sentOpens.clear();
1504  info->sentCloses.clear();
1505  info->openFiles = 0;
1506  info->waitBarrier = 0;
1507  }
1508  }

References XrdCl::XRootDStreamInfo::Disconnected, XrdCl::AnyObject::Get(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::waitBarrier.

+ Here is the call graph for this function:

◆ FinalizeChannel()

void XrdCl::XRootDTransport::FinalizeChannel ( AnyObject channelData)
virtual

Finalize channel.

Implements XrdCl::TransportHandler.

Definition at line 460 of file XrdClXRootDTransport.cc.

461  {
462  }

◆ GenerateDescription()

void XrdCl::XRootDTransport::GenerateDescription ( char *  msg,
std::ostringstream &  o 
)
static

Get the description of a message.

Definition at line 2898 of file XrdClXRootDTransport.cc.

2899  {
2900  Log *log = DefaultEnv::GetLog();
2901  if( log->GetLevel() < Log::ErrorMsg )
2902  return;
2903 
2904  ClientRequestHdr *req = (ClientRequestHdr *)msg;
2905  switch( req->requestid )
2906  {
2907  //------------------------------------------------------------------------
2908  // kXR_open
2909  //------------------------------------------------------------------------
2910  case kXR_open:
2911  {
2912  ClientOpenRequest *sreq = (ClientOpenRequest *)msg;
2913  o << "kXR_open (";
2914  char *fn = GetDataAsString( msg );
2915  o << "file: " << fn << ", ";
2916  delete [] fn;
2917  o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
2918  o << std::setbase(10);
2919  o << "flags: ";
2920  if( sreq->options == 0 )
2921  o << "none";
2922  else
2923  {
2924  if( sreq->options & kXR_delete )
2925  o << "kXR_delete ";
2926  if( sreq->options & kXR_force )
2927  o << "kXR_force ";
2928  if( sreq->options & kXR_mkpath )
2929  o << "kXR_mkpath ";
2930  if( sreq->options & kXR_new )
2931  o << "kXR_new ";
2932  if( sreq->options & kXR_nowait )
2933  o << "kXR_delete ";
2934  if( sreq->options & kXR_open_apnd )
2935  o << "kXR_open_apnd ";
2936  if( sreq->options & kXR_open_read )
2937  o << "kXR_open_read ";
2938  if( sreq->options & kXR_open_updt )
2939  o << "kXR_open_updt ";
2940  if( sreq->options & kXR_posc )
2941  o << "kXR_posc ";
2942  if( sreq->options & kXR_refresh )
2943  o << "kXR_refresh ";
2944  if( sreq->options & kXR_replica )
2945  o << "kXR_replica ";
2946  if( sreq->options & kXR_seqio )
2947  o << "kXR_seqio ";
2948  if( sreq->options & kXR_async )
2949  o << "kXR_async ";
2950  if( sreq->options & kXR_retstat )
2951  o << "kXR_retstat ";
2952  }
2953  o << ")";
2954  break;
2955  }
2956 
2957  //------------------------------------------------------------------------
2958  // kXR_close
2959  //------------------------------------------------------------------------
2960  case kXR_close:
2961  {
2962  ClientCloseRequest *sreq = (ClientCloseRequest *)msg;
2963  o << "kXR_close (";
2964  o << "handle: " << FileHandleToStr( sreq->fhandle );
2965  o << ")";
2966  break;
2967  }
2968 
2969  //------------------------------------------------------------------------
2970  // kXR_stat
2971  //------------------------------------------------------------------------
2972  case kXR_stat:
2973  {
2974  ClientStatRequest *sreq = (ClientStatRequest *)msg;
2975  o << "kXR_stat (";
2976  if( sreq->dlen )
2977  {
2978  char *fn = GetDataAsString( msg );;
2979  o << "path: " << fn << ", ";
2980  delete [] fn;
2981  }
2982  else
2983  {
2984  o << "handle: " << FileHandleToStr( sreq->fhandle );
2985  o << ", ";
2986  }
2987  o << "flags: ";
2988  if( sreq->options == 0 )
2989  o << "none";
2990  else
2991  {
2992  if( sreq->options & kXR_vfs )
2993  o << "kXR_vfs";
2994  }
2995  o << ")";
2996  break;
2997  }
2998 
2999  //------------------------------------------------------------------------
3000  // kXR_read
3001  //------------------------------------------------------------------------
3002  case kXR_read:
3003  {
3004  ClientReadRequest *sreq = (ClientReadRequest *)msg;
3005  o << "kXR_read (";
3006  o << "handle: " << FileHandleToStr( sreq->fhandle );
3007  o << std::setbase(10);
3008  o << ", ";
3009  o << "offset: " << sreq->offset << ", ";
3010  o << "size: " << sreq->rlen << ")";
3011  break;
3012  }
3013 
3014  //------------------------------------------------------------------------
3015  // kXR_pgread
3016  //------------------------------------------------------------------------
3017  case kXR_pgread:
3018  {
3020  o << "kXR_pgread (";
3021  o << "handle: " << FileHandleToStr( sreq->fhandle );
3022  o << std::setbase(10);
3023  o << ", ";
3024  o << "offset: " << sreq->offset << ", ";
3025  o << "size: " << sreq->rlen << ")";
3026  break;
3027  }
3028 
3029  //------------------------------------------------------------------------
3030  // kXR_write
3031  //------------------------------------------------------------------------
3032  case kXR_write:
3033  {
3034  ClientWriteRequest *sreq = (ClientWriteRequest *)msg;
3035  o << "kXR_write (";
3036  o << "handle: " << FileHandleToStr( sreq->fhandle );
3037  o << std::setbase(10);
3038  o << ", ";
3039  o << "offset: " << sreq->offset << ", ";
3040  o << "size: " << sreq->dlen << ")";
3041  break;
3042  }
3043 
3044  //------------------------------------------------------------------------
3045  // kXR_pgwrite
3046  //------------------------------------------------------------------------
3047  case kXR_pgwrite:
3048  {
3050  o << "kXR_pgwrite (";
3051  o << "handle: " << FileHandleToStr( sreq->fhandle );
3052  o << std::setbase(10);
3053  o << ", ";
3054  o << "offset: " << sreq->offset << ", ";
3055  o << "size: " << sreq->dlen << ")";
3056  break;
3057  }
3058 
3059  //------------------------------------------------------------------------
3060  // kXR_sync
3061  //------------------------------------------------------------------------
3062  case kXR_sync:
3063  {
3064  ClientSyncRequest *sreq = (ClientSyncRequest *)msg;
3065  o << "kXR_sync (";
3066  o << "handle: " << FileHandleToStr( sreq->fhandle );
3067  o << ")";
3068  break;
3069  }
3070 
3071  //------------------------------------------------------------------------
3072  // kXR_truncate
3073  //------------------------------------------------------------------------
3074  case kXR_truncate:
3075  {
3077  o << "kXR_truncate (";
3078  if( !sreq->dlen )
3079  o << "handle: " << FileHandleToStr( sreq->fhandle );
3080  else
3081  {
3082  char *fn = GetDataAsString( msg );
3083  o << "file: " << fn;
3084  delete [] fn;
3085  }
3086  o << std::setbase(10);
3087  o << ", ";
3088  o << "offset: " << sreq->offset;
3089  o << ")";
3090  break;
3091  }
3092 
3093  //------------------------------------------------------------------------
3094  // kXR_readv
3095  //------------------------------------------------------------------------
3096  case kXR_readv:
3097  {
3098  unsigned char *fhandle = 0;
3099  o << "kXR_readv (";
3100 
3101  o << "handle: ";
3102  readahead_list *dataChunk = (readahead_list*)(msg + 24 );
3103  fhandle = dataChunk[0].fhandle;
3104  if( fhandle )
3105  o << FileHandleToStr( fhandle );
3106  else
3107  o << "unknown";
3108  o << ", ";
3109  o << std::setbase(10);
3110  o << "chunks: [";
3111  uint64_t size = 0;
3112  for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3113  {
3114  size += dataChunk[i].rlen;
3115  o << "(offset: " << dataChunk[i].offset;
3116  o << ", size: " << dataChunk[i].rlen << "); ";
3117  }
3118  o << "], ";
3119  o << "total size: " << size << ")";
3120  break;
3121  }
3122 
3123  //------------------------------------------------------------------------
3124  // kXR_writev
3125  //------------------------------------------------------------------------
3126  case kXR_writev:
3127  {
3128  unsigned char *fhandle = 0;
3129  o << "kXR_writev (";
3130 
3131  XrdProto::write_list *wrtList =
3132  reinterpret_cast<XrdProto::write_list*>( msg + 24 );
3133  uint64_t size = 0;
3134  uint32_t numChunks = 0;
3135  for( size_t i = 0; i < req->dlen/sizeof(XrdProto::write_list); ++i )
3136  {
3137  fhandle = wrtList[i].fhandle;
3138  size += wrtList[i].wlen;
3139  ++numChunks;
3140  }
3141  o << "handle: ";
3142  if( fhandle )
3143  o << FileHandleToStr( fhandle );
3144  else
3145  o << "unknown";
3146  o << ", ";
3147  o << std::setbase(10);
3148  o << "chunks: " << numChunks << ", ";
3149  o << "total size: " << size << ")";
3150  break;
3151  }
3152 
3153  //------------------------------------------------------------------------
3154  // kXR_locate
3155  //------------------------------------------------------------------------
3156  case kXR_locate:
3157  {
3159  char *fn = GetDataAsString( msg );;
3160  o << "kXR_locate (";
3161  o << "path: " << fn << ", ";
3162  delete [] fn;
3163  o << "flags: ";
3164  if( sreq->options == 0 )
3165  o << "none";
3166  else
3167  {
3168  if( sreq->options & kXR_refresh )
3169  o << "kXR_refresh ";
3170  if( sreq->options & kXR_prefname )
3171  o << "kXR_prefname ";
3172  if( sreq->options & kXR_nowait )
3173  o << "kXR_nowait ";
3174  if( sreq->options & kXR_force )
3175  o << "kXR_force ";
3176  if( sreq->options & kXR_compress )
3177  o << "kXR_compress ";
3178  }
3179  o << ")";
3180  break;
3181  }
3182 
3183  //------------------------------------------------------------------------
3184  // kXR_mv
3185  //------------------------------------------------------------------------
3186  case kXR_mv:
3187  {
3188  ClientMvRequest *sreq = (ClientMvRequest *)msg;
3189  o << "kXR_mv (";
3190  o << "source: ";
3191  o.write( msg + sizeof( ClientMvRequest ), sreq->arg1len );
3192  o << ", ";
3193  o << "destination: ";
3194  o.write( msg + sizeof( ClientMvRequest ) + sreq->arg1len + 1, sreq->dlen - sreq->arg1len - 1 );
3195  o << ")";
3196  break;
3197  }
3198 
3199  //------------------------------------------------------------------------
3200  // kXR_query
3201  //------------------------------------------------------------------------
3202  case kXR_query:
3203  {
3204  ClientQueryRequest *sreq = (ClientQueryRequest *)msg;
3205  o << "kXR_query (";
3206  o << "code: ";
3207  switch( sreq->infotype )
3208  {
3209  case kXR_Qconfig: o << "kXR_Qconfig"; break;
3210  case kXR_Qckscan: o << "kXR_Qckscan"; break;
3211  case kXR_Qcksum: o << "kXR_Qcksum"; break;
3212  case kXR_Qopaque: o << "kXR_Qopaque"; break;
3213  case kXR_Qopaquf: o << "kXR_Qopaquf"; break;
3214  case kXR_Qopaqug: o << "kXR_Qopaqug"; break;
3215  case kXR_QPrep: o << "kXR_QPrep"; break;
3216  case kXR_Qspace: o << "kXR_Qspace"; break;
3217  case kXR_QStats: o << "kXR_QStats"; break;
3218  case kXR_Qvisa: o << "kXR_Qvisa"; break;
3219  case kXR_Qxattr: o << "kXR_Qxattr"; break;
3220  default: o << sreq->infotype; break;
3221  }
3222  o << ", ";
3223 
3224  if( sreq->infotype == kXR_Qopaqug || sreq->infotype == kXR_Qvisa )
3225  {
3226  o << "handle: " << FileHandleToStr( sreq->fhandle );
3227  o << ", ";
3228  }
3229 
3230  o << "arg length: " << sreq->dlen << ")";
3231  break;
3232  }
3233 
3234  //------------------------------------------------------------------------
3235  // kXR_rm
3236  //------------------------------------------------------------------------
3237  case kXR_rm:
3238  {
3239  o << "kXR_rm (";
3240  char *fn = GetDataAsString( msg );;
3241  o << "path: " << fn << ")";
3242  delete [] fn;
3243  break;
3244  }
3245 
3246  //------------------------------------------------------------------------
3247  // kXR_mkdir
3248  //------------------------------------------------------------------------
3249  case kXR_mkdir:
3250  {
3251  ClientMkdirRequest *sreq = (ClientMkdirRequest *)msg;
3252  o << "kXR_mkdir (";
3253  char *fn = GetDataAsString( msg );
3254  o << "path: " << fn << ", ";
3255  delete [] fn;
3256  o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
3257  o << std::setbase(10);
3258  o << "flags: ";
3259  if( sreq->options[0] == 0 )
3260  o << "none";
3261  else
3262  {
3263  if( sreq->options[0] & kXR_mkdirpath )
3264  o << "kXR_mkdirpath";
3265  }
3266  o << ")";
3267  break;
3268  }
3269 
3270  //------------------------------------------------------------------------
3271  // kXR_rmdir
3272  //------------------------------------------------------------------------
3273  case kXR_rmdir:
3274  {
3275  o << "kXR_rmdir (";
3276  char *fn = GetDataAsString( msg );
3277  o << "path: " << fn << ")";
3278  delete [] fn;
3279  break;
3280  }
3281 
3282  //------------------------------------------------------------------------
3283  // kXR_chmod
3284  //------------------------------------------------------------------------
3285  case kXR_chmod:
3286  {
3287  ClientChmodRequest *sreq = (ClientChmodRequest *)msg;
3288  o << "kXR_chmod (";
3289  char *fn = GetDataAsString( msg );
3290  o << "path: " << fn << ", ";
3291  delete [] fn;
3292  o << "mode: 0" << std::setbase(8) << sreq->mode << ")";
3293  break;
3294  }
3295 
3296  //------------------------------------------------------------------------
3297  // kXR_ping
3298  //------------------------------------------------------------------------
3299  case kXR_ping:
3300  {
3301  o << "kXR_ping ()";
3302  break;
3303  }
3304 
3305  //------------------------------------------------------------------------
3306  // kXR_protocol
3307  //------------------------------------------------------------------------
3308  case kXR_protocol:
3309  {
3311  o << "kXR_protocol (";
3312  o << "clientpv: 0x" << std::setbase(16) << sreq->clientpv << ")";
3313  break;
3314  }
3315 
3316  //------------------------------------------------------------------------
3317  // kXR_dirlist
3318  //------------------------------------------------------------------------
3319  case kXR_dirlist:
3320  {
3321  o << "kXR_dirlist (";
3322  char *fn = GetDataAsString( msg );;
3323  o << "path: " << fn << ")";
3324  delete [] fn;
3325  break;
3326  }
3327 
3328  //------------------------------------------------------------------------
3329  // kXR_set
3330  //------------------------------------------------------------------------
3331  case kXR_set:
3332  {
3333  o << "kXR_set (";
3334  char *fn = GetDataAsString( msg );;
3335  o << "data: " << fn << ")";
3336  delete [] fn;
3337  break;
3338  }
3339 
3340  //------------------------------------------------------------------------
3341  // kXR_prepare
3342  //------------------------------------------------------------------------
3343  case kXR_prepare:
3344  {
3346  o << "kXR_prepare (";
3347  o << "flags: ";
3348 
3349  if( sreq->options == 0 )
3350  o << "none";
3351  else
3352  {
3353  if( sreq->options & kXR_stage )
3354  o << "kXR_stage ";
3355  if( sreq->options & kXR_wmode )
3356  o << "kXR_wmode ";
3357  if( sreq->options & kXR_coloc )
3358  o << "kXR_coloc ";
3359  if( sreq->options & kXR_fresh )
3360  o << "kXR_fresh ";
3361  }
3362 
3363  o << ", priority: " << (int) sreq->prty << ", ";
3364 
3365  char *fn = GetDataAsString( msg );
3366  char *cursor;
3367  for( cursor = fn; *cursor; ++cursor )
3368  if( *cursor == '\n' ) *cursor = ' ';
3369 
3370  o << "paths: " << fn << ")";
3371  delete [] fn;
3372  break;
3373  }
3374 
3375  case kXR_chkpoint:
3376  {
3378  o << "kXR_chkpoint (";
3379  o << "opcode: ";
3380  if( sreq->opcode == kXR_ckpBegin ) o << "kXR_ckpBegin)";
3381  else if( sreq->opcode == kXR_ckpCommit ) o << "kXR_ckpCommit)";
3382  else if( sreq->opcode == kXR_ckpQuery ) o << "kXR_ckpQuery)";
3383  else if( sreq->opcode == kXR_ckpRollback ) o << "kXR_ckpRollback)";
3384  else if( sreq->opcode == kXR_ckpXeq )
3385  {
3386  o << "kXR_ckpXeq) ";
3387  // In this case our request body will be one of kXR_pgwrite,
3388  // kXR_truncate, kXR_write, or kXR_writev request.
3389  GenerateDescription( msg + sizeof( ClientChkPointRequest ), o );
3390  }
3391 
3392  break;
3393  }
3394 
3395  //------------------------------------------------------------------------
3396  // Default
3397  //------------------------------------------------------------------------
3398  default:
3399  {
3400  o << "kXR_unknown (length: " << req->dlen << ")";
3401  break;
3402  }
3403  };
3404  }
static const int kXR_ckpRollback
Definition: XProtocol.hh:215
kXR_int16 arg1len
Definition: XProtocol.hh:430
kXR_char fhandle[4]
Definition: XProtocol.hh:531
kXR_char fhandle[4]
Definition: XProtocol.hh:782
kXR_char fhandle[4]
Definition: XProtocol.hh:807
kXR_char fhandle[4]
Definition: XProtocol.hh:771
kXR_int32 dlen
Definition: XProtocol.hh:431
kXR_int64 offset
Definition: XProtocol.hh:646
kXR_unt16 options
Definition: XProtocol.hh:481
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
@ kXR_compress
Definition: XProtocol.hh:452
@ kXR_async
Definition: XProtocol.hh:458
@ kXR_delete
Definition: XProtocol.hh:453
@ kXR_prefname
Definition: XProtocol.hh:461
@ kXR_nowait
Definition: XProtocol.hh:467
@ kXR_open_read
Definition: XProtocol.hh:456
@ kXR_open_updt
Definition: XProtocol.hh:457
@ kXR_mkpath
Definition: XProtocol.hh:460
@ kXR_seqio
Definition: XProtocol.hh:468
@ kXR_replica
Definition: XProtocol.hh:465
@ kXR_posc
Definition: XProtocol.hh:466
@ kXR_refresh
Definition: XProtocol.hh:459
@ kXR_new
Definition: XProtocol.hh:455
@ kXR_force
Definition: XProtocol.hh:454
@ kXR_open_apnd
Definition: XProtocol.hh:462
@ kXR_retstat
Definition: XProtocol.hh:463
kXR_char fhandle[4]
Definition: XProtocol.hh:509
kXR_unt16 infotype
Definition: XProtocol.hh:631
kXR_char fhandle[4]
Definition: XProtocol.hh:645
kXR_char fhandle[4]
Definition: XProtocol.hh:659
kXR_char fhandle[4]
Definition: XProtocol.hh:229
kXR_unt16 requestid
Definition: XProtocol.hh:157
kXR_char fhandle[4]
Definition: XProtocol.hh:633
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_open
Definition: XProtocol.hh:122
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_mkdir
Definition: XProtocol.hh:120
@ kXR_sync
Definition: XProtocol.hh:128
@ kXR_chmod
Definition: XProtocol.hh:114
@ kXR_dirlist
Definition: XProtocol.hh:116
@ kXR_rm
Definition: XProtocol.hh:126
@ kXR_query
Definition: XProtocol.hh:113
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_set
Definition: XProtocol.hh:130
@ kXR_rmdir
Definition: XProtocol.hh:127
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_protocol
Definition: XProtocol.hh:118
@ kXR_mv
Definition: XProtocol.hh:121
@ kXR_ping
Definition: XProtocol.hh:123
@ kXR_stat
Definition: XProtocol.hh:129
@ kXR_pgread
Definition: XProtocol.hh:142
@ kXR_chkpoint
Definition: XProtocol.hh:124
@ kXR_locate
Definition: XProtocol.hh:139
@ kXR_close
Definition: XProtocol.hh:115
@ kXR_pgwrite
Definition: XProtocol.hh:138
@ kXR_prepare
Definition: XProtocol.hh:133
kXR_int32 rlen
Definition: XProtocol.hh:660
kXR_char fhandle[4]
Definition: XProtocol.hh:794
kXR_unt16 mode
Definition: XProtocol.hh:480
kXR_char options[1]
Definition: XProtocol.hh:416
static const int kXR_ckpCommit
Definition: XProtocol.hh:213
kXR_int64 offset
Definition: XProtocol.hh:661
@ kXR_vfs
Definition: XProtocol.hh:763
@ kXR_mkdirpath
Definition: XProtocol.hh:410
@ kXR_wmode
Definition: XProtocol.hh:591
@ kXR_fresh
Definition: XProtocol.hh:593
@ kXR_coloc
Definition: XProtocol.hh:592
@ kXR_stage
Definition: XProtocol.hh:590
static const int kXR_ckpQuery
Definition: XProtocol.hh:214
kXR_int64 offset
Definition: XProtocol.hh:808
kXR_int32 dlen
Definition: XProtocol.hh:772
kXR_char options
Definition: XProtocol.hh:769
kXR_int32 rlen
Definition: XProtocol.hh:647
@ kXR_QPrep
Definition: XProtocol.hh:616
@ kXR_Qopaqug
Definition: XProtocol.hh:625
@ kXR_Qconfig
Definition: XProtocol.hh:621
@ kXR_Qopaquf
Definition: XProtocol.hh:624
@ kXR_Qckscan
Definition: XProtocol.hh:620
@ kXR_Qxattr
Definition: XProtocol.hh:618
@ kXR_Qspace
Definition: XProtocol.hh:619
@ kXR_Qvisa
Definition: XProtocol.hh:622
@ kXR_QStats
Definition: XProtocol.hh:615
@ kXR_Qcksum
Definition: XProtocol.hh:617
@ kXR_Qopaque
Definition: XProtocol.hh:623
kXR_int32 dlen
Definition: XProtocol.hh:159
static const int kXR_ckpBegin
Definition: XProtocol.hh:212
static Log * GetLog()
Get default log.
@ ErrorMsg
report errors
Definition: XrdClLog.hh:109
static void GenerateDescription(char *msg, std::ostringstream &o)
Get the description of a message.
XrdSysError Log
Definition: XrdConfig.cc:112
kXR_char fhandle[4]
Definition: XProtocol.hh:832

References ClientMvRequest::arg1len, ClientProtocolRequest::clientpv, ClientRequestHdr::dlen, ClientMvRequest::dlen, ClientPgWriteRequest::dlen, ClientQueryRequest::dlen, ClientStatRequest::dlen, ClientTruncateRequest::dlen, ClientWriteRequest::dlen, XrdCl::Log::ErrorMsg, ClientCloseRequest::fhandle, ClientPgReadRequest::fhandle, ClientPgWriteRequest::fhandle, ClientQueryRequest::fhandle, ClientReadRequest::fhandle, readahead_list::fhandle, ClientStatRequest::fhandle, ClientSyncRequest::fhandle, ClientTruncateRequest::fhandle, ClientWriteRequest::fhandle, XrdProto::write_list::fhandle, XrdCl::Log::GetLevel(), XrdCl::DefaultEnv::GetLog(), ClientQueryRequest::infotype, kXR_async, kXR_chkpoint, kXR_chmod, kXR_ckpBegin, kXR_ckpCommit, kXR_ckpQuery, kXR_ckpRollback, kXR_ckpXeq, kXR_close, kXR_coloc, kXR_compress, kXR_delete, kXR_dirlist, kXR_force, kXR_fresh, kXR_locate, kXR_mkdir, kXR_mkdirpath, kXR_mkpath, kXR_mv, kXR_new, kXR_nowait, kXR_open, kXR_open_apnd, kXR_open_read, kXR_open_updt, kXR_pgread, kXR_pgwrite, kXR_ping, kXR_posc, kXR_prefname, kXR_prepare, kXR_protocol, kXR_Qckscan, kXR_Qcksum, kXR_Qconfig, kXR_Qopaque, kXR_Qopaquf, kXR_Qopaqug, kXR_QPrep, kXR_Qspace, kXR_QStats, kXR_query, kXR_Qvisa, kXR_Qxattr, kXR_read, kXR_readv, kXR_refresh, kXR_replica, kXR_retstat, kXR_rm, kXR_rmdir, kXR_seqio, kXR_set, kXR_stage, kXR_stat, kXR_sync, kXR_truncate, kXR_vfs, kXR_wmode, kXR_write, kXR_writev, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, readahead_list::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, ClientChkPointRequest::opcode, ClientLocateRequest::options, ClientMkdirRequest::options, ClientOpenRequest::options, ClientPrepareRequest::options, ClientStatRequest::options, ClientPrepareRequest::prty, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, and XrdProto::write_list::wlen.

Referenced by SetDescription().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetBindPreference()

URL XrdCl::XRootDTransport::GetBindPreference ( const URL url,
AnyObject channelData 
)
virtual

Get bind preference for the next data stream.

Implements XrdCl::TransportHandler.

Definition at line 1835 of file XrdClXRootDTransport.cc.

1837  {
1838  XRootDChannelInfo *info = 0;
1839  channelData.Get( info );
1840  if( !bool( info->bindSelector ) )
1841  return url;
1842 
1843  return URL( info->bindSelector->Get() );
1844  }

References XrdCl::XRootDChannelInfo::bindSelector, and XrdCl::AnyObject::Get().

+ Here is the call graph for this function:

◆ GetBody()

XRootDStatus XrdCl::XRootDTransport::GetBody ( Message message,
Socket socket 
)
virtual

Read the message body from the socket, the socket is non-blocking, the method may be called multiple times - see GetHeader for details

Parameters
messagethe message buffer containing the header
socketthe socket
Returns
stOK & suDone if the whole message has been processed stOK & suRetry if more data is needed stError on failure

Implements XrdCl::TransportHandler.

Definition at line 347 of file XrdClXRootDTransport.cc.

348  {
349  //--------------------------------------------------------------------------
350  // Retrieve the body
351  //--------------------------------------------------------------------------
352  size_t leftToBeRead = 0;
353  uint32_t bodySize = 0;
354  ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
355  bodySize = rsphdr->dlen;
356 
357  if( message.GetSize() < bodySize + 8 )
358  message.ReAllocate( bodySize + 8 );
359 
360  leftToBeRead = bodySize-(message.GetCursor()-8);
361  while( leftToBeRead )
362  {
363  int bytesRead = 0;
364  XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
365 
366  if( !status.IsOK() || status.code == suRetry )
367  return status;
368 
369  leftToBeRead -= bytesRead;
370  message.AdvanceCursor( bytesRead );
371  }
372 
373  return XRootDStatus( stOK, suDone );
374  }
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t suDone
Definition: XrdClStatus.hh:38

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Status::code, ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), XrdCl::stOK, XrdCl::suDone, and XrdCl::suRetry.

+ Here is the call graph for this function:

◆ GetHeader()

XRootDStatus XrdCl::XRootDTransport::GetHeader ( Message message,
Socket socket 
)
virtual

Read a message header from the socket, the socket is non-blocking, so if there is not enough data the function should return suRetry in which case it will be called again when more data arrives, with the data previously read stored in the message buffer

Parameters
messagethe message buffer
socketthe socket
Returns
stOK & suDone if the whole message has been processed stOK & suRetry if more data is needed stError on failure

Implements XrdCl::TransportHandler.

Definition at line 307 of file XrdClXRootDTransport.cc.

308  {
309  //--------------------------------------------------------------------------
310  // A new message - allocate the space needed for the header
311  //--------------------------------------------------------------------------
312  if( message.GetCursor() == 0 && message.GetSize() < 8 )
313  message.Allocate( 8 );
314 
315  //--------------------------------------------------------------------------
316  // Read the message header
317  //--------------------------------------------------------------------------
318  if( message.GetCursor() < 8 )
319  {
320  size_t leftToBeRead = 8 - message.GetCursor();
321  while( leftToBeRead )
322  {
323  int bytesRead = 0;
324  XRootDStatus status = socket->Read( message.GetBufferAtCursor(),
325  leftToBeRead, bytesRead );
326  if( !status.IsOK() || status.code == suRetry )
327  return status;
328 
329  leftToBeRead -= bytesRead;
330  message.AdvanceCursor( bytesRead );
331  }
332  UnMarshallHeader( message );
333 
334  uint32_t bodySize = *(uint32_t*)(message.GetBuffer(4));
335  Log *log = DefaultEnv::GetLog();
336  log->Dump( XRootDTransportMsg, "[msg: %p] Expecting %d bytes of message "
337  "body", &message, bodySize );
338 
339  return XRootDStatus( stOK, suDone );
340  }
341  return XRootDStatus( stError, errInternal );
342  }
static void UnMarshallHeader(Message &msg)
Unmarshall the header incoming message.
const uint64_t XRootDTransportMsg
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Buffer::Allocate(), XrdCl::Status::code, XrdCl::Log::Dump(), XrdCl::errInternal, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarshallHeader(), and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ GetMore()

XRootDStatus XrdCl::XRootDTransport::GetMore ( Message message,
Socket socket 
)
virtual

Read more of the message body from the socket, the socket is non-blocking the method may be called multiple times - see GetHeader for details

Parameters
messagethe message buffer containing the header
socketthe socket
Returns
stOK & suDone if the whole message has been processed stOK & suRetry if more data is needed stError on failure

Implements XrdCl::TransportHandler.

Definition at line 379 of file XrdClXRootDTransport.cc.

380  {
381  ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
382  if( rsphdr->status != kXR_status )
383  return XRootDStatus( stError, errInvalidOp );
384 
385  //--------------------------------------------------------------------------
386  // In case of non kXR_status responses we read all the response, including
387  // data. For kXR_status responses we first read only the remainder of the
388  // header. The header must then be unmarshalled, and then a second call to
389  // GetMore (repeated for suRetry as needed) will read the data.
390  //--------------------------------------------------------------------------
391 
392  uint32_t bodySize = rsphdr->dlen;
393  if( bodySize+8 < sizeof( ServerResponseStatus ) )
394  return XRootDStatus( stError, errInvalidMessage, 0,
395  "kXR_status: invalid message size." );
396 
397  ServerResponseStatus *rspst = (ServerResponseStatus*)message.GetBuffer();
398  bodySize += rspst->bdy.dlen;
399 
400  if( message.GetSize() < bodySize + 8 )
401  message.ReAllocate( bodySize + 8 );
402 
403  size_t leftToBeRead = bodySize-(message.GetCursor()-8);
404  while( leftToBeRead )
405  {
406  int bytesRead = 0;
407  XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
408 
409  if( !status.IsOK() || status.code == suRetry )
410  return status;
411 
412  leftToBeRead -= bytesRead;
413  message.AdvanceCursor( bytesRead );
414  }
415 
416  // Unmarchal to message body
417  Log *log = DefaultEnv::GetLog();
418  XRootDStatus st = XRootDTransport::UnMarchalStatusMore( message );
419  if( !st.IsOK() && st.code == errDataError )
420  {
421  log->Error( XRootDTransportMsg, "[msg: %p] %s", &message,
422  st.GetErrorMessage().c_str() );
423  return st;
424  }
425 
426  if( !st.IsOK() )
427  {
428  log->Error( XRootDTransportMsg, "[msg: %p] Failed to unmarshall status body.",
429  &message );
430  return st;
431  }
432 
433  return XRootDStatus( stOK, suDone );
434  }
@ kXR_status
Definition: XProtocol.hh:907
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1261
static XRootDStatus UnMarchalStatusMore(Message &msg)
Unmarshall the correction-segment of the status response for pgwrite.
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const uint16_t errInvalidMessage
Definition: XrdClStatus.hh:85

References XrdCl::Buffer::AdvanceCursor(), ServerResponseStatus::bdy, XrdCl::Status::code, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errInvalidOp, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), kXR_status, XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarchalStatusMore(), and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ GetSignature() [1/2]

Status XrdCl::XRootDTransport::GetSignature ( Message toSign,
Message *&  sign,
AnyObject channelData 
)
virtual

Get signature for given message.

Implements XrdCl::TransportHandler.

Definition at line 1697 of file XrdClXRootDTransport.cc.

1698  {
1699  XRootDChannelInfo *info = 0;
1700  channelData.Get( info );
1701  return GetSignature( toSign, sign, info );
1702  }
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)
Get signature for given message.

References XrdCl::AnyObject::Get().

+ Here is the call graph for this function:

◆ GetSignature() [2/2]

Status XrdCl::XRootDTransport::GetSignature ( Message toSign,
Message *&  sign,
XRootDChannelInfo info 
)
virtual

Get signature for given message.

Definition at line 1707 of file XrdClXRootDTransport.cc.

1710  {
1711  XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
1712  if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );
1713 
1714  ClientRequest *thereq = reinterpret_cast<ClientRequest*>( toSign->GetBuffer() );
1715  if( !info ) return Status( stError, errInternal );
1716  if( info->protection )
1717  {
1718  SecurityRequest *newreq = 0;
1719  // check if we have to secure the request in the first place
1720  if( !( NEED2SECURE ( info->protection )( *thereq ) ) ) return Status();
1721  // secure (sign/encrypt) the request
1722  int rc = info->protection->Secure( newreq, *thereq, 0 );
1723  // there was an error
1724  if( rc < 0 )
1725  return Status( stError, errInternal, -rc );
1726 
1727  sign = new Message();
1728  sign->Grab( reinterpret_cast<char*>( newreq ), rc );
1729  }
1730 
1731  return Status();
1732  }
#define NEED2SECURE(protP)
This class implements the XRootD protocol security protection.

References XrdCl::errInternal, XrdCl::errInvalidOp, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::Grab(), XrdCl::PluginUnloadHandler::lock, NEED2SECURE, XrdCl::XRootDChannelInfo::protection, XrdSecProtect::Secure(), XrdCl::stError, and XrdCl::PluginUnloadHandler::unloaded.

+ Here is the call graph for this function:

◆ HandShake()

XRootDStatus XrdCl::XRootDTransport::HandShake ( HandShakeData handShakeData,
AnyObject channelData 
)
virtual

HandShake.

Implements XrdCl::TransportHandler.

Definition at line 467 of file XrdClXRootDTransport.cc.

469  {
470  XRootDChannelInfo *info = 0;
471  channelData.Get( info );
472  XrdSysMutexHelper scopedLock( info->mutex );
473 
474  if( info->stream.size() <= handShakeData->subStreamId )
475  {
476  Log *log = DefaultEnv::GetLog();
477  log->Error( XRootDTransportMsg,
478  "[%s] Internal error: not enough substreams",
479  handShakeData->streamName.c_str() );
480  return XRootDStatus( stFatal, errInternal );
481  }
482 
483  if( handShakeData->subStreamId == 0 )
484  {
485  info->streamName = handShakeData->streamName;
486  return HandShakeMain( handShakeData, channelData );
487  }
488  return HandShakeParallel( handShakeData, channelData );
489  }
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33

References XrdCl::errInternal, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::stFatal, XrdCl::XRootDChannelInfo::stream, XrdCl::HandShakeData::streamName, XrdCl::XRootDChannelInfo::streamName, XrdCl::HandShakeData::subStreamId, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ HandShakeDone()

bool XrdCl::XRootDTransport::HandShakeDone ( HandShakeData handShakeData,
AnyObject channelData 
)
virtual

Implements XrdCl::TransportHandler.

Definition at line 727 of file XrdClXRootDTransport.cc.

729  {
730  XRootDChannelInfo *info = 0;
731  channelData.Get( info );
732  XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
733  return ( sInfo.status == XRootDStreamInfo::Connected );
734  }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::AnyObject::Get(), XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::HandShakeData::subStreamId.

+ Here is the call graph for this function:

◆ InitializeChannel()

void XrdCl::XRootDTransport::InitializeChannel ( const URL url,
AnyObject channelData 
)
virtual

Initialize channel.

Implements XrdCl::TransportHandler.

Definition at line 439 of file XrdClXRootDTransport.cc.

441  {
442  XRootDChannelInfo *info = new XRootDChannelInfo( url );
443  XrdSysMutexHelper scopedLock( info->mutex );
444  channelData.Set( info );
445 
446  Env *env = DefaultEnv::GetEnv();
447  int streams = DefaultSubStreamsPerChannel;
448  env->GetInt( "SubStreamsPerChannel", streams );
449  if( streams < 1 ) streams = 1;
450  info->stream.resize( streams );
451  info->strmSelector.reset( new StreamSelector( streams ) );
452  info->encrypted = url.IsSecure();
453  info->istpc = url.IsTPC();
454  info->logintoken = url.GetLoginToken();
455  }
static Env * GetEnv()
Get default client environment.
const int DefaultSubStreamsPerChannel

References XrdCl::DefaultSubStreamsPerChannel, XrdCl::XRootDChannelInfo::encrypted, XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::URL::GetLoginToken(), XrdCl::URL::IsSecure(), XrdCl::URL::IsTPC(), XrdCl::XRootDChannelInfo::istpc, XrdCl::XRootDChannelInfo::logintoken, XrdCl::XRootDChannelInfo::mutex, XrdCl::AnyObject::Set(), XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::strmSelector.

+ Here is the call graph for this function:

◆ IsStreamBroken()

Status XrdCl::XRootDTransport::IsStreamBroken ( time_t  inactiveTime,
AnyObject channelData 
)
virtual

Check the stream is broken - ie. TCP connection got broken and went undetected by the TCP stack

Implements XrdCl::TransportHandler.

Definition at line 785 of file XrdClXRootDTransport.cc.

787  {
788  XRootDChannelInfo *info = 0;
789  channelData.Get( info );
790  Env *env = DefaultEnv::GetEnv();
791  Log *log = DefaultEnv::GetLog();
792 
793  int streamTimeout = DefaultStreamTimeout;
794  env->GetInt( "StreamTimeout", streamTimeout );
795 
796  XrdSysMutexHelper scopedLock( info->mutex );
797 
798  const time_t now = time(0);
799  const bool anySID =
800  info->sidManager->IsAnySIDOldAs( now - streamTimeout );
801 
802  log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
803  "stream timeout: %d, any SID: %d, wait barrier: %s",
804  info->streamName.c_str(), (long long) inactiveTime, streamTimeout,
805  anySID, Utils::TimeToString(info->waitBarrier).c_str() );
806 
807  if( inactiveTime < streamTimeout )
808  return Status();
809 
810  if( now < info->waitBarrier )
811  return Status();
812 
813  if( !anySID )
814  return Status();
815 
816  return Status( stError, errSocketTimeout );
817  }
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
Definition: XrdClUtils.cc:256
const uint16_t errSocketTimeout
Definition: XrdClStatus.hh:73
const int DefaultStreamTimeout

References XrdCl::DefaultStreamTimeout, XrdCl::Log::Dump(), XrdCl::errSocketTimeout, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::sidManager, XrdCl::stError, XrdCl::XRootDChannelInfo::streamName, XrdCl::Utils::TimeToString(), XrdCl::XRootDChannelInfo::waitBarrier, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ IsStreamTTLElapsed()

bool XrdCl::XRootDTransport::IsStreamTTLElapsed ( time_t  time,
AnyObject channelData 
)
virtual

Check if the stream should be disconnected.

Implements XrdCl::TransportHandler.

Definition at line 739 of file XrdClXRootDTransport.cc.

741  {
742  XRootDChannelInfo *info = 0;
743  channelData.Get( info );
744  Env *env = DefaultEnv::GetEnv();
745  Log *log = DefaultEnv::GetLog();
746 
747  //--------------------------------------------------------------------------
748  // Check the TTL settings for the current server
749  //--------------------------------------------------------------------------
750  int ttl;
751  if( info->serverFlags & kXR_isServer )
752  {
753  ttl = DefaultDataServerTTL;
754  env->GetInt( "DataServerTTL", ttl );
755  }
756  else
757  {
759  env->GetInt( "LoadBalancerTTL", ttl );
760  }
761 
762  //--------------------------------------------------------------------------
763  // See whether we can give a go-ahead for the disconnection
764  //--------------------------------------------------------------------------
765  XrdSysMutexHelper scopedLock( info->mutex );
766  uint16_t allocatedSIDs = info->sidManager->GetNumberOfAllocatedSIDs();
767  log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
768  "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
769  info->streamName.c_str(), (long long) inactiveTime, ttl, allocatedSIDs,
770  info->openFiles, info->finstcnt.load( std::memory_order_relaxed ) );
771 
772  if( info->openFiles != 0 && info->finstcnt.load( std::memory_order_relaxed ) != 0 )
773  return false;
774 
775  if( !allocatedSIDs && inactiveTime > ttl )
776  return true;
777 
778  return false;
779  }
#define kXR_isServer
Definition: XProtocol.hh:1157
const int DefaultLoadBalancerTTL
const int DefaultDataServerTTL

References XrdCl::DefaultDataServerTTL, XrdCl::DefaultLoadBalancerTTL, XrdCl::Log::Dump(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), kXR_isServer, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDChannelInfo::streamName, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ LogErrorResponse()

void XrdCl::XRootDTransport::LogErrorResponse ( const Message msg)
static

Log server error response.

Definition at line 1454 of file XrdClXRootDTransport.cc.

1455  {
1456  Log *log = DefaultEnv::GetLog();
1457  ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
1458  char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
1459  memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
1460  log->Error( XRootDTransportMsg, "Server responded with an error [%d]: %s",
1461  rsp->body.error.errnum, errmsg );
1462  delete [] errmsg;
1463  }
union ServerResponse::@0 body
ServerResponseHeader hdr
Definition: XProtocol.hh:1287

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ MarshallRequest() [1/2]

XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( char *  msg)
static

Marshal the outgoing message.

Definition at line 1050 of file XrdClXRootDTransport.cc.

1051  {
1052  ClientRequest *req = (ClientRequest*)msg;
1053  switch( req->header.requestid )
1054  {
1055  //------------------------------------------------------------------------
1056  // kXR_protocol
1057  //------------------------------------------------------------------------
1058  case kXR_protocol:
1059  req->protocol.clientpv = htonl( req->protocol.clientpv );
1060  break;
1061 
1062  //------------------------------------------------------------------------
1063  // kXR_login
1064  //------------------------------------------------------------------------
1065  case kXR_login:
1066  req->login.pid = htonl( req->login.pid );
1067  break;
1068 
1069  //------------------------------------------------------------------------
1070  // kXR_locate
1071  //------------------------------------------------------------------------
1072  case kXR_locate:
1073  req->locate.options = htons( req->locate.options );
1074  break;
1075 
1076  //------------------------------------------------------------------------
1077  // kXR_query
1078  //------------------------------------------------------------------------
1079  case kXR_query:
1080  req->query.infotype = htons( req->query.infotype );
1081  break;
1082 
1083  //------------------------------------------------------------------------
1084  // kXR_truncate
1085  //------------------------------------------------------------------------
1086  case kXR_truncate:
1087  req->truncate.offset = htonll( req->truncate.offset );
1088  break;
1089 
1090  //------------------------------------------------------------------------
1091  // kXR_mkdir
1092  //------------------------------------------------------------------------
1093  case kXR_mkdir:
1094  req->mkdir.mode = htons( req->mkdir.mode );
1095  break;
1096 
1097  //------------------------------------------------------------------------
1098  // kXR_chmod
1099  //------------------------------------------------------------------------
1100  case kXR_chmod:
1101  req->chmod.mode = htons( req->chmod.mode );
1102  break;
1103 
1104  //------------------------------------------------------------------------
1105  // kXR_open
1106  //------------------------------------------------------------------------
1107  case kXR_open:
1108  req->open.mode = htons( req->open.mode );
1109  req->open.options = htons( req->open.options );
1110  break;
1111 
1112  //------------------------------------------------------------------------
1113  // kXR_read
1114  //------------------------------------------------------------------------
1115  case kXR_read:
1116  req->read.offset = htonll( req->read.offset );
1117  req->read.rlen = htonl( req->read.rlen );
1118  break;
1119 
1120  //------------------------------------------------------------------------
1121  // kXR_write
1122  //------------------------------------------------------------------------
1123  case kXR_write:
1124  req->write.offset = htonll( req->write.offset );
1125  break;
1126 
1127  //------------------------------------------------------------------------
1128  // kXR_mv
1129  //------------------------------------------------------------------------
1130  case kXR_mv:
1131  req->mv.arg1len = htons( req->mv.arg1len );
1132  break;
1133 
1134  //------------------------------------------------------------------------
1135  // kXR_readv
1136  //------------------------------------------------------------------------
1137  case kXR_readv:
1138  {
1139  uint16_t numChunks = (req->readv.dlen)/16;
1140  readahead_list *dataChunk = (readahead_list*)( msg + 24 );
1141  for( size_t i = 0; i < numChunks; ++i )
1142  {
1143  dataChunk[i].rlen = htonl( dataChunk[i].rlen );
1144  dataChunk[i].offset = htonll( dataChunk[i].offset );
1145  }
1146  break;
1147  }
1148 
1149  //------------------------------------------------------------------------
1150  // kXR_writev
1151  //------------------------------------------------------------------------
1152  case kXR_writev:
1153  {
1154  uint16_t numChunks = (req->writev.dlen)/16;
1155  XrdProto::write_list *wrtList =
1156  reinterpret_cast<XrdProto::write_list*>( msg + 24 );
1157  for( size_t i = 0; i < numChunks; ++i )
1158  {
1159  wrtList[i].wlen = htonl( wrtList[i].wlen );
1160  wrtList[i].offset = htonll( wrtList[i].offset );
1161  }
1162 
1163  break;
1164  }
1165 
1166  case kXR_pgread:
1167  {
1168  req->pgread.offset = htonll( req->pgread.offset );
1169  req->pgread.rlen = htonl( req->pgread.rlen );
1170  break;
1171  }
1172 
1173  case kXR_pgwrite:
1174  {
1175  req->pgwrite.offset = htonll( req->pgwrite.offset );
1176  break;
1177  }
1178 
1179  //------------------------------------------------------------------------
1180  // kXR_prepare
1181  //------------------------------------------------------------------------
1182  case kXR_prepare:
1183  {
1184  req->prepare.optionX = htons( req->prepare.optionX );
1185  req->prepare.port = htons( req->prepare.port );
1186  break;
1187  }
1188 
1189  case kXR_chkpoint:
1190  {
1191  if( req->chkpoint.opcode == kXR_ckpXeq )
1192  MarshallRequest( msg + 24 );
1193  break;
1194  }
1195  };
1196 
1197  req->header.requestid = htons( req->header.requestid );
1198  req->header.dlen = htonl( req->header.dlen );
1199  return XRootDStatus();
1200  }
struct ClientTruncateRequest truncate
Definition: XProtocol.hh:875
struct ClientPgReadRequest pgread
Definition: XProtocol.hh:861
struct ClientMkdirRequest mkdir
Definition: XProtocol.hh:858
struct ClientPgWriteRequest pgwrite
Definition: XProtocol.hh:862
struct ClientReadVRequest readv
Definition: XProtocol.hh:868
struct ClientOpenRequest open
Definition: XProtocol.hh:860
struct ClientRequestHdr header
Definition: XProtocol.hh:846
struct ClientWriteVRequest writev
Definition: XProtocol.hh:877
struct ClientLoginRequest login
Definition: XProtocol.hh:857
@ kXR_login
Definition: XProtocol.hh:119
struct ClientChmodRequest chmod
Definition: XProtocol.hh:850
struct ClientQueryRequest query
Definition: XProtocol.hh:866
struct ClientReadRequest read
Definition: XProtocol.hh:867
struct ClientMvRequest mv
Definition: XProtocol.hh:859
struct ClientChkPointRequest chkpoint
Definition: XProtocol.hh:849
struct ClientPrepareRequest prepare
Definition: XProtocol.hh:864
struct ClientWriteRequest write
Definition: XProtocol.hh:876
struct ClientProtocolRequest protocol
Definition: XProtocol.hh:865
struct ClientLocateRequest locate
Definition: XProtocol.hh:856
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.

References ClientMvRequest::arg1len, ClientRequest::chkpoint, ClientRequest::chmod, ClientProtocolRequest::clientpv, ClientRequestHdr::dlen, ClientReadVRequest::dlen, ClientWriteVRequest::dlen, ClientRequest::header, ClientQueryRequest::infotype, kXR_chkpoint, kXR_chmod, kXR_ckpXeq, kXR_locate, kXR_login, kXR_mkdir, kXR_mv, kXR_open, kXR_pgread, kXR_pgwrite, kXR_prepare, kXR_protocol, kXR_query, kXR_read, kXR_readv, kXR_truncate, kXR_write, kXR_writev, ClientRequest::locate, ClientRequest::login, MarshallRequest(), ClientRequest::mkdir, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientRequest::mv, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, readahead_list::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, XrdProto::write_list::offset, ClientChkPointRequest::opcode, ClientRequest::open, ClientLocateRequest::options, ClientOpenRequest::options, ClientPrepareRequest::optionX, ClientRequest::pgread, ClientRequest::pgwrite, ClientLoginRequest::pid, ClientPrepareRequest::port, ClientRequest::prepare, ClientRequest::protocol, ClientRequest::query, ClientRequest::read, ClientRequest::readv, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, ClientRequest::truncate, XrdProto::write_list::wlen, ClientRequest::write, and ClientRequest::writev.

+ Here is the call graph for this function:

◆ MarshallRequest() [2/2]

static XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( Message msg)
inlinestatic

Marshal the outgoing message.

Definition at line 175 of file XrdClXRootDTransport.hh.

176  {
177  MarshallRequest( msg->GetBuffer() );
178  msg->SetIsMarshalled( true );
179  return XRootDStatus();
180  }

References XrdCl::Buffer::GetBuffer(), and XrdCl::Message::SetIsMarshalled().

Referenced by MarshallRequest(), MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), XrdCl::MessageUtils::SendMessage(), and UnMarshallRequest().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ MessageReceived()

uint32_t XrdCl::XRootDTransport::MessageReceived ( Message msg,
uint16_t  subStream,
AnyObject channelData 
)
virtual

Check if the message invokes a stream action.

Implements XrdCl::TransportHandler.

Definition at line 1561 of file XrdClXRootDTransport.cc.

1564  {
1565  XRootDChannelInfo *info = 0;
1566  channelData.Get( info );
1567  XrdSysMutexHelper scopedLock( info->mutex );
1568  Log *log = DefaultEnv::GetLog();
1569 
1570  //--------------------------------------------------------------------------
1571  // Update the substream queues
1572  //--------------------------------------------------------------------------
1573  info->strmSelector->MsgReceived( subStream );
1574 
1575  //--------------------------------------------------------------------------
1576  // Check whether this message is a response to a request that has
1577  // timed out, and if so, drop it
1578  //--------------------------------------------------------------------------
1579  ServerResponse *rsp = (ServerResponse*)msg.GetBuffer();
1580  if( rsp->hdr.status == kXR_attn )
1581  {
1582  return NoAction;
1583  }
1584 
1585  if( info->sidManager->IsTimedOut( rsp->hdr.streamid ) )
1586  {
1587  log->Error( XRootDTransportMsg, "Message %p, stream [%d, %d] is a "
1588  "response that we're no longer interested in (timed out)",
1589  &msg, rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
1590  //------------------------------------------------------------------------
1591  // If it is kXR_waitresp there will be another one,
1592  // so we don't release the sid yet
1593  //------------------------------------------------------------------------
1594  if( rsp->hdr.status != kXR_waitresp )
1595  info->sidManager->ReleaseTimedOut( rsp->hdr.streamid );
1596  //------------------------------------------------------------------------
1597  // If it is a successful response to an open request
1598  // that timed out, we need to send a close
1599  //------------------------------------------------------------------------
1600  uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1601  std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1602  if( sidIt != info->sentOpens.end() )
1603  {
1604  info->sentOpens.erase( sidIt );
1605  if( rsp->hdr.status == kXR_ok ) return RequestClose;
1606  }
1607  return DigestMsg;
1608  }
1609 
1610  //--------------------------------------------------------------------------
1611  // If we have a wait or waitresp
1612  //--------------------------------------------------------------------------
1613  uint32_t seconds = 0;
1614  if( rsp->hdr.status == kXR_wait )
1615  seconds = ntohl( rsp->body.wait.seconds ) + 5; // we need extra time
1616  // to re-send the request
1617  else if( rsp->hdr.status == kXR_waitresp )
1618  {
1619  seconds = ntohl( rsp->body.waitresp.seconds );
1620 
1621  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %u seconds, "
1622  "setting up wait barrier.",
1623  info->streamName.c_str(),
1624  seconds );
1625  }
1626 
1627  time_t barrier = time(0) + seconds;
1628  if( info->waitBarrier < barrier )
1629  info->waitBarrier = barrier;
1630 
1631  //--------------------------------------------------------------------------
1632  // If we got a response to an open request, we may need to bump the counter
1633  // of open files
1634  //--------------------------------------------------------------------------
1635  uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1636  std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1637  if( sidIt != info->sentOpens.end() )
1638  {
1639  if( rsp->hdr.status == kXR_waitresp )
1640  return NoAction;
1641  info->sentOpens.erase( sidIt );
1642  if( rsp->hdr.status == kXR_ok )
1643  {
1644  ++info->openFiles;
1645  info->finstcnt.fetch_add( 1, std::memory_order_relaxed ); // another file File object instance has been bound with this connection
1646  }
1647  return NoAction;
1648  }
1649 
1650  //--------------------------------------------------------------------------
1651  // If we got a response to a close, we may need to decrement the counter of
1652  // open files
1653  //--------------------------------------------------------------------------
1654  sidIt = info->sentCloses.find( sid );
1655  if( sidIt != info->sentCloses.end() )
1656  {
1657  if( rsp->hdr.status == kXR_waitresp )
1658  return NoAction;
1659  info->sentCloses.erase( sidIt );
1660  --info->openFiles;
1661  return NoAction;
1662  }
1663  return NoAction;
1664  }
kXR_char streamid[2]
Definition: XProtocol.hh:914
@ kXR_waitresp
Definition: XProtocol.hh:906
@ kXR_ok
Definition: XProtocol.hh:899
@ kXR_attn
Definition: XProtocol.hh:901
@ kXR_wait
Definition: XProtocol.hh:905
@ RequestClose
Send a close request.
const uint64_t XRootDMsg

References ServerResponse::body, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, kXR_attn, kXR_ok, kXR_wait, kXR_waitresp, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportHandler::NoAction, XrdCl::XRootDChannelInfo::openFiles, XrdCl::TransportHandler::RequestClose, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, ServerResponseHeader::status, ServerResponseHeader::streamid, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, XrdCl::XRootDChannelInfo::waitBarrier, XrdCl::XRootDMsg, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ MessageSent()

void XrdCl::XRootDTransport::MessageSent ( Message msg,
uint16_t  subStream,
uint32_t  bytesSent,
AnyObject channelData 
)
virtual

Notify the transport about a message having been sent.

Implements XrdCl::TransportHandler.

Definition at line 1669 of file XrdClXRootDTransport.cc.

1673  {
1674  XRootDChannelInfo *info = 0;
1675  channelData.Get( info );
1676  XrdSysMutexHelper scopedLock( info->mutex );
1677  ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1678  uint16_t reqid = ntohs( req->header.requestid );
1679 
1680 
1681  //--------------------------------------------------------------------------
1682  // We need to track opens to know if we can close streams due to idleness
1683  //--------------------------------------------------------------------------
1684  uint16_t sid;
1685  memcpy( &sid, req->header.streamid, 2 );
1686 
1687  if( reqid == kXR_open )
1688  info->sentOpens.insert( sid );
1689  else if( reqid == kXR_close )
1690  info->sentCloses.insert( sid );
1691  }
kXR_char streamid[2]
Definition: XProtocol.hh:156

References XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_close, kXR_open, XrdCl::XRootDChannelInfo::mutex, ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, and ClientRequestHdr::streamid.

+ Here is the call graph for this function:

◆ Multiplex()

PathID XrdCl::XRootDTransport::Multiplex ( Message msg,
AnyObject channelData,
PathID hint = 0 
)
virtual

Return the ID for the up stream this message should be sent by and the down stream which the answer should be expected at. Modify the message itself if necessary. If hint is non-zero then the message should be modified such that the answer will be returned via the hinted stream.

Implements XrdCl::TransportHandler.

Definition at line 822 of file XrdClXRootDTransport.cc.

823  {
824  return PathID( 0, 0 );
825  }

◆ MultiplexSubStream()

PathID XrdCl::XRootDTransport::MultiplexSubStream ( Message msg,
AnyObject channelData,
PathID hint = 0 
)
virtual

Return the ID for the up substream this message should be sent by and the down substream which the answer should be expected at. Modify the message itself if necessary. If hint is non-zero then the message should be modified such that the answer will be returned via the hinted stream.

Implements XrdCl::TransportHandler.

Definition at line 830 of file XrdClXRootDTransport.cc.

833  {
834  XRootDChannelInfo *info = 0;
835  channelData.Get( info );
836  XrdSysMutexHelper scopedLock( info->mutex );
837 
838  //--------------------------------------------------------------------------
839  // If we're not connected to a data server or we don't know that yet
840  // we stream through 0
841  //--------------------------------------------------------------------------
842  if( !(info->serverFlags & kXR_isServer) || info->stream.size() == 0 )
843  return PathID( 0, 0 );
844 
845  //--------------------------------------------------------------------------
846  // Select the streams
847  //--------------------------------------------------------------------------
848  Log *log = DefaultEnv::GetLog();
849  uint16_t upStream = 0;
850  uint16_t downStream = 0;
851 
852  if( hint )
853  {
854  upStream = hint->up;
855  downStream = hint->down;
856  }
857  else
858  {
859  upStream = 0;
860  std::vector<bool> connected;
861  connected.reserve( info->stream.size() - 1 );
862  size_t nbConnected = 0;
863  for( size_t i = 1; i < info->stream.size(); ++i )
864  if( info->stream[i].status == XRootDStreamInfo::Connected )
865  {
866  connected.push_back( true );
867  ++nbConnected;
868  }
869  else
870  connected.push_back( false );
871 
872  if( nbConnected == 0 )
873  downStream = 0;
874  else
875  downStream = info->strmSelector->Select( connected );
876  }
877 
878  if( upStream >= info->stream.size() )
879  {
880  log->Debug( XRootDTransportMsg,
881  "[%s] Up link stream %d does not exist, using 0",
882  info->streamName.c_str(), upStream );
883  upStream = 0;
884  }
885 
886  if( downStream >= info->stream.size() )
887  {
888  log->Debug( XRootDTransportMsg,
889  "[%s] Down link stream %d does not exist, using 0",
890  info->streamName.c_str(), downStream );
891  downStream = 0;
892  }
893 
894  //--------------------------------------------------------------------------
895  // Modify the message
896  //--------------------------------------------------------------------------
897  UnMarshallRequest( msg );
898  ClientRequestHdr *hdr = (ClientRequestHdr*)msg->GetBuffer();
899  switch( hdr->requestid )
900  {
901  //------------------------------------------------------------------------
902  // Read - we update the path id to tell the server where we want to
903  // get the response, but we still send the request through stream 0
904  // We need to allocate space for read_args if we don't have it
905  // included yet
906  //------------------------------------------------------------------------
907  case kXR_read:
908  {
909  if( msg->GetSize() < sizeof(ClientReadRequest) + 8 )
910  {
911  msg->ReAllocate( sizeof(ClientReadRequest) + 8 );
912  void *newBuf = msg->GetBuffer(sizeof(ClientReadRequest));
913  memset( newBuf, 0, 8 );
914  ClientReadRequest *req = (ClientReadRequest*)msg->GetBuffer();
915  req->dlen += 8;
916  }
917  read_args *args = (read_args*)msg->GetBuffer(sizeof(ClientReadRequest));
918  args->pathid = info->stream[downStream].pathId;
919  break;
920  }
921 
922 
923  //------------------------------------------------------------------------
924  // PgRead - we update the path id to tell the server where we want to
925  // get the response, but we still send the request through stream 0
926  // We need to allocate space for ClientPgReadReqArgs if we don't have it
927  // included yet
928  //------------------------------------------------------------------------
929  case kXR_pgread:
930  {
931  if( msg->GetSize() < sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) )
932  {
933  msg->ReAllocate( sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) );
934  void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
935  memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
936  ClientPgReadRequest *req = (ClientPgReadRequest*)msg->GetBuffer();
937  req->dlen += sizeof( ClientPgReadReqArgs );
938  }
939  ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
940  msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
941  args->pathid = info->stream[downStream].pathId;
942  break;
943  }
944 
945  //------------------------------------------------------------------------
946  // ReadV - the situation is identical to read but we don't need any
947  // additional structures to specify the return path
948  //------------------------------------------------------------------------
949  case kXR_readv:
950  {
951  ClientReadVRequest *req = (ClientReadVRequest*)msg->GetBuffer();
952  req->pathid = info->stream[downStream].pathId;
953  break;
954  }
955 
956  //------------------------------------------------------------------------
957  // Write - multiplexing writes doesn't work properly in the server
958  //------------------------------------------------------------------------
959  case kXR_write:
960  {
961 // ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
962 // req->pathid = info->stream[downStream].pathId;
963  break;
964  }
965 
966  //------------------------------------------------------------------------
967  // WriteV - multiplexing writes doesn't work properly in the server
968  //------------------------------------------------------------------------
969  case kXR_writev:
970  {
971 // ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
972 // req->pathid = info->stream[downStream].pathId;
973  break;
974  }
975 
976  //------------------------------------------------------------------------
977  // PgWrite - multiplexing writes doesn't work properly in the server
978  //------------------------------------------------------------------------
979  case kXR_pgwrite:
980  {
981 // ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
982 // req->pathid = info->stream[downStream].pathId;
983  break;
984  }
985  };
986  MarshallRequest( msg );
987  return PathID( upStream, downStream );
988  }
kXR_char pathid
Definition: XProtocol.hh:653
kXR_int32 dlen
Definition: XProtocol.hh:648
static XRootDStatus UnMarshallRequest(Message *msg)

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Debug(), ClientPgReadRequest::dlen, ClientReadRequest::dlen, XrdCl::PathID::down, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), kXR_isServer, kXR_pgread, kXR_pgwrite, kXR_read, kXR_readv, kXR_write, kXR_writev, MarshallRequest(), XrdCl::XRootDChannelInfo::mutex, ClientPgReadReqArgs::pathid, read_args::pathid, ClientReadVRequest::pathid, XrdCl::Buffer::ReAllocate(), ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, UnMarshallRequest(), XrdCl::PathID::up, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ NbConnectedStrm()

uint16_t XrdCl::XRootDTransport::NbConnectedStrm ( AnyObject channelData)
static

Number of currently connected data streams.

Definition at line 1468 of file XrdClXRootDTransport.cc.

1469  {
1470  XRootDChannelInfo *info = 0;
1471  channelData.Get( info );
1472  XrdSysMutexHelper scopedLock( info->mutex );
1473 
1474  uint16_t nbConnected = 0;
1475  for( size_t i = 1; i < info->stream.size(); ++i )
1476  if( info->stream[i].status == XRootDStreamInfo::Connected )
1477  ++nbConnected;
1478 
1479  return nbConnected;
1480  }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::AnyObject::Get(), XrdCl::XRootDChannelInfo::mutex, and XrdCl::XRootDChannelInfo::stream.

Referenced by XrdCl::Channel::NbConnectedStrm().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ NeedControlConnection()

virtual bool XrdCl::XRootDTransport::NeedControlConnection ( )
inlinevirtual

Return the information whether a control connection needs to be valid before establishing other connections

Definition at line 167 of file XrdClXRootDTransport.hh.

168  {
169  return true;
170  }

◆ NeedEncryption()

bool XrdCl::XRootDTransport::NeedEncryption ( HandShakeData handShakeData,
AnyObject channelData 
)
virtual
Returns
: true if encryption should be turned on, false otherwise

Implements XrdCl::TransportHandler.

Definition at line 1757 of file XrdClXRootDTransport.cc.

1759  {
1760  XRootDChannelInfo *info = 0;
1761  channelData.Get( info );
1762 
1764  int notlsok = DefaultNoTlsOK;
1765  env->GetInt( "NoTlsOK", notlsok );
1766 
1767  if( notlsok )
1768  return info->encrypted;
1769 
1770  // Did the server instructed us to switch to TLS right away?
1771  if( info->serverFlags & kXR_gotoTLS )
1772  {
1773  info->encrypted = true;
1774  return true ;
1775  }
1776 
1777  XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
1778 
1779  //--------------------------------------------------------------------------
1780  // The control stream (sub-stream 0) might need to switch to TLS before
1781  // login or after login
1782  //--------------------------------------------------------------------------
1783  if( handShakeData->subStreamId == 0 )
1784  {
1785  //------------------------------------------------------------------------
1786  // We are about to login and the server asked to start encrypting
1787  // before login
1788  //------------------------------------------------------------------------
1789  if( ( sInfo.status == XRootDStreamInfo::LoginSent ) &&
1790  ( info->serverFlags & kXR_tlsLogin ) )
1791  {
1792  info->encrypted = true;
1793  return true;
1794  }
1795 
1796  //--------------------------------------------------------------------
1797  // The hand-shake is done and the server requested to encrypt the session
1798  //--------------------------------------------------------------------
1799  if( (sInfo.status == XRootDStreamInfo::Connected ||
1800  //--------------------------------------------------------------------
1801  // we really need to turn on TLS before we sent kXR_endsess and we
1802  // are about to do so (1st enable encryption, then send kXR_endsess)
1803  //--------------------------------------------------------------------
1804  sInfo.status == XRootDStreamInfo::EndSessionSent ) &&
1805  ( info->serverFlags & kXR_tlsSess ) )
1806  {
1807  info->encrypted = true;
1808  return true;
1809  }
1810  }
1811  //--------------------------------------------------------------------------
1812  // A data stream (sub-stream > 0) if need be will be switched to TLS before
1813  // bind.
1814  //--------------------------------------------------------------------------
1815  else
1816  {
1817  //------------------------------------------------------------------------
1818  // We are about to bind a data stream and the server asked to start
1819  // encrypting before bind
1820  //------------------------------------------------------------------------
1821  if( ( sInfo.status == XRootDStreamInfo::BindSent ) &&
1822  ( info->serverFlags & kXR_tlsData ) )
1823  {
1824  info->encrypted = true;
1825  return true;
1826  }
1827  }
1828 
1829  return false;
1830  }
#define kXR_tlsLogin
Definition: XProtocol.hh:1184
#define kXR_gotoTLS
Definition: XProtocol.hh:1180
#define kXR_tlsSess
Definition: XProtocol.hh:1185
#define kXR_tlsData
Definition: XProtocol.hh:1182
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
const int DefaultNoTlsOK

References XrdCl::XRootDStreamInfo::BindSent, XrdCl::XRootDStreamInfo::Connected, XrdCl::DefaultNoTlsOK, XrdCl::XRootDChannelInfo::encrypted, XrdCl::XRootDStreamInfo::EndSessionSent, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), kXR_gotoTLS, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDStreamInfo::LoginSent, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::HandShakeData::subStreamId.

+ Here is the call graph for this function:

◆ Query()

Status XrdCl::XRootDTransport::Query ( uint16_t  query,
AnyObject result,
AnyObject channelData 
)
virtual

Query the channel.

Implements XrdCl::TransportHandler.

Definition at line 1513 of file XrdClXRootDTransport.cc.

1516  {
1517  XRootDChannelInfo *info = 0;
1518  channelData.Get( info );
1519  XrdSysMutexHelper scopedLock( info->mutex );
1520 
1521  switch( query )
1522  {
1523  //------------------------------------------------------------------------
1524  // Protocol name
1525  //------------------------------------------------------------------------
1526  case TransportQuery::Name:
1527  result.Set( (const char*)"XRootD", false );
1528  return Status();
1529 
1530  //------------------------------------------------------------------------
1531  // Authentication
1532  //------------------------------------------------------------------------
1533  case TransportQuery::Auth:
1534  result.Set( new std::string( info->authProtocolName ), false );
1535  return Status();
1536 
1537  //------------------------------------------------------------------------
1538  // Server flags
1539  //------------------------------------------------------------------------
1541  result.Set( new int( info->serverFlags ), false );
1542  return Status();
1543 
1544  //------------------------------------------------------------------------
1545  // Protocol version
1546  //------------------------------------------------------------------------
1548  result.Set( new int( info->protocolVersion ), false );
1549  return Status();
1550 
1552  result.Set( new bool( info->encrypted ), false );
1553  return Status();
1554  };
1555  return Status( stError, errQueryNotSupported );
1556  }
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
static const uint16_t Name
Transport name, returns const char *.
static const uint16_t Auth
Transport name, returns std::string *.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version
static const uint16_t IsEncrypted
returns true if the channel is encrypted

References XrdCl::TransportQuery::Auth, XrdCl::XRootDChannelInfo::authProtocolName, XrdCl::XRootDChannelInfo::encrypted, XrdCl::errQueryNotSupported, XrdCl::AnyObject::Get(), XrdCl::XRootDQuery::IsEncrypted, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportQuery::Name, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::XRootDChannelInfo::protocolVersion, XrdCl::XRootDQuery::ServerFlags, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::AnyObject::Set(), and XrdCl::stError.

+ Here is the call graph for this function:

◆ SetDescription()

static void XrdCl::XRootDTransport::SetDescription ( Message msg)
inlinestatic

Get the description of a message.

Definition at line 245 of file XrdClXRootDTransport.hh.

246  {
247  std::ostringstream o;
248  GenerateDescription( msg->GetBuffer(), o );
249  msg->SetDescription( o.str() );
250  }

References GenerateDescription(), XrdCl::Buffer::GetBuffer(), and XrdCl::Message::SetDescription().

Referenced by XrdCl::FileStateHandler::Checkpoint(), XrdCl::FileStateHandler::ChkptWrt(), XrdCl::FileStateHandler::ChkptWrtV(), XrdCl::FileSystem::ChMod(), XrdCl::FileStateHandler::Close(), XrdCl::FileSystem::DirList(), XrdCl::FileStateHandler::Fcntl(), XrdCl::FileSystem::Locate(), XrdCl::FileSystem::MkDir(), XrdCl::FileSystem::Mv(), XrdCl::FileStateHandler::Open(), XrdCl::FileStateHandler::PgReadImpl(), XrdCl::FileStateHandler::PgWriteImpl(), XrdCl::FileSystem::Ping(), XrdCl::FileSystem::Prepare(), XrdCl::FileSystem::Protocol(), XrdCl::FileSystem::Query(), XrdCl::FileStateHandler::Read(), XrdCl::FileStateHandler::ReadV(), XrdCl::MessageUtils::RewriteCGIAndPath(), XrdCl::FileSystem::Rm(), XrdCl::FileSystem::RmDir(), XrdCl::FileSystem::Stat(), XrdCl::FileStateHandler::Stat(), XrdCl::FileSystem::StatVFS(), XrdCl::FileStateHandler::Sync(), XrdCl::FileSystem::Truncate(), XrdCl::FileStateHandler::Truncate(), XrdCl::FileStateHandler::VectorRead(), XrdCl::FileStateHandler::VectorWrite(), XrdCl::FileStateHandler::Visa(), XrdCl::FileStateHandler::Write(), and XrdCl::FileStateHandler::WriteV().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SubStreamNumber()

uint16_t XrdCl::XRootDTransport::SubStreamNumber ( AnyObject channelData)
virtual

Return a number of substreams per stream that should be created.

Implements XrdCl::TransportHandler.

Definition at line 995 of file XrdClXRootDTransport.cc.

996  {
997  XRootDChannelInfo *info = 0;
998  channelData.Get( info );
999  XrdSysMutexHelper scopedLock( info->mutex );
1000 
1001  //--------------------------------------------------------------------------
1002  // If the connection has been opened in order to orchestrate a TPC or
1003  // the remote server is a Manager or Metamanager we will need only one
1004  // (control) stream.
1005  //--------------------------------------------------------------------------
1006  if( info->istpc || !(info->serverFlags & kXR_isServer ) ) return 1;
1007 
1008  //--------------------------------------------------------------------------
1009  // Number of streams requested by user
1010  //--------------------------------------------------------------------------
1011  uint16_t ret = info->stream.size();
1012 
1014  int nodata = DefaultTlsNoData;
1015  env->GetInt( "TlsNoData", nodata );
1016 
1017  // Does the server require the stream 0 to be encrypted?
1018  bool srvTlsStrm0 = ( info->serverFlags & kXR_gotoTLS ) ||
1019  ( info->serverFlags & kXR_tlsLogin ) ||
1020  ( info->serverFlags & kXR_tlsSess );
1021  // Does the server NOT require the data streams to be encrypted?
1022  bool srvNoTlsData = !( info->serverFlags & kXR_tlsData );
1023  // Does the user require the stream 0 to be encrypted?
1024  bool usrTlsStrm0 = info->encrypted;
1025  // Does the user NOT require the data streams to be encrypted?
1026  bool usrNoTlsData = !info->encrypted || ( info->encrypted && nodata );
1027 
1028  if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1029  ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1030  {
1031  //------------------------------------------------------------------------
1032  // The server or user asked us to encrypt stream 0, but to send the data
1033  // (read/write) using a plain TCP connection
1034  //------------------------------------------------------------------------
1035  if( ret == 1 ) ++ret;
1036  }
1037 
1038  if( ret > info->stream.size() )
1039  {
1040  info->stream.resize( ret );
1041  info->strmSelector->AdjustQueues( ret );
1042  }
1043 
1044  return ret;
1045  }
const int DefaultTlsNoData

References XrdCl::DefaultTlsNoData, XrdCl::XRootDChannelInfo::encrypted, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::XRootDChannelInfo::istpc, kXR_gotoTLS, kXR_isServer, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::strmSelector.

+ Here is the call graph for this function:

◆ UnMarchalStatusMore()

XRootDStatus XrdCl::XRootDTransport::UnMarchalStatusMore ( Message msg)
static

Unmarshall the correction-segment of the status response for pgwrite.

Definition at line 1381 of file XrdClXRootDTransport.cc.

1382  {
1383  ServerResponseV2 *rsp = (ServerResponseV2*)msg.GetBuffer();
1384  uint16_t reqType = rsp->status.bdy.requestid + kXR_1stRequest;
1385 
1386  switch( reqType )
1387  {
1388  case kXR_pgwrite:
1389  {
1390  //--------------------------------------------------------------------------
1391  // If there's no additional data there's nothing to unmarshal
1392  //--------------------------------------------------------------------------
1393  if( rsp->status.bdy.dlen == 0 ) return XRootDStatus();
1394  //--------------------------------------------------------------------------
1395  // If there's not enough data to form correction-segment report an error
1396  //--------------------------------------------------------------------------
1397  if( size_t( rsp->status.bdy.dlen ) < sizeof( ServerResponseBody_pgWrCSE ) )
1398  return XRootDStatus( stError, errInvalidMessage, 0,
1399  "kXR_status: invalid message size." );
1400 
1401  //--------------------------------------------------------------------------
1402  // Calculate the crc32c for the additional data
1403  //--------------------------------------------------------------------------
1404  ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)msg.GetBuffer( sizeof( ServerResponseV2 ) );
1405  cse->cseCRC = ntohl( cse->cseCRC );
1406  size_t length = rsp->status.bdy.dlen - sizeof( uint32_t );
1407  void* buffer = msg.GetBuffer( sizeof( ServerResponseV2 ) + sizeof( uint32_t ) );
1408  uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1409 
1410  //--------------------------------------------------------------------------
1411  // Do the integrity checks
1412  //--------------------------------------------------------------------------
1413  if( crcval != cse->cseCRC )
1414  {
1415  return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1416  "corrupted (crc32c integrity check failed)." );
1417  }
1418 
1419  cse->dlFirst = ntohs( cse->dlFirst );
1420  cse->dlLast = ntohs( cse->dlLast );
1421 
1422  size_t pgcnt = ( rsp->status.bdy.dlen - sizeof( ServerResponseBody_pgWrCSE ) ) /
1423  sizeof( kXR_int64 );
1424  kXR_int64 *pgoffs = (kXR_int64*)msg.GetBuffer( sizeof( ServerResponseV2 ) +
1425  sizeof( ServerResponseBody_pgWrCSE ) );
1426 
1427  for( size_t i = 0; i < pgcnt; ++i )
1428  pgoffs[i] = ntohll( pgoffs[i] );
1429 
1430  return XRootDStatus();
1431  break;
1432  }
1433 
1434  default:
1435  break;
1436  }
1437 
1438  return XRootDStatus( stError, errNotSupported );
1439  }
ServerResponseStatus status
Definition: XProtocol.hh:1309
@ kXR_1stRequest
Definition: XProtocol.hh:111
long long kXR_int64
Definition: XPtypes.hh:98
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_pgWrCSE::cseCRC, ServerResponseBody_Status::dlen, ServerResponseBody_pgWrCSE::dlFirst, ServerResponseBody_pgWrCSE::dlLast, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errNotSupported, XrdCl::Buffer::GetBuffer(), kXR_1stRequest, kXR_pgwrite, ServerResponseBody_Status::requestid, ServerResponseV2::status, and XrdCl::stError.

Referenced by GetMore().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ UnMarshallBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshallBody ( Message msg,
uint16_t  reqType 
)
static

Unmarshall the body of the incoming message.

Definition at line 1227 of file XrdClXRootDTransport.cc.

1228  {
1229  ServerResponse *m = (ServerResponse *)msg->GetBuffer();
1230 
1231  //--------------------------------------------------------------------------
1232  // kXR_ok
1233  //--------------------------------------------------------------------------
1234  if( m->hdr.status == kXR_ok )
1235  {
1236  switch( reqType )
1237  {
1238  //----------------------------------------------------------------------
1239  // kXR_protocol
1240  //----------------------------------------------------------------------
1241  case kXR_protocol:
1242  if( m->hdr.dlen < 8 )
1243  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_protocol: body too short." );
1244  m->body.protocol.pval = ntohl( m->body.protocol.pval );
1245  m->body.protocol.flags = ntohl( m->body.protocol.flags );
1246  break;
1247  }
1248  }
1249  //--------------------------------------------------------------------------
1250  // kXR_error
1251  //--------------------------------------------------------------------------
1252  else if( m->hdr.status == kXR_error )
1253  {
1254  if( m->hdr.dlen < 4 )
1255  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_error: body too short." );
1256  m->body.error.errnum = ntohl( m->body.error.errnum );
1257  }
1258 
1259  //--------------------------------------------------------------------------
1260  // kXR_wait
1261  //--------------------------------------------------------------------------
1262  else if( m->hdr.status == kXR_wait )
1263  {
1264  if( m->hdr.dlen < 4 )
1265  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_wait: body too short." );
1266  m->body.wait.seconds = htonl( m->body.wait.seconds );
1267  }
1268 
1269  //--------------------------------------------------------------------------
1270  // kXR_redirect
1271  //--------------------------------------------------------------------------
1272  else if( m->hdr.status == kXR_redirect )
1273  {
1274  if( m->hdr.dlen < 4 )
1275  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_redirect: body too short." );
1276  m->body.redirect.port = htonl( m->body.redirect.port );
1277  }
1278 
1279  //--------------------------------------------------------------------------
1280  // kXR_waitresp
1281  //--------------------------------------------------------------------------
1282  else if( m->hdr.status == kXR_waitresp )
1283  {
1284  if( m->hdr.dlen < 4 )
1285  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_waitresp: body too short." );
1286  m->body.waitresp.seconds = htonl( m->body.waitresp.seconds );
1287  }
1288 
1289  //--------------------------------------------------------------------------
1290  // kXR_attn
1291  //--------------------------------------------------------------------------
1292  else if( m->hdr.status == kXR_attn )
1293  {
1294  if( m->hdr.dlen < 4 )
1295  return XRootDStatus( stError, errInvalidMessage, 0, "kXR_attn: body too short." );
1296  m->body.attn.actnum = htonl( m->body.attn.actnum );
1297  }
1298 
1299  return XRootDStatus();
1300  }
@ kXR_redirect
Definition: XProtocol.hh:904
@ kXR_error
Definition: XProtocol.hh:903

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), ServerResponse::hdr, kXR_attn, kXR_error, kXR_ok, kXR_protocol, kXR_redirect, kXR_wait, kXR_waitresp, ServerResponseHeader::status, and XrdCl::stError.

Referenced by XrdCl::XRootDMsgHandler::Process().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ UnMarshallHeader()

void XrdCl::XRootDTransport::UnMarshallHeader ( Message msg)
static

Unmarshall the header incoming message.

Definition at line 1444 of file XrdClXRootDTransport.cc.

1445  {
1446  ServerResponseHeader *header = (ServerResponseHeader *)msg.GetBuffer();
1447  header->status = ntohs( header->status );
1448  header->dlen = ntohl( header->dlen );
1449  }

References ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), and ServerResponseHeader::status.

Referenced by GetHeader().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ UnMarshallRequest()

XRootDStatus XrdCl::XRootDTransport::UnMarshallRequest ( Message msg)
static

Unmarshall the request - sometimes the requests need to be rewritten, so we need to unmarshall them

Definition at line 1206 of file XrdClXRootDTransport.cc.

1207  {
1208  if( !msg->IsMarshalled() ) return XRootDStatus( stOK, suAlreadyDone );
1209  // We rely on the marshaling process to be symmetric!
1210  // First we unmarshall the request ID and the length because
1211  // MarshallRequest() relies on these, and then we need to unmarshall these
1212  // two again, because they get marshalled in MarshallRequest().
1213  // All this is pretty damn ugly and should be rewritten.
1214  ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1215  req->header.requestid = htons( req->header.requestid );
1216  req->header.dlen = htonl( req->header.dlen );
1217  XRootDStatus st = MarshallRequest( msg );
1218  req->header.requestid = htons( req->header.requestid );
1219  req->header.dlen = htonl( req->header.dlen );
1220  msg->SetIsMarshalled( false );
1221  return st;
1222  }
const uint16_t suAlreadyDone
Definition: XrdClStatus.hh:42

References ClientRequestHdr::dlen, XrdCl::Buffer::GetBuffer(), ClientRequest::header, XrdCl::Message::IsMarshalled(), MarshallRequest(), ClientRequestHdr::requestid, XrdCl::Message::SetIsMarshalled(), XrdCl::stOK, and XrdCl::suAlreadyDone.

Referenced by MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ UnMarshalStatusBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshalStatusBody ( Message msg,
uint16_t  reqType 
)
static

Unmarshall the body of the status response.

Definition at line 1305 of file XrdClXRootDTransport.cc.

1306  {
1307  //--------------------------------------------------------------------------
1308  // Calculate the crc32c before the unmarshaling the body!
1309  //--------------------------------------------------------------------------
1310  ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
1311  char *buffer = msg.GetBuffer( 8 + sizeof( rspst->bdy.crc32c ) );
1312  size_t length = rspst->hdr.dlen - sizeof( rspst->bdy.crc32c );
1313  uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1314 
1315  size_t stlen = sizeof( ServerResponseStatus );
1316  switch( reqType )
1317  {
1318  case kXR_pgread:
1319  {
1320  stlen += sizeof( ServerResponseBody_pgRead );
1321  break;
1322  }
1323 
1324  case kXR_pgwrite:
1325  {
1326  stlen += sizeof( ServerResponseBody_pgWrite );
1327  break;
1328  }
1329  }
1330 
1331  if( msg.GetSize() < stlen ) return XRootDStatus( stError, errInvalidMessage, 0,
1332  "kXR_status: invalid message size." );
1333 
1334  rspst->bdy.crc32c = ntohl( rspst->bdy.crc32c );
1335  rspst->bdy.dlen = ntohl( rspst->bdy.dlen );
1336 
1337  switch( reqType )
1338  {
1339  case kXR_pgread:
1340  {
1341  ServerResponseBody_pgRead *pgrdbdy = (ServerResponseBody_pgRead*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1342  pgrdbdy->offset = ntohll( pgrdbdy->offset );
1343  break;
1344  }
1345 
1346  case kXR_pgwrite:
1347  {
1348  ServerResponseBody_pgWrite *pgwrtbdy = (ServerResponseBody_pgWrite*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1349  pgwrtbdy->offset = ntohll( pgwrtbdy->offset );
1350  break;
1351  }
1352  }
1353 
1354  //--------------------------------------------------------------------------
1355  // Do the integrity checks
1356  //--------------------------------------------------------------------------
1357  if( crcval != rspst->bdy.crc32c )
1358  {
1359  return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1360  "corrupted (crc32c integrity check failed)." );
1361  }
1362 
1363  if( rspst->hdr.streamid[0] != rspst->bdy.streamID[0] ||
1364  rspst->hdr.streamid[1] != rspst->bdy.streamID[1] )
1365  {
1366  return XRootDStatus( stError, errDataError, 0, "response header corrupted "
1367  "(stream ID mismatch)." );
1368  }
1369 
1370 
1371 
1372  if( rspst->bdy.requestid + kXR_1stRequest != reqType )
1373  {
1374  return XRootDStatus( stError, errDataError, 0, "kXR_status response header corrupted "
1375  "(request ID mismatch)." );
1376  }
1377 
1378  return XRootDStatus();
1379  }
struct ServerResponseHeader hdr
Definition: XProtocol.hh:1260

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_Status::crc32c, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetSize(), ServerResponseStatus::hdr, kXR_1stRequest, kXR_pgread, kXR_pgwrite, ServerResponseBody_pgRead::offset, ServerResponseBody_pgWrite::offset, ServerResponseBody_Status::requestid, XrdCl::stError, ServerResponseHeader::streamid, and ServerResponseBody_Status::streamID.

Referenced by XrdCl::XRootDMsgHandler::InspectStatusRsp().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ WaitBeforeExit()

void XrdCl::XRootDTransport::WaitBeforeExit ( )
virtual

Wait until the program can safely exit.

Implements XrdCl::TransportHandler.

Definition at line 1748 of file XrdClXRootDTransport.cc.

1749  {
1750  XrdSysRWLockHelper scope( pSecUnloadHandler->lock, false ); // obtain write lock
1751  pSecUnloadHandler->unloaded = true;
1752  }

References XrdCl::PluginUnloadHandler::lock, and XrdCl::PluginUnloadHandler::unloaded.

Friends And Related Function Documentation

◆ PluginUnloadHandler

friend struct PluginUnloadHandler
friend

Definition at line 432 of file XrdClXRootDTransport.hh.


The documentation for this class was generated from the following files: