XRootD
XrdClOperations.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // This file is part of the XRootD software suite.
7 //
8 // XRootD is free software: you can redistribute it and/or modify
9 // it under the terms of the GNU Lesser General Public License as published by
10 // the Free Software Foundation, either version 3 of the License, or
11 // (at your option) any later version.
12 //
13 // XRootD is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public License
19 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20 //
21 // In applying this licence, CERN does not waive the privileges and immunities
22 // granted to it by virtue of its status as an Intergovernmental Organization
23 // or submit itself to any jurisdiction.
24 //------------------------------------------------------------------------------
25 
26 #include <stdexcept>
27 #include <string>
28 #include "XrdCl/XrdClOperations.hh"
29 #include "XrdCl/XrdClLog.hh"
30 #include "XrdCl/XrdClDefaultEnv.hh"
31 #include "XrdCl/XrdClConstants.hh"
32 
33 namespace
34 {
35  //----------------------------------------------------------------------------
37  //----------------------------------------------------------------------------
38  struct StopPipeline
39  {
40  StopPipeline( const XrdCl::XRootDStatus &status ) : status( status ) { }
41  XrdCl::XRootDStatus status;
42  };
43 
44  //----------------------------------------------------------------------------
46  //----------------------------------------------------------------------------
47  struct RepeatOpeation { };
48 
49  //----------------------------------------------------------------------------
51  //----------------------------------------------------------------------------
52  struct ReplaceOperation
53  {
54  ReplaceOperation( XrdCl::Operation<false> &&opr ) : opr( opr.ToHandled() )
55  {
56  }
57 
58  std::unique_ptr<XrdCl::Operation<true>> opr;
59  };
60 
61  //----------------------------------------------------------------------------
63  //----------------------------------------------------------------------------
64  struct ReplacePipeline
65  {
66  ReplacePipeline( XrdCl::Pipeline p ) : pipeline( std::move( p ) )
67  {
68  }
69 
70  XrdCl::Pipeline pipeline;
71  };
72 
73  //----------------------------------------------------------------------------
75  //----------------------------------------------------------------------------
76  struct IgnoreError { };
77 }
78 
79 namespace XrdCl
80 {
81 
82  //----------------------------------------------------------------------------
83  // OperationHandler Constructor.
84  //----------------------------------------------------------------------------
86  responseHandler( handler )
87  {
88  }
89 
90  //----------------------------------------------------------------------------
91  // OperationHandler::AddOperation
92  //----------------------------------------------------------------------------
94  {
95  if( nextOperation )
96  {
97  nextOperation->AddOperation( operation );
98  }
99  else
100  {
101  nextOperation.reset( operation );
102  }
103  }
104 
105  //----------------------------------------------------------------------------
106  // OperationHandler::HandleResponseImpl
107  //----------------------------------------------------------------------------
108  void PipelineHandler::HandleResponseImpl( XRootDStatus *status,
109  AnyObject *response, HostList *hostList )
110  {
111  std::unique_ptr<PipelineHandler> myself( this );
112 
113  // We need to copy status as original status object is destroyed in
114  // HandleResponse function
115  XRootDStatus st( *status );
116  if( responseHandler )
117  {
118  try
119  {
120  responseHandler->HandleResponseWithHosts( status, response, hostList );
121  }
122  catch( const StopPipeline &stop )
123  {
124  if( final ) final( stop.status );
125  prms.set_value( stop.status );
126  return;
127  }
128  catch( const RepeatOpeation &repeat )
129  {
130  Operation<true> *opr = currentOperation.release();
131  opr->handler.reset( myself.release() );
132  opr->Run( timeout, std::move( prms ), std::move( final ) );
133  return;
134  }
135  catch( ReplaceOperation &replace )
136  {
137  Operation<true> *opr = replace.opr.release();
138  opr->handler.reset( myself.release() );
139  opr->Run( timeout, std::move( prms ), std::move( final ) );
140  return;
141  }
142  catch( ReplacePipeline &replace )
143  {
144  Pipeline p = std::move( replace.pipeline );
145  Operation<true> *opr = p.operation.release();
146  opr->Run( timeout, std::move( prms ), std::move( final ) );
147  return;
148  }
149  catch( const IgnoreError &ignore )
150  {
151  st = XRootDStatus();
152  }
153  }
154  else
155  dealloc( status, response, hostList );
156 
157  if( !st.IsOK() || !nextOperation )
158  {
159  if( final ) final( st );
160  prms.set_value( st );
161  return;
162  }
163 
164  Operation<true> *opr = nextOperation.release();
165  opr->Run( timeout, std::move( prms ), std::move( final ) );
166  }
167 
168  //----------------------------------------------------------------------------
169  // OperationHandler::HandleResponseWithHosts
170  //----------------------------------------------------------------------------
172  AnyObject *response, HostList *hostList )
173  {
174  HandleResponseImpl( status, response, hostList );
175  }
176 
177  //----------------------------------------------------------------------------
178  // OperationHandler::HandleResponse
179  //----------------------------------------------------------------------------
181  AnyObject *response )
182  {
183  HandleResponseImpl( status, response );
184  }
185 
186  //----------------------------------------------------------------------------
187  // OperationHandler::AssignToWorkflow
188  //----------------------------------------------------------------------------
190  std::promise<XRootDStatus> p,
191  std::function<void(const XRootDStatus&)> f,
192  Operation<true> *opr )
193  {
194  timeout = t;
195  prms = std::move( p );
196  if( !final ) final = std::move( f );
197  else if( f )
198  {
199  auto f1 = std::move( final );
200  final = [f1, f]( const XRootDStatus &st ){ f1( st ); f( st ); };
201  }
202  currentOperation.reset( opr );
203  }
204 
205  //------------------------------------------------------------------------
206  // Assign the finalization routine
207  //------------------------------------------------------------------------
208  void PipelineHandler::Assign( std::function<void(const XRootDStatus&)> f )
209  {
210  final = std::move( f );
211  }
212 
213  //------------------------------------------------------------------------
214  // Called by a pipeline on the handler of its first operation before Run
215  //------------------------------------------------------------------------
217  {
218  // Move any final-function from the handler of the last operaiton to the
219  // first. It will be moved along the pipeline of handlers while the
220  // pipeline is run.
221 
222  if( final || !nextOperation ) return;
223  PipelineHandler *last = nextOperation->handler.get();
224  while( last )
225  {
226  Operation<true> *nextop = last->nextOperation.get();
227  if( !nextop ) break;
228  last = nextop->handler.get();
229  }
230  if( last )
231  {
232  // swap-then-move rather than only move as we need to guarantee that
233  // last->final is left without target.
234  std::function<void(const XRootDStatus&)> f;
235  f.swap( last->final );
236  Assign( std::move( f ) );
237  }
238  }
239 
240  //------------------------------------------------------------------------
241  // Stop the current pipeline
242  //------------------------------------------------------------------------
243  void Pipeline::Stop( const XRootDStatus &status )
244  {
245  throw StopPipeline( status );
246  }
247 
248  //------------------------------------------------------------------------
249  // Repeat current operation
250  //------------------------------------------------------------------------
252  {
253  throw RepeatOpeation();
254  }
255 
256  //------------------------------------------------------------------------
257  // Replace current operation
258  //------------------------------------------------------------------------
260  {
261  throw ReplaceOperation( std::move( opr ) );
262  }
263 
264  //------------------------------------------------------------------------
265  // Replace with pipeline
266  //------------------------------------------------------------------------
268  {
269  throw ReplacePipeline( std::move( p ) );
270  }
271 
272  //------------------------------------------------------------------------
273  // Ignore error and proceed with the pipeline
274  //------------------------------------------------------------------------
276  {
277  throw IgnoreError();
278  }
279 }
280 
std::unique_ptr< PipelineHandler > handler
Operation handler.
void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
Callback function.
PipelineHandler()
Default Constructor.
void PreparePipelineStart()
Called by a pipeline on the handler of its first operation before Run.
void Assign(const Timeout &timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final, Operation< true > *opr)
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
void AddOperation(Operation< true > *operation)
static void Repeat()
Repeat current operation.
static void Stop(const XRootDStatus &status=XrdCl::XRootDStatus())
static void Replace(Operation< false > &&opr)
Replace current operation.
static void Ignore()
Ignore error and proceed with the pipeline.
Handle an async response.
std::vector< HostInfo > HostList