XRootD
XrdCl::CopyProcess Class Reference

Copy the data from one point to another. More...

#include <XrdClCopyProcess.hh>

+ Collaboration diagram for XrdCl::CopyProcess:

Public Member Functions

 CopyProcess ()
 Constructor. More...
 
virtual ~CopyProcess ()
 Destructor. More...
 
XRootDStatus AddJob (const PropertyList &properties, PropertyList *results)
 
XRootDStatus Prepare ()
 
XRootDStatus Run (CopyProgressHandler *handler)
 Run the copy jobs. More...
 

Detailed Description

Copy the data from one point to another.

Definition at line 107 of file XrdClCopyProcess.hh.

Constructor & Destructor Documentation

◆ CopyProcess()

XrdCl::CopyProcess::CopyProcess ( )

Constructor.

Definition at line 212 of file XrdClCopyProcess.cc.

212  : pImpl( new CopyProcessImpl() )
213  {
214  }

◆ ~CopyProcess()

XrdCl::CopyProcess::~CopyProcess ( )
virtual

Destructor.

Definition at line 219 of file XrdClCopyProcess.cc.

220  {
221  CleanUpJobs();
222  delete pImpl;
223  }

Member Function Documentation

◆ AddJob()

XRootDStatus XrdCl::CopyProcess::AddJob ( const PropertyList properties,
PropertyList results 
)

Add job

Parameters
propertiesjob configuration parameters
resultsplaceholder for the results

Configuration properties: source [string] - original source URL target [string] - target directory or file sourceLimit [uint16_t] - maximum number sources force [bool] - overwrite target if exists posc [bool] - persistify only on successful close coerce [bool] - ignore locking semantics on destination makeDir [bool] - create path to the file if it doesn't exist thirdParty [string] - "first" try third party copy, if it fails try normal copy; "only" only try third party copy checkSumMode [string] - "none" - no checksumming "end2end" - end to end checksumming "source" - calculate checksum at source "target" - calculate checksum at target checkSumType [string] - type of the checksum to be used checkSumPreset [string] - checksum preset chunkSize [uint32_t] - size of a copy chunks in bytes parallelChunks [uint8_t] - number of chunks that should be requested in parallel initTimeout [uint16_t] - time limit for successfull initialization of the copy job tpcTimeout [uint16_t] - time limit for the actual copy to finish dynamicSource [bool] - support for the case where the size source file may change during reading process

Configuration job - this is a job that that is supposed to configure the copy process as a whole instead of adding a copy job:

jobType [string] - "configuration" - for configuraion parallel [uint8_t] - nomber of copy jobs to be run in parallel

Results: sourceCheckSum [string] - checksum at source, if requested targetCheckSum [string] - checksum at target, if requested size [uint64_t] - file size status [XRootDStatus] - status of the copy operation sources [vector<string>] - all sources used realTarget [string] - the actual disk server target

Definition at line 228 of file XrdClCopyProcess.cc.

230  {
231  Env *env = DefaultEnv::GetEnv();
232 
233  //--------------------------------------------------------------------------
234  // Process a configuraion job
235  //--------------------------------------------------------------------------
236  if( properties.HasProperty( "jobType" ) &&
237  properties.Get<std::string>( "jobType" ) == "configuration" )
238  {
239  if( pImpl->pJobProperties.size() > 0 &&
240  pImpl->pJobProperties.rbegin()->HasProperty( "jobType" ) &&
241  pImpl->pJobProperties.rbegin()->Get<std::string>( "jobType" ) == "configuration" )
242  {
243  PropertyList &config = *pImpl->pJobProperties.rbegin();
244  PropertyList::PropertyMap::const_iterator it;
245  for( it = properties.begin(); it != properties.end(); ++it )
246  config.Set( it->first, it->second );
247  }
248  else
249  pImpl->pJobProperties.push_back( properties );
250  return XRootDStatus();
251  }
252 
253  //--------------------------------------------------------------------------
254  // Validate properties
255  //--------------------------------------------------------------------------
256  if( !properties.HasProperty( "source" ) )
257  return XRootDStatus( stError, errInvalidArgs, 0, "source not specified" );
258 
259  if( !properties.HasProperty( "target" ) )
260  return XRootDStatus( stError, errInvalidArgs, 0, "target not specified" );
261 
262  pImpl->pJobProperties.push_back( properties );
263  PropertyList &p = pImpl->pJobProperties.back();
264 
265  const char *bools[] = {"target", "force", "posc", "coerce", "makeDir",
266  "zipArchive", "xcp", "preserveXAttr", "rmOnBadCksum",
267  "continue", "zipAppend", "doServer", 0};
268  for( int i = 0; bools[i]; ++i )
269  if( !p.HasProperty( bools[i] ) )
270  p.Set( bools[i], false );
271 
272  if( !p.HasProperty( "thirdParty" ) )
273  p.Set( "thirdParty", "none" );
274 
275  if( !p.HasProperty( "checkSumMode" ) )
276  p.Set( "checkSumMode", "none" );
277  else
278  {
279  if( !p.HasProperty( "checkSumType" ) )
280  {
281  pImpl->pJobProperties.pop_back();
282  return XRootDStatus( stError, errInvalidArgs, 0,
283  "checkSumType not specified" );
284  }
285  else
286  {
287  //----------------------------------------------------------------------
288  // Checksum type has to be case insensitive
289  //----------------------------------------------------------------------
290  std::string checkSumType;
291  p.Get( "checkSumType", checkSumType );
292  std::transform(checkSumType.begin(), checkSumType.end(),
293  checkSumType.begin(), ::tolower);
294  p.Set( "checkSumType", checkSumType );
295  }
296  }
297 
298  if( !p.HasProperty( "parallelChunks" ) )
299  {
300  int val = DefaultCPParallelChunks;
301  env->GetInt( "CPParallelChunks", val );
302  p.Set( "parallelChunks", val );
303  }
304 
305  if( !p.HasProperty( "chunkSize" ) )
306  {
307  int val = DefaultCPChunkSize;
308  env->GetInt( "CPChunkSize", val );
309  p.Set( "chunkSize", val );
310  }
311 
312  if( !p.HasProperty( "xcpBlockSize" ) )
313  {
314  int val = DefaultXCpBlockSize;
315  env->GetInt( "XCpBlockSize", val );
316  p.Set( "xcpBlockSize", val );
317  }
318 
319  if( !p.HasProperty( "initTimeout" ) )
320  {
321  int val = DefaultCPInitTimeout;
322  env->GetInt( "CPInitTimeout", val );
323  p.Set( "initTimeout", val );
324  }
325 
326  if( !p.HasProperty( "tpcTimeout" ) )
327  {
328  int val = DefaultCPTPCTimeout;
329  env->GetInt( "CPTPCTimeout", val );
330  p.Set( "tpcTimeout", val );
331  }
332 
333  if( !p.HasProperty( "cpTimeout" ) )
334  {
335  int val = DefaultCPTimeout;
336  env->GetInt( "CPTimeout", val );
337  p.Set( "cpTimeout", val );
338  }
339 
340  if( !p.HasProperty( "dynamicSource" ) )
341  p.Set( "dynamicSource", false );
342 
343  if( !p.HasProperty( "xrate" ) )
344  p.Set( "xrate", 0 );
345 
346  if( !p.HasProperty( "xrateThreshold" ) || p.Get<long long>( "xrateThreshold" ) == 0 )
347  {
348  int val = DefaultXRateThreshold;
349  env->GetInt( "XRateThreshold", val );
350  p.Set( "xrateThreshold", val );
351  }
352 
353  //--------------------------------------------------------------------------
354  // Insert the properties
355  //--------------------------------------------------------------------------
356  Log *log = DefaultEnv::GetLog();
357  Utils::LogPropertyList( log, UtilityMsg, "Adding job with properties: %s",
358  p );
359  pImpl->pJobResults.push_back( results );
360  return XRootDStatus();
361  }
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
static void LogPropertyList(Log *log, uint64_t topic, const char *format, const PropertyList &list)
Log property list.
Definition: XrdClUtils.cc:617
const int DefaultCPInitTimeout
const int DefaultXRateThreshold
const int DefaultCPChunkSize
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
std::vector< PropertyList * > pJobResults
const int DefaultCPParallelChunks
const int DefaultXCpBlockSize
const uint64_t UtilityMsg
const int DefaultCPTimeout
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
std::vector< PropertyList > pJobProperties
const int DefaultCPTPCTimeout
XrdSysError Log
Definition: XrdConfig.cc:112

References XrdCl::PropertyList::begin(), XrdCl::DefaultCPChunkSize, XrdCl::DefaultCPInitTimeout, XrdCl::DefaultCPParallelChunks, XrdCl::DefaultCPTimeout, XrdCl::DefaultCPTPCTimeout, XrdCl::DefaultXCpBlockSize, XrdCl::DefaultXRateThreshold, XrdCl::PropertyList::end(), XrdCl::errInvalidArgs, XrdCl::PropertyList::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::PropertyList::HasProperty(), XrdCl::Utils::LogPropertyList(), XrdCl::CopyProcessImpl::pJobProperties, XrdCl::CopyProcessImpl::pJobResults, XrdCl::PropertyList::Set(), XrdCl::stError, and XrdCl::UtilityMsg.

Referenced by DoCat(), and main().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Prepare()

XRootDStatus XrdCl::CopyProcess::Prepare ( )

Definition at line 366 of file XrdClCopyProcess.cc.

367  {
368  Log *log = DefaultEnv::GetLog();
369  std::vector<PropertyList>::iterator it;
370 
371  log->Debug( UtilityMsg, "CopyProcess: %zu jobs to prepare",
372  pImpl->pJobProperties.size() );
373 
374  std::map<std::string, uint32_t> targetFlags;
375  int i = 0;
376  for( it = pImpl->pJobProperties.begin(); it != pImpl->pJobProperties.end(); ++it, ++i )
377  {
378  PropertyList &props = *it;
379 
380  if( props.HasProperty( "jobType" ) &&
381  props.Get<std::string>( "jobType" ) == "configuration" )
382  continue;
383 
384  PropertyList *res = pImpl->pJobResults[i];
385  std::string tmp;
386 
387  props.Get( "source", tmp );
388  URL source = tmp;
389  if( !source.IsValid() )
390  return XRootDStatus( stError, errInvalidArgs, 0, "invalid source" );
391 
392  //--------------------------------------------------------------------------
393  // Create a virtual redirector if it is a Metalink file
394  //--------------------------------------------------------------------------
395  if( source.IsMetalink() )
396  {
397  RedirectorRegistry &registry = RedirectorRegistry::Instance();
398  XRootDStatus st = registry.RegisterAndWait( source );
399  if( !st.IsOK() ) return st;
400  }
401 
402  // handle UNZIP CGI
403  const URL::ParamsMap &cgi = source.GetParams();
404  URL::ParamsMap::const_iterator itr = cgi.find( "xrdcl.unzip" );
405  if( itr != cgi.end() )
406  {
407  props.Set( "zipArchive", true );
408  props.Set( "zipSource", itr->second );
409  }
410 
411  props.Get( "target", tmp );
412  URL target = tmp;
413  if( !target.IsValid() )
414  return XRootDStatus( stError, errInvalidArgs, 0, "invalid target" );
415 
416  if( target.GetProtocol() != "stdio" )
417  {
418  // handle directories
419  bool targetIsDir = false;
420  props.Get( "targetIsDir", targetIsDir );
421 
422  if( targetIsDir )
423  {
424  std::string path = target.GetPath() + '/';
425  std::string fn;
426 
427  bool isZip = false;
428  props.Get( "zipArchive", isZip );
429  if( isZip )
430  {
431  props.Get( "zipSource", fn );
432  }
433  else if( source.IsMetalink() )
434  {
435  RedirectorRegistry &registry = XrdCl::RedirectorRegistry::Instance();
436  VirtualRedirector *redirector = registry.Get( source );
437  fn = redirector->GetTargetName();
438  }
439  else
440  {
441  fn = source.GetPath();
442  }
443 
444  size_t pos = fn.rfind( '/' );
445  if( pos != std::string::npos )
446  fn = fn.substr( pos + 1 );
447  path += fn;
448  target.SetPath( path );
449  props.Set( "target", target.GetURL() );
450  }
451  }
452 
453  bool tpc = false;
454  props.Get( "thirdParty", tmp );
455  if( tmp != "none" )
456  tpc = true;
457 
458  //------------------------------------------------------------------------
459  // Check if we have all we need
460  //------------------------------------------------------------------------
461  if( source.GetProtocol() != "stdio" && source.GetPath().empty() )
462  {
463  log->Debug( UtilityMsg, "CopyProcess (job #%d): no source specified.",
464  i );
465  CleanUpJobs();
466  XRootDStatus st = XRootDStatus( stError, errInvalidArgs );
467  res->Set( "status", st );
468  return st;
469  }
470 
471  if( target.GetProtocol() != "stdio" && target.GetPath().empty() )
472  {
473  log->Debug( UtilityMsg, "CopyProcess (job #%d): no target specified.",
474  i );
475  CleanUpJobs();
476  XRootDStatus st = XRootDStatus( stError, errInvalidArgs );
477  res->Set( "status", st );
478  return st;
479  }
480 
481  //------------------------------------------------------------------------
482  // Check what kind of job we should do
483  //------------------------------------------------------------------------
484  CopyJob *job = 0;
485 
486  if( tpc == true )
487  {
488  MarkTPC( props );
489  job = new TPFallBackCopyJob( i+1, &props, res );
490  }
491  else
492  job = new ClassicCopyJob( i+1, &props, res );
493 
494  pImpl->pJobs.push_back( job );
495  }
496  return XRootDStatus();
497  }
static RedirectorRegistry & Instance()
Returns reference to the single instance.
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
std::vector< CopyJob * > pJobs

References XrdCl::Log::Debug(), XrdCl::errInvalidArgs, XrdCl::PropertyList::Get(), XrdCl::RedirectorRegistry::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::URL::GetParams(), XrdCl::URL::GetPath(), XrdCl::URL::GetProtocol(), XrdCl::VirtualRedirector::GetTargetName(), XrdCl::URL::GetURL(), XrdCl::PropertyList::HasProperty(), XrdCl::RedirectorRegistry::Instance(), XrdCl::URL::IsMetalink(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), XrdCl::CopyProcessImpl::pJobProperties, XrdCl::CopyProcessImpl::pJobResults, XrdCl::CopyProcessImpl::pJobs, XrdCl::RedirectorRegistry::RegisterAndWait(), XrdCl::PropertyList::Set(), XrdCl::URL::SetPath(), XrdCl::stError, and XrdCl::UtilityMsg.

Referenced by DoCat(), and main().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Run()

XRootDStatus XrdCl::CopyProcess::Run ( CopyProgressHandler handler)

Run the copy jobs.

Definition at line 502 of file XrdClCopyProcess.cc.

503  {
504  //--------------------------------------------------------------------------
505  // Get the configuration
506  //--------------------------------------------------------------------------
507  uint8_t parallelThreads = 1;
508  if( pImpl->pJobProperties.size() > 0 &&
509  pImpl->pJobProperties.rbegin()->HasProperty( "jobType" ) &&
510  pImpl->pJobProperties.rbegin()->Get<std::string>( "jobType" ) == "configuration" )
511  {
512  PropertyList &config = *pImpl->pJobProperties.rbegin();
513  if( config.HasProperty( "parallel" ) )
514  parallelThreads = (uint8_t)config.Get<int>( "parallel" );
515  }
516 
517  //--------------------------------------------------------------------------
518  // Run the show
519  //--------------------------------------------------------------------------
520  std::vector<CopyJob *>::iterator it;
521  uint16_t currentJob = 1;
522  uint16_t totalJobs = pImpl->pJobs.size();
523 
524  //--------------------------------------------------------------------------
525  // Single thread
526  //--------------------------------------------------------------------------
527  if( parallelThreads == 1 )
528  {
529  XRootDStatus err;
530 
531  for( it = pImpl->pJobs.begin(); it != pImpl->pJobs.end(); ++it )
532  {
533  QueuedCopyJob j( *it, progress, currentJob, totalJobs );
534  j.Run(0);
535 
536  XRootDStatus st = (*it)->GetResults()->Get<XRootDStatus>( "status" );
537  if( err.IsOK() && !st.IsOK() )
538  {
539  err = st;
540  }
541  ++currentJob;
542  }
543 
544  if( !err.IsOK() ) return err;
545  }
546  //--------------------------------------------------------------------------
547  // Multiple threads
548  //--------------------------------------------------------------------------
549  else
550  {
551  uint16_t workers = std::min( (uint16_t)parallelThreads,
552  (uint16_t)pImpl->pJobs.size() );
553  JobManager jm( workers );
554  jm.Initialize();
555  if( !jm.Start() )
556  return XRootDStatus( stError, errOSError, 0,
557  "Unable to start job manager" );
558 
559  XrdSysSemaphore *sem = new XrdSysSemaphore(0);
560  std::vector<QueuedCopyJob*> queued;
561  for( it = pImpl->pJobs.begin(); it != pImpl->pJobs.end(); ++it )
562  {
563  QueuedCopyJob *j = new QueuedCopyJob( *it, progress, currentJob,
564  totalJobs, sem );
565 
566  queued.push_back( j );
567  jm.QueueJob(j, 0);
568  ++currentJob;
569  }
570 
571  std::vector<QueuedCopyJob*>::iterator itQ;
572  for( itQ = queued.begin(); itQ != queued.end(); ++itQ )
573  sem->Wait();
574  delete sem;
575 
576  if( !jm.Stop() )
577  return XRootDStatus( stError, errOSError, 0,
578  "Unable to stop job manager" );
579  jm.Finalize();
580  for( itQ = queued.begin(); itQ != queued.end(); ++itQ )
581  delete *itQ;
582 
583  for( it = pImpl->pJobs.begin(); it != pImpl->pJobs.end(); ++it )
584  {
585  XRootDStatus st = (*it)->GetResults()->Get<XRootDStatus>( "status" );
586  if( !st.IsOK() ) return st;
587  }
588  };
589  return XRootDStatus();
590  }
const uint16_t errOSError
Definition: XrdClStatus.hh:61

References XrdCl::errOSError, XrdCl::JobManager::Finalize(), XrdCl::PropertyList::Get(), XrdCl::PropertyList::HasProperty(), XrdCl::JobManager::Initialize(), XrdCl::Status::IsOK(), XrdCl::CopyProcessImpl::pJobProperties, XrdCl::CopyProcessImpl::pJobs, XrdCl::JobManager::QueueJob(), XrdCl::JobManager::Start(), XrdCl::stError, XrdCl::JobManager::Stop(), and XrdSysSemaphore::Wait().

Referenced by DoCat(), and main().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: