25 #ifndef SRC_XRDEC_XRDECSTRMWRITER_HH_
26 #define SRC_XRDEC_XRDECSTRMWRITER_HH_
65 writer_thread_stop( false ),
66 writer_thread( writer_routine, this ),
77 writer_thread_stop =
true;
110 return global_status.get_btswritten();
118 struct global_status_t
123 global_status_t(
StrmWriter *writer ) : writer( writer ),
126 stopped_writing( false ),
136 std::unique_lock<std::recursive_mutex> lck( mtx );
141 if( !st.
IsOK() ) status = st;
142 else btswritten += wrtsize;
147 if( btsleft == 0 && stopped_writing )
150 writer->CloseImpl( closeHandler );
167 std::unique_lock<std::recursive_mutex> lck( mtx );
171 stopped_writing =
true;
176 if( btsleft == 0 )
return writer->CloseImpl( handler, timeout );
180 closeHandler = handler;
188 std::unique_lock<std::recursive_mutex> lck( mtx );
192 inline void issue_write( uint64_t wrtsize )
194 std::unique_lock<std::recursive_mutex> lck( mtx );
198 inline uint64_t get_btswritten()
204 mutable std::recursive_mutex mtx;
208 bool stopped_writing;
218 inline void EnqueueBuff( std::unique_ptr<WrtBuff> wrtbuff )
223 static auto prepare_buff = []( WrtBuff *wrtbuff )
225 std::unique_ptr<WrtBuff> ptr( wrtbuff );
227 return ptr.release();
237 inline std::unique_ptr<WrtBuff> DequeueBuff()
239 std::future<WrtBuff*> ftr = buffers.
dequeue();
240 std::unique_ptr<WrtBuff> result( ftr.get() );
253 while( !me->writer_thread_stop )
255 std::unique_ptr<WrtBuff> wrtbuff( me->DequeueBuff() );
256 if( !wrtbuff )
continue;
257 me->WriteBuff( std::move( wrtbuff ) );
260 catch(
const buff_queue::wait_interrupted& ){ }
268 void WriteBuff( std::unique_ptr<WrtBuff> buff );
275 std::vector<char> GetMetadataBuffer();
284 const ObjCfg &objcfg;
285 std::unique_ptr<WrtBuff> wrtbuff;
286 std::vector<std::shared_ptr<XrdCl::ZipArchive>> dataarchs;
287 std::vector<std::shared_ptr<XrdCl::File>> metadataarchs;
288 std::vector<std::vector<char>> cdbuffs;
291 std::atomic<bool> writer_thread_stop;
293 std::thread writer_thread;
295 global_status_t global_status;
Handle an async response.
StrmWriter(const ObjCfg &objcfg)
Constructor.
void Open(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
virtual ~StrmWriter()
Destructor.
void Write(uint32_t size, const void *buff, XrdCl::ResponseHandler *handler)
void Close(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
static ThreadPool & Instance()
Singleton access.
bool IsOK() const
We're fine.
void enqueue(Element &&element)