XRootD
XrdClOperations.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // This file is part of the XRootD software suite.
7 //
8 // XRootD is free software: you can redistribute it and/or modify
9 // it under the terms of the GNU Lesser General Public License as published by
10 // the Free Software Foundation, either version 3 of the License, or
11 // (at your option) any later version.
12 //
13 // XRootD is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public License
19 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20 //
21 // In applying this licence, CERN does not waive the privileges and immunities
22 // granted to it by virtue of its status as an Intergovernmental Organization
23 // or submit itself to any jurisdiction.
24 //------------------------------------------------------------------------------
25 
26 #ifndef __XRD_CL_OPERATIONS_HH__
27 #define __XRD_CL_OPERATIONS_HH__
28 
29 #include <memory>
30 #include <stdexcept>
31 #include <sstream>
32 #include <tuple>
33 #include <future>
36 #include "XrdCl/XrdClArg.hh"
39 #include "XrdSys/XrdSysPthread.hh"
40 
42 #include "XrdCl/XrdClJobManager.hh"
43 #include "XrdCl/XrdClPostMaster.hh"
44 #include "XrdCl/XrdClDefaultEnv.hh"
45 
46 namespace XrdCl
47 {
48 
49  template<bool HasHndl> class Operation;
50 
51  class Pipeline;
52 
53 
54  //----------------------------------------------------------------------------
56  //----------------------------------------------------------------------------
57  typedef std::function<Operation<true>*(const XRootDStatus&)> rcvry_func;
58 
59  //----------------------------------------------------------------------------
62  //----------------------------------------------------------------------------
64  {
65  template<bool> friend class Operation;
66 
67  public:
68 
69  //------------------------------------------------------------------------
73  //------------------------------------------------------------------------
74  PipelineHandler( ResponseHandler *handler );
75 
76  //------------------------------------------------------------------------
78  //------------------------------------------------------------------------
80  {
81  }
82 
83  //------------------------------------------------------------------------
85  //------------------------------------------------------------------------
86  void HandleResponseWithHosts( XRootDStatus *status, AnyObject *response,
87  HostList *hostList );
88 
89  //------------------------------------------------------------------------
91  //------------------------------------------------------------------------
92  void HandleResponse( XRootDStatus *status, AnyObject *response );
93 
94  //------------------------------------------------------------------------
96  //------------------------------------------------------------------------
98  {
99  }
100 
101  //------------------------------------------------------------------------
105  //------------------------------------------------------------------------
106  void AddOperation( Operation<true> *operation );
107 
108  //------------------------------------------------------------------------
115  //------------------------------------------------------------------------
116  void Assign( const Timeout &timeout,
117  std::promise<XRootDStatus> prms,
118  std::function<void(const XRootDStatus&)> final,
119  Operation<true> *opr );
120 
121  //------------------------------------------------------------------------
123  //------------------------------------------------------------------------
124  void Assign( std::function<void(const XRootDStatus&)> final );
125 
126  //------------------------------------------------------------------------
128  //------------------------------------------------------------------------
129  void PreparePipelineStart();
130 
131  private:
132 
133  //------------------------------------------------------------------------
135  //------------------------------------------------------------------------
136  void HandleResponseImpl( XRootDStatus *status, AnyObject *response,
137  HostList *hostList = nullptr );
138 
139  inline void dealloc( XRootDStatus *status, AnyObject *response,
140  HostList *hostList )
141  {
142  delete status;
143  delete response;
144  delete hostList;
145  }
146 
147  //------------------------------------------------------------------------
149  //------------------------------------------------------------------------
150  std::unique_ptr<ResponseHandler> responseHandler;
151 
152  //------------------------------------------------------------------------
154  //------------------------------------------------------------------------
155  std::unique_ptr<Operation<true>> currentOperation;
156 
157  //------------------------------------------------------------------------
159  //------------------------------------------------------------------------
160  std::unique_ptr<Operation<true>> nextOperation;
161 
162  //------------------------------------------------------------------------
164  //------------------------------------------------------------------------
165  Timeout timeout;
166 
167  //------------------------------------------------------------------------
169  //------------------------------------------------------------------------
170  std::promise<XRootDStatus> prms;
171 
172  //------------------------------------------------------------------------
175  //------------------------------------------------------------------------
176  std::function<void(const XRootDStatus&)> final;
177  };
178 
179  //----------------------------------------------------------------------------
185  //----------------------------------------------------------------------------
186  template<bool HasHndl>
187  class Operation
188  {
189  // Declare friendship between templates
190  template<bool>
191  friend class Operation;
192 
193  friend std::future<XRootDStatus> Async( Pipeline, uint16_t );
194 
195  friend class Pipeline;
196  friend class PipelineHandler;
197 
198  public:
199 
200  //------------------------------------------------------------------------
202  //------------------------------------------------------------------------
203  Operation() : valid( true )
204  {
205  }
206 
207  //------------------------------------------------------------------------
209  //------------------------------------------------------------------------
210  template<bool from>
212  handler( std::move( op.handler ) ), valid( true )
213  {
214  if( !op.valid ) throw std::invalid_argument( "Cannot construct "
215  "Operation from an invalid Operation!" );
216  op.valid = false;
217  }
218 
219  //------------------------------------------------------------------------
221  //------------------------------------------------------------------------
222  virtual ~Operation()
223  {
224  }
225 
226  //------------------------------------------------------------------------
228  //------------------------------------------------------------------------
229  virtual std::string ToString() = 0;
230 
231  //------------------------------------------------------------------------
235  //------------------------------------------------------------------------
236  virtual Operation<HasHndl>* Move() = 0;
237 
238  //------------------------------------------------------------------------
243  //------------------------------------------------------------------------
244  virtual Operation<true>* ToHandled() = 0;
245 
246  protected:
247 
248  //------------------------------------------------------------------------
253  //------------------------------------------------------------------------
254  void Run( Timeout timeout,
255  std::promise<XRootDStatus> prms,
256  std::function<void(const XRootDStatus&)> final )
257  {
258  static_assert(HasHndl, "Only an operation that has a handler can be assigned to workflow");
259  handler->Assign( timeout, std::move( prms ), std::move( final ), this );
260 
261  PipelineHandler *h = handler.release();
262  XRootDStatus st;
263  try
264  {
265  st = RunImpl( h, timeout );
266  }
267  catch( const operation_expired& ex )
268  {
270  }
271  catch( const PipelineException& ex ) // probably not needed
272  {
273  st = ex.GetError();
274  }
275  catch( const std::exception& ex )
276  {
277  st = XRootDStatus( stError, errInternal, 0, ex.what() );
278  }
279 
280  if( !st.IsOK() ){
281  ResponseJob *job = new ResponseJob(h, new XRootDStatus(st), 0, nullptr);
283  }
284  }
285 
286  //------------------------------------------------------------------------
292  //------------------------------------------------------------------------
293  virtual XRootDStatus RunImpl( PipelineHandler *handler, uint16_t timeout ) = 0;
294 
295  //------------------------------------------------------------------------
299  //------------------------------------------------------------------------
301  {
302  if( handler )
303  handler->AddOperation( op );
304  }
305 
306  //------------------------------------------------------------------------
308  //------------------------------------------------------------------------
309  std::unique_ptr<PipelineHandler> handler;
310 
311  //------------------------------------------------------------------------
313  //------------------------------------------------------------------------
314  bool valid;
315  };
316 
317  //----------------------------------------------------------------------------
323  //----------------------------------------------------------------------------
324  class Pipeline
325  {
326  template<bool> friend class ParallelOperation;
327  friend std::future<XRootDStatus> Async( Pipeline, uint16_t );
328  friend class PipelineHandler;
329 
330  public:
331 
332  //------------------------------------------------------------------------
334  //------------------------------------------------------------------------
336  {
337  }
338 
339  //------------------------------------------------------------------------
341  //------------------------------------------------------------------------
343  operation( op->Move() )
344  {
345  }
346 
347  //------------------------------------------------------------------------
349  //------------------------------------------------------------------------
351  operation( op.Move() )
352  {
353  }
354 
355  //------------------------------------------------------------------------
357  //------------------------------------------------------------------------
359  operation( op.Move() )
360  {
361  }
362 
364  operation( op->ToHandled() )
365  {
366  }
367 
368  //------------------------------------------------------------------------
370  //------------------------------------------------------------------------
372  operation( op.ToHandled() )
373  {
374  }
375 
376  //------------------------------------------------------------------------
378  //------------------------------------------------------------------------
380  operation( op.ToHandled() )
381  {
382  }
383 
384  Pipeline( Pipeline &&pipe ) :
385  operation( std::move( pipe.operation ) )
386  {
387  }
388 
389  //------------------------------------------------------------------------
391  //------------------------------------------------------------------------
393  {
394  operation = std::move( pipe.operation );
395  return *this;
396  }
397 
398  //------------------------------------------------------------------------
400  //------------------------------------------------------------------------
402  {
403  operation->AddOperation( op.Move() );
404  return *this;
405  }
406 
407  //------------------------------------------------------------------------
409  //------------------------------------------------------------------------
411  {
412  operation->AddOperation( op.ToHandled() );
413  return *this;
414  }
415 
416  //------------------------------------------------------------------------
420  //------------------------------------------------------------------------
421  operator Operation<true>&()
422  {
423  if( !bool( operation ) ) throw std::logic_error( "Invalid pipeline." );
424  return *operation.get();
425  }
426 
427  //------------------------------------------------------------------------
431  //------------------------------------------------------------------------
432  operator bool()
433  {
434  return bool( operation );
435  }
436 
437  //------------------------------------------------------------------------
441  //------------------------------------------------------------------------
442  static void Stop( const XRootDStatus &status = XrdCl::XRootDStatus() );
443 
444  //------------------------------------------------------------------------
446  //------------------------------------------------------------------------
447  static void Repeat();
448 
449  //------------------------------------------------------------------------
451  //------------------------------------------------------------------------
452  static void Replace( Operation<false> &&opr );
453 
454  //------------------------------------------------------------------------
456  //------------------------------------------------------------------------
457  static void Replace( Pipeline p );
458 
459  //------------------------------------------------------------------------
461  //------------------------------------------------------------------------
462  static void Ignore();
463 
464  private:
465 
466  //------------------------------------------------------------------------
471  //------------------------------------------------------------------------
472  Operation<true>* operator->()
473  {
474  return operation.get();
475  }
476 
477  //------------------------------------------------------------------------
482  //------------------------------------------------------------------------
483  void Run( Timeout timeout, std::function<void(const XRootDStatus&)> final = nullptr )
484  {
485  if( ftr.valid() )
486  throw std::logic_error( "Pipeline is already running!" );
487 
488  // a promise that the pipe will have a result
489  std::promise<XRootDStatus> prms;
490  ftr = prms.get_future();
491 
492  if( !operation ) std::logic_error( "Empty pipeline!" );
493 
494  Operation<true> *opr = operation.release();
495  PipelineHandler *h = opr->handler.get();
496  if( h )
497  h->PreparePipelineStart();
498 
499  opr->Run( timeout, std::move( prms ), std::move( final ) );
500  }
501 
502  //------------------------------------------------------------------------
504  //------------------------------------------------------------------------
505  std::unique_ptr<Operation<true>> operation;
506 
507  //------------------------------------------------------------------------
509  //------------------------------------------------------------------------
510  std::future<XRootDStatus> ftr;
511 
512  };
513 
514  //----------------------------------------------------------------------------
521  //----------------------------------------------------------------------------
522  inline std::future<XRootDStatus> Async( Pipeline pipeline, uint16_t timeout = 0 )
523  {
524  pipeline.Run( timeout );
525  return std::move( pipeline.ftr );
526  }
527 
528  //----------------------------------------------------------------------------
536  //----------------------------------------------------------------------------
537  inline XRootDStatus WaitFor( Pipeline pipeline, uint16_t timeout = 0 )
538  {
539  return Async( std::move( pipeline ), timeout ).get();
540  }
541 
542  //----------------------------------------------------------------------------
549  //----------------------------------------------------------------------------
550  template<template<bool> class Derived, bool HasHndl, typename HdlrFactory, typename ... Args>
551  class ConcreteOperation: public Operation<HasHndl>
552  {
553  template<template<bool> class, bool, typename, typename ...>
554  friend class ConcreteOperation;
555 
556  public:
557 
558  //------------------------------------------------------------------------
562  //------------------------------------------------------------------------
563  ConcreteOperation( Args&&... args ) : args( std::tuple<Args...>( std::move( args )... ) ),
564  timeout( 0 )
565  {
566  static_assert( !HasHndl, "It is only possible to construct operation without handler" );
567  }
568 
569  //------------------------------------------------------------------------
575  //------------------------------------------------------------------------
576  template<bool from>
578  Operation<HasHndl>( std::move( op ) ), args( std::move( op.args ) ), timeout( 0 )
579  {
580  }
581 
582  //------------------------------------------------------------------------
590  //------------------------------------------------------------------------
591  template<typename Hdlr>
592  Derived<true> operator>>( Hdlr &&hdlr )
593  {
594  return this->StreamImpl( HdlrFactory::Create( hdlr ) );
595  }
596 
597  //------------------------------------------------------------------------
603  //------------------------------------------------------------------------
604  Derived<true> operator|( Operation<true> &op )
605  {
606  return PipeImpl( *this, op );
607  }
608 
609  //------------------------------------------------------------------------
615  //------------------------------------------------------------------------
616  Derived<true> operator|( Operation<true> &&op )
617  {
618  return PipeImpl( *this, op );
619  }
620 
621  //------------------------------------------------------------------------
627  //------------------------------------------------------------------------
628  Derived<true> operator|( Operation<false> &op )
629  {
630  return PipeImpl( *this, op );
631  }
632 
633  //------------------------------------------------------------------------
639  //------------------------------------------------------------------------
640  Derived<true> operator|( Operation<false> &&op )
641  {
642  return PipeImpl( *this, op );
643  }
644 
645  //------------------------------------------------------------------------
647  //------------------------------------------------------------------------
648  Derived<true> operator|( FinalOperation &&fo )
649  {
650  AllocHandler( *this );
651  this->handler->Assign( fo.final );
652  return this->template Transform<true>();
653  }
654 
655  //------------------------------------------------------------------------
659  //------------------------------------------------------------------------
661  {
662  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
663  return new Derived<HasHndl>( std::move( *me ) );
664  }
665 
666  //------------------------------------------------------------------------
670  //------------------------------------------------------------------------
672  {
673  this->handler.reset( new PipelineHandler() );
674  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
675  return new Derived<true>( std::move( *me ) );
676  }
677 
678  //------------------------------------------------------------------------
680  //------------------------------------------------------------------------
681  Derived<HasHndl> Timeout( uint16_t timeout )
682  {
683  this->timeout = timeout;
684  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
685  return std::move( *me );
686  }
687 
688  protected:
689 
690  //------------------------------------------------------------------------
694  //------------------------------------------------------------------------
695  template<bool to>
696  inline Derived<to> Transform()
697  {
698  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
699  return Derived<to>( std::move( *me ) );
700  }
701 
702  //------------------------------------------------------------------------
708  //------------------------------------------------------------------------
709  inline Derived<true> StreamImpl( ResponseHandler *handler )
710  {
711  static_assert( !HasHndl, "Operator >> is available only for operation without handler" );
712  this->handler.reset( new PipelineHandler( handler ) );
713  return Transform<true>();
714  }
715 
716  //------------------------------------------------------------------------
717  // Allocate handler if necessary
718  //------------------------------------------------------------------------
719  inline static
721  {
722  // nothing to do
723  }
724 
725  //------------------------------------------------------------------------
726  // Allocate handler if necessary
727  //------------------------------------------------------------------------
728  inline static
730  {
731  me.handler.reset( new PipelineHandler() );
732  }
733 
734  //------------------------------------------------------------------------
741  //------------------------------------------------------------------------
742  inline static
743  Derived<true> PipeImpl( ConcreteOperation<Derived, HasHndl, HdlrFactory,
744  Args...> &me, Operation<true> &op )
745  {
746  AllocHandler( me ); // if HasHndl is false allocate handler
747  me.AddOperation( op.Move() );
748  return me.template Transform<true>();
749  }
750 
751  //------------------------------------------------------------------------
758  //------------------------------------------------------------------------
759  inline static
760  Derived<true> PipeImpl( ConcreteOperation<Derived, HasHndl, HdlrFactory,
761  Args...> &me, Operation<false> &op )
762  {
763  AllocHandler( me ); // if HasHndl is false allocate handler
764  me.AddOperation( op.ToHandled() );
765  return me.template Transform<true>();
766  }
767 
768  //------------------------------------------------------------------------
770  //------------------------------------------------------------------------
771  std::tuple<Args...> args;
772 
773  //------------------------------------------------------------------------
775  //------------------------------------------------------------------------
776  uint16_t timeout;
777  };
778 }
779 
780 #endif // __XRD_CL_OPERATIONS_HH__
bool Create
static void AllocHandler(ConcreteOperation< Derived, true, HdlrFactory, Args... > &me)
static void AllocHandler(ConcreteOperation< Derived, false, HdlrFactory, Args... > &me)
std::tuple< Args... > args
Operation arguments.
ConcreteOperation(ConcreteOperation< Derived, from, HdlrFactory, Args... > &&op)
uint16_t timeout
Operation timeout.
ConcreteOperation(Args &&... args)
Operation< HasHndl > * Move()
static Derived< true > PipeImpl(ConcreteOperation< Derived, HasHndl, HdlrFactory, Args... > &me, Operation< false > &op)
Operation< true > * ToHandled()
Derived< true > operator|(FinalOperation &&fo)
Adds a final operation to the pipeline.
Derived< HasHndl > Timeout(uint16_t timeout)
Set operation timeout.
Derived< true > operator|(Operation< true > &&op)
Derived< true > StreamImpl(ResponseHandler *handler)
Derived< true > operator|(Operation< false > &op)
Derived< true > operator|(Operation< true > &op)
static Derived< true > PipeImpl(ConcreteOperation< Derived, HasHndl, HdlrFactory, Args... > &me, Operation< true > &op)
Derived< true > operator>>(Hdlr &&hdlr)
Derived< true > operator|(Operation< false > &&op)
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
virtual ~Operation()
Destructor.
friend class PipelineHandler
Operation()
Constructor.
void AddOperation(Operation< true > *op)
void Run(Timeout timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final)
bool valid
Flag indicating if it is a valid object.
friend std::future< XRootDStatus > Async(Pipeline, uint16_t)
virtual std::string ToString()=0
Name of the operation.
virtual XRootDStatus RunImpl(PipelineHandler *handler, uint16_t timeout)=0
std::unique_ptr< PipelineHandler > handler
Operation handler.
virtual Operation< true > * ToHandled()=0
virtual Operation< HasHndl > * Move()=0
Operation(Operation< from > &&op)
Move constructor between template instances.
Pipeline exception, wrapps an XRootDStatus.
void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
Callback function.
PipelineHandler()
Default Constructor.
void PreparePipelineStart()
Called by a pipeline on the handler of its first operation before Run.
void Assign(const Timeout &timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final, Operation< true > *opr)
~PipelineHandler()
Destructor.
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
void AddOperation(Operation< true > *operation)
Pipeline(Operation< true > *op)
Constructor.
Pipeline(Operation< true > &&op)
Constructor.
static void Repeat()
Repeat current operation.
Pipeline(Operation< true > &op)
Constructor.
friend class PipelineHandler
Pipeline & operator=(Pipeline &&pipe)
Constructor.
Pipeline & operator|=(Operation< false > &&op)
Extend pipeline.
Pipeline(Pipeline &&pipe)
friend std::future< XRootDStatus > Async(Pipeline, uint16_t)
Pipeline(Operation< false > *op)
static void Stop(const XRootDStatus &status=XrdCl::XRootDStatus())
Pipeline(Operation< false > &&op)
Constructor.
Pipeline & operator|=(Operation< true > &&op)
Extend pipeline.
static void Replace(Operation< false > &&opr)
Replace current operation.
static void Ignore()
Ignore error and proceed with the pipeline.
Pipeline(Operation< false > &op)
Constructor.
Pipeline()
Default constructor.
JobManager * GetJobManager()
Get the job manager object user by the post master.
Handle an async response.
Call the user callback.
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
std::function< Operation< true > *(const XRootDStatus &)> rcvry_func
Type of the recovery function to be provided by the user.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
std::vector< HostInfo > HostList
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124