XRootD
XrdClFileStateHandler.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClURL.hh"
27 #include "XrdCl/XrdClLog.hh"
28 #include "XrdCl/XrdClStatus.hh"
29 #include "XrdCl/XrdClDefaultEnv.hh"
31 #include "XrdCl/XrdClConstants.hh"
35 #include "XrdCl/XrdClMonitor.hh"
36 #include "XrdCl/XrdClFileTimer.hh"
38 #include "XrdCl/XrdClJobManager.hh"
40 #include "XrdCl/XrdClAnyObject.hh"
41 #include "XrdCl/XrdClUtils.hh"
42 
43 #ifdef WITH_XRDEC
44 #include "XrdCl/XrdClEcHandler.hh"
45 #endif
46 
47 #include "XrdOuc/XrdOucCRC.hh"
49 #include "XrdOuc/XrdOucUtils.hh"
50 
52 #include "XrdSys/XrdSysPageSize.hh"
53 #include "XrdSys/XrdSysPthread.hh"
54 
55 #include <sstream>
56 #include <memory>
57 #include <numeric>
58 #include <sys/time.h>
59 #include <uuid/uuid.h>
60 #include <mutex>
61 
62 namespace
63 {
64  //----------------------------------------------------------------------------
65  // Helper callback for handling PgRead responses
66  //----------------------------------------------------------------------------
67  class PgReadHandler : public XrdCl::ResponseHandler
68  {
69  friend class PgReadRetryHandler;
70 
71  public:
72 
73  //------------------------------------------------------------------------
74  // Constructor
75  //------------------------------------------------------------------------
76  PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
77  XrdCl::ResponseHandler *userHandler,
78  uint64_t orgOffset ) :
79  stateHandler( stateHandler ),
80  userHandler( userHandler ),
81  orgOffset( orgOffset ),
82  maincall( true ),
83  retrycnt( 0 ),
84  nbrepair( 0 )
85  {
86  }
87 
88  //------------------------------------------------------------------------
89  // Handle the response
90  //------------------------------------------------------------------------
92  XrdCl::AnyObject *response,
93  XrdCl::HostList *hostList )
94  {
95  using namespace XrdCl;
96 
97  std::unique_lock<std::mutex> lck( mtx );
98 
99  if( !maincall )
100  {
101  //--------------------------------------------------------------------
102  // We are serving PgRead retry request
103  //--------------------------------------------------------------------
104  --retrycnt;
105  if( !status->IsOK() )
106  st.reset( status );
107  else
108  {
109  delete status; // by convention other args are null (see PgReadRetryHandler)
110  ++nbrepair; // update number of repaired pages
111  }
112 
113  if( retrycnt == 0 )
114  {
115  //------------------------------------------------------------------
116  // All retries came back
117  //------------------------------------------------------------------
118  if( st->IsOK() )
119  {
120  PageInfo &pginf = XrdCl::To<PageInfo>( *resp );
121  pginf.SetNbRepair( nbrepair );
122  userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
123  }
124  else
125  userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
126  lck.unlock();
127  delete this;
128  }
129 
130  return;
131  }
132 
133  //----------------------------------------------------------------------
134  // We are serving main PgRead request
135  //----------------------------------------------------------------------
136  if( !status->IsOK() )
137  {
138  //--------------------------------------------------------------------
139  // The main PgRead request has failed
140  //--------------------------------------------------------------------
141  userHandler->HandleResponseWithHosts( status, response, hostList );
142  lck.unlock();
143  delete this;
144  return;
145  }
146 
147  maincall = false;
148 
149  //----------------------------------------------------------------------
150  // Do the integrity check
151  //----------------------------------------------------------------------
152  PageInfo *pginf = 0;
153  response->Get( pginf );
154 
155  uint64_t pgoff = pginf->GetOffset();
156  uint32_t bytesRead = pginf->GetLength();
157  std::vector<uint32_t> &cksums = pginf->GetCksums();
158  char *buffer = reinterpret_cast<char*>( pginf->GetBuffer() );
159  size_t nbpages = XrdOucPgrwUtils::csNum( pgoff, bytesRead );
160  uint32_t pgsize = XrdSys::PageSize - pgoff % XrdSys::PageSize;
161  if( pgsize > bytesRead ) pgsize = bytesRead;
162 
163  for( size_t pgnb = 0; pgnb < nbpages; ++pgnb )
164  {
165  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
166  if( crcval != cksums[pgnb] )
167  {
168  Log *log = DefaultEnv::GetLog();
169  log->Info( FileMsg, "[%p@%s] Received corrupted page, will retry page #%zu.",
170  this, stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
171 
172  XRootDStatus st = XrdCl::FileStateHandler::PgReadRetry( stateHandler, pgoff, pgsize, pgnb, buffer, this, 0 );
173  if( !st.IsOK())
174  {
175  *status = st; // the reason for this failure
176  break;
177  }
178  ++retrycnt; // update the retry counter
179  }
180 
181  bytesRead -= pgsize;
182  buffer += pgsize;
183  pgoff += pgsize;
184  pgsize = XrdSys::PageSize;
185  if( pgsize > bytesRead ) pgsize = bytesRead;
186  }
187 
188 
189  if( retrycnt == 0 )
190  {
191  //--------------------------------------------------------------------
192  // All went well!
193  //--------------------------------------------------------------------
194  userHandler->HandleResponseWithHosts( status, response, hostList );
195  lck.unlock();
196  delete this;
197  return;
198  }
199 
200  //----------------------------------------------------------------------
201  // We have to wait for retries!
202  //----------------------------------------------------------------------
203  resp.reset( response );
204  hosts.reset( hostList );
205  st.reset( status );
206  }
207 
208  void UpdateCksum( size_t pgnb, uint32_t crcval )
209  {
210  if( resp )
211  {
212  XrdCl::PageInfo *pginf = 0;
213  resp->Get( pginf );
214  pginf->GetCksums()[pgnb] = crcval;
215  }
216  }
217 
218  private:
219 
220  std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
221  XrdCl::ResponseHandler *userHandler;
222  uint64_t orgOffset;
223 
224  std::unique_ptr<XrdCl::AnyObject> resp;
225  std::unique_ptr<XrdCl::HostList> hosts;
226  std::unique_ptr<XrdCl::XRootDStatus> st;
227 
228  std::mutex mtx;
229  bool maincall;
230  size_t retrycnt;
231  size_t nbrepair;
232 
233  };
234 
235  //----------------------------------------------------------------------------
236  // Helper callback for handling PgRead retries
237  //----------------------------------------------------------------------------
238  class PgReadRetryHandler : public XrdCl::ResponseHandler
239  {
240  public:
241 
242  PgReadRetryHandler( PgReadHandler *pgReadHandler, size_t pgnb ) : pgReadHandler( pgReadHandler ),
243  pgnb( pgnb )
244  {
245 
246  }
247 
248  //------------------------------------------------------------------------
249  // Handle the response
250  //------------------------------------------------------------------------
251  void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
252  XrdCl::AnyObject *response,
253  XrdCl::HostList *hostList )
254  {
255  using namespace XrdCl;
256 
257  if( !status->IsOK() )
258  {
259  Log *log = DefaultEnv::GetLog();
260  log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
261  this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
262  pgReadHandler->HandleResponseWithHosts( status, response, hostList );
263  delete this;
264  return;
265  }
266 
267  XrdCl::PageInfo *pginf = 0;
268  response->Get( pginf );
269  if( pginf->GetLength() > (uint32_t)XrdSys::PageSize || pginf->GetCksums().size() != 1 )
270  {
271  Log *log = DefaultEnv::GetLog();
272  log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
273  this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
274  // we retry a page at a time so the length cannot exceed 4KB
275  DeleteArgs( status, response, hostList );
276  pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
277  delete this;
278  return;
279  }
280 
281  uint32_t crcval = XrdOucCRC::Calc32C( pginf->GetBuffer(), pginf->GetLength() );
282  if( crcval != pginf->GetCksums().front() )
283  {
284  Log *log = DefaultEnv::GetLog();
285  log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
286  this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
287  DeleteArgs( status, response, hostList );
288  pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
289  delete this;
290  return;
291  }
292 
293  Log *log = DefaultEnv::GetLog();
294  log->Info( FileMsg, "[%p@%s] Successfully recovered page #%zu.",
295  this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
296 
297  DeleteArgs( 0, response, hostList );
298  pgReadHandler->UpdateCksum( pgnb, crcval );
299  pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
300  delete this;
301  }
302 
303  private:
304 
305  inline void DeleteArgs( XrdCl::XRootDStatus *status,
306  XrdCl::AnyObject *response,
307  XrdCl::HostList *hostList )
308  {
309  delete status;
310  delete response;
311  delete hostList;
312  }
313 
314  PgReadHandler *pgReadHandler;
315  size_t pgnb;
316  };
317 
318  //----------------------------------------------------------------------------
319  // Handle PgRead substitution with ordinary Read
320  //----------------------------------------------------------------------------
322  {
323  public:
324 
325  //------------------------------------------------------------------------
326  // Constructor
327  //------------------------------------------------------------------------
328  PgReadSubstitutionHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
329  XrdCl::ResponseHandler *userHandler ) :
330  stateHandler( stateHandler ),
331  userHandler( userHandler )
332  {
333  }
334 
335  //------------------------------------------------------------------------
336  // Handle the response
337  //------------------------------------------------------------------------
338  void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
339  XrdCl::AnyObject *rdresp,
340  XrdCl::HostList *hostList )
341  {
342  if( !status->IsOK() )
343  {
344  userHandler->HandleResponseWithHosts( status, rdresp, hostList );
345  delete this;
346  return;
347  }
348 
349  using namespace XrdCl;
350 
351  ChunkInfo *chunk = 0;
352  rdresp->Get( chunk );
353 
354  std::vector<uint32_t> cksums;
355  if( stateHandler->pIsChannelEncrypted )
356  {
357  size_t nbpages = chunk->length / XrdSys::PageSize;
358  if( chunk->length % XrdSys::PageSize )
359  ++nbpages;
360  cksums.reserve( nbpages );
361 
362  size_t size = chunk->length;
363  char *buffer = reinterpret_cast<char*>( chunk->buffer );
364 
365  for( size_t pg = 0; pg < nbpages; ++pg )
366  {
367  size_t pgsize = XrdSys::PageSize;
368  if( pgsize > size ) pgsize = size;
369  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
370  cksums.push_back( crcval );
371  buffer += pgsize;
372  size -= pgsize;
373  }
374  }
375 
376  PageInfo *pages = new PageInfo( chunk->offset, chunk->length,
377  chunk->buffer, std::move( cksums ) );
378  delete rdresp;
379  AnyObject *response = new AnyObject();
380  response->Set( pages );
381  userHandler->HandleResponseWithHosts( status, response, hostList );
382 
383  delete this;
384  }
385 
386  private:
387 
388  std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
389  XrdCl::ResponseHandler *userHandler;
390  };
391 
392  //----------------------------------------------------------------------------
393  // Object that does things to the FileStateHandler when kXR_open returns
394  // and then calls the user handler
395  //----------------------------------------------------------------------------
396  class OpenHandler: public XrdCl::ResponseHandler
397  {
398  public:
399  //------------------------------------------------------------------------
400  // Constructor
401  //------------------------------------------------------------------------
402  OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
403  XrdCl::ResponseHandler *userHandler ):
404  pStateHandler( stateHandler ),
405  pUserHandler( userHandler )
406  {
407  }
408 
409  //------------------------------------------------------------------------
410  // Handle the response
411  //------------------------------------------------------------------------
412  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
413  XrdCl::AnyObject *response,
414  XrdCl::HostList *hostList )
415  {
416  using namespace XrdCl;
417 
418  //----------------------------------------------------------------------
419  // Extract the statistics info
420  //----------------------------------------------------------------------
421  OpenInfo *openInfo = 0;
422  if( status->IsOK() )
423  response->Get( openInfo );
424 #ifdef WITH_XRDEC
425  else
426  //--------------------------------------------------------------------
427  // Handle EC redirect
428  //--------------------------------------------------------------------
429  if( status->code == errRedirect )
430  {
431  std::string ecurl = status->GetErrorMessage();
432  EcHandler *ecHandler = GetEcHandler( hostList->front().url, ecurl );
433  if( ecHandler )
434  {
435  pStateHandler->pPlugin = ecHandler; // set the plugin for the File object
436  ecHandler->Open( pStateHandler->pOpenFlags, pUserHandler, 0/*TODO figure out right value for the timeout*/ );
437  return;
438  }
439  }
440 #endif
441  //----------------------------------------------------------------------
442  // Notify the state handler and the client and say bye bye
443  //----------------------------------------------------------------------
444  pStateHandler->OnOpen( status, openInfo, hostList );
445  delete response;
446  if( pUserHandler )
447  pUserHandler->HandleResponseWithHosts( status, 0, hostList );
448  else
449  {
450  delete status;
451  delete hostList;
452  }
453  delete this;
454  }
455 
456  private:
457  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
458  XrdCl::ResponseHandler *pUserHandler;
459  };
460 
461  //----------------------------------------------------------------------------
462  // Object that does things to the FileStateHandler when kXR_close returns
463  // and then calls the user handler
464  //----------------------------------------------------------------------------
465  class CloseHandler: public XrdCl::ResponseHandler
466  {
467  public:
468  //------------------------------------------------------------------------
469  // Constructor
470  //------------------------------------------------------------------------
471  CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
472  XrdCl::ResponseHandler *userHandler,
473  XrdCl::Message *message ):
474  pStateHandler( stateHandler ),
475  pUserHandler( userHandler ),
476  pMessage( message )
477  {
478  }
479 
480  //------------------------------------------------------------------------
482  //------------------------------------------------------------------------
483  virtual ~CloseHandler()
484  {
485  delete pMessage;
486  }
487 
488  //------------------------------------------------------------------------
489  // Handle the response
490  //------------------------------------------------------------------------
491  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
492  XrdCl::AnyObject *response,
493  XrdCl::HostList *hostList )
494  {
495  pStateHandler->OnClose( status );
496  if( pUserHandler )
497  pUserHandler->HandleResponseWithHosts( status, response, hostList );
498  else
499  {
500  delete response;
501  delete status;
502  delete hostList;
503  }
504 
505  delete this;
506  }
507 
508  private:
509  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
510  XrdCl::ResponseHandler *pUserHandler;
511  XrdCl::Message *pMessage;
512  };
513 
514  //----------------------------------------------------------------------------
515  // Stateful message handler
516  //----------------------------------------------------------------------------
517  class StatefulHandler: public XrdCl::ResponseHandler
518  {
519  public:
520  //------------------------------------------------------------------------
521  // Constructor
522  //------------------------------------------------------------------------
523  StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
524  XrdCl::ResponseHandler *userHandler,
525  XrdCl::Message *message,
526  const XrdCl::MessageSendParams &sendParams ):
527  pStateHandler( stateHandler ),
528  pUserHandler( userHandler ),
529  pMessage( message ),
530  pSendParams( sendParams )
531  {
532  }
533 
534  //------------------------------------------------------------------------
535  // Destructor
536  //------------------------------------------------------------------------
537  virtual ~StatefulHandler()
538  {
539  delete pMessage;
540  delete pSendParams.chunkList;
541  delete pSendParams.kbuff;
542  }
543 
544  //------------------------------------------------------------------------
545  // Handle the response
546  //------------------------------------------------------------------------
547  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
548  XrdCl::AnyObject *response,
549  XrdCl::HostList *hostList )
550  {
551  using namespace XrdCl;
552  std::unique_ptr<AnyObject> responsePtr( response );
553  pSendParams.hostList = hostList;
554 
555  //----------------------------------------------------------------------
556  // Houston we have a problem...
557  //----------------------------------------------------------------------
558  if( !status->IsOK() )
559  {
560  XrdCl::FileStateHandler::OnStateError( pStateHandler, status, pMessage, this, pSendParams );
561  return;
562  }
563 
564  //----------------------------------------------------------------------
565  // We're clear
566  //----------------------------------------------------------------------
567  responsePtr.release();
568  XrdCl::FileStateHandler::OnStateResponse( pStateHandler, status, pMessage, response, hostList );
569  if( pUserHandler )
570  pUserHandler->HandleResponseWithHosts( status, response, hostList );
571  else
572  {
573  delete status,
574  delete response;
575  delete hostList;
576  }
577  delete this;
578  }
579 
580  //------------------------------------------------------------------------
582  //------------------------------------------------------------------------
583  XrdCl::ResponseHandler *GetUserHandler()
584  {
585  return pUserHandler;
586  }
587 
588  private:
589  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
590  XrdCl::ResponseHandler *pUserHandler;
591  XrdCl::Message *pMessage;
592  XrdCl::MessageSendParams pSendParams;
593  };
594 
595  //----------------------------------------------------------------------------
596  // Release-buffer Handler
597  //----------------------------------------------------------------------------
598  class ReleaseBufferHandler: public XrdCl::ResponseHandler
599  {
600  public:
601 
602  //------------------------------------------------------------------------
603  // Constructor
604  //------------------------------------------------------------------------
605  ReleaseBufferHandler( XrdCl::Buffer &&buffer, XrdCl::ResponseHandler *handler ) :
606  buffer( std::move( buffer ) ),
607  handler( handler )
608  {
609  }
610 
611  //------------------------------------------------------------------------
612  // Handle the response
613  //------------------------------------------------------------------------
614  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
615  XrdCl::AnyObject *response,
616  XrdCl::HostList *hostList )
617  {
618  if (handler)
619  handler->HandleResponseWithHosts( status, response, hostList );
620  }
621 
622  //------------------------------------------------------------------------
623  // Get the underlying buffer
624  //------------------------------------------------------------------------
625  XrdCl::Buffer& GetBuffer()
626  {
627  return buffer;
628  }
629 
630  private:
631  XrdCl::Buffer buffer;
632  XrdCl::ResponseHandler *handler;
633  };
634 }
635 
636 namespace XrdCl
637 {
638  //----------------------------------------------------------------------------
639  // Constructor
640  //----------------------------------------------------------------------------
641  FileStateHandler::FileStateHandler( FilePlugIn *& plugin ):
642  pFileState( Closed ),
643  pStatInfo( 0 ),
644  pFileUrl( 0 ),
645  pDataServer( 0 ),
646  pLoadBalancer( 0 ),
647  pStateRedirect( 0 ),
648  pWrtRecoveryRedir( 0 ),
649  pFileHandle( 0 ),
650  pOpenMode( 0 ),
651  pOpenFlags( 0 ),
652  pSessionId( 0 ),
653  pDoRecoverRead( true ),
654  pDoRecoverWrite( true ),
655  pFollowRedirects( true ),
656  pUseVirtRedirector( true ),
657  pIsChannelEncrypted( false ),
658  pAllowBundledClose( false ),
659  pPlugin( plugin )
660  {
661  pFileHandle = new uint8_t[4];
662  ResetMonitoringVars();
665  pLFileHandler = new LocalFileHandler();
666  }
667 
668  //------------------------------------------------------------------------
673  //------------------------------------------------------------------------
674  FileStateHandler::FileStateHandler( bool useVirtRedirector, FilePlugIn *& plugin ):
675  pFileState( Closed ),
676  pStatInfo( 0 ),
677  pFileUrl( 0 ),
678  pDataServer( 0 ),
679  pLoadBalancer( 0 ),
680  pStateRedirect( 0 ),
681  pWrtRecoveryRedir( 0 ),
682  pFileHandle( 0 ),
683  pOpenMode( 0 ),
684  pOpenFlags( 0 ),
685  pSessionId( 0 ),
686  pDoRecoverRead( true ),
687  pDoRecoverWrite( true ),
688  pFollowRedirects( true ),
689  pUseVirtRedirector( useVirtRedirector ),
690  pAllowBundledClose( false ),
691  pPlugin( plugin )
692  {
693  pFileHandle = new uint8_t[4];
694  ResetMonitoringVars();
697  pLFileHandler = new LocalFileHandler();
698  }
699 
700  //----------------------------------------------------------------------------
701  // Destructor
702  //----------------------------------------------------------------------------
704  {
705  //--------------------------------------------------------------------------
706  // This, in principle, should never ever happen. Except for the case
707  // when we're interfaced with ROOT that may call this desctructor from
708  // its garbage collector, from its __cxa_finalize, ie. after the XrdCl lib
709  // has been finalized by the linker. So, if we don't have the log object
710  // at this point we just give up the hope.
711  //--------------------------------------------------------------------------
712  if( DefaultEnv::GetLog() && pSessionId && !pDataServer->IsLocalFile() ) // if the file object was bound to a physical connection
713  DefaultEnv::GetPostMaster()->DecFileInstCnt( *pDataServer );
714 
717 
720 
721  if( pFileState != Closed && DefaultEnv::GetLog() )
722  {
723  XRootDStatus st;
724  MonitorClose( &st );
725  ResetMonitoringVars();
726  }
727 
728  // check if the logger is still there, this is only for root, as root might
729  // have unload us already so in this case we don't want to do anything
730  if( DefaultEnv::GetLog() && pUseVirtRedirector && pFileUrl && pFileUrl->IsMetalink() )
731  {
733  registry.Release( *pFileUrl );
734  }
735 
736  delete pStatInfo;
737  delete pFileUrl;
738  delete pDataServer;
739  delete pLoadBalancer;
740  delete [] pFileHandle;
741  delete pLFileHandler;
742  }
743 
744  //----------------------------------------------------------------------------
745  // Open the file pointed to by the given URL
746  //----------------------------------------------------------------------------
747  XRootDStatus FileStateHandler::Open( std::shared_ptr<FileStateHandler> &self,
748  const std::string &url,
749  uint16_t flags,
750  uint16_t mode,
751  ResponseHandler *handler,
752  uint16_t timeout )
753  {
754  XrdSysMutexHelper scopedLock( self->pMutex );
755 
756  //--------------------------------------------------------------------------
757  // Check if we can proceed
758  //--------------------------------------------------------------------------
759  if( self->pFileState == Error )
760  return self->pStatus;
761 
762  if( self->pFileState == OpenInProgress )
764 
765  if( self->pFileState == CloseInProgress || self->pFileState == Opened ||
766  self->pFileState == Recovering )
767  return XRootDStatus( stError, errInvalidOp );
768 
769  self->pFileState = OpenInProgress;
770 
771  //--------------------------------------------------------------------------
772  // Check if the parameters are valid
773  //--------------------------------------------------------------------------
774  Log *log = DefaultEnv::GetLog();
775 
776  if( self->pFileUrl )
777  {
778  if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
779  {
781  registry.Release( *self->pFileUrl );
782  }
783  delete self->pFileUrl;
784  self->pFileUrl = 0;
785  }
786 
787  self->pFileUrl = new URL( url );
788 
789  //--------------------------------------------------------------------------
790  // Add unique uuid to each open request so replays due to error/timeout
791  // recovery can be correctly handled.
792  //--------------------------------------------------------------------------
793  URL::ParamsMap cgi = self->pFileUrl->GetParams();
794  uuid_t uuid;
795  char requuid[37]= {0};
796  uuid_generate( uuid );
797  uuid_unparse( uuid, requuid );
798  cgi["xrdcl.requuid"] = requuid;
799  self->pFileUrl->SetParams( cgi );
800 
801  if( !self->pFileUrl->IsValid() )
802  {
803  log->Error( FileMsg, "[%p@%s] Trying to open invalid url: %s",
804  self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
805  self->pStatus = XRootDStatus( stError, errInvalidArgs );
806  self->pFileState = Closed;
807  return self->pStatus;
808  }
809 
810  //--------------------------------------------------------------------------
811  // Check if the recovery procedures should be enabled
812  //--------------------------------------------------------------------------
813  const URL::ParamsMap &urlParams = self->pFileUrl->GetParams();
814  URL::ParamsMap::const_iterator it;
815  it = urlParams.find( "xrdcl.recover-reads" );
816  if( (it != urlParams.end() && it->second == "false") ||
817  !self->pDoRecoverRead )
818  {
819  self->pDoRecoverRead = false;
820  log->Debug( FileMsg, "[%p@%s] Read recovery procedures are disabled",
821  self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
822  }
823 
824  it = urlParams.find( "xrdcl.recover-writes" );
825  if( (it != urlParams.end() && it->second == "false") ||
826  !self->pDoRecoverWrite )
827  {
828  self->pDoRecoverWrite = false;
829  log->Debug( FileMsg, "[%p@%s] Write recovery procedures are disabled",
830  self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
831  }
832 
833  //--------------------------------------------------------------------------
834  // Open the file
835  //--------------------------------------------------------------------------
836  log->Debug( FileMsg, "[%p@%s] Sending an open command", self.get(),
837  self->pFileUrl->GetObfuscatedURL().c_str() );
838 
839  self->pOpenMode = mode;
840  self->pOpenFlags = flags;
841  OpenHandler *openHandler = new OpenHandler( self, handler );
842 
843  Message *msg;
844  ClientOpenRequest *req;
845  std::string path = self->pFileUrl->GetPathWithFilteredParams();
846  MessageUtils::CreateRequest( msg, req, path.length() );
847 
848  req->requestid = kXR_open;
849  req->mode = mode;
850  req->options = flags | kXR_async | kXR_retstat;
851  req->dlen = path.length();
852  msg->Append( path.c_str(), path.length(), 24 );
853 
855  MessageSendParams params; params.timeout = timeout;
856  params.followRedirects = self->pFollowRedirects;
858 
859  XRootDStatus st = self->IssueRequest( *self->pFileUrl, msg, openHandler, params );
860 
861  if( !st.IsOK() )
862  {
863  delete openHandler;
864  self->pStatus = st;
865  self->pFileState = Closed;
866  return st;
867  }
868  return st;
869  }
870 
871  //----------------------------------------------------------------------------
872  // Close the file object
873  //----------------------------------------------------------------------------
874  XRootDStatus FileStateHandler::Close( std::shared_ptr<FileStateHandler> &self,
875  ResponseHandler *handler,
876  uint16_t timeout )
877  {
878  XrdSysMutexHelper scopedLock( self->pMutex );
879 
880  //--------------------------------------------------------------------------
881  // Check if we can proceed
882  //--------------------------------------------------------------------------
883  if( self->pFileState == Error )
884  return self->pStatus;
885 
886  if( self->pFileState == CloseInProgress )
888 
889  if( self->pFileState == Closed )
890  return XRootDStatus( stOK, suAlreadyDone );
891 
892  if( self->pFileState == OpenInProgress || self->pFileState == Recovering )
893  return XRootDStatus( stError, errInvalidOp );
894 
895  if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
896  return XRootDStatus( stError, errInvalidOp );
897 
898  self->pFileState = CloseInProgress;
899 
900  Log *log = DefaultEnv::GetLog();
901  log->Debug( FileMsg, "[%p@%s] Sending a close command for handle %#x to %s",
902  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
903  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
904 
905  //--------------------------------------------------------------------------
906  // Close the file
907  //--------------------------------------------------------------------------
908  Message *msg;
909  ClientCloseRequest *req;
910  MessageUtils::CreateRequest( msg, req );
911 
912  req->requestid = kXR_close;
913  memcpy( req->fhandle, self->pFileHandle, 4 );
914 
916  msg->SetSessionId( self->pSessionId );
917  CloseHandler *closeHandler = new CloseHandler( self, handler, msg );
918  MessageSendParams params;
919  params.timeout = timeout;
920  params.followRedirects = false;
921  params.stateful = true;
923 
924  XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, closeHandler, params );
925 
926  if( !st.IsOK() )
927  {
928  // an invalid-session error means the connection to the server has been
929  // closed, which in turn means that the server closed the file already
930  if( st.code == errInvalidSession || st.code == errSocketDisconnected ||
932  st.code == errPollerError || st.code == errSocketError )
933  {
934  self->pFileState = Closed;
935  ResponseJob *job = new ResponseJob( closeHandler, new XRootDStatus(),
936  nullptr, nullptr );
938  return XRootDStatus();
939  }
940 
941  delete closeHandler;
942  self->pStatus = st;
943  self->pFileState = Error;
944  return st;
945  }
946  return st;
947  }
948 
949  //----------------------------------------------------------------------------
950  // Stat the file
951  //----------------------------------------------------------------------------
952  XRootDStatus FileStateHandler::Stat( std::shared_ptr<FileStateHandler> &self,
953  bool force,
954  ResponseHandler *handler,
955  uint16_t timeout )
956  {
957  XrdSysMutexHelper scopedLock( self->pMutex );
958 
959  if( self->pFileState == Error ) return self->pStatus;
960 
961  if( self->pFileState != Opened && self->pFileState != Recovering )
962  return XRootDStatus( stError, errInvalidOp );
963 
964  //--------------------------------------------------------------------------
965  // Return the cached info
966  //--------------------------------------------------------------------------
967  if( !force )
968  {
969  AnyObject *obj = new AnyObject();
970  obj->Set( new StatInfo( *self->pStatInfo ) );
971  if (handler)
972  handler->HandleResponseWithHosts( new XRootDStatus(), obj, new HostList() );
973  return XRootDStatus();
974  }
975 
976  Log *log = DefaultEnv::GetLog();
977  log->Debug( FileMsg, "[%p@%s] Sending a stat command for handle %#x to %s",
978  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
979  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
980 
981  //--------------------------------------------------------------------------
982  // Issue a new stat request
983  // stating a file handle doesn't work (fixed in 3.2.0) so we need to
984  // stat the pat
985  //--------------------------------------------------------------------------
986  Message *msg;
987  ClientStatRequest *req;
988  std::string path = self->pFileUrl->GetPath();
989  MessageUtils::CreateRequest( msg, req );
990 
991  req->requestid = kXR_stat;
992  memcpy( req->fhandle, self->pFileHandle, 4 );
993 
994  MessageSendParams params;
995  params.timeout = timeout;
996  params.followRedirects = false;
997  params.stateful = true;
999 
1001  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1002 
1003  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1004  }
1005 
1006  //----------------------------------------------------------------------------
1007  // Read a data chunk at a given offset - sync
1008  //----------------------------------------------------------------------------
1009  XRootDStatus FileStateHandler::Read( std::shared_ptr<FileStateHandler> &self,
1010  uint64_t offset,
1011  uint32_t size,
1012  void *buffer,
1013  ResponseHandler *handler,
1014  uint16_t timeout )
1015  {
1016  XrdSysMutexHelper scopedLock( self->pMutex );
1017 
1018  if( self->pFileState == Error ) return self->pStatus;
1019 
1020  if( self->pFileState != Opened && self->pFileState != Recovering )
1021  return XRootDStatus( stError, errInvalidOp );
1022 
1023  Log *log = DefaultEnv::GetLog();
1024  log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1025  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1026  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1027 
1028  Message *msg;
1029  ClientReadRequest *req;
1030  MessageUtils::CreateRequest( msg, req );
1031 
1032  req->requestid = kXR_read;
1033  req->offset = offset;
1034  req->rlen = size;
1035  memcpy( req->fhandle, self->pFileHandle, 4 );
1036 
1037  ChunkList *list = new ChunkList();
1038  list->push_back( ChunkInfo( offset, size, buffer ) );
1039 
1041  MessageSendParams params;
1042  params.timeout = timeout;
1043  params.followRedirects = false;
1044  params.stateful = true;
1045  params.chunkList = list;
1047  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1048 
1049  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1050  }
1051 
1052  //------------------------------------------------------------------------
1053  // Read data pages at a given offset
1054  //------------------------------------------------------------------------
1055  XRootDStatus FileStateHandler::PgRead( std::shared_ptr<FileStateHandler> &self,
1056  uint64_t offset,
1057  uint32_t size,
1058  void *buffer,
1059  ResponseHandler *handler,
1060  uint16_t timeout )
1061  {
1062  int issupported = true;
1063  AnyObject obj;
1065  int protver = 0;
1066  XRootDStatus st2 = Utils::GetProtocolVersion( *self->pDataServer, protver );
1067  if( st1.IsOK() && st2.IsOK() )
1068  {
1069  int *ptr = 0;
1070  obj.Get( ptr );
1071  issupported = ( *ptr & kXR_suppgrw ) && ( protver >= kXR_PROTPGRWVERSION );
1072  delete ptr;
1073  }
1074  else
1075  issupported = false;
1076 
1077  if( !issupported )
1078  {
1079  DefaultEnv::GetLog()->Debug( FileMsg, "[%p@%s] PgRead not supported; substituting with Read.",
1080  self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
1081  ResponseHandler *substitHandler = new PgReadSubstitutionHandler( self, handler );
1082  auto st = Read( self, offset, size, buffer, substitHandler, timeout );
1083  if( !st.IsOK() ) delete substitHandler;
1084  return st;
1085  }
1086 
1087  ResponseHandler* pgHandler = new PgReadHandler( self, handler, offset );
1088  auto st = PgReadImpl( self, offset, size, buffer, PgReadFlags::None, pgHandler, timeout );
1089  if( !st.IsOK() ) delete pgHandler;
1090  return st;
1091  }
1092 
1093  XRootDStatus FileStateHandler::PgReadRetry( std::shared_ptr<FileStateHandler> &self,
1094  uint64_t offset,
1095  uint32_t size,
1096  size_t pgnb,
1097  void *buffer,
1098  PgReadHandler *handler,
1099  uint16_t timeout )
1100  {
1101  if( size > (uint32_t)XrdSys::PageSize )
1102  return XRootDStatus( stError, errInvalidArgs, EINVAL,
1103  "PgRead retry size exceeded 4KB." );
1104 
1105  ResponseHandler *retryHandler = new PgReadRetryHandler( handler, pgnb );
1106  XRootDStatus st = PgReadImpl( self, offset, size, buffer, PgReadFlags::Retry, retryHandler, timeout );
1107  if( !st.IsOK() ) delete retryHandler;
1108  return st;
1109  }
1110 
1111  XRootDStatus FileStateHandler::PgReadImpl( std::shared_ptr<FileStateHandler> &self,
1112  uint64_t offset,
1113  uint32_t size,
1114  void *buffer,
1115  uint16_t flags,
1116  ResponseHandler *handler,
1117  uint16_t timeout )
1118  {
1119  XrdSysMutexHelper scopedLock( self->pMutex );
1120 
1121  if( self->pFileState == Error ) return self->pStatus;
1122 
1123  if( self->pFileState != Opened && self->pFileState != Recovering )
1124  return XRootDStatus( stError, errInvalidOp );
1125 
1126  Log *log = DefaultEnv::GetLog();
1127  log->Debug( FileMsg, "[%p@%s] Sending a pgread command for handle %#x to %s",
1128  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1129  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1130 
1131  Message *msg;
1132  ClientPgReadRequest *req;
1133  MessageUtils::CreateRequest( msg, req, sizeof( ClientPgReadReqArgs ) );
1134 
1135  req->requestid = kXR_pgread;
1136  req->offset = offset;
1137  req->rlen = size;
1138  memcpy( req->fhandle, self->pFileHandle, 4 );
1139 
1140  //--------------------------------------------------------------------------
1141  // Now adjust the message size so it can hold PgRead arguments
1142  //--------------------------------------------------------------------------
1143  req->dlen = sizeof( ClientPgReadReqArgs );
1144  void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
1145  memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
1146  ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
1147  msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
1148  args->reqflags = flags;
1149 
1150  ChunkList *list = new ChunkList();
1151  list->push_back( ChunkInfo( offset, size, buffer ) );
1152 
1154  MessageSendParams params;
1155  params.timeout = timeout;
1156  params.followRedirects = false;
1157  params.stateful = true;
1158  params.chunkList = list;
1160  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1161 
1162  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1163  }
1164 
1165  //----------------------------------------------------------------------------
1166  // Write a data chunk at a given offset - async
1167  //----------------------------------------------------------------------------
1168  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1169  uint64_t offset,
1170  uint32_t size,
1171  const void *buffer,
1172  ResponseHandler *handler,
1173  uint16_t timeout )
1174  {
1175  XrdSysMutexHelper scopedLock( self->pMutex );
1176 
1177  if( self->pFileState == Error ) return self->pStatus;
1178 
1179  if( self->pFileState != Opened && self->pFileState != Recovering )
1180  return XRootDStatus( stError, errInvalidOp );
1181 
1182  Log *log = DefaultEnv::GetLog();
1183  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1184  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1185  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1186 
1187  Message *msg;
1188  ClientWriteRequest *req;
1189  MessageUtils::CreateRequest( msg, req );
1190 
1191  req->requestid = kXR_write;
1192  req->offset = offset;
1193  req->dlen = size;
1194  memcpy( req->fhandle, self->pFileHandle, 4 );
1195 
1196  ChunkList *list = new ChunkList();
1197  list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
1198 
1199  MessageSendParams params;
1200  params.timeout = timeout;
1201  params.followRedirects = false;
1202  params.stateful = true;
1203  params.chunkList = list;
1204 
1206 
1208  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1209 
1210  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1211  }
1212 
1213  //----------------------------------------------------------------------------
1214  // Write a data chunk at a given offset
1215  //----------------------------------------------------------------------------
1216  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1217  uint64_t offset,
1218  Buffer &&buffer,
1219  ResponseHandler *handler,
1220  uint16_t timeout )
1221  {
1222  //--------------------------------------------------------------------------
1223  // If the memory is not page (4KB) aligned we cannot use the kernel buffer
1224  // so fall back to normal write
1225  //--------------------------------------------------------------------------
1226  if( !XrdSys::KernelBuffer::IsPageAligned( buffer.GetBuffer() ) || self->pIsChannelEncrypted )
1227  {
1228  Log *log = DefaultEnv::GetLog();
1229  log->Info( FileMsg, "[%p@%s] Buffer for handle %#x is not page aligned (4KB), "
1230  "cannot convert it to kernel space buffer.", self.get(),
1231  self->pFileUrl->GetObfuscatedURL().c_str(), *((uint32_t*)self->pFileHandle) );
1232 
1233  void *buff = buffer.GetBuffer();
1234  uint32_t size = buffer.GetSize();
1235  ReleaseBufferHandler *wrtHandler =
1236  new ReleaseBufferHandler( std::move( buffer ), handler );
1237  XRootDStatus st = self->Write( self, offset, size, buff, wrtHandler, timeout );
1238  if( !st.IsOK() )
1239  {
1240  buffer = std::move( wrtHandler->GetBuffer() );
1241  delete wrtHandler;
1242  }
1243  return st;
1244  }
1245 
1246  //--------------------------------------------------------------------------
1247  // Transfer the data from user space to kernel space
1248  //--------------------------------------------------------------------------
1249  uint32_t length = buffer.GetSize();
1250  char *ubuff = buffer.Release();
1251 
1252  std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1253  ssize_t ret = XrdSys::Move( ubuff, *kbuff, length );
1254  if( ret < 0 )
1255  return XRootDStatus( stError, errInternal, XProtocol::mapError( errno ) );
1256 
1257  //--------------------------------------------------------------------------
1258  // Now create a write request and enqueue it
1259  //--------------------------------------------------------------------------
1260  return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1261  }
1262 
1263  //----------------------------------------------------------------------------
1264  // Write a data from a given file descriptor at a given offset - async
1265  //----------------------------------------------------------------------------
1266  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1267  uint64_t offset,
1268  uint32_t size,
1269  Optional<uint64_t> fdoff,
1270  int fd,
1271  ResponseHandler *handler,
1272  uint16_t timeout )
1273  {
1274  //--------------------------------------------------------------------------
1275  // Read the data from the file descriptor into a kernel buffer
1276  //--------------------------------------------------------------------------
1277  std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1278  ssize_t ret = fdoff ? XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1279  XrdSys::Read( fd, *kbuff, size );
1280  if( ret < 0 )
1281  return XRootDStatus( stError, errInternal, XProtocol::mapError( errno ) );
1282 
1283  //--------------------------------------------------------------------------
1284  // Now create a write request and enqueue it
1285  //--------------------------------------------------------------------------
1286  return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1287  }
1288 
1289  //----------------------------------------------------------------------------
1290  // Write number of pages at a given offset - async
1291  //----------------------------------------------------------------------------
1292  XRootDStatus FileStateHandler::PgWrite( std::shared_ptr<FileStateHandler> &self,
1293  uint64_t offset,
1294  uint32_t size,
1295  const void *buffer,
1296  std::vector<uint32_t> &cksums,
1297  ResponseHandler *handler,
1298  uint16_t timeout )
1299  {
1300  //--------------------------------------------------------------------------
1301  // Resolve timeout value
1302  //--------------------------------------------------------------------------
1303  if( timeout == 0 )
1304  {
1305  int val = DefaultRequestTimeout;
1306  XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
1307  timeout = val;
1308  }
1309 
1310  //--------------------------------------------------------------------------
1311  // Validate the digest vector size
1312  //--------------------------------------------------------------------------
1313  if( cksums.empty() )
1314  {
1315  const char *data = static_cast<const char*>( buffer );
1316  XrdOucPgrwUtils::csCalc( data, offset, size, cksums );
1317  }
1318  else
1319  {
1320  size_t crc32cCnt = XrdOucPgrwUtils::csNum( offset, size );
1321  if( crc32cCnt != cksums.size() )
1322  return XRootDStatus( stError, errInvalidArgs, 0, "Wrong number of crc32c digests." );
1323  }
1324 
1325  //--------------------------------------------------------------------------
1326  // Create a context for PgWrite operation
1327  //--------------------------------------------------------------------------
1328  struct pgwrt_t
1329  {
1330  pgwrt_t( ResponseHandler *h ) : handler( h ), status( nullptr )
1331  {
1332  }
1333 
1334  ~pgwrt_t()
1335  {
1336  if( handler )
1337  {
1338  // if all retries were successful no error status was set
1339  if( !status ) status = new XRootDStatus();
1340  handler->HandleResponse( status, nullptr );
1341  }
1342  }
1343 
1344  static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1345  {
1346  if( pgoff == offset ) return 0; // we need this if statement because we operate on unsigned integers
1347  return ( pgoff - ( offset + fstpglen ) ) / XrdSys::PageSize + 1;
1348  }
1349 
1350  inline void SetStatus( XRootDStatus* s )
1351  {
1352  if( !status ) status = s;
1353  else delete s;
1354  }
1355 
1356  ResponseHandler *handler;
1357  XRootDStatus *status;
1358  };
1359  auto pgwrt = std::make_shared<pgwrt_t>( handler );
1360 
1361  int fLen, lLen;
1362  XrdOucPgrwUtils::csNum( offset, size, fLen, lLen );
1363  uint32_t fstpglen = fLen;
1364 
1365  time_t start = ::time( nullptr );
1366  auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1367  {
1368  std::unique_ptr<AnyObject> scoped( r );
1369  // if the request failed simply pass the status to the
1370  // user handler
1371  if( !s->IsOK() )
1372  {
1373  pgwrt->SetStatus( s );
1374  return; // pgwrt destructor will call the handler
1375  }
1376  // also if the request was sucessful and there were no
1377  // corrupted pages pass the status to the user handler
1378  RetryInfo *inf = nullptr;
1379  r->Get( inf );
1380  if( !inf->NeedRetry() )
1381  {
1382  pgwrt->SetStatus( s );
1383  return; // pgwrt destructor will call the handler
1384  }
1385  delete s;
1386  // first adjust the timeout value
1387  uint16_t elapsed = ::time( nullptr ) - start;
1388  if( elapsed >= timeout )
1389  {
1390  pgwrt->SetStatus( new XRootDStatus( stError, errOperationExpired ) );
1391  return; // pgwrt destructor will call the handler
1392  }
1393  else timeout -= elapsed;
1394  // retransmit the corrupted pages
1395  for( size_t i = 0; i < inf->Size(); ++i )
1396  {
1397  auto tpl = inf->At( i );
1398  uint64_t pgoff = std::get<0>( tpl );
1399  uint32_t pglen = std::get<1>( tpl );
1400  const void *pgbuf = static_cast<const char*>( buffer ) + ( pgoff - offset );
1401  uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
1402  auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1403  {
1404  std::unique_ptr<AnyObject> scoped( r );
1405  // if we failed simply set the status
1406  if( !s->IsOK() )
1407  {
1408  pgwrt->SetStatus( s );
1409  return; // the destructor will call the handler
1410  }
1411  delete s;
1412  // otherwise check if the data were not corrupted again
1413  RetryInfo *inf = nullptr;
1414  r->Get( inf );
1415  if( inf->NeedRetry() ) // so we failed in the end
1416  {
1417  DefaultEnv::GetLog()->Warning( FileMsg, "[%p@%s] Failed retransmitting corrupted "
1418  "page: pgoff=%llu, pglen=%u, pgdigest=%u", self.get(),
1419  self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1420  pgwrt->SetStatus( new XRootDStatus( stError, errDataError, 0,
1421  "Failed to retransmit corrupted page" ) );
1422  }
1423  else
1424  DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Succesfuly retransmitted corrupted "
1425  "page: pgoff=%llu, pglen=%u, pgdigest=%u", self.get(),
1426  self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1427  } );
1428  auto st = PgWriteRetry( self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
1429  if( !st.IsOK() ) pgwrt->SetStatus( new XRootDStatus( st ) );
1430  DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Retransmitting corrupted page: "
1431  "pgoff=%llu, pglen=%u, pgdigest=%u", self.get(),
1432  self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1433  }
1434  } );
1435 
1436  auto st = PgWriteImpl( self, offset, size, buffer, cksums, 0, h, timeout );
1437  if( !st.IsOK() )
1438  {
1439  pgwrt->handler = nullptr;
1440  delete h;
1441  }
1442  return st;
1443  }
1444 
1445  //------------------------------------------------------------------------
1446  // Write number of pages at a given offset - async
1447  //------------------------------------------------------------------------
1448  XRootDStatus FileStateHandler::PgWriteRetry( std::shared_ptr<FileStateHandler> &self,
1449  uint64_t offset,
1450  uint32_t size,
1451  const void *buffer,
1452  uint32_t digest,
1453  ResponseHandler *handler,
1454  uint16_t timeout )
1455  {
1456  std::vector<uint32_t> cksums{ digest };
1457  return PgWriteImpl( self, offset, size, buffer, cksums, PgReadFlags::Retry, handler, timeout );
1458  }
1459 
1460  //------------------------------------------------------------------------
1461  // Write number of pages at a given offset - async
1462  //------------------------------------------------------------------------
1463  XRootDStatus FileStateHandler::PgWriteImpl( std::shared_ptr<FileStateHandler> &self,
1464  uint64_t offset,
1465  uint32_t size,
1466  const void *buffer,
1467  std::vector<uint32_t> &cksums,
1468  kXR_char flags,
1469  ResponseHandler *handler,
1470  uint16_t timeout )
1471  {
1472  XrdSysMutexHelper scopedLock( self->pMutex );
1473 
1474  if( self->pFileState == Error ) return self->pStatus;
1475 
1476  if( self->pFileState != Opened && self->pFileState != Recovering )
1477  return XRootDStatus( stError, errInvalidOp );
1478 
1479  Log *log = DefaultEnv::GetLog();
1480  log->Debug( FileMsg, "[%p@%s] Sending a pgwrite command for handle %#x to %s",
1481  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1482  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1483 
1484  //--------------------------------------------------------------------------
1485  // Create the message
1486  //--------------------------------------------------------------------------
1487  Message *msg;
1488  ClientPgWriteRequest *req;
1489  MessageUtils::CreateRequest( msg, req );
1490 
1491  req->requestid = kXR_pgwrite;
1492  req->offset = offset;
1493  req->dlen = size + cksums.size() * sizeof( uint32_t );
1494  req->reqflags = flags;
1495  memcpy( req->fhandle, self->pFileHandle, 4 );
1496 
1497  ChunkList *list = new ChunkList();
1498  list->push_back( ChunkInfo( offset, size, (char*)buffer ) );
1499 
1500  MessageSendParams params;
1501  params.timeout = timeout;
1502  params.followRedirects = false;
1503  params.stateful = true;
1504  params.chunkList = list;
1505  params.crc32cDigests.swap( cksums );
1506 
1508 
1510  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1511 
1512  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1513  }
1514 
1515  //----------------------------------------------------------------------------
1516  // Commit all pending disk writes - async
1517  //----------------------------------------------------------------------------
1518  XRootDStatus FileStateHandler::Sync( std::shared_ptr<FileStateHandler> &self,
1519  ResponseHandler *handler,
1520  uint16_t timeout )
1521  {
1522  XrdSysMutexHelper scopedLock( self->pMutex );
1523 
1524  if( self->pFileState == Error ) return self->pStatus;
1525 
1526  if( self->pFileState != Opened && self->pFileState != Recovering )
1527  return XRootDStatus( stError, errInvalidOp );
1528 
1529  Log *log = DefaultEnv::GetLog();
1530  log->Debug( FileMsg, "[%p@%s] Sending a sync command for handle %#x to %s",
1531  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1532  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1533 
1534  Message *msg;
1535  ClientSyncRequest *req;
1536  MessageUtils::CreateRequest( msg, req );
1537 
1538  req->requestid = kXR_sync;
1539  memcpy( req->fhandle, self->pFileHandle, 4 );
1540 
1541  MessageSendParams params;
1542  params.timeout = timeout;
1543  params.followRedirects = false;
1544  params.stateful = true;
1546 
1548  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1549 
1550  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1551  }
1552 
1553  //----------------------------------------------------------------------------
1554  // Truncate the file to a particular size - async
1555  //----------------------------------------------------------------------------
1556  XRootDStatus FileStateHandler::Truncate( std::shared_ptr<FileStateHandler> &self,
1557  uint64_t size,
1558  ResponseHandler *handler,
1559  uint16_t timeout )
1560  {
1561  XrdSysMutexHelper scopedLock( self->pMutex );
1562 
1563  if( self->pFileState == Error ) return self->pStatus;
1564 
1565  if( self->pFileState != Opened && self->pFileState != Recovering )
1566  return XRootDStatus( stError, errInvalidOp );
1567 
1568  Log *log = DefaultEnv::GetLog();
1569  log->Debug( FileMsg, "[%p@%s] Sending a truncate command for handle %#x to %s",
1570  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1571  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1572 
1573  Message *msg;
1574  ClientTruncateRequest *req;
1575  MessageUtils::CreateRequest( msg, req );
1576 
1577  req->requestid = kXR_truncate;
1578  memcpy( req->fhandle, self->pFileHandle, 4 );
1579  req->offset = size;
1580 
1581  MessageSendParams params;
1582  params.timeout = timeout;
1583  params.followRedirects = false;
1584  params.stateful = true;
1586 
1588  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1589 
1590  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1591  }
1592 
1593  //----------------------------------------------------------------------------
1594  // Read scattered data chunks in one operation - async
1595  //----------------------------------------------------------------------------
1596  XRootDStatus FileStateHandler::VectorRead( std::shared_ptr<FileStateHandler> &self,
1597  const ChunkList &chunks,
1598  void *buffer,
1599  ResponseHandler *handler,
1600  uint16_t timeout )
1601  {
1602  //--------------------------------------------------------------------------
1603  // Sanity check
1604  //--------------------------------------------------------------------------
1605  XrdSysMutexHelper scopedLock( self->pMutex );
1606 
1607  if( self->pFileState == Error ) return self->pStatus;
1608 
1609  if( self->pFileState != Opened && self->pFileState != Recovering )
1610  return XRootDStatus( stError, errInvalidOp );
1611 
1612  Log *log = DefaultEnv::GetLog();
1613  log->Debug( FileMsg, "[%p@%s] Sending a vector read command for handle %#x to %s",
1614  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1615  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1616 
1617  //--------------------------------------------------------------------------
1618  // Build the message
1619  //--------------------------------------------------------------------------
1620  Message *msg;
1621  ClientReadVRequest *req;
1622  MessageUtils::CreateRequest( msg, req, sizeof(readahead_list)*chunks.size() );
1623 
1624  req->requestid = kXR_readv;
1625  req->dlen = sizeof(readahead_list)*chunks.size();
1626 
1627  ChunkList *list = new ChunkList();
1628  char *cursor = (char*)buffer;
1629 
1630  //--------------------------------------------------------------------------
1631  // Copy the chunk info
1632  //--------------------------------------------------------------------------
1633  readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
1634  for( size_t i = 0; i < chunks.size(); ++i )
1635  {
1636  dataChunk[i].rlen = chunks[i].length;
1637  dataChunk[i].offset = chunks[i].offset;
1638  memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
1639 
1640  void *chunkBuffer;
1641  if( cursor )
1642  {
1643  chunkBuffer = cursor;
1644  cursor += chunks[i].length;
1645  }
1646  else
1647  chunkBuffer = chunks[i].buffer;
1648 
1649  list->push_back( ChunkInfo( chunks[i].offset,
1650  chunks[i].length,
1651  chunkBuffer ) );
1652  }
1653 
1654  //--------------------------------------------------------------------------
1655  // Send the message
1656  //--------------------------------------------------------------------------
1657  MessageSendParams params;
1658  params.timeout = timeout;
1659  params.followRedirects = false;
1660  params.stateful = true;
1661  params.chunkList = list;
1663 
1665  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1666 
1667  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1668  }
1669 
1670  //------------------------------------------------------------------------
1671  // Write scattered data chunks in one operation - async
1672  //------------------------------------------------------------------------
1673  XRootDStatus FileStateHandler::VectorWrite( std::shared_ptr<FileStateHandler> &self,
1674  const ChunkList &chunks,
1675  ResponseHandler *handler,
1676  uint16_t timeout )
1677  {
1678  //--------------------------------------------------------------------------
1679  // Sanity check
1680  //--------------------------------------------------------------------------
1681  XrdSysMutexHelper scopedLock( self->pMutex );
1682 
1683  if( self->pFileState == Error ) return self->pStatus;
1684 
1685  if( self->pFileState != Opened && self->pFileState != Recovering )
1686  return XRootDStatus( stError, errInvalidOp );
1687 
1688  Log *log = DefaultEnv::GetLog();
1689  log->Debug( FileMsg, "[%p@%s] Sending a vector write command for handle %#x to %s",
1690  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1691  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1692 
1693  //--------------------------------------------------------------------------
1694  // Determine the size of the payload
1695  //--------------------------------------------------------------------------
1696 
1697  // the size of write vector
1698  uint32_t payloadSize = sizeof(XrdProto::write_list) * chunks.size();
1699 
1700  //--------------------------------------------------------------------------
1701  // Build the message
1702  //--------------------------------------------------------------------------
1703  Message *msg;
1704  ClientWriteVRequest *req;
1705  MessageUtils::CreateRequest( msg, req, payloadSize );
1706 
1707  req->requestid = kXR_writev;
1708  req->dlen = sizeof(XrdProto::write_list) * chunks.size();
1709 
1710  ChunkList *list = new ChunkList();
1711 
1712  //--------------------------------------------------------------------------
1713  // Copy the chunk info
1714  //--------------------------------------------------------------------------
1715  XrdProto::write_list *writeList =
1716  reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
1717 
1718 
1719 
1720  for( size_t i = 0; i < chunks.size(); ++i )
1721  {
1722  writeList[i].wlen = chunks[i].length;
1723  writeList[i].offset = chunks[i].offset;
1724  memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1725 
1726  list->push_back( ChunkInfo( chunks[i].offset,
1727  chunks[i].length,
1728  chunks[i].buffer ) );
1729  }
1730 
1731  //--------------------------------------------------------------------------
1732  // Send the message
1733  //--------------------------------------------------------------------------
1734  MessageSendParams params;
1735  params.timeout = timeout;
1736  params.followRedirects = false;
1737  params.stateful = true;
1738  params.chunkList = list;
1740 
1742  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1743 
1744  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1745  }
1746 
1747  //------------------------------------------------------------------------
1748  // Write scattered buffers in one operation - async
1749  //------------------------------------------------------------------------
1750  XRootDStatus FileStateHandler::WriteV( std::shared_ptr<FileStateHandler> &self,
1751  uint64_t offset,
1752  const struct iovec *iov,
1753  int iovcnt,
1754  ResponseHandler *handler,
1755  uint16_t timeout )
1756  {
1757  XrdSysMutexHelper scopedLock( self->pMutex );
1758 
1759  if( self->pFileState == Error ) return self->pStatus;
1760 
1761  if( self->pFileState != Opened && self->pFileState != Recovering )
1762  return XRootDStatus( stError, errInvalidOp );
1763 
1764  Log *log = DefaultEnv::GetLog();
1765  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1766  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1767  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1768 
1769  Message *msg;
1770  ClientWriteRequest *req;
1771  MessageUtils::CreateRequest( msg, req );
1772 
1773  ChunkList *list = new ChunkList();
1774 
1775  uint32_t size = 0;
1776  for( int i = 0; i < iovcnt; ++i )
1777  {
1778  if( iov[i].iov_len == 0 ) continue;
1779  size += iov[i].iov_len;
1780  list->push_back( ChunkInfo( 0, iov[i].iov_len,
1781  (char*)iov[i].iov_base ) );
1782  }
1783 
1784  req->requestid = kXR_write;
1785  req->offset = offset;
1786  req->dlen = size;
1787  memcpy( req->fhandle, self->pFileHandle, 4 );
1788 
1789  MessageSendParams params;
1790  params.timeout = timeout;
1791  params.followRedirects = false;
1792  params.stateful = true;
1793  params.chunkList = list;
1794 
1796 
1798  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1799 
1800  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1801  }
1802 
1803  //------------------------------------------------------------------------
1804  // Read data into scattered buffers in one operation - async
1805  //------------------------------------------------------------------------
1806  XRootDStatus FileStateHandler::ReadV( std::shared_ptr<FileStateHandler> &self,
1807  uint64_t offset,
1808  struct iovec *iov,
1809  int iovcnt,
1810  ResponseHandler *handler,
1811  uint16_t timeout )
1812  {
1813  XrdSysMutexHelper scopedLock( self->pMutex );
1814 
1815  if( self->pFileState == Error ) return self->pStatus;
1816 
1817  if( self->pFileState != Opened && self->pFileState != Recovering )
1818  return XRootDStatus( stError, errInvalidOp );
1819 
1820  Log *log = DefaultEnv::GetLog();
1821  log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1822  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1823  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1824 
1825  Message *msg;
1826  ClientReadRequest *req;
1827  MessageUtils::CreateRequest( msg, req );
1828 
1829  // calculate the total read size
1830  size_t size = std::accumulate( iov, iov + iovcnt, 0, []( size_t acc, iovec &rhs )
1831  {
1832  return acc + rhs.iov_len;
1833  } );
1834  req->requestid = kXR_read;
1835  req->offset = offset;
1836  req->rlen = size;
1837  msg->SetVirtReqID( kXR_virtReadv );
1838  memcpy( req->fhandle, self->pFileHandle, 4 );
1839 
1840  ChunkList *list = new ChunkList();
1841  list->reserve( iovcnt );
1842  uint64_t choff = offset;
1843  for( int i = 0; i < iovcnt; ++i )
1844  {
1845  list->emplace_back( choff, iov[i].iov_len, iov[i].iov_base );
1846  choff += iov[i].iov_len;
1847  }
1848 
1850  MessageSendParams params;
1851  params.timeout = timeout;
1852  params.followRedirects = false;
1853  params.stateful = true;
1854  params.chunkList = list;
1856  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1857 
1858  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1859  }
1860 
1861  //----------------------------------------------------------------------------
1862  // Performs a custom operation on an open file, server implementation
1863  // dependent - async
1864  //----------------------------------------------------------------------------
1865  XRootDStatus FileStateHandler::Fcntl( std::shared_ptr<FileStateHandler> &self,
1866  const Buffer &arg,
1867  ResponseHandler *handler,
1868  uint16_t timeout )
1869  {
1870  XrdSysMutexHelper scopedLock( self->pMutex );
1871 
1872  if( self->pFileState == Error ) return self->pStatus;
1873 
1874  if( self->pFileState != Opened && self->pFileState != Recovering )
1875  return XRootDStatus( stError, errInvalidOp );
1876 
1877  Log *log = DefaultEnv::GetLog();
1878  log->Debug( FileMsg, "[%p@%s] Sending a fcntl command for handle %#x to %s",
1879  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1880  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1881 
1882  Message *msg;
1883  ClientQueryRequest *req;
1884  MessageUtils::CreateRequest( msg, req, arg.GetSize() );
1885 
1886  req->requestid = kXR_query;
1887  req->infotype = kXR_Qopaqug;
1888  req->dlen = arg.GetSize();
1889  memcpy( req->fhandle, self->pFileHandle, 4 );
1890  msg->Append( arg.GetBuffer(), arg.GetSize(), 24 );
1891 
1892  MessageSendParams params;
1893  params.timeout = timeout;
1894  params.followRedirects = false;
1895  params.stateful = true;
1897 
1899  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1900 
1901  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1902  }
1903 
1904  //----------------------------------------------------------------------------
1905  // Get access token to a file - async
1906  //----------------------------------------------------------------------------
1907  XRootDStatus FileStateHandler::Visa( std::shared_ptr<FileStateHandler> &self,
1908  ResponseHandler *handler,
1909  uint16_t timeout )
1910  {
1911  XrdSysMutexHelper scopedLock( self->pMutex );
1912 
1913  if( self->pFileState == Error ) return self->pStatus;
1914 
1915  if( self->pFileState != Opened && self->pFileState != Recovering )
1916  return XRootDStatus( stError, errInvalidOp );
1917 
1918  Log *log = DefaultEnv::GetLog();
1919  log->Debug( FileMsg, "[%p@%s] Sending a visa command for handle %#x to %s",
1920  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1921  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1922 
1923  Message *msg;
1924  ClientQueryRequest *req;
1925  MessageUtils::CreateRequest( msg, req );
1926 
1927  req->requestid = kXR_query;
1928  req->infotype = kXR_Qvisa;
1929  memcpy( req->fhandle, self->pFileHandle, 4 );
1930 
1931  MessageSendParams params;
1932  params.timeout = timeout;
1933  params.followRedirects = false;
1934  params.stateful = true;
1936 
1938  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1939 
1940  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1941  }
1942 
1943  //------------------------------------------------------------------------
1944  // Set extended attributes - async
1945  //------------------------------------------------------------------------
1946  XRootDStatus FileStateHandler::SetXAttr( std::shared_ptr<FileStateHandler> &self,
1947  const std::vector<xattr_t> &attrs,
1948  ResponseHandler *handler,
1949  uint16_t timeout )
1950  {
1951  XrdSysMutexHelper scopedLock( self->pMutex );
1952 
1953  if( self->pFileState == Error ) return self->pStatus;
1954 
1955  if( self->pFileState != Opened && self->pFileState != Recovering )
1956  return XRootDStatus( stError, errInvalidOp );
1957 
1958  Log *log = DefaultEnv::GetLog();
1959  log->Debug( FileMsg, "[%p@%s] Sending a fattr set command for handle %#x to %s",
1960  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1961  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1962 
1963  //--------------------------------------------------------------------------
1964  // Issue a new fattr get request
1965  //--------------------------------------------------------------------------
1966  return XAttrOperationImpl( self, kXR_fattrSet, 0, attrs, handler, timeout );
1967  }
1968 
1969  //------------------------------------------------------------------------
1970  // Get extended attributes - async
1971  //------------------------------------------------------------------------
1972  XRootDStatus FileStateHandler::GetXAttr( std::shared_ptr<FileStateHandler> &self,
1973  const std::vector<std::string> &attrs,
1974  ResponseHandler *handler,
1975  uint16_t timeout )
1976  {
1977  XrdSysMutexHelper scopedLock( self->pMutex );
1978 
1979  if( self->pFileState == Error ) return self->pStatus;
1980 
1981  if( self->pFileState != Opened && self->pFileState != Recovering )
1982  return XRootDStatus( stError, errInvalidOp );
1983 
1984  Log *log = DefaultEnv::GetLog();
1985  log->Debug( FileMsg, "[%p@%s] Sending a fattr get command for handle %#x to %s",
1986  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1987  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1988 
1989  //--------------------------------------------------------------------------
1990  // Issue a new fattr get request
1991  //--------------------------------------------------------------------------
1992  return XAttrOperationImpl( self, kXR_fattrGet, 0, attrs, handler, timeout );
1993  }
1994 
1995  //------------------------------------------------------------------------
1996  // Delete extended attributes - async
1997  //------------------------------------------------------------------------
1998  XRootDStatus FileStateHandler::DelXAttr( std::shared_ptr<FileStateHandler> &self,
1999  const std::vector<std::string> &attrs,
2000  ResponseHandler *handler,
2001  uint16_t timeout )
2002  {
2003  XrdSysMutexHelper scopedLock( self->pMutex );
2004 
2005  if( self->pFileState == Error ) return self->pStatus;
2006 
2007  if( self->pFileState != Opened && self->pFileState != Recovering )
2008  return XRootDStatus( stError, errInvalidOp );
2009 
2010  Log *log = DefaultEnv::GetLog();
2011  log->Debug( FileMsg, "[%p@%s] Sending a fattr del command for handle %#x to %s",
2012  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2013  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2014 
2015  //--------------------------------------------------------------------------
2016  // Issue a new fattr del request
2017  //--------------------------------------------------------------------------
2018  return XAttrOperationImpl( self, kXR_fattrDel, 0, attrs, handler, timeout );
2019  }
2020 
2021  //------------------------------------------------------------------------
2022  // List extended attributes - async
2023  //------------------------------------------------------------------------
2024  XRootDStatus FileStateHandler::ListXAttr( std::shared_ptr<FileStateHandler> &self,
2025  ResponseHandler *handler,
2026  uint16_t timeout )
2027  {
2028  XrdSysMutexHelper scopedLock( self->pMutex );
2029 
2030  if( self->pFileState == Error ) return self->pStatus;
2031 
2032  if( self->pFileState != Opened && self->pFileState != Recovering )
2033  return XRootDStatus( stError, errInvalidOp );
2034 
2035  Log *log = DefaultEnv::GetLog();
2036  log->Debug( FileMsg, "[%p@%s] Sending a fattr list command for handle %#x to %s",
2037  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2038  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2039 
2040  //--------------------------------------------------------------------------
2041  // Issue a new fattr get request
2042  //--------------------------------------------------------------------------
2043  static const std::vector<std::string> nothing;
2044  return XAttrOperationImpl( self, kXR_fattrList, ClientFattrRequest::aData,
2045  nothing, handler, timeout );
2046  }
2047 
2048  //------------------------------------------------------------------------
2058  //------------------------------------------------------------------------
2059  XRootDStatus FileStateHandler::Checkpoint( std::shared_ptr<FileStateHandler> &self,
2060  kXR_char code,
2061  ResponseHandler *handler,
2062  uint16_t timeout )
2063  {
2064  XrdSysMutexHelper scopedLock( self->pMutex );
2065 
2066  if( self->pFileState == Error ) return self->pStatus;
2067 
2068  if( self->pFileState != Opened && self->pFileState != Recovering )
2069  return XRootDStatus( stError, errInvalidOp );
2070 
2071  Log *log = DefaultEnv::GetLog();
2072  log->Debug( FileMsg, "[%p@%s] Sending a checkpoint command for handle %#x to %s",
2073  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2074  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2075 
2076  Message *msg;
2077  ClientChkPointRequest *req;
2078  MessageUtils::CreateRequest( msg, req );
2079 
2080  req->requestid = kXR_chkpoint;
2081  req->opcode = code;
2082  memcpy( req->fhandle, self->pFileHandle, 4 );
2083 
2084  MessageSendParams params;
2085  params.timeout = timeout;
2086  params.followRedirects = false;
2087  params.stateful = true;
2088 
2090 
2092  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2093 
2094  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2095  }
2096 
2097  //------------------------------------------------------------------------
2107  //------------------------------------------------------------------------
2108  XRootDStatus FileStateHandler::ChkptWrt( std::shared_ptr<FileStateHandler> &self,
2109  uint64_t offset,
2110  uint32_t size,
2111  const void *buffer,
2112  ResponseHandler *handler,
2113  uint16_t timeout )
2114  {
2115  XrdSysMutexHelper scopedLock( self->pMutex );
2116 
2117  if( self->pFileState == Error ) return self->pStatus;
2118 
2119  if( self->pFileState != Opened && self->pFileState != Recovering )
2120  return XRootDStatus( stError, errInvalidOp );
2121 
2122  Log *log = DefaultEnv::GetLog();
2123  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2124  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2125  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2126 
2127  Message *msg;
2128  ClientChkPointRequest *req;
2129  MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2130 
2131  req->requestid = kXR_chkpoint;
2132  req->opcode = kXR_ckpXeq;
2133  req->dlen = 24; // as specified in the protocol specification
2134  memcpy( req->fhandle, self->pFileHandle, 4 );
2135 
2137  wrtreq->requestid = kXR_write;
2138  wrtreq->offset = offset;
2139  wrtreq->dlen = size;
2140  memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2141 
2142  ChunkList *list = new ChunkList();
2143  list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
2144 
2145  MessageSendParams params;
2146  params.timeout = timeout;
2147  params.followRedirects = false;
2148  params.stateful = true;
2149  params.chunkList = list;
2150 
2152 
2154  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2155 
2156  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2157  }
2158 
2159  //------------------------------------------------------------------------
2169  //------------------------------------------------------------------------
2170  XRootDStatus FileStateHandler::ChkptWrtV( std::shared_ptr<FileStateHandler> &self,
2171  uint64_t offset,
2172  const struct iovec *iov,
2173  int iovcnt,
2174  ResponseHandler *handler,
2175  uint16_t timeout )
2176  {
2177  XrdSysMutexHelper scopedLock( self->pMutex );
2178 
2179  if( self->pFileState == Error ) return self->pStatus;
2180 
2181  if( self->pFileState != Opened && self->pFileState != Recovering )
2182  return XRootDStatus( stError, errInvalidOp );
2183 
2184  Log *log = DefaultEnv::GetLog();
2185  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2186  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2187  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2188 
2189  Message *msg;
2190  ClientChkPointRequest *req;
2191  MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2192 
2193  req->requestid = kXR_chkpoint;
2194  req->opcode = kXR_ckpXeq;
2195  req->dlen = 24; // as specified in the protocol specification
2196  memcpy( req->fhandle, self->pFileHandle, 4 );
2197 
2198  ChunkList *list = new ChunkList();
2199  uint32_t size = 0;
2200  for( int i = 0; i < iovcnt; ++i )
2201  {
2202  if( iov[i].iov_len == 0 ) continue;
2203  size += iov[i].iov_len;
2204  list->push_back( ChunkInfo( 0, iov[i].iov_len,
2205  (char*)iov[i].iov_base ) );
2206  }
2207 
2209  wrtreq->requestid = kXR_write;
2210  wrtreq->offset = offset;
2211  wrtreq->dlen = size;
2212  memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2213 
2214  MessageSendParams params;
2215  params.timeout = timeout;
2216  params.followRedirects = false;
2217  params.stateful = true;
2218  params.chunkList = list;
2219 
2221 
2223  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2224 
2225  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2226  }
2227 
2228  //----------------------------------------------------------------------------
2229  // Check if the file is open
2230  //----------------------------------------------------------------------------
2232  {
2233  XrdSysMutexHelper scopedLock( pMutex );
2234 
2235  if( pFileState == Opened || pFileState == Recovering )
2236  return true;
2237  return false;
2238  }
2239 
2240  //----------------------------------------------------------------------------
2241  // Set file property
2242  //----------------------------------------------------------------------------
2243  bool FileStateHandler::SetProperty( const std::string &name,
2244  const std::string &value )
2245  {
2246  XrdSysMutexHelper scopedLock( pMutex );
2247  if( name == "ReadRecovery" )
2248  {
2249  if( value == "true" ) pDoRecoverRead = true;
2250  else pDoRecoverRead = false;
2251  return true;
2252  }
2253  else if( name == "WriteRecovery" )
2254  {
2255  if( value == "true" ) pDoRecoverWrite = true;
2256  else pDoRecoverWrite = false;
2257  return true;
2258  }
2259  else if( name == "FollowRedirects" )
2260  {
2261  if( value == "true" ) pFollowRedirects = true;
2262  else pFollowRedirects = false;
2263  return true;
2264  }
2265  else if( name == "BundledClose" )
2266  {
2267  if( value == "true" ) pAllowBundledClose = true;
2268  else pAllowBundledClose = false;
2269  return true;
2270  }
2271  return false;
2272  }
2273 
2274  //----------------------------------------------------------------------------
2275  // Get file property
2276  //----------------------------------------------------------------------------
2277  bool FileStateHandler::GetProperty( const std::string &name,
2278  std::string &value ) const
2279  {
2280  XrdSysMutexHelper scopedLock( pMutex );
2281  if( name == "ReadRecovery" )
2282  {
2283  if( pDoRecoverRead ) value = "true";
2284  else value = "false";
2285  return true;
2286  }
2287  else if( name == "WriteRecovery" )
2288  {
2289  if( pDoRecoverWrite ) value = "true";
2290  else value = "false";
2291  return true;
2292  }
2293  else if( name == "FollowRedirects" )
2294  {
2295  if( pFollowRedirects ) value = "true";
2296  else value = "false";
2297  return true;
2298  }
2299  else if( name == "DataServer" && pDataServer )
2300  { value = pDataServer->GetHostId(); return true; }
2301  else if( name == "LastURL" && pDataServer )
2302  { value = pDataServer->GetURL(); return true; }
2303  else if( name == "WrtRecoveryRedir" && pWrtRecoveryRedir )
2304  { value = pWrtRecoveryRedir->GetHostId(); return true; }
2305  value = "";
2306  return false;
2307  }
2308 
2309  //----------------------------------------------------------------------------
2310  // Process the results of the opening operation
2311  //----------------------------------------------------------------------------
2313  const OpenInfo *openInfo,
2314  const HostList *hostList )
2315  {
2316  Log *log = DefaultEnv::GetLog();
2317  XrdSysMutexHelper scopedLock( pMutex );
2318 
2319  //--------------------------------------------------------------------------
2320  // Assign the data server and the load balancer
2321  //--------------------------------------------------------------------------
2322  std::string lastServer = pFileUrl->GetHostId();
2323  if( hostList )
2324  {
2325  delete pDataServer;
2326  delete pLoadBalancer;
2327  pLoadBalancer = 0;
2328  delete pWrtRecoveryRedir;
2329  pWrtRecoveryRedir = 0;
2330 
2331  pDataServer = new URL( hostList->back().url );
2332  pDataServer->SetParams( pFileUrl->GetParams() );
2333  if( !( pUseVirtRedirector && pFileUrl->IsMetalink() ) ) pDataServer->SetPath( pFileUrl->GetPath() );
2334  lastServer = pDataServer->GetHostId();
2335  HostList::const_iterator itC;
2336  URL::ParamsMap params = pDataServer->GetParams();
2337  for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2338  {
2339  MessageUtils::MergeCGI( params,
2340  itC->url.GetParams(),
2341  true );
2342  }
2343  pDataServer->SetParams( params );
2344 
2345  HostList::const_reverse_iterator it;
2346  for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2347  if( it->loadBalancer )
2348  {
2349  pLoadBalancer = new URL( it->url );
2350  break;
2351  }
2352 
2353  for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2354  if( it->flags & kXR_recoverWrts )
2355  {
2356  pWrtRecoveryRedir = new URL( it->url );
2357  break;
2358  }
2359  }
2360 
2361  log->Debug(FileMsg, "[%p@%s] Open has returned with status %s",
2362  this, pFileUrl->GetObfuscatedURL().c_str(), status->ToStr().c_str() );
2363 
2364  if( pDataServer && !pDataServer->IsLocalFile() )
2365  {
2366  //------------------------------------------------------------------------
2367  // Check if we are using a secure connection
2368  //------------------------------------------------------------------------
2369  XrdCl::AnyObject isencobj;
2371  QueryTransport( *pDataServer, XRootDQuery::IsEncrypted, isencobj );
2372  if( st.IsOK() )
2373  {
2374  bool *isenc;
2375  isencobj.Get( isenc );
2376  pIsChannelEncrypted = *isenc;
2377  delete isenc;
2378  }
2379  }
2380 
2381  //--------------------------------------------------------------------------
2382  // We have failed
2383  //--------------------------------------------------------------------------
2384  pStatus = *status;
2385  if( !pStatus.IsOK() || !openInfo )
2386  {
2387  log->Debug(FileMsg, "[%p@%s] Error while opening at %s: %s",
2388  this, pFileUrl->GetObfuscatedURL().c_str(), lastServer.c_str(),
2389  pStatus.ToStr().c_str() );
2390  FailQueuedMessages( pStatus );
2391  pFileState = Error;
2392 
2393  //------------------------------------------------------------------------
2394  // Report to monitoring
2395  //------------------------------------------------------------------------
2397  if( mon )
2398  {
2400  i.file = pFileUrl;
2401  i.status = status;
2403  mon->Event( Monitor::EvErrIO, &i );
2404  }
2405  }
2406  //--------------------------------------------------------------------------
2407  // We have succeeded
2408  //--------------------------------------------------------------------------
2409  else
2410  {
2411  //------------------------------------------------------------------------
2412  // Store the response info
2413  //------------------------------------------------------------------------
2414  openInfo->GetFileHandle( pFileHandle );
2415  pSessionId = openInfo->GetSessionId();
2416  if( openInfo->GetStatInfo() )
2417  {
2418  delete pStatInfo;
2419  pStatInfo = new StatInfo( *openInfo->GetStatInfo() );
2420  }
2421 
2422  log->Debug( FileMsg, "[%p@%s] successfully opened at %s, handle: %#x, "
2423  "session id: %llu", this, pFileUrl->GetObfuscatedURL().c_str(),
2424  pDataServer->GetHostId().c_str(), *((uint32_t*)pFileHandle),
2425  (unsigned long long) pSessionId );
2426 
2427  //------------------------------------------------------------------------
2428  // Inform the monitoring about opening success
2429  //------------------------------------------------------------------------
2430  gettimeofday( &pOpenTime, 0 );
2432  if( mon )
2433  {
2435  i.file = pFileUrl;
2436  i.dataServer = pDataServer->GetHostId();
2437  i.oFlags = pOpenFlags;
2438  i.fSize = pStatInfo ? pStatInfo->GetSize() : 0;
2439  mon->Event( Monitor::EvOpen, &i );
2440  }
2441 
2442  //------------------------------------------------------------------------
2443  // Resend the queued messages if any
2444  //------------------------------------------------------------------------
2445  ReSendQueuedMessages();
2446  pFileState = Opened;
2447  }
2448  }
2449 
2450  //----------------------------------------------------------------------------
2451  // Process the results of the closing operation
2452  //----------------------------------------------------------------------------
2454  {
2455  Log *log = DefaultEnv::GetLog();
2456  XrdSysMutexHelper scopedLock( pMutex );
2457 
2458  log->Debug(FileMsg, "[%p@%s] Close returned from %s with: %s", this,
2459  pFileUrl->GetObfuscatedURL().c_str(), pDataServer->GetHostId().c_str(),
2460  status->ToStr().c_str() );
2461 
2462  log->Dump(FileMsg, "[%p@%s] Items in the fly %zu, queued for recovery %zu",
2463  this, pFileUrl->GetObfuscatedURL().c_str(), pInTheFly.size(), pToBeRecovered.size() );
2464 
2465  MonitorClose( status );
2466  ResetMonitoringVars();
2467 
2468  pStatus = *status;
2469  pFileState = Closed;
2470  }
2471 
2472  //----------------------------------------------------------------------------
2473  // Handle an error while sending a stateful message
2474  //----------------------------------------------------------------------------
2475  void FileStateHandler::OnStateError( std::shared_ptr<FileStateHandler> &self,
2476  XRootDStatus *status,
2477  Message *message,
2478  ResponseHandler *userHandler,
2479  MessageSendParams &sendParams )
2480  {
2481  //--------------------------------------------------------------------------
2482  // It may be a redirection
2483  //--------------------------------------------------------------------------
2484  if( !status->IsOK() && status->code == errRedirect && self->pFollowRedirects )
2485  {
2486  static const std::string root = "root", xroot = "xroot", file = "file",
2487  roots = "roots", xroots = "xroots";
2488  std::string msg = status->GetErrorMessage();
2489  if( !msg.compare( 0, root.size(), root ) ||
2490  !msg.compare( 0, xroot.size(), xroot ) ||
2491  !msg.compare( 0, file.size(), file ) ||
2492  !msg.compare( 0, roots.size(), roots ) ||
2493  !msg.compare( 0, xroots.size(), xroots ) )
2494  {
2495  FileStateHandler::OnStateRedirection( self, msg, message, userHandler, sendParams );
2496  return;
2497  }
2498  }
2499 
2500  //--------------------------------------------------------------------------
2501  // Handle error
2502  //--------------------------------------------------------------------------
2503  Log *log = DefaultEnv::GetLog();
2504  XrdSysMutexHelper scopedLock( self->pMutex );
2505  self->pInTheFly.erase( message );
2506 
2507  log->Dump( FileMsg, "[%p@%s] File state error encountered. Message %s "
2508  "returned with %s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2509  message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2510 
2511  //--------------------------------------------------------------------------
2512  // Report to monitoring
2513  //--------------------------------------------------------------------------
2515  if( mon )
2516  {
2518  i.file = self->pFileUrl;
2519  i.status = status;
2520 
2521  ClientRequest *req = (ClientRequest*)message->GetBuffer();
2522  switch( req->header.requestid )
2523  {
2524  case kXR_read: i.opCode = Monitor::ErrorInfo::ErrRead; break;
2530  default: i.opCode = Monitor::ErrorInfo::ErrUnc;
2531  }
2532 
2533  mon->Event( Monitor::EvErrIO, &i );
2534  }
2535 
2536  //--------------------------------------------------------------------------
2537  // The message is not recoverable
2538  // (message using a kernel buffer is not recoverable by definition)
2539  //--------------------------------------------------------------------------
2540  if( !self->IsRecoverable( *status ) || sendParams.kbuff )
2541  {
2542  log->Error( FileMsg, "[%p@%s] Fatal file state error. Message %s "
2543  "returned with %s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2544  message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2545 
2546  self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
2547  delete status;
2548  return;
2549  }
2550 
2551  //--------------------------------------------------------------------------
2552  // Insert the message to the recovery queue and start the recovery
2553  // procedure if we don't have any more message in the fly
2554  //--------------------------------------------------------------------------
2555  self->pCloseReason = *status;
2556  RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2557  delete status;
2558  }
2559 
2560  //----------------------------------------------------------------------------
2561  // Handle stateful redirect
2562  //----------------------------------------------------------------------------
2563  void FileStateHandler::OnStateRedirection( std::shared_ptr<FileStateHandler> &self,
2564  const std::string &redirectUrl,
2565  Message *message,
2566  ResponseHandler *userHandler,
2567  MessageSendParams &sendParams )
2568  {
2569  XrdSysMutexHelper scopedLock( self->pMutex );
2570  self->pInTheFly.erase( message );
2571 
2572  //--------------------------------------------------------------------------
2573  // Register the state redirect url and append the new cgi information to
2574  // the file URL
2575  //--------------------------------------------------------------------------
2576  if( !self->pStateRedirect )
2577  {
2578  std::ostringstream o;
2579  self->pStateRedirect = new URL( redirectUrl );
2580  URL::ParamsMap params = self->pFileUrl->GetParams();
2581  MessageUtils::MergeCGI( params,
2582  self->pStateRedirect->GetParams(),
2583  false );
2584  self->pFileUrl->SetParams( params );
2585  }
2586 
2587  RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2588  }
2589 
2590  //----------------------------------------------------------------------------
2591  // Handle stateful response
2592  //----------------------------------------------------------------------------
2593  void FileStateHandler::OnStateResponse( std::shared_ptr<FileStateHandler> &self,
2594  XRootDStatus *status,
2595  Message *message,
2596  AnyObject *response,
2597  HostList */*urlList*/ )
2598  {
2599  Log *log = DefaultEnv::GetLog();
2600  XrdSysMutexHelper scopedLock( self->pMutex );
2601 
2602  log->Dump( FileMsg, "[%p@%s] Got state response for message %s",
2603  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2604  message->GetObfuscatedDescription().c_str() );
2605 
2606  //--------------------------------------------------------------------------
2607  // Since this message may be the last "in-the-fly" and no recovery
2608  // is done if messages are in the fly, we may need to trigger recovery
2609  //--------------------------------------------------------------------------
2610  self->pInTheFly.erase( message );
2611  RunRecovery( self );
2612 
2613  //--------------------------------------------------------------------------
2614  // Play with the actual response before returning it. This is a good
2615  // place to do caching in the future.
2616  //--------------------------------------------------------------------------
2617  ClientRequest *req = (ClientRequest*)message->GetBuffer();
2618  switch( req->header.requestid )
2619  {
2620  //------------------------------------------------------------------------
2621  // Cache the stat response
2622  //------------------------------------------------------------------------
2623  case kXR_stat:
2624  {
2625  StatInfo *info = 0;
2626  response->Get( info );
2627  delete self->pStatInfo;
2628  self->pStatInfo = new StatInfo( *info );
2629  break;
2630  }
2631 
2632  //------------------------------------------------------------------------
2633  // Handle read response
2634  //------------------------------------------------------------------------
2635  case kXR_read:
2636  {
2637  ++self->pRCount;
2638  self->pRBytes += req->read.rlen;
2639  break;
2640  }
2641 
2642  //------------------------------------------------------------------------
2643  // Handle read response
2644  //------------------------------------------------------------------------
2645  case kXR_pgread:
2646  {
2647  ++self->pRCount;
2648  self->pRBytes += req->pgread.rlen;
2649  break;
2650  }
2651 
2652  //------------------------------------------------------------------------
2653  // Handle readv response
2654  //------------------------------------------------------------------------
2655  case kXR_readv:
2656  {
2657  ++self->pVRCount;
2658  size_t segs = req->header.dlen/sizeof(readahead_list);
2659  readahead_list *dataChunk = (readahead_list*)message->GetBuffer( 24 );
2660  for( size_t i = 0; i < segs; ++i )
2661  self->pVRBytes += dataChunk[i].rlen;
2662  self->pVSegs += segs;
2663  break;
2664  }
2665 
2666  //------------------------------------------------------------------------
2667  // Handle write response
2668  //------------------------------------------------------------------------
2669  case kXR_write:
2670  {
2671  ++self->pWCount;
2672  self->pWBytes += req->write.dlen;
2673  break;
2674  }
2675 
2676  //------------------------------------------------------------------------
2677  // Handle write response
2678  //------------------------------------------------------------------------
2679  case kXR_pgwrite:
2680  {
2681  ++self->pWCount;
2682  self->pWBytes += req->pgwrite.dlen;
2683  break;
2684  }
2685 
2686  //------------------------------------------------------------------------
2687  // Handle writev response
2688  //------------------------------------------------------------------------
2689  case kXR_writev:
2690  {
2691  ++self->pVWCount;
2692  size_t size = req->header.dlen/sizeof(readahead_list);
2693  XrdProto::write_list *wrtList =
2694  reinterpret_cast<XrdProto::write_list*>( message->GetBuffer( 24 ) );
2695  for( size_t i = 0; i < size; ++i )
2696  self->pVWBytes += wrtList[i].wlen;
2697  break;
2698  }
2699  };
2700  }
2701 
2702  //------------------------------------------------------------------------
2704  //------------------------------------------------------------------------
2705  void FileStateHandler::Tick( time_t now )
2706  {
2707  if (pMutex.CondLock())
2708  {TimeOutRequests( now );
2709  pMutex.UnLock();
2710  }
2711  }
2712 
2713  //----------------------------------------------------------------------------
2714  // Declare timeout on requests being recovered
2715  //----------------------------------------------------------------------------
2717  {
2718  if( !pToBeRecovered.empty() )
2719  {
2720  Log *log = DefaultEnv::GetLog();
2721  log->Dump( FileMsg, "[%p@%s] Got a timer event", this,
2722  pFileUrl->GetObfuscatedURL().c_str() );
2723  RequestList::iterator it;
2725  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2726  {
2727  if( it->params.expires <= now )
2728  {
2729  jobMan->QueueJob( new ResponseJob(
2730  it->handler,
2732  0, it->params.hostList ) );
2733  it = pToBeRecovered.erase( it );
2734  }
2735  else
2736  ++it;
2737  }
2738  }
2739  }
2740 
2741  //----------------------------------------------------------------------------
2742  // Called in the child process after the fork
2743  //----------------------------------------------------------------------------
2745  {
2746  Log *log = DefaultEnv::GetLog();
2747 
2748  if( pFileState == Closed || pFileState == Error )
2749  return;
2750 
2751  if( (IsReadOnly() && pDoRecoverRead) ||
2752  (!IsReadOnly() && pDoRecoverWrite) )
2753  {
2754  log->Debug( FileMsg, "[%p@%s] Putting the file in recovery state in "
2755  "process %d", this, pFileUrl->GetObfuscatedURL().c_str(), getpid() );
2756  pFileState = Recovering;
2757  pInTheFly.clear();
2758  pToBeRecovered.clear();
2759  }
2760  else
2761  pFileState = Error;
2762  }
2763 
2764  //------------------------------------------------------------------------
2765  // Try other data server
2766  //------------------------------------------------------------------------
2767  XRootDStatus FileStateHandler::TryOtherServer( std::shared_ptr<FileStateHandler> &self, uint16_t timeout )
2768  {
2769  XrdSysMutexHelper scopedLock( self->pMutex );
2770 
2771  if( self->pFileState != Opened || !self->pLoadBalancer )
2772  return XRootDStatus( stError, errInvalidOp );
2773 
2774  self->pFileState = Recovering;
2775 
2776  Log *log = DefaultEnv::GetLog();
2777  log->Debug( FileMsg, "[%p@%s] Reopen file at next data server.",
2778  self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
2779 
2780  // merge CGI
2781  auto lbcgi = self->pLoadBalancer->GetParams();
2782  auto dtcgi = self->pDataServer->GetParams();
2783  MessageUtils::MergeCGI( lbcgi, dtcgi, false );
2784  // update tried CGI
2785  auto itr = lbcgi.find( "tried" );
2786  if( itr == lbcgi.end() )
2787  lbcgi["tried"] = self->pDataServer->GetHostName();
2788  else
2789  {
2790  std::string tried = itr->second;
2791  tried += "," + self->pDataServer->GetHostName();
2792  lbcgi["tried"] = tried;
2793  }
2794  self->pLoadBalancer->SetParams( lbcgi );
2795 
2796  return ReOpenFileAtServer( self, *self->pLoadBalancer, timeout );
2797  }
2798 
2799  //------------------------------------------------------------------------
2800  // Generic implementation of xattr operation
2801  //------------------------------------------------------------------------
2802  template<typename T>
2803  Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &self,
2804  kXR_char subcode,
2805  kXR_char options,
2806  const std::vector<T> &attrs,
2807  ResponseHandler *handler,
2808  uint16_t timeout )
2809  {
2810  //--------------------------------------------------------------------------
2811  // Issue a new fattr request
2812  //--------------------------------------------------------------------------
2813  Message *msg;
2814  ClientFattrRequest *req;
2815  MessageUtils::CreateRequest( msg, req );
2816 
2817  req->requestid = kXR_fattr;
2818  req->subcode = subcode;
2819  req->numattr = attrs.size();
2820  req->options = options;
2821  memcpy( req->fhandle, self->pFileHandle, 4 );
2822  XRootDStatus st = MessageUtils::CreateXAttrBody( msg, attrs );
2823  if( !st.IsOK() ) return st;
2824 
2825  MessageSendParams params;
2826  params.timeout = timeout;
2827  params.followRedirects = false;
2828  params.stateful = true;
2830 
2832  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2833 
2834  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2835  }
2836 
2837  //----------------------------------------------------------------------------
2838  // Send a message to a host or put it in the recovery queue
2839  //----------------------------------------------------------------------------
2840  Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &self,
2841  const URL &url,
2842  Message *msg,
2843  ResponseHandler *handler,
2844  MessageSendParams &sendParams )
2845  {
2846  //--------------------------------------------------------------------------
2847  // Recovering
2848  //--------------------------------------------------------------------------
2849  if( self->pFileState == Recovering )
2850  {
2851  return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2852  }
2853 
2854  //--------------------------------------------------------------------------
2855  // Trying to send
2856  //--------------------------------------------------------------------------
2857  if( self->pFileState == Opened )
2858  {
2859  msg->SetSessionId( self->pSessionId );
2860  XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
2861 
2862  //------------------------------------------------------------------------
2863  // Invalid session id means that the connection has been broken while we
2864  // were idle so we haven't been informed about this fact earlier.
2865  //------------------------------------------------------------------------
2866  if( !st.IsOK() && st.code == errInvalidSession && self->IsRecoverable( st ) )
2867  return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2868 
2869  if( st.IsOK() )
2870  self->pInTheFly.insert(msg);
2871  else
2872  delete handler;
2873  return st;
2874  }
2875  return Status( stError, errInvalidOp );
2876  }
2877 
2878  //----------------------------------------------------------------------------
2879  // Check if the stateful error is recoverable
2880  //----------------------------------------------------------------------------
2881  bool FileStateHandler::IsRecoverable( const XRootDStatus &status ) const
2882  {
2883  const auto recoverable_errors = {
2887  errInternal,
2888  errTlsError,
2890  };
2891 
2892  if (pDoRecoverRead || pDoRecoverWrite)
2893  for (const auto error : recoverable_errors)
2894  if (status.code == error)
2895  return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
2896 
2897  return false;
2898  }
2899 
2900  //----------------------------------------------------------------------------
2901  // Check if the file is open for read only
2902  //----------------------------------------------------------------------------
2903  bool FileStateHandler::IsReadOnly() const
2904  {
2905  if( (pOpenFlags & kXR_open_read) && !(pOpenFlags & kXR_open_updt) &&
2906  !(pOpenFlags & kXR_open_apnd ) )
2907  return true;
2908  return false;
2909  }
2910 
2911  //----------------------------------------------------------------------------
2912  // Recover a message
2913  //----------------------------------------------------------------------------
2914  Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &self,
2915  RequestData rd,
2916  bool callbackOnFailure )
2917  {
2918  self->pFileState = Recovering;
2919 
2920  Log *log = DefaultEnv::GetLog();
2921  log->Dump( FileMsg, "[%p@%s] Putting message %s in the recovery list",
2922  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2923  rd.request->GetObfuscatedDescription().c_str() );
2924 
2925  Status st = RunRecovery( self );
2926  if( st.IsOK() )
2927  {
2928  self->pToBeRecovered.push_back( rd );
2929  return st;
2930  }
2931 
2932  if( callbackOnFailure )
2933  self->FailMessage( rd, st );
2934 
2935  return st;
2936  }
2937 
2938  //----------------------------------------------------------------------------
2939  // Run the recovery procedure if appropriate
2940  //----------------------------------------------------------------------------
2941  Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &self )
2942  {
2943  if( self->pFileState != Recovering )
2944  return Status();
2945 
2946  if( !self->pInTheFly.empty() )
2947  return Status();
2948 
2949  Log *log = DefaultEnv::GetLog();
2950  log->Debug( FileMsg, "[%p@%s] Running the recovery procedure", self.get(),
2951  self->pFileUrl->GetObfuscatedURL().c_str() );
2952 
2953  Status st;
2954  if( self->pStateRedirect )
2955  {
2956  SendClose( self, 0 );
2957  st = ReOpenFileAtServer( self, *self->pStateRedirect, 0 );
2958  delete self->pStateRedirect; self->pStateRedirect = 0;
2959  }
2960  else if( self->IsReadOnly() && self->pLoadBalancer )
2961  st = ReOpenFileAtServer( self, *self->pLoadBalancer, 0 );
2962  else
2963  st = ReOpenFileAtServer( self, *self->pDataServer, 0 );
2964 
2965  if( !st.IsOK() )
2966  {
2967  self->pFileState = Error;
2968  self->pStatus = st;
2969  self->FailQueuedMessages( st );
2970  }
2971 
2972  return st;
2973  }
2974 
2975  //----------------------------------------------------------------------------
2976  // Send a close and ignore the response
2977  //----------------------------------------------------------------------------
2978  XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &self,
2979  uint16_t timeout )
2980  {
2981  Message *msg;
2982  ClientCloseRequest *req;
2983  MessageUtils::CreateRequest( msg, req );
2984 
2985  req->requestid = kXR_close;
2986  memcpy( req->fhandle, self->pFileHandle, 4 );
2987 
2989  msg->SetSessionId( self->pSessionId );
2991  [self]( XRootDStatus&, AnyObject& ) mutable { self.reset(); } );
2992  MessageSendParams params;
2993  params.timeout = timeout;
2994  params.followRedirects = false;
2995  params.stateful = true;
2996 
2998 
2999  return self->IssueRequest( *self->pDataServer, msg, handler, params );
3000  }
3001 
3002  //----------------------------------------------------------------------------
3003  // Re-open the current file at a given server
3004  //----------------------------------------------------------------------------
3005  XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &self,
3006  const URL &url,
3007  uint16_t timeout )
3008  {
3009  Log *log = DefaultEnv::GetLog();
3010  log->Dump( FileMsg, "[%p@%s] Sending a recovery open command to %s",
3011  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(), url.GetObfuscatedURL().c_str() );
3012 
3013  //--------------------------------------------------------------------------
3014  // Remove the kXR_delete and kXR_new flags, as we don't want the recovery
3015  // procedure to delete a file that has been partially updated or fail it
3016  // because a partially uploaded file already exists.
3017  //--------------------------------------------------------------------------
3018  if( self->pOpenFlags & kXR_delete)
3019  {
3020  self->pOpenFlags &= ~kXR_delete;
3021  self->pOpenFlags |= kXR_open_updt;
3022  }
3023 
3024  self->pOpenFlags &= ~kXR_new;
3025 
3026  Message *msg;
3027  ClientOpenRequest *req;
3028  URL u = url;
3029 
3030  if( url.GetPath().empty() )
3031  u.SetPath( self->pFileUrl->GetPath() );
3032 
3033  std::string path = u.GetPathWithFilteredParams();
3034  MessageUtils::CreateRequest( msg, req, path.length() );
3035 
3036  req->requestid = kXR_open;
3037  req->mode = self->pOpenMode;
3038  req->options = self->pOpenFlags;
3039  req->dlen = path.length();
3040  msg->Append( path.c_str(), path.length(), 24 );
3041 
3042  // create a new reopen handler
3043  // (it is not assigned to 'pReOpenHandler' in order not to bump the reference counter
3044  // until we know that 'SendMessage' was successful)
3045  OpenHandler *openHandler = new OpenHandler( self, 0 );
3046  MessageSendParams params; params.timeout = timeout;
3049 
3050  //--------------------------------------------------------------------------
3051  // Issue the open request
3052  //--------------------------------------------------------------------------
3053  XRootDStatus st = self->IssueRequest( url, msg, openHandler, params );
3054 
3055  // if there was a problem destroy the open handler
3056  if( !st.IsOK() )
3057  {
3058  delete openHandler;
3059  self->pStatus = st;
3060  self->pFileState = Closed;
3061  }
3062  return st;
3063  }
3064 
3065  //------------------------------------------------------------------------
3066  // Fail a message
3067  //------------------------------------------------------------------------
3068  void FileStateHandler::FailMessage( RequestData rd, XRootDStatus status )
3069  {
3070  Log *log = DefaultEnv::GetLog();
3071  log->Dump( FileMsg, "[%p@%s] Failing message %s with %s",
3072  this, pFileUrl->GetObfuscatedURL().c_str(),
3073  rd.request->GetObfuscatedDescription().c_str(),
3074  status.ToStr().c_str() );
3075 
3076  StatefulHandler *sh = dynamic_cast<StatefulHandler*>(rd.handler);
3077  if( !sh )
3078  {
3079  Log *log = DefaultEnv::GetLog();
3080  log->Error( FileMsg, "[%p@%s] Internal error while recovering %s",
3081  this, pFileUrl->GetObfuscatedURL().c_str(),
3082  rd.request->GetObfuscatedDescription().c_str() );
3083  return;
3084  }
3085 
3087  ResponseHandler *userHandler = sh->GetUserHandler();
3088  jobMan->QueueJob( new ResponseJob(
3089  userHandler,
3090  new XRootDStatus( status ),
3091  0, rd.params.hostList ) );
3092 
3093  delete sh;
3094  }
3095 
3096  //----------------------------------------------------------------------------
3097  // Fail queued messages
3098  //----------------------------------------------------------------------------
3099  void FileStateHandler::FailQueuedMessages( XRootDStatus status )
3100  {
3101  RequestList::iterator it;
3102  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3103  FailMessage( *it, status );
3104  pToBeRecovered.clear();
3105  }
3106 
3107  //------------------------------------------------------------------------
3108  // Re-send queued messages
3109  //------------------------------------------------------------------------
3110  void FileStateHandler::ReSendQueuedMessages()
3111  {
3112  RequestList::iterator it;
3113  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3114  {
3115  it->request->SetSessionId( pSessionId );
3116  ReWriteFileHandle( it->request );
3117  XRootDStatus st = IssueRequest( *pDataServer, it->request,
3118  it->handler, it->params );
3119  if( !st.IsOK() )
3120  FailMessage( *it, st );
3121  }
3122  pToBeRecovered.clear();
3123  }
3124 
3125  //------------------------------------------------------------------------
3126  // Re-write file handle
3127  //------------------------------------------------------------------------
3128  void FileStateHandler::ReWriteFileHandle( Message *msg )
3129  {
3131  switch( hdr->requestid )
3132  {
3133  case kXR_read:
3134  {
3136  memcpy( req->fhandle, pFileHandle, 4 );
3137  break;
3138  }
3139  case kXR_write:
3140  {
3142  memcpy( req->fhandle, pFileHandle, 4 );
3143  break;
3144  }
3145  case kXR_sync:
3146  {
3148  memcpy( req->fhandle, pFileHandle, 4 );
3149  break;
3150  }
3151  case kXR_truncate:
3152  {
3154  memcpy( req->fhandle, pFileHandle, 4 );
3155  break;
3156  }
3157  case kXR_readv:
3158  {
3160  readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
3161  for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3162  memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
3163  break;
3164  }
3165  case kXR_writev:
3166  {
3167  ClientWriteVRequest *req =
3168  reinterpret_cast<ClientWriteVRequest*>( msg->GetBuffer() );
3169  XrdProto::write_list *wrtList =
3170  reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
3171  size_t size = req->dlen / sizeof(XrdProto::write_list);
3172  for( size_t i = 0; i < size; ++i )
3173  memcpy( wrtList[i].fhandle, pFileHandle, 4 );
3174  break;
3175  }
3176  case kXR_pgread:
3177  {
3179  memcpy( req->fhandle, pFileHandle, 4 );
3180  break;
3181  }
3182  case kXR_pgwrite:
3183  {
3185  memcpy( req->fhandle, pFileHandle, 4 );
3186  break;
3187  }
3188  }
3189 
3190  Log *log = DefaultEnv::GetLog();
3191  log->Dump( FileMsg, "[%p@%s] Rewritten file handle for %s to %#x",
3192  this, pFileUrl->GetObfuscatedURL().c_str(), msg->GetObfuscatedDescription().c_str(),
3193  *((uint32_t*)pFileHandle) );
3195  }
3196 
3197  //----------------------------------------------------------------------------
3198  // Dispatch monitoring information on close
3199  //----------------------------------------------------------------------------
3200  void FileStateHandler::MonitorClose( const XRootDStatus *status )
3201  {
3203  if( mon )
3204  {
3206  i.file = pFileUrl;
3207  i.oTOD = pOpenTime;
3208  gettimeofday( &i.cTOD, 0 );
3209  i.rBytes = pRBytes;
3210  i.vrBytes = pVRBytes;
3211  i.wBytes = pWBytes;
3212  i.vwBytes = pVWBytes;
3213  i.vSegs = pVSegs;
3214  i.rCount = pRCount;
3215  i.vCount = pVRCount;
3216  i.wCount = pWCount;
3217  i.status = status;
3218  mon->Event( Monitor::EvClose, &i );
3219  }
3220  }
3221 
3222  XRootDStatus FileStateHandler::IssueRequest( const URL &url,
3223  Message *msg,
3224  ResponseHandler *handler,
3225  MessageSendParams &sendParams )
3226  {
3227  // first handle Metalinks
3228  if( pUseVirtRedirector && url.IsMetalink() )
3229  return MessageUtils::RedirectMessage( url, msg, handler,
3230  sendParams, pLFileHandler );
3231 
3232  // than local file access
3233  if( url.IsLocalFile() )
3234  return pLFileHandler->ExecRequest( url, msg, handler, sendParams );
3235 
3236  // and finally ordinary XRootD requests
3237  return MessageUtils::SendMessage( url, msg, handler,
3238  sendParams, pLFileHandler );
3239  }
3240 
3241  //------------------------------------------------------------------------
3242  // Send a write request with payload being stored in a kernel buffer
3243  //------------------------------------------------------------------------
3244  XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &self,
3245  uint64_t offset,
3246  uint32_t length,
3247  std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3248  ResponseHandler *handler,
3249  uint16_t timeout )
3250  {
3251  //--------------------------------------------------------------------------
3252  // Create the write request
3253  //--------------------------------------------------------------------------
3254  XrdSysMutexHelper scopedLock( self->pMutex );
3255 
3256  if( self->pFileState != Opened && self->pFileState != Recovering )
3257  return XRootDStatus( stError, errInvalidOp );
3258 
3259  Log *log = DefaultEnv::GetLog();
3260  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
3261  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3262  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3263 
3264  Message *msg;
3265  ClientWriteRequest *req;
3266  MessageUtils::CreateRequest( msg, req );
3267 
3268  req->requestid = kXR_write;
3269  req->offset = offset;
3270  req->dlen = length;
3271  memcpy( req->fhandle, self->pFileHandle, 4 );
3272 
3273  MessageSendParams params;
3274  params.timeout = timeout;
3275  params.followRedirects = false;
3276  params.stateful = true;
3277  params.kbuff = kbuff.release();
3278  params.chunkList = new ChunkList();
3279 
3281 
3283  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
3284 
3285  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3286  }
3287 }
kXR_unt16 requestid
Definition: XProtocol.hh:479
kXR_unt16 requestid
Definition: XProtocol.hh:630
kXR_unt16 requestid
Definition: XProtocol.hh:806
@ kXR_fattrDel
Definition: XProtocol.hh:270
@ kXR_fattrSet
Definition: XProtocol.hh:273
@ kXR_fattrList
Definition: XProtocol.hh:272
@ kXR_fattrGet
Definition: XProtocol.hh:271
#define kXR_suppgrw
Definition: XProtocol.hh:1174
kXR_char fhandle[4]
Definition: XProtocol.hh:531
kXR_char fhandle[4]
Definition: XProtocol.hh:782
struct ClientPgReadRequest pgread
Definition: XProtocol.hh:861
kXR_char fhandle[4]
Definition: XProtocol.hh:807
kXR_char fhandle[4]
Definition: XProtocol.hh:771
kXR_int64 offset
Definition: XProtocol.hh:646
kXR_unt16 requestid
Definition: XProtocol.hh:644
@ kXR_virtReadv
Definition: XProtocol.hh:150
kXR_unt16 options
Definition: XProtocol.hh:481
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
struct ClientPgWriteRequest pgwrite
Definition: XProtocol.hh:862
kXR_unt16 requestid
Definition: XProtocol.hh:228
@ kXR_async
Definition: XProtocol.hh:458
@ kXR_delete
Definition: XProtocol.hh:453
@ kXR_open_read
Definition: XProtocol.hh:456
@ kXR_open_updt
Definition: XProtocol.hh:457
@ kXR_new
Definition: XProtocol.hh:455
@ kXR_open_apnd
Definition: XProtocol.hh:462
@ kXR_retstat
Definition: XProtocol.hh:463
struct ClientRequestHdr header
Definition: XProtocol.hh:846
kXR_char fhandle[4]
Definition: XProtocol.hh:509
#define kXR_recoverWrts
Definition: XProtocol.hh:1166
kXR_unt16 infotype
Definition: XProtocol.hh:631
kXR_char fhandle[4]
Definition: XProtocol.hh:645
kXR_char fhandle[4]
Definition: XProtocol.hh:229
kXR_unt16 requestid
Definition: XProtocol.hh:157
kXR_char fhandle[4]
Definition: XProtocol.hh:633
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_open
Definition: XProtocol.hh:122
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_sync
Definition: XProtocol.hh:128
@ kXR_fattr
Definition: XProtocol.hh:132
@ kXR_query
Definition: XProtocol.hh:113
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_stat
Definition: XProtocol.hh:129
@ kXR_pgread
Definition: XProtocol.hh:142
@ kXR_chkpoint
Definition: XProtocol.hh:124
@ kXR_close
Definition: XProtocol.hh:115
@ kXR_pgwrite
Definition: XProtocol.hh:138
struct ClientReadRequest read
Definition: XProtocol.hh:867
kXR_int32 rlen
Definition: XProtocol.hh:660
kXR_unt16 requestid
Definition: XProtocol.hh:768
kXR_int32 dlen
Definition: XProtocol.hh:483
kXR_char fhandle[4]
Definition: XProtocol.hh:794
kXR_unt16 mode
Definition: XProtocol.hh:480
kXR_unt16 requestid
Definition: XProtocol.hh:508
kXR_unt16 requestid
Definition: XProtocol.hh:781
kXR_char fhandle[4]
Definition: XProtocol.hh:204
kXR_int64 offset
Definition: XProtocol.hh:661
#define kXR_PROTPGRWVERSION
Definition: XProtocol.hh:73
kXR_int64 offset
Definition: XProtocol.hh:808
struct ClientWriteRequest write
Definition: XProtocol.hh:876
kXR_int32 rlen
Definition: XProtocol.hh:647
kXR_unt16 requestid
Definition: XProtocol.hh:670
@ kXR_Qopaqug
Definition: XProtocol.hh:625
@ kXR_Qvisa
Definition: XProtocol.hh:622
kXR_int32 dlen
Definition: XProtocol.hh:159
unsigned char kXR_char
Definition: XPtypes.hh:65
static int mapError(int rc)
Definition: XProtocol.hh:1361
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
Definition: XrdClBuffer.hh:34
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
Definition: XrdClBuffer.hh:164
uint32_t GetSize() const
Get the size of the message.
Definition: XrdClBuffer.hh:132
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
An interface for file plug-ins.
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, uint16_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
FileStateHandler(FilePlugIn *&plugin)
Constructor.
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, uint16_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Closed
The file is closed.
@ Opened
Opening has succeeded.
@ Error
Opening has failed.
@ Recovering
Recovering from an error.
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool IsOpen() const
Check if the file is open.
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, uint16_t timeout)
Try other data server.
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
XRootDStatus ExecRequest(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams)
Translate an XRootD request into LocalFileHandler call.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
An abstract class to describe the client-side monitoring plugin interface.
Definition: XrdClMonitor.hh:56
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4bytes)
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Call the user callback.
Object stat info.
uint64_t GetSize() const
Get size (in bytes)
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:458
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:395
std::string GetPathWithFilteredParams() const
Get the path with params, filteres out 'xrdcl.'.
Definition: XrdClURL.cc:324
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
Definition: XrdClURL.cc:491
void SetPath(const std::string &path)
Set the path.
Definition: XrdClURL.hh:225
bool IsLocalFile() const
Definition: XrdClURL.cc:467
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
Definition: XrdClUtils.hh:235
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Get the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
Definition: XrdClStatus.hh:76
const uint16_t errTlsError
Definition: XrdClStatus.hh:80
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t errPollerError
Definition: XrdClStatus.hh:75
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInProgress
Definition: XrdClStatus.hh:59
const uint16_t errSocketTimeout
Definition: XrdClStatus.hh:73
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const uint64_t FileMsg
const uint16_t suAlreadyDone
Definition: XrdClStatus.hh:42
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
Definition: XrdClStatus.hh:78
const uint16_t errSocketError
Definition: XrdClStatus.hh:72
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79
const uint16_t errRedirect
Definition: XrdClStatus.hh:106
const uint16_t errSocketDisconnected
Definition: XrdClStatus.hh:74
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
static const int aData
Definition: XProtocol.hh:298
kXR_char fhandle[4]
Definition: XProtocol.hh:288
kXR_unt16 requestid
Definition: XProtocol.hh:287
kXR_unt16 requestid
Definition: XProtocol.hh:820
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
Describe a file close event.
uint64_t vwBytes
Total number of bytes written vie writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
std::string dataServer
Actual fata server.
uint16_t oFlags
OpenFlags.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted