25 #ifndef SRC_XRDEC_XRDECWRTBUFF_HH_
26 #define SRC_XRDEC_XRDECWRTBUFF_HH_
39 #include <condition_variable>
66 std::unique_lock<std::mutex> lck( mtx );
80 if( currentsize < totalsize )
89 while( pool.empty() ) cv.wait( lck );
100 if( !buffer.GetBuffer() )
return;
101 std::unique_lock<std::mutex> lck( mtx );
102 buffer.SetCursor( 0 );
103 pool.emplace( std::move( buffer ) );
112 BufferPool() : totalsize( 1024 ), currentsize( 0 )
116 BufferPool(
const BufferPool& ) =
delete;
117 BufferPool( BufferPool&& ) =
delete;
118 BufferPool& operator=(
const BufferPool& ) =
delete;
119 BufferPool& operator=( BufferPool&& ) =
delete;
121 const size_t totalsize;
123 std::condition_variable cv;
125 std::queue<XrdCl::Buffer> pool;
150 wrtbuff( std::move( wrtbuff.wrtbuff ) ),
151 stripes( std::move( wrtbuff.stripes ) ),
152 cksums( std::move( wrtbuff.cksums ) )
169 uint32_t
Write( uint32_t size,
const char *buffer )
171 uint64_t bytesAccepted = size;
176 return bytesAccepted;
204 return stripes[strpnb].buffer;
214 if( strp < objcfg.
nbdata )
218 uint64_t expsize = ( strp + 1) * objcfg.
chunksize;
223 uint64_t delta = expsize - wrtbuff.
GetCursor();
261 for( i = 0; i < objcfg.
nbchunks; ++i )
267 for( uint8_t strpnb = 0; strpnb < objcfg.
nbchunks; ++strpnb )
271 cksums.emplace_back( std::move( ftr ) );
282 return cksums[strpnb].get();
290 std::vector<std::future<uint32_t>> cksums;
Binary blob representation.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
void Allocate(uint32_t size)
Allocate the buffer.
void SetCursor(uint32_t cursor)
Set the cursor.
uint32_t GetCursor() const
Get append cursor.
uint32_t GetSize() const
Get the size of the message.
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
Pool of buffers for caching writes.
void Recycle(XrdCl::Buffer &&buffer)
Give back a buffer to the pool.
static BufferPool & Instance()
Singleton access to the object.
XrdCl::Buffer Create(const ObjCfg &objcfg)
Create a new buffer (or recycle an existing one).
Global configuration for the EC module.
RedundancyProvider & GetRedundancy(const ObjCfg &objcfg)
Get redundancy provider for given data object configuration.
static Config & Instance()
Singleton access.
void compute(stripes_t &stripes)
static ThreadPool & Instance()
Singleton access.
std::future< std::invoke_result_t< FUNC, ARGs... > > Execute(FUNC func, ARGs... args)
Schedule a functional (together with its arguments) for execution.
uint32_t Write(uint32_t size, const char *buffer)
WrtBuff(const ObjCfg &objcfg)
uint32_t GetBlkSize()
Get size of the data in the buffer.
uint32_t GetStrpSize(uint8_t strp)
uint32_t GetCrc32c(size_t strpnb)
char * GetStrpBuff(uint8_t strpnb)
void Encode()
Calculate the parity for the data stripes and the crc32cs.
bool Empty()
True if there are no data in the buffer, false otherwise.
bool Complete()
True if the buffer is full, false otherwise.
WrtBuff(WrtBuff &&wrtbuff)
Move constructor.
std::vector< stripe_t > stripes_t
All stripes in a block.
uint32_t(* digest)(uint32_t, void const *, size_t)