XRootD
XrdClMessageUtils.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClLog.hh"
27 #include "XrdCl/XrdClDefaultEnv.hh"
28 #include "XrdCl/XrdClConstants.hh"
29 #include "XrdCl/XrdClAnyObject.hh"
30 #include "XrdCl/XrdClSIDManager.hh"
31 #include "XrdCl/XrdClPostMaster.hh"
35 
36 #include "XProtocol/XProtocol.hh"
37 
38 namespace XrdCl
39 {
40  //----------------------------------------------------------------------------
41  // Send a message
42  //----------------------------------------------------------------------------
44  Message *msg,
45  ResponseHandler *handler,
46  MessageSendParams &sendParams,
47  LocalFileHandler *lFileHandler )
48  {
49  //--------------------------------------------------------------------------
50  // Get the stuff needed to send the message
51  //--------------------------------------------------------------------------
52  Log *log = DefaultEnv::GetLog();
53  PostMaster *postMaster = DefaultEnv::GetPostMaster();
54  XRootDStatus st;
55 
56  if( !postMaster )
58 
59  log->Dump( XRootDMsg, "[%s] Sending message %s",
60  url.GetHostId().c_str(), msg->GetObfuscatedDescription().c_str() );
61 
62  //--------------------------------------------------------------------------
63  // Get an instance of SID manager object
64  //--------------------------------------------------------------------------
65  std::shared_ptr<SIDManager> sidMgr( SIDMgrPool::Instance().GetSIDMgr( url ) );
67 
68  //--------------------------------------------------------------------------
69  // Allocate the SID and marshall the message
70  //--------------------------------------------------------------------------
71  st = sidMgr->AllocateSID( req->streamid );
72  if( !st.IsOK() )
73  {
74  log->Error( XRootDMsg, "[%s] Unable to allocate stream id",
75  url.GetHostId().c_str() );
76  return st;
77  }
78 
79  //--------------------------------------------------------------------------
80  // Make sure that in case of checkpoint xeq request the embedded request
81  // SID is matching
82  //--------------------------------------------------------------------------
83  if( req->requestid == kXR_chkpoint )
84  {
85  ClientRequest *r = (ClientRequest*)req;
86  if( r->chkpoint.opcode == kXR_ckpXeq )
87  {
88  ClientRequest *xeq = (ClientRequest*) msg->GetBuffer( sizeof( ClientChkPointRequest ) );
89  xeq->header.streamid[0] = req->streamid[0];
90  xeq->header.streamid[1] = req->streamid[1];
91  }
92  }
93 
95 
96  //--------------------------------------------------------------------------
97  // Create and set up the message handler
98  //--------------------------------------------------------------------------
99  XRootDMsgHandler *msgHandler;
100  msgHandler = new XRootDMsgHandler( msg, handler, &url, sidMgr, lFileHandler );
101  msgHandler->SetExpiration( sendParams.expires );
102  msgHandler->SetRedirectAsAnswer( !sendParams.followRedirects );
103  msgHandler->SetOksofarAsAnswer( sendParams.chunkedResponse );
104  msgHandler->SetChunkList( sendParams.chunkList );
105  msgHandler->SetKernelBuffer( sendParams.kbuff );
106  msgHandler->SetRedirectCounter( sendParams.redirectLimit );
107  msgHandler->SetStateful( sendParams.stateful );
108  msgHandler->SetCrc32cDigests( std::move( sendParams.crc32cDigests ) );
109 
110  if( sendParams.loadBalancer.url.IsValid() )
111  msgHandler->SetLoadBalancer( sendParams.loadBalancer );
112 
113  HostList *list = 0;
114  if( sendParams.hostList )
115  {
116  list = sendParams.hostList;
117  sendParams.hostList = nullptr;
118  }
119  else
120  list = new HostList();
121  list->push_back( url );
122  msgHandler->SetHostList( list );
123 
124  //--------------------------------------------------------------------------
125  // Send the message
126  //--------------------------------------------------------------------------
127  st = postMaster->Send( url, msg, msgHandler, sendParams.stateful,
128  sendParams.expires );
129  if( !st.IsOK() )
130  {
132  log->Error( XRootDMsg, "[%s] Unable to send the message %s: %s",
133  url.GetHostId().c_str(), msg->GetObfuscatedDescription().c_str(),
134  st.ToString().c_str() );
135 
136  // we need to reassign req as its current value might have been
137  // invalidated in the meanwhile due to a realloc
138  req = (ClientRequestHdr*)msg->GetBuffer();
139  // Release the SID as the request was never send
140  sidMgr->ReleaseSID( req->streamid );
141  delete msgHandler;
142  return st;
143  }
144  return XRootDStatus();
145  }
146 
147  //----------------------------------------------------------------------------
148  // Redirect a message
149  //----------------------------------------------------------------------------
151  Message *msg,
152  ResponseHandler *handler,
153  MessageSendParams &sendParams,
154  LocalFileHandler *lFileHandler )
155  {
156  //--------------------------------------------------------------------------
157  // Register a new virtual redirector
158  //--------------------------------------------------------------------------
160  Status st = registry.Register( url );
161  if( !st.IsOK() )
162  return st;
163 
164  //--------------------------------------------------------------------------
165  // Get the stuff needed to send the message
166  //--------------------------------------------------------------------------
167  Log *log = DefaultEnv::GetLog();
168  PostMaster *postMaster = DefaultEnv::GetPostMaster();
169 
170  if( !postMaster )
171  return Status( stError, errUninitialized );
172 
173  log->Dump( XRootDMsg, "[%s] Redirecting message %s",
174  url.GetHostId().c_str(), msg->GetObfuscatedDescription().c_str() );
175 
177 
178  //--------------------------------------------------------------------------
179  // Create and set up the message handler
180  //--------------------------------------------------------------------------
181  XRootDMsgHandler *msgHandler;
182  msgHandler = new XRootDMsgHandler( msg, handler, &url, std::shared_ptr<SIDManager>(), lFileHandler );
183  msgHandler->SetExpiration( sendParams.expires );
184  msgHandler->SetRedirectAsAnswer( !sendParams.followRedirects );
185  msgHandler->SetOksofarAsAnswer( sendParams.chunkedResponse );
186  msgHandler->SetChunkList( sendParams.chunkList );
187  msgHandler->SetRedirectCounter( sendParams.redirectLimit );
188  msgHandler->SetFollowMetalink( true );
189 
190  HostInfo info( url, true );
192  sendParams.loadBalancer = info;
193  msgHandler->SetLoadBalancer( info );
194 
195  HostList *list = 0;
196  list = new HostList();
197  list->push_back( info );
198  msgHandler->SetHostList( list );
199 
200  //--------------------------------------------------------------------------
201  // Redirect the message
202  //--------------------------------------------------------------------------
203  st = postMaster->Redirect( url, msg, msgHandler );
204  if( !st.IsOK() )
205  {
207  log->Error( XRootDMsg, "[%s] Unable to send the message %s: %s",
208  url.GetHostId().c_str(), msg->GetObfuscatedDescription().c_str(),
209  st.ToString().c_str() );
210  delete msgHandler;
211  delete list;
212  return st;
213  }
214  return Status();
215  }
216 
217  //----------------------------------------------------------------------------
218  // Process sending params
219  //----------------------------------------------------------------------------
221  {
222  //--------------------------------------------------------------------------
223  // Timeout
224  //--------------------------------------------------------------------------
225  Env *env = DefaultEnv::GetEnv();
226  if( sendParams.timeout == 0 )
227  {
228  int requestTimeout = DefaultRequestTimeout;
229  env->GetInt( "RequestTimeout", requestTimeout );
230  sendParams.timeout = requestTimeout;
231  }
232 
233  if( sendParams.expires == 0 )
234  sendParams.expires = ::time(0)+sendParams.timeout;
235 
236  //--------------------------------------------------------------------------
237  // Redirect limit
238  //--------------------------------------------------------------------------
239  if( sendParams.redirectLimit == 0 )
240  {
241  int redirectLimit = DefaultRedirectLimit;
242  env->GetInt( "RedirectLimit", redirectLimit );
243  sendParams.redirectLimit = redirectLimit;
244  }
245  }
246 
247  //----------------------------------------------------------------------------
249  //----------------------------------------------------------------------------
251  const URL::ParamsMap &newCgi,
252  bool replace,
253  const std::string &newPath )
254  {
255  ClientRequest *req = (ClientRequest *)msg->GetBuffer();
256  switch( req->header.requestid )
257  {
258  case kXR_chmod:
259  case kXR_mkdir:
260  case kXR_mv:
261  case kXR_open:
262  case kXR_rm:
263  case kXR_rmdir:
264  case kXR_stat:
265  case kXR_truncate:
266  {
267  //----------------------------------------------------------------------
268  // Get the pointer to the appropriate path
269  //----------------------------------------------------------------------
270  char *path = msg->GetBuffer( 24 );
271  size_t length = req->header.dlen;
272  if( req->header.requestid == kXR_mv )
273  {
274  for( int i = 0; i < req->header.dlen; ++i, ++path, --length )
275  if( *path == ' ' )
276  break;
277  ++path;
278  --length;
279  }
280 
281  //----------------------------------------------------------------------
282  // Create a fake URL from an existing CGI
283  //----------------------------------------------------------------------
284  char *pathWithNull = new char[length+1];
285  memcpy( pathWithNull, path, length );
286  pathWithNull[length] = 0;
287  std::ostringstream o;
288  o << "fake://fake:111/" << pathWithNull;
289  delete [] pathWithNull;
290 
291  URL currentPath( o.str() );
292  URL::ParamsMap currentCgi = currentPath.GetParams();
293  MergeCGI( currentCgi, newCgi, replace );
294  currentPath.SetParams( currentCgi );
295  if( !newPath.empty() )
296  currentPath.SetPath( newPath );
297  std::string newPathWitParams = currentPath.GetPathWithFilteredParams();
298 
299  //----------------------------------------------------------------------
300  // Write the path with the new cgi appended to the message
301  //----------------------------------------------------------------------
302  uint32_t newDlen = req->header.dlen - length + newPathWitParams.size();
303  msg->ReAllocate( 24+newDlen );
304  req = (ClientRequest *)msg->GetBuffer();
305  path = msg->GetBuffer( 24 );
306  if( req->header.requestid == kXR_mv )
307  {
308  for( int i = 0; i < req->header.dlen; ++i, ++path )
309  if( *path == ' ' )
310  break;
311  ++path;
312  }
313  memcpy( path, newPathWitParams.c_str(), newPathWitParams.size() );
314  req->header.dlen = newDlen;
315  break;
316  }
317  case kXR_locate:
318  {
319  Env *env = DefaultEnv::GetEnv();
320  int preserveLocateTried = DefaultPreserveLocateTried;
321  env->GetInt( "PreserveLocateTried", preserveLocateTried );
322 
323  if( !preserveLocateTried ) break;
324 
325  //----------------------------------------------------------------------
326  // In case of locate we only want to preserve tried/triedrc CGI info
327  //----------------------------------------------------------------------
328  URL::ParamsMap triedCgi;
329  URL::ParamsMap::const_iterator itr = newCgi.find( "triedrc" );
330  if( itr != newCgi.end() )
331  triedCgi[itr->first] = itr->second;
332  itr = newCgi.find( "tried" );
333  if( itr != newCgi.end() )
334  triedCgi[itr->first] = itr->second;
335 
336  //----------------------------------------------------------------------
337  // Is there anything to do?
338  //----------------------------------------------------------------------
339  if( triedCgi.empty() ) break;
340 
341  //----------------------------------------------------------------------
342  // Get the pointer to the appropriate path
343  //----------------------------------------------------------------------
344  char *path = msg->GetBuffer( 24 );
345  size_t length = req->header.dlen;
346 
347  //----------------------------------------------------------------------
348  // Create a fake URL from an existing CGI
349  //----------------------------------------------------------------------
350  std::string strpath( path, length );
351  std::ostringstream o;
352  o << "fake://fake:111/" << strpath;
353 
354  URL currentPath( o.str() );
355  URL::ParamsMap currentCgi = currentPath.GetParams();
356  MergeCGI( currentCgi, triedCgi, replace );
357  currentPath.SetParams( currentCgi );
358  std::string pathWitParams = currentPath.GetPathWithFilteredParams();
359 
360  //----------------------------------------------------------------------
361  // Write the path with the new cgi appended to the message
362  //----------------------------------------------------------------------
363  uint32_t newDlen = pathWitParams.size();
364  msg->ReAllocate( 24+newDlen );
365  req = (ClientRequest *)msg->GetBuffer();
366  path = msg->GetBuffer( 24 );
367  memcpy( path, pathWitParams.c_str(), pathWitParams.size() );
368  req->header.dlen = newDlen;
369  break;
370  }
371  }
373  }
374 
375  //------------------------------------------------------------------------
377  //------------------------------------------------------------------------
379  const URL::ParamsMap &cgi2,
380  bool replace )
381  {
382  URL::ParamsMap::const_iterator it;
383  for( it = cgi2.begin(); it != cgi2.end(); ++it )
384  {
385  if( replace || cgi1.find( it->first ) == cgi1.end() )
386  cgi1[it->first] = it->second;
387  else
388  {
389  std::string &v = cgi1[it->first];
390  if( v.empty() )
391  v = it->second;
392  else
393  {
394  v += ',';
395  v += it->second;
396  }
397  }
398  }
399  }
400 
401  //------------------------------------------------------------------------
403  //------------------------------------------------------------------------
404  Status MessageUtils::CreateXAttrVec( const std::vector<xattr_t> &attrs,
405  std::vector<char> &avec )
406  {
407  if( attrs.empty() )
408  return Status();
409 
410  if( attrs.size() > xfaLimits::kXR_faMaxVars )
411  return Status( stError, errInvalidArgs );
412 
413  //----------------------------------------------------------------------
414  // Calculate the name and value vector lengths
415  //----------------------------------------------------------------------
416 
417  // 2 bytes for rc + 1 byte for null character at the end
418  static const int name_overhead = 3;
419  // 4 bytes for value length
420  static const int value_overhead = 4;
421 
422  size_t nlen = 0, vlen = 0;
423  for( auto itr = attrs.begin(); itr != attrs.end(); ++itr )
424  {
425  nlen += std::get<xattr_name>( *itr ).size() + name_overhead;
426  vlen += std::get<xattr_value>( *itr ).size() + value_overhead;
427  }
428 
429  if( nlen > xfaLimits::kXR_faMaxNlen )
430  return Status( stError, errInvalidArgs );
431 
432  if( vlen > xfaLimits::kXR_faMaxVlen )
433  return Status( stError, errInvalidArgs );
434 
435  //----------------------------------------------------------------------
436  // Create name and value vectors
437  //----------------------------------------------------------------------
438  avec.resize( nlen + vlen, 0 );
439  char *nvec = avec.data(), *vvec = avec.data() + nlen;
440 
441  for( auto itr = attrs.begin(); itr != attrs.end(); ++itr )
442  {
443  const std::string &name = std::get<xattr_name>( *itr );
444  nvec = ClientFattrRequest::NVecInsert( name.c_str(), nvec );
445  const std::string &value = std::get<xattr_value>( *itr );
446  vvec = ClientFattrRequest::VVecInsert( value.c_str(), vvec );
447  }
448 
449  return Status();
450  }
451 
452  //------------------------------------------------------------------------
453  // Create xattr name vector vector
454  //------------------------------------------------------------------------
455  Status MessageUtils::CreateXAttrVec( const std::vector<std::string> &attrs,
456  std::vector<char> &nvec )
457  {
458  if( attrs.empty() )
459  return Status();
460 
461  if( attrs.size() > xfaLimits::kXR_faMaxVars )
462  return Status( stError, errInvalidArgs );
463 
464  //----------------------------------------------------------------------
465  // Calculate the name and value vector lengths
466  //----------------------------------------------------------------------
467 
468  // 2 bytes for rc + 1 byte for null character at the end
469  static const int name_overhead = 3;
470 
471  size_t nlen = 0;
472  for( auto itr = attrs.begin(); itr != attrs.end(); ++itr )
473  nlen += itr->size() + name_overhead;
474 
475  if( nlen > xfaLimits::kXR_faMaxNlen )
476  return Status( stError, errInvalidArgs );
477 
478  //----------------------------------------------------------------------
479  // Create name vector
480  //----------------------------------------------------------------------
481  nvec.resize( nlen, 0 );
482  char *nptr = nvec.data();
483 
484  for( auto itr = attrs.begin(); itr != attrs.end(); ++itr )
485  nptr = ClientFattrRequest::NVecInsert( itr->c_str(), nptr );
486 
487  return Status();
488  }
489 }
#define kXR_isManager
Definition: XProtocol.hh:1156
#define kXR_attrMeta
Definition: XProtocol.hh:1159
kXR_char streamid[2]
Definition: XProtocol.hh:156
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
struct ClientRequestHdr header
Definition: XProtocol.hh:846
kXR_unt16 requestid
Definition: XProtocol.hh:157
@ kXR_open
Definition: XProtocol.hh:122
@ kXR_mkdir
Definition: XProtocol.hh:120
@ kXR_chmod
Definition: XProtocol.hh:114
@ kXR_rm
Definition: XProtocol.hh:126
@ kXR_rmdir
Definition: XProtocol.hh:127
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_mv
Definition: XProtocol.hh:121
@ kXR_stat
Definition: XProtocol.hh:129
@ kXR_chkpoint
Definition: XProtocol.hh:124
@ kXR_locate
Definition: XProtocol.hh:139
#define kXR_attrVirtRdr
Definition: XProtocol.hh:1162
@ kXR_faMaxVars
Definition: XProtocol.hh:280
@ kXR_faMaxVlen
Definition: XProtocol.hh:282
@ kXR_faMaxNlen
Definition: XProtocol.hh:281
struct ClientChkPointRequest chkpoint
Definition: XProtocol.hh:849
kXR_int32 dlen
Definition: XProtocol.hh:159
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
void ReAllocate(uint32_t size)
Reallocate the buffer to a new location of a given size.
Definition: XrdClBuffer.hh:88
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static Status CreateXAttrVec(const std::vector< xattr_t > &attrs, std::vector< char > &avec)
Create xattr vector.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static void RewriteCGIAndPath(Message *msg, const URL::ParamsMap &newCgi, bool replace, const std::string &newPath)
Append cgi to the one already present in the message.
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
A hub for dispatching and receiving messages.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
XRootDStatus Register(const URL &url)
Creates a new virtual redirector and registers it (async).
Handle an async response.
static SIDMgrPool & Instance()
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:395
std::string GetPathWithFilteredParams() const
Get the path with params, filteres out 'xrdcl.'.
Definition: XrdClURL.cc:324
void SetPath(const std::string &path)
Set the path.
Definition: XrdClURL.hh:225
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:445
Handle/Process/Forward XRootD messages.
void SetRedirectCounter(uint16_t redirectCounter)
Set the redirect counter.
void SetFollowMetalink(bool followMetalink)
void SetChunkList(ChunkList *chunkList)
Set the chunk list.
void SetHostList(HostList *hostList)
Set host list.
void SetCrc32cDigests(std::vector< uint32_t > &&crc32cDigests)
void SetLoadBalancer(const HostInfo &loadBalancer)
Set the load balancer.
void SetStateful(bool stateful)
void SetOksofarAsAnswer(bool oksofarAsAnswer)
void SetKernelBuffer(XrdSys::KernelBuffer *kbuff)
Set the kernel buffer.
void SetExpiration(time_t expiration)
Set a timestamp after which we give up.
void SetRedirectAsAnswer(bool redirectAsAnswer)
static void SetDescription(Message *msg)
Get the description of a message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
const int DefaultPreserveLocateTried
const int DefaultRedirectLimit
const uint16_t errUninitialized
Definition: XrdClStatus.hh:60
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t XRootDMsg
std::vector< HostInfo > HostList
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
const int DefaultRequestTimeout
static char * VVecInsert(const char *value, char *buffer)
Definition: XProtocol.cc:188
static char * NVecInsert(const char *name, char *buffer)
Definition: XProtocol.cc:172
URL url
URL of the host.
uint32_t flags
Host type.
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
Procedure execution status.
Definition: XrdClStatus.hh:115
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97