56 template<
bool HasHndl>
58 XrdCl::Resp<void>, XrdCl::Arg<std::string>, XrdCl::Arg<bool>>
92 uint16_t pipelineTimeout )
94 std::string url = std::get<UrlArg>( this->
args ).Get();
95 bool updt = std::get<UpdtArg>( this->
args ).Get();
96 uint16_t
timeout = pipelineTimeout < this->timeout ?
97 pipelineTimeout : this->
timeout;
108 uint16_t timeout = 0 )
111 std::move( updt ) ).
Timeout( timeout );
119 typedef std::tuple<uint64_t, uint32_t, char*, callback_t>
args_t;
152 static void read( std::shared_ptr<block_t> &
self,
160 std::unique_lock<std::mutex> lck( self->mtx );
165 if( self->state[strpid] ==
Empty )
167 self->reader.Read( self->blkid, strpid, self->stripes[strpid],
174 if( self->state[strpid] ==
Missing )
194 self->pending[strpid].emplace_back( offset, size, usrbuff, usrcb );
200 if( self->state[strpid] ==
Valid )
202 if( offset + size > self->stripes[strpid].size() )
203 size =
self->stripes[strpid].size() - offset;
205 memcpy( usrbuff, self->stripes[strpid].data() + offset, size );
227 size_t missingcnt = 0, validcnt = 0, loadingcnt = 0, recoveringcnt = 0;
228 std::for_each( self->state.begin(), self->state.end(), [&](
state_t &s )
232 case Missing: ++missingcnt; break;
233 case Valid: ++validcnt; break;
234 case Loading: ++loadingcnt; break;
235 case Recovering: ++recoveringcnt; break;
242 if( missingcnt + recoveringcnt == 0 )
return true;
247 if( missingcnt + recoveringcnt > self->objcfg.nbparity )
249 std::for_each( self->state.begin(), self->state.end(),
250 [](
state_t &s ){ if( s == Recovering ) s = Missing; } );
256 if( validcnt >= self->objcfg.nbdata )
262 cfg.GetRedundancy( self->objcfg ).compute( strps );
264 catch(
const IOError &ex )
266 std::for_each( self->state.begin(), self->state.end(),
267 [](
state_t &s ){ if( s == Recovering ) s = Missing; } );
274 for(
size_t strpid = 0; strpid <
self->objcfg.nbchunks; ++strpid )
276 if( self->state[strpid] !=
Recovering )
continue;
277 self->state[strpid] =
Valid;
278 self->carryout( self->pending[strpid], self->stripes[strpid] );
289 if( self->state[strpid] !=
Empty )
continue;
290 self->reader.Read( self->blkid, strpid, self->stripes[strpid],
300 std::for_each( self->state.begin(), self->state.end(),
301 [](
state_t &s ){ if( s == Missing ) s = Recovering; } );
313 std::unique_lock<std::mutex> lck( self->mtx );
314 self->state[strpid] = st.
IsOK() ?
Valid : Missing;
319 bool recoverable = error_correction(
self );
324 self->carryout( self->pending[strpid], self->stripes[strpid], st );
330 self->fail_missing();
340 ret.reserve( objcfg.nbchunks );
341 for(
size_t i = 0; i < objcfg.nbchunks; ++i )
343 if( state[i] ==
Valid )
344 ret.emplace_back( stripes[i].data(),
true );
347 stripes[i].resize( objcfg.chunksize, 0 );
348 ret.emplace_back( stripes[i].data(),
false );
364 auto itr = pending.begin();
365 for( ; itr != pending.end() ; ++itr )
375 uint64_t offset = std::get<0>( args );
376 uint32_t size = std::get<1>( args );
377 char *usrbuff = std::get<2>( args );
379 if( offset > stripe.size() )
382 else if( offset + size > stripe.size() )
383 size = stripe.size() - offset;
385 memcpy( usrbuff, stripe.data() + offset, size );
391 callback( st, nbrd );
404 size_t size = objcfg.nbchunks;
405 for(
size_t i = 0; i < size; ++i )
407 if( state[i] != Missing )
continue;
408 carryout( pending[i], stripes[i],
436 const size_t size = objcfg.plgr.size();
437 std::vector<XrdCl::Pipeline> opens; opens.reserve( size );
438 for(
size_t i = 0; i < size; ++i )
441 std::string url = objcfg.GetDataUrl( i );
442 archiveIndices.emplace(url, i);
444 dataarchs.emplace( url, std::make_shared<XrdCl::ZipArchive>(
445 Config::Instance().enable_plugins ) );
447 if( objcfg.nomtfile )
452 opens.emplace_back(
OpenOnly( *dataarchs[url], url,
false ) );
457 auto itr = dataarchs.begin();
458 for( ; itr != dataarchs.end() ; ++itr )
460 const std::string &url = itr->first;
461 auto &zipptr = itr->second;
462 if( zipptr->openstage == XrdCl::ZipArchive::NotParsed )
463 zipptr->SetCD( metadata[url] );
464 else if( zipptr->openstage != XrdCl::ZipArchive::Done && !metadata.empty() )
465 AddMissing( metadata[url] );
466 auto itr = zipptr->cdmap.begin();
467 for( ; itr != zipptr->cdmap.end() ; ++itr )
469 urlmap.emplace( itr->first, url );
470 size_t blknb =
fntoblk( itr->first );
471 if( blknb > lstblk ) lstblk = blknb;
490 void Reader::Read( uint64_t offset,
496 if( objcfg.nomtfile )
498 if( offset >= filesize )
500 else if( offset + length > filesize )
501 length = filesize - offset;
510 char *usrbuff =
reinterpret_cast<char*
>( buffer );
511 typedef std::tuple<uint64_t, uint32_t,
515 auto rdctx = std::make_shared<rdctx_t>( offset, 0, buffer,
518 auto rdmtx = std::make_shared<std::mutex>();
522 size_t blkid = offset / objcfg.datasize;
523 size_t strpid = ( offset % objcfg.datasize ) / objcfg.chunksize;
524 uint64_t rdoff = offset - blkid * objcfg.datasize - strpid * objcfg.chunksize;
525 uint32_t rdsize = objcfg.chunksize - rdoff;
526 if( rdsize > length ) rdsize = length;
530 std::unique_lock<std::mutex> lck( blkmtx );
531 if( !block || block->blkid != blkid )
532 block = std::make_shared<block_t>( blkid, *
this, objcfg );
538 auto callback = [blk, rdctx, rdsize, rdmtx](
const XrdCl::XRootDStatus &st, uint32_t nbrd )
540 std::unique_lock<std::mutex> lck( *rdmtx );
545 std::get<3>( *rdctx ) -= rdsize;
550 std::get<5>( *rdctx ) = st;
555 std::get<1>( *rdctx ) += nbrd;
559 if( std::get<3>( *rdctx ) == 0 )
569 std::get<2>( *rdctx ), std::get<4>( *rdctx ) );
575 block_t::read( blk, strpid, rdoff, rdsize, usrbuff, callback, timeout );
593 std::vector<XrdCl::Pipeline> closes;
594 closes.reserve( dataarchs.size() );
595 auto itr = dataarchs.begin();
596 for( ; itr != dataarchs.end() ; ++itr )
598 auto &zipptr = itr->second;
599 if( zipptr->IsOpen() )
601 zipptr->SetProperty(
"BundledClose",
"true");
616 void Reader::Read(
size_t blknb,
size_t strpnb,
buffer_t &buffer,
callback_t cb, uint16_t timeout )
619 std::string fn = objcfg.GetFileName( blknb, strpnb );
621 auto itr = urlmap.find( fn );
622 if( itr == urlmap.end() )
626 ThreadPool::Instance().Execute( cb, st, 0 );
630 const std::string &url = itr->second;
632 auto &zipptr = dataarchs[url];
635 auto st = zipptr->Stat( fn, info );
638 ThreadPool::Instance().Execute( cb, st, 0 );
641 uint32_t rdsize = info->
GetSize();
644 buffer.resize( objcfg.chunksize );
661 uint32_t orgcksum = 0;
662 auto s = zipptr->GetCRC32( fn, orgcksum );
675 uint32_t cksum = objcfg.digest( 0, ch.
buffer, ch.
length );
676 if( orgcksum != cksum )
678 cb( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError ), 0 );
693 const size_t size = objcfg.plgr.size();
695 auto file = std::make_shared<XrdCl::File>( Config::Instance().enable_plugins );
697 std::string url = objcfg.GetMetadataUrl( index );
707 if( index + 1 < size )
713 rdbuff =
new char[info.
GetSize()];
720 if( index + 1 < size )
725 if( !ParseMetadata( ch ) )
727 if( index + 1 < size )
744 char* buffer = reinterpret_cast<char*>( *rdbuff );
757 std::string url = objcfg.GetDataUrl( index );
758 return XrdCl::GetXAttr( dataarchs[url]->GetFile(),
"xrdec.filesize" ) >>
766 if( index + 1 < objcfg.plgr.size() )
779 const size_t mincnt = objcfg.nbdata + objcfg.nbparity;
780 const size_t maxcnt = objcfg.plgr.size();
782 char *buffer =
reinterpret_cast<char*
>( ch.
buffer );
783 size_t length = ch.
length;
785 for(
size_t i = 0; i < maxcnt; ++i )
787 uint32_t signature = XrdZip::to<uint32_t>( buffer );
790 if( i + 1 < mincnt )
return false;
795 if( lfh.lfhSize + lfh.uncompressedSize > length )
return false;
796 buffer += lfh.lfhSize;
797 length -= lfh.lfhSize;
799 uint32_t crc32val = objcfg.digest( 0, buffer, lfh.uncompressedSize );
800 if( crc32val != lfh.ZCRC32 )
return false;
802 std::string url = objcfg.GetDataUrl(
std::stoull( lfh.filename ) );
803 metadata.emplace( url,
buffer_t( buffer, buffer + lfh.uncompressedSize ) );
804 buffer += lfh.uncompressedSize;
805 length -= lfh.uncompressedSize;
814 void Reader::AddMissing(
const buffer_t &cdbuff )
816 const char *buff = cdbuff.data();
817 size_t size = cdbuff.size();
822 auto itr = cdvec.begin();
823 for( ; itr != cdvec.end() ; ++itr )
833 bool Reader::IsMissing(
const std::string &fn )
836 if( missing.count( fn ) )
return true;
839 if( objcfg.nomtfile &&
fntoblk( fn ) <= lstblk )
return true;
845 inline callback_t Reader::ErrorCorrected(Reader *reader, std::shared_ptr<block_t> &
self,
size_t blkid,
size_t strpid){
848 std::unique_lock<std::mutex> readerLock(reader->missingChunksMutex);
849 reader->missingChunksVectorRead.erase(std::remove(reader->missingChunksVectorRead.begin(), reader->missingChunksVectorRead.end(), std::make_tuple(blkid, strpid)));
850 reader->waitMissing.notify_all();
854 void Reader::MissingVectorRead(std::shared_ptr<block_t> ¤tBlock,
size_t blkid,
size_t strpid, uint16_t timeout){
856 std::unique_lock<std::mutex> lk(missingChunksMutex);
857 missingChunksVectorRead.emplace_back(
858 std::make_tuple(blkid,strpid));
860 currentBlock->state[strpid] = block_t::Missing;
861 currentBlock->read(currentBlock, strpid, 0, objcfg.chunksize,
863 ErrorCorrected(
this, currentBlock, blkid, strpid),
869 if(chunks.size() > 1024) {
874 std::vector<XrdCl::ChunkList> hostLists;
875 for(
size_t dataHosts = 0; dataHosts < objcfg.plgr.size(); dataHosts++){
882 char* globalBuffer = (
char*)buffer;
885 std::set<std::tuple<size_t, size_t, size_t>> requestedChunks;
887 std::map<size_t, std::shared_ptr<block_t>> blockMap;
890 for(
size_t index = 0; index < chunks.size(); index++){
891 uint32_t remainLength = chunks[index].length;
892 uint64_t currentOffset = chunks[index].offset;
894 while(remainLength > 0){
895 size_t blkid = currentOffset / objcfg.datasize;
896 size_t strpid = ( currentOffset % objcfg.datasize ) / objcfg.chunksize;
897 uint64_t rdoff = currentOffset - blkid * objcfg.datasize - strpid * objcfg.chunksize ;
899 uint32_t rdsize = objcfg.chunksize - rdoff;
900 if( rdsize > remainLength ) rdsize = remainLength;
901 if(currentOffset + rdsize >= filesize) {
902 rdsize = filesize - currentOffset;
903 remainLength = rdsize;
907 std::string fn = objcfg.GetFileName(blkid, strpid);
909 auto itr = urlmap.find( fn );
910 if( itr == urlmap.end() )
912 log->Dump(
XrdCl::XRootDMsg,
"EC Vector Read: No mapping of file to host found.");
916 const std::string &url = itr->second;
917 auto itr2 = archiveIndices.find(url);
918 if(itr2 == archiveIndices.end())
920 log->Dump(
XrdCl::XRootDMsg,
"EC Vector Read: Couldn't find host for file.");
923 size_t indexOfArchive = archiveIndices[url];
925 if (blockMap.find(blkid) == blockMap.end())
927 blockMap.emplace(blkid,
928 std::make_shared<block_t>(blkid, *
this, objcfg));
931 blockMap[blkid]->state[strpid] = block_t::Loading;
933 if(dataarchs[url]->
Stat(objcfg.GetFileName(blkid, strpid), info).IsOK())
934 blockMap[blkid]->stripes[strpid].resize( info ->GetSize() );
936 auto requestChunk = std::make_tuple(indexOfArchive, blkid, strpid);
937 if(requestedChunks.find(requestChunk) == requestedChunks.end())
940 dataarchs[url]->GetOffset(objcfg.GetFileName(blkid, strpid), off);
944 blockMap[blkid]->stripes[strpid].data()));
947 requestedChunks.emplace(requestChunk);
950 remainLength -= rdsize;
951 currentOffset += rdsize;
956 std::vector<XrdCl::Pipeline> hostPipes;
957 hostPipes.reserve(hostLists.size());
958 for(
size_t i = 0; i < hostLists.size(); i++){
959 while(hostLists[i].size() > 0){
960 uint32_t range = hostLists[i].size() > 1024 ? 1024 : hostLists[i].size();
961 XrdCl::ChunkList partList(hostLists[i].begin(), hostLists[i].begin() + range);
962 hostLists[i].erase(hostLists[i].begin(), hostLists[i].begin() + range);
963 hostPipes.emplace_back(
965 partList,
nullptr, timeout)
968 auto it = requestedChunks.begin();
969 while(it!=requestedChunks.end())
972 size_t host = std::get<0>(args);
973 size_t blkid = std::get<1>(args);
974 size_t strpid = std::get<2>(args);
978 std::shared_ptr<block_t> currentBlock = blockMap[blkid];
983 log->Dump(XrdCl::XRootDMsg,
"EC Vector Read of host %zu failed entirely.", i);
984 MissingVectorRead(currentBlock, blkid, strpid, timeout);
987 uint32_t orgcksum = 0;
988 auto s = dataarchs[objcfg.GetDataUrl(i)]->GetCRC32( objcfg.GetFileName(blkid, strpid), orgcksum );
995 log->Dump(XrdCl::XRootDMsg,
"EC Vector Read: Couldn't read CRC32 from CD.");
996 MissingVectorRead(currentBlock, blkid, strpid, timeout);
1002 uint32_t cksum = objcfg.digest( 0, currentBlock->stripes[strpid].data(), currentBlock->stripes[strpid].size() );
1003 if( orgcksum != cksum )
1005 log->Dump(XrdCl::XRootDMsg,
"EC Vector Read: Wrong checksum for block %zu stripe %zu.", blkid, strpid);
1006 MissingVectorRead(currentBlock, blkid, strpid, timeout);
1010 currentBlock->state[strpid] = block_t::Valid;
1011 bool recoverable = currentBlock->error_correction( currentBlock );
1013 log->Dump(XrdCl::XRootDMsg,
"EC Vector Read: Couldn't recover block %zu.", blkid);
1025 std::unique_lock<std::mutex> lk(missingChunksMutex);
1026 waitMissing.wait(lk, [=] {
return missingChunksVectorRead.size() == 0;});
1028 bool failed =
false;
1029 for(
size_t index = 0; index < chunks.size(); index++){
1030 uint32_t remainLength = chunks[index].length;
1031 uint64_t currentOffset = chunks[index].offset;
1035 localBuffer = globalBuffer;
1037 localBuffer = (
char*)(chunks[index].buffer);
1039 while(remainLength > 0){
1040 size_t blkid = currentOffset / objcfg.datasize;
1041 size_t strpid = ( currentOffset % objcfg.datasize ) / objcfg.chunksize;
1042 uint64_t rdoff = currentOffset - blkid * objcfg.datasize - strpid * objcfg.chunksize ;
1043 uint32_t rdsize = objcfg.chunksize - rdoff;
1044 if( rdsize > remainLength ) rdsize = remainLength;
1047 if(blockMap.find(blkid) == blockMap.end() || blockMap[blkid] ==
nullptr){
1053 log->Dump(
XrdCl::XRootDMsg,
"EC Vector Read: Invalid stripe in block %zu stripe %zu.", blkid, strpid);
1058 memcpy(localBuffer, blockMap[blkid]->stripes[strpid].data() + rdoff, rdsize);
1060 remainLength -= rdsize;
1061 currentOffset += rdsize;
1062 localBuffer += rdsize;
1064 if(globalBuffer) globalBuffer = localBuffer;
1067 if(failed) log->Dump(
XrdCl::XRootDMsg,
"EC Vector Read failed (at least in part).");
static unsigned long long int stoull(const std::string &s)
simple integer parsing, to be replaced by std::stoll when C++11 can be used
ssize_t read(int fildes, void *buf, size_t nbyte)
std::tuple< Args... > args
Operation arguments.
uint16_t timeout
Operation timeout.
Derived< HasHndl > Timeout(uint16_t timeout)
Set operation timeout.
static Log * GetLog()
Get default log.
std::unique_ptr< PipelineHandler > handler
Operation handler.
static void Replace(Operation< false > &&opr)
Replace current operation.
static void Ignore()
Ignore error and proceed with the pipeline.
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
uint64_t GetSize() const
Get size (in bytes)
friend class ZipOperation
Ctx< ZipArchive > zip
The file object itself.
static Config & Instance()
Singleton access.
XrdCl::XRootDStatus RunImpl(XrdCl::PipelineHandler *handler, uint16_t pipelineTimeout)
std::string ToString()
Name of the operation.
CloseArchiveImpl< false > CloseArchive(Ctx< ZipArchive > zip, uint16_t timeout=0)
Factory for creating CloseFileImpl objects.
const uint16_t stError
An error occurred that could potentially be retried.
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
const uint16_t errNotFound
const uint16_t errDataError
data is corrupted
GetXAttrImpl< false > GetXAttr(Ctx< File > file, Arg< std::string > name)
ZipReadFromImpl< false > ReadFrom(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ArchiveReadImpl objects.
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
const uint16_t errInvalidOp
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
const uint16_t errInvalidArgs
VectorReadImpl< false > VectorRead(Ctx< File > file, Arg< ChunkList > chunks, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating VectorReadImpl objects.
std::vector< ChunkInfo > ChunkList
List of chunks.
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
std::vector< stripe_t > stripes_t
All stripes in a block.
static size_t fntoblk(const std::string &fn)
std::function< void(const XrdCl::XRootDStatus &, uint32_t)> callback_t
void ScheduleHandler(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler)
std::vector< char > buffer_t
a buffer type
OpenOnlyImpl< false > OpenOnly(XrdCl::Ctx< XrdCl::ZipArchive > zip, XrdCl::Arg< std::string > fn, XrdCl::Arg< bool > updt, uint16_t timeout=0)
std::vector< std::unique_ptr< CDFH > > cdvec_t
std::unordered_map< std::string, size_t > cdmap_t
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
bool Valid() const
Check if it contains a valid value.
@ Read
Open only for reading.
bool IsOK() const
We're fine.
static bool error_correction(std::shared_ptr< block_t > &self)
std::vector< state_t > state
block_t(size_t blkid, Reader &reader, ObjCfg &objcfg)
static void read(std::shared_ptr< block_t > &self, size_t strpid, uint64_t offset, uint32_t size, char *usrbuff, callback_t usrcb, uint16_t timeout)
void carryout(pending_t &pending, const buffer_t &stripe, const XrdCl::XRootDStatus &st=XrdCl::XRootDStatus())
std::vector< buffer_t > stripes
std::tuple< uint64_t, uint32_t, char *, callback_t > args_t
std::vector< args_t > pending_t
std::vector< pending_t > pending
static callback_t read_callback(std::shared_ptr< block_t > &self, size_t strpid)
static std::tuple< cdvec_t, cdmap_t > Parse(const char *buffer, uint32_t bufferSize, uint16_t nbCdRecords)
A data structure representing ZIP Local File Header.
static const uint32_t lfhSign
Local File Header signature.