XRootD
XrdEc::Reader Class Reference

#include <XrdEcReader.hh>

+ Collaboration diagram for XrdEc::Reader:

Public Member Functions

 Reader (ObjCfg &objcfg)
 
virtual ~Reader ()
 
void Close (XrdCl::ResponseHandler *handler, uint16_t timeout=0)
 Close the data object. More...
 
uint64_t GetSize ()
 
void Open (XrdCl::ResponseHandler *handler, uint16_t timeout=0)
 
void Read (uint64_t offset, uint32_t length, void *buffer, XrdCl::ResponseHandler *handler, uint16_t timeout)
 
void VectorRead (const XrdCl::ChunkList &chunks, void *buffer, XrdCl::ResponseHandler *handler, uint16_t timeout)
 

Friends

class ::MicroTest
 
class ::XrdEcTests
 
struct block_t
 

Detailed Description

Definition at line 58 of file XrdEcReader.hh.

Constructor & Destructor Documentation

◆ Reader()

XrdEc::Reader::Reader ( ObjCfg &  objcfg )
inline

Constructor

Parameters
objcfg: configuration for the data object (e.g. number of data and parity stripes)

Definition at line 71 of file XrdEcReader.hh.

71  : objcfg( objcfg ), lstblk( 0 ), filesize( 0 )
72  {
73  }

◆ ~Reader()

XrdEc::Reader::~Reader ( )
virtual

Definition at line 427 of file XrdEcReader.cc.

428  {
429  }

Member Function Documentation

◆ Close()

void XrdEc::Reader::Close ( XrdCl::ResponseHandler *  handler,
uint16_t  timeout = 0 
)

Close the data object.

Definition at line 588 of file XrdEcReader.cc.

589  {
590  //---------------------------------------------------------------------
591  // prepare the pipelines ...
592  //---------------------------------------------------------------------
593  std::vector<XrdCl::Pipeline> closes;
594  closes.reserve( dataarchs.size() );
595  auto itr = dataarchs.begin();
596  for( ; itr != dataarchs.end() ; ++itr )
597  {
598  auto &zipptr = itr->second;
599  if( zipptr->IsOpen() )
600  {
601  zipptr->SetProperty( "BundledClose", "true");
602  closes.emplace_back( XrdCl::CloseArchive( *zipptr ) >>
603  [zipptr]( XrdCl::XRootDStatus& ){ } );
604  }
605  }
606 
607  // if there is nothing to close just schedule the handler
608  if( closes.empty() ) ScheduleHandler( handler );
609  // otherwise close the archives
610  else XrdCl::Async( XrdCl::Parallel( closes ) >> handler, timeout );
611  }
CloseArchiveImpl< false > CloseArchive(Ctx< ZipArchive > zip, uint16_t timeout=0)
Factory for creating CloseArchiveImpl objects.
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
void ScheduleHandler(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler)

References XrdCl::Async(), XrdCl::CloseArchive(), XrdCl::Parallel(), and XrdEc::ScheduleHandler().

+ Here is the call graph for this function:

◆ GetSize()

uint64_t XrdEc::Reader::GetSize ( )
inline
Returns
the size of the data object (the cached filesize member)

Definition at line 121 of file XrdEcReader.hh.

122  {
123  return filesize;
124  }

◆ Open()

void XrdEc::Reader::Open ( XrdCl::ResponseHandler *  handler,
uint16_t  timeout = 0 
)

Open the erasure coded / striped object

Parameters
handler: user callback

Definition at line 434 of file XrdEcReader.cc.

435  {
436  const size_t size = objcfg.plgr.size();
437  std::vector<XrdCl::Pipeline> opens; opens.reserve( size );
438  for( size_t i = 0; i < size; ++i )
439  {
440  // generate the URL
441  std::string url = objcfg.GetDataUrl( i );
442  archiveIndices.emplace(url, i);
443  // create the file object
444  dataarchs.emplace( url, std::make_shared<XrdCl::ZipArchive>(
445  Config::Instance().enable_plugins ) );
446  // open the archive
447  if( objcfg.nomtfile )
448  {
449  opens.emplace_back( XrdCl::OpenArchive( *dataarchs[url], url, XrdCl::OpenFlags::Read ) );
450  }
451  else
452  opens.emplace_back( OpenOnly( *dataarchs[url], url, false ) );
453  }
454 
455  auto pipehndl = [=]( const XrdCl::XRootDStatus &st )
456  { // set the central directories in ZIP archives (if we use metadata files)
457  auto itr = dataarchs.begin();
458  for( ; itr != dataarchs.end() ; ++itr )
459  {
460  const std::string &url = itr->first;
461  auto &zipptr = itr->second;
462  if( zipptr->openstage == XrdCl::ZipArchive::NotParsed )
463  zipptr->SetCD( metadata[url] );
464  else if( zipptr->openstage != XrdCl::ZipArchive::Done && !metadata.empty() )
465  AddMissing( metadata[url] );
466  auto itr = zipptr->cdmap.begin();
467  for( ; itr != zipptr->cdmap.end() ; ++itr )
468  {
469  urlmap.emplace( itr->first, url );
470  size_t blknb = fntoblk( itr->first );
471  if( blknb > lstblk ) lstblk = blknb;
472  }
473  }
474  metadata.clear();
475  // call user handler
476  if( handler )
477  handler->HandleResponse( new XrdCl::XRootDStatus( st ), nullptr );
478  };
479  // in parallel open the data files and read the metadata
480  XrdCl::Pipeline p = objcfg.nomtfile
481  ? XrdCl::Parallel( opens ).AtLeast( objcfg.nbdata ) | ReadSize( 0 ) | XrdCl::Final( pipehndl )
482  : XrdCl::Parallel( ReadMetadata( 0 ),
483  XrdCl::Parallel( opens ).AtLeast( objcfg.nbdata ) ) >> pipehndl;
484  XrdCl::Async( std::move( p ), timeout );
485  }
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
static Config & Instance()
Singleton access.
Definition: XrdEcConfig.hh:46
FinalOperation Final
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
static size_t fntoblk(const std::string &fn)
OpenOnlyImpl< false > OpenOnly(XrdCl::Ctx< XrdCl::ZipArchive > zip, XrdCl::Arg< std::string > fn, XrdCl::Arg< bool > updt, uint16_t timeout=0)
Definition: XrdEcReader.cc:105
@ Read
Open only for reading.
std::string GetDataUrl(size_t i) const
Definition: XrdEcObjCfg.hh:65
std::vector< std::string > plgr
Definition: XrdEcObjCfg.hh:92
const uint8_t nbdata
Definition: XrdEcObjCfg.hh:87

References XrdCl::Async(), XrdEc::fntoblk(), XrdCl::ResponseHandler::HandleResponse(), XrdCl::OpenArchive(), XrdEc::OpenOnly(), XrdCl::Parallel(), and XrdCl::OpenFlags::Read.

+ Here is the call graph for this function:

◆ Read()

void XrdEc::Reader::Read ( uint64_t  offset,
uint32_t  length,
void *  buffer,
XrdCl::ResponseHandler *  handler,
uint16_t  timeout 
)

Read data from the data object

Parameters
offset: offset of the data to be read
length: length of the data to be read
buffer: buffer for the data to be read
handler: user callback

Definition at line 490 of file XrdEcReader.cc.

495  {
496  if( objcfg.nomtfile )
497  {
498  if( offset >= filesize )
499  length = 0;
500  else if( offset + length > filesize )
501  length = filesize - offset;
502  }
503 
504  if( length == 0 )
505  {
506  ScheduleHandler( offset, 0, buffer, handler );
507  return;
508  }
509 
510  char *usrbuff = reinterpret_cast<char*>( buffer );
511  typedef std::tuple<uint64_t, uint32_t,
512  void*, uint32_t,
514  XrdCl::XRootDStatus> rdctx_t;
515  auto rdctx = std::make_shared<rdctx_t>( offset, 0, buffer,
516  length, handler,
518  auto rdmtx = std::make_shared<std::mutex>();
519 
520  while( length > 0 )
521  {
522  size_t blkid = offset / objcfg.datasize; //< ID of the block from which we will be reading
523  size_t strpid = ( offset % objcfg.datasize ) / objcfg.chunksize; //< ID of the stripe from which we will be reading
524  uint64_t rdoff = offset - blkid * objcfg.datasize - strpid * objcfg.chunksize; //< relative read offset within the stripe
525  uint32_t rdsize = objcfg.chunksize - rdoff; //< read size within the stripe
526  if( rdsize > length ) rdsize = length;
527  //-------------------------------------------------------------------
528  // Make sure we operate on a valid block
529  //-------------------------------------------------------------------
530  std::unique_lock<std::mutex> lck( blkmtx );
531  if( !block || block->blkid != blkid )
532  block = std::make_shared<block_t>( blkid, *this, objcfg );
533  //-------------------------------------------------------------------
534  // Prepare the callback for reading from single stripe
535  //-------------------------------------------------------------------
536  auto blk = block;
537  lck.unlock();
538  auto callback = [blk, rdctx, rdsize, rdmtx]( const XrdCl::XRootDStatus &st, uint32_t nbrd )
539  {
540  std::unique_lock<std::mutex> lck( *rdmtx );
541  //---------------------------------------------------------------------
542  // update number of bytes left to be read (bytes requested not actually
543  // read)
544  //---------------------------------------------------------------------
545  std::get<3>( *rdctx ) -= rdsize;
546  //---------------------------------------------------------------------
547  // Handle failure ...
548  //---------------------------------------------------------------------
549  if( !st.IsOK() )
550  std::get<5>( *rdctx ) = st; // the error
551  //---------------------------------------------------------------------
552  // Handle success ...
553  //---------------------------------------------------------------------
554  else
555  std::get<1>( *rdctx ) += nbrd; // number of bytes read
556  //---------------------------------------------------------------------
557  // Are we done?
558  //---------------------------------------------------------------------
559  if( std::get<3>( *rdctx ) == 0 )
560  {
561  //-------------------------------------------------------------------
562  // Check if the read operation was successful ...
563  //-------------------------------------------------------------------
564  XrdCl::XRootDStatus &status = std::get<5>( *rdctx );
565  if( !status.IsOK() )
566  ScheduleHandler( std::get<4>( *rdctx ), status );
567  else
568  ScheduleHandler( std::get<0>( *rdctx ), std::get<1>( *rdctx ),
569  std::get<2>( *rdctx ), std::get<4>( *rdctx ) );
570  }
571  };
572  //-------------------------------------------------------------------
573  // Read data from a stripe
574  //-------------------------------------------------------------------
575  block_t::read( blk, strpid, rdoff, rdsize, usrbuff, callback, timeout );
576  //-------------------------------------------------------------------
577  // Update absolute offset, read length, and user buffer
578  //-------------------------------------------------------------------
579  offset += rdsize;
580  length -= rdsize;
581  usrbuff += rdsize;
582  }
583  }
Handle an async response.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
const uint64_t chunksize
Definition: XrdEcObjCfg.hh:89
const uint64_t datasize
Definition: XrdEcObjCfg.hh:88
static void read(std::shared_ptr< block_t > &self, size_t strpid, uint64_t offset, uint32_t size, char *usrbuff, callback_t usrcb, uint16_t timeout)
Definition: XrdEcReader.cc:152

References XrdCl::Status::IsOK(), read(), and XrdEc::ScheduleHandler().

+ Here is the call graph for this function:

◆ VectorRead()

void XrdEc::Reader::VectorRead ( const XrdCl::ChunkList &  chunks,
void *  buffer,
XrdCl::ResponseHandler *  handler,
uint16_t  timeout 
)

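Read a list of chunks from the data object

Parameters
chunks: list of chunks (offset, length, buffer) to be read; requests with more than 1024 chunks are not processed
buffer: if not null, the data of all chunks is written contiguously into this buffer; otherwise each chunk's data is written into that chunk's own buffer
handler: user callback

Definition at line 868 of file XrdEcReader.cc.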

868  {
869  if(chunks.size() > 1024) {
871  return;
872  }
873 
874  std::vector<XrdCl::ChunkList> hostLists;
875  for(size_t dataHosts = 0; dataHosts < objcfg.plgr.size(); dataHosts++){
876  hostLists.emplace_back(XrdCl::ChunkList());
877  }
878 
879  auto log = XrdCl::DefaultEnv::GetLog();
880 
881  //bool useGlobalBuffer = buffer != nullptr;
882  char* globalBuffer = (char*)buffer;
883 
884  // host index, blkid, strpid
885  std::set<std::tuple<size_t, size_t, size_t>> requestedChunks;
886  // create block_ts for any requested block index
887  std::map<size_t, std::shared_ptr<block_t>> blockMap;
888 
889  // go through the requested lists of chunks and assign them to fitting hosts
890  for(size_t index = 0; index < chunks.size(); index++){
891  uint32_t remainLength = chunks[index].length;
892  uint64_t currentOffset = chunks[index].offset;
893 
894  while(remainLength > 0){
895  size_t blkid = currentOffset / objcfg.datasize; //< ID of the block from which we will be reading
896  size_t strpid = ( currentOffset % objcfg.datasize ) / objcfg.chunksize; //< ID of the stripe from which we will be reading
897  uint64_t rdoff = currentOffset - blkid * objcfg.datasize - strpid * objcfg.chunksize ; //< relative read offset within the stripe
898  //uint64_t offsetInFile = rdoff + blkid * objcfg.chunksize; // relative offset within the file
899  uint32_t rdsize = objcfg.chunksize - rdoff; //< read size within the stripe
900  if( rdsize > remainLength ) rdsize = remainLength;
901  if(currentOffset + rdsize >= filesize) {
902  rdsize = filesize - currentOffset;
903  remainLength = rdsize;
904  }
905 
906 
907  std::string fn = objcfg.GetFileName(blkid, strpid);
908 
909  auto itr = urlmap.find( fn );
910  if( itr == urlmap.end() )
911  {
912  log->Dump(XrdCl::XRootDMsg, "EC Vector Read: No mapping of file to host found.");
913  break;
914  }
915  // get the URL of the ZIP archive with the respective data
916  const std::string &url = itr->second;
917  auto itr2 = archiveIndices.find(url);
918  if(itr2 == archiveIndices.end())
919  {
920  log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Couldn't find host for file.");
921  break;
922  }
923  size_t indexOfArchive = archiveIndices[url];
924 
925  if (blockMap.find(blkid) == blockMap.end())
926  {
927  blockMap.emplace(blkid,
928  std::make_shared<block_t>(blkid, *this, objcfg));
929  }
930 
931  blockMap[blkid]->state[strpid] = block_t::Loading;
932  XrdCl::StatInfo* info = nullptr;
933  if(dataarchs[url]->Stat(objcfg.GetFileName(blkid, strpid), info).IsOK())
934  blockMap[blkid]->stripes[strpid].resize( info ->GetSize() );
935 
936  auto requestChunk = std::make_tuple(indexOfArchive, blkid, strpid);
937  if(requestedChunks.find(requestChunk) == requestedChunks.end())
938  {
939  uint64_t off = 0;
940  dataarchs[url]->GetOffset(objcfg.GetFileName(blkid, strpid), off);
941  hostLists[indexOfArchive].emplace_back(XrdCl::ChunkInfo(
942  off,
943  info ->GetSize(),
944  blockMap[blkid]->stripes[strpid].data()));
945 
946  // fill list of requested chunks by block and stripe id
947  requestedChunks.emplace(requestChunk);
948 
949  }
950  remainLength -= rdsize;
951  currentOffset += rdsize;
952 
953  }
954  }
955 
956  std::vector<XrdCl::Pipeline> hostPipes;
957  hostPipes.reserve(hostLists.size());
958  for(size_t i = 0; i < hostLists.size(); i++){
959  while(hostLists[i].size() > 0){
960  uint32_t range = hostLists[i].size() > 1024 ? 1024 : hostLists[i].size();
961  XrdCl::ChunkList partList(hostLists[i].begin(), hostLists[i].begin() + range);
962  hostLists[i].erase(hostLists[i].begin(), hostLists[i].begin() + range);
963  hostPipes.emplace_back(
964  XrdCl::VectorRead(XrdCl::Ctx<XrdCl::File>(dataarchs[objcfg.GetDataUrl(i)]->archive),
965  partList, nullptr, timeout)
966  >> [=](const XrdCl::XRootDStatus &st, XrdCl::VectorReadInfo ch) mutable
967  {
968  auto it = requestedChunks.begin();
969  while(it!=requestedChunks.end())
970  {
971  auto &args = *it;
972  size_t host = std::get<0>(args);
973  size_t blkid = std::get<1>(args);
974  size_t strpid = std::get<2>(args);
975  it++;
976  if(host == i)
977  {
978  std::shared_ptr<block_t> currentBlock = blockMap[blkid];
979 
980 
981  if(!st.IsOK())
982  {
983  log->Dump(XrdCl::XRootDMsg, "EC Vector Read of host %zu failed entirely.", i);
984  MissingVectorRead(currentBlock, blkid, strpid, timeout);
985  }
986  else{
987  uint32_t orgcksum = 0;
988  auto s = dataarchs[objcfg.GetDataUrl(i)]->GetCRC32( objcfg.GetFileName(blkid, strpid), orgcksum );
989  //---------------------------------------------------
990  // If we cannot extract the checksum assume the data
991  // are corrupted
992  //---------------------------------------------------
993  if( !st.IsOK() )
994  {
995  log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Couldn't read CRC32 from CD.");
996  MissingVectorRead(currentBlock, blkid, strpid, timeout);
997  continue;
998  }
999  //---------------------------------------------------
1000  // Verify data integrity
1001  //---------------------------------------------------
1002  uint32_t cksum = objcfg.digest( 0, currentBlock->stripes[strpid].data(), currentBlock->stripes[strpid].size() );
1003  if( orgcksum != cksum )
1004  {
1005  log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Wrong checksum for block %zu stripe %zu.", blkid, strpid);
1006  MissingVectorRead(currentBlock, blkid, strpid, timeout);
1007  continue;
1008  }
1009  else{
1010  currentBlock->state[strpid] = block_t::Valid;
1011  bool recoverable = currentBlock->error_correction( currentBlock );
1012  if(!recoverable)
1013  log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Couldn't recover block %zu.", blkid);
1014  }
1015  }
1016  }
1017  }
1018  }
1019  );
1020  }
1021  }
1022 
1023  auto finalPipehndl = [=] (const XrdCl::XRootDStatus &st) mutable {
1024  // wait until all missing chunks are corrected (uses single reads to get parity stripes)
1025  std::unique_lock<std::mutex> lk(missingChunksMutex);
1026  waitMissing.wait(lk, [=] { return missingChunksVectorRead.size() == 0;});
1027 
1028  bool failed = false;
1029  for(size_t index = 0; index < chunks.size(); index++){
1030  uint32_t remainLength = chunks[index].length;
1031  uint64_t currentOffset = chunks[index].offset;
1032 
1033  char *localBuffer;
1034  if (globalBuffer)
1035  localBuffer = globalBuffer;
1036  else
1037  localBuffer = (char*)(chunks[index].buffer);
1038 
1039  while(remainLength > 0){
1040  size_t blkid = currentOffset / objcfg.datasize; //< ID of the block from which we will be reading
1041  size_t strpid = ( currentOffset % objcfg.datasize ) / objcfg.chunksize; //< ID of the stripe from which we will be reading
1042  uint64_t rdoff = currentOffset - blkid * objcfg.datasize - strpid * objcfg.chunksize ; //< relative read offset within the stripe
1043  uint32_t rdsize = objcfg.chunksize - rdoff; //< read size within the stripe
1044  if( rdsize > remainLength ) rdsize = remainLength;
1045 
1046  // put received data into given buffers
1047  if(blockMap.find(blkid) == blockMap.end() || blockMap[blkid] == nullptr){
1048  log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Missing block %zu.", blkid);
1049  failed = true;
1050  break;
1051  }
1052  if(blockMap[blkid]->state[strpid] != block_t::Valid){
1053  log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Invalid stripe in block %zu stripe %zu.", blkid, strpid);
1054  failed = true;
1055  break;
1056  }
1057 
1058  memcpy(localBuffer, blockMap[blkid]->stripes[strpid].data() + rdoff, rdsize);
1059 
1060  remainLength -= rdsize;
1061  currentOffset += rdsize;
1062  localBuffer += rdsize;
1063  }
1064  if(globalBuffer) globalBuffer = localBuffer;
1065  }
1066  if(handler){
1067  if(failed) log->Dump(XrdCl::XRootDMsg, "EC Vector Read failed (at least in part).");
1068  if(failed) handler->HandleResponse(new XrdCl::XRootDStatus(XrdCl::stError, "Couldn't read all segments"), nullptr);
1069  else handler->HandleResponse(new XrdCl::XRootDStatus(), nullptr);
1070  }
1071  };
1072 
1073  XrdCl::Pipeline p = XrdCl::Parallel(hostPipes) |
1074  XrdCl::Final(finalPipehndl);
1075 
1076  XrdCl::Async(std::move(p), timeout);
1077 
1078  }
struct stat Stat
Definition: XrdCks.cc:49
static Log * GetLog()
Get default log.
Object stat info.
uint64_t GetSize()
Definition: XrdEcReader.hh:121
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t XRootDMsg
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
VectorReadImpl< false > VectorRead(Ctx< File > file, Arg< ChunkList > chunks, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating VectorReadImpl objects.
std::vector< ChunkInfo > ChunkList
List of chunks.
Describe a data chunk for vector read.
std::string GetFileName(size_t blknb, size_t strpnb) const
Definition: XrdEcObjCfg.hh:79

References XrdCl::errInvalidArgs, XrdCl::DefaultEnv::GetLog(), XrdCl::ResponseHandler::HandleResponse(), Stat, XrdCl::stError, XrdCl::VectorRead(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

Friends And Related Function Documentation

◆ ::MicroTest

friend class ::MicroTest
friend

Definition at line 60 of file XrdEcReader.hh.

◆ ::XrdEcTests

friend class ::XrdEcTests
friend

Definition at line 61 of file XrdEcReader.hh.

◆ block_t

friend struct block_t
friend

Definition at line 62 of file XrdEcReader.hh.


The documentation for this class was generated from the following files: