XRootD
XrdClClassicCopyJob.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClConstants.hh"
27 #include "XrdCl/XrdClLog.hh"
28 #include "XrdCl/XrdClDefaultEnv.hh"
29 #include "XrdCl/XrdClFile.hh"
30 #include "XrdCl/XrdClMonitor.hh"
31 #include "XrdCl/XrdClUtils.hh"
33 #include "XrdCks/XrdCksCalc.hh"
35 #include "XrdCl/XrdClZipArchive.hh"
37 #include "XrdCl/XrdClPostMaster.hh"
38 #include "XrdCl/XrdClJobManager.hh"
40 #include "XrdClXCpCtx.hh"
42 #include "XrdSys/XrdSysE2T.hh"
43 #include "XrdSys/XrdSysPthread.hh"
44 
45 #include <memory>
46 #include <mutex>
47 #include <queue>
48 #include <algorithm>
49 #include <chrono>
50 #include <thread>
51 #include <vector>
52 
53 #include <sys/types.h>
54 #include <sys/stat.h>
55 #include <fcntl.h>
56 #include <cerrno>
57 #include <unistd.h>
58 
59 #if __cplusplus < 201103L
60 #include <ctime>
61 #endif
62 
63 namespace
64 {
65  //----------------------------------------------------------------------------
67  //----------------------------------------------------------------------------
68  template<typename U = std::ratio<1, 1>>
69  class mytimer_t
70  {
71  public:
72  mytimer_t() : start( clock_t::now() ){ }
73  void reset(){ start = clock_t::now(); }
74  uint64_t elapsed() const
75  {
76  return std::chrono::duration_cast<unit_t>( clock_t::now() - start ).count();
77  }
78  private:
79  typedef std::chrono::high_resolution_clock clock_t;
80  typedef std::chrono::duration<uint64_t, U> unit_t;
81  std::chrono::time_point<clock_t> start;
82  };
83 
84  using timer_sec_t = mytimer_t<>;
85  using timer_nsec_t = mytimer_t<std::nano>;
86 
87 
88  inline XrdCl::XRootDStatus Translate( std::vector<XrdCl::XAttr> &in,
89  std::vector<XrdCl::xattr_t> &out )
90  {
91  std::vector<XrdCl::xattr_t> ret;
92  ret.reserve( in.size() );
93  std::vector<XrdCl::XAttr>::iterator itr = in.begin();
94  for( ; itr != in.end() ; ++itr )
95  {
96  if( !itr->status.IsOK() ) return itr->status;
97  XrdCl::xattr_t xa( itr->name, itr->value );
98  ret.push_back( std::move( xa ) );
99  }
100  out.swap( ret );
101  return XrdCl::XRootDStatus();
102  }
103 
104  //----------------------------------------------------------------------------
106  //----------------------------------------------------------------------------
108  std::vector<XrdCl::xattr_t> &xattrs )
109  {
110  std::vector<XrdCl::XAttr> rsp;
111  XrdCl::XRootDStatus st = file.ListXAttr( rsp );
112  if( !st.IsOK() ) return st;
113  return Translate( rsp, xattrs );
114  }
115 
116  //----------------------------------------------------------------------------
118  //----------------------------------------------------------------------------
119  inline XrdCl::XRootDStatus GetXAttr( const std::string &url,
120  std::vector<XrdCl::xattr_t> &xattrs )
121  {
122  XrdCl::URL u( url );
123  XrdCl::FileSystem fs( u );
124  std::vector<XrdCl::XAttr> rsp;
125  XrdCl::XRootDStatus st = fs.ListXAttr( u.GetPath(), rsp );
126  if( !st.IsOK() ) return st;
127  return Translate( rsp, xattrs );
128  }
129 
131  const std::vector<XrdCl::xattr_t> &xattrs )
132  {
133  std::vector<XrdCl::XAttrStatus> rsp;
134  file.SetXAttr( xattrs, rsp );
135  std::vector<XrdCl::XAttrStatus>::iterator itr = rsp.begin();
136  for( ; itr != rsp.end() ; ++itr )
137  if( !itr->status.IsOK() ) return itr->status;
138  return XrdCl::XRootDStatus();
139  }
140 
141  //----------------------------------------------------------------------------
143  //----------------------------------------------------------------------------
144  class Source
145  {
146  public:
147  //------------------------------------------------------------------------
148  // Destructor
149  //------------------------------------------------------------------------
150  Source( const std::string &checkSumType = "",
151  const std::vector<std::string> &addcks = std::vector<std::string>() ) :
152  pCkSumHelper( 0 ),
153  pContinue( false )
154  {
155  if( !checkSumType.empty() )
156  pCkSumHelper = new XrdCl::CheckSumHelper( "source", checkSumType );
157 
158  for( auto &type : addcks )
159  pAddCksHelpers.push_back( new XrdCl::CheckSumHelper( "source", type ) );
160  };
161 
162  virtual ~Source()
163  {
164  delete pCkSumHelper;
165  for( auto ptr : pAddCksHelpers )
166  delete ptr;
167  }
168 
169  //------------------------------------------------------------------------
171  //------------------------------------------------------------------------
172  virtual XrdCl::XRootDStatus Initialize() = 0;
173 
174  //------------------------------------------------------------------------
176  //------------------------------------------------------------------------
177  virtual int64_t GetSize() = 0;
178 
179  //------------------------------------------------------------------------
181  //------------------------------------------------------------------------
182  virtual XrdCl::XRootDStatus StartAt( uint64_t offset ) = 0;
183 
184  //------------------------------------------------------------------------
191  //------------------------------------------------------------------------
192  virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci ) = 0;
193 
194  //------------------------------------------------------------------------
196  //------------------------------------------------------------------------
197  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
198  std::string &checkSumType ) = 0;
199 
200  //------------------------------------------------------------------------
202  //------------------------------------------------------------------------
203  virtual std::vector<std::string> GetAddCks() = 0;
204 
205  //------------------------------------------------------------------------
207  //------------------------------------------------------------------------
208  virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs ) = 0;
209 
210  //------------------------------------------------------------------------
212  //------------------------------------------------------------------------
213  virtual XrdCl::XRootDStatus TryOtherServer()
214  {
216  }
217 
218  protected:
219 
220  XrdCl::CheckSumHelper *pCkSumHelper;
221  std::vector<XrdCl::CheckSumHelper*> pAddCksHelpers;
222  bool pContinue;
223  };
224 
225  //----------------------------------------------------------------------------
227  //----------------------------------------------------------------------------
228  class Destination
229  {
230  public:
231  //------------------------------------------------------------------------
233  //------------------------------------------------------------------------
234  Destination( const std::string &checkSumType = "" ):
235  pPosc( false ), pForce( false ), pCoerce( false ), pMakeDir( false ),
236  pContinue( false ), pCkSumHelper( 0 )
237  {
238  if( !checkSumType.empty() )
239  pCkSumHelper = new XrdCl::CheckSumHelper( "destination", checkSumType );
240  }
241 
242  //------------------------------------------------------------------------
244  //------------------------------------------------------------------------
245  virtual ~Destination()
246  {
247  delete pCkSumHelper;
248  }
249 
250  //------------------------------------------------------------------------
252  //------------------------------------------------------------------------
253  virtual XrdCl::XRootDStatus Initialize() = 0;
254 
255  //------------------------------------------------------------------------
257  //------------------------------------------------------------------------
258  virtual XrdCl::XRootDStatus Finalize() = 0;
259 
260  //------------------------------------------------------------------------
265  //------------------------------------------------------------------------
266  virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci ) = 0;
267 
268  //------------------------------------------------------------------------
270  //------------------------------------------------------------------------
271  virtual XrdCl::XRootDStatus Flush() = 0;
272 
273  //------------------------------------------------------------------------
275  //------------------------------------------------------------------------
276  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
277  std::string &checkSumType ) = 0;
278 
279  //------------------------------------------------------------------------
281  //------------------------------------------------------------------------
282  virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs ) = 0;
283 
284  //------------------------------------------------------------------------
286  //------------------------------------------------------------------------
287  virtual int64_t GetSize() = 0;
288 
289  //------------------------------------------------------------------------
291  //------------------------------------------------------------------------
292  void SetPOSC( bool posc )
293  {
294  pPosc = posc;
295  }
296 
297  //------------------------------------------------------------------------
299  //------------------------------------------------------------------------
300  void SetForce( bool force )
301  {
302  pForce = force;
303  }
304 
305  //------------------------------------------------------------------------
307  //------------------------------------------------------------------------
308  void SetContinue( bool continue_ )
309  {
310  pContinue = continue_;
311  }
312 
313  //------------------------------------------------------------------------
315  //------------------------------------------------------------------------
316  void SetCoerce( bool coerce )
317  {
318  pCoerce = coerce;
319  }
320 
321  //------------------------------------------------------------------------
323  //------------------------------------------------------------------------
324  void SetMakeDir( bool makedir )
325  {
326  pMakeDir = makedir;
327  }
328 
329  //------------------------------------------------------------------------
331  //------------------------------------------------------------------------
332  virtual const std::string& GetLastURL() const
333  {
334  static const std::string empty;
335  return empty;
336  }
337 
338  //------------------------------------------------------------------------
340  //------------------------------------------------------------------------
341  virtual const std::string& GetWrtRecoveryRedir() const
342  {
343  static const std::string empty;
344  return empty;
345  }
346 
347  protected:
348  bool pPosc;
349  bool pForce;
350  bool pCoerce;
351  bool pMakeDir;
352  bool pContinue;
353 
354  XrdCl::CheckSumHelper *pCkSumHelper;
355  };
356 
357  //----------------------------------------------------------------------------
359  //----------------------------------------------------------------------------
360  class StdInSource: public Source
361  {
362  public:
363  //------------------------------------------------------------------------
365  //------------------------------------------------------------------------
366  StdInSource( const std::string &ckSumType, uint32_t chunkSize, const std::vector<std::string> &addcks ):
367  Source( ckSumType, addcks ),
368  pCurrentOffset(0),
369  pChunkSize( chunkSize )
370  {
371 
372  }
373 
374  //------------------------------------------------------------------------
376  //------------------------------------------------------------------------
377  virtual ~StdInSource()
378  {
379 
380  }
381 
382  //------------------------------------------------------------------------
384  //------------------------------------------------------------------------
385  virtual XrdCl::XRootDStatus Initialize()
386  {
387  if( pCkSumHelper )
388  {
389  auto st = pCkSumHelper->Initialize();
390  if( !st.IsOK() ) return st;
391  for( auto cksHelper : pAddCksHelpers )
392  {
393  st = cksHelper->Initialize();
394  if( !st.IsOK() ) return st;
395  }
396  }
397  return XrdCl::XRootDStatus();
398  }
399 
400  //------------------------------------------------------------------------
402  //------------------------------------------------------------------------
403  virtual int64_t GetSize()
404  {
405  return -1;
406  }
407 
408  //------------------------------------------------------------------------
410  //------------------------------------------------------------------------
411  virtual XrdCl::XRootDStatus StartAt( uint64_t )
412  {
414  "Cannot continue from stdin!" );
415  }
416 
417  //------------------------------------------------------------------------
419  //------------------------------------------------------------------------
420  virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
421  {
422  using namespace XrdCl;
423  Log *log = DefaultEnv::GetLog();
424 
425  uint32_t toRead = pChunkSize;
426  char *buffer = new char[toRead];
427 
428  int64_t bytesRead = 0;
429  uint32_t offset = 0;
430  while( toRead )
431  {
432  int64_t bRead = read( 0, buffer+offset, toRead );
433  if( bRead == -1 )
434  {
435  log->Debug( UtilityMsg, "Unable to read from stdin: %s",
436  XrdSysE2T( errno ) );
437  delete [] buffer;
438  return XRootDStatus( stError, errOSError, errno );
439  }
440 
441  if( bRead == 0 )
442  break;
443 
444  bytesRead += bRead;
445  offset += bRead;
446  toRead -= bRead;
447  }
448 
449  if( bytesRead == 0 )
450  {
451  delete [] buffer;
452  return XRootDStatus( stOK, suDone );
453  }
454 
455  if( pCkSumHelper )
456  pCkSumHelper->Update( buffer, bytesRead );
457 
458  for( auto cksHelper : pAddCksHelpers )
459  cksHelper->Update( buffer, bytesRead );
460 
461  ci = XrdCl::PageInfo( pCurrentOffset, bytesRead, buffer );
462  pCurrentOffset += bytesRead;
463  return XRootDStatus( stOK, suContinue );
464  }
465 
466  //------------------------------------------------------------------------
468  //------------------------------------------------------------------------
469  virtual XrdCl::XRootDStatus GetCheckSumImpl( XrdCl::CheckSumHelper *cksHelper,
470  std::string &checkSum,
471  std::string &checkSumType )
472  {
473  using namespace XrdCl;
474  if( cksHelper )
475  return cksHelper->GetCheckSum( checkSum, checkSumType );
477  }
478 
479  //------------------------------------------------------------------------
481  //------------------------------------------------------------------------
482  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
483  std::string &checkSumType )
484  {
485  return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
486  }
487 
488  //------------------------------------------------------------------------
490  //------------------------------------------------------------------------
491  std::vector<std::string> GetAddCks()
492  {
493  std::vector<std::string> ret;
494  for( auto cksHelper : pAddCksHelpers )
495  {
496  std::string type = cksHelper->GetType();
497  std::string cks;
498  GetCheckSumImpl( cksHelper, cks, type );
499  ret.push_back( type + ":" + cks );
500  }
501  return ret;
502  }
503 
504  //------------------------------------------------------------------------
506  //------------------------------------------------------------------------
507  virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
508  {
509  return XrdCl::XRootDStatus();
510  }
511 
512  private:
513  StdInSource(const StdInSource &other);
514  StdInSource &operator = (const StdInSource &other);
515 
516  uint64_t pCurrentOffset;
517  uint32_t pChunkSize;
518  };
519 
520  //----------------------------------------------------------------------------
522  //----------------------------------------------------------------------------
523  class XRootDSource: public Source
524  {
525  struct CancellableJob : public XrdCl::Job
526  {
527  virtual void Cancel() = 0;
528 
529  std::mutex mtx;
530  };
531 
532  //----------------------------------------------------------------------------
533  // On-connect callback job, a lambda would be more elegant, but we still have
534  // to support SLC6
535  //----------------------------------------------------------------------------
536  template<typename READER>
537  struct OnConnJob : public CancellableJob
538  {
539  OnConnJob( XRootDSource *self, READER *reader ) : self( self ), reader( reader )
540  {
541  }
542 
543  void Run( void* )
544  {
545  std::unique_lock<std::mutex> lck( mtx );
546  if( !self || !reader ) return;
547  // add new chunks to the queue
548  if( self->pNbConn < self->pMaxNbConn )
549  self->FillQueue( reader );
550  }
551 
552  void Cancel()
553  {
554  std::unique_lock<std::mutex> lck( mtx );
555  self = 0;
556  reader = 0;
557  }
558 
559  private:
560  XRootDSource *self;
561  READER *reader;
562 
563  };
564 
565  public:
566 
567  //------------------------------------------------------------------------
569  //------------------------------------------------------------------------
570  XrdCl::XRootDStatus TryOtherServer()
571  {
572  return pFile->TryOtherServer();
573  }
574 
575  //------------------------------------------------------------------------
577  //------------------------------------------------------------------------
578  XRootDSource( const XrdCl::URL *url,
579  uint32_t chunkSize,
580  uint8_t parallelChunks,
581  const std::string &ckSumType,
582  const std::vector<std::string> &addcks,
583  bool doserver ):
584  Source( ckSumType, addcks ),
585  pUrl( url ), pFile( new XrdCl::File() ), pSize( -1 ),
586  pCurrentOffset( 0 ), pChunkSize( chunkSize ),
587  pParallel( parallelChunks ),
588  pNbConn( 0 ), pUsePgRead( false ),
589  pDoServer( doserver )
590  {
592  XrdCl::DefaultEnv::GetEnv()->GetInt( "SubStreamsPerChannel", val );
593  pMaxNbConn = val - 1; // account for the control stream
594  }
595 
596  //------------------------------------------------------------------------
598  //------------------------------------------------------------------------
599  virtual ~XRootDSource()
600  {
601  if( pDataConnCB )
602  pDataConnCB->Cancel();
603 
604  CleanUpChunks();
605  if( pFile->IsOpen() )
606  XrdCl::XRootDStatus status = pFile->Close();
607  delete pFile;
608  }
609 
610  //------------------------------------------------------------------------
612  //------------------------------------------------------------------------
613  virtual XrdCl::XRootDStatus Initialize()
614  {
615  using namespace XrdCl;
616  Log *log = DefaultEnv::GetLog();
617  log->Debug( UtilityMsg, "Opening %s for reading",
618  pUrl->GetObfuscatedURL().c_str() );
619 
620  std::string value;
621  DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
622  pFile->SetProperty( "ReadRecovery", value );
623 
624  XRootDStatus st = pFile->Open( pUrl->GetURL(), OpenFlags::Read );
625  if( !st.IsOK() )
626  return st;
627 
628  StatInfo *statInfo;
629  st = pFile->Stat( false, statInfo );
630  if( !st.IsOK() )
631  return st;
632 
633  pSize = statInfo->GetSize();
634  delete statInfo;
635 
636  if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper && !pContinue )
637  {
638  st = pCkSumHelper->Initialize();
639  if( !st.IsOK() ) return st;
640 
641  for( auto cksHelper : pAddCksHelpers )
642  {
643  st = cksHelper->Initialize();
644  if( !st.IsOK() ) return st;
645  }
646  }
647 
648  //----------------------------------------------------------------------
649  // Figere out the actual data server we are talking to
650  //----------------------------------------------------------------------
651  if( !pUrl->IsLocalFile() ||
652  ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
653  {
654  pFile->GetProperty( "LastURL", pDataServer );
655  }
656 
657 
658  if( ( !pUrl->IsLocalFile() && !pFile->IsSecure() ) ||
659  ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
660  {
661  //--------------------------------------------------------------------
662  // Decide whether we can use PgRead
663  //--------------------------------------------------------------------
664  int val = XrdCl::DefaultCpUsePgWrtRd;
665  XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
666  pUsePgRead = XrdCl::Utils::HasPgRW( pDataServer ) && ( val == 1 );
667  }
668 
669  //----------------------------------------------------------------------
670  // Print the IPv4/IPv6 stack to the stderr if we are running in server
671  // mode
672  //----------------------------------------------------------------------
673  if( pDoServer && !pUrl->IsLocalFile() )
674  {
675  AnyObject obj;
676  DefaultEnv::GetPostMaster()->QueryTransport( pDataServer, StreamQuery::IpStack, obj );
677  std::string *ipstack = nullptr;
678  obj.Get( ipstack );
679  std::cerr << "!-!" << *ipstack << std::endl;
680  delete ipstack;
681  }
682 
683  SetOnDataConnectHandler( pFile );
684 
685  return XRootDStatus();
686  }
687 
688  //------------------------------------------------------------------------
690  //------------------------------------------------------------------------
691  virtual int64_t GetSize()
692  {
693  return pSize;
694  }
695 
696  //------------------------------------------------------------------------
698  //------------------------------------------------------------------------
699  virtual XrdCl::XRootDStatus StartAt( uint64_t offset )
700  {
701  pCurrentOffset = offset;
702  pContinue = true;
703  return XrdCl::XRootDStatus();
704  }
705 
706  //------------------------------------------------------------------------
713  //------------------------------------------------------------------------
714  virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
715  {
716  return GetChunkImpl( pFile, ci );
717  }
718 
719  //------------------------------------------------------------------------
721  //------------------------------------------------------------------------
722  virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
723  {
724  return ::GetXAttr( *pFile, xattrs );
725  }
726 
727  //------------------------------------------------------------------------
728  // Clean up the chunks that are flying
729  //------------------------------------------------------------------------
730  void CleanUpChunks()
731  {
732  while( !pChunks.empty() )
733  {
734  ChunkHandler *ch = pChunks.front();
735  pChunks.pop();
736  ch->sem->Wait();
737  delete [] (char *)ch->chunk.GetBuffer();
738  delete ch;
739  }
740  }
741 
742  //------------------------------------------------------------------------
743  // Get check sum
744  //------------------------------------------------------------------------
745  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
746  std::string &checkSumType )
747  {
748  return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
749  }
750 
751  XrdCl::XRootDStatus GetCheckSumImpl( XrdCl::CheckSumHelper *cksHelper,
752  std::string &checkSum,
753  std::string &checkSumType )
754  {
755  if( pUrl->IsMetalink() )
756  {
758  XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
759  checkSum = redirector->GetCheckSum( checkSumType );
760  if( !checkSum.empty() ) return XrdCl::XRootDStatus();
761  }
762 
763  if( pUrl->IsLocalFile() )
764  {
765  if( pContinue )
766  // in case of --continue option we have to calculate the checksum from scratch
767  return XrdCl::Utils::GetLocalCheckSum( checkSum, checkSumType, pUrl->GetPath() );
768 
769  if( cksHelper )
770  return cksHelper->GetCheckSum( checkSum, checkSumType );
771 
773  }
774 
775  std::string dataServer; pFile->GetProperty( "DataServer", dataServer );
776  std::string lastUrl; pFile->GetProperty( "LastURL", lastUrl );
777  return XrdCl::Utils::GetRemoteCheckSum( checkSum, checkSumType, XrdCl::URL( lastUrl ) );
778  }
779 
780  //------------------------------------------------------------------------
782  //------------------------------------------------------------------------
783  std::vector<std::string> GetAddCks()
784  {
785  std::vector<std::string> ret;
786  for( auto cksHelper : pAddCksHelpers )
787  {
788  std::string type = cksHelper->GetType();
789  std::string cks;
790  GetCheckSumImpl( cksHelper, cks, type );
791  ret.push_back( cks );
792  }
793  return ret;
794  }
795 
796  private:
797  XRootDSource(const XRootDSource &other);
798  XRootDSource &operator = (const XRootDSource &other);
799 
800  protected:
801 
802  //------------------------------------------------------------------------
803  // Fill the queue with in-the-fly read requests
804  //------------------------------------------------------------------------
805  template<typename READER>
806  inline void FillQueue( READER *reader )
807  {
808  //----------------------------------------------------------------------
809  // Get the number of connected streams
810  //----------------------------------------------------------------------
811  uint16_t parallel = pParallel;
812  if( pNbConn < pMaxNbConn )
813  {
815  NbConnectedStrm( pDataServer );
816  }
817  if( pNbConn ) parallel *= pNbConn;
818 
819  while( pChunks.size() < parallel && pCurrentOffset < pSize )
820  {
821  uint64_t chunkSize = pChunkSize;
822  if( pCurrentOffset + chunkSize > (uint64_t)pSize )
823  chunkSize = pSize - pCurrentOffset;
824 
825  char *buffer = new char[chunkSize];
826  ChunkHandler *ch = new ChunkHandler();
827  ch->status = pUsePgRead
828  ? reader->PgRead( pCurrentOffset, chunkSize, buffer, ch )
829  : reader->Read( pCurrentOffset, chunkSize, buffer, ch );
830  pChunks.push( ch );
831  pCurrentOffset += chunkSize;
832  if( !ch->status.IsOK() )
833  {
834  ch->sem->Post();
835  break;
836  }
837  }
838  }
839 
840  //------------------------------------------------------------------------
841  // Set the on-connect handler for data streams
842  //------------------------------------------------------------------------
843  template<typename READER>
844  void SetOnDataConnectHandler( READER *reader )
845  {
846  // we need to create the object anyway as it contains our mutex now
847  pDataConnCB.reset( new OnConnJob<READER>( this, reader ) );
848 
849  // check if it is a local file
850  if( pDataServer.empty() ) return;
851 
852  XrdCl::DefaultEnv::GetPostMaster()->SetOnDataConnectHandler( pDataServer, pDataConnCB );
853  }
854 
855  //------------------------------------------------------------------------
863  //------------------------------------------------------------------------
864  template<typename READER>
865  XrdCl::XRootDStatus GetChunkImpl( READER *reader, XrdCl::PageInfo &ci )
866  {
867  //----------------------------------------------------------------------
868  // Sanity check
869  //----------------------------------------------------------------------
870  using namespace XrdCl;
871  Log *log = DefaultEnv::GetLog();
872 
873  //----------------------------------------------------------------------
874  // Fill the queue
875  //----------------------------------------------------------------------
876  std::unique_lock<std::mutex> lck( pDataConnCB->mtx );
877  FillQueue( reader );
878 
879  //----------------------------------------------------------------------
880  // Pick up a chunk from the front and wait for status
881  //----------------------------------------------------------------------
882  if( pChunks.empty() )
883  return XRootDStatus( stOK, suDone );
884 
885  std::unique_ptr<ChunkHandler> ch( pChunks.front() );
886  pChunks.pop();
887  lck.unlock();
888 
889  ch->sem->Wait();
890 
891  if( !ch->status.IsOK() )
892  {
893  log->Debug( UtilityMsg, "Unable read %d bytes at %llu from %s: %s",
894  ch->chunk.GetLength(), (unsigned long long) ch->chunk.GetOffset(),
895  pUrl->GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
896  delete [] (char *)ch->chunk.GetBuffer();
897  CleanUpChunks();
898  return ch->status;
899  }
900 
901  ci = std::move( ch->chunk );
902  // if it is a local file update the checksum
903  if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && !pContinue )
904  {
905  if( pCkSumHelper )
906  pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
907 
908  for( auto cksHelper : pAddCksHelpers )
909  cksHelper->Update( ci.GetBuffer(), ci.GetLength() );
910  }
911 
912  return XRootDStatus( stOK, suContinue );
913  }
914 
915  //------------------------------------------------------------------------
916  // Asynchronous chunk handler
917  //------------------------------------------------------------------------
919  {
920  public:
921  ChunkHandler(): sem( new XrdSysSemaphore(0) ) {}
922  virtual ~ChunkHandler() { delete sem; }
923  virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
924  XrdCl::AnyObject *response )
925  {
926  this->status = *statusval;
927  delete statusval;
928  if( response )
929  {
930  chunk = ToChunk( response );
931  delete response;
932  }
933  sem->Post();
934  }
935 
936  XrdCl::PageInfo ToChunk( XrdCl::AnyObject *response )
937  {
938  if( response->Has<XrdCl::PageInfo>() )
939  {
940  XrdCl::PageInfo *resp = nullptr;
941  response->Get( resp );
942  return std::move( *resp );
943  }
944  else
945  {
946  XrdCl::ChunkInfo *resp = nullptr;
947  response->Get( resp );
948  return XrdCl::PageInfo( resp->GetOffset(), resp->GetLength(),
949  resp->GetBuffer() );
950  }
951  }
952 
953  XrdSysSemaphore *sem;
954  XrdCl::PageInfo chunk;
955  XrdCl::XRootDStatus status;
956  };
957 
958  const XrdCl::URL *pUrl;
959  XrdCl::File *pFile;
960  int64_t pSize;
961  int64_t pCurrentOffset;
962  uint32_t pChunkSize;
963  uint16_t pParallel;
964  std::queue<ChunkHandler*> pChunks;
965  std::string pDataServer;
966  uint16_t pNbConn;
967  uint16_t pMaxNbConn;
968  bool pUsePgRead;
969  bool pDoServer;
970 
971  std::shared_ptr<CancellableJob> pDataConnCB;
972  };
973 
974  //----------------------------------------------------------------------------
976  //----------------------------------------------------------------------------
977  class XRootDSourceZip: public XRootDSource
978  {
979  public:
980  //------------------------------------------------------------------------
982  //------------------------------------------------------------------------
983  XRootDSourceZip( const std::string &filename,
984  const XrdCl::URL *archive,
985  uint32_t chunkSize,
986  uint8_t parallelChunks,
987  const std::string &ckSumType,
988  const std::vector<std::string> &addcks,
989  bool doserver ):
990  XRootDSource( archive, chunkSize, parallelChunks, ckSumType,
991  addcks, doserver ),
992  pFilename( filename ),
993  pZipArchive( new XrdCl::ZipArchive() )
994  {
995  }
996 
997  //------------------------------------------------------------------------
999  //------------------------------------------------------------------------
1000  virtual ~XRootDSourceZip()
1001  {
1002  CleanUpChunks();
1003 
1004  XrdCl::WaitFor( XrdCl::CloseArchive( pZipArchive ) );
1005  delete pZipArchive;
1006  }
1007 
1008  //------------------------------------------------------------------------
1010  //------------------------------------------------------------------------
1011  virtual XrdCl::XRootDStatus Initialize()
1012  {
1013  using namespace XrdCl;
1014  Log *log = DefaultEnv::GetLog();
1015  log->Debug( UtilityMsg, "Opening %s for reading",
1016  pUrl->GetObfuscatedURL().c_str() );
1017 
1018  std::string value;
1019  DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
1020  pZipArchive->SetProperty( "ReadRecovery", value );
1021 
1022  XRootDStatus st = XrdCl::WaitFor( XrdCl::OpenArchive( pZipArchive, pUrl->GetURL(), XrdCl::OpenFlags::Read ) );
1023  if( !st.IsOK() )
1024  return st;
1025 
1026  st = pZipArchive->OpenFile( pFilename );
1027  if( !st.IsOK() )
1028  return st;
1029 
1030  XrdCl::StatInfo *info = 0;
1031  st = pZipArchive->Stat( info );
1032  if( st.IsOK() )
1033  {
1034  pSize = info->GetSize();
1035  delete info;
1036  }
1037  else
1038  return st;
1039 
1040  if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper )
1041  {
1042  auto st = pCkSumHelper->Initialize();
1043  if( !st.IsOK() ) return st;
1044  for( auto cksHelper : pAddCksHelpers )
1045  {
1046  st = cksHelper->Initialize();
1047  if( !st.IsOK() ) return st;
1048  }
1049  }
1050 
1051  if( ( !pUrl->IsLocalFile() && !pZipArchive->IsSecure() ) ||
1052  ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
1053  {
1054  pZipArchive->GetProperty( "DataServer", pDataServer );
1055  //--------------------------------------------------------------------
1056  // Decide whether we can use PgRead
1057  //--------------------------------------------------------------------
1058  int val = XrdCl::DefaultCpUsePgWrtRd;
1059  XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
1060  pUsePgRead = XrdCl::Utils::HasPgRW( pDataServer ) && ( val == 1 );
1061  }
1062 
1063  SetOnDataConnectHandler( pZipArchive );
1064 
1065  return XrdCl::XRootDStatus();
1066  }
1067 
1068  //------------------------------------------------------------------------
1076  //------------------------------------------------------------------------
1077  virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
1078  {
1079  return GetChunkImpl( pZipArchive, ci );
1080  }
1081 
1082  //------------------------------------------------------------------------
1083  // Get check sum
1084  //------------------------------------------------------------------------
1085  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1086  std::string &checkSumType )
1087  {
1088  return GetCheckSumImpl( checkSum, checkSumType, pCkSumHelper );
1089  }
1090 
1091  //------------------------------------------------------------------------
1092  // Get check sum implementation
1093  //------------------------------------------------------------------------
1094  virtual XrdCl::XRootDStatus GetCheckSumImpl( std::string &checkSum,
1095  std::string &checkSumType,
1096  XrdCl::CheckSumHelper *cksHelper )
1097  {
1098  // The ZIP archive by default contains a ZCRC32 checksum
1099  if( checkSumType == "zcrc32" )
1100  {
1101  uint32_t cksum = 0;
1102  auto st = pZipArchive->GetCRC32( pFilename, cksum );
1103  if( !st.IsOK() ) return st;
1104 
1105  XrdCksData ckSum;
1106  ckSum.Set( "zcrc32" );
1107  ckSum.Set( reinterpret_cast<void*>( &cksum ), sizeof( uint32_t ) );
1108  char cksBuffer[265];
1109  ckSum.Get( cksBuffer, 256 );
1110  checkSum = "zcrc32:";
1111  checkSum += XrdCl::Utils::NormalizeChecksum( "zcrc32", cksBuffer );
1112  return st;
1113  }
1114 
1115  int useMtlnCksum = XrdCl::DefaultZipMtlnCksum;
1117  env->GetInt( "ZipMtlnCksum", useMtlnCksum );
1118  if( useMtlnCksum && pUrl->IsMetalink() )
1119  {
1121  XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1122  checkSum = redirector->GetCheckSum( checkSumType );
1123  if( !checkSum.empty() ) return XrdCl::XRootDStatus();
1124  }
1125 
1126  // if it is a local file we can calculate the checksum ourself
1127  if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && cksHelper && !pContinue )
1128  return cksHelper->GetCheckSum( checkSum, checkSumType );
1129 
1130  // if it is a remote file other types of checksum are not supported
1132  }
1133 
1134  //------------------------------------------------------------------------
1136  //------------------------------------------------------------------------
1137  std::vector<std::string> GetAddCks()
1138  {
1139  std::vector<std::string> ret;
1140  for( auto cksHelper : pAddCksHelpers )
1141  {
1142  std::string type = cksHelper->GetType();
1143  std::string cks;
1144  GetCheckSumImpl( cks, type, cksHelper );
1145  ret.push_back( cks );
1146  }
1147  return ret;
1148  }
1149 
1150  //------------------------------------------------------------------------
1152  //------------------------------------------------------------------------
1153  virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
1154  {
1155  return XrdCl::XRootDStatus();
1156  }
1157 
1158  private:
1159 
1160  XRootDSourceZip(const XRootDSourceZip &other);
1161  XRootDSourceZip &operator = (const XRootDSourceZip &other);
1162 
1163  const std::string pFilename;
1164  XrdCl::ZipArchive *pZipArchive;
1165  };
1166 
1167  //----------------------------------------------------------------------------
1169  //----------------------------------------------------------------------------
1170  class XRootDSourceDynamic: public Source
1171  {
1172  public:
1173 
1174  //------------------------------------------------------------------------
1176  //------------------------------------------------------------------------
1177  XrdCl::XRootDStatus TryOtherServer()
1178  {
1179  return pFile->TryOtherServer();
1180  }
1181 
1182  //------------------------------------------------------------------------
1184  //------------------------------------------------------------------------
1185  XRootDSourceDynamic( const XrdCl::URL *url,
1186  uint32_t chunkSize,
1187  const std::string &ckSumType,
1188  const std::vector<std::string> &addcks ):
1189  Source( ckSumType, addcks ),
1190  pUrl( url ), pFile( new XrdCl::File() ), pCurrentOffset( 0 ),
1191  pChunkSize( chunkSize ), pDone( false ), pUsePgRead( false )
1192  {
1193  }
1194 
1195  //------------------------------------------------------------------------
1197  //------------------------------------------------------------------------
1198  virtual ~XRootDSourceDynamic()
1199  {
1200  XrdCl::XRootDStatus status = pFile->Close();
1201  delete pFile;
1202  }
1203 
1204  //------------------------------------------------------------------------
1206  //------------------------------------------------------------------------
1207  virtual XrdCl::XRootDStatus Initialize()
1208  {
1209  using namespace XrdCl;
1210  Log *log = DefaultEnv::GetLog();
1211  log->Debug( UtilityMsg, "Opening %s for reading",
1212  pUrl->GetObfuscatedURL().c_str() );
1213 
1214  std::string value;
1215  DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
1216  pFile->SetProperty( "ReadRecovery", value );
1217 
1218  XRootDStatus st = pFile->Open( pUrl->GetURL(), OpenFlags::Read );
1219  if( !st.IsOK() )
1220  return st;
1221 
1222  if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper && !pContinue )
1223  {
1224  auto st = pCkSumHelper->Initialize();
1225  if( !st.IsOK() ) return st;
1226  for( auto cksHelper : pAddCksHelpers )
1227  {
1228  st = cksHelper->Initialize();
1229  if( !st.IsOK() ) return st;
1230  }
1231  }
1232 
1233  if( ( !pUrl->IsLocalFile() && !pFile->IsSecure() ) ||
1234  ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
1235  {
1236  std::string datasrv;
1237  pFile->GetProperty( "DataServer", datasrv );
1238  //--------------------------------------------------------------------
1239  // Decide whether we can use PgRead
1240  //--------------------------------------------------------------------
1241  int val = XrdCl::DefaultCpUsePgWrtRd;
1242  XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
1243  pUsePgRead = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
1244  }
1245 
1246  return XRootDStatus();
1247  }
1248 
1249  //------------------------------------------------------------------------
1251  //------------------------------------------------------------------------
1252  virtual int64_t GetSize()
1253  {
1254  return -1;
1255  }
1256 
1257  //------------------------------------------------------------------------
1259  //------------------------------------------------------------------------
1260  virtual XrdCl::XRootDStatus StartAt( uint64_t offset )
1261  {
1262  pCurrentOffset = offset;
1263  pContinue = true;
1264  return XrdCl::XRootDStatus();
1265  }
1266 
1267  //------------------------------------------------------------------------
1275  //------------------------------------------------------------------------
1276  virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
1277  {
1278  //----------------------------------------------------------------------
1279  // Sanity check
1280  //----------------------------------------------------------------------
1281  using namespace XrdCl;
1282 
1283  if( pDone )
1284  return XRootDStatus( stOK, suDone );
1285 
1286  //----------------------------------------------------------------------
1287  // Fill the queue
1288  //----------------------------------------------------------------------
1289  char *buffer = new char[pChunkSize];
1290  uint32_t bytesRead = 0;
1291 
1292  std::vector<uint32_t> cksums;
1293  XRootDStatus st = pUsePgRead
1294  ? pFile->PgRead( pCurrentOffset, pChunkSize, buffer, cksums, bytesRead )
1295  : pFile->Read( pCurrentOffset, pChunkSize, buffer, bytesRead );
1296 
1297  if( !st.IsOK() )
1298  {
1299  delete [] buffer;
1300  return st;
1301  }
1302 
1303  if( !bytesRead )
1304  {
1305  delete [] buffer;
1306  return XRootDStatus( stOK, suDone );
1307  }
1308 
1309  if( bytesRead < pChunkSize )
1310  pDone = true;
1311 
1312  // if it is a local file update the checksum
1313  if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && !pContinue )
1314  {
1315  if( pCkSumHelper )
1316  pCkSumHelper->Update( buffer, bytesRead );
1317 
1318  for( auto cksHelper : pAddCksHelpers )
1319  cksHelper->Update( buffer, bytesRead );
1320  }
1321 
1322  ci = XrdCl::PageInfo( pCurrentOffset, bytesRead, buffer );
1323  pCurrentOffset += bytesRead;
1324 
1325  return XRootDStatus( stOK, suContinue );
1326  }
1327 
1328  //------------------------------------------------------------------------
1329  // Get check sum
1330  //------------------------------------------------------------------------
1331  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1332  std::string &checkSumType )
1333  {
1334  return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
1335  }
1336 
1337  XrdCl::XRootDStatus GetCheckSumImpl( XrdCl::CheckSumHelper *cksHelper,
1338  std::string &checkSum,
1339  std::string &checkSumType )
1340  {
1341  if( pUrl->IsMetalink() )
1342  {
1344  XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1345  checkSum = redirector->GetCheckSum( checkSumType );
1346  if( !checkSum.empty() ) return XrdCl::XRootDStatus();
1347  }
1348 
1349  if( pUrl->IsLocalFile() )
1350  {
1351  if( pContinue)
1352  // in case of --continue option we have to calculate the checksum from scratch
1353  return XrdCl::Utils::GetLocalCheckSum( checkSum, checkSumType, pUrl->GetPath() );
1354 
1355  if( cksHelper )
1356  return cksHelper->GetCheckSum( checkSum, checkSumType );
1357 
1359  }
1360 
1361  std::string dataServer; pFile->GetProperty( "DataServer", dataServer );
1362  std::string lastUrl; pFile->GetProperty( "LastURL", lastUrl );
1363  return XrdCl::Utils::GetRemoteCheckSum( checkSum, checkSumType, XrdCl::URL( lastUrl ) );
1364  }
1365 
1366  //------------------------------------------------------------------------
1368  //------------------------------------------------------------------------
1369  std::vector<std::string> GetAddCks()
1370  {
1371  std::vector<std::string> ret;
1372  for( auto cksHelper : pAddCksHelpers )
1373  {
1374  std::string type = cksHelper->GetType();
1375  std::string cks;
1376  GetCheckSumImpl( cksHelper, cks, type );
1377  ret.push_back( cks );
1378  }
1379  return ret;
1380  }
1381 
1382  //------------------------------------------------------------------------
1384  //------------------------------------------------------------------------
1385  virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
1386  {
1387  return ::GetXAttr( *pFile, xattrs );
1388  }
1389 
1390  private:
1391  XRootDSourceDynamic(const XRootDSourceDynamic &other);
1392  XRootDSourceDynamic &operator = (const XRootDSourceDynamic &other);
1393  const XrdCl::URL *pUrl;
1394  XrdCl::File *pFile;
1395  int64_t pCurrentOffset;
1396  uint32_t pChunkSize;
1397  bool pDone;
1398  bool pUsePgRead;
1399  };
1400 
1401  //----------------------------------------------------------------------------
1403  //----------------------------------------------------------------------------
1404  class XRootDSourceXCp: public Source
1405  {
1406  public:
1407  //------------------------------------------------------------------------
1409  //------------------------------------------------------------------------
1410  XRootDSourceXCp( const XrdCl::URL* url, uint32_t chunkSize, uint16_t parallelChunks, int32_t nbSrc, uint64_t blockSize ):
1411  pXCpCtx( 0 ), pUrl( url ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ), pNbSrc( nbSrc ), pBlockSize( blockSize )
1412  {
1413  }
1414 
1415  ~XRootDSourceXCp()
1416  {
1417  if( pXCpCtx )
1418  pXCpCtx->Delete();
1419  }
1420 
1421  //------------------------------------------------------------------------
1423  //------------------------------------------------------------------------
1424  virtual XrdCl::XRootDStatus Initialize()
1425  {
1427  int64_t fileSize = -1;
1428 
1429  if( pUrl->IsMetalink() )
1430  {
1432  XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1433  fileSize = redirector->GetSize();
1434  pReplicas = redirector->GetReplicas();
1435  }
1436  else
1437  {
1438  XrdCl::LocationInfo *li = 0;
1439  XrdCl::FileSystem fs( *pUrl );
1440  XrdCl::XRootDStatus st = fs.DeepLocate( pUrl->GetPath(), XrdCl::OpenFlags::Compress | XrdCl::OpenFlags::PrefName, li );
1441  if( !st.IsOK() ) return st;
1442 
1444  for( itr = li->Begin(); itr != li->End(); ++itr)
1445  {
1446  std::string url = "root://" + itr->GetAddress() + "/" + pUrl->GetPath();
1447  pReplicas.push_back( url );
1448  }
1449 
1450  delete li;
1451  }
1452 
1453  std::stringstream ss;
1454  ss << "XCp sources: ";
1455 
1456  std::vector<std::string>::iterator itr;
1457  for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
1458  {
1459  ss << *itr << ", ";
1460  }
1461  log->Debug( XrdCl::UtilityMsg, "%s", ss.str().c_str() );
1462 
1463  pXCpCtx = new XrdCl::XCpCtx( pReplicas, pBlockSize, pNbSrc, pChunkSize, pParallelChunks, fileSize );
1464 
1465  return pXCpCtx->Initialize();
1466  }
1467 
1468  //------------------------------------------------------------------------
1470  //------------------------------------------------------------------------
1471  virtual int64_t GetSize()
1472  {
1473  return pXCpCtx->GetSize();
1474  }
1475 
1476  //------------------------------------------------------------------------
1478  //------------------------------------------------------------------------
1479  virtual XrdCl::XRootDStatus StartAt( uint64_t offset )
1480  {
1482  }
1483 
1484  //------------------------------------------------------------------------
1492  //------------------------------------------------------------------------
1493  virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
1494  {
1496  do
1497  {
1498  st = pXCpCtx->GetChunk( ci );
1499  }
1500  while( st.IsOK() && st.code == XrdCl::suRetry );
1501  return st;
1502  }
1503 
1504  //------------------------------------------------------------------------
1505  // Get check sum
1506  //------------------------------------------------------------------------
1507  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1508  std::string &checkSumType )
1509  {
1510  if( pUrl->IsMetalink() )
1511  {
1513  XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1514  checkSum = redirector->GetCheckSum( checkSumType );
1515  if( !checkSum.empty() ) return XrdCl::XRootDStatus();
1516  }
1517 
1518  std::vector<std::string>::iterator itr;
1519  for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
1520  {
1521  XrdCl::URL url( *itr );
1523  checkSumType, url );
1524  if( st.IsOK() ) return st;
1525  }
1526 
1528  }
1529 
1530  //------------------------------------------------------------------------
1532  //------------------------------------------------------------------------
1533  std::vector<std::string> GetAddCks()
1534  {
1535  return std::vector<std::string>();
1536  }
1537 
1538  //------------------------------------------------------------------------
1540  //------------------------------------------------------------------------
1541  virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
1542  {
1544  std::vector<std::string>::iterator itr = pReplicas.begin();
1545  for( ; itr < pReplicas.end() ; ++itr )
1546  {
1547  st = ::GetXAttr( *itr, xattrs );
1548  if( st.IsOK() ) return st;
1549  }
1550  return st;
1551  }
1552 
1553  private:
1554 
1555 
1556  XrdCl::XCpCtx *pXCpCtx;
1557  const XrdCl::URL *pUrl;
1558  std::vector<std::string> pReplicas;
1559  uint32_t pChunkSize;
1560  uint16_t pParallelChunks;
1561  int32_t pNbSrc;
1562  uint64_t pBlockSize;
1563  };
1564 
1565  //----------------------------------------------------------------------------
1567  //----------------------------------------------------------------------------
1568  class StdOutDestination: public Destination
1569  {
1570  public:
1571  //------------------------------------------------------------------------
1573  //------------------------------------------------------------------------
1574  StdOutDestination( const std::string &ckSumType ):
1575  Destination( ckSumType ), pCurrentOffset(0)
1576  {
1577  }
1578 
1579  //------------------------------------------------------------------------
1581  //------------------------------------------------------------------------
1582  virtual ~StdOutDestination()
1583  {
1584  }
1585 
1586  //------------------------------------------------------------------------
1588  //------------------------------------------------------------------------
1589  virtual XrdCl::XRootDStatus Initialize()
1590  {
1591  if( pContinue )
1593  ENOTSUP, "Cannot continue to stdout." );
1594 
1595  if( pCkSumHelper )
1596  return pCkSumHelper->Initialize();
1597  return XrdCl::XRootDStatus();
1598  }
1599 
1600  //------------------------------------------------------------------------
1602  //------------------------------------------------------------------------
1603  virtual XrdCl::XRootDStatus Finalize()
1604  {
1605  return XrdCl::XRootDStatus();
1606  }
1607 
1608  //------------------------------------------------------------------------
1613  //------------------------------------------------------------------------
1614  virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci )
1615  {
1616  using namespace XrdCl;
1617  Log *log = DefaultEnv::GetLog();
1618 
1619  if( pCurrentOffset != ci.GetOffset() )
1620  {
1621  log->Error( UtilityMsg, "Got out-of-bounds chunk, expected offset:"
1622  " %llu, got %llu", (unsigned long long) pCurrentOffset, (unsigned long long) ci.GetOffset() );
1623  return XRootDStatus( stError, errInternal );
1624  }
1625 
1626  int64_t wr = 0;
1627  uint32_t length = ci.GetLength();
1628  char *cursor = (char*)ci.GetBuffer();
1629  do
1630  {
1631  wr = write( 1, cursor, length );
1632  if( wr == -1 )
1633  {
1634  log->Debug( UtilityMsg, "Unable to write to stdout: %s",
1635  XrdSysE2T( errno ) );
1636  delete [] (char*)ci.GetBuffer();
1637  return XRootDStatus( stError, errOSError, errno );
1638  }
1639  pCurrentOffset += wr;
1640  cursor += wr;
1641  length -= wr;
1642  }
1643  while( length );
1644 
1645  if( pCkSumHelper )
1646  pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
1647  delete [] (char*)ci.GetBuffer();
1648  return XRootDStatus();
1649  }
1650 
1651  //------------------------------------------------------------------------
1653  //------------------------------------------------------------------------
1654  virtual XrdCl::XRootDStatus Flush()
1655  {
1656  return XrdCl::XRootDStatus();
1657  }
1658 
1659  //------------------------------------------------------------------------
1661  //------------------------------------------------------------------------
1662  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1663  std::string &checkSumType )
1664  {
1665  if( pCkSumHelper )
1666  return pCkSumHelper->GetCheckSum( checkSum, checkSumType );
1668  }
1669 
1670  //------------------------------------------------------------------------
1672  //------------------------------------------------------------------------
1673  virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs )
1674  {
1675  return XrdCl::XRootDStatus();
1676  }
1677 
1678  //------------------------------------------------------------------------
1680  //------------------------------------------------------------------------
1681  virtual int64_t GetSize()
1682  {
1683  return -1;
1684  }
1685 
1686  private:
1687  StdOutDestination(const StdOutDestination &other);
1688  StdOutDestination &operator = (const StdOutDestination &other);
1689  uint64_t pCurrentOffset;
1690  };
1691 
1692  //----------------------------------------------------------------------------
1694  //----------------------------------------------------------------------------
1695  class XRootDDestination: public Destination
1696  {
1697  public:
1698  //------------------------------------------------------------------------
1700  //------------------------------------------------------------------------
1701  XRootDDestination( const XrdCl::URL &url, uint8_t parallelChunks,
1702  const std::string &ckSumType, const XrdCl::ClassicCopyJob &cpjob ):
1703  Destination( ckSumType ),
1704  pUrl( url ), pFile( new XrdCl::File( XrdCl::File::DisableVirtRedirect ) ),
1705  pParallel( parallelChunks ), pSize( -1 ), pUsePgWrt( false ), cpjob( cpjob )
1706  {
1707  }
1708 
1709  //------------------------------------------------------------------------
1711  //------------------------------------------------------------------------
1712  virtual ~XRootDDestination()
1713  {
1714  CleanUpChunks();
1715  delete pFile;
1716 
1718 
1719  //----------------------------------------------------------------------
1720  // Make sure we clean up the cp-target symlink
1721  //----------------------------------------------------------------------
1722  std::string cptarget = XrdCl::DefaultCpTarget;
1723  XrdCl::DefaultEnv::GetEnv()->GetString( "CpTarget", cptarget );
1724  if( !cptarget.empty() )
1725  {
1726  XrdCl::FileSystem fs( "file://localhost" );
1727  XrdCl::XRootDStatus st = fs.Rm( cptarget );
1728  if( !st.IsOK() )
1729  log->Warning( XrdCl::UtilityMsg, "Could not delete cp-target symlink: %s",
1730  st.ToString().c_str() );
1731  }
1732 
1733  //----------------------------------------------------------------------
1734  // If the copy failed and user requested posc and we are dealing with
1735  // a local destination, remove the file
1736  //----------------------------------------------------------------------
1737  if( pUrl.IsLocalFile() && pPosc && !cpjob.GetResult().IsOK() )
1738  {
1739  XrdCl::FileSystem fs( pUrl );
1740  XrdCl::XRootDStatus st = fs.Rm( pUrl.GetPath() );
1741  if( !st.IsOK() )
1742  log->Error( XrdCl::UtilityMsg, "Failed to remove local destination"
1743  " on failure: %s", st.ToString().c_str() );
1744  }
1745  }
1746 
1747  //------------------------------------------------------------------------
1749  //------------------------------------------------------------------------
1750  virtual XrdCl::XRootDStatus Initialize()
1751  {
1752  using namespace XrdCl;
1753  Log *log = DefaultEnv::GetLog();
1754  log->Debug( UtilityMsg, "Opening %s for writing",
1755  pUrl.GetObfuscatedURL().c_str() );
1756 
1757  std::string value;
1758  DefaultEnv::GetEnv()->GetString( "WriteRecovery", value );
1759  pFile->SetProperty( "WriteRecovery", value );
1760 
1761  OpenFlags::Flags flags = OpenFlags::Update;
1762  if( pForce )
1763  flags |= OpenFlags::Delete;
1764  else if( !pContinue )
1765  flags |= OpenFlags::New;
1766 
1767  if( pPosc )
1768  flags |= OpenFlags::POSC;
1769 
1770  if( pCoerce )
1771  flags |= OpenFlags::Force;
1772 
1773  if( pMakeDir)
1774  flags |= OpenFlags::MakePath;
1775 
1776  Access::Mode mode = Access::UR|Access::UW|Access::GR|Access::OR;
1777 
1778  XrdCl::XRootDStatus st = pFile->Open( pUrl.GetURL(), flags, mode );
1779  if( !st.IsOK() )
1780  return st;
1781 
1782  if( ( !pUrl.IsLocalFile() && !pFile->IsSecure() ) ||
1783  ( pUrl.IsLocalFile() && pUrl.IsMetalink() ) )
1784  {
1785  std::string datasrv;
1786  pFile->GetProperty( "DataServer", datasrv );
1787  //--------------------------------------------------------------------
1788  // Decide whether we can use PgRead
1789  //--------------------------------------------------------------------
1790  int val = XrdCl::DefaultCpUsePgWrtRd;
1791  XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
1792  pUsePgWrt = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
1793  }
1794 
1795  std::string cptarget = XrdCl::DefaultCpTarget;
1796  XrdCl::DefaultEnv::GetEnv()->GetString( "CpTarget", cptarget );
1797  if( !cptarget.empty() )
1798  {
1799  std::string targeturl;
1800  pFile->GetProperty( "LastURL", targeturl );
1801  targeturl = URL( targeturl ).GetLocation();
1802  if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 )
1803  log->Warning( UtilityMsg, "Could not create cp-target symlink: %s",
1804  XrdSysE2T( errno ) );
1805  else
1806  log->Info( UtilityMsg, "Created cp-target symlink: %s -> %s",
1807  cptarget.c_str(), targeturl.c_str() );
1808  }
1809 
1810  StatInfo *info = 0;
1811  st = pFile->Stat( false, info );
1812  if( !st.IsOK() )
1813  return st;
1814  pSize = info->GetSize();
1815  delete info;
1816 
1817  if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
1818  return pCkSumHelper->Initialize();
1819 
1820  return XRootDStatus();
1821  }
1822 
1823  //------------------------------------------------------------------------
1825  //------------------------------------------------------------------------
1826  virtual XrdCl::XRootDStatus Finalize()
1827  {
1828  return pFile->Close();
1829  }
1830 
1831  //------------------------------------------------------------------------
1836  //------------------------------------------------------------------------
1837  virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci )
1838  {
1839  using namespace XrdCl;
1840  if( !pFile->IsOpen() )
1841  {
1842  delete[] (char*)ci.GetBuffer(); // we took the ownership of the buffer
1844  }
1845 
1846  //----------------------------------------------------------------------
1847  // If there is still place for this chunk to be sent send it
1848  //----------------------------------------------------------------------
1849  if( pChunks.size() < pParallel )
1850  return QueueChunk( std::move( ci ) );
1851 
1852  //----------------------------------------------------------------------
1853  // We wait for a chunk to be sent so that we have space for the current
1854  // one
1855  //----------------------------------------------------------------------
1856  std::unique_ptr<ChunkHandler> ch( pChunks.front() );
1857  pChunks.pop();
1858  ch->sem->Wait();
1859  delete [] (char*)ch->chunk.GetBuffer();
1860  if( !ch->status.IsOK() )
1861  {
1862  Log *log = DefaultEnv::GetLog();
1863  log->Debug( UtilityMsg, "Unable write %d bytes at %llu from %s: %s",
1864  ch->chunk.GetLength(), (unsigned long long) ch->chunk.GetOffset(),
1865  pUrl.GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
1866  delete[] (char*)ci.GetBuffer(); // we took the ownership of the buffer
1867  CleanUpChunks();
1868 
1869  //--------------------------------------------------------------------
1870  // Check if we should re-try the transfer from scratch at a different
1871  // data server
1872  //--------------------------------------------------------------------
1873  return CheckIfRetriable( ch->status );
1874  }
1875 
1876  return QueueChunk( std::move( ci ) );
1877  }
1878 
1879  //------------------------------------------------------------------------
1881  //------------------------------------------------------------------------
1882  virtual int64_t GetSize()
1883  {
1884  return pSize;
1885  }
1886 
1887  //------------------------------------------------------------------------
1889  //------------------------------------------------------------------------
1890  void CleanUpChunks()
1891  {
1892  while( !pChunks.empty() )
1893  {
1894  ChunkHandler *ch = pChunks.front();
1895  pChunks.pop();
1896  ch->sem->Wait();
1897  delete [] (char *)ch->chunk.GetBuffer();
1898  delete ch;
1899  }
1900  }
1901 
1902  //------------------------------------------------------------------------
1904  //------------------------------------------------------------------------
1905  XrdCl::XRootDStatus QueueChunk( XrdCl::PageInfo &&ci )
1906  {
1907  // we are writing chunks in order so we can calc the checksum
1908  // in case of local files
1909  if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
1910  pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
1911 
1912  ChunkHandler *ch = new ChunkHandler( std::move( ci ) );
1914  st = pUsePgWrt
1915  ? pFile->PgWrite(ch->chunk.GetOffset(), ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch->chunk.GetCksums(), ch)
1916  : pFile->Write( ch->chunk.GetOffset(), ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch );
1917  if( !st.IsOK() )
1918  {
1919  CleanUpChunks();
1920  delete [] (char*)ch->chunk.GetBuffer();
1921  delete ch;
1922  return st;
1923  }
1924  pChunks.push( ch );
1925  return XrdCl::XRootDStatus();
1926  }
1927 
1928  //------------------------------------------------------------------------
1930  //------------------------------------------------------------------------
1931  virtual XrdCl::XRootDStatus Flush()
1932  {
1934  while( !pChunks.empty() )
1935  {
1936  ChunkHandler *ch = pChunks.front();
1937  pChunks.pop();
1938  ch->sem->Wait();
1939  if( !ch->status.IsOK() )
1940  {
1941  //--------------------------------------------------------------------
1942  // Check if we should re-try the transfer from scratch at a different
1943  // data server
1944  //--------------------------------------------------------------------
1945  st = CheckIfRetriable( ch->status );
1946  }
1947  delete [] (char *)ch->chunk.GetBuffer();
1948  delete ch;
1949  }
1950  return st;
1951  }
1952 
1953  //------------------------------------------------------------------------
1955  //------------------------------------------------------------------------
1956  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1957  std::string &checkSumType )
1958  {
1959  if( pUrl.IsLocalFile() )
1960  {
1961  if( pContinue )
1962  // in case of --continue option we have to calculate the checksum from scratch
1963  return XrdCl::Utils::GetLocalCheckSum( checkSum, checkSumType, pUrl.GetPath() );
1964 
1965  if( pCkSumHelper )
1966  return pCkSumHelper->GetCheckSum( checkSum, checkSumType );
1967 
1969  }
1970 
1971  std::string lastUrl; pFile->GetProperty( "LastURL", lastUrl );
1972  return XrdCl::Utils::GetRemoteCheckSum( checkSum, checkSumType,
1973  XrdCl::URL( lastUrl ) );
1974  }
1975 
1976  //------------------------------------------------------------------------
1978  //------------------------------------------------------------------------
1979  virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs )
1980  {
1981  return ::SetXAttr( *pFile, xattrs );
1982  }
1983 
1984  //------------------------------------------------------------------------
1986  //------------------------------------------------------------------------
1987  const std::string& GetLastURL() const
1988  {
1989  return pLastURL;
1990  }
1991 
1992  //------------------------------------------------------------------------
1994  //------------------------------------------------------------------------
1995  const std::string& GetWrtRecoveryRedir() const
1996  {
1997  return pWrtRecoveryRedir;
1998  }
1999 
2000  private:
2001  XRootDDestination(const XRootDDestination &other);
2002  XRootDDestination &operator = (const XRootDDestination &other);
2003 
2004  //------------------------------------------------------------------------
2005  // Asynchronous chunk handler
2006  //------------------------------------------------------------------------
2007  class ChunkHandler: public XrdCl::ResponseHandler
2008  {
2009  public:
2010  ChunkHandler( XrdCl::PageInfo &&ci ):
2011  sem( new XrdSysSemaphore(0) ),
2012  chunk(std::move( ci ) ) {}
2013  virtual ~ChunkHandler() { delete sem; }
2014  virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
2015  XrdCl::AnyObject */*response*/ )
2016  {
2017  this->status = *statusval;
2018  delete statusval;
2019  sem->Post();
2020  }
2021 
2022  XrdSysSemaphore *sem;
2023  XrdCl::PageInfo chunk;
2024  XrdCl::XRootDStatus status;
2025  };
2026 
2027  inline XrdCl::XRootDStatus CheckIfRetriable( XrdCl::XRootDStatus &status )
2028  {
2029  if( status.IsOK() ) return status;
2030 
2031  //--------------------------------------------------------------------
2032  // Check if we should re-try the transfer from scratch at a different
2033  // data server
2034  //--------------------------------------------------------------------
2035  std::string value;
2036  if( pFile->GetProperty( "WrtRecoveryRedir", value ) )
2037  {
2038  pWrtRecoveryRedir = value;
2039  if( pFile->GetProperty( "LastURL", value ) ) pLastURL = value;
2041  }
2042 
2043  return status;
2044  }
2045 
2046  const XrdCl::URL pUrl;
2047  XrdCl::File *pFile;
2048  uint8_t pParallel;
2049  std::queue<ChunkHandler *> pChunks;
2050  int64_t pSize;
2051 
2052  std::string pWrtRecoveryRedir;
2053  std::string pLastURL;
2054  bool pUsePgWrt;
2055  const XrdCl::ClassicCopyJob &cpjob;
2056  };
2057 
2058  //----------------------------------------------------------------------------
2060  //----------------------------------------------------------------------------
2061  class XRootDZipDestination: public Destination
2062  {
2063  public:
2064  //------------------------------------------------------------------------
2066  //------------------------------------------------------------------------
2067  XRootDZipDestination( const XrdCl::URL &url, const std::string &fn,
2068  int64_t size, uint8_t parallelChunks, XrdCl::ClassicCopyJob &cpjob ):
2069  Destination( "zcrc32" ),
2070  pUrl( url ), pFilename( fn ), pZip( new XrdCl::ZipArchive() ),
2071  pParallel( parallelChunks ), pSize( size ), cpjob( cpjob )
2072  {
2073  }
2074 
2075  //------------------------------------------------------------------------
2077  //------------------------------------------------------------------------
2078  virtual ~XRootDZipDestination()
2079  {
2080  CleanUpChunks();
2081  delete pZip;
2082 
2083  //----------------------------------------------------------------------
2084  // If the copy failed and user requested posc and we are dealing with
2085  // a local destination, remove the file
2086  //----------------------------------------------------------------------
2087  if( pUrl.IsLocalFile() && pPosc && !cpjob.GetResult().IsOK() )
2088  {
2089  XrdCl::FileSystem fs( pUrl );
2090  XrdCl::XRootDStatus st = fs.Rm( pUrl.GetPath() );
2091  if( !st.IsOK() )
2092  {
2094  log->Error( XrdCl::UtilityMsg, "Failed to remove local destination"
2095  " on failure: %s", st.ToString().c_str() );
2096  }
2097  }
2098  }
2099 
2100  //------------------------------------------------------------------------
2102  //------------------------------------------------------------------------
2103  virtual XrdCl::XRootDStatus Initialize()
2104  {
2105  using namespace XrdCl;
2106  Log *log = DefaultEnv::GetLog();
2107  log->Debug( UtilityMsg, "Opening %s for writing",
2108  pUrl.GetObfuscatedURL().c_str() );
2109 
2110  std::string value;
2111  DefaultEnv::GetEnv()->GetString( "WriteRecovery", value );
2112  pZip->SetProperty( "WriteRecovery", value );
2113 
2114  OpenFlags::Flags flags = OpenFlags::Update;
2115 
2116  FileSystem fs( pUrl );
2117  StatInfo *info = nullptr;
2118  auto st = fs.Stat( pUrl.GetPath(), info );
2119  if( !st.IsOK() && st.code == errErrorResponse && st.errNo == kXR_NotFound )
2120  flags |= OpenFlags::New;
2121 
2122  if( pPosc )
2123  flags |= OpenFlags::POSC;
2124 
2125  if( pCoerce )
2126  flags |= OpenFlags::Force;
2127 
2128  if( pMakeDir)
2129  flags |= OpenFlags::MakePath;
2130 
2131  st = XrdCl::WaitFor( XrdCl::OpenArchive( pZip, pUrl.GetURL(), flags ) );
2132  if( !st.IsOK() )
2133  return st;
2134 
2135  std::string cptarget = XrdCl::DefaultCpTarget;
2136  XrdCl::DefaultEnv::GetEnv()->GetString( "CpTarget", cptarget );
2137  if( !cptarget.empty() )
2138  {
2139  std::string targeturl;
2140  pZip->GetProperty( "LastURL", targeturl );
2141  if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 )
2142  log->Warning( UtilityMsg, "Could not create cp-target symlink: %s",
2143  XrdSysE2T( errno ) );
2144  }
2145 
2146  st = pZip->OpenFile( pFilename, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write, pSize );
2147  if( !st.IsOK() )
2148  return st;
2149 
2150  return pCkSumHelper->Initialize();
2151  }
2152 
2153  //------------------------------------------------------------------------
2155  //------------------------------------------------------------------------
2156  virtual XrdCl::XRootDStatus Finalize()
2157  {
2158  uint32_t crc32 = 0;
2159  auto st = pCkSumHelper->GetRawCheckSum( "zcrc32", crc32 );
2160  if( !st.IsOK() ) return st;
2161  pZip->UpdateMetadata( crc32 );
2162  pZip->CloseFile();
2163  return XrdCl::WaitFor( XrdCl::CloseArchive( pZip ) );
2164  }
2165 
2166  //------------------------------------------------------------------------
2171  //------------------------------------------------------------------------
2172  virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci )
2173  {
2174  using namespace XrdCl;
2175 
2176  //----------------------------------------------------------------------
2177  // If there is still place for this chunk to be sent send it
2178  //----------------------------------------------------------------------
2179  if( pChunks.size() < pParallel )
2180  return QueueChunk( std::move( ci ) );
2181 
2182  //----------------------------------------------------------------------
2183  // We wait for a chunk to be sent so that we have space for the current
2184  // one
2185  //----------------------------------------------------------------------
2186  std::unique_ptr<ChunkHandler> ch( pChunks.front() );
2187  pChunks.pop();
2188  ch->sem->Wait();
2189  delete [] (char*)ch->chunk.GetBuffer();
2190  if( !ch->status.IsOK() )
2191  {
2192  Log *log = DefaultEnv::GetLog();
2193  log->Debug( UtilityMsg, "Unable write %d bytes at %llu from %s: %s",
2194  ch->chunk.GetLength(), (unsigned long long) ch->chunk.GetOffset(),
2195  pUrl.GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
2196  CleanUpChunks();
2197 
2198  //--------------------------------------------------------------------
2199  // Check if we should re-try the transfer from scratch at a different
2200  // data server
2201  //--------------------------------------------------------------------
2202  return CheckIfRetriable( ch->status );
2203  }
2204 
2205  return QueueChunk( std::move( ci ) );
2206  }
2207 
2208  //------------------------------------------------------------------------
2210  //------------------------------------------------------------------------
2211  virtual int64_t GetSize()
2212  {
2213  return -1;
2214  }
2215 
2216  //------------------------------------------------------------------------
2218  //------------------------------------------------------------------------
2219  void CleanUpChunks()
2220  {
2221  while( !pChunks.empty() )
2222  {
2223  ChunkHandler *ch = pChunks.front();
2224  pChunks.pop();
2225  ch->sem->Wait();
2226  delete [] (char *)ch->chunk.GetBuffer();
2227  delete ch;
2228  }
2229  }
2230 
2231  //------------------------------------------------------------------------
2232  // Queue a chunk
2233  //------------------------------------------------------------------------
2234  XrdCl::XRootDStatus QueueChunk( XrdCl::PageInfo &&ci )
2235  {
2236  // we are writing chunks in order so we can calc the checksum
2237  // in case of local files
2238  if( pCkSumHelper ) pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
2239 
2240  ChunkHandler *ch = new ChunkHandler( std::move( ci ) );
2242 
2243  //----------------------------------------------------------------------
2244  // TODO
2245  // In order to use PgWrite with ZIP append we need first to implement
2246  // PgWriteV!!!
2247  //----------------------------------------------------------------------
2248  st = pZip->Write( ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch );
2249  if( !st.IsOK() )
2250  {
2251  CleanUpChunks();
2252  delete [] (char*)ch->chunk.GetBuffer();
2253  delete ch;
2254  return st;
2255  }
2256  pChunks.push( ch );
2257  return XrdCl::XRootDStatus();
2258  }
2259 
2260  //------------------------------------------------------------------------
2262  //------------------------------------------------------------------------
2263  virtual XrdCl::XRootDStatus Flush()
2264  {
2266  while( !pChunks.empty() )
2267  {
2268  ChunkHandler *ch = pChunks.front();
2269  pChunks.pop();
2270  ch->sem->Wait();
2271  if( !ch->status.IsOK() )
2272  {
2273  //--------------------------------------------------------------------
2274  // Check if we should re-try the transfer from scratch at a different
2275  // data server
2276  //--------------------------------------------------------------------
2277  st = CheckIfRetriable( ch->status );
2278  }
2279  delete [] (char *)ch->chunk.GetBuffer();
2280  delete ch;
2281  }
2282  return st;
2283  }
2284 
2285  //------------------------------------------------------------------------
2287  //------------------------------------------------------------------------
2288  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
2289  std::string &checkSumType )
2290  {
2292  }
2293 
2294  //------------------------------------------------------------------------
2296  //------------------------------------------------------------------------
2297  virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs )
2298  {
2300  }
2301 
2302  //------------------------------------------------------------------------
2304  //------------------------------------------------------------------------
2305  const std::string& GetLastURL() const
2306  {
2307  return pLastURL;
2308  }
2309 
2310  //------------------------------------------------------------------------
2312  //------------------------------------------------------------------------
2313  const std::string& GetWrtRecoveryRedir() const
2314  {
2315  return pWrtRecoveryRedir;
2316  }
2317 
2318  private:
2319  XRootDZipDestination(const XRootDDestination &other);
2320  XRootDZipDestination &operator = (const XRootDDestination &other);
2321 
2322  //------------------------------------------------------------------------
2323  // Asynchronous chunk handler
2324  //------------------------------------------------------------------------
2325  class ChunkHandler: public XrdCl::ResponseHandler
2326  {
2327  public:
2328  ChunkHandler( XrdCl::PageInfo &&ci ):
2329  sem( new XrdSysSemaphore(0) ),
2330  chunk( std::move( ci ) ) {}
2331  virtual ~ChunkHandler() { delete sem; }
2332  virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
2333  XrdCl::AnyObject */*response*/ )
2334  {
2335  this->status = *statusval;
2336  delete statusval;
2337  sem->Post();
2338  }
2339 
2340  XrdSysSemaphore *sem;
2341  XrdCl::PageInfo chunk;
2342  XrdCl::XRootDStatus status;
2343  };
2344 
2345  inline XrdCl::XRootDStatus CheckIfRetriable( XrdCl::XRootDStatus &status )
2346  {
2347  if( status.IsOK() ) return status;
2348 
2349  //--------------------------------------------------------------------
2350  // Check if we should re-try the transfer from scratch at a different
2351  // data server
2352  //--------------------------------------------------------------------
2353  std::string value;
2354  if( pZip->GetProperty( "WrtRecoveryRedir", value ) )
2355  {
2356  pWrtRecoveryRedir = value;
2357  if( pZip->GetProperty( "LastURL", value ) ) pLastURL = value;
2359  }
2360 
2361  return status;
2362  }
2363 
2364  const XrdCl::URL pUrl;
2365  std::string pFilename;
2366  XrdCl::ZipArchive *pZip;
2367  uint8_t pParallel;
2368  std::queue<ChunkHandler *> pChunks;
2369  int64_t pSize;
2370 
2371  std::string pWrtRecoveryRedir;
2372  std::string pLastURL;
2373  XrdCl::ClassicCopyJob &cpjob;
2374  };
2375 }
2376 
2377 //------------------------------------------------------------------------------
2378 // Get current time in nanoseconds
2379 //------------------------------------------------------------------------------
2380 inline std::chrono::nanoseconds time_nsec()
2381 {
2382  using namespace std::chrono;
2383  auto since_epoch = high_resolution_clock::now().time_since_epoch();
2384  return duration_cast<nanoseconds>( since_epoch );
2385 }
2386 
2387 //------------------------------------------------------------------------------
2388 // Convert seconds to nanoseconds
2389 //------------------------------------------------------------------------------
2390 inline long long to_nsec( long long sec )
2391 {
2392  return sec * 1000000000;
2393 }
2394 
2395 //------------------------------------------------------------------------------
2396 // Sleep for # nanoseconds
2397 //------------------------------------------------------------------------------
2398 inline void sleep_nsec( long long nsec )
2399 {
2400 #if __cplusplus >= 201103L
2401  using namespace std::chrono;
2402  std::this_thread::sleep_for( nanoseconds( nsec ) );
2403 #else
2404  timespec req;
2405  req.tv_sec = nsec / to_nsec( 1 );
2406  req.tv_nsec = nsec % to_nsec( 1 );
2407  nanosleep( &req, 0 );
2408 #endif
2409 }
2410 
2411 namespace XrdCl
2412 {
2413  //----------------------------------------------------------------------------
2414  // Constructor
2415  //----------------------------------------------------------------------------
2416  ClassicCopyJob::ClassicCopyJob( uint16_t jobId,
2417  PropertyList *jobProperties,
2418  PropertyList *jobResults ):
2419  CopyJob( jobId, jobProperties, jobResults )
2420  {
2421  Log *log = DefaultEnv::GetLog();
2422  log->Debug( UtilityMsg, "Creating a classic copy job, from %s to %s",
2423  GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
2424  }
2425 
2426  //----------------------------------------------------------------------------
2427  // Run the copy job
2428  //----------------------------------------------------------------------------
2430  {
2431  Log *log = DefaultEnv::GetLog();
2432 
2433  std::string checkSumMode;
2434  std::string checkSumType;
2435  std::string checkSumPreset;
2436  std::string zipSource;
2437  uint16_t parallelChunks;
2438  uint32_t chunkSize;
2439  uint64_t blockSize;
2440  bool posc, force, coerce, makeDir, dynamicSource, zip, xcp, preserveXAttr,
2441  rmOnBadCksum, continue_, zipappend, doserver;
2442  int32_t nbXcpSources;
2443  long long xRate;
2444  long long xRateThreshold;
2445  uint16_t cpTimeout;
2446  std::vector<std::string> addcksums;
2447 
2448  pProperties->Get( "checkSumMode", checkSumMode );
2449  pProperties->Get( "checkSumType", checkSumType );
2450  pProperties->Get( "checkSumPreset", checkSumPreset );
2451  pProperties->Get( "parallelChunks", parallelChunks );
2452  pProperties->Get( "chunkSize", chunkSize );
2453  pProperties->Get( "posc", posc );
2454  pProperties->Get( "force", force );
2455  pProperties->Get( "coerce", coerce );
2456  pProperties->Get( "makeDir", makeDir );
2457  pProperties->Get( "dynamicSource", dynamicSource );
2458  pProperties->Get( "zipArchive", zip );
2459  pProperties->Get( "xcp", xcp );
2460  pProperties->Get( "xcpBlockSize", blockSize );
2461  pProperties->Get( "preserveXAttr", preserveXAttr );
2462  pProperties->Get( "xrate", xRate );
2463  pProperties->Get( "xrateThreshold", xRateThreshold );
2464  pProperties->Get( "rmOnBadCksum", rmOnBadCksum );
2465  pProperties->Get( "continue", continue_ );
2466  pProperties->Get( "cpTimeout", cpTimeout );
2467  pProperties->Get( "zipAppend", zipappend );
2468  pProperties->Get( "addcksums", addcksums );
2469  pProperties->Get( "doServer", doserver );
2470 
2471  if( zip )
2472  pProperties->Get( "zipSource", zipSource );
2473 
2474  if( xcp )
2475  pProperties->Get( "nbXcpSources", nbXcpSources );
2476 
2477  if( force && continue_ )
2478  return SetResult( stError, errInvalidArgs, EINVAL,
2479  "Invalid argument combination: continue + force." );
2480 
2481  if( zipappend && ( continue_ || force ) )
2482  return SetResult( stError, errInvalidArgs, EINVAL,
2483  "Invalid argument combination: ( continue | force ) + zip-append." );
2484 
2485  //--------------------------------------------------------------------------
2486  // Start the cp t/o timer if necessary
2487  //--------------------------------------------------------------------------
2488  std::unique_ptr<timer_sec_t> cptimer;
2489  if( cpTimeout ) cptimer.reset( new timer_sec_t() );
2490 
2491  //--------------------------------------------------------------------------
2492  // Remove on bad checksum implies that POSC semantics has to be enabled
2493  //--------------------------------------------------------------------------
2494  if( rmOnBadCksum ) posc = true;
2495 
2496  //--------------------------------------------------------------------------
2497  // Resolve the 'auto' checksum type.
2498  //--------------------------------------------------------------------------
2499  if( checkSumType == "auto" )
2500  {
2501  checkSumType = Utils::InferChecksumType( GetSource(), GetTarget(), zip );
2502  if( checkSumType.empty() )
2503  return SetResult( stError, errCheckSumError, ENOTSUP, "Could not infer checksum type." );
2504  else
2505  log->Info( UtilityMsg, "Using inferred checksum type: %s.", checkSumType.c_str() );
2506  }
2507 
2508  if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2509  return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2510 
2511  //--------------------------------------------------------------------------
2512  // Initialize the source and the destination
2513  //--------------------------------------------------------------------------
2514  std::unique_ptr<Source> src;
2515  if( xcp )
2516  src.reset( new XRootDSourceXCp( &GetSource(), chunkSize, parallelChunks, nbXcpSources, blockSize ) );
2517  else if( zip ) // TODO make zip work for xcp
2518  src.reset( new XRootDSourceZip( zipSource, &GetSource(), chunkSize, parallelChunks,
2519  checkSumType, addcksums , doserver) );
2520  else if( GetSource().GetProtocol() == "stdio" )
2521  src.reset( new StdInSource( checkSumType, chunkSize, addcksums ) );
2522  else
2523  {
2524  if( dynamicSource )
2525  src.reset( new XRootDSourceDynamic( &GetSource(), chunkSize, checkSumType, addcksums ) );
2526  else
2527  src.reset( new XRootDSource( &GetSource(), chunkSize, parallelChunks, checkSumType, addcksums, doserver ) );
2528  }
2529 
2530  XRootDStatus st = src->Initialize();
2531  if( !st.IsOK() ) return SourceError( st );
2532  uint64_t size = src->GetSize() >= 0 ? src->GetSize() : 0;
2533 
2534  if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2535  return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2536 
2537  std::unique_ptr<Destination> dest;
2538  URL newDestUrl( GetTarget() );
2539 
2540  if( GetTarget().GetProtocol() == "stdio" )
2541  dest.reset( new StdOutDestination( checkSumType ) );
2542  else if( zipappend )
2543  {
2544  std::string fn = GetSource().GetPath();
2545  size_t pos = fn.rfind( '/' );
2546  if( pos != std::string::npos )
2547  fn = fn.substr( pos + 1 );
2548  int64_t size = src->GetSize();
2549  dest.reset( new XRootDZipDestination( newDestUrl, fn, size, parallelChunks, *this ) );
2550  }
2551  //--------------------------------------------------------------------------
2552  // For xrootd destination build the oss.asize hint
2553  //--------------------------------------------------------------------------
2554  else
2555  {
2556  if( src->GetSize() >= 0 )
2557  {
2558  URL::ParamsMap params = newDestUrl.GetParams();
2559  std::ostringstream o; o << src->GetSize();
2560  params["oss.asize"] = o.str();
2561  newDestUrl.SetParams( params );
2562  // makeDir = true; // Backward compatibility for xroot destinations!!!
2563  }
2564  dest.reset( new XRootDDestination( newDestUrl, parallelChunks, checkSumType, *this ) );
2565  }
2566 
2567  dest->SetForce( force );
2568  dest->SetPOSC( posc );
2569  dest->SetCoerce( coerce );
2570  dest->SetMakeDir( makeDir );
2571  dest->SetContinue( continue_ );
2572  st = dest->Initialize();
2573  if( !st.IsOK() ) return DestinationError( st );
2574 
2575  if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2576  return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2577 
2578  //--------------------------------------------------------------------------
2579  // Copy the chunks
2580  //--------------------------------------------------------------------------
2581  if( continue_ )
2582  {
2583  size -= dest->GetSize();
2584  XrdCl::XRootDStatus st = src->StartAt( dest->GetSize() );
2585  if( !st.IsOK() ) return SetResult( st );
2586  }
2587 
2588  PageInfo pageInfo;
2589  uint64_t total_processed = 0;
2590  uint64_t processed = 0;
2591  auto start = time_nsec();
2592  uint16_t threshold_interval = parallelChunks;
2593  bool threshold_draining = false;
2594  timer_nsec_t threshold_timer;
2595  while( 1 )
2596  {
2597  st = src->GetChunk( pageInfo );
2598  if( !st.IsOK() )
2599  return SourceError( st);
2600 
2601  if( st.IsOK() && st.code == suDone )
2602  break;
2603 
2604  if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2605  return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2606 
2607  if( xRate )
2608  {
2609  auto elapsed = ( time_nsec() - start ).count();
2610  double transferred = total_processed + pageInfo.GetLength();
2611  double expected = double( xRate ) / to_nsec( 1 ) * elapsed;
2612  //----------------------------------------------------------------------
2613  // check if our transfer rate didn't exceeded the limit
2614  // (we are too fast)
2615  //----------------------------------------------------------------------
2616  if( elapsed && // make sure elapsed time is greater than 0
2617  transferred > expected )
2618  {
2619  auto nsec = ( transferred / xRate * to_nsec( 1 ) ) - elapsed;
2620  sleep_nsec( nsec );
2621  }
2622  }
2623 
2624  if( xRateThreshold )
2625  {
2626  auto elapsed = threshold_timer.elapsed();
2627  double transferred = processed + pageInfo.GetLength();
2628  double expected = double( xRateThreshold ) / to_nsec( 1 ) * elapsed;
2629  //----------------------------------------------------------------------
2630  // check if our transfer rate dropped below the threshold
2631  // (we are too slow)
2632  //----------------------------------------------------------------------
2633  if( elapsed && // make sure elapsed time is greater than 0
2634  transferred < expected &&
2635  threshold_interval == 0 ) // we check every # parallelChunks
2636  {
2637  if( !threshold_draining )
2638  {
2639  log->Warning( UtilityMsg, "Transfer rate dropped below requested ehreshold,"
2640  " trying different source!" );
2641  XRootDStatus st = src->TryOtherServer();
2642  if( !st.IsOK() ) return SetResult( stError, errThresholdExceeded, 0,
2643  "The transfer rate dropped below "
2644  "requested threshold!" );
2645  threshold_draining = true; // before the next measurement we need to drain
2646  // all the chunks that will come from the old server
2647  }
2648  else // now that all the chunks from the old server have
2649  { // been received we can start another measurement
2650  processed = 0;
2651  threshold_timer.reset();
2652  threshold_interval = parallelChunks;
2653  threshold_draining = false;
2654  }
2655  }
2656 
2657  threshold_interval = threshold_interval > 0 ? threshold_interval - 1 : parallelChunks;
2658  }
2659 
2660  total_processed += pageInfo.GetLength();
2661  processed += pageInfo.GetLength();
2662 
2663  st = dest->PutChunk( std::move( pageInfo ) );
2664  if( !st.IsOK() )
2665  {
2666  if( st.code == errRetry )
2667  {
2668  pResults->Set( "LastURL", dest->GetLastURL() );
2669  pResults->Set( "WrtRecoveryRedir", dest->GetWrtRecoveryRedir() );
2670  return SetResult( st );
2671  }
2672  return DestinationError( st );
2673  }
2674 
2675  if( progress )
2676  {
2677  progress->JobProgress( pJobId, total_processed, size );
2678  if( progress->ShouldCancel( pJobId ) )
2679  return SetResult( stError, errOperationInterrupted, kXR_Cancelled, "The copy-job has been cancelled!" );
2680  }
2681  }
2682 
2683  st = dest->Flush();
2684  if( !st.IsOK() )
2685  return DestinationError( st );
2686 
2687  //--------------------------------------------------------------------------
2688  // Copy extended attributes
2689  //--------------------------------------------------------------------------
2690  if( preserveXAttr && Utils::HasXAttr( GetSource() ) && Utils::HasXAttr( GetTarget() ) )
2691  {
2692  std::vector<xattr_t> xattrs;
2693  st = src->GetXAttr( xattrs );
2694  if( !st.IsOK() ) return SourceError( st );
2695  st = dest->SetXAttr( xattrs );
2696  if( !st.IsOK() ) return DestinationError( st );
2697  }
2698 
2699  //--------------------------------------------------------------------------
2700  // The size of the source is known and not enough data has been transferred
2701  // to the destination
2702  //--------------------------------------------------------------------------
2703  if( src->GetSize() >= 0 && size != total_processed )
2704  {
2705  log->Error( UtilityMsg, "The declared source size is %llu bytes, but "
2706  "received %llu bytes.", (unsigned long long) size, (unsigned long long) total_processed );
2707  return SetResult( stError, errDataError );
2708  }
2709  pResults->Set( "size", total_processed );
2710 
2711  //--------------------------------------------------------------------------
2712  // Finalize the destination
2713  //--------------------------------------------------------------------------
2714  st = dest->Finalize();
2715  if( !st.IsOK() )
2716  return DestinationError( st );
2717 
2718  //--------------------------------------------------------------------------
2719  // Verify the checksums if needed
2720  //--------------------------------------------------------------------------
2721  if( checkSumMode != "none" )
2722  {
2723  log->Debug( UtilityMsg, "Attempting checksum calculation, mode: %s.",
2724  checkSumMode.c_str() );
2725  std::string sourceCheckSum;
2726  std::string targetCheckSum;
2727 
2728  if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2729  return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2730 
2731  //------------------------------------------------------------------------
2732  // Get the check sum at source
2733  //------------------------------------------------------------------------
2734  timeval oStart, oEnd;
2735  XRootDStatus st;
2736 
2737  if( checkSumMode == "end2end" || checkSumMode == "source" ||
2738  !checkSumPreset.empty() )
2739  {
2740  gettimeofday( &oStart, 0 );
2741  if( !checkSumPreset.empty() )
2742  {
2743  sourceCheckSum = checkSumType + ":";
2744  sourceCheckSum += Utils::NormalizeChecksum( checkSumType,
2745  checkSumPreset );
2746  }
2747  else
2748  {
2749  st = src->GetCheckSum( sourceCheckSum, checkSumType );
2750  }
2751  gettimeofday( &oEnd, 0 );
2752 
2753  if( !st.IsOK() )
2754  return SourceError( st );
2755 
2756  pResults->Set( "sourceCheckSum", sourceCheckSum );
2757  }
2758 
2759  if( !addcksums.empty() )
2760  pResults->Set( "additionalCkeckSum", src->GetAddCks() );
2761 
2762  if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2763  return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2764 
2765  //------------------------------------------------------------------------
2766  // Get the check sum at destination
2767  //------------------------------------------------------------------------
2768  timeval tStart, tEnd;
2769 
2770  if( checkSumMode == "end2end" || checkSumMode == "target" )
2771  {
2772  gettimeofday( &tStart, 0 );
2773  st = dest->GetCheckSum( targetCheckSum, checkSumType );
2774  if( !st.IsOK() )
2775  return DestinationError( st );
2776  gettimeofday( &tEnd, 0 );
2777  pResults->Set( "targetCheckSum", targetCheckSum );
2778  }
2779 
2780  if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2781  return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2782 
2783  //------------------------------------------------------------------------
2784  // Make sure the checksums are both lower case
2785  //------------------------------------------------------------------------
2786  auto sanitize_cksum = []( char c )
2787  {
2788  std::locale loc;
2789  if( std::isalpha( c ) ) return std::tolower( c, loc );
2790  return c;
2791  };
2792 
2793  std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
2794  sourceCheckSum.begin(), sanitize_cksum );
2795 
2796  std::transform( targetCheckSum.begin(), targetCheckSum.end(),
2797  targetCheckSum.begin(), sanitize_cksum );
2798 
2799  //------------------------------------------------------------------------
2800  // Compare and inform monitoring
2801  //------------------------------------------------------------------------
2802  if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
2803  {
2804  bool match = false;
2805  if( sourceCheckSum == targetCheckSum )
2806  match = true;
2807 
2809  if( mon )
2810  {
2812  i.transfer.origin = &GetSource();
2813  i.transfer.target = &GetTarget();
2814  i.cksum = sourceCheckSum;
2815  i.oTime = Utils::GetElapsedMicroSecs( oStart, oEnd );
2816  i.tTime = Utils::GetElapsedMicroSecs( tStart, tEnd );
2817  i.isOK = match;
2818  mon->Event( Monitor::EvCheckSum, &i );
2819  }
2820 
2821  if( !match )
2822  {
2823  if( rmOnBadCksum )
2824  {
2825  FileSystem fs( newDestUrl );
2826  st = fs.Rm( newDestUrl.GetPath() );
2827  if( !st.IsOK() )
2828  log->Error( UtilityMsg, "Invalid checksum: failed to remove the target file: %s", st.ToString().c_str() );
2829  else
2830  log->Info( UtilityMsg, "Target file removed due to bad checksum!" );
2831  }
2832 
2833  st = dest->Finalize();
2834  if( !st.IsOK() )
2835  log->Error( UtilityMsg, "Failed to finalize the destination: %s", st.ToString().c_str() );
2836 
2837  return SetResult( stError, errCheckSumError, 0 );
2838  }
2839 
2840  log->Info( UtilityMsg, "Checksum verification: succeeded." );
2841  }
2842  }
2843 
2844  return SetResult();
2845  }
2846 }
@ kXR_NotFound
Definition: XProtocol.hh:1001
@ kXR_Cancelled
Definition: XProtocol.hh:1007
std::chrono::nanoseconds time_nsec()
long long to_nsec(long long sec)
void sleep_nsec(long long nsec)
ssize_t write(int fildes, const void *buf, size_t nbyte)
ssize_t read(int fildes, void *buf, size_t nbyte)
bool Force
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
int Set(const char *csName)
Definition: XrdCksData.hh:81
int Get(char *Buff, int Blen)
Definition: XrdCksData.hh:69
void Get(Type &object)
Retrieve the object being held.
Check sum helper for stdio.
XRootDStatus Initialize()
Initialize.
XRootDStatus GetCheckSum(std::string &checkSum, std::string &checkSumType)
const std::string & GetType()
void Update(const void *buffer, uint32_t size)
virtual XRootDStatus Run(CopyProgressHandler *progress=0)
PropertyList * pResults
const URL & GetSource() const
Get source.
Definition: XrdClCopyJob.hh:94
const URL & GetTarget() const
Get target.
PropertyList * pProperties
Interface for copy progress notification.
virtual void JobProgress(uint16_t jobNum, uint64_t bytesProcessed, uint64_t bytesTotal)
virtual bool ShouldCancel(uint16_t jobNum)
Determine whether the job should be canceled.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
Send file/filesystem queries to an XRootD cluster.
XRootDStatus Rm(const std::string &path, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
A file.
Definition: XrdClFile.hh:46
XRootDStatus ListXAttr(ResponseHandler *handler, uint16_t timeout=0)
Definition: XrdClFile.cc:764
XRootDStatus SetXAttr(const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
Definition: XrdClFile.cc:665
Interface for a job to be run by the job manager.
Path location info.
Iterator Begin()
Get the location begin iterator.
LocationList::iterator Iterator
Iterator over locations.
Iterator End()
Get the location end iterator.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
An abstract class to describe the client-side monitoring plugin interface.
Definition: XrdClMonitor.hh:56
@ EvCheckSum
CheckSumInfo: File checksummed.
virtual void Event(EventCode evCode, void *evData)=0
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Handle an async response.
Object stat info.
uint64_t GetSize() const
Get size (in bytes)
URL representation.
Definition: XrdClURL.hh:31
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:395
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition: XrdClURL.cc:337
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
Definition: XrdClUtils.cc:648
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
Definition: XrdClUtils.cc:771
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
Definition: XrdClUtils.cc:269
static XRootDStatus GetLocalCheckSum(std::string &checkSum, const std::string &checkSumType, const std::string &path)
Get a checksum from local file.
Definition: XrdClUtils.cc:330
static bool HasXAttr(const XrdCl::URL &url)
Definition: XrdClUtils.hh:253
static XRootDStatus GetRemoteCheckSum(std::string &checkSum, const std::string &checkSumType, const URL &url)
Get a checksum from a remote xrootd server.
Definition: XrdClUtils.cc:279
static bool HasPgRW(const XrdCl::URL &url)
Definition: XrdClUtils.hh:267
An interface for metadata redirectors.
virtual long long GetSize() const =0
virtual const std::vector< std::string > & GetReplicas()=0
Returns a vector with replicas as given in the meatlink file.
virtual std::string GetCheckSum(const std::string &type) const =0
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t errUninitialized
Definition: XrdClStatus.hh:60
const uint16_t errErrorResponse
Definition: XrdClStatus.hh:105
const char *const DefaultCpTarget
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t errNotImplemented
Operation is not implemented.
Definition: XrdClStatus.hh:64
CloseArchiveImpl< false > CloseArchive(Ctx< ZipArchive > zip, uint16_t timeout=0)
Factory for creating CloseFileImpl objects.
SetXAttrImpl< false > SetXAttr(Ctx< File > file, Arg< std::string > name, Arg< std::string > value)
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
GetXAttrImpl< false > GetXAttr(Ctx< File > file, Arg< std::string > name)
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const int DefaultSubStreamsPerChannel
const int DefaultCpUsePgWrtRd
const uint16_t errOSError
Definition: XrdClStatus.hh:61
const uint64_t UtilityMsg
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
std::tuple< std::string, std::string > xattr_t
Extended attribute key - value pair.
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
const uint16_t errRetry
Try again for whatever reason.
Definition: XrdClStatus.hh:49
const uint16_t errCheckSumError
Definition: XrdClStatus.hh:101
const uint16_t suDone
Definition: XrdClStatus.hh:38
const uint16_t errThresholdExceeded
Definition: XrdClStatus.hh:92
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
const uint16_t suContinue
Definition: XrdClStatus.hh:39
const uint16_t errNoMoreReplicas
No more replicas to try.
Definition: XrdClStatus.hh:65
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
const int DefaultZipMtlnCksum
Mode
Access mode.
Describe a data chunk for vector read.
uint64_t GetOffset() const
Get the offset.
uint32_t GetLength() const
Get the data length.
void * GetBuffer()
Get the buffer.
Describe a checksum event.
TransferInfo transfer
The transfer in question.
uint64_t tTime
Microseconds to obtain cksum from target.
bool isOK
True if checksum matched, false otherwise.
std::string cksum
Checksum as "type:value".
uint64_t oTime
Microseconds to obtain cksum from origin.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Write
Open only for writing.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
uint16_t status
Status of the execution.
Definition: XrdClStatus.hh:146
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
uint32_t errNo
Errno, if any.
Definition: XrdClStatus.hh:148