XRootD
XrdCl::XRootDMsgHandler Class Reference

Handle/Process/Forward XRootD messages. More...

#include <XrdClXRootDMsgHandler.hh>

+ Inheritance diagram for XrdCl::XRootDMsgHandler:
+ Collaboration diagram for XrdCl::XRootDMsgHandler:

Public Member Functions

 XRootDMsgHandler (Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
 
 ~XRootDMsgHandler ()
 Destructor. More...
 
virtual uint16_t Examine (std::shared_ptr< Message > &msg)
 
time_t GetExpiration ()
 Get a timestamp after which we give up. More...
 
const Message * GetRequest () const
 Get the request pointer. More...
 
virtual uint16_t GetSid () const
 
virtual uint16_t InspectStatusRsp ()
 
virtual bool IsRaw () const
 Are we a raw writer or not? More...
 
virtual void OnStatusReady (const Message *message, XRootDStatus status)
 The requested action has been performed and the status is available. More...
 
virtual uint8_t OnStreamEvent (StreamEvent event, XRootDStatus status)
 
void PartialReceived ()
 
virtual void Process ()
 Process the message if it was "taken" by the examine action. More...
 
virtual XRootDStatus ReadMessageBody (Message *msg, Socket *socket, uint32_t &bytesRead)
 
void SetChunkList (ChunkList *chunkList)
 Set the chunk list. More...
 
void SetCrc32cDigests (std::vector< uint32_t > &&crc32cDigests)
 
void SetExpiration (time_t expiration)
 Set a timestamp after which we give up. More...
 
void SetFollowMetalink (bool followMetalink)
 
void SetHostList (HostList *hostList)
 Set host list. More...
 
void SetKernelBuffer (XrdSys::KernelBuffer *kbuff)
 Set the kernel buffer. More...
 
void SetLoadBalancer (const HostInfo &loadBalancer)
 Set the load balancer. More...
 
void SetOksofarAsAnswer (bool oksofarAsAnswer)
 
void SetRedirectAsAnswer (bool redirectAsAnswer)
 
void SetRedirectCounter (uint16_t redirectCounter)
 Set the redirect counter. More...
 
void SetStateful (bool stateful)
 
void WaitDone (time_t now)
 
XRootDStatus WriteMessageBody (Socket *socket, uint32_t &bytesWritten)
 
- Public Member Functions inherited from XrdCl::MsgHandler
virtual ~MsgHandler ()
 Event types that the message handler may receive. More...
 
virtual void OnReadyToSend (Message *msg)
 

Friends

class HandleRspJob
 

Additional Inherited Members

- Public Types inherited from XrdCl::MsgHandler
enum  Action {
  None = 0x0000 ,
  Nop = 0x0001 ,
  Ignore = 0x0002 ,
  RemoveHandler = 0x0004 ,
  Raw = 0x0008 ,
  NoProcess = 0x0010 ,
  Corrupted = 0x0020 ,
  More = 0x0040
}
 Actions to be taken after a message is processed by the handler. More...
 
enum  StreamEvent {
  Ready = 1 ,
  Broken = 2 ,
  Timeout = 3 ,
  FatalError = 4
}
 Events that may have occurred to the stream. More...
 

Detailed Description

Handle/Process/Forward XRootD messages.

Definition at line 119 of file XrdClXRootDMsgHandler.hh.

Constructor & Destructor Documentation

◆ XRootDMsgHandler()

XrdCl::XRootDMsgHandler::XRootDMsgHandler ( Message * msg,
ResponseHandler * respHandler,
const URL * url,
std::shared_ptr< SIDManager > sidMgr,
LocalFileHandler * lFileHandler 
)
inline

Constructor

Parameters
msg: message that has been sent out
respHandler: response handler to be called when the final response arrives
url: the url the message has been sent to
sidMgr: the sid manager used to allocate SID for the initial message

Definition at line 134 of file XrdClXRootDMsgHandler.hh.

138  :
139  pRequest( msg ),
140  pResponseHandler( respHandler ),
141  pUrl( *url ),
142  pEffectiveDataServerUrl( 0 ),
143  pSidMgr( sidMgr ),
144  pLFileHandler( lFileHandler ),
145  pExpiration( 0 ),
146  pRedirectAsAnswer( false ),
147  pOksofarAsAnswer( false ),
148  pHasLoadBalancer( false ),
149  pHasSessionId( false ),
150  pChunkList( 0 ),
151  pKBuff( 0 ),
152  pRedirectCounter( 0 ),
153  pNotAuthorizedCounter( 0 ),
154 
155  pAsyncOffset( 0 ),
156  pAsyncChunkIndex( 0 ),
157 
158  pPgWrtCksumBuff( 4 ),
159  pPgWrtCurrentPageOffset( 0 ),
160  pPgWrtCurrentPageNb( 0 ),
161 
162  pOtherRawStarted( false ),
163 
164  pFollowMetalink( false ),
165 
166  pStateful( false ),
167 
168  pAggregatedWaitTime( 0 ),
169 
170  pMsgInFly( false ),
171 
172  pTimeoutFence( false ),
173 
174  pDirListStarted( false ),
175  pDirListWithStat( false ),
176 
177  pCV( 0 ),
178 
179  pSslErrCnt( 0 )
180  {
181  pPostMaster = DefaultEnv::GetPostMaster();
182  if( msg->GetSessionId() )
183  pHasSessionId = true;
184 
185  Log *log = DefaultEnv::GetLog();
186  log->Debug( ExDbgMsg, "[%s] MsgHandler created: %p (message: %s ).",
187  pUrl.GetHostId().c_str(), this,
188  pRequest->GetObfuscatedDescription().c_str() );
189 
190  ClientRequestHdr *hdr = (ClientRequestHdr*)pRequest->GetBuffer();
191  if( ntohs( hdr->requestid ) == kXR_pgread )
192  {
193  ClientPgReadRequest *pgrdreq = (ClientPgReadRequest*)pRequest->GetBuffer();
194  pCrc32cDigests.reserve( XrdOucPgrwUtils::csNum( ntohll( pgrdreq->offset ),
195  ntohl( pgrdreq->rlen ) ) );
196  }
197 
198  if( ntohs( hdr->requestid ) == kXR_readv )
199  pBodyReader.reset( new AsyncVectorReader( *url, *pRequest ) );
200  else if( ntohs( hdr->requestid ) == kXR_read )
201  pBodyReader.reset( new AsyncRawReader( *url, *pRequest ) );
202  else
203  pBodyReader.reset( new AsyncDiscardReader( *url, *pRequest ) );
204  }
kXR_unt16 requestid
Definition: XProtocol.hh:157
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_pgread
Definition: XProtocol.hh:142
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint64_t ExDbgMsg
XrdSysError Log
Definition: XrdConfig.cc:112

References XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::Buffer::GetBuffer(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Message::GetSessionId(), kXR_pgread, kXR_read, kXR_readv, ClientPgReadRequest::offset, ClientRequestHdr::requestid, and ClientPgReadRequest::rlen.

+ Here is the call graph for this function:

◆ ~XRootDMsgHandler()

XrdCl::XRootDMsgHandler::~XRootDMsgHandler ( )
inline

Destructor.

Definition at line 209 of file XrdClXRootDMsgHandler.hh.

210  {
211  DumpRedirectTraceBack();
212 
213  if( !pHasSessionId )
214  delete pRequest;
215  delete pEffectiveDataServerUrl;
216 
217  pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
218  pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
219  pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
220  pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
221  pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
222  pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
223 
224  Log *log = DefaultEnv::GetLog();
225  log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: %p.",
226  pUrl.GetHostId().c_str(), this );
227  }
std::vector< ChunkInfo > ChunkList
List of chunks.

References XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::URL::GetHostId(), and XrdCl::DefaultEnv::GetLog().

+ Here is the call graph for this function:

Member Function Documentation

◆ Examine()

uint16_t XrdCl::XRootDMsgHandler::Examine ( std::shared_ptr< Message > &  msg)
virtual

Examine an incoming message, and decide on the action to be taken

Parameters
msg: the message, may be zero if receive failed
Returns
action type that needs to be taken with respect to the message and the handler

Implements XrdCl::MsgHandler.

Definition at line 109 of file XrdClXRootDMsgHandler.cc.

110  {
111  //--------------------------------------------------------------------------
112  // if the MsgHandler is already being used to process another request
113  // (kXR_oksofar) we need to wait
114  //--------------------------------------------------------------------------
115  if( pOksofarAsAnswer )
116  {
117  XrdSysCondVarHelper lck( pCV );
118  while( pResponse ) pCV.Wait();
119  }
120  else
121  {
122  if( pResponse )
123  {
124  Log *log = DefaultEnv::GetLog();
125  log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
126  "it already owns a response: %p (message: %s ).",
127  pUrl.GetHostId().c_str(), this,
128  pRequest->GetObfuscatedDescription().c_str() );
129  }
130  }
131 
132  if( msg->GetSize() < 8 )
133  return Ignore;
134 
135  ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
136  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
137  uint16_t status = 0;
138  uint32_t dlen = 0;
139 
140  //--------------------------------------------------------------------------
141  // We only care about async responses, but those are extracted now
142  // in the SocketHandler.
143  //--------------------------------------------------------------------------
144  if( rsp->hdr.status == kXR_attn )
145  {
146  return Ignore;
147  }
148  //--------------------------------------------------------------------------
149  // We got a sync message - check if it belongs to us
150  //--------------------------------------------------------------------------
151  else
152  {
153  if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
154  rsp->hdr.streamid[1] != req->header.streamid[1] )
155  return Ignore;
156 
157  status = rsp->hdr.status;
158  dlen = rsp->hdr.dlen;
159  }
160 
161  //--------------------------------------------------------------------------
162  // We take the ownership of the message and decide what we will do
163  // with the handler itself, the options are:
164  // 1) we want to either read in raw mode (the Raw flag) or have the message
165  // body reconstructed for us by the TransportHandler by the time
166  // Process() is called (default, no extra flag)
167  // 2) we either got a full response in which case we don't want to be
168  // notified about anything anymore (RemoveHandler) or we got a partial
169  // answer and we need to wait for more (default, no extra flag)
170  //--------------------------------------------------------------------------
171  pResponse = msg;
172  pBodyReader->SetDataLength( dlen );
173 
174  Log *log = DefaultEnv::GetLog();
175  switch( status )
176  {
177  //------------------------------------------------------------------------
178  // Handle the cached cases
179  //------------------------------------------------------------------------
180  case kXR_error:
181  case kXR_redirect:
182  case kXR_wait:
183  return RemoveHandler;
184 
185  case kXR_waitresp:
186  {
187  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
188  "message %s", pUrl.GetHostId().c_str(),
189  pRequest->GetObfuscatedDescription().c_str() );
190 
191  pResponse.reset();
192  return Ignore; // This must be handled synchronously!
193  }
194 
195  //------------------------------------------------------------------------
196  // Handle the potential raw cases
197  //------------------------------------------------------------------------
198  case kXR_ok:
199  {
200  //----------------------------------------------------------------------
201  // For kXR_read we read in raw mode
202  //----------------------------------------------------------------------
203  uint16_t reqId = ntohs( req->header.requestid );
204  if( reqId == kXR_read )
205  {
206  return Raw | RemoveHandler;
207  }
208 
209  //----------------------------------------------------------------------
210  // kXR_readv is the same as kXR_read
211  //----------------------------------------------------------------------
212  if( reqId == kXR_readv )
213  {
214  return Raw | RemoveHandler;
215  }
216 
217  //----------------------------------------------------------------------
218  // For everything else we just take what we got
219  //----------------------------------------------------------------------
220  return RemoveHandler;
221  }
222 
223  //------------------------------------------------------------------------
224  // kXR_oksofars are special, they are not full responses, so we reset
225  // the response pointer to 0 and add the message to the partial list
226  //------------------------------------------------------------------------
227  case kXR_oksofar:
228  {
229  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
230  "%s", pUrl.GetHostId().c_str(),
231  pRequest->GetObfuscatedDescription().c_str() );
232 
233  if( !pOksofarAsAnswer )
234  {
235  pPartialResps.emplace_back( std::move( pResponse ) );
236  }
237 
238  //----------------------------------------------------------------------
239  // For kXR_read we either read in raw mode if the message has not
240  // been fully reconstructed already, if it has, we adjust
241  // the buffer offset to prepare for the next one
242  //----------------------------------------------------------------------
243  uint16_t reqId = ntohs( req->header.requestid );
244  if( reqId == kXR_read )
245  {
246  pTimeoutFence.store( true, std::memory_order_relaxed );
247  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
248  }
249 
250  //----------------------------------------------------------------------
251  // kXR_readv is similar to read, except that the payload is different
252  //----------------------------------------------------------------------
253  if( reqId == kXR_readv )
254  {
255  pTimeoutFence.store( true, std::memory_order_relaxed );
256  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
257  }
258 
259  return ( pOksofarAsAnswer ? None : NoProcess );
260  }
261 
262  case kXR_status:
263  {
264  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
265  "%s", pUrl.GetHostId().c_str(),
266  pRequest->GetObfuscatedDescription().c_str() );
267 
268  uint16_t reqId = ntohs( req->header.requestid );
269  if( reqId == kXR_pgwrite )
270  {
271  //--------------------------------------------------------------------
272  // In case of pgwrite by definition this wont be a partial response
273  // so we can already remove the handler from the in-queue
274  //--------------------------------------------------------------------
275  return RemoveHandler;
276  }
277 
278  //----------------------------------------------------------------------
279  // Otherwise (pgread), first of all we need to read the body of the
280  // kXR_status response, we can handle the raw data (if any) only after
281  // we have the whole kXR_status body
282  //----------------------------------------------------------------------
283  pTimeoutFence.store( true, std::memory_order_relaxed );
284  return None;
285  }
286 
287  //------------------------------------------------------------------------
288  // Default
289  //------------------------------------------------------------------------
290  default:
291  return RemoveHandler;
292  }
293  return RemoveHandler;
294  }
kXR_char streamid[2]
Definition: XProtocol.hh:156
kXR_char streamid[2]
Definition: XProtocol.hh:914
@ kXR_waitresp
Definition: XProtocol.hh:906
@ kXR_redirect
Definition: XProtocol.hh:904
@ kXR_oksofar
Definition: XProtocol.hh:900
@ kXR_status
Definition: XProtocol.hh:907
@ kXR_ok
Definition: XProtocol.hh:899
@ kXR_attn
Definition: XProtocol.hh:901
@ kXR_wait
Definition: XProtocol.hh:905
@ kXR_error
Definition: XProtocol.hh:903
struct ClientRequestHdr header
Definition: XProtocol.hh:846
@ kXR_pgwrite
Definition: XProtocol.hh:138
ServerResponseHeader hdr
Definition: XProtocol.hh:1287
@ Ignore
Ignore the message.
const uint64_t XRootDMsg

References ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::ExDbgMsg, XrdCl::Buffer::GetBuffer(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), ServerResponse::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, kXR_attn, kXR_error, kXR_ok, kXR_oksofar, kXR_pgwrite, kXR_read, kXR_readv, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, XrdCl::MsgHandler::None, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseHeader::status, ClientRequestHdr::streamid, ServerResponseHeader::streamid, XrdSysCondVar::Wait(), XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ GetExpiration()

time_t XrdCl::XRootDMsgHandler::GetExpiration ( )
inline virtual

Get a timestamp after which we give up.

Implements XrdCl::MsgHandler.

Definition at line 330 of file XrdClXRootDMsgHandler.hh.

331  {
332  return pExpiration;
333  }

◆ GetRequest()

const Message* XrdCl::XRootDMsgHandler::GetRequest ( ) const
inline

Get the request pointer.

Definition at line 356 of file XrdClXRootDMsgHandler.hh.

357  {
358  return pRequest;
359  }

◆ GetSid()

uint16_t XrdCl::XRootDMsgHandler::GetSid ( ) const
virtual

Get handler sid

return sid of the corresponding request, otherwise 0

Implements XrdCl::MsgHandler.

Definition at line 389 of file XrdClXRootDMsgHandler.cc.

390  {
391  ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
392  return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
393  }

References XrdCl::Buffer::GetBuffer(), ClientRequest::header, and ClientRequestHdr::streamid.

+ Here is the call graph for this function:

◆ InspectStatusRsp()

uint16_t XrdCl::XRootDMsgHandler::InspectStatusRsp ( )
virtual

Reexamine the incoming message, and decide on the action to be taken

In case of kXR_status the message can be only fully examined after reading the whole body (without raw data).

Parameters
msg: the message, may be zero if receive failed
Returns
action type that needs to be taken with respect to the message and the handler

Implements XrdCl::MsgHandler.

Definition at line 299 of file XrdClXRootDMsgHandler.cc.

300  {
301  if( !pResponse )
302  return 0;
303 
304  Log *log = DefaultEnv::GetLog();
305  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
306 
307  //--------------------------------------------------------------------------
308  // Additional action is only required for kXR_status
309  //--------------------------------------------------------------------------
310  if( rsp->hdr.status != kXR_status ) return 0;
311 
312  //--------------------------------------------------------------------------
313  // Ignore malformed status response
314  //--------------------------------------------------------------------------
315  if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
316  {
317  log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
318  return Corrupted;
319  }
320 
321  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
322  uint16_t reqId = ntohs( req->header.requestid );
323  //--------------------------------------------------------------------------
324  // Unmarshal the status body
325  //--------------------------------------------------------------------------
326  XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
327 
328  if( !st.IsOK() && st.code == errDataError )
329  {
330  log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
331  st.GetErrorMessage().c_str() );
332  return Corrupted;
333  }
334 
335  if( !st.IsOK() )
336  {
337  log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
338  pUrl.GetHostId().c_str() );
339  pStatus = st;
340  HandleRspOrQueue();
341  return Ignore;
342  }
343 
344  //--------------------------------------------------------------------------
345  // Common handling for partial results
346  //--------------------------------------------------------------------------
347  ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
348  if( rspst->status.bdy.resptype == XrdProto::kXR_PartialResult )
349  {
350  pPartialResps.push_back( std::move( pResponse ) );
351  }
352 
353  //--------------------------------------------------------------------------
354  // Decide the actions that we need to take
355  //--------------------------------------------------------------------------
356  uint16_t action = 0;
357  if( reqId == kXR_pgread )
358  {
359  //----------------------------------------------------------------------
360  // The message contains only Status header and body but no raw data
361  //----------------------------------------------------------------------
362  if( !pPageReader )
363  pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
364  pPageReader->SetRsp( rspst );
365 
366  action |= Raw;
367 
368  if( rspst->status.bdy.resptype == XrdProto::kXR_PartialResult )
369  action |= NoProcess;
370  else
371  action |= RemoveHandler;
372  }
373  else if( reqId == kXR_pgwrite )
374  {
375  // if data corruption has been detected on the server side we will
376  // send some additional data pointing to the pages that need to be
377  // retransmitted
378  if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
379  pResponse->GetCursor() )
380  action |= More;
381  }
382 
383  return action;
384  }
ServerResponseStatus status
Definition: XProtocol.hh:1309
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1261
struct ServerResponseHeader hdr
Definition: XProtocol.hh:1260
@ More
there are more (non-raw) data to be read
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
@ kXR_PartialResult
Definition: XProtocol.hh:1250

