XRootD
XrdCl::XRootDMsgHandler Class Reference

Handle/Process/Forward XRootD messages. More...

#include <XrdClXRootDMsgHandler.hh>

+ Inheritance diagram for XrdCl::XRootDMsgHandler:
+ Collaboration diagram for XrdCl::XRootDMsgHandler:

Public Member Functions

 XRootDMsgHandler (Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
 
 ~XRootDMsgHandler ()
 Destructor. More...
 
virtual uint16_t Examine (std::shared_ptr< Message > &msg) override
 
time_t GetExpiration () override
 Get a timestamp after which we give up. More...
 
const MessageGetRequest () const
 Get the request pointer. More...
 
virtual uint16_t GetSid () const override
 
virtual uint16_t InspectStatusRsp () override
 
virtual bool IsRaw () const override
 Are we a raw writer or not? More...
 
void OnReadyToSend ([[maybe_unused]] Message *msg) override
 
virtual void OnStatusReady (const Message *message, XRootDStatus status) override
 The requested action has been performed and the status is available. More...
 
virtual uint8_t OnStreamEvent (StreamEvent event, XRootDStatus status) override
 
void PartialReceived ()
 
virtual void Process () override
 Process the message if it was "taken" by the examine action. More...
 
virtual XRootDStatus ReadMessageBody (Message *msg, Socket *socket, uint32_t &bytesRead) override
 
void SetChunkList (ChunkList *chunkList)
 Set the chunk list. More...
 
void SetCrc32cDigests (std::vector< uint32_t > &&crc32cDigests)
 
void SetExpiration (time_t expiration)
 Set a timestamp after which we give up. More...
 
void SetFollowMetalink (bool followMetalink)
 
void SetHostList (HostList *hostList)
 Set host list. More...
 
void SetKernelBuffer (XrdSys::KernelBuffer *kbuff)
 Set the kernel buffer. More...
 
void SetLoadBalancer (const HostInfo &loadBalancer)
 Set the load balancer. More...
 
void SetOksofarAsAnswer (bool oksofarAsAnswer)
 
void SetRedirectAsAnswer (bool redirectAsAnswer)
 
void SetRedirectCounter (uint16_t redirectCounter)
 Set the redirect counter. More...
 
void SetStateful (bool stateful)
 
void WaitDone (time_t now)
 
XRootDStatus WriteMessageBody (Socket *socket, uint32_t &bytesWritten) override
 
- Public Member Functions inherited from XrdCl::MsgHandler
virtual ~MsgHandler ()
 Event types that the message handler may receive. More...
 
virtual void OnReadyToSend (Message *msg)
 

Friends

class HandleRspJob
 

Additional Inherited Members

- Public Types inherited from XrdCl::MsgHandler
enum  Action {
  None = 0x0000 ,
  Nop = 0x0001 ,
  Ignore = 0x0002 ,
  RemoveHandler = 0x0004 ,
  Raw = 0x0008 ,
  NoProcess = 0x0010 ,
  Corrupted = 0x0020 ,
  More = 0x0040
}
 Actions to be taken after a message is processed by the handler. More...
 
enum  StreamEvent {
  Ready = 1 ,
  Broken = 2 ,
  Timeout = 3 ,
  FatalError = 4
}
 Events that may have occurred to the stream. More...
 

Detailed Description

Handle/Process/Forward XRootD messages.

Definition at line 119 of file XrdClXRootDMsgHandler.hh.

Constructor & Destructor Documentation

◆ XRootDMsgHandler()

XrdCl::XRootDMsgHandler::XRootDMsgHandler ( Message msg,
ResponseHandler respHandler,
const URL url,
std::shared_ptr< SIDManager sidMgr,
LocalFileHandler lFileHandler 
)
inline

Constructor

Parameters
msgmessage that has been sent out
respHandlerresponse handler to be called then the final final response arrives
urlthe url the message has been sent to
sidMgrthe sid manager used to allocate SID for the initial message

Definition at line 134 of file XrdClXRootDMsgHandler.hh.

138  :
139  pRequest( msg ),
140  pResponseHandler( respHandler ),
141  pUrl( *url ),
142  pEffectiveDataServerUrl( 0 ),
143  pSidMgr( sidMgr ),
144  pLFileHandler( lFileHandler ),
145  pExpiration( 0 ),
146  pRedirectAsAnswer( false ),
147  pOksofarAsAnswer( false ),
148  pHasLoadBalancer( false ),
149  pHasSessionId( false ),
150  pChunkList( 0 ),
151  pKBuff( 0 ),
152  pRedirectCounter( 0 ),
153  pNotAuthorizedCounter( 0 ),
154 
155  pAsyncOffset( 0 ),
156  pAsyncChunkIndex( 0 ),
157 
158  pPgWrtCksumBuff( 4 ),
159  pPgWrtCurrentPageOffset( 0 ),
160  pPgWrtCurrentPageNb( 0 ),
161 
162  pOtherRawStarted( false ),
163 
164  pFollowMetalink( false ),
165 
166  pStateful( false ),
167 
168  pAggregatedWaitTime( 0 ),
169 
170  pMsgInFly( false ),
171  pSendingState( 0 ),
172 
173  pTimeoutFence( false ),
174 
175  pDirListStarted( false ),
176  pDirListWithStat( false ),
177 
178  pCV( 0 ),
179 
180  pSslErrCnt( 0 )
181  {
182  pPostMaster = DefaultEnv::GetPostMaster();
183  if( msg->GetSessionId() )
184  pHasSessionId = true;
185 
186  Log *log = DefaultEnv::GetLog();
187  log->Debug( ExDbgMsg, "[%s] MsgHandler created: %p (message: %s ).",
188  pUrl.GetHostId().c_str(), this,
189  pRequest->GetObfuscatedDescription().c_str() );
190 
191  ClientRequestHdr *hdr = (ClientRequestHdr*)pRequest->GetBuffer();
192  if( ntohs( hdr->requestid ) == kXR_pgread )
193  {
194  ClientPgReadRequest *pgrdreq = (ClientPgReadRequest*)pRequest->GetBuffer();
195  pCrc32cDigests.reserve( XrdOucPgrwUtils::csNum( ntohll( pgrdreq->offset ),
196  ntohl( pgrdreq->rlen ) ) );
197  }
198 
199  if( ntohs( hdr->requestid ) == kXR_readv )
200  pBodyReader.reset( new AsyncVectorReader( *url, *pRequest ) );
201  else if( ntohs( hdr->requestid ) == kXR_read )
202  pBodyReader.reset( new AsyncRawReader( *url, *pRequest ) );
203  else
204  pBodyReader.reset( new AsyncDiscardReader( *url, *pRequest ) );
205  }
kXR_unt16 requestid
Definition: XProtocol.hh:157
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_pgread
Definition: XProtocol.hh:142
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint64_t ExDbgMsg
XrdSysError Log
Definition: XrdConfig.cc:113

References XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::Buffer::GetBuffer(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Message::GetSessionId(), kXR_pgread, kXR_read, kXR_readv, ClientPgReadRequest::offset, ClientRequestHdr::requestid, and ClientPgReadRequest::rlen.

+ Here is the call graph for this function:

◆ ~XRootDMsgHandler()

XrdCl::XRootDMsgHandler::~XRootDMsgHandler ( )
inline

Destructor.

Definition at line 210 of file XrdClXRootDMsgHandler.hh.

211  {
212  DumpRedirectTraceBack();
213 
214  if( !pHasSessionId )
215  delete pRequest;
216  delete pEffectiveDataServerUrl;
217 
218  pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
219  pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
220  pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
221  pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
222  pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
223  pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
224 
225  Log *log = DefaultEnv::GetLog();
226  log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: %p.",
227  pUrl.GetHostId().c_str(), this );
228  }
std::vector< ChunkInfo > ChunkList
List of chunks.

References XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::URL::GetHostId(), and XrdCl::DefaultEnv::GetLog().

+ Here is the call graph for this function:

Member Function Documentation

◆ Examine()

uint16_t XrdCl::XRootDMsgHandler::Examine ( std::shared_ptr< Message > &  msg)
overridevirtual

Examine an incoming message, and decide on the action to be taken

Parameters
msgthe message, may be zero if receive failed
Returns
action type that needs to be take wrt the message and the handler

Implements XrdCl::MsgHandler.

Definition at line 109 of file XrdClXRootDMsgHandler.cc.

110  {
111  const int sst = pSendingState.fetch_or( kSawResp );
112 
113  if( !( sst & kSendDone ) && !( sst & kSawResp ) )
114  {
115  // we must have been sent although we haven't got the OnStatusReady
116  // notification yet. Set the inflight notice.
117 
118  Log *log = DefaultEnv::GetLog();
119  log->Dump( XRootDMsg, "[%s] Message %s reply received before notification "
120  "that it was sent, assuming it was sent ok.",
121  pUrl.GetHostId().c_str(),
122  pRequest->GetObfuscatedDescription().c_str() );
123 
124  pMsgInFly = true;
125  }
126 
127  //--------------------------------------------------------------------------
128  // if the MsgHandler is already being used to process another request
129  // (kXR_oksofar) we need to wait
130  //--------------------------------------------------------------------------
131  if( pOksofarAsAnswer )
132  {
133  XrdSysCondVarHelper lck( pCV );
134  while( pResponse ) pCV.Wait();
135  }
136  else
137  {
138  if( pResponse )
139  {
140  Log *log = DefaultEnv::GetLog();
141  log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
142  "it already owns a response: %p (message: %s ).",
143  pUrl.GetHostId().c_str(), this,
144  pRequest->GetObfuscatedDescription().c_str() );
145  }
146  }
147 
148  if( msg->GetSize() < 8 )
149  return Ignore;
150 
151  ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
152  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
153  uint16_t status = 0;
154  uint32_t dlen = 0;
155 
156  //--------------------------------------------------------------------------
157  // We only care about async responses, but those are extracted now
158  // in the SocketHandler.
159  //--------------------------------------------------------------------------
160  if( rsp->hdr.status == kXR_attn )
161  {
162  return Ignore;
163  }
164  //--------------------------------------------------------------------------
165  // We got a sync message - check if it belongs to us
166  //--------------------------------------------------------------------------
167  else
168  {
169  if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
170  rsp->hdr.streamid[1] != req->header.streamid[1] )
171  return Ignore;
172 
173  status = rsp->hdr.status;
174  dlen = rsp->hdr.dlen;
175  }
176 
177  //--------------------------------------------------------------------------
178  // We take the ownership of the message and decide what we will do
179  // with the handler itself, the options are:
180  // 1) we want to either read in raw mode (the Raw flag) or have the message
181  // body reconstructed for us by the TransportHandler by the time
182  // Process() is called (default, no extra flag)
183  // 2) we either got a full response in which case we don't want to be
184  // notified about anything anymore (RemoveHandler) or we got a partial
185  // answer and we need to wait for more (default, no extra flag)
186  //--------------------------------------------------------------------------
187  pResponse = msg;
188  pBodyReader->SetDataLength( dlen );
189 
190  Log *log = DefaultEnv::GetLog();
191  switch( status )
192  {
193  //------------------------------------------------------------------------
194  // Handle the cached cases
195  //------------------------------------------------------------------------
196  case kXR_error:
197  case kXR_redirect:
198  case kXR_wait:
199  return RemoveHandler;
200 
201  case kXR_waitresp:
202  {
203  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
204  "message %s", pUrl.GetHostId().c_str(),
205  pRequest->GetObfuscatedDescription().c_str() );
206 
207  pResponse.reset();
208  return Ignore; // This must be handled synchronously!
209  }
210 
211  //------------------------------------------------------------------------
212  // Handle the potential raw cases
213  //------------------------------------------------------------------------
214  case kXR_ok:
215  {
216  //----------------------------------------------------------------------
217  // For kXR_read we read in raw mode
218  //----------------------------------------------------------------------
219  uint16_t reqId = ntohs( req->header.requestid );
220  if( reqId == kXR_read )
221  {
222  return Raw | RemoveHandler;
223  }
224 
225  //----------------------------------------------------------------------
226  // kXR_readv is the same as kXR_read
227  //----------------------------------------------------------------------
228  if( reqId == kXR_readv )
229  {
230  return Raw | RemoveHandler;
231  }
232 
233  //----------------------------------------------------------------------
234  // For everything else we just take what we got
235  //----------------------------------------------------------------------
236  return RemoveHandler;
237  }
238 
239  //------------------------------------------------------------------------
240  // kXR_oksofars are special, they are not full responses, so we reset
241  // the response pointer to 0 and add the message to the partial list
242  //------------------------------------------------------------------------
243  case kXR_oksofar:
244  {
245  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
246  "%s", pUrl.GetHostId().c_str(),
247  pRequest->GetObfuscatedDescription().c_str() );
248 
249  if( !pOksofarAsAnswer )
250  {
251  pPartialResps.emplace_back( std::move( pResponse ) );
252  }
253 
254  //----------------------------------------------------------------------
255  // For kXR_read we either read in raw mode if the message has not
256  // been fully reconstructed already, if it has, we adjust
257  // the buffer offset to prepare for the next one
258  //----------------------------------------------------------------------
259  uint16_t reqId = ntohs( req->header.requestid );
260  if( reqId == kXR_read )
261  {
262  pTimeoutFence.store( true, std::memory_order_relaxed );
263  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
264  }
265 
266  //----------------------------------------------------------------------
267  // kXR_readv is similar to read, except that the payload is different
268  //----------------------------------------------------------------------
269  if( reqId == kXR_readv )
270  {
271  pTimeoutFence.store( true, std::memory_order_relaxed );
272  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
273  }
274 
275  return ( pOksofarAsAnswer ? None : NoProcess );
276  }
277 
278  case kXR_status:
279  {
280  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
281  "%s", pUrl.GetHostId().c_str(),
282  pRequest->GetObfuscatedDescription().c_str() );
283 
284  uint16_t reqId = ntohs( req->header.requestid );
285  if( reqId == kXR_pgwrite )
286  {
287  //--------------------------------------------------------------------
288  // In case of pgwrite by definition this wont be a partial response
289  // so we can already remove the handler from the in-queue
290  //--------------------------------------------------------------------
291  return RemoveHandler;
292  }
293 
294  //----------------------------------------------------------------------
295  // Otherwise (pgread), first of all we need to read the body of the
296  // kXR_status response, we can handle the raw data (if any) only after
297  // we have the whole kXR_status body
298  //----------------------------------------------------------------------
299  pTimeoutFence.store( true, std::memory_order_relaxed );
300  return None;
301  }
302 
303  //------------------------------------------------------------------------
304  // Default
305  //------------------------------------------------------------------------
306  default:
307  return RemoveHandler;
308  }
309  return RemoveHandler;
310  }
kXR_char streamid[2]
Definition: XProtocol.hh:156
kXR_char streamid[2]
Definition: XProtocol.hh:914
@ kXR_waitresp
Definition: XProtocol.hh:906
@ kXR_redirect
Definition: XProtocol.hh:904
@ kXR_oksofar
Definition: XProtocol.hh:900
@ kXR_status
Definition: XProtocol.hh:907
@ kXR_ok
Definition: XProtocol.hh:899
@ kXR_attn
Definition: XProtocol.hh:901
@ kXR_wait
Definition: XProtocol.hh:905
@ kXR_error
Definition: XProtocol.hh:903
struct ClientRequestHdr header
Definition: XProtocol.hh:846
@ kXR_pgwrite
Definition: XProtocol.hh:138
ServerResponseHeader hdr
Definition: XProtocol.hh:1288
@ Ignore
Ignore the message.
const uint64_t XRootDMsg

References ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::ExDbgMsg, XrdCl::Buffer::GetBuffer(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), ServerResponse::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, kXR_attn, kXR_error, kXR_ok, kXR_oksofar, kXR_pgwrite, kXR_read, kXR_readv, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, XrdCl::MsgHandler::None, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseHeader::status, ClientRequestHdr::streamid, ServerResponseHeader::streamid, XrdSysCondVar::Wait(), XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ GetExpiration()

time_t XrdCl::XRootDMsgHandler::GetExpiration ( )
inlineoverridevirtual

Get a timestamp after which we give up.

Implements XrdCl::MsgHandler.

Definition at line 331 of file XrdClXRootDMsgHandler.hh.

332  {
333  return pExpiration;
334  }

◆ GetRequest()

const Message* XrdCl::XRootDMsgHandler::GetRequest ( ) const
inline

Get the request pointer.

Definition at line 357 of file XrdClXRootDMsgHandler.hh.

358  {
359  return pRequest;
360  }

◆ GetSid()

uint16_t XrdCl::XRootDMsgHandler::GetSid ( ) const
overridevirtual

Get handler sid

return sid of the corresponding request, otherwise 0

Implements XrdCl::MsgHandler.

Definition at line 405 of file XrdClXRootDMsgHandler.cc.

406  {
407  ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
408  return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
409  }

References XrdCl::Buffer::GetBuffer(), ClientRequest::header, and ClientRequestHdr::streamid.

+ Here is the call graph for this function:

◆ InspectStatusRsp()

uint16_t XrdCl::XRootDMsgHandler::InspectStatusRsp ( )
overridevirtual

Reexamine the incoming message, and decide on the action to be taken

In case of kXR_status the message can be only fully examined after reading the whole body (without raw data).

Parameters
msgthe message, may be zero if receive failed
Returns
action type that needs to be take wrt the message and the handler

Implements XrdCl::MsgHandler.

Definition at line 315 of file XrdClXRootDMsgHandler.cc.

316  {
317  if( !pResponse )
318  return 0;
319 
320  Log *log = DefaultEnv::GetLog();
321  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
322 
323  //--------------------------------------------------------------------------
324  // Additional action is only required for kXR_status
325  //--------------------------------------------------------------------------
326  if( rsp->hdr.status != kXR_status ) return 0;
327 
328  //--------------------------------------------------------------------------
329  // Ignore malformed status response
330  //--------------------------------------------------------------------------
331  if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
332  {
333  log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
334  return Corrupted;
335  }
336 
337  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
338  uint16_t reqId = ntohs( req->header.requestid );
339  //--------------------------------------------------------------------------
340  // Unmarshal the status body
341  //--------------------------------------------------------------------------
342  XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
343 
344  if( !st.IsOK() && st.code == errDataError )
345  {
346  log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
347  st.GetErrorMessage().c_str() );
348  return Corrupted;
349  }
350 
351  if( !st.IsOK() )
352  {
353  log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
354  pUrl.GetHostId().c_str() );
355  pStatus = st;
356  HandleRspOrQueue();
357  return Ignore;
358  }
359 
360  //--------------------------------------------------------------------------
361  // Common handling for partial results
362  //--------------------------------------------------------------------------
363  ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
365  {
366  pPartialResps.push_back( std::move( pResponse ) );
367  }
368 
369  //--------------------------------------------------------------------------
370  // Decide the actions that we need to take
371  //--------------------------------------------------------------------------
372  uint16_t action = 0;
373  if( reqId == kXR_pgread )
374  {
375  //----------------------------------------------------------------------
376  // The message contains only Status header and body but no raw data
377  //----------------------------------------------------------------------
378  if( !pPageReader )
379  pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
380  pPageReader->SetRsp( rspst );
381 
382  action |= Raw;
383 
385  action |= NoProcess;
386  else
387  action |= RemoveHandler;
388  }
389  else if( reqId == kXR_pgwrite )
390  {
391  // if data corruption has been detected on the server side we will
392  // send some additional data pointing to the pages that need to be
393  // retransmitted
394  if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
395  pResponse->GetCursor() )
396  action |= More;
397  }
398 
399  return action;
400  }
ServerResponseStatus status
Definition: XProtocol.hh:1310
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1262
struct ServerResponseHeader hdr
Definition: XProtocol.hh:1261
@ More
there are more (non-raw) data to be read
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
@ kXR_PartialResult
Definition: XProtocol.hh:1251

References ServerResponseStatus::bdy, XrdCl::Status::code, XrdCl::MsgHandler::Corrupted, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), ServerResponseStatus::hdr, ServerResponse::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, XrdCl::Status::IsOK(), XrdProto::kXR_PartialResult, kXR_pgread, kXR_pgwrite, kXR_status, XrdCl::MsgHandler::More, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseBody_Status::resptype, ServerResponseHeader::status, ServerResponseV2::status, XrdCl::XRootDTransport::UnMarshalStatusBody(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ IsRaw()

bool XrdCl::XRootDMsgHandler::IsRaw ( ) const
overridevirtual

Are we a raw writer or not?

Reimplemented from XrdCl::MsgHandler.

Definition at line 981 of file XrdClXRootDMsgHandler.cc.

982  {
983  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
984  uint16_t reqId = ntohs( req->header.requestid );
985  if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
986  return true;
987  // checkpoint + execute
988  if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
989  {
990  ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
991  reqId = ntohs( xeq->header.requestid );
992  return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
993  }
994 
995  return false;
996  }
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_chkpoint
Definition: XProtocol.hh:124
struct ClientChkPointRequest chkpoint
Definition: XProtocol.hh:849

References ClientRequest::chkpoint, XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_chkpoint, kXR_ckpXeq, kXR_pgwrite, kXR_truncate, kXR_write, kXR_writev, ClientChkPointRequest::opcode, and ClientRequestHdr::requestid.

+ Here is the call graph for this function:

◆ OnReadyToSend()

void XrdCl::XRootDMsgHandler::OnReadyToSend ( [[maybe_unused] ] Message msg)
inlineoverride

Definition at line 433 of file XrdClXRootDMsgHandler.hh.

434  {
435  pSendingState = 0;
436  }

◆ OnStatusReady()

void XrdCl::XRootDMsgHandler::OnStatusReady ( const Message message,
XRootDStatus  status 
)
overridevirtual

The requested action has been performed and the status is available.

Implements XrdCl::MsgHandler.

Definition at line 921 of file XrdClXRootDMsgHandler.cc.

923  {
924  Log *log = DefaultEnv::GetLog();
925 
926  const int sst = pSendingState.fetch_or( kSendDone );
927 
928  // if we have already seen a response we can not be in the out-queue
929  // anymore, so we should be getting notified of a successful send.
930  // If not log and do our best to recover.
931  if( !status.IsOK() && ( ( sst & kFinalResp ) || ( sst & kSawResp ) ) )
932  {
933  log->Error( XRootDMsg, "[%s] Unexpected error for message %s. Trying to "
934  "recover.", pUrl.GetHostId().c_str(),
935  message->GetObfuscatedDescription().c_str() );
936  HandleError( status );
937  return;
938  }
939 
940  if( sst & kFinalResp )
941  {
942  log->Dump( XRootDMsg, "[%s] Got late notification that outgoing message %s was "
943  "sent, already have final response, queuing handler callback.",
944  pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
945  HandleRspOrQueue();
946  return;
947  }
948 
949  if( sst & kSawResp )
950  {
951  log->Dump( XRootDMsg, "[%s] Got late notification that message %s has "
952  "been successfully sent.",
953  pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
954  return;
955  }
956 
957  //--------------------------------------------------------------------------
958  // We were successful, so we now need to listen for a response
959  //--------------------------------------------------------------------------
960  if( status.IsOK() )
961  {
962  log->Dump( XRootDMsg, "[%s] Message %s has been successfully sent.",
963  pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
964 
965  pMsgInFly = true;
966  return;
967  }
968 
969  //--------------------------------------------------------------------------
970  // We have failed, recover if possible
971  //--------------------------------------------------------------------------
972  log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
973  "recover.", pUrl.GetHostId().c_str(),
974  message->GetObfuscatedDescription().c_str() );
975  HandleError( status );
976  }

References XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Status::IsOK(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ OnStreamEvent()

uint8_t XrdCl::XRootDMsgHandler::OnStreamEvent ( StreamEvent  event,
XRootDStatus  status 
)
overridevirtual

Handle an event other that a message arrival

Parameters
eventtype of the event
statusstatus info

Reimplemented from XrdCl::MsgHandler.

Definition at line 882 of file XrdClXRootDMsgHandler.cc.

884  {
885  Log *log = DefaultEnv::GetLog();
886  log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
887  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
888 
889  if( event == Ready )
890  return 0;
891 
892  if( pTimeoutFence.load( std::memory_order_relaxed ) )
893  return 0;
894 
895  HandleError( status );
896  return RemoveHandler;
897  }
@ Ready
The stream has become connected.

References XrdCl::Log::Dump(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::MsgHandler::Ready, XrdCl::MsgHandler::RemoveHandler, and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ PartialReceived()

void XrdCl::XRootDMsgHandler::PartialReceived ( )

Bookkeeping after partial response has been received:

  • take down the timeout fence after oksofar response has been handled
  • reset status-response-body marshaled flag

Definition at line 1158 of file XrdClXRootDMsgHandler.cc.

1159  {
1160  pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1161  }

Referenced by XrdCl::Stream::ForceError(), XrdCl::Stream::OnError(), and XrdCl::Stream::OnIncoming().

+ Here is the caller graph for this function:

◆ Process()

void XrdCl::XRootDMsgHandler::Process ( )
overridevirtual

Process the message if it was "taken" by the examine action.

Process the message if it was "taken" by the examine action

Parameters
msgthe message to be processed

Reimplemented from XrdCl::MsgHandler.

Definition at line 414 of file XrdClXRootDMsgHandler.cc.

415  {
416  Log *log = DefaultEnv::GetLog();
417 
418  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
419 
420  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
421 
422  //--------------------------------------------------------------------------
423  // If it is a local file, it can be only a metalink redirector
424  //--------------------------------------------------------------------------
425  if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
426  pHosts->back().protocol = kXR_PROTOCOLVERSION;
427 
428  //--------------------------------------------------------------------------
429  // We got an answer, check who we were talking to
430  //--------------------------------------------------------------------------
431  else
432  {
433  AnyObject qryResult;
434  int *qryResponse = nullptr;
435  pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
436  qryResult.Get( qryResponse );
437  if (qryResponse) {
438  pHosts->back().flags = *qryResponse;
439  delete qryResponse;
440  qryResponse = nullptr;
441  }
442  pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
443  qryResult.Get( qryResponse );
444  if (qryResponse) {
445  pHosts->back().protocol = *qryResponse;
446  delete qryResponse;
447  }
448  }
449 
450  //--------------------------------------------------------------------------
451  // Process the message
452  //--------------------------------------------------------------------------
453  Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
454  if( !st.IsOK() )
455  {
456  pStatus = Status( stFatal, errInvalidMessage );
457  HandleResponse();
458  return;
459  }
460 
461  //--------------------------------------------------------------------------
462  // we have an response for the message so it's not in fly anymore
463  //--------------------------------------------------------------------------
464  pMsgInFly = false;
465 
466  //--------------------------------------------------------------------------
467  // Reset the aggregated wait (used to omit wait response in case of Metalink
468  // redirector)
469  //--------------------------------------------------------------------------
470  if( rsp->hdr.status != kXR_wait )
471  pAggregatedWaitTime = 0;
472 
473  switch( rsp->hdr.status )
474  {
475  //------------------------------------------------------------------------
476  // kXR_ok - we're done here
477  //------------------------------------------------------------------------
478  case kXR_ok:
479  {
480  log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
481  pUrl.GetHostId().c_str(),
482  pRequest->GetObfuscatedDescription().c_str() );
483  pStatus = Status();
484  HandleResponse();
485  return;
486  }
487 
488  case kXR_status:
489  {
490  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
491  pUrl.GetHostId().c_str(),
492  pRequest->GetObfuscatedDescription().c_str() );
493  pStatus = Status();
494  HandleResponse();
495  return;
496  }
497 
498  //------------------------------------------------------------------------
499  // kXR_ok - we're serving partial result to the user
500  //------------------------------------------------------------------------
501  case kXR_oksofar:
502  {
503  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
504  pUrl.GetHostId().c_str(),
505  pRequest->GetObfuscatedDescription().c_str() );
506  pStatus = Status( stOK, suContinue );
507  HandleResponse();
508  return;
509  }
510 
511  //------------------------------------------------------------------------
512  // kXR_error - we've got a problem
513  //------------------------------------------------------------------------
514  case kXR_error:
515  {
516  char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
517  memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
518  log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
519  "[%d] %s", pUrl.GetHostId().c_str(),
520  pRequest->GetObfuscatedDescription().c_str(), rsp->body.error.errnum,
521  errmsg );
522  delete [] errmsg;
523 
524  HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
525  return;
526  }
527 
528  //------------------------------------------------------------------------
529  // kXR_redirect - they tell us to go elsewhere
530  //------------------------------------------------------------------------
531  case kXR_redirect:
532  {
533  if( rsp->hdr.dlen <= 4 )
534  {
535  log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
536  pUrl.GetHostId().c_str() );
537  pStatus = Status( stError, errInvalidResponse );
538  HandleResponse();
539  return;
540  }
541 
542  char *urlInfoBuff = new char[rsp->hdr.dlen-3];
543  urlInfoBuff[rsp->hdr.dlen-4] = 0;
544  memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
545  std::string urlInfo = urlInfoBuff;
546  delete [] urlInfoBuff;
547  log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
548  "message %s: %s, port %d", pUrl.GetHostId().c_str(),
549  pRequest->GetObfuscatedDescription().c_str(), urlInfo.c_str(),
550  rsp->body.redirect.port );
551 
552  //----------------------------------------------------------------------
553  // Check if we can proceed
554  //----------------------------------------------------------------------
555  if( !pRedirectCounter )
556  {
557  log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
558  "message %s, the last known error is: %s",
559  pUrl.GetHostId().c_str(),
560  pRequest->GetObfuscatedDescription().c_str(),
561  pLastError.ToString().c_str() );
562 
563 
564  pStatus = Status( stFatal, errRedirectLimit );
565  HandleResponse();
566  return;
567  }
568  --pRedirectCounter;
569 
570  //----------------------------------------------------------------------
571  // Keep the info about this server if we still need to find a load
572  // balancer
573  //----------------------------------------------------------------------
574  uint32_t flags = pHosts->back().flags;
575  if( !pHasLoadBalancer )
576  {
577  if( flags & kXR_isManager )
578  {
579  //------------------------------------------------------------------
580  // If the current server is a meta manager then it supersedes
581  // any existing load balancer, otherwise we assign a load-balancer
582  // only if it has not been already assigned
583  //------------------------------------------------------------------
584  if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
585  {
586  pLoadBalancer = pHosts->back();
587  log->Dump( XRootDMsg, "[%s] Current server has been assigned "
588  "as a load-balancer for message %s",
589  pUrl.GetHostId().c_str(),
590  pRequest->GetObfuscatedDescription().c_str() );
591  HostList::iterator it;
592  for( it = pHosts->begin(); it != pHosts->end(); ++it )
593  it->loadBalancer = false;
594  pHosts->back().loadBalancer = true;
595  }
596  }
597  }
598 
599  //----------------------------------------------------------------------
600  // If the redirect comes from a data server safe the URL because
601  // in case of a failure we will use it as the effective data server URL
602  // for the tried CGI opaque info
603  //----------------------------------------------------------------------
604  if( flags & kXR_isServer )
605  pEffectiveDataServerUrl = new URL( pHosts->back().url );
606 
607  //----------------------------------------------------------------------
608  // Build the URL and check it's validity
609  //----------------------------------------------------------------------
610  std::vector<std::string> urlComponents;
611  std::string newCgi;
612  Utils::splitString( urlComponents, urlInfo, "?" );
613 
614  std::ostringstream o;
615 
616  o << urlComponents[0];
617  if( rsp->body.redirect.port > 0 )
618  o << ":" << rsp->body.redirect.port << "/";
619  else if( rsp->body.redirect.port < 0 )
620  {
621  //--------------------------------------------------------------------
622  // check if the manager wants to enforce write recovery at himself
623  // (beware we are dealing here with negative flags)
624  //--------------------------------------------------------------------
625  if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
626  pHosts->back().flags |= kXR_recoverWrts;
627 
628  //--------------------------------------------------------------------
629  // check if the manager wants to collapse the communication channel
630  // (the redirect host is to replace the current host)
631  //--------------------------------------------------------------------
632  if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
633  {
634  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
635  pPostMaster->CollapseRedirect( pUrl, url );
636  }
637 
638  if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
639  {
640  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
641  if( Utils::CheckEC( pRequest, url ) )
642  pRedirectAsAnswer = true;
643  }
644  }
645 
646  URL newUrl = URL( o.str() );
647  if( !newUrl.IsValid() )
648  {
649  pStatus = Status( stError, errInvalidRedirectURL );
650  log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
651  pUrl.GetHostId().c_str(), urlInfo.c_str() );
652  HandleResponse();
653  return;
654  }
655 
656  if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
657  newUrl.SetUserName( pUrl.GetUserName() );
658 
659  if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
660  newUrl.SetPassword( pUrl.GetPassword() );
661 
662  //----------------------------------------------------------------------
663  // Forward any "xrd.*" params from the original client request also to
664  // the new redirection url
665  // Also, we need to preserve any "xrdcl.*' as they are important for
666  // our internal workflows.
667  //----------------------------------------------------------------------
668  std::ostringstream ossXrd;
669  const URL::ParamsMap &urlParams = pUrl.GetParams();
670 
671  for(URL::ParamsMap::const_iterator it = urlParams.begin();
672  it != urlParams.end(); ++it )
673  {
674  if( it->first.compare( 0, 4, "xrd." ) &&
675  it->first.compare( 0, 6, "xrdcl." ) )
676  continue;
677 
678  ossXrd << it->first << '=' << it->second << '&';
679  }
680 
681  std::string xrdCgi = ossXrd.str();
682  pRedirectUrl = newUrl.GetURL();
683 
684  URL cgiURL;
685  if( urlComponents.size() > 1 )
686  {
687  pRedirectUrl += "?";
688  pRedirectUrl += urlComponents[1];
689  std::ostringstream o;
690  o << "fake://fake:111//fake?";
691  o << urlComponents[1];
692 
693  if( urlComponents.size() == 3 )
694  o << '?' << urlComponents[2];
695 
696  if (!xrdCgi.empty())
697  {
698  o << '&' << xrdCgi;
699  pRedirectUrl += '&';
700  pRedirectUrl += xrdCgi;
701  }
702 
703  cgiURL = URL( o.str() );
704  }
705  else {
706  if (!xrdCgi.empty())
707  {
708  std::ostringstream o;
709  o << "fake://fake:111//fake?";
710  o << xrdCgi;
711  cgiURL = URL( o.str() );
712  pRedirectUrl += '?';
713  pRedirectUrl += xrdCgi;
714  }
715  }
716 
717  //----------------------------------------------------------------------
718  // Check if we need to return the URL as a response
719  //----------------------------------------------------------------------
720  if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
721  newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
722  !newUrl.IsLocalFile() )
723  pRedirectAsAnswer = true;
724 
725  if( pRedirectAsAnswer )
726  {
727  pStatus = Status( stError, errRedirect );
728  HandleResponse();
729  return;
730  }
731 
732  //----------------------------------------------------------------------
733  // Rewrite the message in a way required to send it to another server
734  //----------------------------------------------------------------------
735  newUrl.SetParams( cgiURL.GetParams() );
736  Status st = RewriteRequestRedirect( newUrl );
737  if( !st.IsOK() )
738  {
739  pStatus = st;
740  HandleResponse();
741  return;
742  }
743 
744  //----------------------------------------------------------------------
745  // Make sure we don't change the protocol by accident (root vs roots)
746  //----------------------------------------------------------------------
747  if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
748  ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
749  newUrl.SetProtocol( "roots" );
750 
751  //----------------------------------------------------------------------
752  // Send the request to the new location
753  //----------------------------------------------------------------------
754  HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
755  return;
756  }
757 
758  //------------------------------------------------------------------------
759  // kXR_wait - we wait, and re-issue the request later
760  //------------------------------------------------------------------------
761  case kXR_wait:
762  {
763  uint32_t waitSeconds = 0;
764 
765  if( rsp->hdr.dlen >= 4 )
766  {
767  char *infoMsg = new char[rsp->hdr.dlen-3];
768  infoMsg[rsp->hdr.dlen-4] = 0;
769  memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
770  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
771  "message %s: %s", pUrl.GetHostId().c_str(),
772  rsp->body.wait.seconds, pRequest->GetObfuscatedDescription().c_str(),
773  infoMsg );
774  delete [] infoMsg;
775  waitSeconds = rsp->body.wait.seconds;
776  }
777  else
778  {
779  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
780  "message %s", pUrl.GetHostId().c_str(),
781  pRequest->GetObfuscatedDescription().c_str() );
782  }
783 
784  pAggregatedWaitTime += waitSeconds;
785 
786  // We need a special case if the data node comes from metalink
787  // redirector. In this case it might make more sense to try the
788  // next entry in the Metalink than wait.
789  if( OmitWait( *pRequest, pLoadBalancer.url ) )
790  {
791  int maxWait = DefaultMaxMetalinkWait;
792  DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
793  if( pAggregatedWaitTime > maxWait )
794  {
795  UpdateTriedCGI();
796  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
797  return;
798  }
799  }
800 
801  //----------------------------------------------------------------------
802  // Some messages require rewriting before they can be sent again
803  // after wait
804  //----------------------------------------------------------------------
805  Status st = RewriteRequestWait();
806  if( !st.IsOK() )
807  {
808  pStatus = st;
809  HandleResponse();
810  return;
811  }
812 
813  //----------------------------------------------------------------------
814  // Register a task to resend the message in some seconds, if we still
815  // have time to do that, and report a timeout otherwise
816  //----------------------------------------------------------------------
817  time_t resendTime = ::time(0)+waitSeconds;
818 
819  if( resendTime < pExpiration )
820  {
821  log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
822  pUrl.GetHostId().c_str(), this,
823  pRequest->GetObfuscatedDescription().c_str() );
824 
825  TaskManager *taskMgr = pPostMaster->GetTaskManager();
826  taskMgr->RegisterTask( new WaitTask( this ), resendTime );
827  }
828  else
829  {
830  log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
831  pUrl.GetHostId().c_str(),
832  pRequest->GetObfuscatedDescription().c_str() );
833  HandleError( Status( stError, errOperationExpired) );
834  }
835  return;
836  }
837 
838  //------------------------------------------------------------------------
839  // kXR_waitresp - the response will be returned in some seconds as an
840  // unsolicited message. Currently all messages of this type are handled
841  // one step before in the XrdClStream::OnIncoming as they need to be
842  // processed synchronously.
843  //------------------------------------------------------------------------
844  case kXR_waitresp:
845  {
846  if( rsp->hdr.dlen < 4 )
847  {
848  log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
849  pUrl.GetHostId().c_str() );
850  pStatus = Status( stError, errInvalidResponse );
851  HandleResponse();
852  return;
853  }
854 
855  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
856  "message %s", pUrl.GetHostId().c_str(),
857  rsp->body.waitresp.seconds,
858  pRequest->GetObfuscatedDescription().c_str() );
859  return;
860  }
861 
862  //------------------------------------------------------------------------
863  // Default - unrecognized/unsupported response, declare an error
864  //------------------------------------------------------------------------
865  default:
866  {
867  log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
868  "message %s", pUrl.GetHostId().c_str(),
869  rsp->hdr.status, pRequest->GetObfuscatedDescription().c_str() );
870  pStatus = Status( stError, errInvalidResponse );
871  HandleResponse();
872  return;
873  }
874  }
875 
876  return;
877  }
#define kXR_isManager
Definition: XProtocol.hh:1156
union ServerResponse::@0 body
#define kXR_collapseRedir
Definition: XProtocol.hh:1167
#define kXR_attrMeta
Definition: XProtocol.hh:1159
#define kXR_recoverWrts
Definition: XProtocol.hh:1166
#define kXR_isServer
Definition: XProtocol.hh:1157
#define kXR_PROTOCOLVERSION
Definition: XProtocol.hh:70
#define kXR_ecRedir
Definition: XProtocol.hh:1168
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
void RegisterTask(Task *task, time_t time, bool own=true)
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:465
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
const std::string & GetUserName() const
Get the username.
Definition: XrdClURL.hh:135
const std::string & GetPassword() const
Get the password.
Definition: XrdClURL.hh:153
bool IsLocalFile() const
Definition: XrdClURL.cc:474
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:452
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Definition: XrdClUtils.cc:703
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
const uint16_t errRedirectLimit
Definition: XrdClStatus.hh:102
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
Definition: XrdClStatus.hh:105
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t errInvalidResponse
Definition: XrdClStatus.hh:99
const uint16_t errInvalidRedirectURL
Definition: XrdClStatus.hh:98
const uint16_t suContinue
Definition: XrdClStatus.hh:39
const uint16_t errRedirect
Definition: XrdClStatus.hh:106
const uint16_t errInvalidMessage
Definition: XrdClStatus.hh:85
URL url
URL of the host.
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version

References ServerResponse::body, XrdCl::Utils::CheckEC(), XrdCl::PostMaster::CollapseRedirect(), XrdCl::Log::Debug(), XrdCl::DefaultMaxMetalinkWait, ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::RedirectEntry::EntryRedirect, XrdCl::RedirectEntry::EntryRedirectOnWait, XrdCl::errErrorResponse, XrdCl::errInvalidMessage, XrdCl::errInvalidRedirectURL, XrdCl::errInvalidResponse, XrdCl::errOperationExpired, XrdCl::Log::Error(), XrdCl::errRedirect, XrdCl::errRedirectLimit, XrdCl::ExDbgMsg, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetEnv(), XrdCl::URL::GetHostId(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::URL::GetParams(), XrdCl::URL::GetPassword(), XrdCl::URL::GetProtocol(), XrdCl::PostMaster::GetTaskManager(), XrdCl::URL::GetURL(), XrdCl::URL::GetUserName(), ServerResponse::hdr, ClientRequest::header, XrdCl::URL::IsLocalFile(), XrdCl::URL::IsMetalink(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), kXR_attrMeta, kXR_collapseRedir, kXR_ecRedir, kXR_error, kXR_isManager, kXR_isServer, kXR_ok, kXR_oksofar, kXR_PROTOCOLVERSION, kXR_recoverWrts, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::PostMaster::QueryTransport(), XrdCl::TaskManager::RegisterTask(), ClientRequestHdr::requestid, XrdCl::XRootDQuery::ServerFlags, XrdCl::URL::SetParams(), XrdCl::URL::SetPassword(), XrdCl::URL::SetProtocol(), XrdCl::URL::SetUserName(), XrdCl::Utils::splitString(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stFatal, XrdCl::stOK, XrdCl::suContinue, XrdCl::Status::ToString(), XrdCl::XRootDTransport::UnMarshallBody(), XrdCl::HostInfo::url, XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ ReadMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::ReadMessageBody ( Message msg,
Socket socket,
uint32_t &  bytesRead 
)
overridevirtual

Read message body directly from a socket - called if Examine returns Raw flag - only socket related errors may be returned here

Parameters
msgthe corresponding message header
socketthe socket to read from
bytesReadnumber of bytes read by the method
Returns
stOK & suDone if the whole body has been processed stOK & suRetry if more data is needed stError on failure

Reimplemented from XrdCl::MsgHandler.

Definition at line 902 of file XrdClXRootDMsgHandler.cc.

905  {
906  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
907  uint16_t reqId = ntohs( req->header.requestid );
908 
909  if( reqId == kXR_pgread )
910  return pPageReader->Read( *socket, bytesRead );
911 
912  return pBodyReader->Read( *socket, bytesRead );
913  }

References XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_pgread, and ClientRequestHdr::requestid.

+ Here is the call graph for this function:

◆ SetChunkList()

void XrdCl::XRootDMsgHandler::SetChunkList ( ChunkList chunkList)
inline

Set the chunk list.

Definition at line 384 of file XrdClXRootDMsgHandler.hh.

385  {
386  pChunkList = chunkList;
387  if( pBodyReader )
388  pBodyReader->SetChunkList( chunkList );
389  if( chunkList )
390  pChunkStatus.resize( chunkList->size() );
391  else
392  pChunkStatus.clear();
393  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetCrc32cDigests()

void XrdCl::XRootDMsgHandler::SetCrc32cDigests ( std::vector< uint32_t > &&  crc32cDigests)
inline

Definition at line 395 of file XrdClXRootDMsgHandler.hh.

396  {
397  pCrc32cDigests = std::move( crc32cDigests );
398  }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetExpiration()

void XrdCl::XRootDMsgHandler::SetExpiration ( time_t  expiration)
inline

Set a timestamp after which we give up.

Definition at line 323 of file XrdClXRootDMsgHandler.hh.

324  {
325  pExpiration = expiration;
326  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetFollowMetalink()

void XrdCl::XRootDMsgHandler::SetFollowMetalink ( bool  followMetalink)
inline

Definition at line 416 of file XrdClXRootDMsgHandler.hh.

417  {
418  pFollowMetalink = followMetalink;
419  }

Referenced by XrdCl::MessageUtils::RedirectMessage().

+ Here is the caller graph for this function:

◆ SetHostList()

void XrdCl::XRootDMsgHandler::SetHostList ( HostList hostList)
inline

Set host list.

Definition at line 376 of file XrdClXRootDMsgHandler.hh.

377  {
378  pHosts.reset( hostList );
379  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetKernelBuffer()

void XrdCl::XRootDMsgHandler::SetKernelBuffer ( XrdSys::KernelBuffer kbuff)
inline

Set the kernel buffer.

Definition at line 403 of file XrdClXRootDMsgHandler.hh.

404  {
405  pKBuff = kbuff;
406  }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetLoadBalancer()

void XrdCl::XRootDMsgHandler::SetLoadBalancer ( const HostInfo loadBalancer)
inline

Set the load balancer.

Definition at line 365 of file XrdClXRootDMsgHandler.hh.

366  {
367  if( !loadBalancer.url.IsValid() )
368  return;
369  pLoadBalancer = loadBalancer;
370  pHasLoadBalancer = true;
371  }

References XrdCl::URL::IsValid(), and XrdCl::HostInfo::url.

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetOksofarAsAnswer()

void XrdCl::XRootDMsgHandler::SetOksofarAsAnswer ( bool  oksofarAsAnswer)
inline

Treat the kXR_oksofar response as a valid answer to the message and notify the handler with the URL as a response

Definition at line 349 of file XrdClXRootDMsgHandler.hh.

350  {
351  pOksofarAsAnswer = oksofarAsAnswer;
352  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetRedirectAsAnswer()

void XrdCl::XRootDMsgHandler::SetRedirectAsAnswer ( bool  redirectAsAnswer)
inline

Treat the kXR_redirect response as a valid answer to the message and notify the handler with the URL as a response

Definition at line 340 of file XrdClXRootDMsgHandler.hh.

341  {
342  pRedirectAsAnswer = redirectAsAnswer;
343  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetRedirectCounter()

void XrdCl::XRootDMsgHandler::SetRedirectCounter ( uint16_t  redirectCounter)
inline

Set the redirect counter.

Definition at line 411 of file XrdClXRootDMsgHandler.hh.

412  {
413  pRedirectCounter = redirectCounter;
414  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetStateful()

void XrdCl::XRootDMsgHandler::SetStateful ( bool  stateful)
inline

Definition at line 421 of file XrdClXRootDMsgHandler.hh.

422  {
423  pStateful = stateful;
424  }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ WaitDone()

void XrdCl::XRootDMsgHandler::WaitDone ( time_t  now)

Called after the wait time for kXR_wait has elapsed

Parameters
nowcurrent timestamp

Definition at line 1150 of file XrdClXRootDMsgHandler.cc.

1151  {
1152  HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1153  }

References XrdCl::RedirectEntry::EntryWait.

◆ WriteMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::WriteMessageBody ( Socket socket,
uint32_t &  bytesWritten 
)
overridevirtual

Write message body directly to a socket - called if IsRaw returns true - only socket related errors may be returned here

Parameters
socketthe socket to read from
bytesWrittennumber of bytes written by the method
Returns
stOK & suDone if the whole body has been processed stOK & suRetry if more data needs to be written stError on failure

Reimplemented from XrdCl::MsgHandler.

Definition at line 1001 of file XrdClXRootDMsgHandler.cc.

1003  {
1004  //--------------------------------------------------------------------------
1005  // First check if it is a PgWrite
1006  //--------------------------------------------------------------------------
1007  if( !pChunkList->empty() && !pCrc32cDigests.empty() )
1008  {
1009  //------------------------------------------------------------------------
1010  // PgWrite will have just one chunk
1011  //------------------------------------------------------------------------
1012  ChunkInfo chunk = pChunkList->front();
1013  //------------------------------------------------------------------------
1014  // Calculate the size of the first and last page (in case the chunk is not
1015  // 4KB aligned)
1016  //------------------------------------------------------------------------
1017  int fLen = 0, lLen = 0;
1018  size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
1019 
1020  //------------------------------------------------------------------------
1021  // Set the crc32c buffer if not ready yet
1022  //------------------------------------------------------------------------
1023  if( pPgWrtCksumBuff.GetCursor() == 0 )
1024  {
1025  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1026  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1027  }
1028 
1029  uint32_t btsLeft = chunk.length - pAsyncOffset;
1030  uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
1031  if( pglen > btsLeft ) pglen = btsLeft;
1032  char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
1033 
1034  while( btsLeft > 0 )
1035  {
1036  // first write the crc32c digest
1037  while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
1038  {
1039  uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
1040  char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
1041  int btswrt = 0;
1042  Status st = socket->Send( dgstbuf, dgstlen, btswrt );
1043  if( !st.IsOK() ) return st;
1044  bytesWritten += btswrt;
1045  pPgWrtCksumBuff.AdvanceCursor( btswrt );
1046  if( st.code == suRetry ) return st;
1047  }
1048  // then write the raw data (one page)
1049  int btswrt = 0;
1050  Status st = socket->Send( pgbuf, pglen, btswrt );
1051  if( !st.IsOK() ) return st;
1052  pgbuf += btswrt;
1053  pglen -= btswrt;
1054  btsLeft -= btswrt;
1055  bytesWritten += btswrt;
1056  pAsyncOffset += btswrt; // update the offset to the raw data
1057  if( st.code == suRetry ) return st;
1058  // if we managed to write all the data ...
1059  if( pglen == 0 )
1060  {
1061  // move to the next page
1062  ++pPgWrtCurrentPageNb;
1063  if( pPgWrtCurrentPageNb < nbpgs )
1064  {
1065  // set the digest buffer
1066  pPgWrtCksumBuff.SetCursor( 0 );
1067  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1068  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1069  }
1070  // set the page length
1071  pglen = XrdSys::PageSize;
1072  if( pglen > btsLeft ) pglen = btsLeft;
1073  // reset offset in the current page
1074  pPgWrtCurrentPageOffset = 0;
1075  }
1076  else
1077  // otherwise just adjust the offset in the current page
1078  pPgWrtCurrentPageOffset += btswrt;
1079 
1080  }
1081  }
1082  else if( !pChunkList->empty() )
1083  {
1084  size_t size = pChunkList->size();
1085  for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1086  {
1087  char *buffer = (char*)(*pChunkList)[i].buffer;
1088  uint32_t size = (*pChunkList)[i].length;
1089  size_t leftToBeWritten = size - pAsyncOffset;
1090 
1091  while( leftToBeWritten )
1092  {
1093  int btswrt = 0;
1094  Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1095  bytesWritten += btswrt;
1096  if( !st.IsOK() || st.code == suRetry ) return st;
1097  pAsyncOffset += btswrt;
1098  leftToBeWritten -= btswrt;
1099  }
1100  //----------------------------------------------------------------------
1101  // Remember that we have moved to the next chunk, also clear the offset
1102  // within the buffer as we are going to move to a new one
1103  //----------------------------------------------------------------------
1104  ++pAsyncChunkIndex;
1105  pAsyncOffset = 0;
1106  }
1107  }
1108  else
1109  {
1110  Log *log = DefaultEnv::GetLog();
1111 
1112  //------------------------------------------------------------------------
1113  // If the socket is encrypted we cannot use a kernel buffer, we have to
1114  // convert to user space buffer
1115  //------------------------------------------------------------------------
1116  if( socket->IsEncrypted() )
1117  {
1118  log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1119  pUrl.GetHostId().c_str() );
1120 
1121  char *ubuff = 0;
1122  ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1123  if( ret < 0 ) return Status( stError, errInternal );
1124  pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1125  return WriteMessageBody( socket, bytesWritten );
1126  }
1127 
1128  //------------------------------------------------------------------------
1129  // Send the data
1130  //------------------------------------------------------------------------
1131  while( !pKBuff->Empty() )
1132  {
1133  int btswrt = 0;
1134  Status st = socket->Send( *pKBuff, btswrt );
1135  bytesWritten += btswrt;
1136  if( !st.IsOK() || st.code == suRetry ) return st;
1137  }
1138 
1139  log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1140  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
1141  }
1142 
1143  return Status();
1144  }
void AdvanceCursor(uint32_t delta)
Advance the cursor.
Definition: XrdClBuffer.hh:156
void SetCursor(uint32_t cursor)
Set the cursor.
Definition: XrdClBuffer.hh:148
uint32_t GetCursor() const
Get append cursor.
Definition: XrdClBuffer.hh:140
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
Definition: XrdClBuffer.hh:189
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)

References XrdCl::Buffer::AdvanceCursor(), XrdCl::ChunkInfo::buffer, XrdCl::Status::code, XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdSys::KernelBuffer::Empty(), XrdCl::errInternal, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Socket::IsEncrypted(), XrdCl::Status::IsOK(), XrdCl::ChunkInfo::length, XrdSys::Move(), XrdCl::ChunkInfo::offset, XrdSys::PageSize, XrdCl::Socket::Send(), XrdCl::Buffer::SetCursor(), XrdCl::stError, XrdCl::suRetry, and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

Friends And Related Function Documentation

◆ HandleRspJob

friend class HandleRspJob
friend

Definition at line 121 of file XrdClXRootDMsgHandler.hh.


The documentation for this class was generated from the following files: