XRootD
XrdEcReader.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #include "XrdEc/XrdEcReader.hh"
26 #include "XrdEc/XrdEcUtilities.hh"
27 #include "XrdEc/XrdEcConfig.hh"
28 #include "XrdEc/XrdEcObjCfg.hh"
29 #include "XrdEc/XrdEcThreadPool.hh"
30 
31 #include "XrdZip/XrdZipLFH.hh"
32 #include "XrdZip/XrdZipCDFH.hh"
33 #include "XrdZip/XrdZipUtils.hh"
34 
35 #include "XrdOuc/XrdOucCRC32C.hh"
36 
41 
42 #include "XrdCl/XrdClLog.hh"
43 #include "XrdCl/XrdClDefaultEnv.hh"
44 
45 #include <algorithm>
46 #include <iterator>
47 #include <numeric>
48 #include <tuple>
49 #include <set>
50 
51 namespace XrdEc
52 {
53  //---------------------------------------------------------------------------
54  // OpenOnly operation (@see ZipOperation) - a private ZIP operation
55  //---------------------------------------------------------------------------
56  template<bool HasHndl>
57  class OpenOnlyImpl: public XrdCl::ZipOperation<OpenOnlyImpl, HasHndl,
58  XrdCl::Resp<void>, XrdCl::Arg<std::string>, XrdCl::Arg<bool>>
59 
60  {
61  public:
62 
63  //-----------------------------------------------------------------------
64  // Inherit constructors from ZipOperation (@see ZipOperation)
65  //-----------------------------------------------------------------------
68 
69  //-----------------------------------------------------------------------
70  // Argument indexes in the args tuple
71  //-----------------------------------------------------------------------
72  enum { UrlArg, UpdtArg };
73 
74  //-----------------------------------------------------------------------
75  // @return : name of the operation (@see Operation)
76  //-----------------------------------------------------------------------
77  std::string ToString()
78  {
79  return "OpenOnly";
80  }
81 
82  protected:
83 
84  //-----------------------------------------------------------------------
85  // RunImpl operation (@see Operation)
86  //
87  // @param params : container with parameters forwarded from
88  // previous operation
89  // @return : status of the operation
90  //-----------------------------------------------------------------------
92  uint16_t pipelineTimeout )
93  {
94  std::string url = std::get<UrlArg>( this->args ).Get();
95  bool updt = std::get<UpdtArg>( this->args ).Get();
96  uint16_t timeout = pipelineTimeout < this->timeout ?
97  pipelineTimeout : this->timeout;
98  return this->zip->OpenOnly( url, updt, handler, timeout );
99  }
100  };
101 
102  //---------------------------------------------------------------------------
103  // Factory for creating OpenOnlyImpl objects
104  //---------------------------------------------------------------------------
107  XrdCl::Arg<bool> updt,
108  uint16_t timeout = 0 )
109  {
110  return OpenOnlyImpl<false>( std::move( zip ), std::move( fn ),
111  std::move( updt ) ).Timeout( timeout );
112  }
113 
114  //-------------------------------------------------------------------------
115  // A single data block
116  //-------------------------------------------------------------------------
117  struct block_t
118  {
119  typedef std::tuple<uint64_t, uint32_t, char*, callback_t> args_t;
120  typedef std::vector<args_t> pending_t;
121 
122  //-----------------------------------------------------------------------
123  // Stripe state: empty / loading / valid
124  //-----------------------------------------------------------------------
126 
127  //-----------------------------------------------------------------------
128  // Constructor
129  //-----------------------------------------------------------------------
131  objcfg( objcfg ),
132  stripes( objcfg.nbchunks ),
133  state( objcfg.nbchunks, Empty ),
134  pending( objcfg.nbchunks ),
135  blkid( blkid ),
136  recovering( 0 )
137  {
138  }
139 
140  //-----------------------------------------------------------------------
141  // Read data from stripe
142  //
143  // @param self : the block_t object
144  // @param strpid : stripe ID
145  // @param offset : relative offset within the stripe
146  // @param size : number of bytes to be read from the stripe
147  // @param usrbuff : user buffer for the data
148  // @param usrcb : user callback to be notified when the read operation
149  // has been resolved
150  // @param timeout : operation timeout
151  //-----------------------------------------------------------------------
152  static void read( std::shared_ptr<block_t> &self,
153  size_t strpid,
154  uint64_t offset,
155  uint32_t size,
156  char *usrbuff,
157  callback_t usrcb,
158  uint16_t timeout )
159  {
160  std::unique_lock<std::mutex> lck( self->mtx );
161 
162  //---------------------------------------------------------------------
163  // The cache is empty, we need to load the data
164  //---------------------------------------------------------------------
165  if( self->state[strpid] == Empty )
166  {
167  self->reader.Read( self->blkid, strpid, self->stripes[strpid],
168  read_callback( self, strpid ), timeout );
169  self->state[strpid] = Loading;
170  }
171  //---------------------------------------------------------------------
172  // The stripe is either corrupted or unreachable
173  //---------------------------------------------------------------------
174  if( self->state[strpid] == Missing )
175  {
176  if( !error_correction( self ) )
177  {
178  //-----------------------------------------------------------------
179  // Recovery was not possible, notify the user of the error
180  //-----------------------------------------------------------------
182  return;
183  }
184  //-------------------------------------------------------------------
185  // we fall through to the following if-statements that will handle
186  // Recovering / Valid state
187  //-------------------------------------------------------------------
188  }
189  //---------------------------------------------------------------------
190  // The cache is loading or recovering, we don't have the data yet
191  //---------------------------------------------------------------------
192  if( self->state[strpid] == Loading || self->state[strpid] == Recovering )
193  {
194  self->pending[strpid].emplace_back( offset, size, usrbuff, usrcb );
195  return;
196  }
197  //---------------------------------------------------------------------
198  // We do have the data so we can serve the user right away
199  //---------------------------------------------------------------------
200  if( self->state[strpid] == Valid )
201  {
202  if( offset + size > self->stripes[strpid].size() )
203  size = self->stripes[strpid].size() - offset;
204  if(usrbuff)
205  memcpy( usrbuff, self->stripes[strpid].data() + offset, size );
206  usrcb( XrdCl::XRootDStatus(), size );
207  return;
208  }
209  //---------------------------------------------------------------------
210  // In principle we should never end up here, nevertheless if this
211  // happens it is clearly an error ...
212  //---------------------------------------------------------------------
214  }
215 
216  //-----------------------------------------------------------------------
217  // If neccessary trigger error correction procedure
218  // @param self : the block_t object
219  // @return : false if the block is corrupted and cannot be recovered,
220  // true otherwise
221  //-----------------------------------------------------------------------
222  static bool error_correction( std::shared_ptr<block_t> &self )
223  {
224  //---------------------------------------------------------------------
225  // Do the accounting for our stripes
226  //---------------------------------------------------------------------
227  size_t missingcnt = 0, validcnt = 0, loadingcnt = 0, recoveringcnt = 0;
228  std::for_each( self->state.begin(), self->state.end(), [&]( state_t &s )
229  {
230  switch( s )
231  {
232  case Missing: ++missingcnt; break;
233  case Valid: ++validcnt; break;
234  case Loading: ++loadingcnt; break;
235  case Recovering: ++recoveringcnt; break;
236  default: ;
237  }
238  } );
239  //---------------------------------------------------------------------
240  // If there are no missing stripes all is good ...
241  //---------------------------------------------------------------------
242  if( missingcnt + recoveringcnt == 0 ) return true;
243  //---------------------------------------------------------------------
244  // Check if we can do the recovery at all (if too many stripes are
245  // missing it won't be possible)
246  //---------------------------------------------------------------------
247  if( missingcnt + recoveringcnt > self->objcfg.nbparity )
248  {
249  std::for_each( self->state.begin(), self->state.end(),
250  []( state_t &s ){ if( s == Recovering ) s = Missing; } );
251  return false;
252  }
253  //---------------------------------------------------------------------
254  // Check if we can do the recovery right away
255  //---------------------------------------------------------------------
256  if( validcnt >= self->objcfg.nbdata )
257  {
258  Config &cfg = Config::Instance();
259  stripes_t strps( self->get_stripes() );
260  try
261  {
262  cfg.GetRedundancy( self->objcfg ).compute( strps );
263  }
264  catch( const IOError &ex )
265  {
266  std::for_each( self->state.begin(), self->state.end(),
267  []( state_t &s ){ if( s == Recovering ) s = Missing; } );
268  return false;
269  }
270  //-------------------------------------------------------------------
271  // Now when we recovered the data we need to mark every stripe as
272  // valid and execute the pending reads
273  //-------------------------------------------------------------------
274  for( size_t strpid = 0; strpid < self->objcfg.nbchunks; ++strpid )
275  {
276  if( self->state[strpid] != Recovering ) continue;
277  self->state[strpid] = Valid;
278  self->carryout( self->pending[strpid], self->stripes[strpid] );
279  }
280  return true;
281  }
282  //---------------------------------------------------------------------
283  // Try loading the data and only then attempt recovery
284  //---------------------------------------------------------------------
285  size_t i = 0;
286  while( loadingcnt + validcnt < self->objcfg.nbdata && i < self->objcfg.nbchunks )
287  {
288  size_t strpid = i++;
289  if( self->state[strpid] != Empty ) continue;
290  self->reader.Read( self->blkid, strpid, self->stripes[strpid],
291  read_callback( self, strpid ) );
292  self->state[strpid] = Loading;
293  ++loadingcnt;
294  }
295 
296  //-------------------------------------------------------------------
297  // Now that we triggered the recovery procedure mark every missing
298  // stripe as recovering.
299  //-------------------------------------------------------------------
300  std::for_each( self->state.begin(), self->state.end(),
301  []( state_t &s ){ if( s == Missing ) s = Recovering; } );
302  return true;
303  }
304 
305  //-----------------------------------------------------------------------
306  // Get a callback for read operation
307  //-----------------------------------------------------------------------
308  inline static
309  callback_t read_callback( std::shared_ptr<block_t> &self, size_t strpid )
310  {
311  return [self, strpid]( const XrdCl::XRootDStatus &st, uint32_t ) mutable
312  {
313  std::unique_lock<std::mutex> lck( self->mtx );
314  self->state[strpid] = st.IsOK() ? Valid : Missing;
315  //------------------------------------------------------------
316  // Check if we need to do any error correction (either for
317  // the current stripe, or any other stripe)
318  //------------------------------------------------------------
319  bool recoverable = error_correction( self );
320  //------------------------------------------------------------
321  // Carry out the pending read requests if we got the data
322  //------------------------------------------------------------
323  if( st.IsOK() )
324  self->carryout( self->pending[strpid], self->stripes[strpid], st );
325  //------------------------------------------------------------
326  // Carry out the pending read requests if there was an error
327  // and we cannot recover
328  //------------------------------------------------------------
329  if( !recoverable )
330  self->fail_missing();
331  };
332  }
333 
334  //-----------------------------------------------------------------------
335  // Get stripes_t data structure used for error recovery
336  //-----------------------------------------------------------------------
338  {
339  stripes_t ret;
340  ret.reserve( objcfg.nbchunks );
341  for( size_t i = 0; i < objcfg.nbchunks; ++i )
342  {
343  if( state[i] == Valid )
344  ret.emplace_back( stripes[i].data(), true );
345  else
346  {
347  stripes[i].resize( objcfg.chunksize, 0 );
348  ret.emplace_back( stripes[i].data(), false );
349  }
350  }
351  return ret;
352  }
353 
354  //-------------------------------------------------------------------------
355  // Execute the pending read requests
356  //-------------------------------------------------------------------------
357  inline void carryout( pending_t &pending,
358  const buffer_t &stripe,
360  {
361  //-----------------------------------------------------------------------
362  // Iterate over all pending read operations for given stripe
363  //-----------------------------------------------------------------------
364  auto itr = pending.begin();
365  for( ; itr != pending.end() ; ++itr )
366  {
367  auto &args = *itr;
368  callback_t &callback = std::get<3>( args );
369  uint32_t nbrd = 0; // number of bytes read
370  //---------------------------------------------------------------------
371  // If the read was successful, copy the data to user buffer
372  //---------------------------------------------------------------------
373  if( st.IsOK() )
374  {
375  uint64_t offset = std::get<0>( args );
376  uint32_t size = std::get<1>( args );
377  char *usrbuff = std::get<2>( args );
378  // are we reading past the end of file?
379  if( offset > stripe.size() )
380  size = 0;
381  // are partially reading past the end of the file?
382  else if( offset + size > stripe.size() )
383  size = stripe.size() - offset;
384  if(usrbuff)
385  memcpy( usrbuff, stripe.data() + offset, size );
386  nbrd = size;
387  }
388  //---------------------------------------------------------------------
389  // Call the user callback
390  //---------------------------------------------------------------------
391  callback( st, nbrd );
392  }
393  //-----------------------------------------------------------------------
394  // Now we can clear the pending reads
395  //-----------------------------------------------------------------------
396  pending.clear();
397  }
398 
399  //-------------------------------------------------------------------------
400  // Execute pending read requests for missing stripes
401  //-------------------------------------------------------------------------
402  inline void fail_missing()
403  {
404  size_t size = objcfg.nbchunks;
405  for( size_t i = 0; i < size; ++i )
406  {
407  if( state[i] != Missing ) continue;
408  carryout( pending[i], stripes[i],
410  }
411  }
412 
415  std::vector<buffer_t> stripes; //< data buffer for every stripe
416  std::vector<state_t> state; //< state of every data buffer (empty/loading/valid)
417  std::vector<pending_t> pending; //< pending reads per stripe
418  size_t blkid; //< block ID
419  bool recovering; //< true if we are in the process of recovering data, false otherwise
420  std::mutex mtx;
421  };
422 
423  //---------------------------------------------------------------------------
424  // Destructor (we need it in the source file because block_t is defined in
425  // here)
426  //---------------------------------------------------------------------------
427  Reader::~Reader()
428  {
429  }
430 
431  //---------------------------------------------------------------------------
432  // Open the erasure coded / striped object
433  //---------------------------------------------------------------------------
434  void Reader::Open( XrdCl::ResponseHandler *handler, uint16_t timeout )
435  {
436  const size_t size = objcfg.plgr.size();
437  std::vector<XrdCl::Pipeline> opens; opens.reserve( size );
438  for( size_t i = 0; i < size; ++i )
439  {
440  // generate the URL
441  std::string url = objcfg.GetDataUrl( i );
442  archiveIndices.emplace(url, i);
443  // create the file object
444  dataarchs.emplace( url, std::make_shared<XrdCl::ZipArchive>(
445  Config::Instance().enable_plugins ) );
446  // open the archive
447  if( objcfg.nomtfile )
448  {
449  opens.emplace_back( XrdCl::OpenArchive( *dataarchs[url], url, XrdCl::OpenFlags::Read ) );
450  }
451  else
452  opens.emplace_back( OpenOnly( *dataarchs[url], url, false ) );
453  }
454 
455  auto pipehndl = [=]( const XrdCl::XRootDStatus &st )
456  { // set the central directories in ZIP archives (if we use metadata files)
457  auto itr = dataarchs.begin();
458  for( ; itr != dataarchs.end() ; ++itr )
459  {
460  const std::string &url = itr->first;
461  auto &zipptr = itr->second;
462  if( zipptr->openstage == XrdCl::ZipArchive::NotParsed )
463  zipptr->SetCD( metadata[url] );
464  else if( zipptr->openstage != XrdCl::ZipArchive::Done && !metadata.empty() )
465  AddMissing( metadata[url] );
466  auto itr = zipptr->cdmap.begin();
467  for( ; itr != zipptr->cdmap.end() ; ++itr )
468  {
469  urlmap.emplace( itr->first, url );
470  size_t blknb = fntoblk( itr->first );
471  if( blknb > lstblk ) lstblk = blknb;
472  }
473  }
474  metadata.clear();
475  // call user handler
476  if( handler )
477  handler->HandleResponse( new XrdCl::XRootDStatus( st ), nullptr );
478  };
479  // in parallel open the data files and read the metadata
480  XrdCl::Pipeline p = objcfg.nomtfile
481  ? XrdCl::Parallel( opens ).AtLeast( objcfg.nbdata ) | ReadSize( 0 ) | XrdCl::Final( pipehndl )
482  : XrdCl::Parallel( ReadMetadata( 0 ),
483  XrdCl::Parallel( opens ).AtLeast( objcfg.nbdata ) ) >> pipehndl;
484  XrdCl::Async( std::move( p ), timeout );
485  }
486 
487  //-----------------------------------------------------------------------
488  // Read data from the data object
489  //-----------------------------------------------------------------------
490  void Reader::Read( uint64_t offset,
491  uint32_t length,
492  void *buffer,
493  XrdCl::ResponseHandler *handler,
494  uint16_t timeout )
495  {
496  if( objcfg.nomtfile )
497  {
498  if( offset >= filesize )
499  length = 0;
500  else if( offset + length > filesize )
501  length = filesize - offset;
502  }
503 
504  if( length == 0 )
505  {
506  ScheduleHandler( offset, 0, buffer, handler );
507  return;
508  }
509 
510  char *usrbuff = reinterpret_cast<char*>( buffer );
511  typedef std::tuple<uint64_t, uint32_t,
512  void*, uint32_t,
514  XrdCl::XRootDStatus> rdctx_t;
515  auto rdctx = std::make_shared<rdctx_t>( offset, 0, buffer,
516  length, handler,
518  auto rdmtx = std::make_shared<std::mutex>();
519 
520  while( length > 0 )
521  {
522  size_t blkid = offset / objcfg.datasize; //< ID of the block from which we will be reading
523  size_t strpid = ( offset % objcfg.datasize ) / objcfg.chunksize; //< ID of the stripe from which we will be reading
524  uint64_t rdoff = offset - blkid * objcfg.datasize - strpid * objcfg.chunksize; //< relative read offset within the stripe
525  uint32_t rdsize = objcfg.chunksize - rdoff; //< read size within the stripe
526  if( rdsize > length ) rdsize = length;
527  //-------------------------------------------------------------------
528  // Make sure we operate on a valid block
529  //-------------------------------------------------------------------
530  std::unique_lock<std::mutex> lck( blkmtx );
531  if( !block || block->blkid != blkid )
532  block = std::make_shared<block_t>( blkid, *this, objcfg );
533  //-------------------------------------------------------------------
534  // Prepare the callback for reading from single stripe
535  //-------------------------------------------------------------------
536  auto blk = block;
537  lck.unlock();
538  auto callback = [blk, rdctx, rdsize, rdmtx]( const XrdCl::XRootDStatus &st, uint32_t nbrd )
539  {
540  std::unique_lock<std::mutex> lck( *rdmtx );
541  //---------------------------------------------------------------------
542  // update number of bytes left to be read (bytes requested not actually
543  // read)
544  //---------------------------------------------------------------------
545  std::get<3>( *rdctx ) -= rdsize;
546  //---------------------------------------------------------------------
547  // Handle failure ...
548  //---------------------------------------------------------------------
549  if( !st.IsOK() )
550  std::get<5>( *rdctx ) = st; // the error
551  //---------------------------------------------------------------------
552  // Handle success ...
553  //---------------------------------------------------------------------
554  else
555  std::get<1>( *rdctx ) += nbrd; // number of bytes read
556  //---------------------------------------------------------------------
557  // Are we done?
558  //---------------------------------------------------------------------
559  if( std::get<3>( *rdctx ) == 0 )
560  {
561  //-------------------------------------------------------------------
562  // Check if the read operation was successful ...
563  //-------------------------------------------------------------------
564  XrdCl::XRootDStatus &status = std::get<5>( *rdctx );
565  if( !status.IsOK() )
566  ScheduleHandler( std::get<4>( *rdctx ), status );
567  else
568  ScheduleHandler( std::get<0>( *rdctx ), std::get<1>( *rdctx ),
569  std::get<2>( *rdctx ), std::get<4>( *rdctx ) );
570  }
571  };
572  //-------------------------------------------------------------------
573  // Read data from a stripe
574  //-------------------------------------------------------------------
575  block_t::read( blk, strpid, rdoff, rdsize, usrbuff, callback, timeout );
576  //-------------------------------------------------------------------
577  // Update absolute offset, read length, and user buffer
578  //-------------------------------------------------------------------
579  offset += rdsize;
580  length -= rdsize;
581  usrbuff += rdsize;
582  }
583  }
584 
585  //-----------------------------------------------------------------------
586  // Close the data object
587  //-----------------------------------------------------------------------
588  void Reader::Close( XrdCl::ResponseHandler *handler, uint16_t timeout )
589  {
590  //---------------------------------------------------------------------
591  // prepare the pipelines ...
592  //---------------------------------------------------------------------
593  std::vector<XrdCl::Pipeline> closes;
594  closes.reserve( dataarchs.size() );
595  auto itr = dataarchs.begin();
596  for( ; itr != dataarchs.end() ; ++itr )
597  {
598  auto &zipptr = itr->second;
599  if( zipptr->IsOpen() )
600  {
601  zipptr->SetProperty( "BundledClose", "true");
602  closes.emplace_back( XrdCl::CloseArchive( *zipptr ) >>
603  [zipptr]( XrdCl::XRootDStatus& ){ } );
604  }
605  }
606 
607  // if there is nothing to close just schedule the handler
608  if( closes.empty() ) ScheduleHandler( handler );
609  // otherwise close the archives
610  else XrdCl::Async( XrdCl::Parallel( closes ) >> handler, timeout );
611  }
612 
613  //-------------------------------------------------------------------------
614  // on-definition is not allowed here beforeiven stripes from given block
615  //-------------------------------------------------------------------------
616  void Reader::Read( size_t blknb, size_t strpnb, buffer_t &buffer, callback_t cb, uint16_t timeout )
617  {
618  // generate the file name (blknb/strpnb)
619  std::string fn = objcfg.GetFileName( blknb, strpnb );
620  // if the block/stripe does not exist it means we are reading passed the end of the file
621  auto itr = urlmap.find( fn );
622  if( itr == urlmap.end() )
623  {
624  auto st = !IsMissing( fn ) ? XrdCl::XRootDStatus() :
625  XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNotFound );
626  ThreadPool::Instance().Execute( cb, st, 0 );
627  return;
628  }
629  // get the URL of the ZIP archive with the respective data
630  const std::string &url = itr->second;
631  // get the ZipArchive object
632  auto &zipptr = dataarchs[url];
633  // check the size of the data to be read
634  XrdCl::StatInfo *info = nullptr;
635  auto st = zipptr->Stat( fn, info );
636  if( !st.IsOK() )
637  {
638  ThreadPool::Instance().Execute( cb, st, 0 );
639  return;
640  }
641  uint32_t rdsize = info->GetSize();
642  delete info;
643  // create a buffer for the data
644  buffer.resize( objcfg.chunksize );
645  // issue the read request
646  XrdCl::Async( XrdCl::ReadFrom( *zipptr, fn, 0, rdsize, buffer.data() ) >>
647  [zipptr, fn, cb, this]( XrdCl::XRootDStatus &st, XrdCl::ChunkInfo &ch )
648  {
649  //---------------------------------------------------
650  // If read failed there's nothing to do, just pass the
651  // status to user callback
652  //---------------------------------------------------
653  if( !st.IsOK() )
654  {
655  cb( st, 0 );
656  return;
657  }
658  //---------------------------------------------------
659  // Get the checksum for the read data
660  //---------------------------------------------------
661  uint32_t orgcksum = 0;
662  auto s = zipptr->GetCRC32( fn, orgcksum );
663  //---------------------------------------------------
664  // If we cannot extract the checksum assume the data
665  // are corrupted
666  //---------------------------------------------------
667  if( !st.IsOK() )
668  {
669  cb( st, 0 );
670  return;
671  }
672  //---------------------------------------------------
673  // Verify data integrity
674  //---------------------------------------------------
675  uint32_t cksum = objcfg.digest( 0, ch.buffer, ch.length );
676  if( orgcksum != cksum )
677  {
678  cb( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError ), 0 );
679  return;
680  }
681  //---------------------------------------------------
682  // All is good, we can call now the user callback
683  //---------------------------------------------------
684  cb( XrdCl::XRootDStatus(), ch.length );
685  }, timeout );
686  }
687 
688  //-----------------------------------------------------------------------
689  // Read metadata for the object
690  //-----------------------------------------------------------------------
691  XrdCl::Pipeline Reader::ReadMetadata( size_t index )
692  {
693  const size_t size = objcfg.plgr.size();
694  // create the File object
695  auto file = std::make_shared<XrdCl::File>( Config::Instance().enable_plugins );
696  // prepare the URL for Open operation
697  std::string url = objcfg.GetMetadataUrl( index );
698  // arguments for the Read operation
699  XrdCl::Fwd<uint32_t> rdsize;
700  XrdCl::Fwd<void*> rdbuff;
701 
702  return XrdCl::Open( *file, url, XrdCl::OpenFlags::Read ) >>
703  [=]( XrdCl::XRootDStatus &st, XrdCl::StatInfo &info ) mutable
704  {
705  if( !st.IsOK() )
706  {
707  if( index + 1 < size )
708  XrdCl::Pipeline::Replace( ReadMetadata( index + 1 ) );
709  return;
710  }
711  // prepare the args for the subsequent operation
712  rdsize = info.GetSize();
713  rdbuff = new char[info.GetSize()];
714  }
715  | XrdCl::Read( *file, 0, rdsize, rdbuff ) >>
716  [=]( XrdCl::XRootDStatus &st, XrdCl::ChunkInfo &ch )
717  {
718  if( !st.IsOK() )
719  {
720  if( index + 1 < size )
721  XrdCl::Pipeline::Replace( ReadMetadata( index + 1 ) );
722  return;
723  }
724  // now parse the metadata
725  if( !ParseMetadata( ch ) )
726  {
727  if( index + 1 < size )
728  XrdCl::Pipeline::Replace( ReadMetadata( index + 1 ) );
729  return;
730  }
731  }
732  | XrdCl::Close( *file ) >>
733  []( XrdCl::XRootDStatus &st )
734  {
735  if( !st.IsOK() )
736  XrdCl::Pipeline::Ignore(); // ignore errors, we don't really care
737  }
738  | XrdCl::Final(
739  [rdbuff, file]( const XrdCl::XRootDStatus& )
740  {
741  // deallocate the buffer if necessary
742  if( rdbuff.Valid() )
743  {
744  char* buffer = reinterpret_cast<char*>( *rdbuff );
745  delete[] buffer;
746  }
747  } );
748  }
749 
750  //-----------------------------------------------------------------------
754  //-----------------------------------------------------------------------
755  XrdCl::Pipeline Reader::ReadSize( size_t index )
756  {
757  std::string url = objcfg.GetDataUrl( index );
758  return XrdCl::GetXAttr( dataarchs[url]->GetFile(), "xrdec.filesize" ) >>
759  [index, this]( XrdCl::XRootDStatus &st, std::string &size)
760  {
761  if( !st.IsOK() )
762  {
763  //-------------------------------------------------------------
764  // Check if we can recover the error or a diffrent location
765  //-------------------------------------------------------------
766  if( index + 1 < objcfg.plgr.size() )
767  XrdCl::Pipeline::Replace( ReadSize( index + 1 ) );
768  return;
769  }
770  filesize = std::stoull( size );
771  };
772  }
773 
774  //-----------------------------------------------------------------------
775  // Parse metadata from chunk info object
776  //-----------------------------------------------------------------------
777  bool Reader::ParseMetadata( XrdCl::ChunkInfo &ch )
778  {
779  const size_t mincnt = objcfg.nbdata + objcfg.nbparity;
780  const size_t maxcnt = objcfg.plgr.size();
781 
782  char *buffer = reinterpret_cast<char*>( ch.buffer );
783  size_t length = ch.length;
784 
785  for( size_t i = 0; i < maxcnt; ++i )
786  {
787  uint32_t signature = XrdZip::to<uint32_t>( buffer );
788  if( signature != XrdZip::LFH::lfhSign )
789  {
790  if( i + 1 < mincnt ) return false;
791  break;
792  }
793  XrdZip::LFH lfh( buffer );
794  // check if we are not reading passed the end of the buffer
795  if( lfh.lfhSize + lfh.uncompressedSize > length ) return false;
796  buffer += lfh.lfhSize;
797  length -= lfh.lfhSize;
798  // verify the checksum
799  uint32_t crc32val = objcfg.digest( 0, buffer, lfh.uncompressedSize );
800  if( crc32val != lfh.ZCRC32 ) return false;
801  // keep the metadata
802  std::string url = objcfg.GetDataUrl( std::stoull( lfh.filename ) );
803  metadata.emplace( url, buffer_t( buffer, buffer + lfh.uncompressedSize ) );
804  buffer += lfh.uncompressedSize;
805  length -= lfh.uncompressedSize;
806  }
807 
808  return true;
809  }
810 
811  //-----------------------------------------------------------------------
812  // Add all the entries from given Central Directory to missing
813  //-----------------------------------------------------------------------
814  void Reader::AddMissing( const buffer_t &cdbuff )
815  {
816  const char *buff = cdbuff.data();
817  size_t size = cdbuff.size();
818  // parse Central Directory records
819  XrdZip::cdvec_t cdvec;
820  XrdZip::cdmap_t cdmap;
821  std::tie(cdvec, cdmap ) = XrdZip::CDFH::Parse( buff, size );
822  auto itr = cdvec.begin();
823  for( ; itr != cdvec.end() ; ++itr )
824  {
825  XrdZip::CDFH &cdfh = **itr;
826  missing.insert( cdfh.filename );
827  }
828  }
829 
830  //-----------------------------------------------------------------------
832  //-----------------------------------------------------------------------
833  bool Reader::IsMissing( const std::string &fn )
834  {
835  // if the chunk is in the missing set return true
836  if( missing.count( fn ) ) return true;
837  // if we don't have a metadata file and the chunk exceeds last chunk
838  // also return true
839  if( objcfg.nomtfile && fntoblk( fn ) <= lstblk ) return true;
840  // otherwise return false
841  return false;
842  }
843 
844 
845  inline callback_t Reader::ErrorCorrected(Reader *reader, std::shared_ptr<block_t> &self, size_t blkid, size_t strpid){
846  return [reader, self, strpid, blkid]( const XrdCl::XRootDStatus &st, uint32_t ) mutable
847  {
848  std::unique_lock<std::mutex> readerLock(reader->missingChunksMutex);
849  reader->missingChunksVectorRead.erase(std::remove(reader->missingChunksVectorRead.begin(), reader->missingChunksVectorRead.end(), std::make_tuple(blkid, strpid)));
850  reader->waitMissing.notify_all();
851  };
852  }
853 
854  void Reader::MissingVectorRead(std::shared_ptr<block_t> &currentBlock, size_t blkid, size_t strpid, uint16_t timeout){
855  {
856  std::unique_lock<std::mutex> lk(missingChunksMutex);
857  missingChunksVectorRead.emplace_back(
858  std::make_tuple(blkid,strpid));
859  }
860  currentBlock->state[strpid] = block_t::Missing;
861  currentBlock->read(currentBlock, strpid, 0, objcfg.chunksize,
862  nullptr,
863  ErrorCorrected(this, currentBlock, blkid, strpid),
864  timeout);
865  }
866 
867 
868  void Reader::VectorRead(const XrdCl::ChunkList &chunks, void *buffer, XrdCl::ResponseHandler *handler, uint16_t timeout){
869  if(chunks.size() > 1024) {
871  return;
872  }
873 
874  std::vector<XrdCl::ChunkList> hostLists;
875  for(size_t dataHosts = 0; dataHosts < objcfg.plgr.size(); dataHosts++){
876  hostLists.emplace_back(XrdCl::ChunkList());
877  }
878 
879  auto log = XrdCl::DefaultEnv::GetLog();
880 
881  //bool useGlobalBuffer = buffer != nullptr;
882  char* globalBuffer = (char*)buffer;
883 
884  // host index, blkid, strpid
885  std::set<std::tuple<size_t, size_t, size_t>> requestedChunks;
886  // create block_ts for any requested block index
887  std::map<size_t, std::shared_ptr<block_t>> blockMap;
888 
889  // go through the requested lists of chunks and assign them to fitting hosts
890  for(size_t index = 0; index < chunks.size(); index++){
891  uint32_t remainLength = chunks[index].length;
892  uint64_t currentOffset = chunks[index].offset;
893 
894  while(remainLength > 0){
895  size_t blkid = currentOffset / objcfg.datasize; //< ID of the block from which we will be reading
896  size_t strpid = ( currentOffset % objcfg.datasize ) / objcfg.chunksize; //< ID of the stripe from which we will be reading
897  uint64_t rdoff = currentOffset - blkid * objcfg.datasize - strpid * objcfg.chunksize ; //< relative read offset within the stripe
898  //uint64_t offsetInFile = rdoff + blkid * objcfg.chunksize; // relative offset within the file
899  uint32_t rdsize = objcfg.chunksize - rdoff; //< read size within the stripe
900  if( rdsize > remainLength ) rdsize = remainLength;
901  if(currentOffset + rdsize >= filesize) {
902  rdsize = filesize - currentOffset;
903  remainLength = rdsize;
904  }
905 
906 
907  std::string fn = objcfg.GetFileName(blkid, strpid);
908 
909  auto itr = urlmap.find( fn );
910  if( itr == urlmap.end() )
911  {
912  log->Dump(XrdCl::XRootDMsg, "EC Vector Read: No mapping of file to host found.");
913  break;
914  }
915  // get the URL of the ZIP archive with the respective data
916  const std::string &url = itr->second;
917  auto itr2 = archiveIndices.find(url);
918  if(itr2 == archiveIndices.end())
919  {
920  log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Couldn't find host for file.");
921  break;
922  }
923  size_t indexOfArchive = archiveIndices[url];
924 
925  if (blockMap.find(blkid) == blockMap.end())
926  {
927  blockMap.emplace(blkid,
928  std::make_shared<block_t>(blkid, *this, objcfg));
929  }
930 
931  blockMap[blkid]->state[strpid] = block_t::Loading;
932  XrdCl::StatInfo* info = nullptr;
933  if(dataarchs[url]->Stat(objcfg.GetFileName(blkid, strpid), info).IsOK())
934  blockMap[blkid]->stripes[strpid].resize( info ->GetSize() );
935 
936  auto requestChunk = std::make_tuple(indexOfArchive, blkid, strpid);
937  if(requestedChunks.find(requestChunk) == requestedChunks.end())
938  {
939  uint64_t off = 0;
940  dataarchs[url]->GetOffset(objcfg.GetFileName(blkid, strpid), off);
941  hostLists[indexOfArchive].emplace_back(XrdCl::ChunkInfo(
942  off,
943  info ->GetSize(),
944  blockMap[blkid]->stripes[strpid].data()));
945 
946  // fill list of requested chunks by block and stripe id
947  requestedChunks.emplace(requestChunk);
948 
949  }
950  remainLength -= rdsize;
951  currentOffset += rdsize;
952 
953  }
954  }
955 
956  std::vector<XrdCl::Pipeline> hostPipes;
957  hostPipes.reserve(hostLists.size());
958  for(size_t i = 0; i < hostLists.size(); i++){
959  while(hostLists[i].size() > 0){
960  uint32_t range = hostLists[i].size() > 1024 ? 1024 : hostLists[i].size();
961  XrdCl::ChunkList partList(hostLists[i].begin(), hostLists[i].begin() + range);
962  hostLists[i].erase(hostLists[i].begin(), hostLists[i].begin() + range);
963  hostPipes.emplace_back(
964  XrdCl::VectorRead(XrdCl::Ctx<XrdCl::File>(dataarchs[objcfg.GetDataUrl(i)]->archive),
965  partList, nullptr, timeout)
966  >> [=](const XrdCl::XRootDStatus &st, XrdCl::VectorReadInfo ch) mutable
967  {
968  auto it = requestedChunks.begin();
969  while(it!=requestedChunks.end())
970  {
971  auto &args = *it;
972  size_t host = std::get<0>(args);
973  size_t blkid = std::get<1>(args);
974  size_t strpid = std::get<2>(args);
975  it++;
976  if(host == i)
977  {
978  std::shared_ptr<block_t> currentBlock = blockMap[blkid];
979 
980 
981  if(!st.IsOK())
982  {
983  log->Dump(XrdCl::XRootDMsg, "EC Vector Read of host %zu failed entirely.", i);
984  MissingVectorRead(currentBlock, blkid, strpid, timeout);
985  }
986  else{
987  uint32_t orgcksum = 0;
988  auto s = dataarchs[objcfg.GetDataUrl(i)]->GetCRC32( objcfg.GetFileName(blkid, strpid), orgcksum );
989  //---------------------------------------------------
990  // If we cannot extract the checksum assume the data
991  // are corrupted
992  //---------------------------------------------------
993  if( !st.IsOK() )
994  {
995  log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Couldn't read CRC32 from CD.");
996  MissingVectorRead(currentBlock, blkid, strpid, timeout);
997  continue;
998  }
999  //---------------------------------------------------
1000  // Verify data integrity
1001  //---------------------------------------------------
1002  uint32_t cksum = objcfg.digest( 0, currentBlock->stripes[strpid].data(), currentBlock->stripes[strpid].size() );
1003  if( orgcksum != cksum )
1004  {
1005  log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Wrong checksum for block %zu stripe %zu.", blkid, strpid);
1006  MissingVectorRead(currentBlock, blkid, strpid, timeout);
1007  continue;
1008  }
1009  else{
1010  currentBlock->state[strpid] = block_t::Valid;
1011  bool recoverable = currentBlock->error_correction( currentBlock );
1012  if(!recoverable)
1013  log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Couldn't recover block %zu.", blkid);
1014  }
1015  }
1016  }
1017  }
1018  }
1019  );
1020  }
1021  }
1022 
1023  auto finalPipehndl = [=] (const XrdCl::XRootDStatus &st) mutable {
1024  // wait until all missing chunks are corrected (uses single reads to get parity stripes)
1025  std::unique_lock<std::mutex> lk(missingChunksMutex);
1026  waitMissing.wait(lk, [=] { return missingChunksVectorRead.size() == 0;});
1027 
1028  bool failed = false;
1029  for(size_t index = 0; index < chunks.size(); index++){
1030  uint32_t remainLength = chunks[index].length;
1031  uint64_t currentOffset = chunks[index].offset;
1032 
1033  char *localBuffer;
1034  if (globalBuffer)
1035  localBuffer = globalBuffer;
1036  else
1037  localBuffer = (char*)(chunks[index].buffer);
1038 
1039  while(remainLength > 0){
1040  size_t blkid = currentOffset / objcfg.datasize; //< ID of the block from which we will be reading
1041  size_t strpid = ( currentOffset % objcfg.datasize ) / objcfg.chunksize; //< ID of the stripe from which we will be reading
1042  uint64_t rdoff = currentOffset - blkid * objcfg.datasize - strpid * objcfg.chunksize ; //< relative read offset within the stripe
1043  uint32_t rdsize = objcfg.chunksize - rdoff; //< read size within the stripe
1044  if( rdsize > remainLength ) rdsize = remainLength;
1045 
1046  // put received data into given buffers
1047  if(blockMap.find(blkid) == blockMap.end() || blockMap[blkid] == nullptr){
1048  log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Missing block %zu.", blkid);
1049  failed = true;
1050  break;
1051  }
1052  if(blockMap[blkid]->state[strpid] != block_t::Valid){
1053  log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Invalid stripe in block %zu stripe %zu.", blkid, strpid);
1054  failed = true;
1055  break;
1056  }
1057 
1058  memcpy(localBuffer, blockMap[blkid]->stripes[strpid].data() + rdoff, rdsize);
1059 
1060  remainLength -= rdsize;
1061  currentOffset += rdsize;
1062  localBuffer += rdsize;
1063  }
1064  if(globalBuffer) globalBuffer = localBuffer;
1065  }
1066  if(handler){
1067  if(failed) log->Dump(XrdCl::XRootDMsg, "EC Vector Read failed (at least in part).");
1068  if(failed) handler->HandleResponse(new XrdCl::XRootDStatus(XrdCl::stError, "Couldn't read all segments"), nullptr);
1069  else handler->HandleResponse(new XrdCl::XRootDStatus(), nullptr);
1070  }
1071  };
1072 
1073  XrdCl::Pipeline p = XrdCl::Parallel(hostPipes) |
1074  XrdCl::Final(finalPipehndl);
1075 
1076  XrdCl::Async(std::move(p), timeout);
1077 
1078  }
1079 
1080 } /* namespace XrdEc */
static unsigned long long int stoull(const std::string &s)
simple integer parsing, to be replaced by std::stoll when C++11 can be used
struct stat Stat
Definition: XrdCks.cc:49
ssize_t read(int fildes, void *buf, size_t nbyte)
XrdOucString Valid
std::tuple< Args... > args
Operation arguments.
uint16_t timeout
Operation timeout.
Derived< HasHndl > Timeout(uint16_t timeout)
Set operation timeout.
static Log * GetLog()
Get default log.
std::unique_ptr< PipelineHandler > handler
Operation handler.
static void Replace(Operation< false > &&opr)
Replace current operation.
static void Ignore()
Ignore error and proceed with the pipeline.
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Object stat info.
uint64_t GetSize() const
Get size (in bytes)
static Config & Instance()
Singleton access.
Definition: XrdEcConfig.hh:46
XrdCl::XRootDStatus RunImpl(XrdCl::PipelineHandler *handler, uint16_t pipelineTimeout)
Definition: XrdEcReader.cc:91
std::string ToString()
Name of the operation.
Definition: XrdEcReader.cc:77
CloseArchiveImpl< false > CloseArchive(Ctx< ZipArchive > zip, uint16_t timeout=0)
Factory for creating CloseFileImpl objects.
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
const uint16_t errNotFound
Definition: XrdClStatus.hh:100
const uint64_t XRootDMsg
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
GetXAttrImpl< false > GetXAttr(Ctx< File > file, Arg< std::string > name)
ZipReadFromImpl< false > ReadFrom(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ArchiveReadImpl objects.
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
VectorReadImpl< false > VectorRead(Ctx< File > file, Arg< ChunkList > chunks, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating VectorReadImpl objects.
std::vector< ChunkInfo > ChunkList
List of chunks.
FinalOperation Final
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
XrdCmsConfig Config
std::vector< stripe_t > stripes_t
All stripes in a block.
static size_t fntoblk(const std::string &fn)
std::function< void(const XrdCl::XRootDStatus &, uint32_t)> callback_t
Definition: XrdEcReader.hh:53
void ScheduleHandler(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler)
std::vector< char > buffer_t
a buffer type
Definition: XrdEcReader.hh:45
OpenOnlyImpl< false > OpenOnly(XrdCl::Ctx< XrdCl::ZipArchive > zip, XrdCl::Arg< std::string > fn, XrdCl::Arg< bool > updt, uint16_t timeout=0)
Definition: XrdEcReader.cc:105
std::vector< std::unique_ptr< CDFH > > cdvec_t
Definition: XrdZipCDFH.hh:46
std::unordered_map< std::string, size_t > cdmap_t
Definition: XrdZipCDFH.hh:56
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
bool Valid() const
Check if it contains a valid value.
Definition: XrdClFwd.hh:235
@ Read
Open only for reading.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
const uint8_t nbdata
Definition: XrdEcObjCfg.hh:87
const uint8_t nbchunks
Definition: XrdEcObjCfg.hh:85
static bool error_correction(std::shared_ptr< block_t > &self)
Definition: XrdEcReader.cc:222
void fail_missing()
Definition: XrdEcReader.cc:402
std::vector< state_t > state
Definition: XrdEcReader.cc:416
block_t(size_t blkid, Reader &reader, ObjCfg &objcfg)
Definition: XrdEcReader.cc:130
static void read(std::shared_ptr< block_t > &self, size_t strpid, uint64_t offset, uint32_t size, char *usrbuff, callback_t usrcb, uint16_t timeout)
Definition: XrdEcReader.cc:152
void carryout(pending_t &pending, const buffer_t &stripe, const XrdCl::XRootDStatus &st=XrdCl::XRootDStatus())
Definition: XrdEcReader.cc:357
stripes_t get_stripes()
Definition: XrdEcReader.cc:337
Reader & reader
Definition: XrdEcReader.cc:413
std::vector< buffer_t > stripes
Definition: XrdEcReader.cc:415
std::tuple< uint64_t, uint32_t, char *, callback_t > args_t
Definition: XrdEcReader.cc:119
ObjCfg & objcfg
Definition: XrdEcReader.cc:414
std::mutex mtx
Definition: XrdEcReader.cc:420
std::vector< args_t > pending_t
Definition: XrdEcReader.cc:120
std::vector< pending_t > pending
Definition: XrdEcReader.cc:417
static callback_t read_callback(std::shared_ptr< block_t > &self, size_t strpid)
Definition: XrdEcReader.cc:309
std::string filename
Definition: XrdZipCDFH.hh:344
static std::tuple< cdvec_t, cdmap_t > Parse(const char *buffer, uint32_t bufferSize, uint16_t nbCdRecords)
Definition: XrdZipCDFH.hh:75
A data structure representing ZIP Local File Header.
Definition: XrdZipLFH.hh:42
static const uint32_t lfhSign
Local File Header signature.
Definition: XrdZipLFH.hh:173