XRootD
XrdClEcHandler.hh
Go to the documentation of this file.
1 /*
2  * XrdClEcHandler.hh
3  *
4  * Created on: 23 Mar 2021
5  * Author: simonm
6  */
7 
8 #ifndef SRC_XRDCL_XRDCLECHANDLER_HH_
9 #define SRC_XRDCL_XRDCLECHANDLER_HH_
10 
12 #include "XrdCl/XrdClUtils.hh"
15 
16 #include "XrdEc/XrdEcReader.hh"
17 #include "XrdEc/XrdEcStrmWriter.hh"
18 
19 #include "XrdOuc/XrdOucCRC.hh"
21 
22 #include <memory>
23 #include <iostream>
24 #include <chrono>
25 #include <algorithm>
26 #include <mutex>
27 
28 namespace XrdCl
29 {
30  class FreeSpace {
31  public:
32  std::string address;
33  uint64_t freeSpace;
34  FreeSpace() {};
35  bool operator<(const FreeSpace &a) const
36  {
37  return ((freeSpace > a.freeSpace) ? true : false);
38  }
39  void Dump() const
40  {
41  std::cout << address << " : " << freeSpace << std::endl;
42  }
43  };
44 
46  public:
49  // From the old location list, select a new location list
50  // n: select at least "n" nodes in the new location list
52  XrdCl::LocationInfo &newList,
53  uint32_t n);
54  void Dump();
55  private:
56  std::vector<FreeSpace> ServerList;
57  std::vector<std::string> ExportPaths;
58  time_t lastUpdateT = 0;
59  int xRatio = 10;
60  std::mutex lock;
61  bool initExportPaths = false;
62 
63  void TryInitExportPaths();
64  uint64_t GetFreeSpace(const std::string addr);
65  bool BlindSelect();
66  void UpdateSpaceInfo();
67  bool Exists(XrdCl::LocationInfo::Location &loc);
68  void AddServers(XrdCl::LocationInfo &locInfo);
69  };
70 
72  {
73  private:
74  XrdCl::ResponseHandler *realHandler;
75  public:
76  // constructor
77  EcPgReadResponseHandler(ResponseHandler *a) : realHandler(a) {}
78 
79  // Response Handler
81  AnyObject *rdresp)
82  {
83  if( !status->IsOK() )
84  {
85  realHandler->HandleResponse( status, rdresp );
86  delete this;
87  return;
88  }
89 
90  ChunkInfo *chunk = 0;
91  rdresp->Get(chunk);
92 
93  std::vector<uint32_t> cksums;
94  size_t nbpages = chunk->length / XrdSys::PageSize;
95  if( chunk->length % XrdSys::PageSize )
96  ++nbpages;
97  cksums.reserve( nbpages );
98 
99  size_t size = chunk->length;
100  char *buffer = reinterpret_cast<char*>( chunk->buffer );
101 
102  for( size_t pg = 0; pg < nbpages; ++pg )
103  {
104  size_t pgsize = XrdSys::PageSize;
105  if( pgsize > size ) pgsize = size;
106  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
107  cksums.push_back( crcval );
108  buffer += pgsize;
109  size -= pgsize;
110  }
111 
112  PageInfo *pages = new PageInfo(chunk->offset, chunk->length, chunk->buffer, std::move(cksums));
113  delete rdresp;
114  AnyObject *response = new AnyObject();
115  response->Set( pages );
116  realHandler->HandleResponse( status, response );
117 
118  delete this;
119  }
120  };
121 
122  class EcHandler : public FilePlugIn
123  {
124  public:
125  EcHandler( const URL &redir,
126  XrdEc::ObjCfg *objcfg,
127  std::unique_ptr<CheckSumHelper> cksHelper ) : redir( redir ),
128  fs( redir, false ),
129  objcfg( objcfg ),
130  curroff( 0 ),
131  cksHelper( std::move( cksHelper ) )
132  {
134  }
135 
136  virtual ~EcHandler()
137  {
138  }
139 
140  XRootDStatus Open( uint16_t flags,
141  ResponseHandler *handler,
142  uint16_t timeout )
143  {
144  if( ( flags & OpenFlags::Write ) || ( flags & OpenFlags::Update ) )
145  {
146  if( !( flags & OpenFlags::New ) || // it has to be a new file
147  ( flags & OpenFlags::Delete ) || // truncation is not supported
148  ( flags & OpenFlags::Read ) ) // write + read is not supported
150 
151  if( objcfg->plgr.empty() )
152  {
153  XRootDStatus st = LoadPlacement();
154  if( !st.IsOK() ) return st;
155  }
156  writer.reset( new XrdEc::StrmWriter( *objcfg ) );
157  writer->Open( handler, timeout );
158  return XRootDStatus();
159  }
160 
161  if( flags & OpenFlags::Read )
162  {
163  if( flags & OpenFlags::Write )
165 
166  if( objcfg->plgr.empty() )
167  {
168  XRootDStatus st = LoadPlacement( redir.GetPath() );
169  if( !st.IsOK() ) return st;
170  }
171  reader.reset( new XrdEc::Reader( *objcfg ) );
172  reader->Open( handler, timeout );
173  return XRootDStatus();
174  }
175 
177  }
178 
179  XRootDStatus Open( const std::string &url,
180  OpenFlags::Flags flags,
181  Access::Mode mode,
182  ResponseHandler *handler,
183  uint16_t timeout )
184  {
185  (void)url; (void)mode;
186  return Open( flags, handler, timeout );
187  }
188 
189 
190  //------------------------------------------------------------------------
192  //------------------------------------------------------------------------
194  uint16_t timeout )
195  {
196  if( writer )
197  {
198  writer->Close( ResponseHandler::Wrap( [this, handler]( XRootDStatus *st, AnyObject *rsp )
199  {
200  writer.reset();
201  if( st->IsOK() && bool( cksHelper ) )
202  {
203  std::string commit = redir.GetPath()
204  + "?xrdec.objid=" + objcfg->obj
205  + "&xrdec.close=true&xrdec.size=" + std::to_string( curroff );
206  if( cksHelper )
207  {
208  std::string ckstype = cksHelper->GetType();
209  std::string cksval;
210  auto st = cksHelper->GetCheckSum( cksval, ckstype );
211  if( !st.IsOK() )
212  {
213  handler->HandleResponse( new XRootDStatus( st ), nullptr );
214  return;
215  }
216  commit += "&xrdec.cksum=" + cksval;
217  }
218  Buffer arg; arg.FromString( commit );
219  auto st = fs.Query( QueryCode::OpaqueFile, arg, handler );
220  if( !st.IsOK() ) handler->HandleResponse( new XRootDStatus( st ), nullptr );
221  return;
222  }
223  handler->HandleResponse( st, rsp );
224  } ), timeout );
225  return XRootDStatus();
226  }
227 
228  if( reader )
229  {
230  reader->Close( ResponseHandler::Wrap( [this, handler]( XRootDStatus *st, AnyObject *rsp )
231  {
232  reader.reset();
233  handler->HandleResponse( st, rsp );
234  } ), timeout );
235  return XRootDStatus();
236  }
237 
238  return XRootDStatus( stError, errNotSupported );
239  }
240 
241  //------------------------------------------------------------------------
243  //------------------------------------------------------------------------
244  XRootDStatus Stat( bool force,
245  ResponseHandler *handler,
246  uint16_t timeout )
247  {
248 
249  if( !objcfg->nomtfile )
250  return fs.Stat( redir.GetPath(), handler, timeout );
251 
252  if( !force && statcache )
253  {
254  auto rsp = StatRsp( statcache->GetSize() );
255  Schedule( handler, rsp );
256  return XRootDStatus();
257  }
258 
259  if( writer )
260  {
261  statcache.reset( new StatInfo() );
262  statcache->SetSize( writer->GetSize() );
263  auto rsp = StatRsp( statcache->GetSize() );
264  Schedule( handler, rsp );
265  return XRootDStatus();
266  }
267 
268  if( reader )
269  {
270  statcache.reset( new StatInfo() );
271  statcache->SetSize( reader->GetSize() );
272  auto rsp = StatRsp( statcache->GetSize() );
273  Schedule( handler, rsp );
274  return XRootDStatus();
275  }
276 
277  return XRootDStatus( stError, errInvalidOp, 0, "File not open." );
278  }
279 
280  //------------------------------------------------------------------------
282  //------------------------------------------------------------------------
283  XRootDStatus Read( uint64_t offset,
284  uint32_t size,
285  void *buffer,
286  ResponseHandler *handler,
287  uint16_t timeout )
288  {
289  if( !reader ) return XRootDStatus( stError, errInternal );
290 
291  reader->Read( offset, size, buffer, handler, timeout );
292  return XRootDStatus();
293  }
294 
295  //------------------------------------------------------------------------
297  //------------------------------------------------------------------------
298  XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer,
299  ResponseHandler *handler,
300  uint16_t timeout)
301  {
302  ResponseHandler *substitHandler = new EcPgReadResponseHandler( handler );
303  XRootDStatus st = Read(offset, size, buffer, substitHandler, timeout);
304  return st;
305  }
306 
307 
308  //------------------------------------------------------------------------
310  //------------------------------------------------------------------------
311  XRootDStatus Write( uint64_t offset,
312  uint32_t size,
313  const void *buffer,
314  ResponseHandler *handler,
315  uint16_t timeout )
316  {
317  if( cksHelper )
318  cksHelper->Update( buffer, size );
319 
320  if( !writer ) return XRootDStatus( stError, errInternal );
321  if( offset != curroff ) return XRootDStatus( stError, errNotSupported );
322  writer->Write( size, buffer, handler );
323  curroff += size;
324  return XRootDStatus();
325  }
326 
327  //------------------------------------------------------------------------
329  //------------------------------------------------------------------------
330  XRootDStatus PgWrite( uint64_t offset,
331  uint32_t size,
332  const void *buffer,
333  std::vector<uint32_t> &cksums,
334  ResponseHandler *handler,
335  uint16_t timeout = 0 )
336  {
337  if(! cksums.empty() )
338  {
339  const char *data = static_cast<const char*>( buffer );
340  std::vector<uint32_t> local_cksums;
341  XrdOucPgrwUtils::csCalc( data, offset, size, local_cksums );
342  if (data) delete data;
343  if (local_cksums != cksums)
344  return XRootDStatus( stError, errInvalidArgs, 0, "data and crc32c digests do not match." );
345  }
346  return Write(offset, size, buffer, handler, timeout);
347  }
348 
349  //------------------------------------------------------------------------
351  //------------------------------------------------------------------------
352  bool IsOpen() const
353  {
354  return writer || reader;
355  }
356 
357  private:
358 
359  inline XRootDStatus LoadPlacement()
360  {
361  LocationInfo *infoAll = nullptr;
362  XRootDStatus st = fs.DeepLocate( "*", OpenFlags::None, infoAll );
363  std::unique_ptr<LocationInfo> ptr( infoAll );
364  if( !st.IsOK() ) return st;
365 
366  LocationInfo *info = new LocationInfo();
367  std::unique_ptr<LocationInfo> ptr1( info );
368 
369  static ServerSpaceInfo ssi;
370  ssi.SelectLocations(*infoAll, *info, objcfg->nbchunks);
371 
372  if( info->GetSize() < objcfg->nbchunks )
373  return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );
374  unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
375  shuffle (info->Begin(), info->End(), std::default_random_engine(seed));
376  for( size_t i = 0; i < objcfg->nbchunks; ++i )
377  {
378  auto &location = info->At( i );
379  objcfg->plgr.emplace_back( "root://" + location.GetAddress() + '/' );
380  }
381  return XRootDStatus();
382  }
383 
384  inline XRootDStatus LoadPlacement( const std::string &path )
385  {
386  LocationInfo *info = nullptr;
387  XRootDStatus st = fs.DeepLocate( "*", OpenFlags::None, info );
388  std::unique_ptr<LocationInfo> ptr( info );
389  if( !st.IsOK() ) return st;
390  // The following check become meaningless
391  if( info->GetSize() < objcfg->nbdata )
392  return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );
393 
394  uint64_t verNumMax = 0;
395  std::vector<uint64_t> verNums;
396  std::vector<std::string> xattrkeys;
397  std::vector<XrdCl::XAttr> xattrvals;
398  xattrkeys.push_back("xrdec.strpver");
399  for( size_t i = 0; i < info->GetSize(); ++i )
400  {
401  FileSystem *fs_i = new FileSystem(info->At( i ).GetAddress());
402  xattrvals.clear();
403  st = fs_i->GetXAttr(path, xattrkeys, xattrvals, 0);
404  if (st.IsOK() && ! xattrvals[0].value.empty())
405  {
406  std::stringstream sstream(xattrvals[0].value);
407  uint64_t verNum;
408  sstream >> verNum;
409  verNums.push_back(verNum);
410  if (verNum > verNumMax)
411  verNumMax = verNum;
412  }
413  else
414  verNums.push_back(0);
415  delete fs_i;
416  }
417 
418  int n = 0;
419  for( size_t i = 0; i < info->GetSize(); ++i )
420  {
421  if ( verNums.at(i) == 0 || verNums.at(i) != verNumMax )
422  continue;
423  else
424  n++;
425  auto &location = info->At( i );
426  objcfg->plgr.emplace_back( "root://" + location.GetAddress() + '/' );
427  }
428  if (n < objcfg->nbdata )
429  return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );
430  return XRootDStatus();
431  }
432 
433  inline static AnyObject* StatRsp( uint64_t size )
434  {
435  StatInfo *info = new StatInfo();
436  info->SetSize( size );
437  AnyObject *rsp = new AnyObject();
438  rsp->Set( info );
439  return rsp;
440  }
441 
442  inline static void Schedule( ResponseHandler *handler, AnyObject *rsp )
443  {
444  ResponseJob *job = new ResponseJob( handler, new XRootDStatus(), rsp, nullptr );
446  }
447 
448  URL redir;
449  FileSystem fs;
450  std::unique_ptr<XrdEc::ObjCfg> objcfg;
451  std::unique_ptr<XrdEc::StrmWriter> writer;
452  std::unique_ptr<XrdEc::Reader> reader;
453  uint64_t curroff;
454  std::unique_ptr<CheckSumHelper> cksHelper;
455  std::unique_ptr<StatInfo> statcache;
456 
457  };
458 
459  //----------------------------------------------------------------------------
461  //----------------------------------------------------------------------------
463  {
464  public:
465  //------------------------------------------------------------------------
467  //------------------------------------------------------------------------
468  EcPlugInFactory( uint8_t nbdta, uint8_t nbprt, uint64_t chsz,
469  std::vector<std::string> && plgr ) :
470  nbdta( nbdta ), nbprt( nbprt ), chsz( chsz ), plgr( std::move( plgr ) )
471  {
472  }
473 
474  //------------------------------------------------------------------------
476  //------------------------------------------------------------------------
478  {
479  }
480 
481  //------------------------------------------------------------------------
483  //------------------------------------------------------------------------
484  virtual FilePlugIn *CreateFile( const std::string &u )
485  {
486  URL url( u );
487  XrdEc::ObjCfg *objcfg = new XrdEc::ObjCfg( url.GetPath(), nbdta, nbprt,
488  chsz, false, true );
489  objcfg->plgr = std::move( plgr );
490  return new EcHandler( url, objcfg, nullptr );
491  }
492 
493  //------------------------------------------------------------------------
495  //------------------------------------------------------------------------
496  virtual FileSystemPlugIn *CreateFileSystem( const std::string &url )
497  {
498  return nullptr;
499  }
500 
501  private:
502  uint8_t nbdta;
503  uint8_t nbprt;
504  uint64_t chsz;
505  std::vector<std::string> plgr;
506  };
507 
508  EcHandler* GetEcHandler( const URL &headnode, const URL &redirurl );
509 
510 } /* namespace XrdCl */
511 
512 #endif /* SRC_XRDCL_XRDCLECHANDLER_HH_ */
513 
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
Definition: XrdClBuffer.hh:34
void FromString(const std::string str)
Fill the buffer from a string.
Definition: XrdClBuffer.hh:205
static PostMaster * GetPostMaster()
Get default post master.
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout)
XRootDStatus PgWrite(uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout)
XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout)
XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout)
bool IsOpen() const
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout)
XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout)
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
EcHandler(const URL &redir, XrdEc::ObjCfg *objcfg, std::unique_ptr< CheckSumHelper > cksHelper)
EcPgReadResponseHandler(ResponseHandler *a)
void HandleResponse(XRootDStatus *status, AnyObject *rdresp)
EcPlugInFactory(uint8_t nbdta, uint8_t nbprt, uint64_t chsz, std::vector< std::string > &&plgr)
Constructor.
virtual FilePlugIn * CreateFile(const std::string &u)
Create a file plug-in for the given URL.
virtual FileSystemPlugIn * CreateFileSystem(const std::string &url)
Create a file system plug-in for the given URL.
virtual ~EcPlugInFactory()
Destructor.
An interface for file plug-ins.
An interface for file plug-ins.
XRootDStatus Query(QueryCode::Code queryCode, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
bool operator<(const FreeSpace &a) const
std::string address
void Dump() const
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Path location info.
uint32_t GetSize() const
Get number of locations.
Iterator Begin()
Get the location begin iterator.
Location & At(uint32_t index)
Get the location at index.
Iterator End()
Get the location end iterator.
JobManager * GetJobManager()
Get the job manager object user by the post master.
Handle an async response.
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
void SelectLocations(XrdCl::LocationInfo &oldList, XrdCl::LocationInfo &newList, uint32_t n)
Object stat info.
URL representation.
Definition: XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
bool enable_plugins
Definition: XrdEcConfig.hh:77
static Config & Instance()
Singleton access.
Definition: XrdEcConfig.hh:46
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
static const int PageSize
Mode
Access mode.
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Write
Open only for writing.
@ Update
Open for reading and writing.
@ OpaqueFile
Implementation dependent.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::vector< std::string > plgr
Definition: XrdEcObjCfg.hh:92