26 #ifndef __XRD_CL_PARALLELOPERATION_HH__
27 #define __XRD_CL_PARALLELOPERATION_HH__
36 #include <condition_variable>
78 template<
bool HasHndl>
91 pipelines( std::move( obj.pipelines ) ),
92 policy( std::move( obj.policy ) )
103 template<
class Container>
106 static_assert( !HasHndl,
"Constructor is available only operation without handler");
108 pipelines.reserve( container.size() );
109 auto begin = std::make_move_iterator( container.begin() );
110 auto end = std::make_move_iterator( container.end() );
111 std::copy( begin, end, std::back_inserter( pipelines ) );
124 std::ostringstream oss;
126 for(
size_t i = 0; i < pipelines.size(); i++ )
128 oss << pipelines[i]->ToString();
129 if( i + 1 != pipelines.size() )
146 policy.reset(
new AllPolicy() );
147 return std::move( *
this );
158 policy.reset(
new AnyPolicy( pipelines.size() ) );
159 return std::move( *
this );
170 policy.reset(
new SomePolicy( pipelines.size(), threshold ) );
171 return std::move( *
this );
183 policy.reset(
new AtLeastPolicy( pipelines.size(), threshold ) );
184 return std::move( *
this );
201 if( status.
IsOK() )
return false;
206 XRootDStatus Result()
220 struct AnyPolicy :
public PolicyExecutor
222 AnyPolicy(
size_t size) : cnt( size )
231 size_t nb = cnt.fetch_sub( 1, std::memory_order_relaxed );
233 if( status.
IsOK() )
return true;
235 if( nb == 1 )
return true;
240 XRootDStatus Result()
246 std::atomic<size_t> cnt;
256 struct SomePolicy : PolicyExecutor
258 SomePolicy(
size_t size,
size_t threshold ) : failed( 0 ), succeeded( 0 ),
259 threshold( threshold ), size( size )
269 size_t s = succeeded.fetch_add( 1, std::memory_order_relaxed );
270 if( s + 1 == threshold )
return true;
274 size_t f = failed.fetch_add( 1, std::memory_order_relaxed );
276 if( f == size - threshold )
return true;
281 XRootDStatus Result()
287 std::atomic<size_t> failed;
288 std::atomic<size_t> succeeded;
289 const size_t threshold;
301 struct AtLeastPolicy : PolicyExecutor
303 AtLeastPolicy(
size_t size,
size_t threshold ) : pending_cnt( size ),
305 failed_threshold( size - threshold )
312 size_t pending = pending_cnt.fetch_sub( 1, std::memory_order_relaxed ) - 1;
314 if( status.
IsOK() )
return ( pending == 0 );
315 size_t nb = failed_cnt.fetch_add( 1, std::memory_order_relaxed );
316 if( nb == failed_threshold ) res = status;
319 return ( pending == 0 );
322 XRootDStatus Result()
328 std::atomic<size_t> pending_cnt;
329 std::atomic<size_t> failed_cnt;
330 const size_t failed_threshold;
339 barrier_t() : on( true ) { }
343 std::unique_lock<std::mutex> lck( mtx );
344 if( on ) cv.wait( lck );
349 std::unique_lock<std::mutex> lck( mtx );
355 std::condition_variable cv;
383 Handle( XRootDStatus() );
392 inline void Examine(
const XRootDStatus &st )
394 if( policy->Examine( st ) )
395 Handle( policy->Result() );
404 inline void Handle(
const XRootDStatus &st )
410 hdlr->HandleResponse(
new XRootDStatus( st ),
nullptr );
417 std::atomic<PipelineHandler*>
handler;
422 std::unique_ptr<PolicyExecutor> policy;
434 struct PipelineEnd :
public Job
439 PipelineEnd( std::shared_ptr<Ctx> &ctx,
454 std::shared_ptr<Ctx> ctx;
465 PipelineEnd *end =
new PipelineEnd( ctx, st );
479 if( !policy ) policy.reset(
new AllPolicy() );
481 std::shared_ptr<Ctx> ctx =
482 std::make_shared<Ctx>(
handler, policy.release() );
484 uint16_t
timeout = pipelineTimeout < this->timeout ?
485 pipelineTimeout : this->
timeout;
487 for(
size_t i = 0; i < pipelines.size(); ++i )
489 if( !pipelines[i] )
continue;
491 [ctx](
const XRootDStatus &st )
mutable { Schedule( ctx, st ); } );
495 return XRootDStatus();
498 std::vector<Pipeline> pipelines;
499 std::unique_ptr<PolicyExecutor> policy;
505 template<
class Container>
523 template<
typename ... Others>
524 inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
527 template<
typename ... Others>
528 inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
531 template<
typename ... Others>
532 inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
538 template<
typename ... Others>
542 v.emplace_back( operation );
546 template<
typename ... Others>
550 v.emplace_back( operation );
554 template<
typename ... Others>
558 v.emplace_back( std::move( pipeline ) );
568 template<
typename ... Operations>
571 constexpr
size_t size =
sizeof...( operations );
572 std::vector<Pipeline> v;
uint16_t timeout
Operation timeout.
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
friend class PipelineHandler
void Run(Timeout timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final)
std::unique_ptr< PipelineHandler > handler
Operation handler.
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in different state.
ParallelOperation(Container &&container)
ParallelOperation< HasHndl > Some(size_t threshold)
ParallelOperation< HasHndl > All()
ParallelOperation< HasHndl > Any()
ParallelOperation< HasHndl > AtLeast(size_t threshold)
JobManager * GetJobManager()
Get the job manager object used by the post master.
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
virtual ~PolicyExecutor()
virtual XRootDStatus Result()=0
virtual bool Examine(const XrdCl::XRootDStatus &status)=0
bool IsOK() const
We're fine.