XRootD
XrdEcWrtBuff.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDEC_XRDECWRTBUFF_HH_
26 #define SRC_XRDEC_XRDECWRTBUFF_HH_
27 
28 #include "XrdEc/XrdEcUtilities.hh"
29 #include "XrdEc/XrdEcObjCfg.hh"
30 #include "XrdEc/XrdEcConfig.hh"
31 #include "XrdEc/XrdEcThreadPool.hh"
32 
33 #include "XrdCl/XrdClBuffer.hh"
35 
36 #include "XrdOuc/XrdOucCRC32C.hh"
37 
38 #include <vector>
39 #include <condition_variable>
40 #include <mutex>
41 #include <future>
42 
43 namespace XrdEc
44 {
45  //---------------------------------------------------------------------------
47  //---------------------------------------------------------------------------
48  class BufferPool
49  {
50  public:
51 
52  //-----------------------------------------------------------------------
54  //-----------------------------------------------------------------------
55  static BufferPool& Instance()
56  {
57  static BufferPool instance;
58  return instance;
59  }
60 
61  //-----------------------------------------------------------------------
63  //-----------------------------------------------------------------------
64  XrdCl::Buffer Create( const ObjCfg &objcfg )
65  {
66  std::unique_lock<std::mutex> lck( mtx );
67  //---------------------------------------------------------------------
68  // If pool is not empty, recycle existing buffer
69  //---------------------------------------------------------------------
70  if( !pool.empty() )
71  {
72  XrdCl::Buffer buffer( std::move( pool.front() ) );
73  pool.pop();
74  return buffer;
75  }
76  //---------------------------------------------------------------------
77  // Check if we can create a new buffer object without exceeding the
78  // the maximum size of the pool
79  //---------------------------------------------------------------------
80  if( currentsize < totalsize )
81  {
82  XrdCl::Buffer buffer( objcfg.blksize );
83  ++currentsize;
84  return buffer;
85  }
86  //---------------------------------------------------------------------
87  // If not, we have to wait until there is a buffer we can recycle
88  //---------------------------------------------------------------------
89  while( pool.empty() ) cv.wait( lck );
90  XrdCl::Buffer buffer( std::move( pool.front() ) );
91  pool.pop();
92  return buffer;
93  }
94 
95  //-----------------------------------------------------------------------
97  //-----------------------------------------------------------------------
98  void Recycle( XrdCl::Buffer && buffer )
99  {
100  if( !buffer.GetBuffer() ) return;
101  std::unique_lock<std::mutex> lck( mtx );
102  buffer.SetCursor( 0 );
103  pool.emplace( std::move( buffer ) );
104  cv.notify_all();
105  }
106 
107  private:
108 
109  //-----------------------------------------------------------------------
110  // Default constructor
111  //-----------------------------------------------------------------------
112  BufferPool() : totalsize( 1024 ), currentsize( 0 )
113  {
114  }
115 
116  BufferPool( const BufferPool& ) = delete; //< Copy constructor
117  BufferPool( BufferPool&& ) = delete; //< Move constructor
118  BufferPool& operator=( const BufferPool& ) = delete; //< Copy assigment operator
119  BufferPool& operator=( BufferPool&& ) = delete; //< Move assigment operator
120 
121  const size_t totalsize; //< maximum size of the pool
122  size_t currentsize; //< current size of the pool
123  std::condition_variable cv;
124  std::mutex mtx;
125  std::queue<XrdCl::Buffer> pool; //< the pool itself
126  };
127 
128  //---------------------------------------------------------------------------
131  //---------------------------------------------------------------------------
132  class WrtBuff
133  {
134  public:
135  //-----------------------------------------------------------------------
139  //-----------------------------------------------------------------------
140  WrtBuff( const ObjCfg &objcfg ) : objcfg( objcfg ),
141  wrtbuff( BufferPool::Instance().Create( objcfg ) )
142  {
143  stripes.reserve( objcfg.nbchunks );
144  memset( wrtbuff.GetBuffer(), 0, wrtbuff.GetSize() );
145  }
146  //-----------------------------------------------------------------------
148  //-----------------------------------------------------------------------
149  WrtBuff( WrtBuff && wrtbuff ) : objcfg( wrtbuff.objcfg ),
150  wrtbuff( std::move( wrtbuff.wrtbuff ) ),
151  stripes( std::move( wrtbuff.stripes ) ),
152  cksums( std::move( wrtbuff.cksums ) )
153  {
154  }
155  //-----------------------------------------------------------------------
156  // Destructor
157  //-----------------------------------------------------------------------
159  {
160  BufferPool::Instance().Recycle( std::move( wrtbuff ) );
161  }
162  //-----------------------------------------------------------------------
168  //-----------------------------------------------------------------------
169  uint32_t Write( uint32_t size, const char *buffer )
170  {
171  uint64_t bytesAccepted = size; // bytes accepted by the buffer
172  if( wrtbuff.GetCursor() + bytesAccepted > objcfg.datasize )
173  bytesAccepted = objcfg.datasize - wrtbuff.GetCursor();
174  memcpy( wrtbuff.GetBufferAtCursor(), buffer, bytesAccepted );
175  wrtbuff.AdvanceCursor( bytesAccepted );
176  return bytesAccepted;
177  }
178  //-----------------------------------------------------------------------
182  //-----------------------------------------------------------------------
183  void Pad( uint32_t size )
184  {
185  // if the buffer exist we only need to move the cursor
186  if( wrtbuff.GetSize() != 0 )
187  {
188  wrtbuff.AdvanceCursor( size );
189  return;
190  }
191  // otherwise we allocate the buffer and set the cursor
192  wrtbuff.Allocate( objcfg.datasize );
193  memset( wrtbuff.GetBuffer(), 0, wrtbuff.GetSize() );
194  wrtbuff.SetCursor( size );
195  return;
196  }
197  //-----------------------------------------------------------------------
201  //-----------------------------------------------------------------------
202  inline char* GetStrpBuff( uint8_t strpnb )
203  {
204  return stripes[strpnb].buffer;
205  }
206  //-----------------------------------------------------------------------
210  //-----------------------------------------------------------------------
211  uint32_t GetStrpSize( uint8_t strp )
212  {
213  // Check if it is a data chunk?
214  if( strp < objcfg.nbdata )
215  {
216  // If the cursor is at least at the expected size
217  // it means we have the full chunk.
218  uint64_t expsize = ( strp + 1) * objcfg.chunksize;
219  if( expsize <= wrtbuff.GetCursor() )
220  return objcfg.chunksize;
221  // If the cursor is of by less than the chunk size
222  // it means we have a partial chunk
223  uint64_t delta = expsize - wrtbuff.GetCursor();
224  if( delta < objcfg.chunksize )
225  return objcfg.chunksize - delta;
226  // otherwise we are handling an empty chunk
227  return 0;
228  }
229  // It is a parity chunk so its size has to be equal
230  // to the size of the first chunk
231  return GetStrpSize( 0 );
232  }
233  //-----------------------------------------------------------------------
235  //-----------------------------------------------------------------------
236  inline uint32_t GetBlkSize()
237  {
238  return wrtbuff.GetCursor();
239  }
240  //-----------------------------------------------------------------------
242  //-----------------------------------------------------------------------
243  inline bool Complete()
244  {
245  return wrtbuff.GetCursor() == objcfg.datasize;
246  }
247  //-----------------------------------------------------------------------
249  //-----------------------------------------------------------------------
250  inline bool Empty()
251  {
252  return ( wrtbuff.GetSize() == 0 || wrtbuff.GetCursor() == 0 );
253  }
254  //-----------------------------------------------------------------------
256  //-----------------------------------------------------------------------
257  inline void Encode()
258  {
259  // first calculate the parity
260  uint8_t i ;
261  for( i = 0; i < objcfg.nbchunks; ++i )
262  stripes.emplace_back( wrtbuff.GetBuffer( i * objcfg.chunksize ), i < objcfg.nbdata );
263  Config &cfg = Config::Instance();
264  cfg.GetRedundancy( objcfg ).compute( stripes );
265  // then calculate the checksums
266  cksums.reserve( objcfg.nbchunks );
267  for( uint8_t strpnb = 0; strpnb < objcfg.nbchunks; ++strpnb )
268  {
269  size_t chunksize = GetStrpSize( strpnb );
270  std::future<uint32_t> ftr = ThreadPool::Instance().Execute( objcfg.digest, 0, stripes[strpnb].buffer, chunksize );
271  cksums.emplace_back( std::move( ftr ) );
272  }
273  }
274  //-----------------------------------------------------------------------
279  //-----------------------------------------------------------------------
280  inline uint32_t GetCrc32c( size_t strpnb )
281  {
282  return cksums[strpnb].get();
283  }
284 
285  private:
286 
287  ObjCfg objcfg; //< configuration for the data object
288  XrdCl::Buffer wrtbuff; //< the buffer for the data
289  stripes_t stripes; //< data stripes
290  std::vector<std::future<uint32_t>> cksums; //< crc32cs for the data stripes
291  };
292 
293 
294 } /* namespace XrdEc */
295 
296 #endif /* SRC_XRDEC_XRDECWRTBUFF_HH_ */
bool Create
Binary blob representation.
Definition: XrdClBuffer.hh:34
void AdvanceCursor(uint32_t delta)
Advance the cursor.
Definition: XrdClBuffer.hh:156
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
void Allocate(uint32_t size)
Allocate the buffer.
Definition: XrdClBuffer.hh:110
void SetCursor(uint32_t cursor)
Set the cursor.
Definition: XrdClBuffer.hh:148
uint32_t GetCursor() const
Get append cursor.
Definition: XrdClBuffer.hh:140
uint32_t GetSize() const
Get the size of the message.
Definition: XrdClBuffer.hh:132
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
Definition: XrdClBuffer.hh:189
Pool of buffer for caching writes.
Definition: XrdEcWrtBuff.hh:49
void Recycle(XrdCl::Buffer &&buffer)
Give back a buffer to the pool.
Definition: XrdEcWrtBuff.hh:98
static BufferPool & Instance()
Singleton access to the object.
Definition: XrdEcWrtBuff.hh:55
XrdCl::Buffer Create(const ObjCfg &objcfg)
Create new buffer (or recycle existing one)
Definition: XrdEcWrtBuff.hh:64
Global configuration for the EC module.
Definition: XrdEcConfig.hh:40
RedundancyProvider & GetRedundancy(const ObjCfg &objcfg)
Get redundancy provider for given data object configuration.
Definition: XrdEcConfig.hh:55
static Config & Instance()
Singleton access.
Definition: XrdEcConfig.hh:46
void compute(stripes_t &stripes)
static ThreadPool & Instance()
Singleton access.
std::future< std::invoke_result_t< FUNC, ARGs... > > Execute(FUNC func, ARGs... args)
Schedule a functional (together with its arguments) for execution.
uint32_t Write(uint32_t size, const char *buffer)
WrtBuff(const ObjCfg &objcfg)
uint32_t GetBlkSize()
Get size of the data in the buffer.
uint32_t GetStrpSize(uint8_t strp)
void Pad(uint32_t size)
uint32_t GetCrc32c(size_t strpnb)
char * GetStrpBuff(uint8_t strpnb)
void Encode()
Calculate the parity for the data stripes and the crc32cs.
bool Empty()
True if there are no data in the buffer, false otherwise.
bool Complete()
True if the buffer is full, false otherwise.
WrtBuff(WrtBuff &&wrtbuff)
Move constructor.
std::vector< stripe_t > stripes_t
All stripes in a block.
const uint64_t blksize
Definition: XrdEcObjCfg.hh:91
const uint8_t nbdata
Definition: XrdEcObjCfg.hh:87
const uint8_t nbchunks
Definition: XrdEcObjCfg.hh:85
uint32_t(* digest)(uint32_t, void const *, size_t)
Definition: XrdEcObjCfg.hh:96
const uint64_t chunksize
Definition: XrdEcObjCfg.hh:89
const uint64_t datasize
Definition: XrdEcObjCfg.hh:88