XRootD
XrdEcUtilities.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDEC_XRDECUTILITIES_HH_
26 #define SRC_XRDEC_XRDECUTILITIES_HH_
27 
28 #include "XrdEc/XrdEcObjCfg.hh"
30 #include "XrdCl/XrdClFileSystem.hh"
31 #include "XrdCl/XrdClUtils.hh"
32 
33 #include <exception>
34 #include <memory>
35 #include <random>
36 #include <queue>
37 #include <mutex>
38 #include <condition_variable>
39 
40 namespace XrdEc
41 {
//---------------------------------------------------------------------------
//! A buffer with stripe data and info on validity.
//!
//! Holds a raw pointer only; nothing in this struct allocates or releases
//! the memory the pointer refers to.
//---------------------------------------------------------------------------
struct stripe_t
{
  //-------------------------------------------------------------------------
  //! Constructor
  //!
  //! @param buffer : pointer to the stripe data
  //! @param valid  : true if the data are valid, false otherwise
  //-------------------------------------------------------------------------
  stripe_t( char *buffer, bool valid ) : buffer{ buffer }, valid{ valid } { }

  char *buffer; //!< buffer with stripe data
  bool  valid;  //!< true if data are valid, otherwise false
};
60 
61  //---------------------------------------------------------------------------
63  //---------------------------------------------------------------------------
64  typedef std::vector<stripe_t> stripes_t;
65 
//----------------------------------------------------------------------------
//! A buffer type: a contiguous, resizable array of bytes
//----------------------------------------------------------------------------
using buffer_t = std::vector<char>;
70 
71  //----------------------------------------------------------------------------
73  //----------------------------------------------------------------------------
74  class IOError : public std::exception
75  {
76  public:
77 
78  //------------------------------------------------------------------------
82  //------------------------------------------------------------------------
83  IOError( const XrdCl::XRootDStatus &st ) noexcept : st( st ), msg( st.ToString() )
84  {
85  }
86 
87  //------------------------------------------------------------------------
89  //------------------------------------------------------------------------
90  IOError( const IOError &err ) noexcept : st( err.st ), msg( err.st.ToString() )
91  {
92  }
93 
94  //------------------------------------------------------------------------
96  //------------------------------------------------------------------------
97  IOError& operator=( const IOError &err ) noexcept
98  {
99  st = err.st;
100  msg = err.st.ToString();
101  return *this;
102  }
103 
104  //------------------------------------------------------------------------
106  //------------------------------------------------------------------------
107  virtual ~IOError()
108  {
109  }
110 
111  //------------------------------------------------------------------------
113  //------------------------------------------------------------------------
114  virtual const char* what() const noexcept
115  {
116  return msg.c_str();
117  }
118 
119  //------------------------------------------------------------------------
121  //------------------------------------------------------------------------
123  {
124  return st;
125  }
126 
127  enum
128  {
130  };
131 
132  private:
133 
134  //------------------------------------------------------------------------
136  //------------------------------------------------------------------------
138 
139  //------------------------------------------------------------------------
141  //------------------------------------------------------------------------
142  std::string msg;
143  };
144 
145  //---------------------------------------------------------------------------
152  //---------------------------------------------------------------------------
153  void ScheduleHandler( uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler );
154 
155  //---------------------------------------------------------------------------
160  //---------------------------------------------------------------------------
162 
163 
//---------------------------------------------------------------------------
//! A class implementing a synchronous (mutex-guarded) FIFO queue.
//!
//! Every access to the underlying std::queue happens with `mtx` held.
//! Consumers may block in `dequeue()` until an element shows up; `interrupt()`
//! releases them by making `dequeue()` throw `wait_interrupted`.
//! Once set, the interrupted flag is never cleared by this class.
//---------------------------------------------------------------------------
template<typename Element>
struct sync_queue
{
  //-------------------------------------------------------------------------
  //! An internal exception used for interrupting the `dequeue` method
  //-------------------------------------------------------------------------
  struct wait_interrupted{ };

  //-------------------------------------------------------------------------
  //! Default constructor: empty queue, not interrupted
  //-------------------------------------------------------------------------
  sync_queue() : interrupted( false )
  {
  }

  //-------------------------------------------------------------------------
  //! Enqueue new element into the queue and wake up all waiting consumers
  //!
  //! @param element : the element to be moved into the queue
  //-------------------------------------------------------------------------
  inline void enqueue( Element && element )
  {
    std::unique_lock<std::mutex> lck( mtx );
    elements.push( std::move( element ) );
    cv.notify_all();
  }

  //-------------------------------------------------------------------------
  //! Dequeue an element from the front of the queue
  //!
  //! Note: if the queue is empty blocks until a new element is enqueued
  //!
  //! @return : the element that was at the front of the queue
  //! @throws : wait_interrupted if the queue is empty and `interrupt()` has
  //!           been called (before or during the wait)
  //-------------------------------------------------------------------------
  inline Element dequeue()
  {
    std::unique_lock<std::mutex> lck( mtx );
    while( elements.empty() )
    {
      // check before blocking: an interrupt raised before we got here has
      // already sent its notification, waiting for it would hang forever
      if( interrupted ) throw wait_interrupted();
      cv.wait( lck );
      // an interrupt observed on wake-up takes precedence, even if an
      // element has been enqueued in the meantime
      if( interrupted ) throw wait_interrupted();
    }
    Element element = std::move( elements.front() );
    elements.pop();
    return element;
  }

  //-------------------------------------------------------------------------
  //! Dequeue an element from the front of the queue without blocking
  //!
  //! @param e : receives the front element (left untouched if queue is empty)
  //! @return  : false if the queue is empty, true otherwise
  //-------------------------------------------------------------------------
  inline bool dequeue( Element &e )
  {
    std::unique_lock<std::mutex> lck( mtx );
    if( elements.empty() ) return false;
    e = std::move( elements.front() );
    elements.pop();
    return true;
  }

  //-------------------------------------------------------------------------
  //! Checks if the queue is empty
  //!
  //! @return : true if there are no elements in the queue at the time of
  //!           the call
  //-------------------------------------------------------------------------
  bool empty()
  {
    std::unique_lock<std::mutex> lck( mtx );
    return elements.empty();
  }

  //-------------------------------------------------------------------------
  //! Interrupt all waiting `dequeue` routines
  //-------------------------------------------------------------------------
  inline void interrupt()
  {
    {
      // the flag has to be raised with the mutex held: otherwise the
      // notification could slip in between a consumer's emptiness check and
      // its call to wait(), and that consumer would never be woken up
      std::unique_lock<std::mutex> lck( mtx );
      interrupted = true;
    }
    cv.notify_all();
  }

  private:
    std::queue<Element>     elements;    //!< the queue itself
    std::mutex              mtx;         //!< mutex guarding the queue
    std::condition_variable cv;          //!< signals enqueue and interrupt
    std::atomic<bool>       interrupted; //!< a flag, true if all `dequeue`
                                         //!< routines should be interrupted
};
247 
//---------------------------------------------------------------------------
//! Extract the block ID from the chunk file name
//!
//! The ID is the text between the last two '.' characters of the name,
//! parsed as an unsigned decimal number, e.g. "data.12.ext" yields 12.
//!
//! @param fn : the chunk file name
//! @return   : the block ID
//! @throws   : whatever std::stoul throws if the extracted text is not a
//!             number (std::invalid_argument) or is too large
//!             (std::out_of_range)
//---------------------------------------------------------------------------
inline static size_t fntoblk( const std::string &fn )
{
  // position of the dot that terminates the block ID
  const size_t lastDot = fn.rfind( '.' );
  // one past the preceding dot; if there is none, npos + 1 wraps to 0 and
  // the ID is taken from the start of the name
  const size_t idStart = fn.rfind( '.', lastDot - 1 ) + 1;
  return std::stoul( fn.substr( idStart, lastDot - idStart ) );
}
258 }
259 
260 #endif /* SRC_XRDEC_XRDECUTILITIES_HH_ */
Handle an async response.
Generic I/O exception, wraps up XrdCl::XRootDStatus.
IOError(const IOError &err) noexcept
Copy constructor.
IOError & operator=(const IOError &err) noexcept
Assignment operator.
const XrdCl::XRootDStatus & Status() const
IOError(const XrdCl::XRootDStatus &st) noexcept
virtual ~IOError()
Destructor.
virtual const char * what() const noexcept
overloaded
std::vector< stripe_t > stripes_t
All stripes in a block.
static size_t fntoblk(const std::string &fn)
void ScheduleHandler(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler)
std::vector< char > buffer_t
a buffer type
Definition: XrdEcReader.hh:45
A buffer with stripe data and info on validity.
stripe_t(char *buffer, bool valid)
bool dequeue(Element &e)
void enqueue(Element &&element)