XRootD
XrdEcRedundancyProvider.cc
Go to the documentation of this file.
1 /************************************************************************
2  * KineticIo - a file io interface library to kinetic devices. *
3  * *
4  * This Source Code Form is subject to the terms of the Mozilla *
5  * Public License, v. 2.0. If a copy of the MPL was not *
6  * distributed with this file, You can obtain one at *
7  * https://mozilla.org/MPL/2.0/. *
8  * *
9  * This program is distributed in the hope that it will be useful, *
10  * but is provided AS-IS, WITHOUT ANY WARRANTY; including without *
11  * the implied warranty of MERCHANTABILITY, NON-INFRINGEMENT or *
12  * FITNESS FOR A PARTICULAR PURPOSE. See the Mozilla Public *
13  * License for more details. *
14  ************************************************************************/
15 
17 
18 #include <isa-l.h>
19 #include <cstring>
20 #include <sstream>
21 #include <algorithm>
22 
23 namespace XrdEc
24 {
25 
26 //--------------------------------------------------------------------------
30 //--------------------------------------------------------------------------
//--------------------------------------------------------------------------
//! Small helper that renders an arbitrary list of streamable values as text.
//--------------------------------------------------------------------------
class Convert{
  public:
    //--------------------------------------------------------------------------
    //! Concatenate the textual representation of every argument.
    //!
    //! Each argument is written to a std::stringstream with operator<<,
    //! in the order given.
    //!
    //! @param args : values to be streamed
    //! @return     : the accumulated stream contents
    //--------------------------------------------------------------------------
    template<typename...Args>
    static std::string toString(Args&&...args){
      std::stringstream text;
      // A braced initializer list evaluates its elements left to right, so the
      // arguments reach the stream in call order. The leading 0 keeps the array
      // non-empty.
      const int streamed[] = { 0, ( static_cast<void>( text << args ), 0 )... };
      static_cast<void>( streamed );
      return text.str();
    }
};
68 
69 
70 
71 /* This function is (almost) completely ripped from the erasure_code_test.cc file
72  distributed with the isa-l library. */
74  unsigned char* encode_matrix, // in: encode matrix
75  unsigned char* decode_matrix, // in: buffer, out: generated decode matrix
76  unsigned int* decode_index, // out: order of healthy blocks used for decoding [data#1, data#3, ..., parity#1... ]
77  unsigned char* src_err_list, // in: array of #nerrs size [index error #1, index error #2, ... ]
78  unsigned char* src_in_err, // in: array of #data size > [1,0,0,0,1,0...] -> 0 == no error, 1 == error
79  unsigned int nerrs, // #total errors
80  unsigned int nsrcerrs, // #data errors
81  unsigned int k, // #data
82  unsigned int m // #data+parity
83 )
84 {
85  unsigned i, j, p;
86  unsigned int r;
87  unsigned char* invert_matrix, * backup, * b, s;
88  int incr = 0;
89 
90  size_t mk = (size_t)m * (size_t)k;
91  std::vector<unsigned char> memory(3 * mk);
92  b = &memory[0];
93  backup = &memory[mk];
94  invert_matrix = &memory[2 * mk];
95 
96  // Construct matrix b by removing error rows
97  for (i = 0, r = 0; i < k; i++, r++) {
98  while (src_in_err[r]) {
99  r++;
100  }
101  for (j = 0; j < k; j++) {
102  b[k * i + j] = encode_matrix[k * r + j];
103  backup[k * i + j] = encode_matrix[k * r + j];
104  }
105  decode_index[i] = r;
106  }
107  incr = 0;
108  while (gf_invert_matrix(b, invert_matrix, k) < 0) {
109  if (nerrs == (m - k)) {
110  return -1;
111  }
112  incr++;
113  memcpy(b, backup, mk);
114  for (i = nsrcerrs; i < nerrs - nsrcerrs; i++) {
115  if (src_err_list[i] == (decode_index[k - 1] + incr)) {
116  // skip the erased parity line
117  incr++;
118  continue;
119  }
120  }
121  if (decode_index[k - 1] + incr >= m) {
122  return -1;
123  }
124  decode_index[k - 1] += incr;
125  for (j = 0; j < k; j++) {
126  b[k * (k - 1) + j] = encode_matrix[k * decode_index[k - 1] + j];
127  }
128 
129  };
130 
131  for (i = 0; i < nsrcerrs; i++) {
132  for (j = 0; j < k; j++) {
133  decode_matrix[k * i + j] = invert_matrix[k * src_err_list[i] + j];
134  }
135  }
136  /* src_err_list from encode_matrix * invert of b for parity decoding */
137  for (p = nsrcerrs; p < nerrs; p++) {
138  for (i = 0; i < k; i++) {
139  s = 0;
140  for (j = 0; j < k; j++) {
141  s ^= gf_mul(invert_matrix[j * k + i],
142  encode_matrix[k * src_err_list[p] + j]);
143  }
144 
145  decode_matrix[k * p + i] = s;
146  }
147  }
148  return 0;
149 }
150 
152  objcfg( objcfg ),
153  encode_matrix( objcfg.nbchunks * objcfg.nbdata )
154 {
155  // k = data
156  // m = data + parity
157  gf_gen_cauchy1_matrix( encode_matrix.data(), static_cast<int>( objcfg.nbchunks ), static_cast<int>( objcfg.nbdata ) );
158 }
159 
160 
161 std::string RedundancyProvider::getErrorPattern( stripes_t &stripes ) const
162 {
163  std::string pattern( objcfg.nbchunks, 0 );
164  for( uint8_t i = 0; i < objcfg.nbchunks; ++i )
165  if( !stripes[i].valid ) pattern[i] = '\1';
166 
167  return pattern;
168 }
169 
170 
171 RedundancyProvider::CodingTable& RedundancyProvider::getCodingTable( const std::string& pattern )
172 {
173  std::lock_guard<std::mutex> lock(mutex);
174 
175  /* If decode matrix is not already cached we have to construct it. */
176  if( !cache.count(pattern) )
177  {
178  /* Expand pattern */
179  int nerrs = 0, nsrcerrs = 0;
180  unsigned char err_indx_list[objcfg.nbparity];
181  /* Avoid narrowing cast warning, size is always < 256 */
182  uint8_t n = static_cast<uint8_t>(pattern.size() & 0xff);
183  for (uint8_t i = 0; i < n; i++) {
184  if (pattern[i]) {
185  err_indx_list[nerrs++] = i;
186  if (i < objcfg.nbdata) { nsrcerrs++; }
187  }
188  }
189 
190  /* Allocate Decode Object. */
191  CodingTable dd;
192  dd.nErrors = nerrs;
193  dd.blockIndices.resize( objcfg.nbdata );
194  dd.table.resize( objcfg.nbdata * objcfg.nbparity * 32);
195 
196  /* Compute decode matrix. */
197  std::vector<unsigned char> decode_matrix(objcfg.nbchunks * objcfg.nbdata);
198 
199  if (gf_gen_decode_matrix( encode_matrix.data(), decode_matrix.data(), dd.blockIndices.data(),
200  err_indx_list, (unsigned char*) pattern.c_str(), nerrs, nsrcerrs,
201  static_cast<int>( objcfg.nbdata ), static_cast<int>( objcfg.nbchunks ) ) )
202  throw IOError( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, errno, "Failed computing decode matrix" ) );
203 
204  /* Compute Tables. */
205  ec_init_tables( static_cast<int>( objcfg.nbdata ), nerrs, decode_matrix.data(), dd.table.data() );
206  cache.insert( std::make_pair(pattern, dd) );
207  }
208  return cache.at(pattern);
209 }
210 
211 void RedundancyProvider::replication( stripes_t &stripes )
212 {
213  // get index of a valid block
214  void *healthy = nullptr;
215  for( auto itr = stripes.begin(); itr != stripes.end(); ++itr )
216  {
217  if( itr->valid )
218  healthy = itr->buffer;
219  }
220 
221  if( !healthy ) throw IOError( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError ) );
222 
223  // now replicate, by now 'buffers' should contain all chunks
224  for( uint8_t i = 0; i < objcfg.nbchunks; ++i )
225  {
226  if( !stripes[i].valid )
227  memcpy( stripes[i].buffer, healthy, objcfg.chunksize );
228  }
229 }
230 
232 {
233  /* throws if stripe is not recoverable */
234  std::string pattern = getErrorPattern( stripes );
235 
236  /* nothing to do if there are no parity blocks. */
237  if ( !objcfg.nbparity ) return;
238 
239  /* in case of a single data block use replication */
240  if ( objcfg.nbdata == 1 )
241  return replication( stripes );
242 
243  /* normal operation: erasure coding */
244  CodingTable& dd = getCodingTable(pattern);
245 
246  unsigned char* inbuf[objcfg.nbdata];
247  for( uint8_t i = 0; i < objcfg.nbdata; i++ )
248  inbuf[i] = reinterpret_cast<unsigned char*>( stripes[dd.blockIndices[i]].buffer );
249 
250  std::vector<unsigned char> memory( dd.nErrors * objcfg.chunksize );
251 
252  unsigned char* outbuf[dd.nErrors];
253  for (int i = 0; i < dd.nErrors; i++)
254  {
255  outbuf[i] = &memory[i * objcfg.chunksize];
256  }
257 
258  ec_encode_data(
259  static_cast<int>( objcfg.chunksize ), // Length of each block of data (vector) of source or destination data.
260  static_cast<int>( objcfg.nbdata ), // The number of vector sources in the generator matrix for coding.
261  dd.nErrors, // The number of output vectors to concurrently encode/decode.
262  dd.table.data(), // Pointer to array of input tables
263  inbuf, // Array of pointers to source input buffers
264  outbuf // Array of pointers to coded output buffers
265  );
266 
267  int e = 0;
268  for (size_t i = 0; i < objcfg.nbchunks; i++)
269  {
270  if( pattern[i] )
271  {
272  memcpy( stripes[i].buffer, outbuf[e], objcfg.chunksize );
273  e++;
274  }
275  }
276 }
277 
278 
279 };
Class for computing parities and recovering data.
static std::string toString(Args &&...args)
RedundancyProvider(const ObjCfg &objcfg)
void compute(stripes_t &stripes)
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
std::vector< stripe_t > stripes_t
All stripes in a block.
static int gf_gen_decode_matrix(unsigned char *encode_matrix, unsigned char *decode_matrix, unsigned int *decode_index, unsigned char *src_err_list, unsigned char *src_in_err, unsigned int nerrs, unsigned int nsrcerrs, unsigned int k, unsigned int m)
const uint8_t nbdata
Definition: XrdEcObjCfg.hh:87
const uint8_t nbchunks
Definition: XrdEcObjCfg.hh:85
const uint8_t nbparity
Definition: XrdEcObjCfg.hh:86
const uint64_t chunksize
Definition: XrdEcObjCfg.hh:89