XRootD
XrdEc::block_t Struct Reference
+ Collaboration diagram for XrdEc::block_t:

Public Types

typedef std::tuple< uint64_t, uint32_t, char *, callback_targs_t
 
typedef std::vector< args_tpending_t
 
enum  state_t {
  Empty = 0 ,
  Loading ,
  Valid ,
  Missing ,
  Recovering
}
 

Public Member Functions

 block_t (size_t blkid, Reader &reader, ObjCfg &objcfg)
 
void carryout (pending_t &pending, const buffer_t &stripe, const XrdCl::XRootDStatus &st=XrdCl::XRootDStatus())
 
void fail_missing ()
 
stripes_t get_stripes ()
 

Static Public Member Functions

static bool error_correction (std::shared_ptr< block_t > &self)
 
static void read (std::shared_ptr< block_t > &self, size_t strpid, uint64_t offset, uint32_t size, char *usrbuff, callback_t usrcb, uint16_t timeout)
 
static callback_t read_callback (std::shared_ptr< block_t > &self, size_t strpid)
 

Public Attributes

size_t blkid
 
std::mutex mtx
 
ObjCfgobjcfg
 
std::vector< pending_tpending
 
Readerreader
 
bool recovering
 
std::vector< state_tstate
 
std::vector< buffer_tstripes
 

Detailed Description

Definition at line 117 of file XrdEcReader.cc.

Member Typedef Documentation

◆ args_t

typedef std::tuple<uint64_t, uint32_t, char*, callback_t> XrdEc::block_t::args_t

Definition at line 119 of file XrdEcReader.cc.

◆ pending_t

typedef std::vector<args_t> XrdEc::block_t::pending_t

Definition at line 120 of file XrdEcReader.cc.

Member Enumeration Documentation

◆ state_t

Enumerator
Empty 
Loading 
Valid 
Missing 
Recovering 

Definition at line 125 of file XrdEcReader.cc.

Constructor & Destructor Documentation

◆ block_t()

XrdEc::block_t::block_t ( size_t  blkid,
Reader reader,
ObjCfg objcfg 
)
inline

Definition at line 130 of file XrdEcReader.cc.

130  : reader( reader ),
131  objcfg( objcfg ),
135  blkid( blkid ),
136  recovering( 0 )
137  {
138  }
const uint8_t nbchunks
Definition: XrdEcObjCfg.hh:85
std::vector< state_t > state
Definition: XrdEcReader.cc:416
Reader & reader
Definition: XrdEcReader.cc:413
std::vector< buffer_t > stripes
Definition: XrdEcReader.cc:415
ObjCfg & objcfg
Definition: XrdEcReader.cc:414
std::vector< pending_t > pending
Definition: XrdEcReader.cc:417

Member Function Documentation

◆ carryout()

void XrdEc::block_t::carryout ( pending_t pending,
const buffer_t stripe,
const XrdCl::XRootDStatus st = XrdCl::XRootDStatus() 
)
inline

Definition at line 357 of file XrdEcReader.cc.

360  {
361  //-----------------------------------------------------------------------
362  // Iterate over all pending read operations for given stripe
363  //-----------------------------------------------------------------------
364  auto itr = pending.begin();
365  for( ; itr != pending.end() ; ++itr )
366  {
367  auto &args = *itr;
368  callback_t &callback = std::get<3>( args );
369  uint32_t nbrd = 0; // number of bytes read
370  //---------------------------------------------------------------------
371  // If the read was successful, copy the data to user buffer
372  //---------------------------------------------------------------------
373  if( st.IsOK() )
374  {
375  uint64_t offset = std::get<0>( args );
376  uint32_t size = std::get<1>( args );
377  char *usrbuff = std::get<2>( args );
378  // are we reading past the end of file?
379  if( offset > stripe.size() )
380  size = 0;
381  // are partially reading past the end of the file?
382  else if( offset + size > stripe.size() )
383  size = stripe.size() - offset;
384  if(usrbuff)
385  memcpy( usrbuff, stripe.data() + offset, size );
386  nbrd = size;
387  }
388  //---------------------------------------------------------------------
389  // Call the user callback
390  //---------------------------------------------------------------------
391  callback( st, nbrd );
392  }
393  //-----------------------------------------------------------------------
394  // Now we can clear the pending reads
395  //-----------------------------------------------------------------------
396  pending.clear();
397  }
std::function< void(const XrdCl::XRootDStatus &, uint32_t)> callback_t
Definition: XrdEcReader.hh:53
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124

◆ error_correction()

static bool XrdEc::block_t::error_correction ( std::shared_ptr< block_t > &  self)
inlinestatic

Definition at line 222 of file XrdEcReader.cc.

223  {
224  //---------------------------------------------------------------------
225  // Do the accounting for our stripes
226  //---------------------------------------------------------------------
227  size_t missingcnt = 0, validcnt = 0, loadingcnt = 0, recoveringcnt = 0;
228  std::for_each( self->state.begin(), self->state.end(), [&]( state_t &s )
229  {
230  switch( s )
231  {
232  case Missing: ++missingcnt; break;
233  case Valid: ++validcnt; break;
234  case Loading: ++loadingcnt; break;
235  case Recovering: ++recoveringcnt; break;
236  default: ;
237  }
238  } );
239  //---------------------------------------------------------------------
240  // If there are no missing stripes all is good ...
241  //---------------------------------------------------------------------
242  if( missingcnt + recoveringcnt == 0 ) return true;
243  //---------------------------------------------------------------------
244  // Check if we can do the recovery at all (if too many stripes are
245  // missing it won't be possible)
246  //---------------------------------------------------------------------
247  if( missingcnt + recoveringcnt > self->objcfg.nbparity )
248  {
249  std::for_each( self->state.begin(), self->state.end(),
250  []( state_t &s ){ if( s == Recovering ) s = Missing; } );
251  return false;
252  }
253  //---------------------------------------------------------------------
254  // Check if we can do the recovery right away
255  //---------------------------------------------------------------------
256  if( validcnt >= self->objcfg.nbdata )
257  {
258  Config &cfg = Config::Instance();
259  stripes_t strps( self->get_stripes() );
260  try
261  {
262  cfg.GetRedundancy( self->objcfg ).compute( strps );
263  }
264  catch( const IOError &ex )
265  {
266  std::for_each( self->state.begin(), self->state.end(),
267  []( state_t &s ){ if( s == Recovering ) s = Missing; } );
268  return false;
269  }
270  //-------------------------------------------------------------------
271  // Now when we recovered the data we need to mark every stripe as
272  // valid and execute the pending reads
273  //-------------------------------------------------------------------
274  for( size_t strpid = 0; strpid < self->objcfg.nbchunks; ++strpid )
275  {
276  if( self->state[strpid] != Recovering ) continue;
277  self->state[strpid] = Valid;
278  self->carryout( self->pending[strpid], self->stripes[strpid] );
279  }
280  return true;
281  }
282  //---------------------------------------------------------------------
283  // Try loading the data and only then attempt recovery
284  //---------------------------------------------------------------------
285  size_t i = 0;
286  while( loadingcnt + validcnt < self->objcfg.nbdata && i < self->objcfg.nbchunks )
287  {
288  size_t strpid = i++;
289  if( self->state[strpid] != Empty ) continue;
290  self->reader.Read( self->blkid, strpid, self->stripes[strpid],
291  read_callback( self, strpid ) );
292  self->state[strpid] = Loading;
293  ++loadingcnt;
294  }
295 
296  //-------------------------------------------------------------------
297  // Now that we triggered the recovery procedure mark every missing
298  // stripe as recovering.
299  //-------------------------------------------------------------------
300  std::for_each( self->state.begin(), self->state.end(),
301  []( state_t &s ){ if( s == Missing ) s = Recovering; } );
302  return true;
303  }
static Config & Instance()
Singleton access.
Definition: XrdEcConfig.hh:46
XrdCmsConfig Config
std::vector< stripe_t > stripes_t
All stripes in a block.
const uint8_t nbdata
Definition: XrdEcObjCfg.hh:87
static callback_t read_callback(std::shared_ptr< block_t > &self, size_t strpid)
Definition: XrdEcReader.cc:309

Referenced by read().

+ Here is the caller graph for this function:

◆ fail_missing()

void XrdEc::block_t::fail_missing ( )
inline

Definition at line 402 of file XrdEcReader.cc.

403  {
404  size_t size = objcfg.nbchunks;
405  for( size_t i = 0; i < size; ++i )
406  {
407  if( state[i] != Missing ) continue;
408  carryout( pending[i], stripes[i],
410  }
411  }
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
void carryout(pending_t &pending, const buffer_t &stripe, const XrdCl::XRootDStatus &st=XrdCl::XRootDStatus())
Definition: XrdEcReader.cc:357

References XrdCl::errDataError, and XrdCl::stError.

◆ get_stripes()

stripes_t XrdEc::block_t::get_stripes ( )
inline

Definition at line 337 of file XrdEcReader.cc.

338  {
339  stripes_t ret;
340  ret.reserve( objcfg.nbchunks );
341  for( size_t i = 0; i < objcfg.nbchunks; ++i )
342  {
343  if( state[i] == Valid )
344  ret.emplace_back( stripes[i].data(), true );
345  else
346  {
347  stripes[i].resize( objcfg.chunksize, 0 );
348  ret.emplace_back( stripes[i].data(), false );
349  }
350  }
351  return ret;
352  }
const uint64_t chunksize
Definition: XrdEcObjCfg.hh:89

References Valid.

◆ read()

static void XrdEc::block_t::read ( std::shared_ptr< block_t > &  self,
size_t  strpid,
uint64_t  offset,
uint32_t  size,
char *  usrbuff,
callback_t  usrcb,
uint16_t  timeout 
)
inlinestatic

Definition at line 152 of file XrdEcReader.cc.

159  {
160  std::unique_lock<std::mutex> lck( self->mtx );
161 
162  //---------------------------------------------------------------------
163  // The cache is empty, we need to load the data
164  //---------------------------------------------------------------------
165  if( self->state[strpid] == Empty )
166  {
167  self->reader.Read( self->blkid, strpid, self->stripes[strpid],
168  read_callback( self, strpid ), timeout );
169  self->state[strpid] = Loading;
170  }
171  //---------------------------------------------------------------------
172  // The stripe is either corrupted or unreachable
173  //---------------------------------------------------------------------
174  if( self->state[strpid] == Missing )
175  {
176  if( !error_correction( self ) )
177  {
178  //-----------------------------------------------------------------
179  // Recovery was not possible, notify the user of the error
180  //-----------------------------------------------------------------
182  return;
183  }
184  //-------------------------------------------------------------------
185  // we fall through to the following if-statements that will handle
186  // Recovering / Valid state
187  //-------------------------------------------------------------------
188  }
189  //---------------------------------------------------------------------
190  // The cache is loading or recovering, we don't have the data yet
191  //---------------------------------------------------------------------
192  if( self->state[strpid] == Loading || self->state[strpid] == Recovering )
193  {
194  self->pending[strpid].emplace_back( offset, size, usrbuff, usrcb );
195  return;
196  }
197  //---------------------------------------------------------------------
198  // We do have the data so we can serve the user right away
199  //---------------------------------------------------------------------
200  if( self->state[strpid] == Valid )
201  {
202  if( offset + size > self->stripes[strpid].size() )
203  size = self->stripes[strpid].size() - offset;
204  if(usrbuff)
205  memcpy( usrbuff, self->stripes[strpid].data() + offset, size );
206  usrcb( XrdCl::XRootDStatus(), size );
207  return;
208  }
209  //---------------------------------------------------------------------
210  // In principle we should never end up here, nevertheless if this
211  // happens it is clearly an error ...
212  //---------------------------------------------------------------------
214  }
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
static bool error_correction(std::shared_ptr< block_t > &self)
Definition: XrdEcReader.cc:222

References Empty, XrdCl::errDataError, XrdCl::errInvalidOp, error_correction(), Loading, Missing, read_callback(), Recovering, XrdCl::stError, and Valid.

+ Here is the call graph for this function:

◆ read_callback()

static callback_t XrdEc::block_t::read_callback ( std::shared_ptr< block_t > &  self,
size_t  strpid 
)
inlinestatic

Definition at line 309 of file XrdEcReader.cc.

310  {
311  return [self, strpid]( const XrdCl::XRootDStatus &st, uint32_t ) mutable
312  {
313  std::unique_lock<std::mutex> lck( self->mtx );
314  self->state[strpid] = st.IsOK() ? Valid : Missing;
315  //------------------------------------------------------------
316  // Check if we need to do any error correction (either for
317  // the current stripe, or any other stripe)
318  //------------------------------------------------------------
319  bool recoverable = error_correction( self );
320  //------------------------------------------------------------
321  // Carry out the pending read requests if we got the data
322  //------------------------------------------------------------
323  if( st.IsOK() )
324  self->carryout( self->pending[strpid], self->stripes[strpid], st );
325  //------------------------------------------------------------
326  // Carry out the pending read requests if there was an error
327  // and we cannot recover
328  //------------------------------------------------------------
329  if( !recoverable )
330  self->fail_missing();
331  };
332  }

References XrdCl::Status::IsOK(), and Valid.

Referenced by read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Member Data Documentation

◆ blkid

size_t XrdEc::block_t::blkid

Definition at line 418 of file XrdEcReader.cc.

◆ mtx

std::mutex XrdEc::block_t::mtx

Definition at line 420 of file XrdEcReader.cc.

◆ objcfg

ObjCfg& XrdEc::block_t::objcfg

Definition at line 414 of file XrdEcReader.cc.

◆ pending

std::vector<pending_t> XrdEc::block_t::pending

Definition at line 417 of file XrdEcReader.cc.

◆ reader

Reader& XrdEc::block_t::reader

Definition at line 413 of file XrdEcReader.cc.

◆ recovering

bool XrdEc::block_t::recovering

Definition at line 419 of file XrdEcReader.cc.

◆ state

std::vector<state_t> XrdEc::block_t::state

Definition at line 416 of file XrdEcReader.cc.

◆ stripes

std::vector<buffer_t> XrdEc::block_t::stripes

Definition at line 415 of file XrdEcReader.cc.


The documentation for this struct was generated from the following file: