XRootD
XrdClXRootDMsgHandler.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClLog.hh"
27 #include "XrdCl/XrdClDefaultEnv.hh"
28 #include "XrdCl/XrdClConstants.hh"
30 #include "XrdCl/XrdClMessage.hh"
31 #include "XrdCl/XrdClURL.hh"
32 #include "XrdCl/XrdClUtils.hh"
34 #include "XrdCl/XrdClJobManager.hh"
35 #include "XrdCl/XrdClSIDManager.hh"
39 #include "XrdCl/XrdClSocket.hh"
40 #include "XrdCl/XrdClTls.hh"
41 #include "XrdCl/XrdClOptimizers.hh"
42 
43 #include "XrdOuc/XrdOucCRC.hh"
45 
46 #include "XrdSys/XrdSysPlatform.hh" // same as above
47 #include "XrdSys/XrdSysAtomics.hh"
48 #include "XrdSys/XrdSysPthread.hh"
49 #include <memory>
50 #include <sstream>
51 #include <numeric>
52 
53 namespace
54 {
55  //----------------------------------------------------------------------------
56  // We need an extra task what will run the handler in the future, because
57  // tasks get deleted and we need the handler
58  //----------------------------------------------------------------------------
59  class WaitTask: public XrdCl::Task
60  {
61  public:
62  WaitTask( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
63  {
64  std::ostringstream o;
65  o << "WaitTask for: 0x" << handler->GetRequest();
66  SetName( o.str() );
67  }
68 
69  virtual time_t Run( time_t now )
70  {
71  pHandler->WaitDone( now );
72  return 0;
73  }
74  private:
75  XrdCl::XRootDMsgHandler *pHandler;
76  };
77 }
78 
79 namespace XrdCl
80 {
81  //----------------------------------------------------------------------------
82  // Delegate the response handling to the thread-pool
83  //----------------------------------------------------------------------------
84  class HandleRspJob: public XrdCl::Job
85  {
86  public:
87  HandleRspJob( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
88  {
89 
90  }
91 
92  virtual ~HandleRspJob()
93  {
94 
95  }
96 
97  virtual void Run( void *arg )
98  {
99  pHandler->HandleResponse();
100  delete this;
101  }
102  private:
103  XrdCl::XRootDMsgHandler *pHandler;
104  };
105 
106  //----------------------------------------------------------------------------
107  // Examine an incoming message, and decide on the action to be taken
108  //----------------------------------------------------------------------------
109  uint16_t XRootDMsgHandler::Examine( std::shared_ptr<Message> &msg )
110  {
111  //--------------------------------------------------------------------------
112  // if the MsgHandler is already being used to process another request
113  // (kXR_oksofar) we need to wait
114  //--------------------------------------------------------------------------
115  if( pOksofarAsAnswer )
116  {
117  XrdSysCondVarHelper lck( pCV );
118  while( pResponse ) pCV.Wait();
119  }
120  else
121  {
122  if( pResponse )
123  {
124  Log *log = DefaultEnv::GetLog();
125  log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
126  "it already owns a response: %p (message: %s ).",
127  pUrl.GetHostId().c_str(), this,
128  pRequest->GetObfuscatedDescription().c_str() );
129  }
130  }
131 
132  if( msg->GetSize() < 8 )
133  return Ignore;
134 
135  ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
136  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
137  uint16_t status = 0;
138  uint32_t dlen = 0;
139 
140  //--------------------------------------------------------------------------
141  // We only care about async responses, but those are extracted now
142  // in the SocketHandler.
143  //--------------------------------------------------------------------------
144  if( rsp->hdr.status == kXR_attn )
145  {
146  return Ignore;
147  }
148  //--------------------------------------------------------------------------
149  // We got a sync message - check if it belongs to us
150  //--------------------------------------------------------------------------
151  else
152  {
153  if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
154  rsp->hdr.streamid[1] != req->header.streamid[1] )
155  return Ignore;
156 
157  status = rsp->hdr.status;
158  dlen = rsp->hdr.dlen;
159  }
160 
161  //--------------------------------------------------------------------------
162  // We take the ownership of the message and decide what we will do
163  // with the handler itself, the options are:
164  // 1) we want to either read in raw mode (the Raw flag) or have the message
165  // body reconstructed for us by the TransportHandler by the time
166  // Process() is called (default, no extra flag)
167  // 2) we either got a full response in which case we don't want to be
168  // notified about anything anymore (RemoveHandler) or we got a partial
169  // answer and we need to wait for more (default, no extra flag)
170  //--------------------------------------------------------------------------
171  pResponse = msg;
172  pBodyReader->SetDataLength( dlen );
173 
174  Log *log = DefaultEnv::GetLog();
175  switch( status )
176  {
177  //------------------------------------------------------------------------
178  // Handle the cached cases
179  //------------------------------------------------------------------------
180  case kXR_error:
181  case kXR_redirect:
182  case kXR_wait:
183  return RemoveHandler;
184 
185  case kXR_waitresp:
186  {
187  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
188  "message %s", pUrl.GetHostId().c_str(),
189  pRequest->GetObfuscatedDescription().c_str() );
190 
191  pResponse.reset();
192  return Ignore; // This must be handled synchronously!
193  }
194 
195  //------------------------------------------------------------------------
196  // Handle the potential raw cases
197  //------------------------------------------------------------------------
198  case kXR_ok:
199  {
200  //----------------------------------------------------------------------
201  // For kXR_read we read in raw mode
202  //----------------------------------------------------------------------
203  uint16_t reqId = ntohs( req->header.requestid );
204  if( reqId == kXR_read )
205  {
206  return Raw | RemoveHandler;
207  }
208 
209  //----------------------------------------------------------------------
210  // kXR_readv is the same as kXR_read
211  //----------------------------------------------------------------------
212  if( reqId == kXR_readv )
213  {
214  return Raw | RemoveHandler;
215  }
216 
217  //----------------------------------------------------------------------
218  // For everything else we just take what we got
219  //----------------------------------------------------------------------
220  return RemoveHandler;
221  }
222 
223  //------------------------------------------------------------------------
224  // kXR_oksofars are special, they are not full responses, so we reset
225  // the response pointer to 0 and add the message to the partial list
226  //------------------------------------------------------------------------
227  case kXR_oksofar:
228  {
229  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
230  "%s", pUrl.GetHostId().c_str(),
231  pRequest->GetObfuscatedDescription().c_str() );
232 
233  if( !pOksofarAsAnswer )
234  {
235  pPartialResps.emplace_back( std::move( pResponse ) );
236  }
237 
238  //----------------------------------------------------------------------
239  // For kXR_read we either read in raw mode if the message has not
240  // been fully reconstructed already, if it has, we adjust
241  // the buffer offset to prepare for the next one
242  //----------------------------------------------------------------------
243  uint16_t reqId = ntohs( req->header.requestid );
244  if( reqId == kXR_read )
245  {
246  pTimeoutFence.store( true, std::memory_order_relaxed );
247  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
248  }
249 
250  //----------------------------------------------------------------------
251  // kXR_readv is similar to read, except that the payload is different
252  //----------------------------------------------------------------------
253  if( reqId == kXR_readv )
254  {
255  pTimeoutFence.store( true, std::memory_order_relaxed );
256  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
257  }
258 
259  return ( pOksofarAsAnswer ? None : NoProcess );
260  }
261 
262  case kXR_status:
263  {
264  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
265  "%s", pUrl.GetHostId().c_str(),
266  pRequest->GetObfuscatedDescription().c_str() );
267 
268  uint16_t reqId = ntohs( req->header.requestid );
269  if( reqId == kXR_pgwrite )
270  {
271  //--------------------------------------------------------------------
272  // In case of pgwrite by definition this wont be a partial response
273  // so we can already remove the handler from the in-queue
274  //--------------------------------------------------------------------
275  return RemoveHandler;
276  }
277 
278  //----------------------------------------------------------------------
279  // Otherwise (pgread), first of all we need to read the body of the
280  // kXR_status response, we can handle the raw data (if any) only after
281  // we have the whole kXR_status body
282  //----------------------------------------------------------------------
283  pTimeoutFence.store( true, std::memory_order_relaxed );
284  return None;
285  }
286 
287  //------------------------------------------------------------------------
288  // Default
289  //------------------------------------------------------------------------
290  default:
291  return RemoveHandler;
292  }
293  return RemoveHandler;
294  }
295 
296  //----------------------------------------------------------------------------
297  // Reexamine the incoming message, and decide on the action to be taken
298  //----------------------------------------------------------------------------
300  {
301  if( !pResponse )
302  return 0;
303 
304  Log *log = DefaultEnv::GetLog();
305  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
306 
307  //--------------------------------------------------------------------------
308  // Additional action is only required for kXR_status
309  //--------------------------------------------------------------------------
310  if( rsp->hdr.status != kXR_status ) return 0;
311 
312  //--------------------------------------------------------------------------
313  // Ignore malformed status response
314  //--------------------------------------------------------------------------
315  if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
316  {
317  log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
318  return Corrupted;
319  }
320 
321  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
322  uint16_t reqId = ntohs( req->header.requestid );
323  //--------------------------------------------------------------------------
324  // Unmarshal the status body
325  //--------------------------------------------------------------------------
326  XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
327 
328  if( !st.IsOK() && st.code == errDataError )
329  {
330  log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
331  st.GetErrorMessage().c_str() );
332  return Corrupted;
333  }
334 
335  if( !st.IsOK() )
336  {
337  log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
338  pUrl.GetHostId().c_str() );
339  pStatus = st;
340  HandleRspOrQueue();
341  return Ignore;
342  }
343 
344  //--------------------------------------------------------------------------
345  // Common handling for partial results
346  //--------------------------------------------------------------------------
347  ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
349  {
350  pPartialResps.push_back( std::move( pResponse ) );
351  }
352 
353  //--------------------------------------------------------------------------
354  // Decide the actions that we need to take
355  //--------------------------------------------------------------------------
356  uint16_t action = 0;
357  if( reqId == kXR_pgread )
358  {
359  //----------------------------------------------------------------------
360  // The message contains only Status header and body but no raw data
361  //----------------------------------------------------------------------
362  if( !pPageReader )
363  pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
364  pPageReader->SetRsp( rspst );
365 
366  action |= Raw;
367 
369  action |= NoProcess;
370  else
371  action |= RemoveHandler;
372  }
373  else if( reqId == kXR_pgwrite )
374  {
375  // if data corruption has been detected on the server side we will
376  // send some additional data pointing to the pages that need to be
377  // retransmitted
378  if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
379  pResponse->GetCursor() )
380  action |= More;
381  }
382 
383  return action;
384  }
385 
386  //----------------------------------------------------------------------------
387  // Get handler sid
388  //----------------------------------------------------------------------------
389  uint16_t XRootDMsgHandler::GetSid() const
390  {
391  ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
392  return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
393  }
394 
395  //----------------------------------------------------------------------------
397  //----------------------------------------------------------------------------
399  {
400  Log *log = DefaultEnv::GetLog();
401 
402  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
403 
404  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
405 
406  //--------------------------------------------------------------------------
407  // If it is a local file, it can be only a metalink redirector
408  //--------------------------------------------------------------------------
409  if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
410  pHosts->back().protocol = kXR_PROTOCOLVERSION;
411 
412  //--------------------------------------------------------------------------
413  // We got an answer, check who we were talking to
414  //--------------------------------------------------------------------------
415  else
416  {
417  AnyObject qryResult;
418  int *qryResponse = 0;
419  pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
420  qryResult.Get( qryResponse );
421  pHosts->back().flags = *qryResponse; delete qryResponse; qryResponse = 0;
422  pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
423  qryResult.Get( qryResponse );
424  pHosts->back().protocol = *qryResponse; delete qryResponse;
425  }
426 
427  //--------------------------------------------------------------------------
428  // Process the message
429  //--------------------------------------------------------------------------
430  Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
431  if( !st.IsOK() )
432  {
433  pStatus = Status( stFatal, errInvalidMessage );
434  HandleResponse();
435  return;
436  }
437 
438  //--------------------------------------------------------------------------
439  // we have an response for the message so it's not in fly anymore
440  //--------------------------------------------------------------------------
441  pMsgInFly = false;
442 
443  //--------------------------------------------------------------------------
444  // Reset the aggregated wait (used to omit wait response in case of Metalink
445  // redirector)
446  //--------------------------------------------------------------------------
447  if( rsp->hdr.status != kXR_wait )
448  pAggregatedWaitTime = 0;
449 
450  switch( rsp->hdr.status )
451  {
452  //------------------------------------------------------------------------
453  // kXR_ok - we're done here
454  //------------------------------------------------------------------------
455  case kXR_ok:
456  {
457  log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
458  pUrl.GetHostId().c_str(),
459  pRequest->GetObfuscatedDescription().c_str() );
460  pStatus = Status();
461  HandleResponse();
462  return;
463  }
464 
465  case kXR_status:
466  {
467  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
468  pUrl.GetHostId().c_str(),
469  pRequest->GetObfuscatedDescription().c_str() );
470  pStatus = Status();
471  HandleResponse();
472  return;
473  }
474 
475  //------------------------------------------------------------------------
476  // kXR_ok - we're serving partial result to the user
477  //------------------------------------------------------------------------
478  case kXR_oksofar:
479  {
480  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
481  pUrl.GetHostId().c_str(),
482  pRequest->GetObfuscatedDescription().c_str() );
483  pStatus = Status( stOK, suContinue );
484  HandleResponse();
485  return;
486  }
487 
488  //------------------------------------------------------------------------
489  // kXR_error - we've got a problem
490  //------------------------------------------------------------------------
491  case kXR_error:
492  {
493  char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
494  memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
495  log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
496  "[%d] %s", pUrl.GetHostId().c_str(),
497  pRequest->GetObfuscatedDescription().c_str(), rsp->body.error.errnum,
498  errmsg );
499  delete [] errmsg;
500 
501  HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
502  return;
503  }
504 
505  //------------------------------------------------------------------------
506  // kXR_redirect - they tell us to go elsewhere
507  //------------------------------------------------------------------------
508  case kXR_redirect:
509  {
510  if( rsp->hdr.dlen <= 4 )
511  {
512  log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
513  pUrl.GetHostId().c_str() );
514  pStatus = Status( stError, errInvalidResponse );
515  HandleResponse();
516  return;
517  }
518 
519  char *urlInfoBuff = new char[rsp->hdr.dlen-3];
520  urlInfoBuff[rsp->hdr.dlen-4] = 0;
521  memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
522  std::string urlInfo = urlInfoBuff;
523  delete [] urlInfoBuff;
524  log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
525  "message %s: %s, port %d", pUrl.GetHostId().c_str(),
526  pRequest->GetObfuscatedDescription().c_str(), urlInfo.c_str(),
527  rsp->body.redirect.port );
528 
529  //----------------------------------------------------------------------
530  // Check if we can proceed
531  //----------------------------------------------------------------------
532  if( !pRedirectCounter )
533  {
534  log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
535  "message %s, the last known error is: %s",
536  pUrl.GetHostId().c_str(),
537  pRequest->GetObfuscatedDescription().c_str(),
538  pLastError.ToString().c_str() );
539 
540 
541  pStatus = Status( stFatal, errRedirectLimit );
542  HandleResponse();
543  return;
544  }
545  --pRedirectCounter;
546 
547  //----------------------------------------------------------------------
548  // Keep the info about this server if we still need to find a load
549  // balancer
550  //----------------------------------------------------------------------
551  uint32_t flags = pHosts->back().flags;
552  if( !pHasLoadBalancer )
553  {
554  if( flags & kXR_isManager )
555  {
556  //------------------------------------------------------------------
557  // If the current server is a meta manager then it supersedes
558  // any existing load balancer, otherwise we assign a load-balancer
559  // only if it has not been already assigned
560  //------------------------------------------------------------------
561  if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
562  {
563  pLoadBalancer = pHosts->back();
564  log->Dump( XRootDMsg, "[%s] Current server has been assigned "
565  "as a load-balancer for message %s",
566  pUrl.GetHostId().c_str(),
567  pRequest->GetObfuscatedDescription().c_str() );
568  HostList::iterator it;
569  for( it = pHosts->begin(); it != pHosts->end(); ++it )
570  it->loadBalancer = false;
571  pHosts->back().loadBalancer = true;
572  }
573  }
574  }
575 
576  //----------------------------------------------------------------------
577  // If the redirect comes from a data server safe the URL because
578  // in case of a failure we will use it as the effective data server URL
579  // for the tried CGI opaque info
580  //----------------------------------------------------------------------
581  if( flags & kXR_isServer )
582  pEffectiveDataServerUrl = new URL( pHosts->back().url );
583 
584  //----------------------------------------------------------------------
585  // Build the URL and check it's validity
586  //----------------------------------------------------------------------
587  std::vector<std::string> urlComponents;
588  std::string newCgi;
589  Utils::splitString( urlComponents, urlInfo, "?" );
590 
591  std::ostringstream o;
592 
593  o << urlComponents[0];
594  if( rsp->body.redirect.port > 0 )
595  o << ":" << rsp->body.redirect.port << "/";
596  else if( rsp->body.redirect.port < 0 )
597  {
598  //--------------------------------------------------------------------
599  // check if the manager wants to enforce write recovery at himself
600  // (beware we are dealing here with negative flags)
601  //--------------------------------------------------------------------
602  if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
603  pHosts->back().flags |= kXR_recoverWrts;
604 
605  //--------------------------------------------------------------------
606  // check if the manager wants to collapse the communication channel
607  // (the redirect host is to replace the current host)
608  //--------------------------------------------------------------------
609  if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
610  {
611  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
612  pPostMaster->CollapseRedirect( pUrl, url );
613  }
614 
615  if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
616  {
617  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
618  if( Utils::CheckEC( pRequest, url ) )
619  pRedirectAsAnswer = true;
620  }
621  }
622 
623  URL newUrl = URL( o.str() );
624  if( !newUrl.IsValid() )
625  {
626  pStatus = Status( stError, errInvalidRedirectURL );
627  log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
628  pUrl.GetHostId().c_str(), urlInfo.c_str() );
629  HandleResponse();
630  return;
631  }
632 
633  if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
634  newUrl.SetUserName( pUrl.GetUserName() );
635 
636  if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
637  newUrl.SetPassword( pUrl.GetPassword() );
638 
639  //----------------------------------------------------------------------
640  // Forward any "xrd.*" params from the original client request also to
641  // the new redirection url
642  // Also, we need to preserve any "xrdcl.*' as they are important for
643  // our internal workflows.
644  //----------------------------------------------------------------------
645  std::ostringstream ossXrd;
646  const URL::ParamsMap &urlParams = pUrl.GetParams();
647 
648  for(URL::ParamsMap::const_iterator it = urlParams.begin();
649  it != urlParams.end(); ++it )
650  {
651  if( it->first.compare( 0, 4, "xrd." ) &&
652  it->first.compare( 0, 6, "xrdcl." ) )
653  continue;
654 
655  ossXrd << it->first << '=' << it->second << '&';
656  }
657 
658  std::string xrdCgi = ossXrd.str();
659  pRedirectUrl = newUrl.GetURL();
660 
661  URL cgiURL;
662  if( urlComponents.size() > 1 )
663  {
664  pRedirectUrl += "?";
665  pRedirectUrl += urlComponents[1];
666  std::ostringstream o;
667  o << "fake://fake:111//fake?";
668  o << urlComponents[1];
669 
670  if( urlComponents.size() == 3 )
671  o << '?' << urlComponents[2];
672 
673  if (!xrdCgi.empty())
674  {
675  o << '&' << xrdCgi;
676  pRedirectUrl += '&';
677  pRedirectUrl += xrdCgi;
678  }
679 
680  cgiURL = URL( o.str() );
681  }
682  else {
683  if (!xrdCgi.empty())
684  {
685  std::ostringstream o;
686  o << "fake://fake:111//fake?";
687  o << xrdCgi;
688  cgiURL = URL( o.str() );
689  pRedirectUrl += '?';
690  pRedirectUrl += xrdCgi;
691  }
692  }
693 
694  //----------------------------------------------------------------------
695  // Check if we need to return the URL as a response
696  //----------------------------------------------------------------------
697  if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
698  newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
699  !newUrl.IsLocalFile() )
700  pRedirectAsAnswer = true;
701 
702  if( pRedirectAsAnswer )
703  {
704  pStatus = Status( stError, errRedirect );
705  HandleResponse();
706  return;
707  }
708 
709  //----------------------------------------------------------------------
710  // Rewrite the message in a way required to send it to another server
711  //----------------------------------------------------------------------
712  newUrl.SetParams( cgiURL.GetParams() );
713  Status st = RewriteRequestRedirect( newUrl );
714  if( !st.IsOK() )
715  {
716  pStatus = st;
717  HandleResponse();
718  return;
719  }
720 
721  //----------------------------------------------------------------------
722  // Make sure we don't change the protocol by accident (root vs roots)
723  //----------------------------------------------------------------------
724  if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
725  ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
726  newUrl.SetProtocol( "roots" );
727 
728  //----------------------------------------------------------------------
729  // Send the request to the new location
730  //----------------------------------------------------------------------
731  HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
732  return;
733  }
734 
735  //------------------------------------------------------------------------
736  // kXR_wait - we wait, and re-issue the request later
737  //------------------------------------------------------------------------
738  case kXR_wait:
739  {
740  uint32_t waitSeconds = 0;
741 
742  if( rsp->hdr.dlen >= 4 )
743  {
744  char *infoMsg = new char[rsp->hdr.dlen-3];
745  infoMsg[rsp->hdr.dlen-4] = 0;
746  memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
747  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
748  "message %s: %s", pUrl.GetHostId().c_str(),
749  rsp->body.wait.seconds, pRequest->GetObfuscatedDescription().c_str(),
750  infoMsg );
751  delete [] infoMsg;
752  waitSeconds = rsp->body.wait.seconds;
753  }
754  else
755  {
756  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
757  "message %s", pUrl.GetHostId().c_str(),
758  pRequest->GetObfuscatedDescription().c_str() );
759  }
760 
761  pAggregatedWaitTime += waitSeconds;
762 
763  // We need a special case if the data node comes from metalink
764  // redirector. In this case it might make more sense to try the
765  // next entry in the Metalink than wait.
766  if( OmitWait( *pRequest, pLoadBalancer.url ) )
767  {
768  int maxWait = DefaultMaxMetalinkWait;
769  DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
770  if( pAggregatedWaitTime > maxWait )
771  {
772  UpdateTriedCGI();
773  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
774  return;
775  }
776  }
777 
778  //----------------------------------------------------------------------
779  // Some messages require rewriting before they can be sent again
780  // after wait
781  //----------------------------------------------------------------------
782  Status st = RewriteRequestWait();
783  if( !st.IsOK() )
784  {
785  pStatus = st;
786  HandleResponse();
787  return;
788  }
789 
790  //----------------------------------------------------------------------
791  // Register a task to resend the message in some seconds, if we still
792  // have time to do that, and report a timeout otherwise
793  //----------------------------------------------------------------------
794  time_t resendTime = ::time(0)+waitSeconds;
795 
796  if( resendTime < pExpiration )
797  {
798  log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
799  pUrl.GetHostId().c_str(), this,
800  pRequest->GetObfuscatedDescription().c_str() );
801 
802  TaskManager *taskMgr = pPostMaster->GetTaskManager();
803  taskMgr->RegisterTask( new WaitTask( this ), resendTime );
804  }
805  else
806  {
807  log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
808  pUrl.GetHostId().c_str(),
809  pRequest->GetObfuscatedDescription().c_str() );
810  HandleError( Status( stError, errOperationExpired) );
811  }
812  return;
813  }
814 
815  //------------------------------------------------------------------------
816  // kXR_waitresp - the response will be returned in some seconds as an
817  // unsolicited message. Currently all messages of this type are handled
818  // one step before in the XrdClStream::OnIncoming as they need to be
819  // processed synchronously.
820  //------------------------------------------------------------------------
821  case kXR_waitresp:
822  {
823  if( rsp->hdr.dlen < 4 )
824  {
825  log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
826  pUrl.GetHostId().c_str() );
827  pStatus = Status( stError, errInvalidResponse );
828  HandleResponse();
829  return;
830  }
831 
832  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
833  "message %s", pUrl.GetHostId().c_str(),
834  rsp->body.waitresp.seconds,
835  pRequest->GetObfuscatedDescription().c_str() );
836  return;
837  }
838 
839  //------------------------------------------------------------------------
840  // Default - unrecognized/unsupported response, declare an error
841  //------------------------------------------------------------------------
842  default:
843  {
844  log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
845  "message %s", pUrl.GetHostId().c_str(),
846  rsp->hdr.status, pRequest->GetObfuscatedDescription().c_str() );
847  pStatus = Status( stError, errInvalidResponse );
848  HandleResponse();
849  return;
850  }
851  }
852 
853  return;
854  }
855 
856  //----------------------------------------------------------------------------
857  // Handle an event other that a message arrival - may be timeout
858  //----------------------------------------------------------------------------
860  XRootDStatus status )
861  {
862  Log *log = DefaultEnv::GetLog();
863  log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
864  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
865 
866  if( event == Ready )
867  return 0;
868 
869  if( pTimeoutFence.load( std::memory_order_relaxed ) )
870  return 0;
871 
872  HandleError( status );
873  return RemoveHandler;
874  }
875 
876  //----------------------------------------------------------------------------
877  // Read message body directly from a socket
878  //----------------------------------------------------------------------------
880  Socket *socket,
881  uint32_t &bytesRead )
882  {
883  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
884  uint16_t reqId = ntohs( req->header.requestid );
885 
886  if( reqId == kXR_pgread )
887  return pPageReader->Read( *socket, bytesRead );
888 
889  return pBodyReader->Read( *socket, bytesRead );
890  }
891 
892  //----------------------------------------------------------------------------
893  // We're here when we requested sending something over the wire
894  // and there has been a status update on this action
895  //----------------------------------------------------------------------------
897  XRootDStatus status )
898  {
899  Log *log = DefaultEnv::GetLog();
900 
901  //--------------------------------------------------------------------------
902  // We were successful, so we now need to listen for a response
903  //--------------------------------------------------------------------------
904  if( status.IsOK() )
905  {
906  log->Dump( XRootDMsg, "[%s] Message %s has been successfully sent.",
907  pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
908 
909  log->Debug( ExDbgMsg, "[%s] Moving MsgHandler: %p (message: %s ) from out-queue to in-queue.",
910  pUrl.GetHostId().c_str(), this,
911  pRequest->GetObfuscatedDescription().c_str() );
912 
913  pMsgInFly = true;
914  return;
915  }
916 
917  //--------------------------------------------------------------------------
918  // We have failed, recover if possible
919  //--------------------------------------------------------------------------
920  log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
921  "recover.", pUrl.GetHostId().c_str(),
922  message->GetObfuscatedDescription().c_str() );
923  HandleError( status );
924  }
925 
926  //----------------------------------------------------------------------------
927  // Are we a raw writer or not?
928  //----------------------------------------------------------------------------
930  {
931  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
932  uint16_t reqId = ntohs( req->header.requestid );
933  if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
934  return true;
935  // checkpoint + execute
936  if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
937  {
938  ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
939  reqId = ntohs( xeq->header.requestid );
940  return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
941  }
942 
943  return false;
944  }
945 
946  //----------------------------------------------------------------------------
947  // Write the message body
948  //----------------------------------------------------------------------------
950  uint32_t &bytesWritten )
951  {
952  //--------------------------------------------------------------------------
953  // First check if it is a PgWrite
954  //--------------------------------------------------------------------------
955  if( !pChunkList->empty() && !pCrc32cDigests.empty() )
956  {
957  //------------------------------------------------------------------------
958  // PgWrite will have just one chunk
959  //------------------------------------------------------------------------
960  ChunkInfo chunk = pChunkList->front();
961  //------------------------------------------------------------------------
962  // Calculate the size of the first and last page (in case the chunk is not
963  // 4KB aligned)
964  //------------------------------------------------------------------------
965  int fLen = 0, lLen = 0;
966  size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
967 
968  //------------------------------------------------------------------------
969  // Set the crc32c buffer if not ready yet
970  //------------------------------------------------------------------------
971  if( pPgWrtCksumBuff.GetCursor() == 0 )
972  {
973  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
974  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
975  }
976 
977  uint32_t btsLeft = chunk.length - pAsyncOffset;
978  uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
979  if( pglen > btsLeft ) pglen = btsLeft;
980  char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
981 
982  while( btsLeft > 0 )
983  {
984  // first write the crc32c digest
985  while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
986  {
987  uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
988  char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
989  int btswrt = 0;
990  Status st = socket->Send( dgstbuf, dgstlen, btswrt );
991  if( !st.IsOK() ) return st;
992  bytesWritten += btswrt;
993  pPgWrtCksumBuff.AdvanceCursor( btswrt );
994  if( st.code == suRetry ) return st;
995  }
996  // then write the raw data (one page)
997  int btswrt = 0;
998  Status st = socket->Send( pgbuf, pglen, btswrt );
999  if( !st.IsOK() ) return st;
1000  pgbuf += btswrt;
1001  pglen -= btswrt;
1002  btsLeft -= btswrt;
1003  bytesWritten += btswrt;
1004  pAsyncOffset += btswrt; // update the offset to the raw data
1005  if( st.code == suRetry ) return st;
1006  // if we managed to write all the data ...
1007  if( pglen == 0 )
1008  {
1009  // move to the next page
1010  ++pPgWrtCurrentPageNb;
1011  if( pPgWrtCurrentPageNb < nbpgs )
1012  {
1013  // set the digest buffer
1014  pPgWrtCksumBuff.SetCursor( 0 );
1015  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1016  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1017  }
1018  // set the page length
1019  pglen = XrdSys::PageSize;
1020  if( pglen > btsLeft ) pglen = btsLeft;
1021  // reset offset in the current page
1022  pPgWrtCurrentPageOffset = 0;
1023  }
1024  else
1025  // otherwise just adjust the offset in the current page
1026  pPgWrtCurrentPageOffset += btswrt;
1027 
1028  }
1029  }
1030  else if( !pChunkList->empty() )
1031  {
1032  size_t size = pChunkList->size();
1033  for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1034  {
1035  char *buffer = (char*)(*pChunkList)[i].buffer;
1036  uint32_t size = (*pChunkList)[i].length;
1037  size_t leftToBeWritten = size - pAsyncOffset;
1038 
1039  while( leftToBeWritten )
1040  {
1041  int btswrt = 0;
1042  Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1043  bytesWritten += btswrt;
1044  if( !st.IsOK() || st.code == suRetry ) return st;
1045  pAsyncOffset += btswrt;
1046  leftToBeWritten -= btswrt;
1047  }
1048  //----------------------------------------------------------------------
1049  // Remember that we have moved to the next chunk, also clear the offset
1050  // within the buffer as we are going to move to a new one
1051  //----------------------------------------------------------------------
1052  ++pAsyncChunkIndex;
1053  pAsyncOffset = 0;
1054  }
1055  }
1056  else
1057  {
1058  Log *log = DefaultEnv::GetLog();
1059 
1060  //------------------------------------------------------------------------
1061  // If the socket is encrypted we cannot use a kernel buffer, we have to
1062  // convert to user space buffer
1063  //------------------------------------------------------------------------
1064  if( socket->IsEncrypted() )
1065  {
1066  log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1067  pUrl.GetHostId().c_str() );
1068 
1069  char *ubuff = 0;
1070  ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1071  if( ret < 0 ) return Status( stError, errInternal );
1072  pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1073  return WriteMessageBody( socket, bytesWritten );
1074  }
1075 
1076  //------------------------------------------------------------------------
1077  // Send the data
1078  //------------------------------------------------------------------------
1079  while( !pKBuff->Empty() )
1080  {
1081  int btswrt = 0;
1082  Status st = socket->Send( *pKBuff, btswrt );
1083  bytesWritten += btswrt;
1084  if( !st.IsOK() || st.code == suRetry ) return st;
1085  }
1086 
1087  log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1088  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
1089  }
1090 
1091  return Status();
1092  }
1093 
1094  //----------------------------------------------------------------------------
1095  // We're here when we got a time event. We needed to re-issue the request
1096  // in some time in the future, and that moment has arrived
1097  //----------------------------------------------------------------------------
1099  {
1100  HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1101  }
1102 
1103  //----------------------------------------------------------------------------
1104  // Bookkeeping after partial response has been received.
1105  //----------------------------------------------------------------------------
1107  {
1108  pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1109  }
1110 
1111  //----------------------------------------------------------------------------
1112  // Unpack the message and call the response handler
1113  //----------------------------------------------------------------------------
1114  void XRootDMsgHandler::HandleResponse()
1115  {
1116  //--------------------------------------------------------------------------
1117  // Process the response and notify the listener
1118  //--------------------------------------------------------------------------
1120  XRootDStatus *status = ProcessStatus();
1121  AnyObject *response = 0;
1122 
1123  Log *log = DefaultEnv::GetLog();
1124  log->Debug( ExDbgMsg, "[%s] Calling MsgHandler: %p (message: %s ) "
1125  "with status: %s.",
1126  pUrl.GetHostId().c_str(), this,
1127  pRequest->GetObfuscatedDescription().c_str(),
1128  status->ToString().c_str() );
1129 
1130  if( status->IsOK() )
1131  {
1132  Status st = ParseResponse( response );
1133  if( !st.IsOK() )
1134  {
1135  delete status;
1136  delete response;
1137  status = new XRootDStatus( st );
1138  response = 0;
1139  }
1140  }
1141 
1142  //--------------------------------------------------------------------------
1143  // Close the redirect entry if necessary
1144  //--------------------------------------------------------------------------
1145  if( pRdirEntry )
1146  {
1147  pRdirEntry->status = *status;
1148  pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
1149  }
1150 
1151  //--------------------------------------------------------------------------
1152  // Is it a final response?
1153  //--------------------------------------------------------------------------
1154  bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
1155 
1156  //--------------------------------------------------------------------------
1157  // Release the stream id
1158  //--------------------------------------------------------------------------
1159  if( pSidMgr && finalrsp )
1160  {
1161  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1162  if( status->IsOK() || !pMsgInFly ||
1163  !( status->code == errOperationExpired || status->code == errOperationInterrupted ) )
1164  pSidMgr->ReleaseSID( req->header.streamid );
1165  }
1166 
1167  HostList *hosts = pHosts.release();
1168  if( !finalrsp )
1169  pHosts.reset( new HostList( *hosts ) );
1170 
1171  pResponseHandler->HandleResponseWithHosts( status, response, hosts );
1172 
1173  //--------------------------------------------------------------------------
1174  // if it is the final response there is nothing more to do ...
1175  //--------------------------------------------------------------------------
1176  if( finalrsp )
1177  delete this;
1178  //--------------------------------------------------------------------------
1179  // on the other hand if it is not the final response, we have to keep the
1180  // MsgHandler and delete the current response
1181  //--------------------------------------------------------------------------
1182  else
1183  {
1184  XrdSysCondVarHelper lck( pCV );
1185  pResponse.reset();
1186  pTimeoutFence.store( false, std::memory_order_relaxed );
1187  pCV.Broadcast();
1188  }
1189  }
1190 
1191 
1192  //----------------------------------------------------------------------------
1193  // Extract the status information from the stuff that we got
1194  //----------------------------------------------------------------------------
1195  XRootDStatus *XRootDMsgHandler::ProcessStatus()
1196  {
1197  XRootDStatus *st = new XRootDStatus( pStatus );
1198  ServerResponse *rsp = 0;
1199  if( pResponse )
1200  rsp = (ServerResponse *)pResponse->GetBuffer();
1201 
1202  if( !pStatus.IsOK() && rsp )
1203  {
1204  if( pStatus.code == errErrorResponse )
1205  {
1206  st->errNo = rsp->body.error.errnum;
1207  // omit the last character as the string returned from the server
1208  // (acording to protocol specs) should be null-terminated
1209  std::string errmsg( rsp->body.error.errmsg, rsp->hdr.dlen-5 );
1210  if( st->errNo == kXR_noReplicas && !pLastError.IsOK() )
1211  errmsg += " Last seen error: " + pLastError.ToString();
1212  st->SetErrorMessage( errmsg );
1213  }
1214  else if( pStatus.code == errRedirect )
1215  st->SetErrorMessage( pRedirectUrl );
1216  }
1217  return st;
1218  }
1219 
1220  //------------------------------------------------------------------------
1221  // Parse the response and put it in an object that could be passed to
1222  // the user
1223  //------------------------------------------------------------------------
1224  Status XRootDMsgHandler::ParseResponse( AnyObject *&response )
1225  {
1226  if( !pResponse )
1227  return Status();
1228 
1229  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
1230  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1231  Log *log = DefaultEnv::GetLog();
1232 
1233  //--------------------------------------------------------------------------
1234  // Handle redirect as an answer
1235  //--------------------------------------------------------------------------
1236  if( rsp->hdr.status == kXR_redirect )
1237  {
1238  log->Error( XRootDMsg, "Internal Error: unable to process redirect" );
1239  return 0;
1240  }
1241 
1242  Buffer buff;
1243  uint32_t length = 0;
1244  char *buffer = 0;
1245 
1246  //--------------------------------------------------------------------------
1247  // We don't have any partial answers so pass what we have
1248  //--------------------------------------------------------------------------
1249  if( pPartialResps.empty() )
1250  {
1251  buffer = rsp->body.buffer.data;
1252  length = rsp->hdr.dlen;
1253  }
1254  //--------------------------------------------------------------------------
1255  // Partial answers, we need to glue them together before parsing
1256  //--------------------------------------------------------------------------
1257  else if( req->header.requestid != kXR_read &&
1258  req->header.requestid != kXR_readv )
1259  {
1260  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1261  {
1262  ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1263  length += part->hdr.dlen;
1264  }
1265  length += rsp->hdr.dlen;
1266 
1267  buff.Allocate( length );
1268  uint32_t offset = 0;
1269  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1270  {
1271  ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1272  buff.Append( part->body.buffer.data, part->hdr.dlen, offset );
1273  offset += part->hdr.dlen;
1274  }
1275  buff.Append( rsp->body.buffer.data, rsp->hdr.dlen, offset );
1276  buffer = buff.GetBuffer();
1277  }
1278 
1279  //--------------------------------------------------------------------------
1280  // Right, but what was the question?
1281  //--------------------------------------------------------------------------
1282  switch( req->header.requestid )
1283  {
1284  //------------------------------------------------------------------------
1285  // kXR_mv, kXR_truncate, kXR_rm, kXR_mkdir, kXR_rmdir, kXR_chmod,
1286  // kXR_ping, kXR_close, kXR_write, kXR_sync
1287  //------------------------------------------------------------------------
1288  case kXR_mv:
1289  case kXR_truncate:
1290  case kXR_rm:
1291  case kXR_mkdir:
1292  case kXR_rmdir:
1293  case kXR_chmod:
1294  case kXR_ping:
1295  case kXR_close:
1296  case kXR_write:
1297  case kXR_writev:
1298  case kXR_sync:
1299  case kXR_chkpoint:
1300  return Status();
1301 
1302  //------------------------------------------------------------------------
1303  // kXR_locate
1304  //------------------------------------------------------------------------
1305  case kXR_locate:
1306  {
1307  AnyObject *obj = new AnyObject();
1308 
1309  char *nullBuffer = new char[length+1];
1310  nullBuffer[length] = 0;
1311  memcpy( nullBuffer, buffer, length );
1312 
1313  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1314  "LocateInfo: %s", pUrl.GetHostId().c_str(),
1315  pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1316  LocationInfo *data = new LocationInfo();
1317 
1318  if( data->ParseServerResponse( nullBuffer ) == false )
1319  {
1320  delete obj;
1321  delete data;
1322  delete [] nullBuffer;
1323  return Status( stError, errInvalidResponse );
1324  }
1325  delete [] nullBuffer;
1326 
1327  obj->Set( data );
1328  response = obj;
1329  return Status();
1330  }
1331 
1332  //------------------------------------------------------------------------
1333  // kXR_stat
1334  //------------------------------------------------------------------------
1335  case kXR_stat:
1336  {
1337  AnyObject *obj = new AnyObject();
1338 
1339  //----------------------------------------------------------------------
1340  // Virtual File System stat (kXR_vfs)
1341  //----------------------------------------------------------------------
1342  if( req->stat.options & kXR_vfs )
1343  {
1344  StatInfoVFS *data = new StatInfoVFS();
1345 
1346  char *nullBuffer = new char[length+1];
1347  nullBuffer[length] = 0;
1348  memcpy( nullBuffer, buffer, length );
1349 
1350  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1351  "StatInfoVFS: %s", pUrl.GetHostId().c_str(),
1352  pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1353 
1354  if( data->ParseServerResponse( nullBuffer ) == false )
1355  {
1356  delete obj;
1357  delete data;
1358  delete [] nullBuffer;
1359  return Status( stError, errInvalidResponse );
1360  }
1361  delete [] nullBuffer;
1362 
1363  obj->Set( data );
1364  }
1365  //----------------------------------------------------------------------
1366  // Normal stat
1367  //----------------------------------------------------------------------
1368  else
1369  {
1370  StatInfo *data = new StatInfo();
1371 
1372  char *nullBuffer = new char[length+1];
1373  nullBuffer[length] = 0;
1374  memcpy( nullBuffer, buffer, length );
1375 
1376  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as StatInfo: "
1377  "%s", pUrl.GetHostId().c_str(),
1378  pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1379 
1380  if( data->ParseServerResponse( nullBuffer ) == false )
1381  {
1382  delete obj;
1383  delete data;
1384  delete [] nullBuffer;
1385  return Status( stError, errInvalidResponse );
1386  }
1387  delete [] nullBuffer;
1388  obj->Set( data );
1389  }
1390 
1391  response = obj;
1392  return Status();
1393  }
1394 
1395  //------------------------------------------------------------------------
1396  // kXR_protocol
1397  //------------------------------------------------------------------------
1398  case kXR_protocol:
1399  {
1400  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ProtocolInfo",
1401  pUrl.GetHostId().c_str(),
1402  pRequest->GetObfuscatedDescription().c_str() );
1403 
1404  if( rsp->hdr.dlen < 8 )
1405  {
1406  log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
1407  pUrl.GetHostId().c_str() );
1408  return Status( stError, errInvalidResponse );
1409  }
1410 
1411  AnyObject *obj = new AnyObject();
1412  ProtocolInfo *data = new ProtocolInfo( rsp->body.protocol.pval,
1413  rsp->body.protocol.flags );
1414  obj->Set( data );
1415  response = obj;
1416  return Status();
1417  }
1418 
1419  //------------------------------------------------------------------------
1420  // kXR_dirlist
1421  //------------------------------------------------------------------------
1422  case kXR_dirlist:
1423  {
1424  AnyObject *obj = new AnyObject();
1425  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1426  "DirectoryList", pUrl.GetHostId().c_str(),
1427  pRequest->GetObfuscatedDescription().c_str() );
1428 
1429  char *path = new char[req->dirlist.dlen+1];
1430  path[req->dirlist.dlen] = 0;
1431  memcpy( path, pRequest->GetBuffer(24), req->dirlist.dlen );
1432 
1433  DirectoryList *data = new DirectoryList();
1434  data->SetParentName( path );
1435  delete [] path;
1436 
1437  char *nullBuffer = new char[length+1];
1438  nullBuffer[length] = 0;
1439  memcpy( nullBuffer, buffer, length );
1440 
1441  bool invalidrsp = false;
1442 
1443  if( !pDirListStarted )
1444  {
1445  pDirListWithStat = DirectoryList::HasStatInfo( nullBuffer );
1446  pDirListStarted = true;
1447 
1448  invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer );
1449  }
1450  else
1451  invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer, pDirListWithStat );
1452 
1453  if( invalidrsp )
1454  {
1455  delete data;
1456  delete obj;
1457  delete [] nullBuffer;
1458  return Status( stError, errInvalidResponse );
1459  }
1460 
1461  delete [] nullBuffer;
1462  obj->Set( data );
1463  response = obj;
1464  return Status();
1465  }
1466 
1467  //------------------------------------------------------------------------
1468  // kXR_open - if we got the statistics, otherwise return 0
1469  //------------------------------------------------------------------------
1470  case kXR_open:
1471  {
1472  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as OpenInfo",
1473  pUrl.GetHostId().c_str(),
1474  pRequest->GetObfuscatedDescription().c_str() );
1475 
1476  if( rsp->hdr.dlen < 4 )
1477  {
1478  log->Error( XRootDMsg, "[%s] Got invalid open response.",
1479  pUrl.GetHostId().c_str() );
1480  return Status( stError, errInvalidResponse );
1481  }
1482 
1483  AnyObject *obj = new AnyObject();
1484  StatInfo *statInfo = 0;
1485 
1486  //----------------------------------------------------------------------
1487  // Handle StatInfo if requested
1488  //----------------------------------------------------------------------
1489  if( req->open.options & kXR_retstat )
1490  {
1491  log->Dump( XRootDMsg, "[%s] Parsing StatInfo in response to %s",
1492  pUrl.GetHostId().c_str(),
1493  pRequest->GetObfuscatedDescription().c_str() );
1494 
1495  if( rsp->hdr.dlen >= 12 )
1496  {
1497  char *nullBuffer = new char[rsp->hdr.dlen-11];
1498  nullBuffer[rsp->hdr.dlen-12] = 0;
1499  memcpy( nullBuffer, buffer+12, rsp->hdr.dlen-12 );
1500 
1501  statInfo = new StatInfo();
1502  if( statInfo->ParseServerResponse( nullBuffer ) == false )
1503  {
1504  delete statInfo;
1505  statInfo = 0;
1506  }
1507  delete [] nullBuffer;
1508  }
1509 
1510  if( rsp->hdr.dlen < 12 || !statInfo )
1511  {
1512  log->Error( XRootDMsg, "[%s] Unable to parse StatInfo in response "
1513  "to %s", pUrl.GetHostId().c_str(),
1514  pRequest->GetObfuscatedDescription().c_str() );
1515  delete obj;
1516  return Status( stError, errInvalidResponse );
1517  }
1518  }
1519 
1520  OpenInfo *data = new OpenInfo( (uint8_t*)buffer,
1521  pResponse->GetSessionId(),
1522  statInfo );
1523  obj->Set( data );
1524  response = obj;
1525  return Status();
1526  }
1527 
1528  //------------------------------------------------------------------------
1529  // kXR_read
1530  //------------------------------------------------------------------------
1531  case kXR_read:
1532  {
1533  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ChunkInfo",
1534  pUrl.GetHostId().c_str(),
1535  pRequest->GetObfuscatedDescription().c_str() );
1536 
1537  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1538  {
1539  //--------------------------------------------------------------------
1540  // we are expecting to have only the header in the message, the raw
1541  // data have been readout into the user buffer
1542  //--------------------------------------------------------------------
1543  if( pPartialResps[i]->GetSize() > 8 )
1544  return Status( stOK, errInternal );
1545  }
1546  //----------------------------------------------------------------------
1547  // we are expecting to have only the header in the message, the raw
1548  // data have been readout into the user buffer
1549  //----------------------------------------------------------------------
1550  if( pResponse->GetSize() > 8 )
1551  return Status( stOK, errInternal );
1552  //----------------------------------------------------------------------
1553  // Get the response for the end user
1554  //----------------------------------------------------------------------
1555  return pBodyReader->GetResponse( response );
1556  }
1557 
1558  //------------------------------------------------------------------------
1559  // kXR_pgread
1560  //------------------------------------------------------------------------
1561  case kXR_pgread:
1562  {
1563  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as PageInfo",
1564  pUrl.GetHostId().c_str(),
1565  pRequest->GetObfuscatedDescription().c_str() );
1566 
1567  //----------------------------------------------------------------------
1568  // Glue in the cached responses if necessary
1569  //----------------------------------------------------------------------
1570  ChunkInfo chunk = pChunkList->front();
1571  bool sizeMismatch = false;
1572  uint32_t currentOffset = 0;
1573  char *cursor = (char*)chunk.buffer;
1574  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1575  {
1576  ServerResponseV2 *part = (ServerResponseV2*)pPartialResps[i]->GetBuffer();
1577 
1578  //--------------------------------------------------------------------
1579  // the actual size of the raw data without the crc32c checksums
1580  //--------------------------------------------------------------------
1581  size_t datalen = part->status.bdy.dlen - NbPgPerRsp( part->info.pgread.offset,
1582  part->status.bdy.dlen ) * CksumSize;
1583 
1584  if( currentOffset + datalen > chunk.length )
1585  {
1586  sizeMismatch = true;
1587  break;
1588  }
1589 
1590  currentOffset += datalen;
1591  cursor += datalen;
1592  }
1593 
1594  ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
1595  size_t datalen = rspst->status.bdy.dlen - NbPgPerRsp( rspst->info.pgread.offset,
1596  rspst->status.bdy.dlen ) * CksumSize;
1597  if( currentOffset + datalen <= chunk.length )
1598  currentOffset += datalen;
1599  else
1600  sizeMismatch = true;
1601 
1602  //----------------------------------------------------------------------
1603  // Overflow
1604  //----------------------------------------------------------------------
1605  if( pChunkStatus.front().sizeError || sizeMismatch )
1606  {
1607  log->Error( XRootDMsg, "[%s] Handling response to %s: user supplied "
1608  "buffer is too small for the received data.",
1609  pUrl.GetHostId().c_str(),
1610  pRequest->GetObfuscatedDescription().c_str() );
1611  return Status( stError, errInvalidResponse );
1612  }
1613 
1614  AnyObject *obj = new AnyObject();
1615  PageInfo *pgInfo = new PageInfo( chunk.offset, currentOffset, chunk.buffer,
1616  std::move( pCrc32cDigests) );
1617 
1618  obj->Set( pgInfo );
1619  response = obj;
1620  return Status();
1621  }
1622 
1623  //------------------------------------------------------------------------
1624  // kXR_pgwrite
1625  //------------------------------------------------------------------------
1626  case kXR_pgwrite:
1627  {
1628  std::vector<std::tuple<uint64_t, uint32_t>> retries;
1629 
1630  ServerResponseV2 *rsp = (ServerResponseV2*)pResponse->GetBuffer();
1631  if( rsp->status.bdy.dlen > 0 )
1632  {
1633  ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) );
1634  size_t pgcnt = ( rsp->status.bdy.dlen - 8 ) / sizeof( kXR_int64 );
1635  retries.reserve( pgcnt );
1636  kXR_int64 *pgoffs = (kXR_int64*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) +
1637  sizeof( ServerResponseBody_pgWrCSE ) );
1638 
1639  for( size_t i = 0; i < pgcnt; ++i )
1640  {
1641  uint32_t len = XrdSys::PageSize;
1642  if( i == 0 ) len = cse->dlFirst;
1643  else if( i == pgcnt - 1 ) len = cse->dlLast;
1644  retries.push_back( std::make_tuple( pgoffs[i], len ) );
1645  }
1646  }
1647 
1648  RetryInfo *info = new RetryInfo( std::move( retries ) );
1649  AnyObject *obj = new AnyObject();
1650  obj->Set( info );
1651  response = obj;
1652 
1653  return Status();
1654  }
1655 
1656 
1657  //------------------------------------------------------------------------
1658  // kXR_readv - we need to pass the length of the buffer to the user code
1659  //------------------------------------------------------------------------
1660  case kXR_readv:
1661  {
1662  log->Dump( XRootDMsg, "[%s] Parsing the response to %p as "
1663  "VectorReadInfo", pUrl.GetHostId().c_str(),
1664  pRequest->GetObfuscatedDescription().c_str() );
1665 
1666  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1667  {
1668  //--------------------------------------------------------------------
1669  // we are expecting to have only the header in the message, the raw
1670  // data have been readout into the user buffer
1671  //--------------------------------------------------------------------
1672  if( pPartialResps[i]->GetSize() > 8 )
1673  return Status( stOK, errInternal );
1674  }
1675  //----------------------------------------------------------------------
1676  // we are expecting to have only the header in the message, the raw
1677  // data have been readout into the user buffer
1678  //----------------------------------------------------------------------
1679  if( pResponse->GetSize() > 8 )
1680  return Status( stOK, errInternal );
1681  //----------------------------------------------------------------------
1682  // Get the response for the end user
1683  //----------------------------------------------------------------------
1684  return pBodyReader->GetResponse( response );
1685  }
1686 
1687  //------------------------------------------------------------------------
1688  // kXR_fattr
1689  //------------------------------------------------------------------------
1690  case kXR_fattr:
1691  {
1692  int len = rsp->hdr.dlen;
1693  char* data = rsp->body.buffer.data;
1694 
1695  return ParseXAttrResponse( data, len, response );
1696  }
1697 
1698  //------------------------------------------------------------------------
1699  // kXR_query
1700  //------------------------------------------------------------------------
1701  case kXR_query:
1702  case kXR_set:
1703  case kXR_prepare:
1704  default:
1705  {
1706  AnyObject *obj = new AnyObject();
1707  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as BinaryData",
1708  pUrl.GetHostId().c_str(),
1709  pRequest->GetObfuscatedDescription().c_str() );
1710 
1711  BinaryDataInfo *data = new BinaryDataInfo();
1712  data->Allocate( length );
1713  data->Append( buffer, length );
1714  obj->Set( data );
1715  response = obj;
1716  return Status();
1717  }
1718  };
1719  return Status( stError, errInvalidMessage );
1720  }
1721 
1722  //------------------------------------------------------------------------
1723  // Parse the response to kXR_fattr request and put it in an object that
1724  // could be passed to the user
1725  //------------------------------------------------------------------------
1726  Status XRootDMsgHandler::ParseXAttrResponse( char *data, size_t len,
1727  AnyObject *&response )
1728  {
1729  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1730 // Log *log = DefaultEnv::GetLog(); //TODO
1731 
1732  switch( req->fattr.subcode )
1733  {
1734  case kXR_fattrDel:
1735  case kXR_fattrSet:
1736  {
1737  Status status;
1738 
1739  kXR_char nerrs = 0;
1740  if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1741  return status;
1742 
1743  kXR_char nattr = 0;
1744  if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1745  return status;
1746 
1747  std::vector<XAttrStatus> resp;
1748  // read the namevec
1749  for( kXR_char i = 0; i < nattr; ++i )
1750  {
1751  kXR_unt16 rc = 0;
1752  if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1753  return status;
1754  rc = ntohs( rc );
1755 
1756  // count errors
1757  if( rc ) --nerrs;
1758 
1759  std::string name;
1760  if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1761  return status;
1762 
1763  XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1764  XRootDStatus();
1765  resp.push_back( XAttrStatus( name, st ) );
1766  }
1767 
1768  // check if we read all the data and if the error count is OK
1769  if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1770 
1771  // set up the response object
1772  response = new AnyObject();
1773  response->Set( new std::vector<XAttrStatus>( std::move( resp ) ) );
1774 
1775  return Status();
1776  }
1777 
1778  case kXR_fattrGet:
1779  {
1780  Status status;
1781 
1782  kXR_char nerrs = 0;
1783  if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1784  return status;
1785 
1786  kXR_char nattr = 0;
1787  if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1788  return status;
1789 
1790  std::vector<XAttr> resp;
1791  resp.reserve( nattr );
1792 
1793  // read the name vec
1794  for( kXR_char i = 0; i < nattr; ++i )
1795  {
1796  kXR_unt16 rc = 0;
1797  if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1798  return status;
1799  rc = ntohs( rc );
1800 
1801  // count errors
1802  if( rc ) --nerrs;
1803 
1804  std::string name;
1805  if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1806  return status;
1807 
1808  XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1809  XRootDStatus();
1810  resp.push_back( XAttr( name, st ) );
1811  }
1812 
1813  // read the value vec
1814  for( kXR_char i = 0; i < nattr; ++i )
1815  {
1816  kXR_int32 vlen = 0;
1817  if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1818  return status;
1819  vlen = ntohl( vlen );
1820 
1821  std::string value;
1822  if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1823  return status;
1824 
1825  resp[i].value.swap( value );
1826  }
1827 
1828  // check if we read all the data and if the error count is OK
1829  if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1830 
1831  // set up the response object
1832  response = new AnyObject();
1833  response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1834 
1835  return Status();
1836  }
1837 
1838  case kXR_fattrList:
1839  {
1840  Status status;
1841  std::vector<XAttr> resp;
1842 
1843  while( len > 0 )
1844  {
1845  std::string name;
1846  if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1847  return status;
1848 
1849  kXR_int32 vlen = 0;
1850  if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1851  return status;
1852  vlen = ntohl( vlen );
1853 
1854  std::string value;
1855  if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1856  return status;
1857 
1858  resp.push_back( XAttr( name, value ) );
1859  }
1860 
1861  // set up the response object
1862  response = new AnyObject();
1863  response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1864 
1865  return Status();
1866  }
1867 
1868  default:
1869  return Status( stError, errDataError );
1870  }
1871  }
1872 
1873  //----------------------------------------------------------------------------
1874  // Perform the changes to the original request needed by the redirect
1875  // procedure - allocate new streamid, append redirection data and such
1876  //----------------------------------------------------------------------------
1877  Status XRootDMsgHandler::RewriteRequestRedirect( const URL &newUrl )
1878  {
1879  Log *log = DefaultEnv::GetLog();
1880 
1881  Status st;
1882  // Append any "xrd.*" parameters present in newCgi so that any authentication
1883  // requirements are properly enforced
1884  const URL::ParamsMap &newCgi = newUrl.GetParams();
1885  std::string xrdCgi = "";
1886  std::ostringstream ossXrd;
1887  for(URL::ParamsMap::const_iterator it = newCgi.begin(); it != newCgi.end(); ++it )
1888  {
1889  if( it->first.compare( 0, 4, "xrd." ) )
1890  continue;
1891  ossXrd << it->first << '=' << it->second << '&';
1892  }
1893 
1894  xrdCgi = ossXrd.str();
1895  // Redirection URL containing also any original xrd.* opaque parameters
1896  XrdCl::URL authUrl;
1897 
1898  if (xrdCgi.empty())
1899  {
1900  authUrl = newUrl;
1901  }
1902  else
1903  {
1904  std::string surl = newUrl.GetURL();
1905  (surl.find('?') == std::string::npos) ? (surl += '?') :
1906  ((*surl.rbegin() != '&') ? (surl += '&') : (surl += ""));
1907  surl += xrdCgi;
1908  if (!authUrl.FromString(surl))
1909  {
1910  std::string surlLog = surl;
1911  if( unlikely( log->GetLevel() >= Log::ErrorMsg ) ) {
1912  surlLog = obfuscateAuth(surlLog);
1913  }
1914  log->Error( XRootDMsg, "[%s] Failed to build redirection URL from data: %s",
1915  newUrl.GetHostId().c_str(), surl.c_str());
1916  return Status(stError, errInvalidRedirectURL);
1917  }
1918  }
1919 
1920  //--------------------------------------------------------------------------
1921  // Rewrite particular requests
1922  //--------------------------------------------------------------------------
1924  MessageUtils::RewriteCGIAndPath( pRequest, newCgi, true, newUrl.GetPath() );
1926  return Status();
1927  }
1928 
1929  //----------------------------------------------------------------------------
1930  // Some requests need to be rewritten also after getting kXR_wait
1931  //----------------------------------------------------------------------------
1932  Status XRootDMsgHandler::RewriteRequestWait()
1933  {
1934  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1935 
1937 
1938  //------------------------------------------------------------------------
1939  // For kXR_locate and kXR_open request the kXR_refresh bit needs to be
1940  // turned off after wait
1941  //------------------------------------------------------------------------
1942  switch( req->header.requestid )
1943  {
1944  case kXR_locate:
1945  {
1946  uint16_t refresh = kXR_refresh;
1947  req->locate.options &= (~refresh);
1948  break;
1949  }
1950 
1951  case kXR_open:
1952  {
1953  uint16_t refresh = kXR_refresh;
1954  req->locate.options &= (~refresh);
1955  break;
1956  }
1957  }
1958 
1959  XRootDTransport::SetDescription( pRequest );
1961  return Status();
1962  }
1963 
1964  //----------------------------------------------------------------------------
1965  // Recover error
1966  //----------------------------------------------------------------------------
1967  void XRootDMsgHandler::HandleError( XRootDStatus status )
1968  {
1969  //--------------------------------------------------------------------------
1970  // If there was no error then do nothing
1971  //--------------------------------------------------------------------------
1972  if( status.IsOK() )
1973  return;
1974 
1975  if( pSidMgr && pMsgInFly && (
1976  status.code == errOperationExpired ||
1977  status.code == errOperationInterrupted ) )
1978  {
1979  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1980  pSidMgr->TimeOutSID( req->header.streamid );
1981  }
1982 
1983  bool noreplicas = ( status.code == errErrorResponse &&
1984  status.errNo == kXR_noReplicas );
1985 
1986  if( !noreplicas ) pLastError = status;
1987 
1988  Log *log = DefaultEnv::GetLog();
1989  log->Debug( XRootDMsg, "[%s] Handling error while processing %s: %s.",
1990  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str(),
1991  status.ToString().c_str() );
1992 
1993  //--------------------------------------------------------------------------
1994  // Check if it is a fatal TLS error that has been marked as potentially
1995  // recoverable, if yes check if we can downgrade from fatal to error.
1996  //--------------------------------------------------------------------------
1997  if( status.IsFatal() && status.code == errTlsError && status.errNo == EAGAIN )
1998  {
1999  if( pSslErrCnt < MaxSslErrRetry )
2000  {
2001  status.status &= ~stFatal; // switch off fatal&error bits
2002  status.status |= stError; // switch on error bit
2003  }
2004  ++pSslErrCnt; // count number of consecutive SSL errors
2005  }
2006  else
2007  pSslErrCnt = 0;
2008 
2009  //--------------------------------------------------------------------------
2010  // We have got an error message, we can recover it at the load balancer if:
2011  // 1) we haven't got it from the load balancer
2012  // 2) we have a load balancer assigned
2013  // 3) the error is either one of: kXR_FSError, kXR_IOError, kXR_ServerError,
2014  // kXR_NotFound
2015  // 4) in the case of kXR_NotFound a kXR_refresh flags needs to be set
2016  //--------------------------------------------------------------------------
2017  if( status.code == errErrorResponse )
2018  {
2019  if( RetriableErrorResponse( status ) )
2020  {
2021  UpdateTriedCGI(status.errNo);
2022  if( status.errNo == kXR_NotFound || status.errNo == kXR_Overloaded )
2023  SwitchOnRefreshFlag();
2024  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2025  return;
2026  }
2027  else
2028  {
2029  pStatus = status;
2030  HandleRspOrQueue();
2031  return;
2032  }
2033  }
2034 
2035  //--------------------------------------------------------------------------
2036  // Nothing can be done if:
2037  // 1) a user timeout has occurred
2038  // 2) has a non-zero session id
2039  // 3) if another error occurred and the validity of the message expired
2040  //--------------------------------------------------------------------------
2041  if( status.code == errOperationExpired || pRequest->GetSessionId() ||
2042  status.code == errOperationInterrupted || time(0) >= pExpiration )
2043  {
2044  log->Error( XRootDMsg, "[%s] Unable to get the response to request %s",
2045  pUrl.GetHostId().c_str(),
2046  pRequest->GetObfuscatedDescription().c_str() );
2047  pStatus = status;
2048  HandleRspOrQueue();
2049  return;
2050  }
2051 
2052  //--------------------------------------------------------------------------
2053  // At this point we're left with connection errors, we recover them
2054  // at a load balancer if we have one and if not on the current server
2055  // until we get a response, an unrecoverable error or a timeout
2056  //--------------------------------------------------------------------------
2057  if( pLoadBalancer.url.IsValid() &&
2058  pLoadBalancer.url.GetLocation() != pUrl.GetLocation() )
2059  {
2060  UpdateTriedCGI( kXR_ServerError );
2061  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2062  return;
2063  }
2064  else
2065  {
2066  if( !status.IsFatal() && IsRetriable() )
2067  {
2068  log->Info( XRootDMsg, "[%s] Retrying request: %s.",
2069  pUrl.GetHostId().c_str(),
2070  pRequest->GetObfuscatedDescription().c_str() );
2071 
2072  UpdateTriedCGI( kXR_ServerError );
2073  HandleError( RetryAtServer( pUrl, RedirectEntry::EntryRetry ) );
2074  return;
2075  }
2076  pStatus = status;
2077  HandleRspOrQueue();
2078  return;
2079  }
2080  }
2081 
2082  //----------------------------------------------------------------------------
2083  // Retry the message at another server
2084  //----------------------------------------------------------------------------
2085  Status XRootDMsgHandler::RetryAtServer( const URL &url, RedirectEntry::Type entryType )
2086  {
2087  pResponse.reset();
2088  Log *log = DefaultEnv::GetLog();
2089 
2090  //--------------------------------------------------------------------------
2091  // Set up a redirect entry
2092  //--------------------------------------------------------------------------
2093  if( pRdirEntry ) pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
2094  pRdirEntry.reset( new RedirectEntry( pUrl.GetLocation(), url.GetLocation(), entryType ) );
2095 
2096  if( pUrl.GetLocation() != url.GetLocation() )
2097  {
2098  pHosts->push_back( url );
2099 
2100  //------------------------------------------------------------------------
2101  // Assign a new stream id to the message
2102  //------------------------------------------------------------------------
2103 
2104  // first release the old stream id
2105  // (though it could be a redirect from a local
2106  // metalink file, in this case there's no SID)
2107  ClientRequestHdr *req = (ClientRequestHdr*)pRequest->GetBuffer();
2108  if( pSidMgr )
2109  {
2110  pSidMgr->ReleaseSID( req->streamid );
2111  pSidMgr.reset();
2112  }
2113 
2114  // then get the new SIDManager
2115  // (again this could be a redirect to a local
2116  // file and in this case there is no SID)
2117  if( !url.IsLocalFile() )
2118  {
2119  pSidMgr = SIDMgrPool::Instance().GetSIDMgr( url );
2120  Status st = pSidMgr->AllocateSID( req->streamid );
2121  if( !st.IsOK() )
2122  {
2123  log->Error( XRootDMsg, "[%s] Impossible to send message %s.",
2124  pUrl.GetHostId().c_str(),
2125  pRequest->GetObfuscatedDescription().c_str() );
2126  return st;
2127  }
2128  }
2129 
2130  pUrl = url;
2131  }
2132 
2133  if( pUrl.IsMetalink() && pFollowMetalink )
2134  {
2135  log->Debug( ExDbgMsg, "[%s] Metaling redirection for MsgHandler: %p (message: %s ).",
2136  pUrl.GetHostId().c_str(), this,
2137  pRequest->GetObfuscatedDescription().c_str() );
2138 
2139  return pPostMaster->Redirect( pUrl, pRequest, this );
2140  }
2141  else if( pUrl.IsLocalFile() )
2142  {
2143  HandleLocalRedirect( &pUrl );
2144  return Status();
2145  }
2146  else
2147  {
2148  log->Debug( ExDbgMsg, "[%s] Retry at server MsgHandler: %p (message: %s ).",
2149  pUrl.GetHostId().c_str(), this,
2150  pRequest->GetObfuscatedDescription().c_str() );
2151  return pPostMaster->Send( pUrl, pRequest, this, true, pExpiration );
2152  }
2153  }
2154 
2155  //----------------------------------------------------------------------------
2156  // Update the "tried=" part of the CGI of the current message
2157  //----------------------------------------------------------------------------
2158  void XRootDMsgHandler::UpdateTriedCGI(uint32_t errNo)
2159  {
2160  URL::ParamsMap cgi;
2161  std::string tried;
2162 
2163  //--------------------------------------------------------------------------
2164  // In case a data server responded with a kXR_redirect and we fail at the
2165  // node where we were redirected to, the original data server should be
2166  // included in the tried CGI opaque info (instead of the current one).
2167  //--------------------------------------------------------------------------
2168  if( pEffectiveDataServerUrl )
2169  {
2170  tried = pEffectiveDataServerUrl->GetHostName();
2171  delete pEffectiveDataServerUrl;
2172  pEffectiveDataServerUrl = 0;
2173  }
2174  //--------------------------------------------------------------------------
2175  // Otherwise use the current URL.
2176  //--------------------------------------------------------------------------
2177  else
2178  tried = pUrl.GetHostName();
2179 
2180  // Report the reason for the failure to the next location
2181  //
2182  if (errNo)
2183  { if (errNo == kXR_NotFound) cgi["triedrc"] = "enoent";
2184  else if (errNo == kXR_IOError) cgi["triedrc"] = "ioerr";
2185  else if (errNo == kXR_FSError) cgi["triedrc"] = "fserr";
2186  else if (errNo == kXR_ServerError) cgi["triedrc"] = "srverr";
2187  }
2188 
2189  //--------------------------------------------------------------------------
2190  // If our current load balancer is a metamanager and we failed either
2191  // at a diskserver or at an unidentified node we also exclude the last
2192  // known manager
2193  //--------------------------------------------------------------------------
2194  if( pLoadBalancer.url.IsValid() && (pLoadBalancer.flags & kXR_attrMeta) )
2195  {
2196  HostList::reverse_iterator it;
2197  for( it = pHosts->rbegin()+1; it != pHosts->rend(); ++it )
2198  {
2199  if( it->loadBalancer )
2200  break;
2201 
2202  tried += "," + it->url.GetHostName();
2203 
2204  if( it->flags & kXR_isManager )
2205  break;
2206  }
2207  }
2208 
2209  cgi["tried"] = tried;
2211  MessageUtils::RewriteCGIAndPath( pRequest, cgi, false, "" );
2213  }
2214 
2215  //----------------------------------------------------------------------------
2216  // Switch on the refresh flag for some requests
2217  //----------------------------------------------------------------------------
2218  void XRootDMsgHandler::SwitchOnRefreshFlag()
2219  {
2221  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2222  switch( req->header.requestid )
2223  {
2224  case kXR_locate:
2225  {
2226  req->locate.options |= kXR_refresh;
2227  break;
2228  }
2229 
2230  case kXR_open:
2231  {
2232  req->locate.options |= kXR_refresh;
2233  break;
2234  }
2235  }
2236  XRootDTransport::SetDescription( pRequest );
2238  }
2239 
2240  //------------------------------------------------------------------------
2241  // If the current thread is a worker thread from our thread-pool
2242  // handle the response, otherwise submit a new task to the thread-pool
2243  //------------------------------------------------------------------------
2244  void XRootDMsgHandler::HandleRspOrQueue()
2245  {
2246  JobManager *jobMgr = pPostMaster->GetJobManager();
2247  if( jobMgr->IsWorker() )
2248  HandleResponse();
2249  else
2250  {
2251  Log *log = DefaultEnv::GetLog();
2252  log->Debug( ExDbgMsg, "[%s] Passing to the thread-pool MsgHandler: %p (message: %s ).",
2253  pUrl.GetHostId().c_str(), this,
2254  pRequest->GetObfuscatedDescription().c_str() );
2255  jobMgr->QueueJob( new HandleRspJob( this ), 0 );
2256  }
2257  }
2258 
2259  //------------------------------------------------------------------------
2260  // Notify the FileStateHandler to retry Open() with new URL
2261  //------------------------------------------------------------------------
2262  void XRootDMsgHandler::HandleLocalRedirect( URL *url )
2263  {
2264  Log *log = DefaultEnv::GetLog();
2265  log->Debug( ExDbgMsg, "[%s] Handling local redirect - MsgHandler: %p (message: %s ).",
2266  pUrl.GetHostId().c_str(), this,
2267  pRequest->GetObfuscatedDescription().c_str() );
2268 
2269  if( !pLFileHandler )
2270  {
2271  HandleError( XRootDStatus( stFatal, errNotSupported ) );
2272  return;
2273  }
2274 
2275  AnyObject *resp = 0;
2276  pLFileHandler->SetHostList( *pHosts );
2277  XRootDStatus st = pLFileHandler->Open( url, pRequest, resp );
2278  if( !st.IsOK() )
2279  {
2280  HandleError( st );
2281  return;
2282  }
2283 
2284  pResponseHandler->HandleResponseWithHosts( new XRootDStatus(),
2285  resp,
2286  pHosts.release() );
2287  delete this;
2288 
2289  return;
2290  }
2291 
2292  //------------------------------------------------------------------------
2293  // Check if it is OK to retry this request
2294  //------------------------------------------------------------------------
2295  bool XRootDMsgHandler::IsRetriable()
2296  {
2297  std::string value;
2298  DefaultEnv::GetEnv()->GetString( "OpenRecovery", value );
2299  if( value == "true" ) return true;
2300 
2301  // check if it is a mutable open (open + truncate or open + create)
2302  ClientRequest *req = reinterpret_cast<ClientRequest*>( pRequest->GetBuffer() );
2303  if( req->header.requestid == htons( kXR_open ) )
2304  {
2305  bool _mutable = ( req->open.options & htons( kXR_delete ) ) ||
2306  ( req->open.options & htons( kXR_new ) );
2307 
2308  if( _mutable )
2309  {
2310  Log *log = DefaultEnv::GetLog();
2311  log->Debug( XRootDMsg,
2312  "[%s] Not allowed to retry open request (OpenRecovery disabled): %s.",
2313  pUrl.GetHostId().c_str(),
2314  pRequest->GetObfuscatedDescription().c_str() );
2315  // disallow retry if it is a mutable open
2316  return false;
2317  }
2318  }
2319 
2320  return true;
2321  }
2322 
2323  //------------------------------------------------------------------------
2324  // Check if for given request and Metalink redirector it is OK to omit
2325  // the kXR_wait and proceed straight to the next entry in the Metalink file
2326  //------------------------------------------------------------------------
2327  bool XRootDMsgHandler::OmitWait( Message &request, const URL &url )
2328  {
2329  // we can omit kXR_wait only if we have a Metalink redirector
2330  if( !url.IsMetalink() )
2331  return false;
2332 
2333  // we can omit kXR_wait only for requests that can be redirected
2334  // (kXR_read is the only stateful request that can be redirected)
2335  ClientRequest *req = reinterpret_cast<ClientRequest*>( request.GetBuffer() );
2336  if( pStateful && req->header.requestid != kXR_read )
2337  return false;
2338 
2339  // we can only omit kXR_wait if the Metalink redirect has more
2340  // replicas
2341  RedirectorRegistry &registry = RedirectorRegistry::Instance();
2342  VirtualRedirector *redirector = registry.Get( url );
2343 
2344  // we need more than one server as the current one is not reflected
2345  // in tried CGI
2346  if( redirector->Count( request ) > 1 )
2347  return true;
2348 
2349  return false;
2350  }
2351 
2352  //------------------------------------------------------------------------
2353  // Checks if the given error returned by server is retriable.
2354  //------------------------------------------------------------------------
2355  bool XRootDMsgHandler::RetriableErrorResponse( const Status &status )
2356  {
2357  // we can only retry error response if we have a valid load-balancer and
2358  // it is not our current URL
2359  if( !( pLoadBalancer.url.IsValid() &&
2360  pUrl.GetLocation() != pLoadBalancer.url.GetLocation() ) )
2361  return false;
2362 
2363  // following errors are retriable at any load-balancer
2364  if( status.errNo == kXR_FSError || status.errNo == kXR_IOError ||
2365  status.errNo == kXR_ServerError || status.errNo == kXR_NotFound ||
2366  status.errNo == kXR_Overloaded || status.errNo == kXR_NoMemory )
2367  return true;
2368 
2369  // check if the load-balancer is a meta-manager, if yes there are
2370  // more errors that can be recovered
2371  if( !( pLoadBalancer.flags & kXR_attrMeta ) ) return false;
2372 
2373  // those errors are retriable for meta-managers
2374  if( status.errNo == kXR_Unsupported || status.errNo == kXR_FileLocked )
2375  return true;
2376 
2377  // in case of not-authorized error there is an imposed upper limit
2378  // on how many times we can retry this error
2379  if( status.errNo == kXR_NotAuthorized )
2380  {
2381  int limit = DefaultNotAuthorizedRetryLimit;
2382  DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", limit );
2383  bool ret = pNotAuthorizedCounter < limit;
2384  ++pNotAuthorizedCounter;
2385  if( !ret )
2386  {
2387  Log *log = DefaultEnv::GetLog();
2388  log->Error( XRootDMsg,
2389  "[%s] Reached limit of NotAuthorized retries!",
2390  pUrl.GetHostId().c_str() );
2391  }
2392  return ret;
2393  }
2394 
2395  // check if the load-balancer is a virtual (metalink) redirector,
2396  // if yes there are even more errors that can be recovered
2397  if( !( pLoadBalancer.flags & kXR_attrVirtRdr ) ) return false;
2398 
2399  // those errors are retriable for virtual (metalink) redirectors
2400  if( status.errNo == kXR_noserver || status.errNo == kXR_ArgTooLong )
2401  return true;
2402 
2403  // otherwise it is a non-retriable error
2404  return false;
2405  }
2406 
2407  //------------------------------------------------------------------------
2408  // Dump the redirect-trace-back into the log file
2409  //------------------------------------------------------------------------
2410  void XRootDMsgHandler::DumpRedirectTraceBack()
2411  {
2412  if( pRedirectTraceBack.empty() ) return;
2413 
2414  std::stringstream sstrm;
2415 
2416  sstrm << "Redirect trace-back:\n";
2417 
2418  int counter = 0;
2419 
2420  auto itr = pRedirectTraceBack.begin();
2421  sstrm << '\t' << counter << ". " << (*itr)->ToString() << '\n';
2422 
2423  auto prev = itr;
2424  ++itr;
2425  ++counter;
2426 
2427  for( ; itr != pRedirectTraceBack.end(); ++itr, ++prev, ++counter )
2428  sstrm << '\t' << counter << ". "
2429  << (*itr)->ToString( (*prev)->status.IsOK() ) << '\n';
2430 
2431  int authlimit = DefaultNotAuthorizedRetryLimit;
2432  DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", authlimit );
2433 
2434  bool warn = !pStatus.IsOK() &&
2435  ( pStatus.code == errNotFound ||
2436  pStatus.code == errRedirectLimit ||
2437  ( pStatus.code == errAuthFailed && pNotAuthorizedCounter >= authlimit ) );
2438 
2439  Log *log = DefaultEnv::GetLog();
2440  if( warn )
2441  log->Warning( XRootDMsg, "%s", sstrm.str().c_str() );
2442  else
2443  log->Debug( XRootDMsg, "%s", sstrm.str().c_str() );
2444  }
2445 
2446  // Read data from buffer
2447  //------------------------------------------------------------------------
2448  template<typename T>
2449  Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, T& result )
2450  {
2451  if( sizeof( T ) > buflen ) return Status( stError, errDataError );
2452 
2453  memcpy(&result, buffer, sizeof(T));
2454 
2455  buffer += sizeof( T );
2456  buflen -= sizeof( T );
2457 
2458  return Status();
2459  }
2460 
2461  //------------------------------------------------------------------------
2462  // Read a string from buffer
2463  //------------------------------------------------------------------------
2464  Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result )
2465  {
2466  Status status;
2467  char c = 0;
2468 
2469  while( true )
2470  {
2471  if( !( status = ReadFromBuffer( buffer, buflen, c ) ).IsOK() )
2472  return status;
2473 
2474  if( c == 0 ) break;
2475  result += c;
2476  }
2477 
2478  return status;
2479  }
2480 
2481  //------------------------------------------------------------------------
2482  // Read a string from buffer
2483  //------------------------------------------------------------------------
2484  Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen,
2485  size_t size, std::string &result )
2486  {
2487  Status status;
2488 
2489  if( size > buflen ) return Status( stError, errDataError );
2490 
2491  result.append( buffer, size );
2492  buffer += size;
2493  buflen -= size;
2494 
2495  return status;
2496  }
2497 
2498 }
@ kXR_NotAuthorized
Definition: XProtocol.hh:1000
@ kXR_NotFound
Definition: XProtocol.hh:1001
@ kXR_FileLocked
Definition: XProtocol.hh:993
@ kXR_noReplicas
Definition: XProtocol.hh:1019
@ kXR_Unsupported
Definition: XProtocol.hh:1003
@ kXR_ServerError
Definition: XProtocol.hh:1002
@ kXR_Overloaded
Definition: XProtocol.hh:1014
@ kXR_ArgTooLong
Definition: XProtocol.hh:992
@ kXR_noserver
Definition: XProtocol.hh:1004
@ kXR_IOError
Definition: XProtocol.hh:997
@ kXR_FSError
Definition: XProtocol.hh:995
@ kXR_NoMemory
Definition: XProtocol.hh:998
#define kXR_isManager
Definition: XProtocol.hh:1156
union ServerResponse::@0 body
@ kXR_fattrDel
Definition: XProtocol.hh:270
@ kXR_fattrSet
Definition: XProtocol.hh:273
@ kXR_fattrList
Definition: XProtocol.hh:272
@ kXR_fattrGet
Definition: XProtocol.hh:271
struct ClientFattrRequest fattr
Definition: XProtocol.hh:854
#define kXR_collapseRedir
Definition: XProtocol.hh:1167
ServerResponseStatus status
Definition: XProtocol.hh:1309
#define kXR_attrMeta
Definition: XProtocol.hh:1159
kXR_char streamid[2]
Definition: XProtocol.hh:156
kXR_char streamid[2]
Definition: XProtocol.hh:914
kXR_unt16 options
Definition: XProtocol.hh:481
struct ClientDirlistRequest dirlist
Definition: XProtocol.hh:852
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
@ kXR_delete
Definition: XProtocol.hh:453
@ kXR_refresh
Definition: XProtocol.hh:459
@ kXR_new
Definition: XProtocol.hh:455
@ kXR_retstat
Definition: XProtocol.hh:463
struct ClientOpenRequest open
Definition: XProtocol.hh:860
@ kXR_waitresp
Definition: XProtocol.hh:906
@ kXR_redirect
Definition: XProtocol.hh:904
@ kXR_oksofar
Definition: XProtocol.hh:900
@ kXR_status
Definition: XProtocol.hh:907
@ kXR_ok
Definition: XProtocol.hh:899
@ kXR_attn
Definition: XProtocol.hh:901
@ kXR_wait
Definition: XProtocol.hh:905
@ kXR_error
Definition: XProtocol.hh:903
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1261
struct ClientRequestHdr header
Definition: XProtocol.hh:846
#define kXR_recoverWrts
Definition: XProtocol.hh:1166
kXR_unt16 requestid
Definition: XProtocol.hh:157
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_open
Definition: XProtocol.hh:122
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_mkdir
Definition: XProtocol.hh:120
@ kXR_sync
Definition: XProtocol.hh:128
@ kXR_chmod
Definition: XProtocol.hh:114
@ kXR_dirlist
Definition: XProtocol.hh:116
@ kXR_fattr
Definition: XProtocol.hh:132
@ kXR_rm
Definition: XProtocol.hh:126
@ kXR_query
Definition: XProtocol.hh:113
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_set
Definition: XProtocol.hh:130
@ kXR_rmdir
Definition: XProtocol.hh:127
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_protocol
Definition: XProtocol.hh:118
@ kXR_mv
Definition: XProtocol.hh:121
@ kXR_ping
Definition: XProtocol.hh:123
@ kXR_stat
Definition: XProtocol.hh:129
@ kXR_pgread
Definition: XProtocol.hh:142
@ kXR_chkpoint
Definition: XProtocol.hh:124
@ kXR_locate
Definition: XProtocol.hh:139
@ kXR_close
Definition: XProtocol.hh:115
@ kXR_pgwrite
Definition: XProtocol.hh:138
@ kXR_prepare
Definition: XProtocol.hh:133
#define kXR_isServer
Definition: XProtocol.hh:1157
#define kXR_attrVirtRdr
Definition: XProtocol.hh:1162
struct ClientChkPointRequest chkpoint
Definition: XProtocol.hh:849
struct ServerResponseHeader hdr
Definition: XProtocol.hh:1260
union ServerResponseV2::@1 info
#define kXR_PROTOCOLVERSION
Definition: XProtocol.hh:70
@ kXR_vfs
Definition: XProtocol.hh:763
struct ClientStatRequest stat
Definition: XProtocol.hh:873
kXR_char options
Definition: XProtocol.hh:769
#define kXR_ecRedir
Definition: XProtocol.hh:1168
struct ClientLocateRequest locate
Definition: XProtocol.hh:856
ServerResponseHeader hdr
Definition: XProtocol.hh:1287
long long kXR_int64
Definition: XPtypes.hh:98
int kXR_int32
Definition: XPtypes.hh:89
unsigned short kXR_unt16
Definition: XPtypes.hh:67
unsigned char kXR_char
Definition: XPtypes.hh:65
#define unlikely(x)
std::string obfuscateAuth(const std::string &input)
void Get(Type &object)
Retrieve the object being held.
Object for reading out data from the PgRead response.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
Definition: XrdClBuffer.hh:156
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
void SetCursor(uint32_t cursor)
Set the cursor.
Definition: XrdClBuffer.hh:148
uint32_t GetCursor() const
Get append cursor.
Definition: XrdClBuffer.hh:140
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
Definition: XrdClBuffer.hh:189
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
static bool HasStatInfo(const char *data)
Returns true if data contain stat info.
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
virtual void Run(void *arg)
The job logic.
HandleRspJob(XrdCl::XRootDMsgHandler *handler)
Interface for a job to be run by the job manager.
void SetHostList(const HostList &hostList)
XRootDStatus Open(const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
Handle diagnostics.
Definition: XrdClLog.hh:101
@ ErrorMsg
report errors
Definition: XrdClLog.hh:109
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void RewriteCGIAndPath(Message *msg, const URL::ParamsMap &newCgi, bool replace, const std::string &newPath)
Append cgi to the one already present in the message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
StreamEvent
Events that may have occurred to the stream.
@ Ready
The stream has become connected.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
A network socket.
Definition: XrdClSocket.hh:43
virtual XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)
Definition: XrdClSocket.cc:461
bool IsEncrypted()
Definition: XrdClSocket.cc:867
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
virtual time_t Run(time_t now)=0
void SetName(const std::string &name)
Set name of the task.
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:458
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
bool FromString(const std::string &url)
Parse a string and fill the URL fields.
Definition: XrdClURL.cc:61
void SetPassword(const std::string &password)
Set the password.
Definition: XrdClURL.hh:161
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:395
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition: XrdClURL.cc:337
const std::string & GetUserName() const
Get the username.
Definition: XrdClURL.hh:135
const std::string & GetPassword() const
Get the password.
Definition: XrdClURL.hh:153
bool IsLocalFile() const
Definition: XrdClURL.cc:467
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
void SetProtocol(const std::string &protocol)
Set protocol.
Definition: XrdClURL.hh:126
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:445
void SetUserName(const std::string &userName)
Set the username.
Definition: XrdClURL.hh:143
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Definition: XrdClUtils.cc:703
Handle/Process/Forward XRootD messages.
virtual uint16_t Examine(std::shared_ptr< Message > &msg)
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
const Message * GetRequest() const
Get the request pointer.
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status)
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
virtual void OnStatusReady(const Message *message, XRootDStatus status)
The requested action has been performed and the status is available.
virtual bool IsRaw() const
Are we a raw writer or not?
virtual void Process()
Process the message if it was "taken" by the examine action.
virtual uint16_t InspectStatusRsp()
virtual uint16_t GetSid() const
const std::string & GetErrorMessage() const
Get error message.
static void SetDescription(Message *msg)
Get the description of a message.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t errRedirectLimit
Definition: XrdClStatus.hh:102
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
Definition: XrdClStatus.hh:105
const uint16_t errTlsError
Definition: XrdClStatus.hh:80
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errNotFound
Definition: XrdClStatus.hh:100
const uint64_t XRootDMsg
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint64_t ExDbgMsg
const uint16_t errInvalidResponse
Definition: XrdClStatus.hh:99
const uint16_t errInvalidRedirectURL
Definition: XrdClStatus.hh:98
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
Buffer BinaryDataInfo
Binary buffer.
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
const uint16_t suContinue
Definition: XrdClStatus.hh:39
const int DefaultNotAuthorizedRetryLimit
const uint16_t errRedirect
Definition: XrdClStatus.hh:106
const uint16_t errAuthFailed
Definition: XrdClStatus.hh:88
const uint16_t errInvalidMessage
Definition: XrdClStatus.hh:85
none object for initializing empty Optional
XrdSysError Log
Definition: XrdConfig.cc:112
@ kXR_PartialResult
Definition: XProtocol.hh:1250
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
URL url
URL of the host.
uint32_t flags
Host type.
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version