References ServerResponseStatus::bdy, XrdCl::Status::code, XrdCl::MsgHandler::Corrupted, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), ServerResponseStatus::hdr, ServerResponse::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, XrdCl::Status::IsOK(), XrdProto::kXR_PartialResult, kXR_pgread, kXR_pgwrite, kXR_status, XrdCl::MsgHandler::More, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseBody_Status::resptype, ServerResponseHeader::status, ServerResponseV2::status, XrdCl::XRootDTransport::UnMarshalStatusBody(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ IsRaw()

bool XrdCl::XRootDMsgHandler::IsRaw ( ) const
virtual

Are we a raw writer or not?

Reimplemented from XrdCl::MsgHandler.

Definition at line 929 of file XrdClXRootDMsgHandler.cc.

930  {
931  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
932  uint16_t reqId = ntohs( req->header.requestid );
933  if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
934  return true;
935  // checkpoint + execute
936  if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
937  {
938  ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
939  reqId = ntohs( xeq->header.requestid );
940  return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
941  }
942 
943  return false;
944  }
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_chkpoint
Definition: XProtocol.hh:124
struct ClientChkPointRequest chkpoint
Definition: XProtocol.hh:849

References ClientRequest::chkpoint, XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_chkpoint, kXR_ckpXeq, kXR_pgwrite, kXR_truncate, kXR_write, kXR_writev, ClientChkPointRequest::opcode, and ClientRequestHdr::requestid.

+ Here is the call graph for this function:

◆ OnStatusReady()

void XrdCl::XRootDMsgHandler::OnStatusReady ( const Message message,
XRootDStatus  status 
)
virtual

The requested action has been performed and the status is available.

Implements XrdCl::MsgHandler.

Definition at line 896 of file XrdClXRootDMsgHandler.cc.

898  {
899  Log *log = DefaultEnv::GetLog();
900 
901  //--------------------------------------------------------------------------
902  // We were successful, so we now need to listen for a response
903  //--------------------------------------------------------------------------
904  if( status.IsOK() )
905  {
906  log->Dump( XRootDMsg, "[%s] Message %s has been successfully sent.",
907  pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
908 
909  log->Debug( ExDbgMsg, "[%s] Moving MsgHandler: %p (message: %s ) from out-queue to in-queue.",
910  pUrl.GetHostId().c_str(), this,
911  pRequest->GetObfuscatedDescription().c_str() );
912 
913  pMsgInFly = true;
914  return;
915  }
916 
917  //--------------------------------------------------------------------------
918  // We have failed, recover if possible
919  //--------------------------------------------------------------------------
920  log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
921  "recover.", pUrl.GetHostId().c_str(),
922  message->GetObfuscatedDescription().c_str() );
923  HandleError( status );
924  }

References XrdCl::Log::Debug(), XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::ExDbgMsg, XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Status::IsOK(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ OnStreamEvent()

uint8_t XrdCl::XRootDMsgHandler::OnStreamEvent ( StreamEvent  event,
XRootDStatus  status 
)
virtual

Handle an event other than a message arrival

Parameters
event: type of the event
status: status info

Reimplemented from XrdCl::MsgHandler.

Definition at line 859 of file XrdClXRootDMsgHandler.cc.

861  {
862  Log *log = DefaultEnv::GetLog();
863  log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
864  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
865 
866  if( event == Ready )
867  return 0;
868 
869  if( pTimeoutFence.load( std::memory_order_relaxed ) )
870  return 0;
871 
872  HandleError( status );
873  return RemoveHandler;
874  }
@ Ready
The stream has become connected.

References XrdCl::Log::Dump(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::MsgHandler::Ready, XrdCl::MsgHandler::RemoveHandler, and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ PartialReceived()

void XrdCl::XRootDMsgHandler::PartialReceived ( )

Bookkeeping after partial response has been received:

  • take down the timeout fence after oksofar response has been handled
  • reset status-response-body marshaled flag

Definition at line 1106 of file XrdClXRootDMsgHandler.cc.

1107  {
1108  pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1109  }

Referenced by XrdCl::Stream::ForceError(), XrdCl::Stream::OnError(), and XrdCl::Stream::OnIncoming().

+ Here is the caller graph for this function:

◆ Process()

void XrdCl::XRootDMsgHandler::Process ( )
virtual

Process the message if it was "taken" by the examine action.

Process the message if it was "taken" by the examine action

Parameters
msg: the message to be processed

Reimplemented from XrdCl::MsgHandler.

Definition at line 398 of file XrdClXRootDMsgHandler.cc.

399  {
400  Log *log = DefaultEnv::GetLog();
401 
402  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
403 
404  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
405 
406  //--------------------------------------------------------------------------
407  // If it is a local file, it can be only a metalink redirector
408  //--------------------------------------------------------------------------
409  if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
410  pHosts->back().protocol = kXR_PROTOCOLVERSION;
411 
412  //--------------------------------------------------------------------------
413  // We got an answer, check who we were talking to
414  //--------------------------------------------------------------------------
415  else
416  {
417  AnyObject qryResult;
418  int *qryResponse = 0;
419  pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
420  qryResult.Get( qryResponse );
421  pHosts->back().flags = *qryResponse; delete qryResponse; qryResponse = 0;
422  pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
423  qryResult.Get( qryResponse );
424  pHosts->back().protocol = *qryResponse; delete qryResponse;
425  }
426 
427  //--------------------------------------------------------------------------
428  // Process the message
429  //--------------------------------------------------------------------------
430  Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
431  if( !st.IsOK() )
432  {
433  pStatus = Status( stFatal, errInvalidMessage );
434  HandleResponse();
435  return;
436  }
437 
438  //--------------------------------------------------------------------------
439  // we have a response for the message so it's not in flight anymore
440  //--------------------------------------------------------------------------
441  pMsgInFly = false;
442 
443  //--------------------------------------------------------------------------
444  // Reset the aggregated wait (used to omit wait response in case of Metalink
445  // redirector)
446  //--------------------------------------------------------------------------
447  if( rsp->hdr.status != kXR_wait )
448  pAggregatedWaitTime = 0;
449 
450  switch( rsp->hdr.status )
451  {
452  //------------------------------------------------------------------------
453  // kXR_ok - we're done here
454  //------------------------------------------------------------------------
455  case kXR_ok:
456  {
457  log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
458  pUrl.GetHostId().c_str(),
459  pRequest->GetObfuscatedDescription().c_str() );
460  pStatus = Status();
461  HandleResponse();
462  return;
463  }
464 
465  case kXR_status:
466  {
467  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
468  pUrl.GetHostId().c_str(),
469  pRequest->GetObfuscatedDescription().c_str() );
470  pStatus = Status();
471  HandleResponse();
472  return;
473  }
474 
475  //------------------------------------------------------------------------
476  // kXR_oksofar - we're serving partial result to the user
477  //------------------------------------------------------------------------
478  case kXR_oksofar:
479  {
480  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
481  pUrl.GetHostId().c_str(),
482  pRequest->GetObfuscatedDescription().c_str() );
483  pStatus = Status( stOK, suContinue );
484  HandleResponse();
485  return;
486  }
487 
488  //------------------------------------------------------------------------
489  // kXR_error - we've got a problem
490  //------------------------------------------------------------------------
491  case kXR_error:
492  {
493  char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
494  memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
495  log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
496  "[%d] %s", pUrl.GetHostId().c_str(),
497  pRequest->GetObfuscatedDescription().c_str(), rsp->body.error.errnum,
498  errmsg );
499  delete [] errmsg;
500 
501  HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
502  return;
503  }
504 
505  //------------------------------------------------------------------------
506  // kXR_redirect - they tell us to go elsewhere
507  //------------------------------------------------------------------------
508  case kXR_redirect:
509  {
510  if( rsp->hdr.dlen <= 4 )
511  {
512  log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
513  pUrl.GetHostId().c_str() );
514  pStatus = Status( stError, errInvalidResponse );
515  HandleResponse();
516  return;
517  }
518 
519  char *urlInfoBuff = new char[rsp->hdr.dlen-3];
520  urlInfoBuff[rsp->hdr.dlen-4] = 0;
521  memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
522  std::string urlInfo = urlInfoBuff;
523  delete [] urlInfoBuff;
524  log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
525  "message %s: %s, port %d", pUrl.GetHostId().c_str(),
526  pRequest->GetObfuscatedDescription().c_str(), urlInfo.c_str(),
527  rsp->body.redirect.port );
528 
529  //----------------------------------------------------------------------
530  // Check if we can proceed
531  //----------------------------------------------------------------------
532  if( !pRedirectCounter )
533  {
534  log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
535  "message %s, the last known error is: %s",
536  pUrl.GetHostId().c_str(),
537  pRequest->GetObfuscatedDescription().c_str(),
538  pLastError.ToString().c_str() );
539 
540 
541  pStatus = Status( stFatal, errRedirectLimit );
542  HandleResponse();
543  return;
544  }
545  --pRedirectCounter;
546 
547  //----------------------------------------------------------------------
548  // Keep the info about this server if we still need to find a load
549  // balancer
550  //----------------------------------------------------------------------
551  uint32_t flags = pHosts->back().flags;
552  if( !pHasLoadBalancer )
553  {
554  if( flags & kXR_isManager )
555  {
556  //------------------------------------------------------------------
557  // If the current server is a meta manager then it supersedes
558  // any existing load balancer, otherwise we assign a load-balancer
559  // only if it has not been already assigned
560  //------------------------------------------------------------------
561  if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
562  {
563  pLoadBalancer = pHosts->back();
564  log->Dump( XRootDMsg, "[%s] Current server has been assigned "
565  "as a load-balancer for message %s",
566  pUrl.GetHostId().c_str(),
567  pRequest->GetObfuscatedDescription().c_str() );
568  HostList::iterator it;
569  for( it = pHosts->begin(); it != pHosts->end(); ++it )
570  it->loadBalancer = false;
571  pHosts->back().loadBalancer = true;
572  }
573  }
574  }
575 
576  //----------------------------------------------------------------------
577  // If the redirect comes from a data server save the URL because
578  // in case of a failure we will use it as the effective data server URL
579  // for the tried CGI opaque info
580  //----------------------------------------------------------------------
581  if( flags & kXR_isServer )
582  pEffectiveDataServerUrl = new URL( pHosts->back().url );
583 
584  //----------------------------------------------------------------------
585  // Build the URL and check its validity
586  //----------------------------------------------------------------------
587  std::vector<std::string> urlComponents;
588  std::string newCgi;
589  Utils::splitString( urlComponents, urlInfo, "?" );
590 
591  std::ostringstream o;
592 
593  o << urlComponents[0];
594  if( rsp->body.redirect.port > 0 )
595  o << ":" << rsp->body.redirect.port << "/";
596  else if( rsp->body.redirect.port < 0 )
597  {
598  //--------------------------------------------------------------------
599  // check if the manager wants to enforce write recovery at himself
600  // (beware we are dealing here with negative flags)
601  //--------------------------------------------------------------------
602  if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
603  pHosts->back().flags |= kXR_recoverWrts;
604 
605  //--------------------------------------------------------------------
606  // check if the manager wants to collapse the communication channel
607  // (the redirect host is to replace the current host)
608  //--------------------------------------------------------------------
609  if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
610  {
611  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
612  pPostMaster->CollapseRedirect( pUrl, url );
613  }
614 
615  if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
616  {
617  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
618  if( Utils::CheckEC( pRequest, url ) )
619  pRedirectAsAnswer = true;
620  }
621  }
622 
623  URL newUrl = URL( o.str() );
624  if( !newUrl.IsValid() )
625  {
626  pStatus = Status( stError, errInvalidRedirectURL );
627  log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
628  pUrl.GetHostId().c_str(), urlInfo.c_str() );
629  HandleResponse();
630  return;
631  }
632 
633  if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
634  newUrl.SetUserName( pUrl.GetUserName() );
635 
636  if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
637  newUrl.SetPassword( pUrl.GetPassword() );
638 
639  //----------------------------------------------------------------------
640  // Forward any "xrd.*" params from the original client request also to
641  // the new redirection url
642  // Also, we need to preserve any "xrdcl.*' as they are important for
643  // our internal workflows.
644  //----------------------------------------------------------------------
645  std::ostringstream ossXrd;
646  const URL::ParamsMap &urlParams = pUrl.GetParams();
647 
648  for(URL::ParamsMap::const_iterator it = urlParams.begin();
649  it != urlParams.end(); ++it )
650  {
651  if( it->first.compare( 0, 4, "xrd." ) &&
652  it->first.compare( 0, 6, "xrdcl." ) )
653  continue;
654 
655  ossXrd << it->first << '=' << it->second << '&';
656  }
657 
658  std::string xrdCgi = ossXrd.str();
659  pRedirectUrl = newUrl.GetURL();
660 
661  URL cgiURL;
662  if( urlComponents.size() > 1 )
663  {
664  pRedirectUrl += "?";
665  pRedirectUrl += urlComponents[1];
666  std::ostringstream o;
667  o << "fake://fake:111//fake?";
668  o << urlComponents[1];
669 
670  if( urlComponents.size() == 3 )
671  o << '?' << urlComponents[2];
672 
673  if (!xrdCgi.empty())
674  {
675  o << '&' << xrdCgi;
676  pRedirectUrl += '&';
677  pRedirectUrl += xrdCgi;
678  }
679 
680  cgiURL = URL( o.str() );
681  }
682  else {
683  if (!xrdCgi.empty())
684  {
685  std::ostringstream o;
686  o << "fake://fake:111//fake?";
687  o << xrdCgi;
688  cgiURL = URL( o.str() );
689  pRedirectUrl += '?';
690  pRedirectUrl += xrdCgi;
691  }
692  }
693 
694  //----------------------------------------------------------------------
695  // Check if we need to return the URL as a response
696  //----------------------------------------------------------------------
697  if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
698  newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
699  !newUrl.IsLocalFile() )
700  pRedirectAsAnswer = true;
701 
702  if( pRedirectAsAnswer )
703  {
704  pStatus = Status( stError, errRedirect );
705  HandleResponse();
706  return;
707  }
708 
709  //----------------------------------------------------------------------
710  // Rewrite the message in a way required to send it to another server
711  //----------------------------------------------------------------------
712  newUrl.SetParams( cgiURL.GetParams() );
713  Status st = RewriteRequestRedirect( newUrl );
714  if( !st.IsOK() )
715  {
716  pStatus = st;
717  HandleResponse();
718  return;
719  }
720 
721  //----------------------------------------------------------------------
722  // Make sure we don't change the protocol by accident (root vs roots)
723  //----------------------------------------------------------------------
724  if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
725  ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
726  newUrl.SetProtocol( "roots" );
727 
728  //----------------------------------------------------------------------
729  // Send the request to the new location
730  //----------------------------------------------------------------------
731  HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
732  return;
733  }
734 
735  //------------------------------------------------------------------------
736  // kXR_wait - we wait, and re-issue the request later
737  //------------------------------------------------------------------------
738  case kXR_wait:
739  {
740  uint32_t waitSeconds = 0;
741 
742  if( rsp->hdr.dlen >= 4 )
743  {
744  char *infoMsg = new char[rsp->hdr.dlen-3];
745  infoMsg[rsp->hdr.dlen-4] = 0;
746  memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
747  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
748  "message %s: %s", pUrl.GetHostId().c_str(),
749  rsp->body.wait.seconds, pRequest->GetObfuscatedDescription().c_str(),
750  infoMsg );
751  delete [] infoMsg;
752  waitSeconds = rsp->body.wait.seconds;
753  }
754  else
755  {
756  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
757  "message %s", pUrl.GetHostId().c_str(),
758  pRequest->GetObfuscatedDescription().c_str() );
759  }
760 
761  pAggregatedWaitTime += waitSeconds;
762 
763  // We need a special case if the data node comes from metalink
764  // redirector. In this case it might make more sense to try the
765  // next entry in the Metalink than wait.
766  if( OmitWait( *pRequest, pLoadBalancer.url ) )
767  {
768  int maxWait = DefaultMaxMetalinkWait;
769  DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
770  if( pAggregatedWaitTime > maxWait )
771  {
772  UpdateTriedCGI();
773  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
774  return;
775  }
776  }
777 
778  //----------------------------------------------------------------------
779  // Some messages require rewriting before they can be sent again
780  // after wait
781  //----------------------------------------------------------------------
782  Status st = RewriteRequestWait();
783  if( !st.IsOK() )
784  {
785  pStatus = st;
786  HandleResponse();
787  return;
788  }
789 
790  //----------------------------------------------------------------------
791  // Register a task to resend the message in some seconds, if we still
792  // have time to do that, and report a timeout otherwise
793  //----------------------------------------------------------------------
794  time_t resendTime = ::time(0)+waitSeconds;
795 
796  if( resendTime < pExpiration )
797  {
798  log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
799  pUrl.GetHostId().c_str(), this,
800  pRequest->GetObfuscatedDescription().c_str() );
801 
802  TaskManager *taskMgr = pPostMaster->GetTaskManager();
803  taskMgr->RegisterTask( new WaitTask( this ), resendTime );
804  }
805  else
806  {
807  log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
808  pUrl.GetHostId().c_str(),
809  pRequest->GetObfuscatedDescription().c_str() );
810  HandleError( Status( stError, errOperationExpired) );
811  }
812  return;
813  }
814 
815  //------------------------------------------------------------------------
816  // kXR_waitresp - the response will be returned in some seconds as an
817  // unsolicited message. Currently all messages of this type are handled
818  // one step before in the XrdClStream::OnIncoming as they need to be
819  // processed synchronously.
820  //------------------------------------------------------------------------
821  case kXR_waitresp:
822  {
823  if( rsp->hdr.dlen < 4 )
824  {
825  log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
826  pUrl.GetHostId().c_str() );
827  pStatus = Status( stError, errInvalidResponse );
828  HandleResponse();
829  return;
830  }
831 
832  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
833  "message %s", pUrl.GetHostId().c_str(),
834  rsp->body.waitresp.seconds,
835  pRequest->GetObfuscatedDescription().c_str() );
836  return;
837  }
838 
839  //------------------------------------------------------------------------
840  // Default - unrecognized/unsupported response, declare an error
841  //------------------------------------------------------------------------
842  default:
843  {
844  log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
845  "message %s", pUrl.GetHostId().c_str(),
846  rsp->hdr.status, pRequest->GetObfuscatedDescription().c_str() );
847  pStatus = Status( stError, errInvalidResponse );
848  HandleResponse();
849  return;
850  }
851  }
852 
853  return;
854  }
#define kXR_isManager
Definition: XProtocol.hh:1156
union ServerResponse::@0 body
#define kXR_collapseRedir
Definition: XProtocol.hh:1167
#define kXR_attrMeta
Definition: XProtocol.hh:1159
#define kXR_recoverWrts
Definition: XProtocol.hh:1166
#define kXR_isServer
Definition: XProtocol.hh:1157
#define kXR_PROTOCOLVERSION
Definition: XProtocol.hh:70
#define kXR_ecRedir
Definition: XProtocol.hh:1168
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
TaskManager * GetTaskManager()
Get the task manager object used by the post master.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
void RegisterTask(Task *task, time_t time, bool own=true)
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:458
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
const std::string & GetUserName() const
Get the username.
Definition: XrdClURL.hh:135
const std::string & GetPassword() const
Get the password.
Definition: XrdClURL.hh:153
bool IsLocalFile() const
Definition: XrdClURL.cc:467
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:445
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Definition: XrdClUtils.cc:703
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
const uint16_t errRedirectLimit
Definition: XrdClStatus.hh:102
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
Definition: XrdClStatus.hh:105
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t errInvalidResponse
Definition: XrdClStatus.hh:99
const uint16_t errInvalidRedirectURL
Definition: XrdClStatus.hh:98
const uint16_t suContinue
Definition: XrdClStatus.hh:39
const uint16_t errRedirect
Definition: XrdClStatus.hh:106
const uint16_t errInvalidMessage
Definition: XrdClStatus.hh:85
URL url
URL of the host.
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version

References ServerResponse::body, XrdCl::Utils::CheckEC(), XrdCl::PostMaster::CollapseRedirect(), XrdCl::Log::Debug(), XrdCl::DefaultMaxMetalinkWait, ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::RedirectEntry::EntryRedirect, XrdCl::RedirectEntry::EntryRedirectOnWait, XrdCl::errErrorResponse, XrdCl::errInvalidMessage, XrdCl::errInvalidRedirectURL, XrdCl::errInvalidResponse, XrdCl::errOperationExpired, XrdCl::Log::Error(), XrdCl::errRedirect, XrdCl::errRedirectLimit, XrdCl::ExDbgMsg, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetEnv(), XrdCl::URL::GetHostId(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::URL::GetParams(), XrdCl::URL::GetPassword(), XrdCl::URL::GetProtocol(), XrdCl::PostMaster::GetTaskManager(), XrdCl::URL::GetURL(), XrdCl::URL::GetUserName(), ServerResponse::hdr, ClientRequest::header, XrdCl::URL::IsLocalFile(), XrdCl::URL::IsMetalink(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), kXR_attrMeta, kXR_collapseRedir, kXR_ecRedir, kXR_error, kXR_isManager, kXR_isServer, kXR_ok, kXR_oksofar, kXR_PROTOCOLVERSION, kXR_recoverWrts, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::PostMaster::QueryTransport(), XrdCl::TaskManager::RegisterTask(), ClientRequestHdr::requestid, XrdCl::XRootDQuery::ServerFlags, XrdCl::URL::SetParams(), XrdCl::URL::SetPassword(), XrdCl::URL::SetProtocol(), XrdCl::URL::SetUserName(), XrdCl::Utils::splitString(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stFatal, XrdCl::stOK, XrdCl::suContinue, XrdCl::Status::ToString(), XrdCl::XRootDTransport::UnMarshallBody(), XrdCl::HostInfo::url, XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ ReadMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::ReadMessageBody ( Message * msg,
Socket * socket,
uint32_t &  bytesRead 
)
virtual

Read message body directly from a socket - called if Examine returns Raw flag - only socket related errors may be returned here

Parameters
msg: the corresponding message header
socket: the socket to read from
bytesRead: number of bytes read by the method
Returns
stOK & suDone if the whole body has been processed; stOK & suRetry if more data is needed; stError on failure

Reimplemented from XrdCl::MsgHandler.

Definition at line 879 of file XrdClXRootDMsgHandler.cc.

882  {
883  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
884  uint16_t reqId = ntohs( req->header.requestid );
885 
886  if( reqId == kXR_pgread )
887  return pPageReader->Read( *socket, bytesRead );
888 
889  return pBodyReader->Read( *socket, bytesRead );
890  }

References XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_pgread, and ClientRequestHdr::requestid.

+ Here is the call graph for this function:

◆ SetChunkList()

void XrdCl::XRootDMsgHandler::SetChunkList ( ChunkList * chunkList)
inline

Set the chunk list.

Definition at line 383 of file XrdClXRootDMsgHandler.hh.

384  {
385  pChunkList = chunkList;
386  if( pBodyReader )
387  pBodyReader->SetChunkList( chunkList );
388  if( chunkList )
389  pChunkStatus.resize( chunkList->size() );
390  else
391  pChunkStatus.clear();
392  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetCrc32cDigests()

void XrdCl::XRootDMsgHandler::SetCrc32cDigests ( std::vector< uint32_t > &&  crc32cDigests)
inline

Definition at line 394 of file XrdClXRootDMsgHandler.hh.

395  {
396  pCrc32cDigests = std::move( crc32cDigests );
397  }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetExpiration()

void XrdCl::XRootDMsgHandler::SetExpiration ( time_t  expiration)
inline

Set a timestamp after which we give up.

Definition at line 322 of file XrdClXRootDMsgHandler.hh.

323  {
324  pExpiration = expiration;
325  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetFollowMetalink()

void XrdCl::XRootDMsgHandler::SetFollowMetalink ( bool  followMetalink)
inline

Definition at line 415 of file XrdClXRootDMsgHandler.hh.

416  {
417  pFollowMetalink = followMetalink;
418  }

Referenced by XrdCl::MessageUtils::RedirectMessage().

+ Here is the caller graph for this function:

◆ SetHostList()

void XrdCl::XRootDMsgHandler::SetHostList ( HostList * hostList)
inline

Set host list.

Definition at line 375 of file XrdClXRootDMsgHandler.hh.

376  {
377  pHosts.reset( hostList );
378  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetKernelBuffer()

void XrdCl::XRootDMsgHandler::SetKernelBuffer ( XrdSys::KernelBuffer * kbuff)
inline

Set the kernel buffer.

Definition at line 402 of file XrdClXRootDMsgHandler.hh.

403  {
404  pKBuff = kbuff;
405  }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetLoadBalancer()

void XrdCl::XRootDMsgHandler::SetLoadBalancer ( const HostInfo & loadBalancer)
inline

Set the load balancer.

Definition at line 364 of file XrdClXRootDMsgHandler.hh.

365  {
366  if( !loadBalancer.url.IsValid() )
367  return;
368  pLoadBalancer = loadBalancer;
369  pHasLoadBalancer = true;
370  }

References XrdCl::URL::IsValid(), and XrdCl::HostInfo::url.

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetOksofarAsAnswer()

void XrdCl::XRootDMsgHandler::SetOksofarAsAnswer ( bool  oksofarAsAnswer)
inline

Treat the kXR_oksofar response as a valid answer to the message and notify the handler with the URL as a response

Definition at line 348 of file XrdClXRootDMsgHandler.hh.

349  {
350  pOksofarAsAnswer = oksofarAsAnswer;
351  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetRedirectAsAnswer()

void XrdCl::XRootDMsgHandler::SetRedirectAsAnswer ( bool  redirectAsAnswer)
inline

Treat the kXR_redirect response as a valid answer to the message and notify the handler with the URL as a response

Definition at line 339 of file XrdClXRootDMsgHandler.hh.

340  {
341  pRedirectAsAnswer = redirectAsAnswer;
342  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetRedirectCounter()

void XrdCl::XRootDMsgHandler::SetRedirectCounter ( uint16_t  redirectCounter)
inline

Set the redirect counter.

Definition at line 410 of file XrdClXRootDMsgHandler.hh.

411  {
412  pRedirectCounter = redirectCounter;
413  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetStateful()

void XrdCl::XRootDMsgHandler::SetStateful ( bool  stateful)
inline

Definition at line 420 of file XrdClXRootDMsgHandler.hh.

421  {
422  pStateful = stateful;
423  }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ WaitDone()

void XrdCl::XRootDMsgHandler::WaitDone ( time_t  now)

Called after the wait time for kXR_wait has elapsed

Parameters
now: current timestamp

Definition at line 1098 of file XrdClXRootDMsgHandler.cc.

1099  {
1100  HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1101  }

References XrdCl::RedirectEntry::EntryWait.

◆ WriteMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::WriteMessageBody ( Socket * socket,
uint32_t &  bytesWritten 
)
virtual

Write message body directly to a socket - called if IsRaw returns true - only socket related errors may be returned here

Parameters
socket: the socket to write to
bytesWritten: number of bytes written by the method
Returns
stOK & suDone if the whole body has been processed; stOK & suRetry if more data needs to be written; stError on failure

Reimplemented from XrdCl::MsgHandler.

Definition at line 949 of file XrdClXRootDMsgHandler.cc.

951  {
952  //--------------------------------------------------------------------------
953  // First check if it is a PgWrite
954  //--------------------------------------------------------------------------
955  if( !pChunkList->empty() && !pCrc32cDigests.empty() )
956  {
957  //------------------------------------------------------------------------
958  // PgWrite will have just one chunk
959  //------------------------------------------------------------------------
960  ChunkInfo chunk = pChunkList->front();
961  //------------------------------------------------------------------------
962  // Calculate the size of the first and last page (in case the chunk is not
963  // 4KB aligned)
964  //------------------------------------------------------------------------
965  int fLen = 0, lLen = 0;
966  size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
967 
968  //------------------------------------------------------------------------
969  // Set the crc32c buffer if not ready yet
970  //------------------------------------------------------------------------
971  if( pPgWrtCksumBuff.GetCursor() == 0 )
972  {
973  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
974  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
975  }
976 
977  uint32_t btsLeft = chunk.length - pAsyncOffset;
978  uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
979  if( pglen > btsLeft ) pglen = btsLeft;
980  char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
981 
982  while( btsLeft > 0 )
983  {
984  // first write the crc32c digest
985  while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
986  {
987  uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
988  char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
989  int btswrt = 0;
990  Status st = socket->Send( dgstbuf, dgstlen, btswrt );
991  if( !st.IsOK() ) return st;
992  bytesWritten += btswrt;
993  pPgWrtCksumBuff.AdvanceCursor( btswrt );
994  if( st.code == suRetry ) return st;
995  }
996  // then write the raw data (one page)
997  int btswrt = 0;
998  Status st = socket->Send( pgbuf, pglen, btswrt );
999  if( !st.IsOK() ) return st;
1000  pgbuf += btswrt;
1001  pglen -= btswrt;
1002  btsLeft -= btswrt;
1003  bytesWritten += btswrt;
1004  pAsyncOffset += btswrt; // update the offset to the raw data
1005  if( st.code == suRetry ) return st;
1006  // if we managed to write all the data ...
1007  if( pglen == 0 )
1008  {
1009  // move to the next page
1010  ++pPgWrtCurrentPageNb;
1011  if( pPgWrtCurrentPageNb < nbpgs )
1012  {
1013  // set the digest buffer
1014  pPgWrtCksumBuff.SetCursor( 0 );
1015  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1016  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1017  }
1018  // set the page length
1019  pglen = XrdSys::PageSize;
1020  if( pglen > btsLeft ) pglen = btsLeft;
1021  // reset offset in the current page
1022  pPgWrtCurrentPageOffset = 0;
1023  }
1024  else
1025  // otherwise just adjust the offset in the current page
1026  pPgWrtCurrentPageOffset += btswrt;
1027 
1028  }
1029  }
1030  else if( !pChunkList->empty() )
1031  {
1032  size_t size = pChunkList->size();
1033  for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1034  {
1035  char *buffer = (char*)(*pChunkList)[i].buffer;
1036  uint32_t size = (*pChunkList)[i].length;
1037  size_t leftToBeWritten = size - pAsyncOffset;
1038 
1039  while( leftToBeWritten )
1040  {
1041  int btswrt = 0;
1042  Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1043  bytesWritten += btswrt;
1044  if( !st.IsOK() || st.code == suRetry ) return st;
1045  pAsyncOffset += btswrt;
1046  leftToBeWritten -= btswrt;
1047  }
1048  //----------------------------------------------------------------------
1049  // Remember that we have moved to the next chunk, also clear the offset
1050  // within the buffer as we are going to move to a new one
1051  //----------------------------------------------------------------------
1052  ++pAsyncChunkIndex;
1053  pAsyncOffset = 0;
1054  }
1055  }
1056  else
1057  {
1058  Log *log = DefaultEnv::GetLog();
1059 
1060  //------------------------------------------------------------------------
1061  // If the socket is encrypted we cannot use a kernel buffer, we have to
1062  // convert to user space buffer
1063  //------------------------------------------------------------------------
1064  if( socket->IsEncrypted() )
1065  {
1066  log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1067  pUrl.GetHostId().c_str() );
1068 
1069  char *ubuff = 0;
1070  ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1071  if( ret < 0 ) return Status( stError, errInternal );
1072  pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1073  return WriteMessageBody( socket, bytesWritten );
1074  }
1075 
1076  //------------------------------------------------------------------------
1077  // Send the data
1078  //------------------------------------------------------------------------
1079  while( !pKBuff->Empty() )
1080  {
1081  int btswrt = 0;
1082  Status st = socket->Send( *pKBuff, btswrt );
1083  bytesWritten += btswrt;
1084  if( !st.IsOK() || st.code == suRetry ) return st;
1085  }
1086 
1087  log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1088  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
1089  }
1090 
1091  return Status();
1092  }
void AdvanceCursor(uint32_t delta)
Advance the cursor.
Definition: XrdClBuffer.hh:156
void SetCursor(uint32_t cursor)
Set the cursor.
Definition: XrdClBuffer.hh:148
uint32_t GetCursor() const
Get append cursor.
Definition: XrdClBuffer.hh:140
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
Definition: XrdClBuffer.hh:189
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)

References XrdCl::Buffer::AdvanceCursor(), XrdCl::ChunkInfo::buffer, XrdCl::Status::code, XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdSys::KernelBuffer::Empty(), XrdCl::errInternal, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Socket::IsEncrypted(), XrdCl::Status::IsOK(), XrdCl::ChunkInfo::length, XrdSys::Move(), XrdCl::ChunkInfo::offset, XrdSys::PageSize, XrdCl::Socket::Send(), XrdCl::Buffer::SetCursor(), XrdCl::stError, XrdCl::suRetry, and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

Friends And Related Function Documentation

◆ HandleRspJob

friend class HandleRspJob
friend

Definition at line 121 of file XrdClXRootDMsgHandler.hh.


The documentation for this class was generated from the following files: