XRootD
XrdClMetalinkRedirector.cc
Go to the documentation of this file.
1 /*
2  * XrdClMetalinkRedirector.cc
3  *
4  * Created on: May 2, 2016
5  * Author: simonm
6  */
7 
9 
10 #include "XrdCl/XrdClConstants.hh"
11 #include "XrdCl/XrdClDefaultEnv.hh"
12 #include "XrdCl/XrdClLog.hh"
14 #include "XrdCl/XrdClFile.hh"
16 #include "XrdCl/XrdClURL.hh"
17 #include "XrdCl/XrdClJobManager.hh"
18 #include "XrdCl/XrdClUtils.hh"
20 #include "XrdCl/XrdClPostMaster.hh"
21 
22 #include "XrdXml/XrdXmlMetaLink.hh"
23 
24 #include "XProtocol/XProtocol.hh"
25 
26 #include <arpa/inet.h>
27 #include <fcntl.h>
28 
29 namespace XrdCl
30 {
31 
32  void DeallocArgs( XRootDStatus *status, AnyObject *response,
33  HostList *hostList )
34  {
35  delete status;
36  delete response;
37  delete hostList;
38  }
39 
40  //----------------------------------------------------------------------------
41  // Read metalink response handler.
42  //
43  // If the whole file has been read triggers parsing and
44  // initializes the Redirector, otherwise triggers another
45  // read.
46  //----------------------------------------------------------------------------
48  {
49  public:
50  //----------------------------------------------------------------------------
51  // Constructor
52  //----------------------------------------------------------------------------
54  const std::string &content = "" ) :
55  pRedirector( mr ), pUserHandler( userHandler ), pBuffer(
56  new char[DefaultCPChunkSize] ), pContent( content )
57  {
58  }
59 
60  //----------------------------------------------------------------------------
61  // Destructor
62  //----------------------------------------------------------------------------
64  {
65  delete[] pBuffer;
66  }
67 
68  //----------------------------------------------------------------------------
69  // Handle the response
70  //----------------------------------------------------------------------------
71  virtual void HandleResponse( XRootDStatus *status, AnyObject *response )
72  {
73  try
74  {
75  // check the status
76  if( !status->IsOK() )
77  throw status;
78  // we don't need it anymore
79  delete status;
80  // make sure we got a response
81  if( !response )
82  throw new XRootDStatus( stError, errInternal );
83  // make sure the response is not empty
84  ChunkInfo * info = 0;
85  response->Get( info );
86  if( !info )
87  throw new XRootDStatus( stError, errInternal );
88  uint32_t bytesRead = info->length;
89  uint64_t offset = info->offset + bytesRead;
90  pContent += std::string( pBuffer, bytesRead );
91  // are we done ?
92  if( bytesRead > 0 )
93  {
94  // lets try to read another chunk
95  MetalinkReadHandler * mrh = new MetalinkReadHandler( pRedirector,
96  pUserHandler, pContent );
97  XRootDStatus st = pRedirector->pFile->Read( offset,
98  DefaultCPChunkSize, mrh->GetBuffer(), mrh );
99  if( !st.IsOK() )
100  {
101  delete mrh;
102  throw new XRootDStatus( st );
103  }
104  }
105  else // we have the whole metalink file
106  {
107  // we don't need the File object anymore
108  delete pRedirector->pFile;
109  pRedirector->pFile = 0;
110  // now we can parse the metalink file
111  XRootDStatus st = pRedirector->Parse( pContent );
112  // now with the redirector fully initialized we can handle pending requests
113  pRedirector->FinalizeInitialization();
114  // we are done, pass the status to the user (whatever it is)
115  if( pUserHandler )
116  pUserHandler->HandleResponse( new XRootDStatus( st ), 0 );
117  }
118  // clean up
119  delete response;
120  }
121  catch( XRootDStatus *status )
122  {
123  pRedirector->FinalizeInitialization( *status );
124  // if we were not able to read from the metalink,
125  // propagate the error to the user handler
126  if( pUserHandler )
127  pUserHandler->HandleResponse( status, 0 );
128  else
129  DeallocArgs( status, response, 0 );
130  }
131 
132  delete this;
133  }
134 
135  //----------------------------------------------------------------------------
136  // Get the receive-buffer
137  //----------------------------------------------------------------------------
138  char * GetBuffer()
139  {
140  return pBuffer;
141  }
142 
143  private:
144 
145  MetalinkRedirector *pRedirector;
146  ResponseHandler *pUserHandler;
147  char *pBuffer;
148  std::string pContent;
149  };
150 
151  //----------------------------------------------------------------------------
152  // Open metalink response handler.
153  //
154  // After successful open trrigers a read.
155  //----------------------------------------------------------------------------
157  {
158  public:
159  //----------------------------------------------------------------------------
160  // Constructor
161  //----------------------------------------------------------------------------
163  ResponseHandler *userHandler ) :
164  pRedirector( mr ), pUserHandler( userHandler )
165  {
166  }
167 
168  //----------------------------------------------------------------------------
169  // Handle the response
170  //----------------------------------------------------------------------------
171  virtual void HandleResponseWithHosts( XRootDStatus *status,
172  AnyObject *response, HostList *hostList )
173  {
174  try
175  {
176  if( status->IsOK() )
177  {
178  delete status;
179  // download the content
180  MetalinkReadHandler *mrh = new MetalinkReadHandler( pRedirector,
181  pUserHandler );
182  XRootDStatus st = pRedirector->pFile->Read( 0, DefaultCPChunkSize,
183  mrh->GetBuffer(), mrh );
184  if( !st.IsOK() )
185  {
186  delete mrh;
187  throw new XRootDStatus( stError, errInternal );
188  } else
189  {
190  delete response;
191  delete hostList;
192  }
193  } else
194  throw status;
195  } catch( XRootDStatus *status )
196  {
197  pRedirector->FinalizeInitialization( *status );
198  // if we were not able to schedule a read
199  // pass an error to the user handler
200  if( pUserHandler )
201  pUserHandler->HandleResponseWithHosts( status, response, hostList );
202  else
203  DeallocArgs( status, response, hostList );
204  }
205 
206  delete this;
207  }
208 
209  private:
210 
211  MetalinkRedirector *pRedirector;
212  ResponseHandler *pUserHandler;
213  };
214 
215  //----------------------------------------------------------------------------
216  // Constructor
217  //----------------------------------------------------------------------------
218  MetalinkRedirector::MetalinkRedirector( const std::string & url ) :
219  pUrl( url ), pFile( new File( File::DisableVirtRedirect ) ), pReady(
220  false ), pFileSize( -1 )
221  {
222  }
223 
224  //----------------------------------------------------------------------------
225  // Destructor
226  //----------------------------------------------------------------------------
228  {
229  delete pFile;
230  }
231 
232  //----------------------------------------------------------------------------
233  // Initializes the object with the content of the metalink file
234  //----------------------------------------------------------------------------
236  {
237  MetalinkOpenHandler *handler = new MetalinkOpenHandler( this, userHandler );
238  XRootDStatus st = pFile->Open( pUrl, OpenFlags::Read, Access::None, handler,
239  0 );
240  if( !st.IsOK() )
241  delete handler;
242 
243  return st;
244  }
245 
246  //----------------------------------------------------------------------------
247  // Parses the metalink file
248  //----------------------------------------------------------------------------
249  XRootDStatus MetalinkRedirector::Parse( const std::string &metalink )
250  {
251  Log *log = DefaultEnv::GetLog();
252  Env *env = DefaultEnv::GetEnv();
253  std::string glfnRedirector;
254  env->GetString( "GlfnRedirector", glfnRedirector );
255  // parse the metalink
256  XrdXmlMetaLink parser( "root:xroot:roots:xroots:file:", "xroot:",
257  glfnRedirector.empty() ? 0 : glfnRedirector.c_str() );
258  int size = 0;
259  XrdOucFileInfo **fileInfos = parser.ConvertAll( metalink.c_str(), size,
260  metalink.size() );
261  if( !fileInfos )
262  {
263  int ecode;
264  const char * etxt = parser.GetStatus( ecode );
265  log->Error( UtilityMsg,
266  "Failed to parse the metalink file: %s (error code: %d)", etxt,
267  ecode );
268  return XRootDStatus( stError, errDataError, 0,
269  "Malformed or corrupted metalink file." );
270  }
271  // we are expecting just one file per metalink (as agreed with RUCIO)
272  if( size != 1 )
273  {
274  log->Error( UtilityMsg, "Expected only one file per metalink." );
275  return XRootDStatus( stError, errDataError );
276  }
277 
278  InitCksum( fileInfos );
279  InitReplicas( fileInfos );
280  pTarget = fileInfos[0]->GetTargetName();
281  pFileSize = fileInfos[0]->GetSize();
282 
283  XrdXmlMetaLink::DeleteAll( fileInfos, size );
284 
285  return XRootDStatus();
286  }
287 
288  //----------------------------------------------------------------------------
289  // Finalize the initialization process:
290  // - mark as ready
291  // - setup the status
292  // - and handle pending redirects
293  //----------------------------------------------------------------------------
294  void MetalinkRedirector::FinalizeInitialization( const XRootDStatus &status )
295  {
296  XrdSysMutexHelper scopedLock( pMutex );
297  pReady = true;
298  pStatus = status;
299  // Handle pending redirects (those that were
300  // submitted before the metalink has been loaded)
301  while( !pPendingRedirects.empty() )
302  {
303  const Message *msg = pPendingRedirects.front().first;
304  MsgHandler *handler = pPendingRedirects.front().second;
305  pPendingRedirects.pop_front();
306  if( !handler || !msg )
307  continue;
308  HandleRequestImpl( msg, handler );
309  }
310  }
311 
312  //----------------------------------------------------------------------------
313  // Generates redirect response for the given request
314  //----------------------------------------------------------------------------
315  std::shared_ptr<Message> MetalinkRedirector::GetResponse( const Message *msg ) const
316  {
317  if( !pStatus.IsOK() )
318  return GetErrorMsg( msg, "Could not load the Metalink file.",
319  static_cast<XErrorCode>( XProtocol::mapError( pStatus.errNo ) ) );
320  const ClientRequestHdr *req =
321  reinterpret_cast<const ClientRequestHdr*>( msg->GetBuffer() );
322  // get the redirect location
323  std::string replica;
324  if( !GetReplica( *msg, replica ).IsOK() )
325  return GetErrorMsg( msg, "Metalink: no more replicas to try.", kXR_noReplicas );
326  auto resp = std::make_shared<Message>( sizeof(ServerResponse) );
327  ServerResponse* response =
328  reinterpret_cast<ServerResponse*>( resp->GetBuffer() );
329  response->hdr.status = kXR_redirect;
330  response->hdr.streamid[0] = req->streamid[0];
331  response->hdr.streamid[1] = req->streamid[1];
332  response->hdr.dlen = 4 + replica.size(); // 4 bytes are reserved for port number
333  response->body.redirect.port = -1; // this indicates that the full URL will be given in the 'host' field
334  memcpy( response->body.redirect.host, replica.c_str(), replica.size() );
335  return resp;
336  }
337 
338  //----------------------------------------------------------------------------
339  // Generates error response for the given request
340  //----------------------------------------------------------------------------
341  std::shared_ptr<Message> MetalinkRedirector::GetErrorMsg( const Message *msg,
342  const std::string &errMsg, XErrorCode code ) const
343  {
344  const ClientRequestHdr *req =
345  reinterpret_cast<const ClientRequestHdr*>( msg->GetBuffer() );
346 
347  auto resp = std::make_shared<Message>( sizeof(ServerResponse) );
348  ServerResponse* response =
349  reinterpret_cast<ServerResponse*>( resp->GetBuffer() );
350 
351  response->hdr.status = kXR_error;
352  response->hdr.streamid[0] = req->streamid[0];
353  response->hdr.streamid[1] = req->streamid[1];
354  response->hdr.dlen = 4 + errMsg.size();
355  response->body.error.errnum = htonl( code );
356  memcpy( response->body.error.errmsg, errMsg.c_str(), errMsg.size() );
357 
358  return resp;
359  }
360 
361  //----------------------------------------------------------------------------
362  // Creates an instant redirect response for the given message
363  // or an error response if there are no more replicas to try.
364  // The virtual response is being handled by the given handler.
365  //----------------------------------------------------------------------------
366  XRootDStatus MetalinkRedirector::HandleRequestImpl( const Message *msg,
367  MsgHandler *handler )
368  {
369  auto resp = GetResponse( msg );
370  JobManager *jobMan = DefaultEnv::GetPostMaster()->GetJobManager();
371  RedirectJob *job = new RedirectJob( handler, std::move( resp ) );
372  jobMan->QueueJob( job );
373  return XRootDStatus();
374  }
375 
376  //----------------------------------------------------------------------------
377  // If the MetalinkRedirector is initialized creates an instant
378  // redirect response, otherwise queues the request until initialization
379  // is done.
380  //----------------------------------------------------------------------------
382  MsgHandler *handler )
383  {
384  XrdSysMutexHelper scopedLck( pMutex );
385  // if the metalink data haven't been loaded yet, make it pending
386  if( !pReady )
387  {
388  pPendingRedirects.push_back(
389  std::make_pair( msg, handler ) );;
390  return XRootDStatus();
391  }
392  // otherwise generate a virtual response
393  return HandleRequestImpl( msg, handler );
394  }
395 
396  //----------------------------------------------------------------------------
398  //----------------------------------------------------------------------------
400  {
401  ReplicaList::const_iterator itr = GetReplica( req );
402  return pReplicas.end() - itr;
403  }
404 
405  //----------------------------------------------------------------------------
406  // Gets the file checksum if specified in the metalink
407  //----------------------------------------------------------------------------
408  void MetalinkRedirector::InitCksum( XrdOucFileInfo **fileInfos )
409  {
410  const char *chvalue = 0, *chtype = 0;
411  while( ( chtype = fileInfos[0]->GetDigest( chvalue ) ) )
412  {
413  pChecksums[chtype] = chvalue;
414  }
415  }
416 
417  //----------------------------------------------------------------------------
418  // Initializes replica list
419  //----------------------------------------------------------------------------
420  void MetalinkRedirector::InitReplicas( XrdOucFileInfo **fileInfos )
421  {
422  URL replica;
423  const char *url = 0;
424  while( ( url = fileInfos[0]->GetUrl() ) )
425  {
426  replica = URL( url );
427  if( !replica.IsValid() || replica.GetURL().size() > 4096 )
428  continue; // this is the internal limit (defined in the protocol)
429  pReplicas.push_back( replica.GetURL() );
430  }
431  }
432 
433  //----------------------------------------------------------------------------
435  //----------------------------------------------------------------------------
436  XRootDStatus MetalinkRedirector::GetReplica( const Message &msg,
437  std::string &replica ) const
438  {
439  ReplicaList::const_iterator itr = GetReplica( msg );
440  if( itr == pReplicas.end() )
441  return XRootDStatus( stError, errNotFound );
442 
444  int tlsmtl = DefaultTlsMetalink;
445  env->GetInt( "TlsMetalink", tlsmtl );
446  URL url( *itr );
447  if( tlsmtl && ( url.GetProtocol() == "root" || url.GetProtocol() == "xroot" ) )
448  url.SetProtocol( "roots" );
449  replica = url.GetURL();
450 
451  return XRootDStatus();
452  }
453 
454  //----------------------------------------------------------------------------
456  //----------------------------------------------------------------------------
457  MetalinkRedirector::ReplicaList::const_iterator
458  MetalinkRedirector::GetReplica( const Message &msg ) const
459  {
460  if( pReplicas.empty() )
461  return pReplicas.cend();
462 
463  std::string tried;
464  if( !GetCgiInfo( msg, "tried", tried ).IsOK() )
465  return pReplicas.cbegin();
466 
467  ReplicaList triedList;
468  Utils::splitString( triedList, tried, "," );
469  std::set<std::string> triedSet( triedList.begin(), triedList.end() );
470 
471  ReplicaList::const_iterator itr = pReplicas.begin();
472  for( ; itr != pReplicas.end(); ++itr )
473  {
474  URL url( *itr );
475  if( !triedSet.count( url.GetHostName() ) ) break;
476  }
477 
478  return itr;
479  }
480 
481  //----------------------------------------------------------------------------
483  //----------------------------------------------------------------------------
484  XRootDStatus MetalinkRedirector::GetCgiInfo( const Message &msg,
485  const std::string &key, std::string &value ) const
486  {
487  const ClientRequestHdr *req =
488  reinterpret_cast<const ClientRequestHdr*>( msg.GetBuffer() );
489  kXR_int32 dlen = msg.IsMarshalled() ? ntohl( req->dlen ) : req->dlen;
490  std::string url( msg.GetBuffer( 24 ), dlen );
491  size_t pos = url.find( '?' );
492  if( pos == std::string::npos )
493  return XRootDStatus( stError );
494  size_t start = url.find( key, pos );
495  if( start == std::string::npos )
496  return XRootDStatus( stError );
497  start += key.size() + 1; // the +1 stands for the '=' sign that is not part of the key
498  size_t end = url.find( '&', start );
499  if( end == std::string::npos )
500  end = url.size();
501  value = url.substr( start, end - start );
502  return XRootDStatus();
503  }
504 
505 } /* namespace XrdCl */
XErrorCode
Definition: XProtocol.hh:989
@ kXR_noReplicas
Definition: XProtocol.hh:1019
union ServerResponse::@0 body
kXR_char streamid[2]
Definition: XProtocol.hh:156
kXR_char streamid[2]
Definition: XProtocol.hh:914
@ kXR_redirect
Definition: XProtocol.hh:904
@ kXR_error
Definition: XProtocol.hh:903
kXR_int32 dlen
Definition: XProtocol.hh:159
ServerResponseHeader hdr
Definition: XProtocol.hh:1287
int kXR_int32
Definition: XPtypes.hh:89
static int mapError(int rc)
Definition: XProtocol.hh:1361
void Get(Type &object)
Retrieve the object being held.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
A file.
Definition: XrdClFile.hh:46
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:206
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:99
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
MetalinkOpenHandler(MetalinkRedirector *mr, ResponseHandler *userHandler)
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
MetalinkReadHandler(MetalinkRedirector *mr, ResponseHandler *userHandler, const std::string &content="")
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
XRootDStatus Load(ResponseHandler *userHandler)
virtual ~MetalinkRedirector()
Destructor.
XRootDStatus HandleRequest(const Message *msg, MsgHandler *handler)
MetalinkRedirector(const std::string &url)
virtual int Count(Message &req) const
Count how many replicas do we have left to try for given request.
JobManager * GetJobManager()
Get the job manager object user by the post master.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:56
long long GetSize()
const char * GetTargetName()
const int DefaultCPChunkSize
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errNotFound
Definition: XrdClStatus.hh:100
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const int DefaultTlsMetalink
const uint64_t UtilityMsg
void DeallocArgs(XRootDStatus *status, AnyObject *response, HostList *hostList)
Describe a data chunk for vector read.
uint32_t length
offset in the file
@ Read
Open only for reading.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
uint32_t errNo
Errno, if any.
Definition: XrdClStatus.hh:148