XRootD
XrdClFileStateHandler.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClURL.hh"
27 #include "XrdCl/XrdClLog.hh"
28 #include "XrdCl/XrdClStatus.hh"
29 #include "XrdCl/XrdClDefaultEnv.hh"
31 #include "XrdCl/XrdClConstants.hh"
35 #include "XrdCl/XrdClMonitor.hh"
36 #include "XrdCl/XrdClFileTimer.hh"
38 #include "XrdCl/XrdClJobManager.hh"
40 #include "XrdCl/XrdClAnyObject.hh"
41 #include "XrdCl/XrdClUtils.hh"
42 
43 #ifdef WITH_XRDEC
44 #include "XrdCl/XrdClEcHandler.hh"
45 #endif
46 
47 #include "XrdOuc/XrdOucCRC.hh"
49 #include "XrdOuc/XrdOucUtils.hh"
50 
52 #include "XrdSys/XrdSysPageSize.hh"
53 #include "XrdSys/XrdSysPthread.hh"
54 
55 #include <sstream>
56 #include <memory>
57 #include <numeric>
58 #include <sys/time.h>
59 #include <uuid/uuid.h>
60 #include <mutex>
61 
62 namespace
63 {
64  //----------------------------------------------------------------------------
65  // Helper callback for handling PgRead responses
66  //----------------------------------------------------------------------------
67  class PgReadHandler : public XrdCl::ResponseHandler
68  {
69  friend class PgReadRetryHandler;
70 
71  public:
72 
73  //------------------------------------------------------------------------
74  // Constructor
75  //------------------------------------------------------------------------
76  PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
77  XrdCl::ResponseHandler *userHandler,
78  uint64_t orgOffset ) :
79  stateHandler( stateHandler ),
80  userHandler( userHandler ),
81  orgOffset( orgOffset ),
82  maincall( true ),
83  retrycnt( 0 ),
84  nbrepair( 0 )
85  {
86  }
87 
88  //------------------------------------------------------------------------
89  // Handle the response
90  //------------------------------------------------------------------------
92  XrdCl::AnyObject *response,
93  XrdCl::HostList *hostList )
94  {
95  using namespace XrdCl;
96 
97  std::unique_lock<std::mutex> lck( mtx );
98 
99  if( !maincall )
100  {
101  //--------------------------------------------------------------------
102  // We are serving PgRead retry request
103  //--------------------------------------------------------------------
104  --retrycnt;
105  if( !status->IsOK() )
106  st.reset( status );
107  else
108  {
109  delete status; // by convention other args are null (see PgReadRetryHandler)
110  ++nbrepair; // update number of repaired pages
111  }
112 
113  if( retrycnt == 0 )
114  {
115  //------------------------------------------------------------------
116  // All retries came back
117  //------------------------------------------------------------------
118  if( st->IsOK() )
119  {
120  PageInfo &pginf = XrdCl::To<PageInfo>( *resp );
121  pginf.SetNbRepair( nbrepair );
122  userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
123  }
124  else
125  userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
126  lck.unlock();
127  delete this;
128  }
129 
130  return;
131  }
132 
133  //----------------------------------------------------------------------
134  // We are serving main PgRead request
135  //----------------------------------------------------------------------
136  if( !status->IsOK() )
137  {
138  //--------------------------------------------------------------------
139  // The main PgRead request has failed
140  //--------------------------------------------------------------------
141  userHandler->HandleResponseWithHosts( status, response, hostList );
142  lck.unlock();
143  delete this;
144  return;
145  }
146 
147  maincall = false;
148 
149  //----------------------------------------------------------------------
150  // Do the integrity check
151  //----------------------------------------------------------------------
152  PageInfo *pginf = 0;
153  response->Get( pginf );
154 
155  uint64_t pgoff = pginf->GetOffset();
156  uint32_t bytesRead = pginf->GetLength();
157  std::vector<uint32_t> &cksums = pginf->GetCksums();
158  char *buffer = reinterpret_cast<char*>( pginf->GetBuffer() );
159  size_t nbpages = XrdOucPgrwUtils::csNum( pgoff, bytesRead );
160  uint32_t pgsize = XrdSys::PageSize - pgoff % XrdSys::PageSize;
161  if( pgsize > bytesRead ) pgsize = bytesRead;
162 
163  for( size_t pgnb = 0; pgnb < nbpages; ++pgnb )
164  {
165  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
166  if( crcval != cksums[pgnb] )
167  {
168  Log *log = DefaultEnv::GetLog();
169  log->Info( FileMsg, "[%p@%s] Received corrupted page, will retry page #%zu.",
170  (void*)this, stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
171 
172  XRootDStatus st = XrdCl::FileStateHandler::PgReadRetry( stateHandler, pgoff, pgsize, pgnb, buffer, this, 0 );
173  if( !st.IsOK())
174  {
175  *status = st; // the reason for this failure
176  break;
177  }
178  ++retrycnt; // update the retry counter
179  }
180 
181  bytesRead -= pgsize;
182  buffer += pgsize;
183  pgoff += pgsize;
184  pgsize = XrdSys::PageSize;
185  if( pgsize > bytesRead ) pgsize = bytesRead;
186  }
187 
188 
189  if( retrycnt == 0 )
190  {
191  //--------------------------------------------------------------------
192  // All went well!
193  //--------------------------------------------------------------------
194  userHandler->HandleResponseWithHosts( status, response, hostList );
195  lck.unlock();
196  delete this;
197  return;
198  }
199 
200  //----------------------------------------------------------------------
201  // We have to wait for retries!
202  //----------------------------------------------------------------------
203  resp.reset( response );
204  hosts.reset( hostList );
205  st.reset( status );
206  }
207 
208  void UpdateCksum( size_t pgnb, uint32_t crcval )
209  {
210  if( resp )
211  {
212  XrdCl::PageInfo *pginf = 0;
213  resp->Get( pginf );
214  pginf->GetCksums()[pgnb] = crcval;
215  }
216  }
217 
218  private:
219 
220  std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
221  XrdCl::ResponseHandler *userHandler;
222  uint64_t orgOffset;
223 
224  std::unique_ptr<XrdCl::AnyObject> resp;
225  std::unique_ptr<XrdCl::HostList> hosts;
226  std::unique_ptr<XrdCl::XRootDStatus> st;
227 
228  std::mutex mtx;
229  bool maincall;
230  size_t retrycnt;
231  size_t nbrepair;
232 
233  };
234 
235  //----------------------------------------------------------------------------
236  // Helper callback for handling PgRead retries
237  //----------------------------------------------------------------------------
238  class PgReadRetryHandler : public XrdCl::ResponseHandler
239  {
240  public:
241 
242  PgReadRetryHandler( PgReadHandler *pgReadHandler, size_t pgnb ) : pgReadHandler( pgReadHandler ),
243  pgnb( pgnb )
244  {
245 
246  }
247 
248  //------------------------------------------------------------------------
249  // Handle the response
250  //------------------------------------------------------------------------
251  void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
252  XrdCl::AnyObject *response,
253  XrdCl::HostList *hostList )
254  {
255  using namespace XrdCl;
256 
257  if( !status->IsOK() )
258  {
259  Log *log = DefaultEnv::GetLog();
260  log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
261  (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
262  pgReadHandler->HandleResponseWithHosts( status, response, hostList );
263  delete this;
264  return;
265  }
266 
267  XrdCl::PageInfo *pginf = 0;
268  response->Get( pginf );
269  if( pginf->GetLength() > (uint32_t)XrdSys::PageSize || pginf->GetCksums().size() != 1 )
270  {
271  Log *log = DefaultEnv::GetLog();
272  log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
273  (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
274  // we retry a page at a time so the length cannot exceed 4KB
275  DeleteArgs( status, response, hostList );
276  pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
277  delete this;
278  return;
279  }
280 
281  uint32_t crcval = XrdOucCRC::Calc32C( pginf->GetBuffer(), pginf->GetLength() );
282  if( crcval != pginf->GetCksums().front() )
283  {
284  Log *log = DefaultEnv::GetLog();
285  log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
286  (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
287  DeleteArgs( status, response, hostList );
288  pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
289  delete this;
290  return;
291  }
292 
293  Log *log = DefaultEnv::GetLog();
294  log->Info( FileMsg, "[%p@%s] Successfully recovered page #%zu.",
295  (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
296 
297  DeleteArgs( 0, response, hostList );
298  pgReadHandler->UpdateCksum( pgnb, crcval );
299  pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
300  delete this;
301  }
302 
303  private:
304 
305  inline void DeleteArgs( XrdCl::XRootDStatus *status,
306  XrdCl::AnyObject *response,
307  XrdCl::HostList *hostList )
308  {
309  delete status;
310  delete response;
311  delete hostList;
312  }
313 
314  PgReadHandler *pgReadHandler;
315  size_t pgnb;
316  };
317 
318  //----------------------------------------------------------------------------
319  // Handle PgRead substitution with ordinary Read
320  //----------------------------------------------------------------------------
322  {
323  public:
324 
325  //------------------------------------------------------------------------
326  // Constructor
327  //------------------------------------------------------------------------
328  PgReadSubstitutionHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
329  XrdCl::ResponseHandler *userHandler ) :
330  stateHandler( stateHandler ),
331  userHandler( userHandler )
332  {
333  }
334 
335  //------------------------------------------------------------------------
336  // Handle the response
337  //------------------------------------------------------------------------
338  void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
339  XrdCl::AnyObject *rdresp,
340  XrdCl::HostList *hostList )
341  {
342  using namespace XrdCl;
343 
344  if( !status->IsOK() )
345  {
346  userHandler->HandleResponseWithHosts( status, rdresp, hostList );
347  delete this;
348  return;
349  }
350 
351 
352  ChunkInfo *chunk = nullptr;
353  rdresp->Get( chunk );
354 
355  if( !chunk )
356  {
357  userHandler->HandleResponseWithHosts( status, rdresp, hostList );
358  delete this;
359  return;
360  }
361 
362  std::vector<uint32_t> cksums;
363  if( stateHandler->pIsChannelEncrypted )
364  {
365  size_t nbpages = chunk->length / XrdSys::PageSize;
366  if( chunk->length % XrdSys::PageSize )
367  ++nbpages;
368  cksums.reserve( nbpages );
369 
370  size_t size = chunk->length;
371  char *buffer = reinterpret_cast<char*>( chunk->buffer );
372 
373  for( size_t pg = 0; pg < nbpages; ++pg )
374  {
375  size_t pgsize = XrdSys::PageSize;
376  if( pgsize > size ) pgsize = size;
377  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
378  cksums.push_back( crcval );
379  buffer += pgsize;
380  size -= pgsize;
381  }
382  }
383 
384  PageInfo *pages = new PageInfo( chunk->offset, chunk->length,
385  chunk->buffer, std::move( cksums ) );
386  delete rdresp;
387  AnyObject *response = new AnyObject();
388  response->Set( pages );
389  userHandler->HandleResponseWithHosts( status, response, hostList );
390 
391  delete this;
392  }
393 
394  private:
395 
396  std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
397  XrdCl::ResponseHandler *userHandler;
398  };
399 
400  //----------------------------------------------------------------------------
401  // Object that does things to the FileStateHandler when kXR_open returns
402  // and then calls the user handler
403  //----------------------------------------------------------------------------
404  class OpenHandler: public XrdCl::ResponseHandler
405  {
406  public:
407  //------------------------------------------------------------------------
408  // Constructor
409  //------------------------------------------------------------------------
410  OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
411  XrdCl::ResponseHandler *userHandler ):
412  pStateHandler( stateHandler ),
413  pUserHandler( userHandler )
414  {
415  }
416 
417  //------------------------------------------------------------------------
418  // Handle the response
419  //------------------------------------------------------------------------
420  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
421  XrdCl::AnyObject *response,
422  XrdCl::HostList *hostList )
423  {
424  using namespace XrdCl;
425 
426  //----------------------------------------------------------------------
427  // Extract the statistics info
428  //----------------------------------------------------------------------
429  OpenInfo *openInfo = 0;
430  if( status->IsOK() )
431  response->Get( openInfo );
432 #ifdef WITH_XRDEC
433  else
434  //--------------------------------------------------------------------
435  // Handle EC redirect
436  //--------------------------------------------------------------------
437  if( status->code == errRedirect )
438  {
439  std::string ecurl = status->GetErrorMessage();
440  EcHandler *ecHandler = GetEcHandler( hostList->front().url, ecurl );
441  if( ecHandler )
442  {
443  pStateHandler->pPlugin = ecHandler; // set the plugin for the File object
444  ecHandler->Open( pStateHandler->pOpenFlags, pUserHandler, 0/*TODO figure out right value for the timeout*/ );
445  return;
446  }
447  }
448 #endif
449  //----------------------------------------------------------------------
450  // Notify the state handler and the client and say bye bye
451  //----------------------------------------------------------------------
452  pStateHandler->OnOpen( status, openInfo, hostList );
453  delete response;
454  if( pUserHandler )
455  pUserHandler->HandleResponseWithHosts( status, 0, hostList );
456  else
457  {
458  delete status;
459  delete hostList;
460  }
461  delete this;
462  }
463 
464  private:
465  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
466  XrdCl::ResponseHandler *pUserHandler;
467  };
468 
469  //----------------------------------------------------------------------------
470  // Object that does things to the FileStateHandler when kXR_close returns
471  // and then calls the user handler
472  //----------------------------------------------------------------------------
473  class CloseHandler: public XrdCl::ResponseHandler
474  {
475  public:
476  //------------------------------------------------------------------------
477  // Constructor
478  //------------------------------------------------------------------------
479  CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
480  XrdCl::ResponseHandler *userHandler,
481  XrdCl::Message *message ):
482  pStateHandler( stateHandler ),
483  pUserHandler( userHandler ),
484  pMessage( message )
485  {
486  }
487 
488  //------------------------------------------------------------------------
490  //------------------------------------------------------------------------
491  virtual ~CloseHandler()
492  {
493  delete pMessage;
494  }
495 
496  //------------------------------------------------------------------------
497  // Handle the response
498  //------------------------------------------------------------------------
499  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
500  XrdCl::AnyObject *response,
501  XrdCl::HostList *hostList )
502  {
503  pStateHandler->OnClose( status );
504  if( pUserHandler )
505  pUserHandler->HandleResponseWithHosts( status, response, hostList );
506  else
507  {
508  delete response;
509  delete status;
510  delete hostList;
511  }
512 
513  delete this;
514  }
515 
516  private:
517  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
518  XrdCl::ResponseHandler *pUserHandler;
519  XrdCl::Message *pMessage;
520  };
521 
522  //----------------------------------------------------------------------------
523  // Stateful message handler
524  //----------------------------------------------------------------------------
525  class StatefulHandler: public XrdCl::ResponseHandler
526  {
527  public:
528  //------------------------------------------------------------------------
529  // Constructor
530  //------------------------------------------------------------------------
531  StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
532  XrdCl::ResponseHandler *userHandler,
533  XrdCl::Message *message,
534  const XrdCl::MessageSendParams &sendParams ):
535  pStateHandler( stateHandler ),
536  pUserHandler( userHandler ),
537  pMessage( message ),
538  pSendParams( sendParams )
539  {
540  }
541 
542  //------------------------------------------------------------------------
543  // Destructor
544  //------------------------------------------------------------------------
545  virtual ~StatefulHandler()
546  {
547  delete pMessage;
548  delete pSendParams.chunkList;
549  delete pSendParams.kbuff;
550  }
551 
552  //------------------------------------------------------------------------
553  // Handle the response
554  //------------------------------------------------------------------------
555  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
556  XrdCl::AnyObject *response,
557  XrdCl::HostList *hostList )
558  {
559  using namespace XrdCl;
560  std::unique_ptr<AnyObject> responsePtr( response );
561  pSendParams.hostList = hostList;
562 
563  //----------------------------------------------------------------------
564  // Houston we have a problem...
565  //----------------------------------------------------------------------
566  if( !status->IsOK() )
567  {
568  XrdCl::FileStateHandler::OnStateError( pStateHandler, status, pMessage, this, pSendParams );
569  return;
570  }
571 
572  //----------------------------------------------------------------------
573  // We're clear
574  //----------------------------------------------------------------------
575  responsePtr.release();
576  XrdCl::FileStateHandler::OnStateResponse( pStateHandler, status, pMessage, response, hostList );
577  if( pUserHandler )
578  pUserHandler->HandleResponseWithHosts( status, response, hostList );
579  else
580  {
581  delete status,
582  delete response;
583  delete hostList;
584  }
585  delete this;
586  }
587 
588  //------------------------------------------------------------------------
590  //------------------------------------------------------------------------
591  XrdCl::ResponseHandler *GetUserHandler()
592  {
593  return pUserHandler;
594  }
595 
596  private:
597  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
598  XrdCl::ResponseHandler *pUserHandler;
599  XrdCl::Message *pMessage;
600  XrdCl::MessageSendParams pSendParams;
601  };
602 
603  //----------------------------------------------------------------------------
604  // Release-buffer Handler
605  //----------------------------------------------------------------------------
606  class ReleaseBufferHandler: public XrdCl::ResponseHandler
607  {
608  public:
609 
610  //------------------------------------------------------------------------
611  // Constructor
612  //------------------------------------------------------------------------
613  ReleaseBufferHandler( XrdCl::Buffer &&buffer, XrdCl::ResponseHandler *handler ) :
614  buffer( std::move( buffer ) ),
615  handler( handler )
616  {
617  }
618 
619  //------------------------------------------------------------------------
620  // Handle the response
621  //------------------------------------------------------------------------
622  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
623  XrdCl::AnyObject *response,
624  XrdCl::HostList *hostList )
625  {
626  if (handler)
627  handler->HandleResponseWithHosts( status, response, hostList );
628  }
629 
630  //------------------------------------------------------------------------
631  // Get the underlying buffer
632  //------------------------------------------------------------------------
633  XrdCl::Buffer& GetBuffer()
634  {
635  return buffer;
636  }
637 
638  private:
639  XrdCl::Buffer buffer;
640  XrdCl::ResponseHandler *handler;
641  };
642 }
643 
644 namespace XrdCl
645 {
646  //----------------------------------------------------------------------------
647  // Constructor
648  //----------------------------------------------------------------------------
// Builds a handler in the Closed state for the given plug-in slot.
// All pointer members start out null, read/write recovery, redirect
// following and the virtual redirector are enabled, the channel is flagged
// as not encrypted and bundled close is disabled. `plugin` is stored in
// pPlugin.
649  FileStateHandler::FileStateHandler( FilePlugIn *& plugin ):
650  pFileState( Closed ),
651  pStatInfo( 0 ),
652  pFileUrl( 0 ),
653  pDataServer( 0 ),
654  pLoadBalancer( 0 ),
655  pStateRedirect( 0 ),
656  pWrtRecoveryRedir( 0 ),
657  pFileHandle( 0 ),
658  pOpenMode( 0 ),
659  pOpenFlags( 0 ),
660  pSessionId( 0 ),
661  pDoRecoverRead( true ),
662  pDoRecoverWrite( true ),
663  pFollowRedirects( true ),
664  pUseVirtRedirector( true ),
665  pIsChannelEncrypted( false ),
666  pAllowBundledClose( false ),
667  pPlugin( plugin )
668  {
// storage for the 4-byte file handle (copied into requests elsewhere)
669  pFileHandle = new uint8_t[4];
670  ResetMonitoringVars();
// NOTE(review): the embedded listing numbers jump from 670 to 673, so two
// statements appear to be missing from this copy; restore them from the
// upstream file before relying on this definition.
673  pLFileHandler = new LocalFileHandler();
674  }
675 
676  //------------------------------------------------------------------------
681  //------------------------------------------------------------------------
// Same as the single-argument constructor, except that the use of the
// virtual redirector is chosen by the caller through `useVirtRedirector`.
// NOTE(review): unlike the other constructor, pIsChannelEncrypted does not
// appear in this initializer list; unless its declaration carries a default
// initializer the flag is left uninitialized here -- verify and add
// `pIsChannelEncrypted( false )` if so.
682  FileStateHandler::FileStateHandler( bool useVirtRedirector, FilePlugIn *& plugin ):
683  pFileState( Closed ),
684  pStatInfo( 0 ),
685  pFileUrl( 0 ),
686  pDataServer( 0 ),
687  pLoadBalancer( 0 ),
688  pStateRedirect( 0 ),
689  pWrtRecoveryRedir( 0 ),
690  pFileHandle( 0 ),
691  pOpenMode( 0 ),
692  pOpenFlags( 0 ),
693  pSessionId( 0 ),
694  pDoRecoverRead( true ),
695  pDoRecoverWrite( true ),
696  pFollowRedirects( true ),
697  pUseVirtRedirector( useVirtRedirector ),
698  pAllowBundledClose( false ),
699  pPlugin( plugin )
700  {
// storage for the 4-byte file handle (copied into requests elsewhere)
701  pFileHandle = new uint8_t[4];
702  ResetMonitoringVars();
// NOTE(review): the embedded listing numbers jump from 702 to 705, so two
// statements appear to be missing from this copy; restore them from the
// upstream file before relying on this definition.
705  pLFileHandler = new LocalFileHandler();
706  }
707 
708  //----------------------------------------------------------------------------
709  // Destructor
710  //----------------------------------------------------------------------------
// NOTE(review): the line carrying the signature of this definition (listing
// line 711) is absent from this copy; judging by the "Destructor" banner
// above it is presumably FileStateHandler::~FileStateHandler() -- restore it
// from the upstream file.
712  {
713  //--------------------------------------------------------------------------
714  // This, in principle, should never ever happen. Except for the case
715  // when we're interfaced with ROOT that may call this desctructor from
716  // its garbage collector, from its __cxa_finalize, ie. after the XrdCl lib
717  // has been finalized by the linker. So, if we don't have the log object
718  // at this point we just give up the hope.
719  //--------------------------------------------------------------------------
720  if( DefaultEnv::GetLog() && pSessionId && !pDataServer->IsLocalFile() ) // if the file object was bound to a physical connection
721  DefaultEnv::GetPostMaster()->DecFileInstCnt( *pDataServer );
722 
// NOTE(review): listing lines 723-724 and 726-727 are absent from this copy;
// some statements appear to be missing here.
725 
728 
// a file that is not Closed still gets its close reported to monitoring,
// using a default-constructed status
729  if( pFileState != Closed && DefaultEnv::GetLog() )
730  {
731  XRootDStatus st;
732  MonitorClose( &st );
733  ResetMonitoringVars();
734  }
735 
736  // check if the logger is still there, this is only for root, as root might
737  // have unload us already so in this case we don't want to do anything
738  if( DefaultEnv::GetLog() && pUseVirtRedirector && pFileUrl && pFileUrl->IsMetalink() )
739  {
// NOTE(review): the declaration of `registry` (listing line 740) is absent
// from this copy.
741  registry.Release( *pFileUrl );
742  }
743 
// release everything owned by the handler; pFileHandle was allocated with
// new[] in the constructors, hence delete []
744  delete pStatInfo;
745  delete pFileUrl;
746  delete pDataServer;
747  delete pLoadBalancer;
748  delete [] pFileHandle;
749  delete pLFileHandler;
750  }
751 
752  //----------------------------------------------------------------------------
753  // Open the file pointed to by the given URL
754  //----------------------------------------------------------------------------
// Sends a kXR_open request for `url` on behalf of `self`.
//
// @param self    : the state handler the open is performed for
// @param url     : URL of the file to open
// @param flags   : open flags; stored in pOpenFlags and sent in the request
//                  together with kXR_async and kXR_retstat
// @param mode    : access mode; stored in pOpenMode and sent in the request
// @param handler : user handler, wrapped in an OpenHandler
// @param timeout : copied into the send parameters
// @return        : pStatus if the handler is in the Error state,
//                  errInvalidOp if a close is in progress, the file is open
//                  or recovering, errInvalidArgs for an invalid URL,
//                  otherwise the status returned by IssueRequest
755  XRootDStatus FileStateHandler::Open( std::shared_ptr<FileStateHandler> &self,
756  const std::string &url,
757  uint16_t flags,
758  uint16_t mode,
759  ResponseHandler *handler,
760  uint16_t timeout )
761  {
762  XrdSysMutexHelper scopedLock( self->pMutex );
763 
764  //--------------------------------------------------------------------------
765  // Check if we can proceed
766  //--------------------------------------------------------------------------
767  if( self->pFileState == Error )
768  return self->pStatus;
769 
770  if( self->pFileState == OpenInProgress )
// NOTE(review): the embedded listing numbers jump from 770 to 772, so the
// statement guarded by the condition above is missing from this copy
// (presumably a return for the open-in-progress case); restore it from the
// upstream file.
772 
773  if( self->pFileState == CloseInProgress || self->pFileState == Opened ||
774  self->pFileState == Recovering )
775  return XRootDStatus( stError, errInvalidOp );
776 
777  self->pFileState = OpenInProgress;
778 
779  //--------------------------------------------------------------------------
780  // Check if the parameters are valid
781  //--------------------------------------------------------------------------
782  Log *log = DefaultEnv::GetLog();
783 
// drop the URL left over from a previous open before storing the new one
784  if( self->pFileUrl )
785  {
786  if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
787  {
// NOTE(review): the declaration of `registry` (listing line 788) is absent
// from this copy.
789  registry.Release( *self->pFileUrl );
790  }
791  delete self->pFileUrl;
792  self->pFileUrl = 0;
793  }
794 
795  self->pFileUrl = new URL( url );
796 
797  //--------------------------------------------------------------------------
798  // Add unique uuid to each open request so replays due to error/timeout
799  // recovery can be correctly handled.
800  //--------------------------------------------------------------------------
801  URL::ParamsMap cgi = self->pFileUrl->GetParams();
802  uuid_t uuid;
// 36 characters of textual UUID plus the terminating NUL
803  char requuid[37]= {0};
804  uuid_generate( uuid );
805  uuid_unparse( uuid, requuid );
806  cgi["xrdcl.requuid"] = requuid;
807  self->pFileUrl->SetParams( cgi );
808 
// an invalid URL puts the handler back into the Closed state
809  if( !self->pFileUrl->IsValid() )
810  {
811  log->Error( FileMsg, "[%p@%s] Trying to open invalid url: %s",
812  (void*)self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
813  self->pStatus = XRootDStatus( stError, errInvalidArgs );
814  self->pFileState = Closed;
815  return self->pStatus;
816  }
817 
818  //--------------------------------------------------------------------------
819  // Check if the recovery procedures should be enabled
820  //--------------------------------------------------------------------------
// recovery can only be switched off here: either by the URL parameter being
// "false" or because the flag was already false
821  const URL::ParamsMap &urlParams = self->pFileUrl->GetParams();
822  URL::ParamsMap::const_iterator it;
823  it = urlParams.find( "xrdcl.recover-reads" );
824  if( (it != urlParams.end() && it->second == "false") ||
825  !self->pDoRecoverRead )
826  {
827  self->pDoRecoverRead = false;
828  log->Debug( FileMsg, "[%p@%s] Read recovery procedures are disabled",
829  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
830  }
831 
832  it = urlParams.find( "xrdcl.recover-writes" );
833  if( (it != urlParams.end() && it->second == "false") ||
834  !self->pDoRecoverWrite )
835  {
836  self->pDoRecoverWrite = false;
837  log->Debug( FileMsg, "[%p@%s] Write recovery procedures are disabled",
838  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
839  }
840 
841  //--------------------------------------------------------------------------
842  // Open the file
843  //--------------------------------------------------------------------------
844  log->Debug( FileMsg, "[%p@%s] Sending an open command", (void*)self.get(),
845  self->pFileUrl->GetObfuscatedURL().c_str() );
846 
847  self->pOpenMode = mode;
848  self->pOpenFlags = flags;
849  OpenHandler *openHandler = new OpenHandler( self, handler );
850 
851  Message *msg;
852  ClientOpenRequest *req;
853  std::string path = self->pFileUrl->GetPathWithFilteredParams();
854  MessageUtils::CreateRequest( msg, req, path.length() );
855 
856  req->requestid = kXR_open;
857  req->mode = mode;
858  req->options = flags | kXR_async | kXR_retstat;
859  req->dlen = path.length();
// the path is written into the message at offset 24
860  msg->Append( path.c_str(), path.length(), 24 );
861 
// NOTE(review): listing lines 862 and 865 are absent from this copy; one
// statement appears to be missing before and one after the two lines below.
863  MessageSendParams params; params.timeout = timeout;
864  params.followRedirects = self->pFollowRedirects;
866 
867  XRootDStatus st = self->IssueRequest( *self->pFileUrl, msg, openHandler, params );
868 
// the request could not be issued: the handler will never be called, so it
// is destroyed here and the file goes back to Closed
869  if( !st.IsOK() )
870  {
871  delete openHandler;
872  self->pStatus = st;
873  self->pFileState = Closed;
874  return st;
875  }
876  return st;
877  }
878 
879  //----------------------------------------------------------------------------
880  // Close the file object
881  //----------------------------------------------------------------------------
// Sends a kXR_close request for the file handle held by `self`.
//
// @param self    : the state handler of the file to close
// @param handler : user handler, wrapped in a CloseHandler
// @param timeout : copied into the send parameters
// @return        : pStatus if the handler is in the Error state,
//                  stOK/suAlreadyDone if the file is already Closed,
//                  errInvalidOp if an open or a recovery is in progress or
//                  if requests are still in flight and bundled close is not
//                  allowed, otherwise the outcome of issuing the request
882  XRootDStatus FileStateHandler::Close( std::shared_ptr<FileStateHandler> &self,
883  ResponseHandler *handler,
884  uint16_t timeout )
885  {
886  XrdSysMutexHelper scopedLock( self->pMutex );
887 
888  //--------------------------------------------------------------------------
889  // Check if we can proceed
890  //--------------------------------------------------------------------------
891  if( self->pFileState == Error )
892  return self->pStatus;
893 
894  if( self->pFileState == CloseInProgress )
// NOTE(review): the embedded listing numbers jump from 894 to 896, so the
// statement guarded by the condition above is missing from this copy
// (presumably a return for the close-in-progress case); restore it from the
// upstream file.
896 
897  if( self->pFileState == Closed )
898  return XRootDStatus( stOK, suAlreadyDone );
899 
900  if( self->pFileState == OpenInProgress || self->pFileState == Recovering )
901  return XRootDStatus( stError, errInvalidOp );
902 
903  if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
904  return XRootDStatus( stError, errInvalidOp );
905 
906  self->pFileState = CloseInProgress;
907 
908  Log *log = DefaultEnv::GetLog();
// the 4-byte file handle is printed as a single 32-bit value
909  log->Debug( FileMsg, "[%p@%s] Sending a close command for handle %#x to %s",
910  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
911  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
912 
913  //--------------------------------------------------------------------------
914  // Close the file
915  //--------------------------------------------------------------------------
916  Message *msg;
917  ClientCloseRequest *req;
918  MessageUtils::CreateRequest( msg, req );
919 
920  req->requestid = kXR_close;
921  memcpy( req->fhandle, self->pFileHandle, 4 );
922 
// NOTE(review): listing line 923 is absent from this copy; a statement
// appears to be missing here.
924  msg->SetSessionId( self->pSessionId );
// the CloseHandler deletes msg in its destructor
925  CloseHandler *closeHandler = new CloseHandler( self, handler, msg );
926  MessageSendParams params;
927  params.timeout = timeout;
928  params.followRedirects = false;
929  params.stateful = true;
// NOTE(review): listing line 930 is absent from this copy; a statement
// appears to be missing here.
931 
932  XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, closeHandler, params );
933 
934  if( !st.IsOK() )
935  {
936  // an invalid-session error means the connection to the server has been
937  // closed, which in turn means that the server closed the file already
938  if( st.code == errInvalidSession || st.code == errSocketDisconnected ||
// NOTE(review): listing line 939 is absent from this copy; part of the
// condition appears to be missing here.
940  st.code == errPollerError || st.code == errSocketError )
941  {
942  self->pFileState = Closed;
// a job carrying a default-constructed status for the close handler is created
943  ResponseJob *job = new ResponseJob( closeHandler, new XRootDStatus(),
944  nullptr, nullptr );
// NOTE(review): listing line 945 is absent from this copy; the statement
// that hands `job` over for execution appears to be missing -- as shown the
// job is never used.
946  return XRootDStatus();
947  }
948 
// any other failure: the handler will never be called, destroy it and put
// the file into the Error state
949  delete closeHandler;
950  self->pStatus = st;
951  self->pFileState = Error;
952  return st;
953  }
954  return st;
955  }
956 
957  //----------------------------------------------------------------------------
958  // Stat the file
959  //----------------------------------------------------------------------------
// Stat: report the file's stat information through `handler`.
//   self    - shared pointer to the file state object being operated on
//   force   - false: answer from the cached pStatInfo copy without sending
//             anything; true: build a kXR_stat request and submit it
//   handler - receives the response; only null-checked on the cached path
//   timeout - copied into the send parameters of the request
// Returns pStatus when the file state is Error, and errInvalidOp unless the
// state is Opened or Recovering; otherwise an OK status (cached path) or
// whatever SendOrQueue returns.
960  XRootDStatus FileStateHandler::Stat( std::shared_ptr<FileStateHandler> &self,
961  bool force,
962  ResponseHandler *handler,
963  uint16_t timeout )
964  {
965  XrdSysMutexHelper scopedLock( self->pMutex );
966 
967  if( self->pFileState == Error ) return self->pStatus;
968 
969  if( self->pFileState != Opened && self->pFileState != Recovering )
970  return XRootDStatus( stError, errInvalidOp );
971 
972  //--------------------------------------------------------------------------
973  // Return the cached info
974  //--------------------------------------------------------------------------
975  if( !force )
976  {
// The handler is invoked inline, while scopedLock is still in scope.
// NOTE(review): when handler is null, obj (and the StatInfo copy it was
// given) is not passed to anyone and is not deleted here - looks like a leak.
977  AnyObject *obj = new AnyObject();
978  obj->Set( new StatInfo( *self->pStatInfo ) );
979  if (handler)
980  handler->HandleResponseWithHosts( new XRootDStatus(), obj, new HostList() );
981  return XRootDStatus();
982  }
983 
984  Log *log = DefaultEnv::GetLog();
985  log->Debug( FileMsg, "[%p@%s] Sending a stat command for handle %#x to %s",
986  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
987  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
988 
989  //--------------------------------------------------------------------------
990  // Issue a new stat request
991  // stating a file handle doesn't work (fixed in 3.2.0) so we need to
992  // stat the path
993  //--------------------------------------------------------------------------
// NOTE(review): despite the comment above, the request below is filled with
// the file handle; `path` is not referenced again in the visible code.
994  Message *msg;
995  ClientStatRequest *req;
996  std::string path = self->pFileUrl->GetPath();
997  MessageUtils::CreateRequest( msg, req );
998 
999  req->requestid = kXR_stat;
1000  memcpy( req->fhandle, self->pFileHandle, 4 );
1001 
// Stateful request bound to this file's data server: redirects are not followed.
1002  MessageSendParams params;
1003  params.timeout = timeout;
1004  params.followRedirects = false;
1005  params.stateful = true;
// NOTE(review): the embedded listing numbers skip 1006 and 1008 here; the
// corresponding statements are absent from this copy - confirm against upstream.
1007 
// The user handler is wrapped together with the message and send parameters.
1009  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1010 
1011  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1012  }
1013 
1014  //----------------------------------------------------------------------------
1015  // Read a data chunk at a given offset - async
1016  //----------------------------------------------------------------------------
// Read: request `size` bytes starting at `offset` into `buffer`; the outcome
// is reported through `handler`.
//   self    - shared pointer to the file state object being operated on
//   offset  - file offset placed in the request
//   size    - number of bytes requested (request field rlen)
//   buffer  - destination recorded in the single-entry chunk list
//   timeout - copied into the send parameters
// Returns pStatus when the file state is Error, errInvalidOp unless the state
// is Opened or Recovering, otherwise the result of SendOrQueue.
1017  XRootDStatus FileStateHandler::Read( std::shared_ptr<FileStateHandler> &self,
1018  uint64_t offset,
1019  uint32_t size,
1020  void *buffer,
1021  ResponseHandler *handler,
1022  uint16_t timeout )
1023  {
1024  XrdSysMutexHelper scopedLock( self->pMutex );
1025 
1026  if( self->pFileState == Error ) return self->pStatus;
1027 
1028  if( self->pFileState != Opened && self->pFileState != Recovering )
1029  return XRootDStatus( stError, errInvalidOp );
1030 
1031  Log *log = DefaultEnv::GetLog();
1032  log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1033  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1034  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1035 
1036  Message *msg;
1037  ClientReadRequest *req;
1038  MessageUtils::CreateRequest( msg, req );
1039 
1040  req->requestid = kXR_read;
1041  req->offset = offset;
1042  req->rlen = size;
1043  memcpy( req->fhandle, self->pFileHandle, 4 );
1044 
// One chunk describing where the response data should land.
1045  ChunkList *list = new ChunkList();
1046  list->push_back( ChunkInfo( offset, size, buffer ) );
1047 
// NOTE(review): listing number 1048 is absent from this copy - a statement
// may have been dropped here; confirm against upstream.
1049  MessageSendParams params;
1050  params.timeout = timeout;
1051  params.followRedirects = false;
1052  params.stateful = true;
1053  params.chunkList = list;
// NOTE(review): listing number 1054 is absent from this copy as well.
1055  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1056 
1057  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1058  }
1059 
1060  //------------------------------------------------------------------------
1061  // Read data pages at a given offset
1062  //------------------------------------------------------------------------
// PgRead: page read entry point. Decides whether the data server can do a
// pgread; if not, it substitutes an ordinary Read whose response is adapted
// by PgReadSubstitutionHandler, otherwise it issues PgReadImpl with a
// PgReadHandler wrapper.
//   offset/size/buffer - range to read and the destination buffer
//   handler            - user handler, wrapped by one of the two helpers
//   timeout            - passed through unchanged
// In both branches the wrapper created here is deleted again when the
// underlying call returns a non-OK status.
1063  XRootDStatus FileStateHandler::PgRead( std::shared_ptr<FileStateHandler> &self,
1064  uint64_t offset,
1065  uint32_t size,
1066  void *buffer,
1067  ResponseHandler *handler,
1068  uint16_t timeout )
1069  {
1070  int issupported = true;
1071  AnyObject obj;
// NOTE(review): listing number 1072 is absent from this copy. `st1`, used in
// the condition below, is not declared anywhere in the visible text and `obj`
// is never filled before obj.Get() - the dropped line presumably did both;
// confirm against upstream.
1073  int protver = 0;
1074  XRootDStatus st2 = Utils::GetProtocolVersion( *self->pDataServer, protver );
1075  if( st1.IsOK() && st2.IsOK() )
1076  {
// Supported only if the flag word carries kXR_suppgrw AND the protocol
// version is at least kXR_PROTPGRWVERSION.
1077  int *ptr = 0;
1078  obj.Get( ptr );
1079  issupported = ( ptr && (*ptr & kXR_suppgrw) ) && ( protver >= kXR_PROTPGRWVERSION );
1080  delete ptr;
1081  }
1082  else
1083  issupported = false;
1084 
1085  if( !issupported )
1086  {
1087  DefaultEnv::GetLog()->Debug( FileMsg, "[%p@%s] PgRead not supported; substituting with Read.",
1088  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
1089  ResponseHandler *substitHandler = new PgReadSubstitutionHandler( self, handler );
1090  auto st = Read( self, offset, size, buffer, substitHandler, timeout );
1091  if( !st.IsOK() ) delete substitHandler;
1092  return st;
1093  }
1094 
1095  ResponseHandler* pgHandler = new PgReadHandler( self, handler, offset );
1096  auto st = PgReadImpl( self, offset, size, buffer, PgReadFlags::None, pgHandler, timeout );
1097  if( !st.IsOK() ) delete pgHandler;
1098  return st;
1099  }
1100 
1101  XRootDStatus FileStateHandler::PgReadRetry( std::shared_ptr<FileStateHandler> &self,
1102  uint64_t offset,
1103  uint32_t size,
1104  size_t pgnb,
1105  void *buffer,
1106  PgReadHandler *handler,
1107  uint16_t timeout )
1108  {
1109  if( size > (uint32_t)XrdSys::PageSize )
1110  return XRootDStatus( stError, errInvalidArgs, EINVAL,
1111  "PgRead retry size exceeded 4KB." );
1112 
1113  ResponseHandler *retryHandler = new PgReadRetryHandler( handler, pgnb );
1114  XRootDStatus st = PgReadImpl( self, offset, size, buffer, PgReadFlags::Retry, retryHandler, timeout );
1115  if( !st.IsOK() ) delete retryHandler;
1116  return st;
1117  }
1118 
// PgReadImpl: build and submit a kXR_pgread request.
//   offset/size - range requested (request fields offset and rlen)
//   buffer      - destination recorded in the single-entry chunk list
//   flags       - stored in the reqflags field of the trailing
//                 ClientPgReadReqArgs structure
//   handler     - wrapped in a StatefulHandler together with the message
//   timeout     - copied into the send parameters
// Returns pStatus when the file state is Error, errInvalidOp unless the state
// is Opened or Recovering, otherwise the result of SendOrQueue.
1119  XRootDStatus FileStateHandler::PgReadImpl( std::shared_ptr<FileStateHandler> &self,
1120  uint64_t offset,
1121  uint32_t size,
1122  void *buffer,
1123  uint16_t flags,
1124  ResponseHandler *handler,
1125  uint16_t timeout )
1126  {
1127  XrdSysMutexHelper scopedLock( self->pMutex );
1128 
1129  if( self->pFileState == Error ) return self->pStatus;
1130 
1131  if( self->pFileState != Opened && self->pFileState != Recovering )
1132  return XRootDStatus( stError, errInvalidOp );
1133 
1134  Log *log = DefaultEnv::GetLog();
1135  log->Debug( FileMsg, "[%p@%s] Sending a pgread command for handle %#x to %s",
1136  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1137  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1138 
// The message is created with extra room for the ClientPgReadReqArgs payload.
1139  Message *msg;
1140  ClientPgReadRequest *req;
1141  MessageUtils::CreateRequest( msg, req, sizeof( ClientPgReadReqArgs ) );
1142 
1143  req->requestid = kXR_pgread;
1144  req->offset = offset;
1145  req->rlen = size;
1146  memcpy( req->fhandle, self->pFileHandle, 4 );
1147 
1148  //--------------------------------------------------------------------------
1149  // Now adjust the message size so it can hold PgRead arguments
1150  //--------------------------------------------------------------------------
// The argument area starts right after the fixed request structure; it is
// zeroed first and then only reqflags is set.
1151  req->dlen = sizeof( ClientPgReadReqArgs );
1152  void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
1153  memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
1154  ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
1155  msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
1156  args->reqflags = flags;
1157 
1158  ChunkList *list = new ChunkList();
1159  list->push_back( ChunkInfo( offset, size, buffer ) );
1160 
// NOTE(review): listing number 1161 is absent from this copy - a statement
// may have been dropped here; confirm against upstream.
1162  MessageSendParams params;
1163  params.timeout = timeout;
1164  params.followRedirects = false;
1165  params.stateful = true;
1166  params.chunkList = list;
// NOTE(review): listing number 1167 is absent from this copy as well.
1168  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1169 
1170  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1171  }
1172 
1173  //----------------------------------------------------------------------------
1174  // Write a data chunk at a given offset - async
1175  //----------------------------------------------------------------------------
// Write (raw buffer): send `size` bytes from `buffer` to file offset `offset`;
// the outcome is reported through `handler`.
//   offset  - file offset placed in the request
//   size    - payload length (request field dlen)
//   buffer  - source data; only the pointer is recorded in the chunk list
//             here, no copy is made in this function
//   timeout - copied into the send parameters
// Returns pStatus when the file state is Error, errInvalidOp unless the state
// is Opened or Recovering, otherwise the result of SendOrQueue.
1176  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1177  uint64_t offset,
1178  uint32_t size,
1179  const void *buffer,
1180  ResponseHandler *handler,
1181  uint16_t timeout )
1182  {
1183  XrdSysMutexHelper scopedLock( self->pMutex );
1184 
1185  if( self->pFileState == Error ) return self->pStatus;
1186 
1187  if( self->pFileState != Opened && self->pFileState != Recovering )
1188  return XRootDStatus( stError, errInvalidOp );
1189 
1190  Log *log = DefaultEnv::GetLog();
1191  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1192  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1193  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1194 
1195  Message *msg;
1196  ClientWriteRequest *req;
1197  MessageUtils::CreateRequest( msg, req );
1198 
1199  req->requestid = kXR_write;
1200  req->offset = offset;
1201  req->dlen = size;
1202  memcpy( req->fhandle, self->pFileHandle, 4 );
1203 
// The chunk is recorded with offset 0 (the file offset lives in the request);
// constness of the caller's buffer is cast away to fit ChunkInfo.
1204  ChunkList *list = new ChunkList();
1205  list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
1206 
1207  MessageSendParams params;
1208  params.timeout = timeout;
1209  params.followRedirects = false;
1210  params.stateful = true;
1211  params.chunkList = list;
1212 
// NOTE(review): listing numbers 1213 and 1215 are absent from this copy -
// statements may have been dropped here; confirm against upstream.
1214 
1216  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1217 
1218  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1219  }
1220 
1221  //----------------------------------------------------------------------------
1222  // Write a data chunk at a given offset
1223  //----------------------------------------------------------------------------
1224  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1225  uint64_t offset,
1226  Buffer &&buffer,
1227  ResponseHandler *handler,
1228  uint16_t timeout )
1229  {
1230  //--------------------------------------------------------------------------
1231  // If the memory is not page (4KB) aligned we cannot use the kernel buffer
1232  // so fall back to normal write
1233  //--------------------------------------------------------------------------
1234  if( !XrdSys::KernelBuffer::IsPageAligned( buffer.GetBuffer() ) || self->pIsChannelEncrypted )
1235  {
1236  Log *log = DefaultEnv::GetLog();
1237  log->Info( FileMsg, "[%p@%s] Buffer for handle %#x is not page aligned (4KB), "
1238  "cannot convert it to kernel space buffer.", (void*)self.get(),
1239  self->pFileUrl->GetObfuscatedURL().c_str(), *((uint32_t*)self->pFileHandle) );
1240 
1241  void *buff = buffer.GetBuffer();
1242  uint32_t size = buffer.GetSize();
1243  ReleaseBufferHandler *wrtHandler =
1244  new ReleaseBufferHandler( std::move( buffer ), handler );
1245  XRootDStatus st = self->Write( self, offset, size, buff, wrtHandler, timeout );
1246  if( !st.IsOK() )
1247  {
1248  buffer = std::move( wrtHandler->GetBuffer() );
1249  delete wrtHandler;
1250  }
1251  return st;
1252  }
1253 
1254  //--------------------------------------------------------------------------
1255  // Transfer the data from user space to kernel space
1256  //--------------------------------------------------------------------------
1257  uint32_t length = buffer.GetSize();
1258  char *ubuff = buffer.Release();
1259 
1260  std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1261  ssize_t ret = XrdSys::Move( ubuff, *kbuff, length );
1262  if( ret < 0 )
1263  return XRootDStatus( stError, errInternal, XProtocol::mapError( errno ) );
1264 
1265  //--------------------------------------------------------------------------
1266  // Now create a write request and enqueue it
1267  //--------------------------------------------------------------------------
1268  return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1269  }
1270 
1271  //----------------------------------------------------------------------------
1272  // Write data read from a given file descriptor at a given offset - async
1273  //----------------------------------------------------------------------------
1274  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1275  uint64_t offset,
1276  uint32_t size,
1277  Optional<uint64_t> fdoff,
1278  int fd,
1279  ResponseHandler *handler,
1280  uint16_t timeout )
1281  {
1282  //--------------------------------------------------------------------------
1283  // Read the data from the file descriptor into a kernel buffer
1284  //--------------------------------------------------------------------------
1285  std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1286  ssize_t ret = fdoff ? XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1287  XrdSys::Read( fd, *kbuff, size );
1288  if( ret < 0 )
1289  return XRootDStatus( stError, errInternal, XProtocol::mapError( errno ) );
1290 
1291  //--------------------------------------------------------------------------
1292  // Now create a write request and enqueue it
1293  //--------------------------------------------------------------------------
1294  return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1295  }
1296 
1297  //----------------------------------------------------------------------------
1298  // Write number of pages at a given offset - async
1299  //----------------------------------------------------------------------------
1300  XRootDStatus FileStateHandler::PgWrite( std::shared_ptr<FileStateHandler> &self,
1301  uint64_t offset,
1302  uint32_t size,
1303  const void *buffer,
1304  std::vector<uint32_t> &cksums,
1305  ResponseHandler *handler,
1306  uint16_t timeout )
1307  {
1308  //--------------------------------------------------------------------------
1309  // Resolve timeout value
1310  //--------------------------------------------------------------------------
1311  if( timeout == 0 )
1312  {
1313  int val = DefaultRequestTimeout;
1314  XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
1315  timeout = val;
1316  }
1317 
1318  //--------------------------------------------------------------------------
1319  // Validate the digest vector size
1320  //--------------------------------------------------------------------------
1321  if( cksums.empty() )
1322  {
1323  const char *data = static_cast<const char*>( buffer );
1324  XrdOucPgrwUtils::csCalc( data, offset, size, cksums );
1325  }
1326  else
1327  {
1328  size_t crc32cCnt = XrdOucPgrwUtils::csNum( offset, size );
1329  if( crc32cCnt != cksums.size() )
1330  return XRootDStatus( stError, errInvalidArgs, 0, "Wrong number of crc32c digests." );
1331  }
1332 
1333  //--------------------------------------------------------------------------
1334  // Create a context for PgWrite operation
1335  //--------------------------------------------------------------------------
1336  struct pgwrt_t
1337  {
1338  pgwrt_t( ResponseHandler *h ) : handler( h ), status( nullptr )
1339  {
1340  }
1341 
1342  ~pgwrt_t()
1343  {
1344  if( handler )
1345  {
1346  // if all retries were successful no error status was set
1347  if( !status ) status = new XRootDStatus();
1348  handler->HandleResponse( status, nullptr );
1349  }
1350  }
1351 
1352  static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1353  {
1354  if( pgoff == offset ) return 0; // we need this if statement because we operate on unsigned integers
1355  return ( pgoff - ( offset + fstpglen ) ) / XrdSys::PageSize + 1;
1356  }
1357 
1358  inline void SetStatus( XRootDStatus* s )
1359  {
1360  if( !status ) status = s;
1361  else delete s;
1362  }
1363 
1364  ResponseHandler *handler;
1365  XRootDStatus *status;
1366  };
1367  auto pgwrt = std::make_shared<pgwrt_t>( handler );
1368 
1369  int fLen, lLen;
1370  XrdOucPgrwUtils::csNum( offset, size, fLen, lLen );
1371  uint32_t fstpglen = fLen;
1372 
1373  time_t start = ::time( nullptr );
1374  auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1375  {
1376  std::unique_ptr<AnyObject> scoped( r );
1377  // if the request failed simply pass the status to the
1378  // user handler
1379  if( !s->IsOK() )
1380  {
1381  pgwrt->SetStatus( s );
1382  return; // pgwrt destructor will call the handler
1383  }
1384  // also if the request was sucessful and there were no
1385  // corrupted pages pass the status to the user handler
1386  RetryInfo *inf = nullptr;
1387  r->Get( inf );
1388  if( !inf->NeedRetry() )
1389  {
1390  pgwrt->SetStatus( s );
1391  return; // pgwrt destructor will call the handler
1392  }
1393  delete s;
1394  // first adjust the timeout value
1395  uint16_t elapsed = ::time( nullptr ) - start;
1396  if( elapsed >= timeout )
1397  {
1398  pgwrt->SetStatus( new XRootDStatus( stError, errOperationExpired ) );
1399  return; // pgwrt destructor will call the handler
1400  }
1401  else timeout -= elapsed;
1402  // retransmit the corrupted pages
1403  for( size_t i = 0; i < inf->Size(); ++i )
1404  {
1405  auto tpl = inf->At( i );
1406  uint64_t pgoff = std::get<0>( tpl );
1407  uint32_t pglen = std::get<1>( tpl );
1408  const void *pgbuf = static_cast<const char*>( buffer ) + ( pgoff - offset );
1409  uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
1410  auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1411  {
1412  std::unique_ptr<AnyObject> scoped( r );
1413  // if we failed simply set the status
1414  if( !s->IsOK() )
1415  {
1416  pgwrt->SetStatus( s );
1417  return; // the destructor will call the handler
1418  }
1419  delete s;
1420  // otherwise check if the data were not corrupted again
1421  RetryInfo *inf = nullptr;
1422  r->Get( inf );
1423  if( inf->NeedRetry() ) // so we failed in the end
1424  {
1425  DefaultEnv::GetLog()->Warning( FileMsg, "[%p@%s] Failed retransmitting corrupted "
1426  "page: pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1427  self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1428  pgwrt->SetStatus( new XRootDStatus( stError, errDataError, 0,
1429  "Failed to retransmit corrupted page" ) );
1430  }
1431  else
1432  DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Succesfuly retransmitted corrupted "
1433  "page: pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1434  self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1435  } );
1436  auto st = PgWriteRetry( self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
1437  if( !st.IsOK() ) pgwrt->SetStatus( new XRootDStatus( st ) );
1438  DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Retransmitting corrupted page: "
1439  "pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1440  self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1441  }
1442  } );
1443 
1444  auto st = PgWriteImpl( self, offset, size, buffer, cksums, 0, h, timeout );
1445  if( !st.IsOK() )
1446  {
1447  pgwrt->handler = nullptr;
1448  delete h;
1449  }
1450  return st;
1451  }
1452 
1453  //------------------------------------------------------------------------
1454  // Retransmit data with a single crc32c digest (retry flag set) - async
1455  //------------------------------------------------------------------------
1456  XRootDStatus FileStateHandler::PgWriteRetry( std::shared_ptr<FileStateHandler> &self,
1457  uint64_t offset,
1458  uint32_t size,
1459  const void *buffer,
1460  uint32_t digest,
1461  ResponseHandler *handler,
1462  uint16_t timeout )
1463  {
1464  std::vector<uint32_t> cksums{ digest };
1465  return PgWriteImpl( self, offset, size, buffer, cksums, PgReadFlags::Retry, handler, timeout );
1466  }
1467 
1468  //------------------------------------------------------------------------
1469  // Build and submit a pgwrite request (shared by PgWrite and PgWriteRetry)
1470  //------------------------------------------------------------------------
// PgWriteImpl: build and submit a kXR_pgwrite request.
//   offset/size - range written; buffer is recorded in the chunk list
//   cksums      - crc32c digests; their contents are swapped into the send
//                 parameters, so the caller's vector does not keep them
//   flags       - stored in the request's reqflags field
//   handler     - wrapped in a StatefulHandler together with the message
//   timeout     - copied into the send parameters
// Returns pStatus when the file state is Error, errInvalidOp unless the state
// is Opened or Recovering, otherwise the result of SendOrQueue.
1471  XRootDStatus FileStateHandler::PgWriteImpl( std::shared_ptr<FileStateHandler> &self,
1472  uint64_t offset,
1473  uint32_t size,
1474  const void *buffer,
1475  std::vector<uint32_t> &cksums,
1476  kXR_char flags,
1477  ResponseHandler *handler,
1478  uint16_t timeout )
1479  {
1480  XrdSysMutexHelper scopedLock( self->pMutex );
1481 
1482  if( self->pFileState == Error ) return self->pStatus;
1483 
1484  if( self->pFileState != Opened && self->pFileState != Recovering )
1485  return XRootDStatus( stError, errInvalidOp );
1486 
1487  Log *log = DefaultEnv::GetLog();
1488  log->Debug( FileMsg, "[%p@%s] Sending a pgwrite command for handle %#x to %s",
1489  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1490  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1491 
1492  //--------------------------------------------------------------------------
1493  // Create the message
1494  //--------------------------------------------------------------------------
1495  Message *msg;
1496  ClientPgWriteRequest *req;
1497  MessageUtils::CreateRequest( msg, req );
1498 
// dlen covers the data plus one 32-bit digest per entry of cksums.
1499  req->requestid = kXR_pgwrite;
1500  req->offset = offset;
1501  req->dlen = size + cksums.size() * sizeof( uint32_t );
1502  req->reqflags = flags;
1503  memcpy( req->fhandle, self->pFileHandle, 4 );
1504 
1505  ChunkList *list = new ChunkList();
1506  list->push_back( ChunkInfo( offset, size, (char*)buffer ) );
1507 
1508  MessageSendParams params;
1509  params.timeout = timeout;
1510  params.followRedirects = false;
1511  params.stateful = true;
1512  params.chunkList = list;
// swap() hands the digests to params; cksums receives whatever
// params.crc32cDigests held before (presumably empty for a fresh object).
1513  params.crc32cDigests.swap( cksums );
1514 
// NOTE(review): listing numbers 1515 and 1517 are absent from this copy -
// statements may have been dropped here; confirm against upstream.
1516 
1518  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1519 
1520  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1521  }
1522 
1523  //----------------------------------------------------------------------------
1524  // Commit all pending disk writes - async
1525  //----------------------------------------------------------------------------
// Sync: build and submit a kXR_sync request for this file handle; the outcome
// is reported through `handler`.
//   handler - wrapped in a StatefulHandler together with the message
//   timeout - copied into the send parameters
// Returns pStatus when the file state is Error, errInvalidOp unless the state
// is Opened or Recovering, otherwise the result of SendOrQueue.
1526  XRootDStatus FileStateHandler::Sync( std::shared_ptr<FileStateHandler> &self,
1527  ResponseHandler *handler,
1528  uint16_t timeout )
1529  {
1530  XrdSysMutexHelper scopedLock( self->pMutex );
1531 
1532  if( self->pFileState == Error ) return self->pStatus;
1533 
1534  if( self->pFileState != Opened && self->pFileState != Recovering )
1535  return XRootDStatus( stError, errInvalidOp );
1536 
1537  Log *log = DefaultEnv::GetLog();
1538  log->Debug( FileMsg, "[%p@%s] Sending a sync command for handle %#x to %s",
1539  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1540  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1541 
1542  Message *msg;
1543  ClientSyncRequest *req;
1544  MessageUtils::CreateRequest( msg, req );
1545 
// The request carries only the request id and the 4-byte file handle.
1546  req->requestid = kXR_sync;
1547  memcpy( req->fhandle, self->pFileHandle, 4 );
1548 
1549  MessageSendParams params;
1550  params.timeout = timeout;
1551  params.followRedirects = false;
1552  params.stateful = true;
// NOTE(review): listing numbers 1553 and 1555 are absent from this copy -
// statements may have been dropped here; confirm against upstream.
1554 
1556  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1557 
1558  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1559  }
1560 
1561  //----------------------------------------------------------------------------
1562  // Truncate the file to a particular size - async
1563  //----------------------------------------------------------------------------
// Truncate: build and submit a kXR_truncate request; the outcome is reported
// through `handler`.
//   size    - target file size, carried in the request's offset field
//   handler - wrapped in a StatefulHandler together with the message
//   timeout - copied into the send parameters
// Returns pStatus when the file state is Error, errInvalidOp unless the state
// is Opened or Recovering, otherwise the result of SendOrQueue.
1564  XRootDStatus FileStateHandler::Truncate( std::shared_ptr<FileStateHandler> &self,
1565  uint64_t size,
1566  ResponseHandler *handler,
1567  uint16_t timeout )
1568  {
1569  XrdSysMutexHelper scopedLock( self->pMutex );
1570 
1571  if( self->pFileState == Error ) return self->pStatus;
1572 
1573  if( self->pFileState != Opened && self->pFileState != Recovering )
1574  return XRootDStatus( stError, errInvalidOp );
1575 
1576  Log *log = DefaultEnv::GetLog();
1577  log->Debug( FileMsg, "[%p@%s] Sending a truncate command for handle %#x to %s",
1578  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1579  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1580 
1581  Message *msg;
1582  ClientTruncateRequest *req;
1583  MessageUtils::CreateRequest( msg, req );
1584 
1585  req->requestid = kXR_truncate;
1586  memcpy( req->fhandle, self->pFileHandle, 4 );
// The requested size travels in the field named `offset`.
1587  req->offset = size;
1588 
1589  MessageSendParams params;
1590  params.timeout = timeout;
1591  params.followRedirects = false;
1592  params.stateful = true;
// NOTE(review): listing numbers 1593 and 1595 are absent from this copy -
// statements may have been dropped here; confirm against upstream.
1594 
1596  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1597 
1598  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1599  }
1600 
1601  //----------------------------------------------------------------------------
1602  // Read scattered data chunks in one operation - async
1603  //----------------------------------------------------------------------------
// VectorRead: build and submit a kXR_readv request describing all `chunks`;
// the outcome is reported through `handler`.
//   chunks  - (offset, length, buffer) descriptions of the ranges to read
//   buffer  - optional contiguous destination: when non-null the chunks are
//             laid out in it back to back and the per-chunk buffers are
//             ignored; when null each chunk's own buffer is used
//   timeout - copied into the send parameters
// Returns pStatus when the file state is Error, errInvalidOp unless the state
// is Opened or Recovering, otherwise the result of SendOrQueue.
1604  XRootDStatus FileStateHandler::VectorRead( std::shared_ptr<FileStateHandler> &self,
1605  const ChunkList &chunks,
1606  void *buffer,
1607  ResponseHandler *handler,
1608  uint16_t timeout )
1609  {
1610  //--------------------------------------------------------------------------
1611  // Sanity check
1612  //--------------------------------------------------------------------------
1613  XrdSysMutexHelper scopedLock( self->pMutex );
1614 
1615  if( self->pFileState == Error ) return self->pStatus;
1616 
1617  if( self->pFileState != Opened && self->pFileState != Recovering )
1618  return XRootDStatus( stError, errInvalidOp );
1619 
1620  Log *log = DefaultEnv::GetLog();
1621  log->Debug( FileMsg, "[%p@%s] Sending a vector read command for handle %#x to %s",
1622  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1623  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1624 
1625  //--------------------------------------------------------------------------
1626  // Build the message
1627  //--------------------------------------------------------------------------
// The message payload holds one readahead_list entry per chunk.
1628  Message *msg;
1629  ClientReadVRequest *req;
1630  MessageUtils::CreateRequest( msg, req, sizeof(readahead_list)*chunks.size() );
1631 
1632  req->requestid = kXR_readv;
1633  req->dlen = sizeof(readahead_list)*chunks.size();
1634 
1635  ChunkList *list = new ChunkList();
1636  char *cursor = (char*)buffer;
1637 
1638  //--------------------------------------------------------------------------
1639  // Copy the chunk info
1640  //--------------------------------------------------------------------------
// NOTE(review): 24 is presumably the size of the fixed request header that
// precedes the payload - confirm.
1641  readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
1642  for( size_t i = 0; i < chunks.size(); ++i )
1643  {
1644  dataChunk[i].rlen = chunks[i].length;
1645  dataChunk[i].offset = chunks[i].offset;
1646  memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
1647 
// cursor is non-null only when the caller supplied a contiguous buffer; it
// then advances by each chunk's length.
1648  void *chunkBuffer;
1649  if( cursor )
1650  {
1651  chunkBuffer = cursor;
1652  cursor += chunks[i].length;
1653  }
1654  else
1655  chunkBuffer = chunks[i].buffer;
1656 
1657  list->push_back( ChunkInfo( chunks[i].offset,
1658  chunks[i].length,
1659  chunkBuffer ) );
1660  }
1661 
1662  //--------------------------------------------------------------------------
1663  // Send the message
1664  //--------------------------------------------------------------------------
1665  MessageSendParams params;
1666  params.timeout = timeout;
1667  params.followRedirects = false;
1668  params.stateful = true;
1669  params.chunkList = list;
// NOTE(review): listing numbers 1670 and 1672 are absent from this copy -
// statements may have been dropped here; confirm against upstream.
1671 
1673  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1674 
1675  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1676  }
1677 
1678  //------------------------------------------------------------------------
1679  // Write scattered data chunks in one operation - async
1680  //------------------------------------------------------------------------
// VectorWrite: build and submit a kXR_writev request describing all `chunks`;
// the outcome is reported through `handler`.
//   chunks  - (offset, length, buffer) descriptions of the data to write;
//             only the buffer pointers are recorded here, no data is copied
//             in this function
//   timeout - copied into the send parameters
// Returns pStatus when the file state is Error, errInvalidOp unless the state
// is Opened or Recovering, otherwise the result of SendOrQueue.
1681  XRootDStatus FileStateHandler::VectorWrite( std::shared_ptr<FileStateHandler> &self,
1682  const ChunkList &chunks,
1683  ResponseHandler *handler,
1684  uint16_t timeout )
1685  {
1686  //--------------------------------------------------------------------------
1687  // Sanity check
1688  //--------------------------------------------------------------------------
1689  XrdSysMutexHelper scopedLock( self->pMutex );
1690 
1691  if( self->pFileState == Error ) return self->pStatus;
1692 
1693  if( self->pFileState != Opened && self->pFileState != Recovering )
1694  return XRootDStatus( stError, errInvalidOp );
1695 
1696  Log *log = DefaultEnv::GetLog();
1697  log->Debug( FileMsg, "[%p@%s] Sending a vector write command for handle %#x to %s",
1698  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1699  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1700 
1701  //--------------------------------------------------------------------------
1702  // Determine the size of the payload
1703  //--------------------------------------------------------------------------
1704 
1705  // the size of write vector
1706  uint32_t payloadSize = sizeof(XrdProto::write_list) * chunks.size();
1707 
1708  //--------------------------------------------------------------------------
1709  // Build the message
1710  //--------------------------------------------------------------------------
1711  Message *msg;
1712  ClientWriteVRequest *req;
1713  MessageUtils::CreateRequest( msg, req, payloadSize );
1714 
// dlen is recomputed with the same expression that produced payloadSize.
1715  req->requestid = kXR_writev;
1716  req->dlen = sizeof(XrdProto::write_list) * chunks.size();
1717 
1718  ChunkList *list = new ChunkList();
1719 
1720  //--------------------------------------------------------------------------
1721  // Copy the chunk info
1722  //--------------------------------------------------------------------------
// NOTE(review): 24 is presumably the size of the fixed request header that
// precedes the payload - confirm.
1723  XrdProto::write_list *writeList =
1724  reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
1725 
1726 
1727 
// One write_list descriptor and one ChunkInfo per chunk.
1728  for( size_t i = 0; i < chunks.size(); ++i )
1729  {
1730  writeList[i].wlen = chunks[i].length;
1731  writeList[i].offset = chunks[i].offset;
1732  memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1733 
1734  list->push_back( ChunkInfo( chunks[i].offset,
1735  chunks[i].length,
1736  chunks[i].buffer ) );
1737  }
1738 
1739  //--------------------------------------------------------------------------
1740  // Send the message
1741  //--------------------------------------------------------------------------
1742  MessageSendParams params;
1743  params.timeout = timeout;
1744  params.followRedirects = false;
1745  params.stateful = true;
1746  params.chunkList = list;
// NOTE(review): listing numbers 1747 and 1749 are absent from this copy -
// statements may have been dropped here; confirm against upstream.
1748 
1750  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1751 
1752  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1753  }
1754 
1755  //------------------------------------------------------------------------
1756  // Write scattered buffers in one operation - async
1757  //------------------------------------------------------------------------
// WriteV: write the concatenation of `iovcnt` scattered buffers at file
// offset `offset` using a single kXR_write request; the outcome is reported
// through `handler`.
//   iov/iovcnt - source buffers; entries with iov_len == 0 are skipped
//   timeout    - copied into the send parameters
// Returns pStatus when the file state is Error, errInvalidOp unless the state
// is Opened or Recovering, otherwise the result of SendOrQueue.
1758  XRootDStatus FileStateHandler::WriteV( std::shared_ptr<FileStateHandler> &self,
1759  uint64_t offset,
1760  const struct iovec *iov,
1761  int iovcnt,
1762  ResponseHandler *handler,
1763  uint16_t timeout )
1764  {
1765  XrdSysMutexHelper scopedLock( self->pMutex );
1766 
1767  if( self->pFileState == Error ) return self->pStatus;
1768 
1769  if( self->pFileState != Opened && self->pFileState != Recovering )
1770  return XRootDStatus( stError, errInvalidOp );
1771 
1772  Log *log = DefaultEnv::GetLog();
1773  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1774  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1775  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1776 
1777  Message *msg;
1778  ClientWriteRequest *req;
1779  MessageUtils::CreateRequest( msg, req );
1780 
1781  ChunkList *list = new ChunkList();
1782 
// Total payload length is accumulated in a 32-bit counter.
// NOTE(review): nothing here guards against the sum of iov_len values
// exceeding what uint32_t can hold.
1783  uint32_t size = 0;
1784  for( int i = 0; i < iovcnt; ++i )
1785  {
1786  if( iov[i].iov_len == 0 ) continue;
1787  size += iov[i].iov_len;
1788  list->push_back( ChunkInfo( 0, iov[i].iov_len,
1789  (char*)iov[i].iov_base ) );
1790  }
1791 
1792  req->requestid = kXR_write;
1793  req->offset = offset;
1794  req->dlen = size;
1795  memcpy( req->fhandle, self->pFileHandle, 4 );
1796 
1797  MessageSendParams params;
1798  params.timeout = timeout;
1799  params.followRedirects = false;
1800  params.stateful = true;
1801  params.chunkList = list;
1802 
// NOTE(review): listing numbers 1803 and 1805 are absent from this copy -
// statements may have been dropped here; confirm against upstream.
1804 
1806  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1807 
1808  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1809  }
1810 
1811  //------------------------------------------------------------------------
1812  // Read data into scattered buffers in one operation - async
1813  //------------------------------------------------------------------------
1814  XRootDStatus FileStateHandler::ReadV( std::shared_ptr<FileStateHandler> &self,
1815  uint64_t offset,
1816  struct iovec *iov,
1817  int iovcnt,
1818  ResponseHandler *handler,
1819  uint16_t timeout )
1820  {
1821  XrdSysMutexHelper scopedLock( self->pMutex );
1822 
1823  if( self->pFileState == Error ) return self->pStatus;
1824 
1825  if( self->pFileState != Opened && self->pFileState != Recovering )
1826  return XRootDStatus( stError, errInvalidOp );
1827 
1828  Log *log = DefaultEnv::GetLog();
1829  log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1830  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1831  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1832 
1833  Message *msg;
1834  ClientReadRequest *req;
1835  MessageUtils::CreateRequest( msg, req );
1836 
1837  // calculate the total read size
1838  size_t size = std::accumulate( iov, iov + iovcnt, 0, []( size_t acc, iovec &rhs )
1839  {
1840  return acc + rhs.iov_len;
1841  } );
1842  req->requestid = kXR_read;
1843  req->offset = offset;
1844  req->rlen = size;
1845  msg->SetVirtReqID( kXR_virtReadv );
1846  memcpy( req->fhandle, self->pFileHandle, 4 );
1847 
1848  ChunkList *list = new ChunkList();
1849  list->reserve( iovcnt );
1850  uint64_t choff = offset;
1851  for( int i = 0; i < iovcnt; ++i )
1852  {
1853  list->emplace_back( choff, iov[i].iov_len, iov[i].iov_base );
1854  choff += iov[i].iov_len;
1855  }
1856 
1858  MessageSendParams params;
1859  params.timeout = timeout;
1860  params.followRedirects = false;
1861  params.stateful = true;
1862  params.chunkList = list;
1864  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1865 
1866  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1867  }
1868 
1869  //----------------------------------------------------------------------------
1870  // Performs a custom operation on an open file, server implementation
1871  // dependent - async
      //
      // Sends the content of arg as the payload of a kXR_query request with the
      // info type kXR_Qopaqug for the current file handle.
      //
      // @param self    state handler of the file; its mutex is held for the
      //                duration of the call
      // @param arg     buffer whose content becomes the request payload
      // @param handler user handler, wrapped in a StatefulHandler
      // @param timeout copied into the send parameters
      // @return        self->pStatus if the file is in the Error state,
      //                errInvalidOp if it is neither Opened nor Recovering,
      //                otherwise the result of SendOrQueue
1872  //----------------------------------------------------------------------------
1873  XRootDStatus FileStateHandler::Fcntl( std::shared_ptr<FileStateHandler> &self,
1874  const Buffer &arg,
1875  ResponseHandler *handler,
1876  uint16_t timeout )
1877  {
1878  XrdSysMutexHelper scopedLock( self->pMutex );
1879 
      // A file in the Error state reports the stored status
1880  if( self->pFileState == Error ) return self->pStatus;
1881 
      // Only Opened and Recovering files accept the request
1882  if( self->pFileState != Opened && self->pFileState != Recovering )
1883  return XRootDStatus( stError, errInvalidOp );
1884 
1885  Log *log = DefaultEnv::GetLog();
1886  log->Debug( FileMsg, "[%p@%s] Sending a fcntl command for handle %#x to %s",
1887  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1888  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1889 
      // The message is created with room for the argument
1890  Message *msg;
1891  ClientQueryRequest *req;
1892  MessageUtils::CreateRequest( msg, req, arg.GetSize() );
1893 
1894  req->requestid = kXR_query;
1895  req->infotype = kXR_Qopaqug;
1896  req->dlen = arg.GetSize();
1897  memcpy( req->fhandle, self->pFileHandle, 4 );
      // NOTE(review): 24 is presumably the size of the fixed request header,
      // i.e. the argument is placed right behind it - confirm
1898  msg->Append( arg.GetBuffer(), arg.GetSize(), 24 );
1899 
1900  MessageSendParams params;
1901  params.timeout = timeout;
1902  params.followRedirects = false;
1903  params.stateful = true;
      // NOTE(review): the numbering of this listing jumps from 1903 to 1905 to
      // 1907, statements may be missing here - confirm against the original
      // file
1905 
1907  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1908 
1909  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1910  }
1911 
1912  //----------------------------------------------------------------------------
1913  // Get access token to a file - async
      //
      // Sends a kXR_query request with the info type kXR_Qvisa for the current
      // file handle.
      //
      // @param self    state handler of the file; its mutex is held for the
      //                duration of the call
      // @param handler user handler, wrapped in a StatefulHandler
      // @param timeout copied into the send parameters
      // @return        self->pStatus if the file is in the Error state,
      //                errInvalidOp if it is neither Opened nor Recovering,
      //                otherwise the result of SendOrQueue
1914  //----------------------------------------------------------------------------
1915  XRootDStatus FileStateHandler::Visa( std::shared_ptr<FileStateHandler> &self,
1916  ResponseHandler *handler,
1917  uint16_t timeout )
1918  {
1919  XrdSysMutexHelper scopedLock( self->pMutex );
1920 
      // A file in the Error state reports the stored status
1921  if( self->pFileState == Error ) return self->pStatus;
1922 
      // Only Opened and Recovering files accept the request
1923  if( self->pFileState != Opened && self->pFileState != Recovering )
1924  return XRootDStatus( stError, errInvalidOp );
1925 
1926  Log *log = DefaultEnv::GetLog();
1927  log->Debug( FileMsg, "[%p@%s] Sending a visa command for handle %#x to %s",
1928  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1929  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1930 
1931  Message *msg;
1932  ClientQueryRequest *req;
1933  MessageUtils::CreateRequest( msg, req );
1934 
1935  req->requestid = kXR_query;
1936  req->infotype = kXR_Qvisa;
      // Copy the 4-byte file handle into the request
1937  memcpy( req->fhandle, self->pFileHandle, 4 );
1938 
1939  MessageSendParams params;
1940  params.timeout = timeout;
1941  params.followRedirects = false;
1942  params.stateful = true;
      // NOTE(review): the numbering of this listing jumps from 1942 to 1944 to
      // 1946, statements may be missing here - confirm against the original
      // file
1944 
1946  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1947 
1948  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1949  }
1950 
1951  //------------------------------------------------------------------------
1952  // Set extended attributes - async
1953  //------------------------------------------------------------------------
1954  XRootDStatus FileStateHandler::SetXAttr( std::shared_ptr<FileStateHandler> &self,
1955  const std::vector<xattr_t> &attrs,
1956  ResponseHandler *handler,
1957  uint16_t timeout )
1958  {
1959  XrdSysMutexHelper scopedLock( self->pMutex );
1960 
1961  if( self->pFileState == Error ) return self->pStatus;
1962 
1963  if( self->pFileState != Opened && self->pFileState != Recovering )
1964  return XRootDStatus( stError, errInvalidOp );
1965 
1966  Log *log = DefaultEnv::GetLog();
1967  log->Debug( FileMsg, "[%p@%s] Sending a fattr set command for handle %#x to %s",
1968  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1969  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1970 
1971  //--------------------------------------------------------------------------
1972  // Issue a new fattr get request
1973  //--------------------------------------------------------------------------
1974  return XAttrOperationImpl( self, kXR_fattrSet, 0, attrs, handler, timeout );
1975  }
1976 
1977  //------------------------------------------------------------------------
1978  // Get extended attributes - async
1979  //------------------------------------------------------------------------
1980  XRootDStatus FileStateHandler::GetXAttr( std::shared_ptr<FileStateHandler> &self,
1981  const std::vector<std::string> &attrs,
1982  ResponseHandler *handler,
1983  uint16_t timeout )
1984  {
1985  XrdSysMutexHelper scopedLock( self->pMutex );
1986 
1987  if( self->pFileState == Error ) return self->pStatus;
1988 
1989  if( self->pFileState != Opened && self->pFileState != Recovering )
1990  return XRootDStatus( stError, errInvalidOp );
1991 
1992  Log *log = DefaultEnv::GetLog();
1993  log->Debug( FileMsg, "[%p@%s] Sending a fattr get command for handle %#x to %s",
1994  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1995  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1996 
1997  //--------------------------------------------------------------------------
1998  // Issue a new fattr get request
1999  //--------------------------------------------------------------------------
2000  return XAttrOperationImpl( self, kXR_fattrGet, 0, attrs, handler, timeout );
2001  }
2002 
2003  //------------------------------------------------------------------------
2004  // Delete extended attributes - async
2005  //------------------------------------------------------------------------
2006  XRootDStatus FileStateHandler::DelXAttr( std::shared_ptr<FileStateHandler> &self,
2007  const std::vector<std::string> &attrs,
2008  ResponseHandler *handler,
2009  uint16_t timeout )
2010  {
2011  XrdSysMutexHelper scopedLock( self->pMutex );
2012 
2013  if( self->pFileState == Error ) return self->pStatus;
2014 
2015  if( self->pFileState != Opened && self->pFileState != Recovering )
2016  return XRootDStatus( stError, errInvalidOp );
2017 
2018  Log *log = DefaultEnv::GetLog();
2019  log->Debug( FileMsg, "[%p@%s] Sending a fattr del command for handle %#x to %s",
2020  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2021  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2022 
2023  //--------------------------------------------------------------------------
2024  // Issue a new fattr del request
2025  //--------------------------------------------------------------------------
2026  return XAttrOperationImpl( self, kXR_fattrDel, 0, attrs, handler, timeout );
2027  }
2028 
2029  //------------------------------------------------------------------------
2030  // List extended attributes - async
2031  //------------------------------------------------------------------------
2032  XRootDStatus FileStateHandler::ListXAttr( std::shared_ptr<FileStateHandler> &self,
2033  ResponseHandler *handler,
2034  uint16_t timeout )
2035  {
2036  XrdSysMutexHelper scopedLock( self->pMutex );
2037 
2038  if( self->pFileState == Error ) return self->pStatus;
2039 
2040  if( self->pFileState != Opened && self->pFileState != Recovering )
2041  return XRootDStatus( stError, errInvalidOp );
2042 
2043  Log *log = DefaultEnv::GetLog();
2044  log->Debug( FileMsg, "[%p@%s] Sending a fattr list command for handle %#x to %s",
2045  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2046  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2047 
2048  //--------------------------------------------------------------------------
2049  // Issue a new fattr get request
2050  //--------------------------------------------------------------------------
2051  static const std::vector<std::string> nothing;
2052  return XAttrOperationImpl( self, kXR_fattrList, ClientFattrRequest::aData,
2053  nothing, handler, timeout );
2054  }
2055 
2056  //------------------------------------------------------------------------
      // Sends a kXR_chkpoint request carrying the given operation code for the
      // current file handle.
      //
      // @param self    state handler of the file; its mutex is held for the
      //                duration of the call
      // @param code    stored as the opcode of the request
      // @param handler user handler, wrapped in a StatefulHandler
      // @param timeout copied into the send parameters
      // @return        self->pStatus if the file is in the Error state,
      //                errInvalidOp if it is neither Opened nor Recovering,
      //                otherwise the result of SendOrQueue
      //
      // NOTE(review): the numbering of this listing jumps from 2056 to 2066,
      // the original header comment is not part of it
2066  //------------------------------------------------------------------------
2067  XRootDStatus FileStateHandler::Checkpoint( std::shared_ptr<FileStateHandler> &self,
2068  kXR_char code,
2069  ResponseHandler *handler,
2070  uint16_t timeout )
2071  {
2072  XrdSysMutexHelper scopedLock( self->pMutex );
2073 
      // A file in the Error state reports the stored status
2074  if( self->pFileState == Error ) return self->pStatus;
2075 
      // Only Opened and Recovering files accept the request
2076  if( self->pFileState != Opened && self->pFileState != Recovering )
2077  return XRootDStatus( stError, errInvalidOp );
2078 
2079  Log *log = DefaultEnv::GetLog();
2080  log->Debug( FileMsg, "[%p@%s] Sending a checkpoint command for handle %#x to %s",
2081  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2082  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2083 
2084  Message *msg;
2085  ClientChkPointRequest *req;
2086  MessageUtils::CreateRequest( msg, req );
2087 
2088  req->requestid = kXR_chkpoint;
2089  req->opcode = code;
      // Copy the 4-byte file handle into the request
2090  memcpy( req->fhandle, self->pFileHandle, 4 );
2091 
2092  MessageSendParams params;
2093  params.timeout = timeout;
2094  params.followRedirects = false;
2095  params.stateful = true;
2096 
      // NOTE(review): the numbering of this listing jumps from 2096 to 2098 to
      // 2100, statements may be missing here - confirm against the original
      // file
2098 
2100  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2101 
2102  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2103  }
2104 
2105  //------------------------------------------------------------------------
      // Sends a kXR_chkpoint request with the opcode kXR_ckpXeq that carries a
      // kXR_write request for the given offset and size; the data itself is
      // attached as a single chunk.
      //
      // @param self    state handler of the file; its mutex is held for the
      //                duration of the call
      // @param offset  file offset stored in the write request
      // @param size    number of bytes, stored as dlen of the write request
      // @param buffer  data to be written, registered as the only chunk
      // @param handler user handler, wrapped in a StatefulHandler
      // @param timeout copied into the send parameters
      // @return        self->pStatus if the file is in the Error state,
      //                errInvalidOp if it is neither Opened nor Recovering,
      //                otherwise the result of SendOrQueue
      //
      // NOTE(review): the numbering of this listing jumps from 2105 to 2115,
      // the original header comment is not part of it
2115  //------------------------------------------------------------------------
2116  XRootDStatus FileStateHandler::ChkptWrt( std::shared_ptr<FileStateHandler> &self,
2117  uint64_t offset,
2118  uint32_t size,
2119  const void *buffer,
2120  ResponseHandler *handler,
2121  uint16_t timeout )
2122  {
2123  XrdSysMutexHelper scopedLock( self->pMutex );
2124 
      // A file in the Error state reports the stored status
2125  if( self->pFileState == Error ) return self->pStatus;
2126 
      // Only Opened and Recovering files accept the request
2127  if( self->pFileState != Opened && self->pFileState != Recovering )
2128  return XRootDStatus( stError, errInvalidOp );
2129 
2130  Log *log = DefaultEnv::GetLog();
2131  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2132  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2133  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2134 
      // The message is created with extra room of the size of a write request
2135  Message *msg;
2136  ClientChkPointRequest *req;
2137  MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2138 
2139  req->requestid = kXR_chkpoint;
2140  req->opcode = kXR_ckpXeq;
2141  req->dlen = 24; // as specified in the protocol specification
2142  memcpy( req->fhandle, self->pFileHandle, 4 );
2143 
      // NOTE(review): listing line 2144 is missing; it has to declare wrtreq,
      // presumably pointing at the extra room reserved above - confirm against
      // the original file
2145  wrtreq->requestid = kXR_write;
2146  wrtreq->offset = offset;
2147  wrtreq->dlen = size;
2148  memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2149 
      // The chunk is created with offset 0; only the write request carries the
      // file offset
2150  ChunkList *list = new ChunkList();
2151  list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
2152 
2153  MessageSendParams params;
2154  params.timeout = timeout;
2155  params.followRedirects = false;
2156  params.stateful = true;
2157  params.chunkList = list;
2158 
      // NOTE(review): the numbering of this listing jumps from 2158 to 2160 to
      // 2162, statements may be missing here - confirm against the original
      // file
2160 
2162  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2163 
2164  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2165  }
2166 
2167  //------------------------------------------------------------------------
      // Sends a kXR_chkpoint request with the opcode kXR_ckpXeq that carries a
      // kXR_write request for the given offset; the non-empty buffers of iov
      // are attached as chunks and their combined length is the dlen of the
      // write request.
      //
      // @param self    state handler of the file; its mutex is held for the
      //                duration of the call
      // @param offset  file offset stored in the write request
      // @param iov     buffers to be written, zero-length entries are skipped
      // @param iovcnt  number of entries in iov
      // @param handler user handler, wrapped in a StatefulHandler
      // @param timeout copied into the send parameters
      // @return        self->pStatus if the file is in the Error state,
      //                errInvalidOp if it is neither Opened nor Recovering,
      //                otherwise the result of SendOrQueue
      //
      // NOTE(review): the numbering of this listing jumps from 2167 to 2177,
      // the original header comment is not part of it
2177  //------------------------------------------------------------------------
2178  XRootDStatus FileStateHandler::ChkptWrtV( std::shared_ptr<FileStateHandler> &self,
2179  uint64_t offset,
2180  const struct iovec *iov,
2181  int iovcnt,
2182  ResponseHandler *handler,
2183  uint16_t timeout )
2184  {
2185  XrdSysMutexHelper scopedLock( self->pMutex );
2186 
      // A file in the Error state reports the stored status
2187  if( self->pFileState == Error ) return self->pStatus;
2188 
      // Only Opened and Recovering files accept the request
2189  if( self->pFileState != Opened && self->pFileState != Recovering )
2190  return XRootDStatus( stError, errInvalidOp );
2191 
2192  Log *log = DefaultEnv::GetLog();
2193  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2194  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2195  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2196 
      // The message is created with extra room of the size of a write request
2197  Message *msg;
2198  ClientChkPointRequest *req;
2199  MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2200 
2201  req->requestid = kXR_chkpoint;
2202  req->opcode = kXR_ckpXeq;
2203  req->dlen = 24; // as specified in the protocol specification
2204  memcpy( req->fhandle, self->pFileHandle, 4 );
2205 
      // Sum up the payload and register one chunk per non-empty buffer, every
      // chunk is created with offset 0.
      // NOTE(review): size is 32 bits wide, the sum of the iov lengths is
      // assumed to fit - confirm that callers guarantee this
2206  ChunkList *list = new ChunkList();
2207  uint32_t size = 0;
2208  for( int i = 0; i < iovcnt; ++i )
2209  {
2210  if( iov[i].iov_len == 0 ) continue;
2211  size += iov[i].iov_len;
2212  list->push_back( ChunkInfo( 0, iov[i].iov_len,
2213  (char*)iov[i].iov_base ) );
2214  }
2215 
      // NOTE(review): listing line 2216 is missing; it has to declare wrtreq,
      // presumably pointing at the extra room reserved above - confirm against
      // the original file
2217  wrtreq->requestid = kXR_write;
2218  wrtreq->offset = offset;
2219  wrtreq->dlen = size;
2220  memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2221 
2222  MessageSendParams params;
2223  params.timeout = timeout;
2224  params.followRedirects = false;
2225  params.stateful = true;
2226  params.chunkList = list;
2227 
      // NOTE(review): the numbering of this listing jumps from 2227 to 2229 to
      // 2231, statements may be missing here - confirm against the original
      // file
2229 
2231  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2232 
2233  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2234  }
2235 
2236  //----------------------------------------------------------------------------
2237  // Check if the file is open
      //
      // Returns true when the file state is Opened or Recovering and false
      // otherwise; the state is read with pMutex held.
      //
      // NOTE(review): listing line 2239, which has to hold the signature of
      // this function, is missing - confirm against the original file
2238  //----------------------------------------------------------------------------
2240  {
2241  XrdSysMutexHelper scopedLock( pMutex );
2242 
2243  if( pFileState == Opened || pFileState == Recovering )
2244  return true;
2245  return false;
2246  }
2247 
2248  //----------------------------------------------------------------------------
2249  // Set file property
2250  //----------------------------------------------------------------------------
2251  bool FileStateHandler::SetProperty( const std::string &name,
2252  const std::string &value )
2253  {
2254  XrdSysMutexHelper scopedLock( pMutex );
2255  if( name == "ReadRecovery" )
2256  {
2257  if( value == "true" ) pDoRecoverRead = true;
2258  else pDoRecoverRead = false;
2259  return true;
2260  }
2261  else if( name == "WriteRecovery" )
2262  {
2263  if( value == "true" ) pDoRecoverWrite = true;
2264  else pDoRecoverWrite = false;
2265  return true;
2266  }
2267  else if( name == "FollowRedirects" )
2268  {
2269  if( value == "true" ) pFollowRedirects = true;
2270  else pFollowRedirects = false;
2271  return true;
2272  }
2273  else if( name == "BundledClose" )
2274  {
2275  if( value == "true" ) pAllowBundledClose = true;
2276  else pAllowBundledClose = false;
2277  return true;
2278  }
2279  return false;
2280  }
2281 
2282  //----------------------------------------------------------------------------
2283  // Get file property
2284  //----------------------------------------------------------------------------
2285  bool FileStateHandler::GetProperty( const std::string &name,
2286  std::string &value ) const
2287  {
2288  XrdSysMutexHelper scopedLock( pMutex );
2289  if( name == "ReadRecovery" )
2290  {
2291  if( pDoRecoverRead ) value = "true";
2292  else value = "false";
2293  return true;
2294  }
2295  else if( name == "WriteRecovery" )
2296  {
2297  if( pDoRecoverWrite ) value = "true";
2298  else value = "false";
2299  return true;
2300  }
2301  else if( name == "FollowRedirects" )
2302  {
2303  if( pFollowRedirects ) value = "true";
2304  else value = "false";
2305  return true;
2306  }
2307  else if( name == "DataServer" && pDataServer )
2308  { value = pDataServer->GetHostId(); return true; }
2309  else if( name == "LastURL" && pDataServer )
2310  { value = pDataServer->GetURL(); return true; }
2311  else if( name == "WrtRecoveryRedir" && pWrtRecoveryRedir )
2312  { value = pWrtRecoveryRedir->GetHostId(); return true; }
2313  value = "";
2314  return false;
2315  }
2316 
2317  //----------------------------------------------------------------------------
2318  // Process the results of the opening operation
      //
      // With pMutex held: replaces the data server, load balancer and write
      // recovery redirector URLs from hostList (if one is given), refreshes
      // pIsChannelEncrypted for non-local data servers, stores the status and
      // then either fails the queued messages and enters the Error state, or
      // stores the open information, re-sends the queued messages and enters
      // the Opened state.
      //
      // NOTE(review): listing line 2320, which has to hold the first line of
      // the signature, is missing; status is used below and presumably is the
      // first parameter - confirm against the original file
2319  //----------------------------------------------------------------------------
2321  const OpenInfo *openInfo,
2322  const HostList *hostList )
2323  {
2324  Log *log = DefaultEnv::GetLog();
2325  XrdSysMutexHelper scopedLock( pMutex );
2326 
2327  //--------------------------------------------------------------------------
2328  // Assign the data server and the load balancer
2329  //--------------------------------------------------------------------------
      // lastServer is only used for the error log message below; it stays at
      // the host of the file URL when no host list is given
2330  std::string lastServer = pFileUrl->GetHostId();
2331  if( hostList )
2332  {
2333  delete pDataServer;
2334  delete pLoadBalancer;
2335  pLoadBalancer = 0;
2336  delete pWrtRecoveryRedir;
2337  pWrtRecoveryRedir = 0;
2338 
      // The last host of the list becomes the data server.
      // NOTE(review): back() requires a non-empty list - confirm that an empty
      // host list is never passed in
2339  pDataServer = new URL( hostList->back().url );
2340  pDataServer->SetParams( pFileUrl->GetParams() );
2341  if( !( pUseVirtRedirector && pFileUrl->IsMetalink() ) ) pDataServer->SetPath( pFileUrl->GetPath() );
2342  lastServer = pDataServer->GetHostId();
      // Merge the CGI parameters of every host of the list into the
      // parameters of the data server
2343  HostList::const_iterator itC;
2344  URL::ParamsMap params = pDataServer->GetParams();
2345  for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2346  {
2347  MessageUtils::MergeCGI( params,
2348  itC->url.GetParams(),
2349  true );
2350  }
2351  pDataServer->SetParams( params );
2352 
      // Walking the list backwards, the first host flagged as a load balancer
      // is remembered
2353  HostList::const_reverse_iterator it;
2354  for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2355  if( it->loadBalancer )
2356  {
2357  pLoadBalancer = new URL( it->url );
2358  break;
2359  }
2360 
      // Same walk for the first host that carries the kXR_recoverWrts flag
2361  for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2362  if( it->flags & kXR_recoverWrts )
2363  {
2364  pWrtRecoveryRedir = new URL( it->url );
2365  break;
2366  }
2367  }
2368 
2369  log->Debug(FileMsg, "[%p@%s] Open has returned with status %s",
2370  (void*)this, pFileUrl->GetObfuscatedURL().c_str(), status->ToStr().c_str() );
2371 
2372  if( pDataServer && !pDataServer->IsLocalFile() )
2373  {
2374  //------------------------------------------------------------------------
2375  // Check if we are using a secure connection
2376  //------------------------------------------------------------------------
2377  XrdCl::AnyObject isencobj;
      // NOTE(review): listing line 2378 is missing; it has to declare st and
      // start the call continued on the next line - confirm against the
      // original file
2379  QueryTransport( *pDataServer, XRootDQuery::IsEncrypted, isencobj );
2380  if( st.IsOK() )
2381  {
      // The flag taken out of the AnyObject is deleted here after it has been
      // read; a null pointer counts as "not encrypted"
2382  bool *isenc;
2383  isencobj.Get( isenc );
2384  pIsChannelEncrypted = isenc ? *isenc : false;
2385  delete isenc;
2386  }
2387  }
2388 
2389  //--------------------------------------------------------------------------
2390  // We have failed
2391  //--------------------------------------------------------------------------
      // A missing openInfo is treated as a failure even if the status is OK
2392  pStatus = *status;
2393  if( !pStatus.IsOK() || !openInfo )
2394  {
2395  log->Debug(FileMsg, "[%p@%s] Error while opening at %s: %s",
2396  (void*)this, pFileUrl->GetObfuscatedURL().c_str(), lastServer.c_str(),
2397  pStatus.ToStr().c_str() );
2398  FailQueuedMessages( pStatus );
2399  pFileState = Error;
2400 
2401  //------------------------------------------------------------------------
2402  // Report to monitoring
2403  //------------------------------------------------------------------------
      // NOTE(review): listing lines 2404, 2407 and 2410 are missing; the
      // declarations of mon and i are therefore not visible here - confirm
      // against the original file
2405  if( mon )
2406  {
2408  i.file = pFileUrl;
2409  i.status = status;
2411  mon->Event( Monitor::EvErrIO, &i );
2412  }
2413  }
2414  //--------------------------------------------------------------------------
2415  // We have succeeded
2416  //--------------------------------------------------------------------------
2417  else
2418  {
2419  //------------------------------------------------------------------------
2420  // Store the response info
2421  //------------------------------------------------------------------------
2422  openInfo->GetFileHandle( pFileHandle );
2423  pSessionId = openInfo->GetSessionId();
      // The cached stat information is replaced only if the open response
      // carries one
2424  if( openInfo->GetStatInfo() )
2425  {
2426  delete pStatInfo;
2427  pStatInfo = new StatInfo( *openInfo->GetStatInfo() );
2428  }
2429 
      // NOTE(review): pDataServer is dereferenced without a null check here
      // and in the monitoring block below, while the encryption check above
      // does test it - confirm that it is always set on this path
2430  log->Debug( FileMsg, "[%p@%s] successfully opened at %s, handle: %#x, "
2431  "session id: %llu", (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
2432  pDataServer->GetHostId().c_str(), *((uint32_t*)pFileHandle),
2433  (unsigned long long) pSessionId );
2434 
2435  //------------------------------------------------------------------------
2436  // Inform the monitoring about opening success
2437  //------------------------------------------------------------------------
2438  gettimeofday( &pOpenTime, 0 );
      // NOTE(review): listing lines 2439 and 2442 are missing; the
      // declarations of mon and i are therefore not visible here - confirm
      // against the original file
2440  if( mon )
2441  {
2443  i.file = pFileUrl;
2444  i.dataServer = pDataServer->GetHostId();
2445  i.oFlags = pOpenFlags;
2446  i.fSize = pStatInfo ? pStatInfo->GetSize() : 0;
2447  mon->Event( Monitor::EvOpen, &i );
2448  }
2449 
2450  //------------------------------------------------------------------------
2451  // Resend the queued messages if any
2452  //------------------------------------------------------------------------
      // The state is switched to Opened only after the queue has been re-sent
2453  ReSendQueuedMessages();
2454  pFileState = Opened;
2455  }
2456  }
2457 
2458  //----------------------------------------------------------------------------
2459  // Process the results of the closing operation
      //
      // With pMutex held: logs the outcome, calls MonitorClose and
      // ResetMonitoringVars, stores the status and enters the Closed state.
      // The state becomes Closed regardless of the status.
      //
      // NOTE(review): listing line 2461, which has to hold the signature of
      // this function, is missing; status is used below and presumably is its
      // parameter - confirm against the original file
2460  //----------------------------------------------------------------------------
2462  {
2463  Log *log = DefaultEnv::GetLog();
2464  XrdSysMutexHelper scopedLock( pMutex );
2465 
      // NOTE(review): pDataServer is dereferenced without a null check -
      // confirm that it is always set when a close completes
2466  log->Debug(FileMsg, "[%p@%s] Close returned from %s with: %s", (void*)this,
2467  pFileUrl->GetObfuscatedURL().c_str(), pDataServer->GetHostId().c_str(),
2468  status->ToStr().c_str() );
2469 
2470  log->Dump(FileMsg, "[%p@%s] Items in the fly %zu, queued for recovery %zu",
2471  (void*)this, pFileUrl->GetObfuscatedURL().c_str(), pInTheFly.size(), pToBeRecovered.size() );
2472 
2473  MonitorClose( status );
2474  ResetMonitoringVars();
2475 
2476  pStatus = *status;
2477  pFileState = Closed;
2478  }
2479 
2480  //----------------------------------------------------------------------------
2481  // Handle an error while sending a stateful message
      //
      // A redirect error whose message starts with one of the known protocol
      // names is handed over to OnStateRedirection, provided that redirections
      // are followed. Any other error removes the message from the in-the-fly
      // set, is reported to the monitoring and then either fails the message
      // (unrecoverable status or kernel buffer in use) or queues it for
      // recovery.
      //
      // @param self        state handler of the file
      // @param status      status the message returned with; deleted here on
      //                    the two error paths
      // @param message     the message that failed
      // @param userHandler handler of the user of the message
      // @param sendParams  parameters the message was sent with
2482  //----------------------------------------------------------------------------
2483  void FileStateHandler::OnStateError( std::shared_ptr<FileStateHandler> &self,
2484  XRootDStatus *status,
2485  Message *message,
2486  ResponseHandler *userHandler,
2487  MessageSendParams &sendParams )
2488  {
2489  //--------------------------------------------------------------------------
2490  // It may be a redirection
2491  //--------------------------------------------------------------------------
2492  if( !status->IsOK() && status->code == errRedirect && self->pFollowRedirects )
2493  {
2494  static const std::string root = "root", xroot = "xroot", file = "file",
2495  roots = "roots", xroots = "xroots";
      // compare() returns 0 when msg starts with the given prefix, hence the
      // negations
2496  std::string msg = status->GetErrorMessage();
2497  if( !msg.compare( 0, root.size(), root ) ||
2498  !msg.compare( 0, xroot.size(), xroot ) ||
2499  !msg.compare( 0, file.size(), file ) ||
2500  !msg.compare( 0, roots.size(), roots ) ||
2501  !msg.compare( 0, xroots.size(), xroots ) )
2502  {
      // NOTE(review): status is not deleted on this path, unlike on the two
      // paths at the end of the function - confirm who releases it
2503  FileStateHandler::OnStateRedirection( self, msg, message, userHandler, sendParams );
2504  return;
2505  }
2506  }
2507 
2508  //--------------------------------------------------------------------------
2509  // Handle error
2510  //--------------------------------------------------------------------------
      // The mutex is taken only here; the redirection path above locks on its
      // own inside OnStateRedirection
2511  Log *log = DefaultEnv::GetLog();
2512  XrdSysMutexHelper scopedLock( self->pMutex );
2513  self->pInTheFly.erase( message );
2514 
2515  log->Dump( FileMsg, "[%p@%s] File state error encountered. Message %s "
2516  "returned with %s", (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2517  message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2518 
2519  //--------------------------------------------------------------------------
2520  // Report to monitoring
2521  //--------------------------------------------------------------------------
      // NOTE(review): listing lines 2522 and 2525 are missing; the
      // declarations of mon and i are therefore not visible here - confirm
      // against the original file
2523  if( mon )
2524  {
2526  i.file = self->pFileUrl;
2527  i.status = status;
2528 
      // The request id of the failed message selects the monitoring op code
2529  ClientRequest *req = (ClientRequest*)message->GetBuffer();
2530  switch( req->header.requestid )
2531  {
2532  case kXR_read: i.opCode = Monitor::ErrorInfo::ErrRead; break;
      // NOTE(review): listing lines 2533-2537 are missing, presumably further
      // case labels - confirm against the original file
2538  default: i.opCode = Monitor::ErrorInfo::ErrUnc;
2539  }
2540 
2541  mon->Event( Monitor::EvErrIO, &i );
2542  }
2543 
2544  //--------------------------------------------------------------------------
2545  // The message is not recoverable
2546  // (message using a kernel buffer is not recoverable by definition)
2547  //--------------------------------------------------------------------------
2548  if( !self->IsRecoverable( *status ) || sendParams.kbuff )
2549  {
2550  log->Error( FileMsg, "[%p@%s] Fatal file state error. Message %s "
2551  "returned with %s", (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2552  message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2553 
2554  self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
2555  delete status;
2556  return;
2557  }
2558 
2559  //--------------------------------------------------------------------------
2560  // Insert the message to the recovery queue and start the recovery
2561  // procedure if we don't have any more message in the fly
2562  //--------------------------------------------------------------------------
      // The status is remembered as the close reason before it is deleted
2563  self->pCloseReason = *status;
2564  RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2565  delete status;
2566  }
2567 
2568  //----------------------------------------------------------------------------
2569  // Handle stateful redirect
2570  //----------------------------------------------------------------------------
2571  void FileStateHandler::OnStateRedirection( std::shared_ptr<FileStateHandler> &self,
2572  const std::string &redirectUrl,
2573  Message *message,
2574  ResponseHandler *userHandler,
2575  MessageSendParams &sendParams )
2576  {
2577  XrdSysMutexHelper scopedLock( self->pMutex );
2578  self->pInTheFly.erase( message );
2579 
2580  //--------------------------------------------------------------------------
2581  // Register the state redirect url and append the new cgi information to
2582  // the file URL
2583  //--------------------------------------------------------------------------
2584  if( !self->pStateRedirect )
2585  {
2586  std::ostringstream o;
2587  self->pStateRedirect = new URL( redirectUrl );
2588  URL::ParamsMap params = self->pFileUrl->GetParams();
2589  MessageUtils::MergeCGI( params,
2590  self->pStateRedirect->GetParams(),
2591  false );
2592  self->pFileUrl->SetParams( params );
2593  }
2594 
2595  RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2596  }
2597 
2598  //----------------------------------------------------------------------------
2599  // Handle stateful response
2600  //----------------------------------------------------------------------------
2601  void FileStateHandler::OnStateResponse( std::shared_ptr<FileStateHandler> &self,
2602  XRootDStatus *status,
2603  Message *message,
2604  AnyObject *response,
2605  HostList */*urlList*/ )
2606  {
2607  Log *log = DefaultEnv::GetLog();
2608  XrdSysMutexHelper scopedLock( self->pMutex );
2609 
2610  log->Dump( FileMsg, "[%p@%s] Got state response for message %s",
2611  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2612  message->GetObfuscatedDescription().c_str() );
2613 
2614  //--------------------------------------------------------------------------
2615  // Since this message may be the last "in-the-fly" and no recovery
2616  // is done if messages are in the fly, we may need to trigger recovery
2617  //--------------------------------------------------------------------------
2618  self->pInTheFly.erase( message );
2619  RunRecovery( self );
2620 
2621  //--------------------------------------------------------------------------
2622  // Play with the actual response before returning it. This is a good
2623  // place to do caching in the future.
2624  //--------------------------------------------------------------------------
2625  ClientRequest *req = (ClientRequest*)message->GetBuffer();
2626  switch( req->header.requestid )
2627  {
2628  //------------------------------------------------------------------------
2629  // Cache the stat response
2630  //------------------------------------------------------------------------
2631  case kXR_stat:
2632  {
2633  StatInfo *info = 0;
2634  response->Get( info );
2635  delete self->pStatInfo;
2636  self->pStatInfo = new StatInfo( *info );
2637  break;
2638  }
2639 
2640  //------------------------------------------------------------------------
2641  // Handle read response
2642  //------------------------------------------------------------------------
2643  case kXR_read:
2644  {
2645  ++self->pRCount;
2646  self->pRBytes += req->read.rlen;
2647  break;
2648  }
2649 
2650  //------------------------------------------------------------------------
2651  // Handle read response
2652  //------------------------------------------------------------------------
2653  case kXR_pgread:
2654  {
2655  ++self->pRCount;
2656  self->pRBytes += req->pgread.rlen;
2657  break;
2658  }
2659 
2660  //------------------------------------------------------------------------
2661  // Handle readv response
2662  //------------------------------------------------------------------------
2663  case kXR_readv:
2664  {
2665  ++self->pVRCount;
2666  size_t segs = req->header.dlen/sizeof(readahead_list);
2667  readahead_list *dataChunk = (readahead_list*)message->GetBuffer( 24 );
2668  for( size_t i = 0; i < segs; ++i )
2669  self->pVRBytes += dataChunk[i].rlen;
2670  self->pVSegs += segs;
2671  break;
2672  }
2673 
2674  //------------------------------------------------------------------------
2675  // Handle write response
2676  //------------------------------------------------------------------------
2677  case kXR_write:
2678  {
2679  ++self->pWCount;
2680  self->pWBytes += req->write.dlen;
2681  break;
2682  }
2683 
2684  //------------------------------------------------------------------------
2685  // Handle write response
2686  //------------------------------------------------------------------------
2687  case kXR_pgwrite:
2688  {
2689  ++self->pWCount;
2690  self->pWBytes += req->pgwrite.dlen;
2691  break;
2692  }
2693 
2694  //------------------------------------------------------------------------
2695  // Handle writev response
2696  //------------------------------------------------------------------------
2697  case kXR_writev:
2698  {
2699  ++self->pVWCount;
2700  size_t size = req->header.dlen/sizeof(readahead_list);
2701  XrdProto::write_list *wrtList =
2702  reinterpret_cast<XrdProto::write_list*>( message->GetBuffer( 24 ) );
2703  for( size_t i = 0; i < size; ++i )
2704  self->pVWBytes += wrtList[i].wlen;
2705  break;
2706  }
2707  };
2708  }
2709 
2710  //------------------------------------------------------------------------
2712  //------------------------------------------------------------------------
2713  void FileStateHandler::Tick( time_t now )
2714  {
2715  if (pMutex.CondLock())
2716  {TimeOutRequests( now );
2717  pMutex.UnLock();
2718  }
2719  }
2720 
2721  //----------------------------------------------------------------------------
2722  // Declare timeout on requests being recovered
2723  //----------------------------------------------------------------------------
2725  {
// Walks the recovery queue and answers every request whose deadline
// (params.expires) is not later than `now`; such entries are removed from
// the queue, the remaining ones are left untouched.
// NOTE(review): the signature line, the declaration of `jobMan` and the
// second ResponseJob argument are not visible in this listing.
2726  if( !pToBeRecovered.empty() )
2727  {
2728  Log *log = DefaultEnv::GetLog();
2729  log->Dump( FileMsg, "[%p@%s] Got a timer event", (void*)this,
2730  pFileUrl->GetObfuscatedURL().c_str() );
2731  RequestList::iterator it;
// No increment in the for-header: erase() already yields the next element,
// so the iterator is advanced explicitly only when nothing was erased.
2733  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2734  {
2735  if( it->params.expires <= now )
2736  {
// The response is delivered asynchronously as a job; the queued handler
// itself (it->handler) is handed to the job.
2737  jobMan->QueueJob( new ResponseJob(
2738  it->handler,
2740  0, it->params.hostList ) );
2741  it = pToBeRecovered.erase( it );
2742  }
2743  else
2744  ++it;
2745  }
2746  }
2747  }
2748 
2749  //----------------------------------------------------------------------------
2750  // Called in the child process after the fork
2751  //----------------------------------------------------------------------------
2753  {
// Adjusts the file state in the forked child: closed or failed files are
// left alone; otherwise the file is either put into the Recovering state or
// marked as Error, depending on whether recovery is enabled for the way the
// file was opened.
2754  Log *log = DefaultEnv::GetLog();
2755 
2756  if( pFileState == Closed || pFileState == Error )
2757  return;
2758 
// Recovery is allowed when the matching flag is set: pDoRecoverRead for
// read-only files, pDoRecoverWrite for everything else.
2759  if( (IsReadOnly() && pDoRecoverRead) ||
2760  (!IsReadOnly() && pDoRecoverWrite) )
2761  {
2762  log->Debug( FileMsg, "[%p@%s] Putting the file in recovery state in "
2763  "process %d", (void*)this, pFileUrl->GetObfuscatedURL().c_str(), getpid() );
2764  pFileState = Recovering;
// Both bookkeeping containers are simply emptied; the queued handlers are
// not notified here.
// NOTE(review): confirm that dropping the entries without releasing the
// requests/handlers is intended in the child.
2765  pInTheFly.clear();
2766  pToBeRecovered.clear();
2767  }
2768  else
2769  pFileState = Error;
2770  }
2771 
2772  //------------------------------------------------------------------------
2773  // Try other data server
2774  //------------------------------------------------------------------------
2775  XRootDStatus FileStateHandler::TryOtherServer( std::shared_ptr<FileStateHandler> &self, uint16_t timeout )
2776  {
2777  XrdSysMutexHelper scopedLock( self->pMutex );
2778 
2779  if( self->pFileState != Opened || !self->pLoadBalancer )
2780  return XRootDStatus( stError, errInvalidOp );
2781 
2782  self->pFileState = Recovering;
2783 
2784  Log *log = DefaultEnv::GetLog();
2785  log->Debug( FileMsg, "[%p@%s] Reopen file at next data server.",
2786  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
2787 
2788  // merge CGI
2789  auto lbcgi = self->pLoadBalancer->GetParams();
2790  auto dtcgi = self->pDataServer->GetParams();
2791  MessageUtils::MergeCGI( lbcgi, dtcgi, false );
2792  // update tried CGI
2793  auto itr = lbcgi.find( "tried" );
2794  if( itr == lbcgi.end() )
2795  lbcgi["tried"] = self->pDataServer->GetHostName();
2796  else
2797  {
2798  std::string tried = itr->second;
2799  tried += "," + self->pDataServer->GetHostName();
2800  lbcgi["tried"] = tried;
2801  }
2802  self->pLoadBalancer->SetParams( lbcgi );
2803 
2804  return ReOpenFileAtServer( self, *self->pLoadBalancer, timeout );
2805  }
2806 
2807  //------------------------------------------------------------------------
2808  // Generic implementation of xattr operation
2809  //------------------------------------------------------------------------
// Shared implementation of the extended-attribute operations: builds a
// kXR_fattr request for the open file handle, with the given subcode and
// options and a body generated from `attrs`, wraps the user handler in a
// StatefulHandler and sends the request (or queues it) via SendOrQueue.
// Returns the status of CreateXAttrBody if building the body fails, otherwise
// the status of SendOrQueue.
// NOTE(review): no lock is taken and the file state is not checked inside
// this function - presumably the callers do both; confirm.
2810  template<typename T>
2811  Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &self,
2812  kXR_char subcode,
2813  kXR_char options,
2814  const std::vector<T> &attrs,
2815  ResponseHandler *handler,
2816  uint16_t timeout )
2817  {
2818  //--------------------------------------------------------------------------
2819  // Issue a new fattr request
2820  //--------------------------------------------------------------------------
2821  Message *msg;
2822  ClientFattrRequest *req;
2823  MessageUtils::CreateRequest( msg, req );
2824 
2825  req->requestid = kXR_fattr;
2826  req->subcode = subcode;
2827  req->numattr = attrs.size();
2828  req->options = options;
2829  memcpy( req->fhandle, self->pFileHandle, 4 );
2830  XRootDStatus st = MessageUtils::CreateXAttrBody( msg, attrs );
// NOTE(review): on this early return `msg` is neither sent nor released
// here - looks like a leak if CreateRequest allocates it; confirm.
2831  if( !st.IsOK() ) return st;
2832 
// Stateful request that must not follow redirects.
2833  MessageSendParams params;
2834  params.timeout = timeout;
2835  params.followRedirects = false;
2836  params.stateful = true;
2838 
// The StatefulHandler wraps the user handler together with the request and
// its send parameters.
2840  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2841 
2842  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2843  }
2844 
2845  //----------------------------------------------------------------------------
2846  // Send a message to a host or put it in the recovery queue
2847  //----------------------------------------------------------------------------
2848  Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &self,
2849  const URL &url,
2850  Message *msg,
2851  ResponseHandler *handler,
2852  MessageSendParams &sendParams )
2853  {
2854  //--------------------------------------------------------------------------
2855  // Recovering
2856  //--------------------------------------------------------------------------
2857  if( self->pFileState == Recovering )
2858  {
2859  return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2860  }
2861 
2862  //--------------------------------------------------------------------------
2863  // Trying to send
2864  //--------------------------------------------------------------------------
2865  if( self->pFileState == Opened )
2866  {
2867  msg->SetSessionId( self->pSessionId );
2868  XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
2869 
2870  //------------------------------------------------------------------------
2871  // Invalid session id means that the connection has been broken while we
2872  // were idle so we haven't been informed about this fact earlier.
2873  //------------------------------------------------------------------------
2874  if( !st.IsOK() && st.code == errInvalidSession && self->IsRecoverable( st ) )
2875  return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2876 
2877  if( st.IsOK() )
2878  self->pInTheFly.insert(msg);
2879  else
2880  delete handler;
2881  return st;
2882  }
2883  return Status( stError, errInvalidOp );
2884  }
2885 
2886  //----------------------------------------------------------------------------
2887  // Check if the stateful error is recoverable
2888  //----------------------------------------------------------------------------
// Tells whether a failed stateful request may be recovered: the status code
// must be one of the codes listed below and recovery must be enabled for the
// way the file was opened (pDoRecoverRead for read-only files,
// pDoRecoverWrite otherwise).
// NOTE(review): some entries of the error list are not visible in this
// listing.
2889  bool FileStateHandler::IsRecoverable( const XRootDStatus &status ) const
2890  {
2891  const auto recoverable_errors = {
2895  errInternal,
2896  errTlsError,
2898  };
2899 
// The list is only scanned when at least one kind of recovery is enabled.
2900  if (pDoRecoverRead || pDoRecoverWrite)
2901  for (const auto error : recoverable_errors)
2902  if (status.code == error)
2903  return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
2904 
2905  return false;
2906  }
2907 
2908  //----------------------------------------------------------------------------
2909  // Check if the file is open for read only
2910  //----------------------------------------------------------------------------
2911  bool FileStateHandler::IsReadOnly() const
2912  {
2913  if( (pOpenFlags & kXR_open_read) && !(pOpenFlags & kXR_open_updt) &&
2914  !(pOpenFlags & kXR_open_apnd ) )
2915  return true;
2916  return false;
2917  }
2918 
2919  //----------------------------------------------------------------------------
2920  // Recover a message
2921  //----------------------------------------------------------------------------
2922  Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &self,
2923  RequestData rd,
2924  bool callbackOnFailure )
2925  {
2926  self->pFileState = Recovering;
2927 
2928  Log *log = DefaultEnv::GetLog();
2929  log->Dump( FileMsg, "[%p@%s] Putting message %s in the recovery list",
2930  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2931  rd.request->GetObfuscatedDescription().c_str() );
2932 
2933  Status st = RunRecovery( self );
2934  if( st.IsOK() )
2935  {
2936  self->pToBeRecovered.push_back( rd );
2937  return st;
2938  }
2939 
2940  if( callbackOnFailure )
2941  self->FailMessage( rd, st );
2942 
2943  return st;
2944  }
2945 
2946  //----------------------------------------------------------------------------
2947  // Run the recovery procedure if appropriate
2948  //----------------------------------------------------------------------------
2949  Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &self )
2950  {
2951  if( self->pFileState != Recovering )
2952  return Status();
2953 
2954  if( !self->pInTheFly.empty() )
2955  return Status();
2956 
2957  Log *log = DefaultEnv::GetLog();
2958  log->Debug( FileMsg, "[%p@%s] Running the recovery procedure", (void*)self.get(),
2959  self->pFileUrl->GetObfuscatedURL().c_str() );
2960 
2961  Status st;
2962  if( self->pStateRedirect )
2963  {
2964  SendClose( self, 0 );
2965  st = ReOpenFileAtServer( self, *self->pStateRedirect, 0 );
2966  delete self->pStateRedirect; self->pStateRedirect = 0;
2967  }
2968  else if( self->IsReadOnly() && self->pLoadBalancer )
2969  st = ReOpenFileAtServer( self, *self->pLoadBalancer, 0 );
2970  else
2971  st = ReOpenFileAtServer( self, *self->pDataServer, 0 );
2972 
2973  if( !st.IsOK() )
2974  {
2975  self->pFileState = Error;
2976  self->pStatus = st;
2977  self->FailQueuedMessages( st );
2978  }
2979 
2980  return st;
2981  }
2982 
2983  //----------------------------------------------------------------------------
2984  // Send a close and ignore the response
2985  //----------------------------------------------------------------------------
// Builds a kXR_close request for the current file handle and issues it to
// the current data server as a stateful request that does not follow
// redirects. Returns the status of IssueRequest.
// NOTE(review): the line(s) creating `handler` are only partly visible in
// this listing.
2986  XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &self,
2987  uint16_t timeout )
2988  {
2989  Message *msg;
2990  ClientCloseRequest *req;
2991  MessageUtils::CreateRequest( msg, req );
2992 
2993  req->requestid = kXR_close;
2994  memcpy( req->fhandle, self->pFileHandle, 4 );
2995 
2997  msg->SetSessionId( self->pSessionId );
// The callback ignores both the status and the response. It captures a copy
// of `self`, so it holds a reference to the FileStateHandler until it runs
// and resets that copy.
2999  [self]( XRootDStatus&, AnyObject& ) mutable { self.reset(); } );
3000  MessageSendParams params;
3001  params.timeout = timeout;
3002  params.followRedirects = false;
3003  params.stateful = true;
3004 
3006 
3007  return self->IssueRequest( *self->pDataServer, msg, handler, params );
3008  }
3009 
3010  //----------------------------------------------------------------------------
3011  // Re-open the current file at a given server
3012  //----------------------------------------------------------------------------
// Sends a recovery kXR_open for this file to `url`, using the stored open
// mode and a sanitised copy of the stored open flags. Returns the status of
// IssueRequest; on failure the open handler is destroyed, the status is
// stored in pStatus and the file state becomes Closed.
// Note that the sanitised flags are written back to pOpenFlags, so the
// change persists beyond this call.
3013  XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &self,
3014  const URL &url,
3015  uint16_t timeout )
3016  {
3017  Log *log = DefaultEnv::GetLog();
3018  log->Dump( FileMsg, "[%p@%s] Sending a recovery open command to %s",
3019  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(), url.GetObfuscatedURL().c_str() );
3020 
3021  //--------------------------------------------------------------------------
3022  // Remove the kXR_delete and kXR_new flags, as we don't want the recovery
3023  // procedure to delete a file that has been partially updated or fail it
3024  // because a partially uploaded file already exists.
3025  //--------------------------------------------------------------------------
3026  if( self->pOpenFlags & kXR_delete)
3027  {
3028  self->pOpenFlags &= ~kXR_delete;
// kXR_delete is replaced by kXR_open_updt rather than just dropped.
3029  self->pOpenFlags |= kXR_open_updt;
3030  }
3031 
3032  self->pOpenFlags &= ~kXR_new;
3033 
3034  Message *msg;
3035  ClientOpenRequest *req;
// `u` is a working copy used only to compute the path put into the request;
// the request itself is issued to the original `url` below.
3036  URL u = url;
3037 
// Fall back to the path of the file URL when the target URL carries none.
3038  if( url.GetPath().empty() )
3039  u.SetPath( self->pFileUrl->GetPath() );
3040 
3041  std::string path = u.GetPathWithFilteredParams();
3042  MessageUtils::CreateRequest( msg, req, path.length() );
3043 
3044  req->requestid = kXR_open;
3045  req->mode = self->pOpenMode;
3046  req->options = self->pOpenFlags;
3047  req->dlen = path.length();
// The path is written at offset 24 of the message buffer.
3048  msg->Append( path.c_str(), path.length(), 24 );
3049 
3050  // create a new reopen handler
3051  // (it is not assigned to 'pReOpenHandler' in order not to bump the reference counter
3052  // until we know that 'SendMessage' was successful)
3053  OpenHandler *openHandler = new OpenHandler( self, 0 );
3054  MessageSendParams params; params.timeout = timeout;
3057 
3058  //--------------------------------------------------------------------------
3059  // Issue the open request
3060  //--------------------------------------------------------------------------
3061  XRootDStatus st = self->IssueRequest( url, msg, openHandler, params );
3062 
3063  // if there was a problem destroy the open handler
3064  if( !st.IsOK() )
3065  {
3066  delete openHandler;
3067  self->pStatus = st;
3068  self->pFileState = Closed;
3069  }
3070  return st;
3071  }
3072 
3073  //------------------------------------------------------------------------
3074  // Fail a message
3075  //------------------------------------------------------------------------
// Answers a queued request with an error: the user handler wrapped by the
// request's StatefulHandler is scheduled (as a ResponseJob) to receive a copy
// of `status`, and the StatefulHandler itself is deleted. If the request's
// handler is not a StatefulHandler, an error is logged and nothing else
// happens.
// NOTE(review): the declaration of `jobMan` is not visible in this listing.
3076  void FileStateHandler::FailMessage( RequestData rd, XRootDStatus status )
3077  {
3078  Log *log = DefaultEnv::GetLog();
3079  log->Dump( FileMsg, "[%p@%s] Failing message %s with %s",
3080  (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
3081  rd.request->GetObfuscatedDescription().c_str(),
3082  status.ToStr().c_str() );
3083 
// The queue stores handlers through the base type; the cast recovers the
// wrapper in order to reach the user handler behind it.
3084  StatefulHandler *sh = dynamic_cast<StatefulHandler*>(rd.handler);
3085  if( !sh )
3086  {
// This `log` shadows the identical pointer obtained at the top.
3087  Log *log = DefaultEnv::GetLog();
3088  log->Error( FileMsg, "[%p@%s] Internal error while recovering %s",
3089  (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
3090  rd.request->GetObfuscatedDescription().c_str() );
3091  return;
3092  }
3093 
3095  ResponseHandler *userHandler = sh->GetUserHandler();
// The job gets its own heap-allocated copy of the status and no response
// object (0).
3096  jobMan->QueueJob( new ResponseJob(
3097  userHandler,
3098  new XRootDStatus( status ),
3099  0, rd.params.hostList ) );
3100 
3101  delete sh;
3102  }
3103 
3104  //----------------------------------------------------------------------------
3105  // Fail queued messages
3106  //----------------------------------------------------------------------------
3107  void FileStateHandler::FailQueuedMessages( XRootDStatus status )
3108  {
3109  RequestList::iterator it;
3110  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3111  FailMessage( *it, status );
3112  pToBeRecovered.clear();
3113  }
3114 
3115  //------------------------------------------------------------------------
3116  // Re-send queued messages
3117  //------------------------------------------------------------------------
3118  void FileStateHandler::ReSendQueuedMessages()
3119  {
3120  RequestList::iterator it;
3121  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3122  {
3123  it->request->SetSessionId( pSessionId );
3124  ReWriteFileHandle( it->request );
3125  XRootDStatus st = IssueRequest( *pDataServer, it->request,
3126  it->handler, it->params );
3127  if( !st.IsOK() )
3128  FailMessage( *it, st );
3129  }
3130  pToBeRecovered.clear();
3131  }
3132 
3133  //------------------------------------------------------------------------
3134  // Re-write file handle
3135  //------------------------------------------------------------------------
// Patches the current 4-byte file handle (pFileHandle) into an already built
// request. For most request types the handle sits in the request header; for
// kXR_readv and kXR_writev every list entry in the body carries its own
// handle. Request ids without a case below are left unchanged. The new handle
// is logged at the end.
// NOTE(review): the declarations of `hdr` and of most `req` pointers are not
// visible in this listing.
3136  void FileStateHandler::ReWriteFileHandle( Message *msg )
3137  {
3139  switch( hdr->requestid )
3140  {
3141  case kXR_read:
3142  {
3144  memcpy( req->fhandle, pFileHandle, 4 );
3145  break;
3146  }
3147  case kXR_write:
3148  {
3150  memcpy( req->fhandle, pFileHandle, 4 );
3151  break;
3152  }
3153  case kXR_sync:
3154  {
3156  memcpy( req->fhandle, pFileHandle, 4 );
3157  break;
3158  }
3159  case kXR_truncate:
3160  {
3162  memcpy( req->fhandle, pFileHandle, 4 );
3163  break;
3164  }
3165  case kXR_readv:
3166  {
// The body is read from offset 24 of the message buffer as an array of
// readahead_list entries; dlen is its size in bytes.
3168  readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
3169  for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3170  memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
3171  break;
3172  }
3173  case kXR_writev:
3174  {
// Same layout idea as kXR_readv, with write_list entries.
3175  ClientWriteVRequest *req =
3176  reinterpret_cast<ClientWriteVRequest*>( msg->GetBuffer() );
3177  XrdProto::write_list *wrtList =
3178  reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
3179  size_t size = req->dlen / sizeof(XrdProto::write_list);
3180  for( size_t i = 0; i < size; ++i )
3181  memcpy( wrtList[i].fhandle, pFileHandle, 4 );
3182  break;
3183  }
3184  case kXR_pgread:
3185  {
3187  memcpy( req->fhandle, pFileHandle, 4 );
3188  break;
3189  }
3190  case kXR_pgwrite:
3191  {
3193  memcpy( req->fhandle, pFileHandle, 4 );
3194  break;
3195  }
3196  }
3197 
3198  Log *log = DefaultEnv::GetLog();
// The 4 handle bytes are printed as one 32-bit value.
3199  log->Dump( FileMsg, "[%p@%s] Rewritten file handle for %s to %#x",
3200  (void*)this, pFileUrl->GetObfuscatedURL().c_str(), msg->GetObfuscatedDescription().c_str(),
3201  *((uint32_t*)pFileHandle) );
3203  }
3204 
3205  //----------------------------------------------------------------------------
3206  // Dispatch monitoring information on close
3207  //----------------------------------------------------------------------------
// Reports a close event to the monitor, if `mon` is non-null: the event
// carries the file URL, the open time, the current time as close time, the
// accumulated byte/segment/operation counters and the close status.
// NOTE(review): the declarations of `mon` and `i` are not visible in this
// listing.
3208  void FileStateHandler::MonitorClose( const XRootDStatus *status )
3209  {
3211  if( mon )
3212  {
3214  i.file = pFileUrl;
3215  i.oTOD = pOpenTime;
// The close time is taken at the moment of reporting.
3216  gettimeofday( &i.cTOD, 0 );
3217  i.rBytes = pRBytes;
3218  i.vrBytes = pVRBytes;
3219  i.wBytes = pWBytes;
3220  i.vwBytes = pVWBytes;
3221  i.vSegs = pVSegs;
3222  i.rCount = pRCount;
// vCount is fed from the vector-read counter only; the vector-write counter
// is not part of the report.
3223  i.vCount = pVRCount;
3224  i.wCount = pWCount;
3225  i.status = status;
3226  mon->Event( Monitor::EvClose, &i );
3227  }
3228  }
3229 
3230  XRootDStatus FileStateHandler::IssueRequest( const URL &url,
3231  Message *msg,
3232  ResponseHandler *handler,
3233  MessageSendParams &sendParams )
3234  {
3235  // first handle Metalinks
3236  if( pUseVirtRedirector && url.IsMetalink() )
3237  return MessageUtils::RedirectMessage( url, msg, handler,
3238  sendParams, pLFileHandler );
3239 
3240  // than local file access
3241  if( url.IsLocalFile() )
3242  return pLFileHandler->ExecRequest( url, msg, handler, sendParams );
3243 
3244  // and finally ordinary XRootD requests
3245  return MessageUtils::SendMessage( url, msg, handler,
3246  sendParams, pLFileHandler );
3247  }
3248 
3249  //------------------------------------------------------------------------
3250  // Send a write request with payload being stored in a kernel buffer
3251  //------------------------------------------------------------------------
// Issues a kXR_write of `length` bytes at `offset` whose payload is the given
// kernel buffer. Fails with errInvalidOp unless the file is Opened or
// Recovering; otherwise the request is sent or queued through SendOrQueue and
// that status is returned.
3252  XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &self,
3253  uint64_t offset,
3254  uint32_t length,
3255  std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3256  ResponseHandler *handler,
3257  uint16_t timeout )
3258  {
3259  //--------------------------------------------------------------------------
3260  // Create the write request
3261  //--------------------------------------------------------------------------
3262  XrdSysMutexHelper scopedLock( self->pMutex );
3263 
// On this early return `kbuff` is still owned by the unique_ptr parameter.
3264  if( self->pFileState != Opened && self->pFileState != Recovering )
3265  return XRootDStatus( stError, errInvalidOp );
3266 
3267  Log *log = DefaultEnv::GetLog();
3268  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
3269  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3270  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3271 
3272  Message *msg;
3273  ClientWriteRequest *req;
3274  MessageUtils::CreateRequest( msg, req );
3275 
3276  req->requestid = kXR_write;
3277  req->offset = offset;
3278  req->dlen = length;
3279  memcpy( req->fhandle, self->pFileHandle, 4 );
3280 
3281  MessageSendParams params;
3282  params.timeout = timeout;
3283  params.followRedirects = false;
3284  params.stateful = true;
// Ownership of the kernel buffer leaves the unique_ptr here; from now on it
// travels as a raw pointer inside the send parameters.
// NOTE(review): presumably released by whoever consumes `params` - confirm.
3285  params.kbuff = kbuff.release();
// An empty chunk list is attached as well.
3286  params.chunkList = new ChunkList();
3287 
3289 
3291  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
3292 
3293  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3294  }
3295 }
kXR_unt16 requestid
Definition: XProtocol.hh:479
kXR_unt16 requestid
Definition: XProtocol.hh:630
kXR_unt16 requestid
Definition: XProtocol.hh:806
@ kXR_fattrDel
Definition: XProtocol.hh:270
@ kXR_fattrSet
Definition: XProtocol.hh:273
@ kXR_fattrList
Definition: XProtocol.hh:272
@ kXR_fattrGet
Definition: XProtocol.hh:271
#define kXR_suppgrw
Definition: XProtocol.hh:1174
kXR_char fhandle[4]
Definition: XProtocol.hh:531
kXR_char fhandle[4]
Definition: XProtocol.hh:782
struct ClientPgReadRequest pgread
Definition: XProtocol.hh:861
kXR_char fhandle[4]
Definition: XProtocol.hh:807
kXR_char fhandle[4]
Definition: XProtocol.hh:771
kXR_int64 offset
Definition: XProtocol.hh:646
kXR_unt16 requestid
Definition: XProtocol.hh:644
@ kXR_virtReadv
Definition: XProtocol.hh:150
kXR_unt16 options
Definition: XProtocol.hh:481
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
struct ClientPgWriteRequest pgwrite
Definition: XProtocol.hh:862
kXR_unt16 requestid
Definition: XProtocol.hh:228
@ kXR_async
Definition: XProtocol.hh:458
@ kXR_delete
Definition: XProtocol.hh:453
@ kXR_open_read
Definition: XProtocol.hh:456
@ kXR_open_updt
Definition: XProtocol.hh:457
@ kXR_new
Definition: XProtocol.hh:455
@ kXR_open_apnd
Definition: XProtocol.hh:462
@ kXR_retstat
Definition: XProtocol.hh:463
struct ClientRequestHdr header
Definition: XProtocol.hh:846
kXR_char fhandle[4]
Definition: XProtocol.hh:509
#define kXR_recoverWrts
Definition: XProtocol.hh:1166
kXR_unt16 infotype
Definition: XProtocol.hh:631
kXR_char fhandle[4]
Definition: XProtocol.hh:645
kXR_char fhandle[4]
Definition: XProtocol.hh:229
kXR_unt16 requestid
Definition: XProtocol.hh:157
kXR_char fhandle[4]
Definition: XProtocol.hh:633
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_open
Definition: XProtocol.hh:122
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_sync
Definition: XProtocol.hh:128
@ kXR_fattr
Definition: XProtocol.hh:132
@ kXR_query
Definition: XProtocol.hh:113
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_stat
Definition: XProtocol.hh:129
@ kXR_pgread
Definition: XProtocol.hh:142
@ kXR_chkpoint
Definition: XProtocol.hh:124
@ kXR_close
Definition: XProtocol.hh:115
@ kXR_pgwrite
Definition: XProtocol.hh:138
struct ClientReadRequest read
Definition: XProtocol.hh:867
kXR_int32 rlen
Definition: XProtocol.hh:660
kXR_unt16 requestid
Definition: XProtocol.hh:768
kXR_int32 dlen
Definition: XProtocol.hh:483
kXR_char fhandle[4]
Definition: XProtocol.hh:794
kXR_unt16 mode
Definition: XProtocol.hh:480
kXR_unt16 requestid
Definition: XProtocol.hh:508
kXR_unt16 requestid
Definition: XProtocol.hh:781
kXR_char fhandle[4]
Definition: XProtocol.hh:204
kXR_int64 offset
Definition: XProtocol.hh:661
#define kXR_PROTPGRWVERSION
Definition: XProtocol.hh:73
kXR_int64 offset
Definition: XProtocol.hh:808
struct ClientWriteRequest write
Definition: XProtocol.hh:876
kXR_int32 rlen
Definition: XProtocol.hh:647
kXR_unt16 requestid
Definition: XProtocol.hh:670
@ kXR_Qopaqug
Definition: XProtocol.hh:625
@ kXR_Qvisa
Definition: XProtocol.hh:622
kXR_int32 dlen
Definition: XProtocol.hh:159
unsigned char kXR_char
Definition: XPtypes.hh:65
static int mapError(int rc)
Definition: XProtocol.hh:1362
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
Definition: XrdClBuffer.hh:34
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
Definition: XrdClBuffer.hh:164
uint32_t GetSize() const
Get the size of the message.
Definition: XrdClBuffer.hh:132
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
An interface for file plug-ins.
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, uint16_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
FileStateHandler(FilePlugIn *&plugin)
Constructor.
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, uint16_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Closed
The file is closed.
@ Opened
Opening has succeeded.
@ Error
Opening has failed.
@ Recovering
Recovering from an error.
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool IsOpen() const
Check if the file is open.
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, uint16_t timeout)
Try other data server.
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
XRootDStatus ExecRequest(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams)
Translate an XRootD request into LocalFileHandler call.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
An abstract class to describe the client-side monitoring plugin interface.
Definition: XrdClMonitor.hh:56
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4 bytes)
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object used by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Call the user callback.
Object stat info.
uint64_t GetSize() const
Get size (in bytes)
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:465
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:402
std::string GetPathWithFilteredParams() const
Get the path with params, filters out 'xrdcl.'.
Definition: XrdClURL.cc:331
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
Definition: XrdClURL.cc:498
void SetPath(const std::string &path)
Set the path.
Definition: XrdClURL.hh:225
bool IsLocalFile() const
Definition: XrdClURL.cc:474
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
Definition: XrdClUtils.hh:235
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Set the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
Definition: XrdClStatus.hh:76
const uint16_t errTlsError
Definition: XrdClStatus.hh:80
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t errPollerError
Definition: XrdClStatus.hh:75
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInProgress
Definition: XrdClStatus.hh:59
const uint16_t errSocketTimeout
Definition: XrdClStatus.hh:73
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const uint64_t FileMsg
const uint16_t suAlreadyDone
Definition: XrdClStatus.hh:42
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
Definition: XrdClStatus.hh:78
const uint16_t errSocketError
Definition: XrdClStatus.hh:72
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79
const uint16_t errRedirect
Definition: XrdClStatus.hh:106
const uint16_t errSocketDisconnected
Definition: XrdClStatus.hh:74
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
static const int aData
Definition: XProtocol.hh:298
kXR_char fhandle[4]
Definition: XProtocol.hh:288
kXR_unt16 requestid
Definition: XProtocol.hh:287
kXR_unt16 requestid
Definition: XProtocol.hh:820
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
Describe a file close event.
uint64_t vwBytes
Total number of bytes written via writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
std::string dataServer
Actual data server.
uint16_t oFlags
OpenFlags.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted