XRootD
XrdClCopy.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #include "XrdApps/XrdCpConfig.hh"
26 #include "XrdApps/XrdCpFile.hh"
27 #include "XrdCl/XrdClConstants.hh"
29 #include "XrdCl/XrdClDefaultEnv.hh"
30 #include "XrdCl/XrdClLog.hh"
31 #include "XrdCl/XrdClFileSystem.hh"
32 #include "XrdCl/XrdClUtils.hh"
33 #include "XrdCl/XrdClDlgEnv.hh"
34 #include "XrdCl/XrdClOptimizers.hh"
35 #include "XrdSys/XrdSysE2T.hh"
36 #include "XrdSys/XrdSysPthread.hh"
38 
39 #include <cstdio>
40 #include <iostream>
41 #include <iomanip>
42 #include <limits>
43 
44 //------------------------------------------------------------------------------
45 // Progress notifier
46 //------------------------------------------------------------------------------
48 {
49  public:
50  //--------------------------------------------------------------------------
52  //--------------------------------------------------------------------------
53  ProgressDisplay(): pPrevious(0), pPrintProgressBar(true),
54  pPrintSourceCheckSum(false), pPrintTargetCheckSum(false),
55  pPrintAdditionalCheckSum(false)
56  {}
57 
58  //--------------------------------------------------------------------------
60  //--------------------------------------------------------------------------
61  virtual void BeginJob( uint16_t jobNum,
62  uint16_t jobTotal,
63  const XrdCl::URL *source,
64  const XrdCl::URL *destination )
65  {
66  XrdSysMutexHelper scopedLock( pMutex );
67  if( pPrintProgressBar )
68  {
69  if( jobTotal > 1 )
70  {
71  std::cerr << "Job: " << jobNum << "/" << jobTotal << std::endl;
72  std::cerr << "Source: " << source->GetURL() << std::endl;
73  std::cerr << "Target: " << destination->GetURL() << std::endl;
74  }
75  }
76  pPrevious = 0;
77 
78  JobData d;
79  d.started = time(0);
80  d.source = source;
81  d.target = destination;
82  pOngoingJobs[jobNum] = d;
83  }
84 
85  //--------------------------------------------------------------------------
87  //--------------------------------------------------------------------------
88  virtual void EndJob( uint16_t jobNum, const XrdCl::PropertyList *results )
89  {
90  XrdSysMutexHelper scopedLock( pMutex );
91 
92  std::map<uint16_t, JobData>::iterator it = pOngoingJobs.find( jobNum );
93  if( it == pOngoingJobs.end() )
94  return;
95 
96  JobData &d = it->second;
97 
98  // make sure the last available status was printed, which may not be
99  // the case when processing stdio since we throttle printing and don't
100  // know the total size
101  JobProgress( jobNum, d.bytesProcessed, d.bytesTotal );
102 
103  if( pPrintProgressBar )
104  {
105  if( pOngoingJobs.size() > 1 )
106  std::cerr << "\r" << std::string(70, ' ') << "\r";
107  else
108  std::cerr << std::endl;
109  }
110 
112  results->Get( "status", st );
113  if( !st.IsOK() )
114  {
115  pOngoingJobs.erase(it);
116  return;
117  }
118 
119  std::string checkSum;
120  uint64_t size;
121  results->Get( "size", size );
122  if( pPrintSourceCheckSum )
123  {
124  results->Get( "sourceCheckSum", checkSum );
125  PrintCheckSum( d.source, checkSum, size );
126  }
127 
128  if( pPrintTargetCheckSum )
129  {
130  results->Get( "targetCheckSum", checkSum );
131  PrintCheckSum( d.target, checkSum, size );
132  }
133 
134  if( pPrintAdditionalCheckSum )
135  {
136  std::vector<std::string> addcksums;
137  results->Get( "additionalCkeckSum", addcksums );
138  for( auto &cks : addcksums )
139  PrintCheckSum( d.source, cks, size );
140  }
141 
142  pOngoingJobs.erase(it);
143  }
144 
145  //--------------------------------------------------------------------------
147  //--------------------------------------------------------------------------
148  std::string GetProgressBar( time_t now )
149  {
150  JobData &d = pOngoingJobs.begin()->second;
151 
152  uint64_t speed = 0;
153  if( now-d.started )
154  speed = d.bytesProcessed/(now-d.started);
155  else
156  speed = d.bytesProcessed;
157 
158  std::string bar;
159  int prog = 0;
160  int proc = 0;
161 
162  if( d.bytesTotal )
163  {
164  prog = (int)((double)d.bytesProcessed/d.bytesTotal*50);
165  proc = (int)((double)d.bytesProcessed/d.bytesTotal*100);
166  }
167  else
168  {
169  prog = 50;
170  proc = 100;
171  }
172  bar.append( prog, '=' );
173  if( prog < 50 )
174  bar += ">";
175 
176  std::ostringstream o;
177  o << "[" << XrdCl::Utils::BytesToString(d.bytesProcessed) << "B/";
178  o << XrdCl::Utils::BytesToString(d.bytesTotal) << "B]";
179  o << "[" << std::setw(3) << std::right << proc << "%]";
180  o << "[" << std::setw(50) << std::left;
181  o << bar;
182  o << "]";
183  o << "[" << XrdCl::Utils::BytesToString(speed) << "B/s] ";
184  return o.str();
185  }
186 
187  //--------------------------------------------------------------------------
189  //--------------------------------------------------------------------------
190  std::string GetSummaryBar( time_t now )
191  {
192  std::map<uint16_t, JobData>::iterator it;
193  std::ostringstream o;
194 
195  for( it = pOngoingJobs.begin(); it != pOngoingJobs.end(); ++it )
196  {
197  JobData &d = it->second;
198  uint16_t jobNum = it->first;
199 
200  uint64_t speed = 0;
201  if( now-d.started )
202  speed = d.bytesProcessed/(now-d.started);
203 
204  int proc = 0;
205  if( d.bytesTotal )
206  proc = (int)((double)d.bytesProcessed/d.bytesTotal*100);
207  else
208  proc = 100;
209 
210  o << "[#" << jobNum << ": ";
211  o << proc << "% ";
212  o << XrdCl::Utils::BytesToString(speed) << "B/s] ";
213  }
214  o << " ";
215  return o.str();
216  }
217 
218  //--------------------------------------------------------------------------
220  //--------------------------------------------------------------------------
221  virtual void JobProgress( uint16_t jobNum,
222  uint64_t bytesProcessed,
223  uint64_t bytesTotal )
224  {
225  XrdSysMutexHelper scopedLock( pMutex );
226 
227  if( pPrintProgressBar )
228  {
229  time_t now = time(0);
230  if( (now - pPrevious < 1) && (bytesProcessed != bytesTotal) )
231  return;
232  pPrevious = now;
233 
234  std::map<uint16_t, JobData>::iterator it = pOngoingJobs.find( jobNum );
235  if( it == pOngoingJobs.end() )
236  return;
237 
238  JobData &d = it->second;
239 
240  d.bytesProcessed = bytesProcessed;
241  d.bytesTotal = bytesTotal;
242 
243  std::string progress;
244  if( pOngoingJobs.size() == 1 )
245  progress = GetProgressBar( now );
246  else
247  progress = GetSummaryBar( now );
248 
249  std::cerr << "\r" << progress << std::flush;
250  }
251  }
252 
253  //--------------------------------------------------------------------------
255  //--------------------------------------------------------------------------
256  void PrintCheckSum( const XrdCl::URL *url,
257  const std::string &checkSum,
258  uint64_t size )
259  {
260  if( checkSum.empty() )
261  return;
262  std::string::size_type i = checkSum.find( ':' );
263  std::cerr << checkSum.substr( 0, i+1 ) << " ";
264  std::cerr << checkSum.substr( i+1, checkSum.length()-i ) << " ";
265 
266  if( url->IsLocalFile() )
267  std::cerr << url->GetPath() << " ";
268  else
269  {
270  std::cerr << url->GetProtocol() << "://" << url->GetHostId();
271  std::cerr << url->GetPath() << " ";
272  }
273 
274  std::cerr << size;
275  std::cerr << std::endl;
276  }
277 
278  //--------------------------------------------------------------------------
279  // Printing flags
280  //--------------------------------------------------------------------------
281  void PrintProgressBar( bool print ) { pPrintProgressBar = print; }
282  void PrintSourceCheckSum( bool print ) { pPrintSourceCheckSum = print; }
283  void PrintTargetCheckSum( bool print ) { pPrintTargetCheckSum = print; }
284  void PrintAdditionalCheckSum( bool print ) { pPrintAdditionalCheckSum = print; }
285 
286  private:
287  struct JobData
288  {
289  JobData(): bytesProcessed(0), bytesTotal(0),
290  started(0), source(0), target(0) {}
291  uint64_t bytesProcessed;
292  uint64_t bytesTotal;
293  time_t started;
294  const XrdCl::URL *source;
295  const XrdCl::URL *target;
296  };
297 
298  time_t pPrevious;
299  bool pPrintProgressBar;
300  bool pPrintSourceCheckSum;
301  bool pPrintTargetCheckSum;
302  bool pPrintAdditionalCheckSum;
303  std::map<uint16_t, JobData> pOngoingJobs;
304  XrdSysRecMutex pMutex;
305 };
306 
307 //------------------------------------------------------------------------------
308 // Check if we support all the specified user options
309 //------------------------------------------------------------------------------
311 {
312  if( config->pHost )
313  {
314  std::cerr << "SOCKS Proxies are not yet supported" << std::endl;
315  return false;
316  }
317 
318  return true;
319 }
320 
321 //------------------------------------------------------------------------------
322 // Append extra cgi info to existing URL
323 //------------------------------------------------------------------------------
void AppendCGI( std::string &url, const char *newCGI )
{
  // Appends newCGI to the query part of url, in place.
  //
  //   url    - URL to extend; gains a '?' if it has none yet
  //   newCGI - "key=value[&key=value...]", optionally with one leading '&';
  //            null or empty input leaves url untouched

  if( !newCGI || !(*newCGI) )
    return;

  // A leading '&' on the new data is redundant, the separator is added below
  if( *newCGI == '&' )
    ++newCGI;

  // Nothing left to append (the input was a lone "&")
  if( !(*newCGI) )
    return;

  if( url.find( '?' ) == std::string::npos )
    url += "?";

  // Always separate the new data from what is already there. Testing the
  // last character (rather than "does the URL contain any '&'") keeps the
  // separator from being dropped when the URL already carries two or more
  // parameters, while producing the same text as before in the other cases.
  if( url.back() != '&' )
    url += "&";

  url += newCGI;
}
340 
341 //------------------------------------------------------------------------------
342 // Process commandline environment settings
343 //------------------------------------------------------------------------------
345 {
347 
348  XrdCpConfig::defVar *cursor = config->intDefs;
349  while( cursor )
350  {
351  env->PutInt( cursor->vName, cursor->intVal );
352  cursor = cursor->Next;
353  }
354 
355  cursor = config->strDefs;
356  while( cursor )
357  {
358  env->PutString( cursor->vName, cursor->strVal );
359  cursor = cursor->Next;
360  }
361 }
362 
363 //------------------------------------------------------------------------------
364 // Translate file type to a string for diagnostics purposes
365 //------------------------------------------------------------------------------
367 {
368  switch( type )
369  {
370  case XrdCpFile::isDir: return "directory";
371  case XrdCpFile::isFile: return "local file";
372  case XrdCpFile::isXroot: return "xroot";
373  case XrdCpFile::isHttp: return "http";
374  case XrdCpFile::isHttps: return "https";
375  case XrdCpFile::isStdIO: return "stdio";
376  default: return "other";
377  };
378 }
379 
380 //------------------------------------------------------------------------------
381 // Count the sources
382 //------------------------------------------------------------------------------
383 uint32_t CountSources( XrdCpFile *file )
384 {
385  uint32_t count;
386  for( count = 0; file; file = file->Next, ++count ) {};
387  return count;
388 }
389 
390 //------------------------------------------------------------------------------
391 // Adjust file information for the cases when XrdCpConfig cannot do this
392 //------------------------------------------------------------------------------
394 {
395  //----------------------------------------------------------------------------
396  // If the file is url and the directory offset is not set we set it
397  // to the last slash
398  //----------------------------------------------------------------------------
399  if( file->Doff == 0 )
400  {
401  char *slash = file->Path;
402  for( ; *slash; ++slash ) {};
403  for( ; *slash != '/' && slash > file->Path; --slash ) {};
404  file->Doff = slash - file->Path;
405  }
406 };
407 
408 //------------------------------------------------------------------------------
409 // Recursively index all files and directories inside a remote directory
410 //------------------------------------------------------------------------------
412  std::string basePath,
413  long dirOffset )
414 {
415  using namespace XrdCl;
416 
417  Log *log = DefaultEnv::GetLog();
418  log->Debug( AppMsg, "Indexing %s", basePath.c_str() );
419 
420  DirectoryList *dirList = 0;
421  XRootDStatus st = fs->DirList( URL( basePath ).GetPath(), DirListFlags::Recursive
423  if( !st.IsOK() )
424  {
425  log->Info( AppMsg, "Failed to get directory listing for %s: %s",
426  basePath.c_str(),
427  st.GetErrorMessage().c_str() );
428  return 0;
429  }
430 
431  XrdCpFile start, *current = 0;
432  XrdCpFile *end = &start;
433  int badUrl = 0;
434  for( auto itr = dirList->Begin(); itr != dirList->End(); ++itr )
435  {
436  DirectoryList::ListEntry *e = *itr;
437  if( e->GetStatInfo()->TestFlags( StatInfo::IsDir ) )
438  continue;
439  std::string path = basePath + '/' + e->GetName();
440  current = new XrdCpFile( path.c_str(), badUrl );
441  if( badUrl )
442  {
443  log->Error( AppMsg, "Bad URL: %s", current->Path );
444  delete current;
445  return 0;
446  }
447 
448  current->Doff = dirOffset;
449  end->Next = current;
450  end = current;
451  }
452 
453  delete dirList;
454 
455  return start.Next;
456 }
457 
458 //------------------------------------------------------------------------------
459 // Clean up the copy job descriptors
460 //------------------------------------------------------------------------------
461 void CleanUpResults( std::vector<XrdCl::PropertyList *> &results )
462 {
463  std::vector<XrdCl::PropertyList *>::iterator it;
464  for( it = results.begin(); it != results.end(); ++it )
465  delete *it;
466 }
467 
468 //--------------------------------------------------------------------------
469 // Let the show begin
470 //------------------------------------------------------------------------------
471 int main( int argc, char **argv )
472 {
473  using namespace XrdCl;
474 
475  //----------------------------------------------------------------------------
476  // Configure the copy command, if it returns then everything went well, ugly
477  //----------------------------------------------------------------------------
478  XrdCpConfig config( argv[0] );
479  config.Config( argc, argv, XrdCpConfig::optRmtRec );
480  if( !AllOptionsSupported( &config ) )
481  return 50; // generic error
482  ProcessCommandLineEnv( &config );
483 
484  //----------------------------------------------------------------------------
485  // Set options
486  //----------------------------------------------------------------------------
487  CopyProcess process;
488  Log *log = DefaultEnv::GetLog();
489  if( config.Dlvl )
490  {
491  if( config.Dlvl == 1 ) log->SetLevel( Log::InfoMsg );
492  else if( config.Dlvl == 2 ) log->SetLevel( Log::DebugMsg );
493  else if( config.Dlvl == 3 ) log->SetLevel( Log::DumpMsg );
494  }
495 
496  ProgressDisplay progress;
497  if( config.Want(XrdCpConfig::DoNoPbar) || !isatty( fileno( stdout ) ) )
498  progress.PrintProgressBar( false );
499 
500  bool posc = false;
501  bool force = false;
502  bool coerce = false;
503  bool makedir = false;
504  bool dynSrc = false;
505  bool delegate = false;
506  bool preserveXAttr = false;
507  bool rmOnBadCksum = false;
508  bool continue_ = false;
509  bool recurse = false;
510  bool zipappend = false;
511  bool doserver = false;
512  std::string thirdParty = "none";
513 
514  if( config.Want( XrdCpConfig::DoPosc ) ) posc = true;
515  if( config.Want( XrdCpConfig::DoForce ) ) force = true;
516  if( config.Want( XrdCpConfig::DoCoerce ) ) coerce = true;
517  if( config.Want( XrdCpConfig::DoTpc ) ) thirdParty = "first";
518  if( config.Want( XrdCpConfig::DoTpcOnly ) ) thirdParty = "only";
519  if( config.Want( XrdCpConfig::DoZipAppend ) ) zipappend = true;
520  if( config.Want( XrdCpConfig::DoServer ) ) doserver = true;
521  if( config.Want( XrdCpConfig::DoTpcDlgt ) )
522  {
523  // the env var is being set already here (we are issuing a stat
524  // inhere and we need the env var when we are establishing the
525  // connection and authenticating), but we are also setting a delegate
526  // parameter for CopyJob so it can be used on its own.
528  delegate = true;
529  }
530  else
532 
533  if( config.Want( XrdCpConfig::DoRecurse ) )
534  {
535  makedir = true;
536  recurse = true;
537  }
538  if( config.Want( XrdCpConfig::DoPath ) ) makedir = true;
539  if( config.Want( XrdCpConfig::DoDynaSrc ) ) dynSrc = true;
540  if( config.Want( XrdCpConfig::DoXAttr ) ) preserveXAttr = true;
541  if( config.Want( XrdCpConfig::DoRmOnBadCksum ) ) rmOnBadCksum = true;
542  if( config.Want( XrdCpConfig::DoContinue ) ) continue_ = true;
543 
544  if( force && continue_ )
545  {
546  std::cerr << "Invalid argument combination: continue + force." << std::endl;
547  return 50;
548  }
549 
550  //----------------------------------------------------------------------------
551  // Checksums
552  //----------------------------------------------------------------------------
553  std::string checkSumType;
554  std::string checkSumPreset;
555  std::string checkSumMode = "none";
556  if( config.Want( XrdCpConfig::DoCksum ) )
557  {
558  checkSumMode = "end2end";
559  std::vector<std::string> ckSumParams;
560  Utils::splitString( ckSumParams, config.CksVal, ":" );
561  if( ckSumParams.size() > 1 )
562  {
563  if( ckSumParams[1] == "print" )
564  {
565  checkSumMode = "target";
566  progress.PrintTargetCheckSum( true );
567  }
568  else
569  checkSumPreset = ckSumParams[1];
570  }
571  checkSumType = ckSumParams[0];
572  }
573 
574  if( config.Want( XrdCpConfig::DoCksrc ) )
575  {
576  checkSumMode = "source";
577  std::vector<std::string> ckSumParams;
578  Utils::splitString( ckSumParams, config.CksVal, ":" );
579  if( ckSumParams.size() == 2 )
580  {
581  checkSumMode = "source";
582  checkSumType = ckSumParams[0];
583  progress.PrintSourceCheckSum( true );
584  }
585  else
586  {
587  std::cerr << "Invalid parameter: " << config.CksVal << std::endl;
588  return 50; // generic error
589  }
590  }
591 
592  if( !config.AddCksVal.empty() )
593  progress.PrintAdditionalCheckSum( true );
594 
595  //----------------------------------------------------------------------------
596  // ZIP archive
597  //----------------------------------------------------------------------------
598  std::string zipFile;
599  bool zip = false;
600  if( config.Want( XrdCpConfig::DoZip ) )
601  {
602  zipFile = config.zipFile;
603  zip = true;
604  }
605 
606  //----------------------------------------------------------------------------
607  // Extreme Copy
608  //----------------------------------------------------------------------------
609  int nbSources = 0;
610  bool xcp = false;
611  if( config.Want( XrdCpConfig::DoSources ) )
612  {
613  nbSources = config.nSrcs;
614  xcp = true;
615  }
616 
617  //----------------------------------------------------------------------------
618  // Environment settings
619  //----------------------------------------------------------------------------
621 
622  /* Stop PostMaster when exiting main() to ensure proper shutdown */
623  struct scope_exit {
624  ~scope_exit() { XrdCl::DefaultEnv::GetPostMaster()->Stop(); }
625  } stopPostMaster;
626 
627  if( config.nStrm != 0 )
628  env->PutInt( "SubStreamsPerChannel", config.nStrm + 1 /*stands for the control stream*/ );
629 
630  if( config.Retry != -1 )
631  {
632  env->PutInt( "CpRetry", config.Retry );
633  env->PutString( "CpRetryPolicy", config.RetryPolicy );
634  }
635 
636  if( config.Want( XrdCpConfig::DoNoTlsOK ) )
637  env->PutInt( "NoTlsOK", 1 );
638 
639  if( config.Want( XrdCpConfig::DoTlsNoData ) )
640  env->PutInt( "TlsNoData", 1 );
641 
642  if( config.Want( XrdCpConfig::DoTlsMLF ) )
643  env->PutInt( "TlsMetalink", 1 );
644 
645  if( config.Want( XrdCpConfig::DoZipMtlnCksum ) )
646  env->PutInt( "ZipMtlnCksum", 1 );
647 
648  int chunkSize = DefaultCPChunkSize;
649  env->GetInt( "CPChunkSize", chunkSize );
650 
651  int blockSize = DefaultXCpBlockSize;
652  env->GetInt( "XCpBlockSize", blockSize );
653 
654  int parallelChunks = DefaultCPParallelChunks;
655  env->GetInt( "CPParallelChunks", parallelChunks );
656  if( parallelChunks < 1 ||
657  parallelChunks > std::numeric_limits<uint8_t>::max() )
658  {
659  std::cerr << "Can only handle between 1 and ";
660  std::cerr << (int)std::numeric_limits<uint8_t>::max();
661  std::cerr << " chunks in parallel. You asked for " << parallelChunks;
662  std::cerr << "." << std::endl;
663  return 50; // generic error
664  }
665 
666  if( !preserveXAttr )
667  {
668  int val = DefaultPreserveXAttrs;
669  env->GetInt( "PreserveXAttrs", val );
670  if( val ) preserveXAttr = true;
671  }
672 
673  log->Dump( AppMsg, "Chunk size: %d, parallel chunks %d, streams: %d",
674  chunkSize, parallelChunks, config.nStrm + 1 );
675 
676  //----------------------------------------------------------------------------
677  // Build the URLs
678  //----------------------------------------------------------------------------
679  std::vector<XrdCl::PropertyList*> resultVect;
680 
681  std::string dest;
682  if( config.dstFile->Protocol == XrdCpFile::isDir ||
683  config.dstFile->Protocol == XrdCpFile::isFile )
684  {
685  dest = "file://";
686 
687  // if it is not an absolute path append cwd
688  if( config.dstFile->Path[0] != '/' )
689  {
690  char buf[FILENAME_MAX];
691  char *cwd = getcwd( buf, FILENAME_MAX );
692  if( !cwd )
693  {
694  XRootDStatus st( stError, XProtocol::mapError( errno ), errno );
695  std::cerr << st.GetErrorMessage() << std::endl;
696  return st.GetShellCode();
697  }
698  dest += cwd;
699  dest += '/';
700  }
701  }
702  dest += config.dstFile->Path;
703 
704  //----------------------------------------------------------------------------
705  // We need to check whether our target is a file or a directory:
706  // 1) it's a file, so we can accept only one source
707  // 2) it's a directory, so:
708  // * we can accept multiple sources
709  // * we need to append the source name
710  //----------------------------------------------------------------------------
711  bool targetIsDir = false;
712  bool targetExists = false;
713  if( config.dstFile->Protocol == XrdCpFile::isDir )
714  targetIsDir = true;
715  else if( config.dstFile->Protocol == XrdCpFile::isXroot ||
717  {
718  URL target( dest );
719  FileSystem fs( target );
720  StatInfo *statInfo = 0;
721  XRootDStatus st = fs.Stat( target.GetPathWithParams(), statInfo );
722  if( st.IsOK() )
723  {
724  if( statInfo->TestFlags( StatInfo::IsDir ) )
725  targetIsDir = true;
726  targetExists = true;
727  }
728  else if( st.errNo == kXR_NotFound && config.Want( XrdCpConfig::DoPath ) )
729  {
730  int n = strlen(config.dstFile->Path);
731  if( config.dstFile->Path[n-1] == '/' )
732  targetIsDir = true;
733  }
734  else if( st.errNo == kXR_NotAuthorized )
735  {
736  log->Error( AppMsg, "%s (destination)", st.ToString().c_str() );
737  std::cerr << st.ToStr() << std::endl;
738  return st.GetShellCode();
739  }
740 
741  delete statInfo;
742  }
743 
744  if( !targetIsDir && targetExists && !force && !recurse && !zipappend )
745  {
746  XRootDStatus st( stError, errInvalidOp, EEXIST );
747  // Unable to create /tmp/test.txt; file exists
748  log->Error( AppMsg, "%s (destination)", st.ToString().c_str() );
749  std::cerr << "Run: " << st.ToStr() << std::endl;
750  return st.GetShellCode();
751  }
752 
753  //----------------------------------------------------------------------------
754  // If we have multiple sources and target is neither a directory nor stdout
755  // then we cannot proceed
756  //----------------------------------------------------------------------------
757  if( CountSources(config.srcFile) > 1 && !targetIsDir &&
758  config.dstFile->Protocol != XrdCpFile::isStdIO )
759  {
760  std::cerr << "Multiple sources were given but target is not a directory.";
761  std::cerr << std::endl;
762  return 50; // generic error
763  }
764 
765  //----------------------------------------------------------------------------
766  // If we're doing remote recursive copy, chain all the files (if it's a
767  // directory)
768  //----------------------------------------------------------------------------
769  bool remoteSrcIsDir = false;
770  if( config.Want( XrdCpConfig::DoRecurse ) &&
771  config.srcFile->Protocol == XrdCpFile::isXroot )
772  {
773  URL source( config.srcFile->Path );
774  FileSystem *fs = new FileSystem( source );
775  StatInfo *statInfo = 0;
776 
777  XRootDStatus st = fs->Stat( source.GetPath(), statInfo );
778  if( st.IsOK() && statInfo->TestFlags( StatInfo::IsDir ) )
779  {
780  remoteSrcIsDir = true;
781  //------------------------------------------------------------------------
782  // Recursively index the remote directory
783  //------------------------------------------------------------------------
784  delete config.srcFile;
785  std::string url = source.GetURL();
786  config.srcFile = IndexRemote( fs, url, url.size() );
787  if ( !config.srcFile )
788  {
789  std::cerr << "Error indexing remote directory.";
790  return 50; // generic error
791  }
792  }
793 
794  delete fs;
795  delete statInfo;
796  }
797 
798  XrdCpFile *sourceFile = config.srcFile;
799  //----------------------------------------------------------------------------
800  // Process the sources
801  //----------------------------------------------------------------------------
802  while( sourceFile )
803  {
804  AdjustFileInfo( sourceFile );
805 
806  //--------------------------------------------------------------------------
807  // Create a job for every source
808  //--------------------------------------------------------------------------
809  PropertyList properties;
810  PropertyList *results = new PropertyList;
811  std::string source = sourceFile->Path;
812  if( sourceFile->Protocol == XrdCpFile::isFile )
813  {
814  // make sure it is an absolute path
815  if( source[0] == '/' )
816  source = "file://" + source;
817  else
818  {
819  char buf[FILENAME_MAX];
820  char *cwd = getcwd( buf, FILENAME_MAX );
821  if( !cwd )
822  {
823  XRootDStatus st( stError, XProtocol::mapError( errno ), errno );
824  std::cerr << st.GetErrorMessage() << std::endl;
825  return st.GetShellCode();
826  }
827  source = "file://" + std::string( cwd ) + '/' + source;
828  }
829  }
830 
831  AppendCGI( source, config.srcOpq );
832 
833  std::string sourcePathObf = sourceFile->Path;
834  std::string destPathObf = dest;
835  if( unlikely(log->GetLevel() >= Log::DumpMsg) ) {
836  sourcePathObf = obfuscateAuth(sourcePathObf);
837  destPathObf = obfuscateAuth(destPathObf);
838  }
839  log->Dump( AppMsg, "Processing source entry: %s, type %s, target file: %s, logLevel = %d",
840  sourcePathObf.c_str(), FileType2String( sourceFile->Protocol ),
841  destPathObf.c_str(), log->GetLevel() );
842 
843  //--------------------------------------------------------------------------
844  // Set up the job
845  //--------------------------------------------------------------------------
846  std::string target = dest;
847 
848 
849  bool srcIsDir = false;
850  // if this is local file, for a directory Dlen + Doff will overlap with path size
851  if( strncmp( sourceFile->ProtName, "file", 4 ) == 0 )
852  srcIsDir = std::string( sourceFile->Path ).size() == size_t( sourceFile->Doff + sourceFile->Dlen );
853  // otherwise we are handling a remote file
854  else
855  srcIsDir = remoteSrcIsDir;
856  // if this is a recursive copy make sure we preserve the directory structure
857  if( config.Want( XrdCpConfig::DoRecurse ) && srcIsDir )
858  {
859  // get the source directory
860  std::string srcDir( sourceFile->Path, sourceFile->Doff );
861  // remove the trailing slash
862  if( srcDir[srcDir.size() - 1] == '/' )
863  srcDir = srcDir.substr( 0, srcDir.size() - 1 );
864  size_t diroff = srcDir.rfind( '/' );
865  // if there is no '/' it means a directory name has been given as relative path
866  if( diroff == std::string::npos ) diroff = 0;
867  target += '/';
868  target += sourceFile->Path + diroff;
869  // remove the filename from destination path as it will be appended later anyway
870  target = target.substr( 0 , target.rfind('/') );
871  }
872  AppendCGI( target, config.dstOpq );
873 
874  properties.Set( "source", source );
875  properties.Set( "target", target );
876  properties.Set( "force", force );
877  properties.Set( "posc", posc );
878  properties.Set( "coerce", coerce );
879  properties.Set( "makeDir", makedir );
880  properties.Set( "dynamicSource", dynSrc );
881  properties.Set( "thirdParty", thirdParty );
882  properties.Set( "checkSumMode", checkSumMode );
883  properties.Set( "checkSumType", checkSumType );
884  properties.Set( "checkSumPreset", checkSumPreset );
885  properties.Set( "chunkSize", chunkSize );
886  properties.Set( "parallelChunks", parallelChunks );
887  properties.Set( "zipArchive", zip );
888  properties.Set( "xcp", xcp );
889  properties.Set( "xcpBlockSize", blockSize );
890  properties.Set( "delegate", delegate );
891  properties.Set( "targetIsDir", targetIsDir );
892  properties.Set( "preserveXAttr", preserveXAttr );
893  properties.Set( "xrate", config.xRate );
894  properties.Set( "xrateThreshold", config.xRateThreshold );
895  properties.Set( "rmOnBadCksum", rmOnBadCksum );
896  properties.Set( "continue", continue_ );
897  properties.Set( "zipAppend", zipappend );
898  properties.Set( "addcksums", config.AddCksVal );
899  properties.Set( "doServer", doserver );
900 
901  if( zip )
902  properties.Set( "zipSource", zipFile );
903 
904  if( xcp )
905  properties.Set( "nbXcpSources", nbSources );
906 
907 
908  XRootDStatus st = process.AddJob( properties, results );
909  if( !st.IsOK() )
910  {
911  std::cerr << "AddJob " << source << " -> " << target << ": ";
912  std::cerr << st.ToStr() << std::endl;
913  }
914  resultVect.push_back( results );
915  sourceFile = sourceFile->Next;
916  }
917 
918  //----------------------------------------------------------------------------
919  // Configure the copy process
920  //----------------------------------------------------------------------------
921  PropertyList processConfig;
922  processConfig.Set( "jobType", "configuration" );
923  processConfig.Set( "parallel", config.Parallel );
924  process.AddJob( processConfig, 0 );
925 
926  //----------------------------------------------------------------------------
927  // Prepare and run the copy process
928  //----------------------------------------------------------------------------
929  XRootDStatus st = process.Prepare();
930  if( !st.IsOK() )
931  {
932  CleanUpResults( resultVect );
933  std::cerr << "Prepare: " << st.ToStr() << std::endl;
934  return st.GetShellCode();
935  }
936 
937  st = process.Run( &progress );
938  if( !st.IsOK() )
939  {
940  if( resultVect.size() == 1 )
941  std::cerr << "Run: " << st.ToStr() << std::endl;
942  else
943  {
944  std::vector<XrdCl::PropertyList*>::iterator it;
945  uint16_t i = 1;
946  uint16_t jobsRun = 0;
947  uint16_t errors = 0;
948  for( it = resultVect.begin(); it != resultVect.end(); ++it, ++i )
949  {
950  if( !(*it)->HasProperty( "status" ) )
951  continue;
952 
953  XRootDStatus st = (*it)->Get<XRootDStatus>("status");
954  if( !st.IsOK() )
955  {
956  std::cerr << "Job #" << i << ": " << st.ToStr();
957  ++errors;
958  }
959  ++jobsRun;
960  }
961  std::cerr << "Jobs total: " << resultVect.size();
962  std::cerr << ", run: " << jobsRun;
963  std::cerr << ", errors: " << errors << std::endl;
964  }
965  CleanUpResults( resultVect );
966  return st.GetShellCode();
967  }
968  CleanUpResults( resultVect );
969  return 0;
970 }
971 
@ kXR_NotAuthorized
Definition: XProtocol.hh:1000
@ kXR_NotFound
Definition: XProtocol.hh:1001
bool AllOptionsSupported(XrdCpConfig *config)
Definition: XrdClCopy.cc:310
const char * FileType2String(XrdCpFile::PType type)
Definition: XrdClCopy.cc:366
int main(int argc, char **argv)
Definition: XrdClCopy.cc:471
XrdCpFile * IndexRemote(XrdCl::FileSystem *fs, std::string basePath, long dirOffset)
Definition: XrdClCopy.cc:411
void ProcessCommandLineEnv(XrdCpConfig *config)
Definition: XrdClCopy.cc:344
void CleanUpResults(std::vector< XrdCl::PropertyList * > &results)
Definition: XrdClCopy.cc:461
void AdjustFileInfo(XrdCpFile *file)
Definition: XrdClCopy.cc:393
uint32_t CountSources(XrdCpFile *file)
Definition: XrdClCopy.cc:383
void AppendCGI(std::string &url, const char *newCGI)
Definition: XrdClCopy.cc:324
#define unlikely(x)
std::string obfuscateAuth(const std::string &input)
void PrintAdditionalCheckSum(bool print)
Definition: XrdClCopy.cc:284
void PrintSourceCheckSum(bool print)
Definition: XrdClCopy.cc:282
virtual void EndJob(uint16_t jobNum, const XrdCl::PropertyList *results)
End job.
Definition: XrdClCopy.cc:88
void PrintProgressBar(bool print)
Definition: XrdClCopy.cc:281
void PrintCheckSum(const XrdCl::URL *url, const std::string &checkSum, uint64_t size)
Print the checksum.
Definition: XrdClCopy.cc:256
virtual void JobProgress(uint16_t jobNum, uint64_t bytesProcessed, uint64_t bytesTotal)
Job progress.
Definition: XrdClCopy.cc:221
std::string GetProgressBar(time_t now)
Get progress bar.
Definition: XrdClCopy.cc:148
virtual void BeginJob(uint16_t jobNum, uint16_t jobTotal, const XrdCl::URL *source, const XrdCl::URL *destination)
Begin job.
Definition: XrdClCopy.cc:61
std::string GetSummaryBar(time_t now)
Get summary bar.
Definition: XrdClCopy.cc:190
ProgressDisplay()
Constructor.
Definition: XrdClCopy.cc:53
void PrintTargetCheckSum(bool print)
Definition: XrdClCopy.cc:283
static int mapError(int rc)
Definition: XProtocol.hh:1361
Copy the data from one point to another.
XRootDStatus Run(CopyProgressHandler *handler)
Run the copy jobs.
XRootDStatus Prepare()
XRootDStatus AddJob(const PropertyList &properties, PropertyList *results)
Interface for copy progress notification.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
const std::string & GetName() const
Get file name.
StatInfo * GetStatInfo()
Get the stat info object.
Iterator End()
Get the end iterator.
Iterator Begin()
Get the begin iterator.
void Enable()
Enable delegation in the environment.
Definition: XrdClDlgEnv.hh:47
static DlgEnv & Instance()
Definition: XrdClDlgEnv.hh:28
void Disable()
Disable delegation in the environment.
Definition: XrdClDlgEnv.hh:55
bool PutInt(const std::string &key, int value)
Definition: XrdClEnv.cc:110
bool PutString(const std::string &key, const std::string &value)
Definition: XrdClEnv.cc:52
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
Send file/filesystem queries to an XRootD cluster.
XRootDStatus DirList(const std::string &path, DirListFlags::Flags flags, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
XRootDStatus Stat(const std::string &path, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Handle diagnostics.
Definition: XrdClLog.hh:101
@ InfoMsg
print info
Definition: XrdClLog.hh:111
@ DebugMsg
print debug info
Definition: XrdClLog.hh:112
@ DumpMsg
print details of the request and responses
Definition: XrdClLog.hh:113
void SetLevel(LogLevel level)
Set the level of the messages that should be sent to the destination.
Definition: XrdClLog.hh:193
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
LogLevel GetLevel() const
Get the log level.
Definition: XrdClLog.hh:258
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
bool Stop()
Stop the postmaster.
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
Object stat info.
bool TestFlags(uint32_t flags) const
Test flags.
@ IsDir
This is a directory.
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
std::string GetPathWithParams() const
Get the path with params.
Definition: XrdClURL.cc:311
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
bool IsLocalFile() const
Definition: XrdClURL.cc:467
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:56
static std::string BytesToString(uint64_t bytes)
Convert bytes to a human readable string.
Definition: XrdClUtils.cc:367
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
short Doff
Definition: XrdCpFile.hh:46
PType Protocol
Definition: XrdCpFile.hh:49
char * Path
Definition: XrdCpFile.hh:45
char ProtName[8]
Definition: XrdCpFile.hh:50
XrdCpFile * Next
Definition: XrdCpFile.hh:44
short Dlen
Definition: XrdCpFile.hh:47
const int DefaultCPChunkSize
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t AppMsg
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const int DefaultCPParallelChunks
const int DefaultXCpBlockSize
const int DefaultPreserveXAttrs
@ Merge
Merge duplicates.
@ Recursive
Do a recursive listing.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
uint32_t errNo
Errno, if any.
Definition: XrdClStatus.hh:148
int GetShellCode() const
Get the status code that may be returned to the shell.
Definition: XrdClStatus.hh:129
const char * vName
Definition: XrdCpConfig.hh:53
defVar * intDefs
Definition: XrdCpConfig.hh:63
void Config(int argc, char **argv, int Opts=0)
Definition: XrdCpConfig.cc:194
std::vector< std::string > AddCksVal
Definition: XrdCpConfig.hh:97
const char * dstOpq
Definition: XrdCpConfig.hh:65
static const uint64_t DoZipMtlnCksum
Definition: XrdCpConfig.hh:189
XrdCpFile * srcFile
Definition: XrdCpConfig.hh:90
XrdCpFile * dstFile
Definition: XrdCpConfig.hh:91
char * zipFile
Definition: XrdCpConfig.hh:93
static const uint64_t DoNoPbar
Definition: XrdCpConfig.hh:122
static const uint64_t DoCoerce
Definition: XrdCpConfig.hh:105
static const uint64_t DoForce
Definition: XrdCpConfig.hh:111
static const uint64_t DoRmOnBadCksum
Definition: XrdCpConfig.hh:192
static const uint64_t DoNoTlsOK
Definition: XrdCpConfig.hh:177
static const uint64_t DoTpc
Definition: XrdCpConfig.hh:150
char * pHost
Definition: XrdCpConfig.hh:71
static const uint64_t DoCksum
Definition: XrdCpConfig.hh:101
defVar * strDefs
Definition: XrdCpConfig.hh:64
static const uint64_t DoCksrc
Definition: XrdCpConfig.hh:100
static const uint64_t DoTpcDlgt
Definition: XrdCpConfig.hh:152
static const uint64_t DoZip
Definition: XrdCpConfig.hh:171
static const uint64_t DoContinue
Definition: XrdCpConfig.hh:195
const char * CksVal
Definition: XrdCpConfig.hh:88
static const uint64_t DoRecurse
Definition: XrdCpConfig.hh:132
const char * srcOpq
Definition: XrdCpConfig.hh:66
static const uint64_t DoZipAppend
Definition: XrdCpConfig.hh:204
static const uint64_t DoDynaSrc
Definition: XrdCpConfig.hh:166
int Want(uint64_t What)
Definition: XrdCpConfig.hh:226
long long xRate
Definition: XrdCpConfig.hh:68
static const uint64_t DoSources
Definition: XrdCpConfig.hh:144
static const uint64_t DoXAttr
Definition: XrdCpConfig.hh:186
static const uint64_t DoTlsMLF
Definition: XrdCpConfig.hh:180
static const int optRmtRec
Definition: XrdCpConfig.hh:218
std::string RetryPolicy
Definition: XrdCpConfig.hh:78
static const uint64_t DoPath
Definition: XrdCpConfig.hh:183
static const uint64_t DoPosc
Definition: XrdCpConfig.hh:125
long long xRateThreshold
Definition: XrdCpConfig.hh:69
static const uint64_t DoTpcOnly
Definition: XrdCpConfig.hh:151
static const uint64_t DoTlsNoData
Definition: XrdCpConfig.hh:174
static const uint64_t DoServer
Definition: XrdCpConfig.hh:138