46 const size_t size = objcfg.
plgr.size();
48 std::vector<XrdCl::Pipeline> opens;
49 opens.reserve( size );
51 for(
size_t i = 0; i < size; ++i )
52 dataarchs.emplace_back( std::make_shared<XrdCl::ZipArchive>(
55 for(
size_t i = 0; i < size; ++i )
65 if( !st.IsOK() ) global_status.report_open( st );
66 handler->HandleResponse( new XrdCl::XRootDStatus( st ), nullptr );
85 global_status.issue_write( size );
87 const char* buffer =
reinterpret_cast<const char*
>( buff );
88 uint32_t wrtsize = size;
91 if( !wrtbuff ) wrtbuff.reset(
new WrtBuff( objcfg ) );
92 uint64_t written = wrtbuff->Write( wrtsize, buffer );
95 if( wrtbuff->Complete() ) EnqueueBuff( std::move( wrtbuff ) );
119 if( wrtbuff && !wrtbuff->Empty() ) EnqueueBuff( std::move( wrtbuff ) );
123 global_status.issue_close( handler, timeout );
129 void StrmWriter::WriteBuff( std::unique_ptr<WrtBuff> buff )
135 std::shared_ptr<WrtBuff> wrtbuff( std::move( buff ) );
140 static std::default_random_engine random_engine( std::chrono::system_clock::now().time_since_epoch().count() );
141 std::shared_ptr<sync_queue<size_t>> servers = std::make_shared<sync_queue<size_t>>();
142 std::vector<size_t> zipid( dataarchs.size() );
143 std::iota( zipid.begin(), zipid.end(), 0 );
144 std::shuffle( zipid.begin(), zipid.end(), random_engine );
145 auto itr = zipid.begin();
146 for( ; itr != zipid.end() ; ++itr ) servers->enqueue( std::move( *itr ) );
151 const size_t nbchunks = objcfg.
nbchunks;
152 std::vector<XrdCl::Pipeline> writes;
153 writes.reserve( nbchunks );
154 size_t blknb = next_blknb++;
155 uint64_t blksize = 0;
156 for(
size_t strpnb = 0; strpnb < nbchunks; ++strpnb )
158 std::string fn = objcfg.
GetFileName( blknb, strpnb );
159 uint32_t
crc32c = wrtbuff->GetCrc32c( strpnb );
160 uint64_t strpsize = wrtbuff->GetStrpSize( strpnb );
161 char* strpbuff = wrtbuff->GetStrpBuff( strpnb );
162 if( strpnb < objcfg.
nbdata ) blksize += strpsize;
169 if( !servers->dequeue( srvid ) )
172 0,
"No more data servers to try." );
177 for(
size_t i = strpnb + 1; i < objcfg.
nbdata; ++i )
178 blksize += wrtbuff->GetStrpSize( i );
179 global_status.report_wrt( err, blksize );
182 zip = *dataarchs[srvid];
198 if( !servers->dequeue( srvid ) )
return;
199 zip = *dataarchs[srvid];
211 writes.emplace_back( std::move( p ) );
224 const size_t cdcnt = objcfg.plgr.size();
225 std::vector<buffer_t> buffs; buffs.reserve( cdcnt );
226 std::vector<LFH> lfhs; lfhs.reserve( cdcnt );
227 std::vector<CDFH> cdfhs; cdfhs.reserve( cdcnt );
234 mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
235 for(
size_t i = 0; i < cdcnt; ++i )
237 std::string fn = std::to_string( i );
238 buffer_t buff( dataarchs[i]->GetCD() );
239 uint32_t cksum = objcfg.digest( 0, buff.data(), buff.size() );
240 lfhs.emplace_back( fn, cksum, buff.size(), time( 0 ) );
241 LFH &lfh = lfhs.back();
242 cdfhs.emplace_back( &lfh, mode, offset );
243 offset += LFH::lfhBaseSize + fn.size() + buff.size();
244 cdsize += cdfhs.back().cdfhSize;
245 buffs.emplace_back( std::move( buff ) );
248 uint64_t zipsize = offset + cdsize + EOCD::eocdBaseSize;
249 buffer_t zipbuff; zipbuff.reserve( zipsize );
254 for(
size_t i = 0; i < cdcnt; ++i )
257 std::copy( buffs[i].begin(), buffs[i].end(), std::back_inserter( zipbuff ) );
262 for(
size_t i = 0; i < cdcnt; ++i )
263 cdfhs[i].Serialize( zipbuff );
267 EOCD eocd( offset, cdcnt, cdsize );
268 eocd.Serialize( zipbuff );
285 const size_t size = objcfg.
plgr.size();
289 auto zipbuff = objcfg.
nomtfile ? nullptr :
290 std::make_shared<XrdZip::buffer_t>( GetMetadataBuffer() );
294 std::vector<XrdCl::Pipeline> closes;
295 std::vector<XrdCl::Pipeline> save_metadata;
296 closes.reserve( size );
297 std::string closeTime = std::to_string( time(NULL) );
299 std::vector<XrdCl::xattr_t> xav{ {
"xrdec.filesize", std::to_string(
GetSize())},
300 {
"xrdec.strpver", closeTime.c_str()} };
302 for(
size_t i = 0; i < size; ++i )
307 if( dataarchs[i]->IsOpen() )
311 closes.emplace_back( std::move( p ) );
319 metadataarchs.emplace_back( std::make_shared<XrdCl::File>(
322 |
XrdCl::Write( *metadataarchs[i], 0, zipbuff->size(), zipbuff->data() )
326 save_metadata.emplace_back( std::move( p ) );
334 if( save_metadata.empty() )
uint32_t crc32c(uint32_t crc, void const *buf, size_t len)
static void Repeat()
Repeat current operation.
Handle an async response.
static Config & Instance()
Singleton access.
void Open(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
void Write(uint32_t size, const void *buff, XrdCl::ResponseHandler *handler)
void Close(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
CloseArchiveImpl< false > CloseArchive(Ctx< ZipArchive > zip, uint16_t timeout=0)
Factory for creating CloseArchiveImpl objects.
SetXAttrImpl< false > SetXAttr(Ctx< File > file, Arg< std::string > name, Arg< std::string > value)
const uint16_t stError
An error occurred that could potentially be retried.
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating OpenImpl objects.
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
AppendFileImpl< false > AppendFile(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< uint32_t > crc32, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating AppendFileImpl objects.
const uint16_t errNoMoreReplicas
No more replicas to try.
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
void ScheduleHandler(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler)
std::vector< char > buffer_t
@ Write
Open only for writing.
bool IsOK() const
We're fine.
std::string GetDataUrl(size_t i) const
std::vector< std::string > plgr
std::string GetMetadataUrl(size_t i) const
std::string GetFileName(size_t blknb, size_t strpnb) const
A data structure representing ZIP Local File Header.
void Serialize(buffer_t &buffer)
Serialize the object into a buffer.