XRootD
XrdEcThreadPool.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
#include "XrdCl/XrdClJobManager.hh"

#include <cstddef>
#include <exception>
#include <future>
#include <tuple>
#include <type_traits>
#include <utility>
29 
30 #ifndef SRC_XRDEC_XRDECTHREADPOOL_HH_
31 #define SRC_XRDEC_XRDECTHREADPOOL_HH_
32 
33 namespace XrdEc
34 {
35  //---------------------------------------------------------------------------
36  // A thread pool class for the XrdEc module
37  //---------------------------------------------------------------------------
38  class ThreadPool
39  {
40  private:
41 
//-----------------------------------------------------------------------
// Call a functional with the elements of a tuple as its arguments
// (implementation).
//
// INDICES enumerates the positions of the tuple elements. Every element
// is handed to the functional as an rvalue, so the tuple is left in a
// moved-from state and must not be used for a second call.
//-----------------------------------------------------------------------
template <typename FUNC, typename TUPL, std::size_t ... INDICES>
inline static auto tuple_call_impl( FUNC &func, TUPL &args, std::index_sequence<INDICES...> )
  -> decltype( func( std::move( std::get<INDICES>( args ) )... ) )
{
  return func( std::move( std::get<INDICES>( args ) )... );
}

//-----------------------------------------------------------------------
// Call a functional with arguments packaged in a tuple.
//
// The index sequence 0 ... sizeof...(ARGs)-1 is produced by the standard
// library (std::index_sequence_for) and forwarded to tuple_call_impl.
// Returns whatever the functional returns.
//-----------------------------------------------------------------------
template <typename FUNC, typename ... ARGs>
inline static auto tuple_call( FUNC &func, std::tuple<ARGs...> &tup )
  -> decltype( tuple_call_impl( func, tup, std::index_sequence_for<ARGs...>{} ) )
{
  return tuple_call_impl( func, tup, std::index_sequence_for<ARGs...>{} );
}
75 
76  //-----------------------------------------------------------------------
77  // Helper class implementing a job containing any functional and its
78  // arguments.
79  //-----------------------------------------------------------------------
80  template<typename FUNC, typename RET, typename ... ARGs>
81  class AnyJob : public XrdCl::Job
82  {
83  //---------------------------------------------------------------------
84  // Run the functional (returning void) with the packaged arguments
85  //---------------------------------------------------------------------
86  static inline void RunImpl( FUNC func, std::tuple<ARGs...> &args, std::promise<void> &prms )
87  {
88  tuple_call( func, args );
89  prms.set_value();
90  }
91 
92  //---------------------------------------------------------------------
93  // Run the functional (returning anything but void) with the packaged
94  // arguments
95  //---------------------------------------------------------------------
96  template<typename RETURN>
97  static inline void RunImpl( FUNC func, std::tuple<ARGs...> &args, std::promise<RETURN> &prms )
98  {
99  prms.set_value( tuple_call( func, args ) );
100  }
101 
102  public:
103  //-------------------------------------------------------------------
108  //-------------------------------------------------------------------
109  AnyJob( FUNC func, ARGs... args ) : func( std::move( func ) ),
110  args( std::tuple<ARGs...>( std::move( args )... ) )
111  {
112  }
113 
114  //-------------------------------------------------------------------
116  //-------------------------------------------------------------------
117  void Run( void *arg )
118  {
119  RunImpl( this->func, this->args, this->prms );
120  delete this;
121  }
122 
123  //-------------------------------------------------------------------
125  //-------------------------------------------------------------------
126  std::future<RET> GetFuture()
127  {
128  return prms.get_future();
129  }
130 
131  protected:
132 
133  FUNC func; //< the functional
134  std::tuple<ARGs...> args; //< the arguments
135  std::promise<RET> prms; //< the promiss that there will be a result
136  };
137 
138  public:
139 
140  //-----------------------------------------------------------------------
142  //-----------------------------------------------------------------------
144  {
145  threadpool.Stop();
146  threadpool.Finalize();
147  }
148 
149  //-----------------------------------------------------------------------
151  //-----------------------------------------------------------------------
153  {
154  static ThreadPool instance;
155  return instance;
156  }
157 
158  //-----------------------------------------------------------------------
160  //-----------------------------------------------------------------------
161  template<typename FUNC, typename ... ARGs>
162  inline std::future<std::invoke_result_t<FUNC, ARGs...>>
163  Execute( FUNC func, ARGs... args )
164  {
165  using RET = std::invoke_result_t<FUNC, ARGs...>;
166  auto *job = new AnyJob<FUNC, RET, ARGs...>( func, std::move( args )... );
167  std::future<RET> ftr = job->GetFuture();
168  threadpool.QueueJob( job, nullptr );
169  return ftr;
170  }
171 
172  private:
173 
174  //-----------------------------------------------------------------------
176  //-----------------------------------------------------------------------
177  ThreadPool() : threadpool( 64 )
178  {
179  threadpool.Initialize();
180  threadpool.Start();
181  }
182 
183  XrdCl::JobManager threadpool; //< the thread-pool itself
184  };
185 
186 }
187 
188 
189 #endif /* SRC_XRDEC_XRDECTHREADPOOL_HH_ */
#define I(x)
A synchronized queue.
bool Finalize()
Finalize the job manager, clear the queues.
bool Start()
Start the workers.
bool Initialize()
Initialize the job manager.
bool Stop()
Stop the workers.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Interface for a job to be run by the job manager.
static ThreadPool & Instance()
Singleton access.
std::future< std::invoke_result_t< FUNC, ARGs... > > Execute(FUNC func, ARGs... args)
Schedule a functional (together with its arguments) for execution.
~ThreadPool()
Destructor.