XRootD
XrdClEcHandler.hh
Go to the documentation of this file.
1 /*
2  * XrdClEcHandler.hh
3  *
4  * Created on: 23 Mar 2021
5  * Author: simonm
6  */
7 
8 #ifndef SRC_XRDCL_XRDCLECHANDLER_HH_
9 #define SRC_XRDCL_XRDCLECHANDLER_HH_
10 
12 #include "XrdCl/XrdClUtils.hh"
15 
16 #include "XrdEc/XrdEcReader.hh"
17 #include "XrdEc/XrdEcStrmWriter.hh"
18 
19 #include "XrdOuc/XrdOucCRC.hh"
21 
22 #include <memory>
23 #include <iostream>
24 #include <chrono>
25 #include <algorithm>
26 #include <mutex>
27 
28 namespace XrdCl
29 {
//----------------------------------------------------------------------------
//! Free-space record: an address paired with a free-space figure.
//!
//! operator< is deliberately inverted (it compares with '>'), so ordering a
//! container of FreeSpace objects with the default comparator places the
//! entry with the largest freeSpace value first.
//----------------------------------------------------------------------------
class FreeSpace {
  public:
    std::string address;
    uint64_t    freeSpace;

    //------------------------------------------------------------------------
    //! Default constructor: empty address, zero free space (the counter used
    //! to be left uninitialized, making comparisons on fresh objects
    //! undefined behaviour).
    //------------------------------------------------------------------------
    FreeSpace() : freeSpace( 0 ) {}

    //------------------------------------------------------------------------
    //! @return true if this entry has MORE free space than a
    //------------------------------------------------------------------------
    bool operator<( const FreeSpace &a ) const
    {
      return freeSpace > a.freeSpace;
    }

    //------------------------------------------------------------------------
    //! Print "<address> : <freeSpace>" followed by a newline to std::cout
    //------------------------------------------------------------------------
    void Dump() const
    {
      std::cout << address << " : " << freeSpace << std::endl;
    }
};
44 
46  public:
49  // From the old location list, select a new location list
50  // n: select at least "n" nodes in the new location list
52  XrdCl::LocationInfo &newList,
53  uint32_t n);
54  void Dump();
55  private:
56  std::vector<FreeSpace> ServerList;
57  std::vector<std::string> ExportPaths;
58  time_t lastUpdateT = 0;
59  int xRatio = 10;
60  std::mutex lock;
61  bool initExportPaths = false;
62 
63  void TryInitExportPaths();
64  uint64_t GetFreeSpace(const std::string addr);
65  bool BlindSelect();
66  void UpdateSpaceInfo();
67  bool Exists(XrdCl::LocationInfo::Location &loc);
68  void AddServers(XrdCl::LocationInfo &locInfo);
69  };
70 
72  {
73  private:
74  XrdCl::ResponseHandler *realHandler;
75  public:
76  // constructor
77  EcPgReadResponseHandler(ResponseHandler *a) : realHandler(a) {}
78 
79  // Response Handler
81  AnyObject *rdresp)
82  {
83  if( !status->IsOK() )
84  {
85  realHandler->HandleResponse( status, rdresp );
86  delete this;
87  return;
88  }
89 
90  ChunkInfo *chunk = 0;
91  rdresp->Get(chunk);
92 
93  if (!chunk) {
94  delete this;
95  return;
96  }
97 
98  std::vector<uint32_t> cksums;
99  size_t nbpages = chunk->length / XrdSys::PageSize;
100  if( chunk->length % XrdSys::PageSize )
101  ++nbpages;
102  cksums.reserve( nbpages );
103 
104  size_t size = chunk->length;
105  char *buffer = reinterpret_cast<char*>( chunk->buffer );
106 
107  for( size_t pg = 0; pg < nbpages; ++pg )
108  {
109  size_t pgsize = XrdSys::PageSize;
110  if( pgsize > size ) pgsize = size;
111  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
112  cksums.push_back( crcval );
113  buffer += pgsize;
114  size -= pgsize;
115  }
116 
117  PageInfo *pages = new PageInfo(chunk->offset, chunk->length, chunk->buffer, std::move(cksums));
118  delete rdresp;
119  AnyObject *response = new AnyObject();
120  response->Set( pages );
121  realHandler->HandleResponse( status, response );
122 
123  delete this;
124  }
125  };
126 
127  class EcHandler : public FilePlugIn
128  {
129  public:
130  EcHandler( const URL &redir,
131  XrdEc::ObjCfg *objcfg,
132  std::unique_ptr<CheckSumHelper> cksHelper ) : redir( redir ),
133  fs( redir, false ),
134  objcfg( objcfg ),
135  curroff( 0 ),
136  cksHelper( std::move( cksHelper ) )
137  {
139  }
140 
141  virtual ~EcHandler()
142  {
143  }
144 
145  XRootDStatus Open( uint16_t flags,
146  ResponseHandler *handler,
147  uint16_t timeout )
148  {
149  if( ( flags & OpenFlags::Write ) || ( flags & OpenFlags::Update ) )
150  {
151  if( !( flags & OpenFlags::New ) || // it has to be a new file
152  ( flags & OpenFlags::Delete ) || // truncation is not supported
153  ( flags & OpenFlags::Read ) ) // write + read is not supported
155 
156  if( objcfg->plgr.empty() )
157  {
158  XRootDStatus st = LoadPlacement();
159  if( !st.IsOK() ) return st;
160  }
161  writer.reset( new XrdEc::StrmWriter( *objcfg ) );
162  writer->Open( handler, timeout );
163  return XRootDStatus();
164  }
165 
166  if( flags & OpenFlags::Read )
167  {
168  if( flags & OpenFlags::Write )
170 
171  if( objcfg->plgr.empty() )
172  {
173  XRootDStatus st = LoadPlacement( redir.GetPath() );
174  if( !st.IsOK() ) return st;
175  }
176  reader.reset( new XrdEc::Reader( *objcfg ) );
177  reader->Open( handler, timeout );
178  return XRootDStatus();
179  }
180 
182  }
183 
184  XRootDStatus Open( const std::string &url,
185  OpenFlags::Flags flags,
186  Access::Mode mode,
187  ResponseHandler *handler,
188  uint16_t timeout )
189  {
190  (void)url; (void)mode;
191  return Open( flags, handler, timeout );
192  }
193 
194 
195  //------------------------------------------------------------------------
197  //------------------------------------------------------------------------
199  uint16_t timeout )
200  {
201  if( writer )
202  {
203  writer->Close( ResponseHandler::Wrap( [this, handler]( XRootDStatus *st, AnyObject *rsp )
204  {
205  writer.reset();
206  if( st->IsOK() && bool( cksHelper ) )
207  {
208  std::string commit = redir.GetPath()
209  + "?xrdec.objid=" + objcfg->obj
210  + "&xrdec.close=true&xrdec.size=" + std::to_string( curroff );
211  if( cksHelper )
212  {
213  std::string ckstype = cksHelper->GetType();
214  std::string cksval;
215  auto st = cksHelper->GetCheckSum( cksval, ckstype );
216  if( !st.IsOK() )
217  {
218  handler->HandleResponse( new XRootDStatus( st ), nullptr );
219  return;
220  }
221  commit += "&xrdec.cksum=" + cksval;
222  }
223  Buffer arg; arg.FromString( commit );
224  auto st = fs.Query( QueryCode::OpaqueFile, arg, handler );
225  if( !st.IsOK() ) handler->HandleResponse( new XRootDStatus( st ), nullptr );
226  return;
227  }
228  handler->HandleResponse( st, rsp );
229  } ), timeout );
230  return XRootDStatus();
231  }
232 
233  if( reader )
234  {
235  reader->Close( ResponseHandler::Wrap( [this, handler]( XRootDStatus *st, AnyObject *rsp )
236  {
237  reader.reset();
238  handler->HandleResponse( st, rsp );
239  } ), timeout );
240  return XRootDStatus();
241  }
242 
243  return XRootDStatus( stError, errNotSupported );
244  }
245 
246  //------------------------------------------------------------------------
248  //------------------------------------------------------------------------
249  XRootDStatus Stat( bool force,
250  ResponseHandler *handler,
251  uint16_t timeout )
252  {
253 
254  if( !objcfg->nomtfile )
255  return fs.Stat( redir.GetPath(), handler, timeout );
256 
257  if( !force && statcache )
258  {
259  auto rsp = StatRsp( statcache->GetSize() );
260  Schedule( handler, rsp );
261  return XRootDStatus();
262  }
263 
264  if( writer )
265  {
266  statcache.reset( new StatInfo() );
267  statcache->SetSize( writer->GetSize() );
268  auto rsp = StatRsp( statcache->GetSize() );
269  Schedule( handler, rsp );
270  return XRootDStatus();
271  }
272 
273  if( reader )
274  {
275  statcache.reset( new StatInfo() );
276  statcache->SetSize( reader->GetSize() );
277  auto rsp = StatRsp( statcache->GetSize() );
278  Schedule( handler, rsp );
279  return XRootDStatus();
280  }
281 
282  return XRootDStatus( stError, errInvalidOp, 0, "File not open." );
283  }
284 
285  //------------------------------------------------------------------------
287  //------------------------------------------------------------------------
288  XRootDStatus Read( uint64_t offset,
289  uint32_t size,
290  void *buffer,
291  ResponseHandler *handler,
292  uint16_t timeout )
293  {
294  if( !reader ) return XRootDStatus( stError, errInternal );
295 
296  reader->Read( offset, size, buffer, handler, timeout );
297  return XRootDStatus();
298  }
299 
300  //------------------------------------------------------------------------
302  //------------------------------------------------------------------------
303  XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer,
304  ResponseHandler *handler,
305  uint16_t timeout)
306  {
307  ResponseHandler *substitHandler = new EcPgReadResponseHandler( handler );
308  XRootDStatus st = Read(offset, size, buffer, substitHandler, timeout);
309  return st;
310  }
311 
312 
313  //------------------------------------------------------------------------
315  //------------------------------------------------------------------------
316  XRootDStatus Write( uint64_t offset,
317  uint32_t size,
318  const void *buffer,
319  ResponseHandler *handler,
320  uint16_t timeout )
321  {
322  if( cksHelper )
323  cksHelper->Update( buffer, size );
324 
325  if( !writer ) return XRootDStatus( stError, errInternal );
326  if( offset != curroff ) return XRootDStatus( stError, errNotSupported );
327  writer->Write( size, buffer, handler );
328  curroff += size;
329  return XRootDStatus();
330  }
331 
332  //------------------------------------------------------------------------
334  //------------------------------------------------------------------------
335  XRootDStatus PgWrite( uint64_t offset,
336  uint32_t size,
337  const void *buffer,
338  std::vector<uint32_t> &cksums,
339  ResponseHandler *handler,
340  uint16_t timeout = 0 )
341  {
342  if(! cksums.empty() )
343  {
344  const char *data = static_cast<const char*>( buffer );
345  std::vector<uint32_t> local_cksums;
346  XrdOucPgrwUtils::csCalc( data, offset, size, local_cksums );
347  if (data) delete data;
348  if (local_cksums != cksums)
349  return XRootDStatus( stError, errInvalidArgs, 0, "data and crc32c digests do not match." );
350  }
351  return Write(offset, size, buffer, handler, timeout);
352  }
353 
354  //------------------------------------------------------------------------
356  //------------------------------------------------------------------------
357  bool IsOpen() const
358  {
359  return writer || reader;
360  }
361 
362  private:
363 
364  inline XRootDStatus LoadPlacement()
365  {
366  LocationInfo *infoAll = nullptr;
367  XRootDStatus st = fs.DeepLocate( "*", OpenFlags::None, infoAll );
368  std::unique_ptr<LocationInfo> ptr( infoAll );
369  if( !st.IsOK() ) return st;
370 
371  LocationInfo *info = new LocationInfo();
372  std::unique_ptr<LocationInfo> ptr1( info );
373 
374  static ServerSpaceInfo ssi;
375  ssi.SelectLocations(*infoAll, *info, objcfg->nbchunks);
376 
377  if( info->GetSize() < objcfg->nbchunks )
378  return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );
379  unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
380  shuffle (info->Begin(), info->End(), std::default_random_engine(seed));
381  for( size_t i = 0; i < objcfg->nbchunks; ++i )
382  {
383  auto &location = info->At( i );
384  objcfg->plgr.emplace_back( "root://" + location.GetAddress() + '/' );
385  }
386  return XRootDStatus();
387  }
388 
389  inline XRootDStatus LoadPlacement( const std::string &path )
390  {
391  LocationInfo *info = nullptr;
392  XRootDStatus st = fs.DeepLocate( "*", OpenFlags::None, info );
393  std::unique_ptr<LocationInfo> ptr( info );
394  if( !st.IsOK() ) return st;
395  // The following check become meaningless
396  if( info->GetSize() < objcfg->nbdata )
397  return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );
398 
399  uint64_t verNumMax = 0;
400  std::vector<uint64_t> verNums;
401  std::vector<std::string> xattrkeys;
402  std::vector<XrdCl::XAttr> xattrvals;
403  xattrkeys.push_back("xrdec.strpver");
404  for( size_t i = 0; i < info->GetSize(); ++i )
405  {
406  FileSystem *fs_i = new FileSystem(info->At( i ).GetAddress());
407  xattrvals.clear();
408  st = fs_i->GetXAttr(path, xattrkeys, xattrvals, 0);
409  if (st.IsOK() && ! xattrvals[0].value.empty())
410  {
411  std::stringstream sstream(xattrvals[0].value);
412  uint64_t verNum;
413  sstream >> verNum;
414  verNums.push_back(verNum);
415  if (verNum > verNumMax)
416  verNumMax = verNum;
417  }
418  else
419  verNums.push_back(0);
420  delete fs_i;
421  }
422 
423  int n = 0;
424  for( size_t i = 0; i < info->GetSize(); ++i )
425  {
426  if ( verNums.at(i) == 0 || verNums.at(i) != verNumMax )
427  continue;
428  else
429  n++;
430  auto &location = info->At( i );
431  objcfg->plgr.emplace_back( "root://" + location.GetAddress() + '/' );
432  }
433  if (n < objcfg->nbdata )
434  return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );
435  return XRootDStatus();
436  }
437 
438  inline static AnyObject* StatRsp( uint64_t size )
439  {
440  StatInfo *info = new StatInfo();
441  info->SetSize( size );
442  AnyObject *rsp = new AnyObject();
443  rsp->Set( info );
444  return rsp;
445  }
446 
447  inline static void Schedule( ResponseHandler *handler, AnyObject *rsp )
448  {
449  ResponseJob *job = new ResponseJob( handler, new XRootDStatus(), rsp, nullptr );
451  }
452 
453  URL redir;
454  FileSystem fs;
455  std::unique_ptr<XrdEc::ObjCfg> objcfg;
456  std::unique_ptr<XrdEc::StrmWriter> writer;
457  std::unique_ptr<XrdEc::Reader> reader;
458  uint64_t curroff;
459  std::unique_ptr<CheckSumHelper> cksHelper;
460  std::unique_ptr<StatInfo> statcache;
461 
462  };
463 
464  //----------------------------------------------------------------------------
466  //----------------------------------------------------------------------------
468  {
469  public:
470  //------------------------------------------------------------------------
472  //------------------------------------------------------------------------
473  EcPlugInFactory( uint8_t nbdta, uint8_t nbprt, uint64_t chsz,
474  std::vector<std::string> && plgr ) :
475  nbdta( nbdta ), nbprt( nbprt ), chsz( chsz ), plgr( std::move( plgr ) )
476  {
477  }
478 
479  //------------------------------------------------------------------------
481  //------------------------------------------------------------------------
483  {
484  }
485 
486  //------------------------------------------------------------------------
488  //------------------------------------------------------------------------
489  virtual FilePlugIn *CreateFile( const std::string &u )
490  {
491  URL url( u );
492  XrdEc::ObjCfg *objcfg = new XrdEc::ObjCfg( url.GetPath(), nbdta, nbprt,
493  chsz, false, true );
494  objcfg->plgr = std::move( plgr );
495  return new EcHandler( url, objcfg, nullptr );
496  }
497 
498  //------------------------------------------------------------------------
500  //------------------------------------------------------------------------
501  virtual FileSystemPlugIn *CreateFileSystem( const std::string &url )
502  {
503  return nullptr;
504  }
505 
506  private:
507  uint8_t nbdta;
508  uint8_t nbprt;
509  uint64_t chsz;
510  std::vector<std::string> plgr;
511  };
512 
513  EcHandler* GetEcHandler( const URL &headnode, const URL &redirurl );
514 
515 } /* namespace XrdCl */
516 
517 #endif /* SRC_XRDCL_XRDCLECHANDLER_HH_ */
518 
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
Definition: XrdClBuffer.hh:34
void FromString(const std::string str)
Fill the buffer from a string.
Definition: XrdClBuffer.hh:205
static PostMaster * GetPostMaster()
Get default post master.
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout)
XRootDStatus PgWrite(uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout)
XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout)
XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout)
bool IsOpen() const
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout)
XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout)
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
EcHandler(const URL &redir, XrdEc::ObjCfg *objcfg, std::unique_ptr< CheckSumHelper > cksHelper)
EcPgReadResponseHandler(ResponseHandler *a)
void HandleResponse(XRootDStatus *status, AnyObject *rdresp)
EcPlugInFactory(uint8_t nbdta, uint8_t nbprt, uint64_t chsz, std::vector< std::string > &&plgr)
Constructor.
virtual FilePlugIn * CreateFile(const std::string &u)
Create a file plug-in for the given URL.
virtual FileSystemPlugIn * CreateFileSystem(const std::string &url)
Create a file system plug-in for the given URL.
virtual ~EcPlugInFactory()
Destructor.
An interface for file plug-ins.
An interface for file plug-ins.
XRootDStatus Query(QueryCode::Code queryCode, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
bool operator<(const FreeSpace &a) const
std::string address
void Dump() const
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Path location info.
uint32_t GetSize() const
Get number of locations.
Iterator Begin()
Get the location begin iterator.
Location & At(uint32_t index)
Get the location at index.
Iterator End()
Get the location end iterator.
JobManager * GetJobManager()
Get the job manager object used by the post master.
Handle an async response.
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
void SelectLocations(XrdCl::LocationInfo &oldList, XrdCl::LocationInfo &newList, uint32_t n)
Object stat info.
URL representation.
Definition: XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
bool enable_plugins
Definition: XrdEcConfig.hh:77
static Config & Instance()
Singleton access.
Definition: XrdEcConfig.hh:46
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
static const int PageSize
Mode
Access mode.
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Write
Open only for writing.
@ Update
Open for reading and writing.
@ OpaqueFile
Implementation dependent.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::vector< std::string > plgr
Definition: XrdEcObjCfg.hh:92