2433 std::string checkSumMode;
2434 std::string checkSumType;
2435 std::string checkSumPreset;
2436 std::string zipSource;
2437 uint16_t parallelChunks;
2440 bool posc, force, coerce, makeDir, dynamicSource, zip, xcp, preserveXAttr,
2441 rmOnBadCksum, continue_, zipappend, doserver;
2442 int32_t nbXcpSources;
2444 long long xRateThreshold;
2446 std::vector<std::string> addcksums;
2477 if( force && continue_ )
2479 "Invalid argument combination: continue + force." );
2481 if( zipappend && ( continue_ || force ) )
2483 "Invalid argument combination: ( continue | force ) + zip-append." );
2488 std::unique_ptr<timer_sec_t> cptimer;
2489 if( cpTimeout ) cptimer.reset(
new timer_sec_t() );
2494 if( rmOnBadCksum ) posc =
true;
2499 if( checkSumType ==
"auto" )
2502 if( checkSumType.empty() )
2505 log->
Info(
UtilityMsg,
"Using inferred checksum type: %s.", checkSumType.c_str() );
2508 if( cptimer && cptimer->elapsed() > cpTimeout )
2514 std::unique_ptr<Source> src;
2516 src.reset(
new XRootDSourceXCp( &
GetSource(), chunkSize, parallelChunks, nbXcpSources, blockSize ) );
2518 src.reset(
new XRootDSourceZip( zipSource, &
GetSource(), chunkSize, parallelChunks,
2519 checkSumType, addcksums , doserver) );
2520 else if(
GetSource().GetProtocol() ==
"stdio" )
2521 src.reset(
new StdInSource( checkSumType, chunkSize, addcksums ) );
2525 src.reset(
new XRootDSourceDynamic( &
GetSource(), chunkSize, checkSumType, addcksums ) );
2527 src.reset(
new XRootDSource( &
GetSource(), chunkSize, parallelChunks, checkSumType, addcksums, doserver ) );
2531 if( !st.
IsOK() )
return SourceError( st );
2532 uint64_t size = src->GetSize() >= 0 ? src->GetSize() : 0;
2534 if( cptimer && cptimer->elapsed() > cpTimeout )
2537 std::unique_ptr<Destination> dest;
2540 if(
GetTarget().GetProtocol() ==
"stdio" )
2541 dest.reset(
new StdOutDestination( checkSumType ) );
2542 else if( zipappend )
2545 size_t pos = fn.rfind(
'/' );
2546 if( pos != std::string::npos )
2547 fn = fn.substr( pos + 1 );
2548 int64_t size = src->GetSize();
2549 dest.reset(
new XRootDZipDestination( newDestUrl, fn, size, parallelChunks, *
this ) );
2556 if( src->GetSize() >= 0 )
2559 std::ostringstream o; o << src->GetSize();
2560 params[
"oss.asize"] = o.str();
2561 newDestUrl.SetParams( params );
2564 dest.reset(
new XRootDDestination( newDestUrl, parallelChunks, checkSumType, *
this ) );
2567 dest->SetForce( force );
2568 dest->SetPOSC( posc );
2569 dest->SetCoerce( coerce );
2570 dest->SetMakeDir( makeDir );
2571 dest->SetContinue( continue_ );
2572 st = dest->Initialize();
2573 if( !st.
IsOK() )
return DestinationError( st );
2575 if( cptimer && cptimer->elapsed() > cpTimeout )
2583 size -= dest->GetSize();
2585 if( !st.
IsOK() )
return SetResult( st );
2589 uint64_t total_processed = 0;
2590 uint64_t processed = 0;
2592 uint16_t threshold_interval = parallelChunks;
2593 bool threshold_draining =
false;
2594 timer_nsec_t threshold_timer;
2597 st = src->GetChunk( pageInfo );
2599 return SourceError( st);
2604 if( cptimer && cptimer->elapsed() > cpTimeout )
2609 auto elapsed = (
time_nsec() - start ).count();
2610 double transferred = total_processed + pageInfo.
GetLength();
2611 double expected = double( xRate ) /
to_nsec( 1 ) * elapsed;
2617 transferred > expected )
2619 auto nsec = ( transferred / xRate *
to_nsec( 1 ) ) - elapsed;
2624 if( xRateThreshold )
2626 auto elapsed = threshold_timer.elapsed();
2627 double transferred = processed + pageInfo.
GetLength();
2628 double expected = double( xRateThreshold ) /
to_nsec( 1 ) * elapsed;
2634 transferred < expected &&
2635 threshold_interval == 0 )
2637 if( !threshold_draining )
2640 " trying different source!" );
2643 "The transfer rate dropped below "
2644 "requested threshold!" );
2645 threshold_draining =
true;
2651 threshold_timer.reset();
2652 threshold_interval = parallelChunks;
2653 threshold_draining =
false;
2657 threshold_interval = threshold_interval > 0 ? threshold_interval - 1 : parallelChunks;
2660 total_processed += pageInfo.
GetLength();
2663 st = dest->PutChunk( std::move( pageInfo ) );
2669 pResults->
Set(
"WrtRecoveryRedir", dest->GetWrtRecoveryRedir() );
2670 return SetResult( st );
2672 return DestinationError( st );
2685 return DestinationError( st );
2692 std::vector<xattr_t> xattrs;
2693 st = src->GetXAttr( xattrs );
2694 if( !st.
IsOK() )
return SourceError( st );
2695 st = dest->SetXAttr( xattrs );
2696 if( !st.
IsOK() )
return DestinationError( st );
2703 if( src->GetSize() >= 0 && size != total_processed )
2705 log->
Error(
UtilityMsg,
"The declared source size is %llu bytes, but "
2706 "received %llu bytes.", (
unsigned long long) size, (
unsigned long long) total_processed );
2714 st = dest->Finalize();
2716 return DestinationError( st );
2721 if( checkSumMode !=
"none" )
2724 checkSumMode.c_str() );
2725 std::string sourceCheckSum;
2726 std::string targetCheckSum;
2728 if( cptimer && cptimer->elapsed() > cpTimeout )
2734 timeval oStart, oEnd;
2737 if( checkSumMode ==
"end2end" || checkSumMode ==
"source" ||
2738 !checkSumPreset.empty() )
2740 gettimeofday( &oStart, 0 );
2741 if( !checkSumPreset.empty() )
2743 sourceCheckSum = checkSumType +
":";
2749 st = src->GetCheckSum( sourceCheckSum, checkSumType );
2751 gettimeofday( &oEnd, 0 );
2754 return SourceError( st );
2756 pResults->
Set(
"sourceCheckSum", sourceCheckSum );
2759 if( !addcksums.empty() )
2760 pResults->
Set(
"additionalCkeckSum", src->GetAddCks() );
2762 if( cptimer && cptimer->elapsed() > cpTimeout )
2768 timeval tStart, tEnd;
2770 if( checkSumMode ==
"end2end" || checkSumMode ==
"target" )
2772 gettimeofday( &tStart, 0 );
2773 st = dest->GetCheckSum( targetCheckSum, checkSumType );
2775 return DestinationError( st );
2776 gettimeofday( &tEnd, 0 );
2777 pResults->
Set(
"targetCheckSum", targetCheckSum );
2780 if( cptimer && cptimer->elapsed() > cpTimeout )
2786 auto sanitize_cksum = [](
char c )
2789 if( std::isalpha( c ) )
return std::tolower( c, loc );
2793 std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
2794 sourceCheckSum.begin(), sanitize_cksum );
2796 std::transform( targetCheckSum.begin(), targetCheckSum.end(),
2797 targetCheckSum.begin(), sanitize_cksum );
2802 if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
2805 if( sourceCheckSum == targetCheckSum )
2814 i.
cksum = sourceCheckSum;
2826 st = fs.Rm( newDestUrl.GetPath() );
2830 log->
Info(
UtilityMsg,
"Target file removed due to bad checksum!" );
2833 st = dest->Finalize();
std::chrono::nanoseconds time_nsec()
long long to_nsec(long long sec)
void sleep_nsec(long long nsec)
PropertyList * pProperties
virtual void JobProgress(uint16_t jobNum, uint64_t bytesProcessed, uint64_t bytesTotal)
virtual bool ShouldCancel(uint16_t jobNum)
Determine whether the job should be canceled.
static Monitor * GetMonitor()
Get the monitor object.
Send file/filesystem queries to an XRootD cluster.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Info(uint64_t topic, const char *format,...)
Print an informational message.
An abstract class to describe the client-side monitoring plugin interface.
@ EvCheckSum
CheckSumInfo: File checksummed.
virtual void Event(EventCode evCode, void *evData)=0
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
std::map< std::string, std::string > ParamsMap
const std::string & GetPath() const
Get the path.
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
static bool HasXAttr(const XrdCl::URL &url)
const uint16_t errOperationExpired
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
Data is corrupted.
const uint16_t errInvalidArgs
const uint16_t errRetry
Try again for whatever reason.
const uint16_t errCheckSumError
const uint16_t errThresholdExceeded
const uint16_t errOperationInterrupted
Describe a checksum event.
TransferInfo transfer
The transfer in question.
uint64_t tTime
Microseconds to obtain cksum from target.
bool isOK
True if checksum matched, false otherwise.
std::string cksum
Checksum as "type:value".
uint64_t oTime
Microseconds to obtain cksum from origin.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
uint32_t GetLength() const
Get the data length.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.