XRootD
XrdClParallelOperation.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // This file is part of the XRootD software suite.
7 //
8 // XRootD is free software: you can redistribute it and/or modify
9 // it under the terms of the GNU Lesser General Public License as published by
10 // the Free Software Foundation, either version 3 of the License, or
11 // (at your option) any later version.
12 //
13 // XRootD is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public License
19 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20 //
21 // In applying this licence, CERN does not waive the privileges and immunities
22 // granted to it by virtue of its status as an Intergovernmental Organization
23 // or submit itself to any jurisdiction.
24 //------------------------------------------------------------------------------
25 
26 #ifndef __XRD_CL_PARALLELOPERATION_HH__
27 #define __XRD_CL_PARALLELOPERATION_HH__
28 
29 #include "XrdCl/XrdClOperations.hh"
31 #include "XrdCl/XrdClDefaultEnv.hh"
32 #include "XrdCl/XrdClPostMaster.hh"
33 #include "XrdCl/XrdClJobManager.hh"
34 
35 #include <atomic>
36 #include <condition_variable>
37 #include <mutex>
38 
39 namespace XrdCl
40 {
41 
42  //----------------------------------------------------------------------------
43  // Interface for different execution policies:
44  // - all : all operations need to succeed in order for the parallel
45  // operation to be successful
46  // - any : just one of the operations needs to succeed in order for
47  // the parallel operation to be successful
48  // - some : n (user defined) operations need to succeed in order for
49  // the parallel operation to be successful
50  // - at least : at least n (user defined) operations need to succeed in
51  // order for the parallel operation to be successful (the
52  // user handler will be called only when all operations are
53  // resolved)
54  //
55  // @param status : status returned by one of the aggregated operations
56  //
57  // @return : true if the status should be passed to the user handler,
58  // false otherwise.
59  //----------------------------------------------------------------------------
61  {
62  virtual ~PolicyExecutor()
63  {
64  }
65 
66  virtual bool Examine( const XrdCl::XRootDStatus &status ) = 0;
67 
68  virtual XRootDStatus Result() = 0;
69  };
70 
71  //----------------------------------------------------------------------------
77  //----------------------------------------------------------------------------
78  template<bool HasHndl>
79  class ParallelOperation: public ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>
80  {
81  template<bool> friend class ParallelOperation;
82 
83  public:
84 
85  //------------------------------------------------------------------------
87  //------------------------------------------------------------------------
88  template<bool from>
90  ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>( std::move( obj ) ),
91  pipelines( std::move( obj.pipelines ) ),
92  policy( std::move( obj.policy ) )
93  {
94  }
95 
96  //------------------------------------------------------------------------
102  //------------------------------------------------------------------------
103  template<class Container>
104  ParallelOperation( Container &&container )
105  {
106  static_assert( !HasHndl, "Constructor is available only operation without handler");
107 
108  pipelines.reserve( container.size() );
109  auto begin = std::make_move_iterator( container.begin() );
110  auto end = std::make_move_iterator( container.end() );
111  std::copy( begin, end, std::back_inserter( pipelines ) );
112  container.clear(); // there's junk inside so we clear it
113  }
114 
116  {
117  }
118 
119  //------------------------------------------------------------------------
121  //------------------------------------------------------------------------
122  std::string ToString()
123  {
124  std::ostringstream oss;
125  oss << "Parallel(";
126  for( size_t i = 0; i < pipelines.size(); i++ )
127  {
128  oss << pipelines[i]->ToString();
129  if( i + 1 != pipelines.size() )
130  {
131  oss << " && ";
132  }
133  }
134  oss << ")";
135  return oss.str();
136  }
137 
138  //------------------------------------------------------------------------
143  //------------------------------------------------------------------------
145  {
146  policy.reset( new AllPolicy() );
147  return std::move( *this );
148  }
149 
150  //------------------------------------------------------------------------
155  //------------------------------------------------------------------------
157  {
158  policy.reset( new AnyPolicy( pipelines.size() ) );
159  return std::move( *this );
160  }
161 
162  //------------------------------------------------------------------------
163  // Set policy to `Some`
167  //------------------------------------------------------------------------
168  ParallelOperation<HasHndl> Some( size_t threshold )
169  {
170  policy.reset( new SomePolicy( pipelines.size(), threshold ) );
171  return std::move( *this );
172  }
173 
174  //------------------------------------------------------------------------
180  //------------------------------------------------------------------------
182  {
183  policy.reset( new AtLeastPolicy( pipelines.size(), threshold ) );
184  return std::move( *this );
185  }
186 
187  private:
188 
189  //------------------------------------------------------------------------
194  //------------------------------------------------------------------------
195  struct AllPolicy : public PolicyExecutor
196  {
197  bool Examine( const XrdCl::XRootDStatus &status )
198  {
199  // keep the status in case this is the final result
200  res = status;
201  if( status.IsOK() ) return false;
202  // we require all request to succeed
203  return true;
204  }
205 
206  XRootDStatus Result()
207  {
208  return res;
209  }
210 
211  XRootDStatus res;
212  };
213 
214  //------------------------------------------------------------------------
219  //------------------------------------------------------------------------
220  struct AnyPolicy : public PolicyExecutor
221  {
222  AnyPolicy( size_t size) : cnt( size )
223  {
224  }
225 
226  bool Examine( const XrdCl::XRootDStatus &status )
227  {
228  // keep the status in case this is the final result
229  res = status;
230  // decrement the counter
231  size_t nb = cnt.fetch_sub( 1, std::memory_order_relaxed );
232  // we require just one operation to be successful
233  if( status.IsOK() ) return true;
234  // lets see if this is the last one?
235  if( nb == 1 ) return true;
236  // we still have a chance there will be one that is successful
237  return false;
238  }
239 
240  XRootDStatus Result()
241  {
242  return res;
243  }
244 
245  private:
246  std::atomic<size_t> cnt;
247  XRootDStatus res;
248  };
249 
250  //------------------------------------------------------------------------
255  //------------------------------------------------------------------------
256  struct SomePolicy : PolicyExecutor
257  {
258  SomePolicy( size_t size, size_t threshold ) : failed( 0 ), succeeded( 0 ),
259  threshold( threshold ), size( size )
260  {
261  }
262 
263  bool Examine( const XrdCl::XRootDStatus &status )
264  {
265  // keep the status in case this is the final result
266  res = status;
267  if( status.IsOK() )
268  {
269  size_t s = succeeded.fetch_add( 1, std::memory_order_relaxed );
270  if( s + 1 == threshold ) return true; // we reached the threshold
271  // we are not yet there
272  return false;
273  }
274  size_t f = failed.fetch_add( 1, std::memory_order_relaxed );
275  // did we drop below the threshold
276  if( f == size - threshold ) return true;
277  // we still have a chance there will be enough of successful operations
278  return false;
279  }
280 
281  XRootDStatus Result()
282  {
283  return res;
284  }
285 
286  private:
287  std::atomic<size_t> failed;
288  std::atomic<size_t> succeeded;
289  const size_t threshold;
290  const size_t size;
291  XRootDStatus res;
292  };
293 
294  //------------------------------------------------------------------------
300  //------------------------------------------------------------------------
301  struct AtLeastPolicy : PolicyExecutor
302  {
303  AtLeastPolicy( size_t size, size_t threshold ) : pending_cnt( size ),
304  failed_cnt( 0 ),
305  failed_threshold( size - threshold )
306  {
307  }
308 
309  bool Examine( const XrdCl::XRootDStatus &status )
310  {
311  // update number of pending operations
312  size_t pending = pending_cnt.fetch_sub( 1, std::memory_order_relaxed ) - 1;
313  // although we might have the minimum to succeed we wait for the rest
314  if( status.IsOK() ) return ( pending == 0 );
315  size_t nb = failed_cnt.fetch_add( 1, std::memory_order_relaxed );
316  if( nb == failed_threshold ) res = status; // we dropped below the threshold
317  // if we still have to wait for pending operations return false,
318  // otherwise all is done, return true
319  return ( pending == 0 );
320  }
321 
322  XRootDStatus Result()
323  {
324  return res;
325  }
326 
327  private:
328  std::atomic<size_t> pending_cnt;
329  std::atomic<size_t> failed_cnt;
330  const size_t failed_threshold;
331  XRootDStatus res;
332  };
333 
334  //------------------------------------------------------------------------
336  //------------------------------------------------------------------------
//------------------------------------------------------------------------
// One-shot barrier: wait() blocks until lift() has been called.
//
// The barrier starts raised. Once lifted it stays lifted, so every
// later wait() returns immediately.
//------------------------------------------------------------------------
struct barrier_t
{
  barrier_t() : on( true ) { }

  //----------------------------------------------------------------------
  // Block the caller for as long as the barrier is raised
  //----------------------------------------------------------------------
  void wait()
  {
    std::unique_lock<std::mutex> lck( mtx );
    // the predicate guards against spurious wake-ups
    cv.wait( lck, [this]{ return !on; } );
  }

  //----------------------------------------------------------------------
  // Lift the barrier and wake up everybody waiting on it
  //----------------------------------------------------------------------
  void lift()
  {
    std::lock_guard<std::mutex> lck( mtx );
    on = false;
    cv.notify_all();
  }

  private:
    std::condition_variable cv;
    std::mutex              mtx;
    bool                    on; //< true while the barrier is raised
};
359 
360  //------------------------------------------------------------------------
365  //------------------------------------------------------------------------
366  struct Ctx
367  {
368  //----------------------------------------------------------------------
372  //----------------------------------------------------------------------
373  Ctx( PipelineHandler *handler, PolicyExecutor *policy ): handler( handler ),
374  policy( policy )
375  {
376  }
377 
378  //----------------------------------------------------------------------
380  //----------------------------------------------------------------------
381  ~Ctx()
382  {
383  Handle( XRootDStatus() );
384  }
385 
386  //----------------------------------------------------------------------
391  //----------------------------------------------------------------------
392  inline void Examine( const XRootDStatus &st )
393  {
394  if( policy->Examine( st ) )
395  Handle( policy->Result() );
396  }
397 
398  //----------------------------------------------------------------------
403  //---------------------------------------------------------------------
404  inline void Handle( const XRootDStatus &st )
405  {
406  PipelineHandler* hdlr = handler.exchange( nullptr, std::memory_order_relaxed );
407  if( hdlr )
408  {
409  barrier.wait();
410  hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
411  }
412  }
413 
414  //----------------------------------------------------------------------
416  //----------------------------------------------------------------------
417  std::atomic<PipelineHandler*> handler;
418 
419  //----------------------------------------------------------------------
421  //----------------------------------------------------------------------
422  std::unique_ptr<PolicyExecutor> policy;
423 
424  //----------------------------------------------------------------------
427  //----------------------------------------------------------------------
428  barrier_t barrier;
429  };
430 
431  //------------------------------------------------------------------------
433  //------------------------------------------------------------------------
434  struct PipelineEnd : public Job
435  {
436  //----------------------------------------------------------------------
437  // Constructor
438  //----------------------------------------------------------------------
439  PipelineEnd( std::shared_ptr<Ctx> &ctx,
440  const XrdCl::XRootDStatus &st ) : ctx( ctx ), st( st )
441  {
442  }
443 
444  //----------------------------------------------------------------------
445  // Run Ctx::Examine in the thread-pool
446  //----------------------------------------------------------------------
447  void Run( void* )
448  {
449  ctx->Examine( st );
450  delete this;
451  }
452 
453  private:
454  std::shared_ptr<Ctx> ctx; //< ParallelOperaion context
455  XrdCl::XRootDStatus st; //< final status of the ParallelOperation
456  };
457 
458  //------------------------------------------------------------------------
460  //------------------------------------------------------------------------
461  inline static
462  void Schedule( std::shared_ptr<Ctx> &ctx, const XrdCl::XRootDStatus &st)
463  {
465  PipelineEnd *end = new PipelineEnd( ctx, st );
466  mgr->QueueJob( end, nullptr );
467  }
468 
469  //------------------------------------------------------------------------
475  //------------------------------------------------------------------------
476  XRootDStatus RunImpl( PipelineHandler *handler, uint16_t pipelineTimeout )
477  {
478  // make sure we have a valid policy for the parallel operation
479  if( !policy ) policy.reset( new AllPolicy() );
480 
481  std::shared_ptr<Ctx> ctx =
482  std::make_shared<Ctx>( handler, policy.release() );
483 
484  uint16_t timeout = pipelineTimeout < this->timeout ?
485  pipelineTimeout : this->timeout;
486 
487  for( size_t i = 0; i < pipelines.size(); ++i )
488  {
489  if( !pipelines[i] ) continue;
490  pipelines[i].Run( timeout,
491  [ctx]( const XRootDStatus &st ) mutable { Schedule( ctx, st ); } );
492  }
493 
494  ctx->barrier.lift();
495  return XRootDStatus();
496  }
497 
498  std::vector<Pipeline> pipelines;
499  std::unique_ptr<PolicyExecutor> policy;
500  };
501 
502  //----------------------------------------------------------------------------
504  //----------------------------------------------------------------------------
505  template<class Container>
506  inline ParallelOperation<false> Parallel( Container &&container )
507  {
508  return ParallelOperation<false>( container );
509  }
510 
511  //----------------------------------------------------------------------------
513  //----------------------------------------------------------------------------
514  inline void PipesToVec( std::vector<Pipeline>& )
515  {
516  // base case
517  }
518 
519  //----------------------------------------------------------------------------
520 // Declare PipesToVec (we need to declare those functions ahead of their
521 // definitions, as they may call each other).
522  //----------------------------------------------------------------------------
523  template<typename ... Others>
524  inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
525  Others&... others );
526 
527  template<typename ... Others>
528  inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
529  Others&... others );
530 
531  template<typename ... Others>
532  inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
533  Others&... others );
534 
535  //----------------------------------------------------------------------------
536  // Define PipesToVec
537  //----------------------------------------------------------------------------
538  template<typename ... Others>
539  void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
540  Others&... others )
541  {
542  v.emplace_back( operation );
543  PipesToVec( v, others... );
544  }
545 
546  template<typename ... Others>
547  void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
548  Others&... others )
549  {
550  v.emplace_back( operation );
551  PipesToVec( v, others... );
552  }
553 
554  template<typename ... Others>
555  void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
556  Others&... others )
557  {
558  v.emplace_back( std::move( pipeline ) );
559  PipesToVec( v, others... );
560  }
561 
562  //----------------------------------------------------------------------------
567  //----------------------------------------------------------------------------
568  template<typename ... Operations>
569  inline ParallelOperation<false> Parallel( Operations&& ... operations )
570  {
571  constexpr size_t size = sizeof...( operations );
572  std::vector<Pipeline> v;
573  v.reserve( size );
574  PipesToVec( v, operations... );
575  return Parallel( v );
576  }
577 }
578 
579 #endif // __XRD_CL_PARALLELOPERATION_HH__
static PostMaster * GetPostMaster()
Get default post master.
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
friend class PipelineHandler
void Run(Timeout timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final)
std::unique_ptr< PipelineHandler > handler
Operation handler.
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in different state.
ParallelOperation(Container &&container)
ParallelOperation< HasHndl > Some(size_t threshold)
ParallelOperation< HasHndl > All()
ParallelOperation< HasHndl > Any()
ParallelOperation< HasHndl > AtLeast(size_t threshold)
JobManager * GetJobManager()
Get the job manager object user by the post master.
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
virtual XRootDStatus Result()=0
virtual bool Examine(const XrdCl::XRootDStatus &status)=0
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124