46 #include <sys/types.h>
68 virtual ~TPCStatusHandler()
102 TPCStatusHandler(
const TPCStatusHandler &other);
103 TPCStatusHandler &operator = (
const TPCStatusHandler &other);
109 class InitTimeoutCalc
113 InitTimeoutCalc( uint16_t timeLeft ) :
114 hasInitTimeout( timeLeft ), start( time( 0 ) ), timeLeft( timeLeft )
123 time_t now = time( 0 );
124 if( now - start > timeLeft )
127 timeLeft -= ( now - start );
145 msg +=
" (" + str +
")";
159 CopyJob( jobId, jobProperties, jobResults ),
160 dstFile(
File::DisableVirtRedirect ),
170 log->
Debug(
UtilityMsg,
"Creating a third party copy job, from %s to %s",
182 if( !st.
IsOK() )
return st;
190 if( !st.
IsOK() )
return st;
198 if( !st.
IsOK() )
return st;
204 if( checkSumMode !=
"none" )
207 std::string sourceCheckSum;
208 std::string targetCheckSum;
213 timeval oStart, oEnd;
215 if( checkSumMode ==
"end2end" || checkSumMode ==
"source" ||
216 !checkSumPreset.empty() )
218 gettimeofday( &oStart, 0 );
219 if( !checkSumPreset.empty() )
221 sourceCheckSum = checkSumType +
":";
228 std::string vrCheckSum;
231 !( vrCheckSum = redirector->
GetCheckSum( checkSumType ) ).empty() )
232 sourceCheckSum = vrCheckSum;
236 gettimeofday( &oEnd, 0 );
238 return UpdateErrMsg( st,
"source" );
246 timeval tStart, tEnd;
248 if( checkSumMode ==
"end2end" || checkSumMode ==
"target" )
250 gettimeofday( &tStart, 0 );
253 gettimeofday( &tEnd, 0 );
255 return UpdateErrMsg( st,
"destination" );
262 auto sanitize_cksum = [](
char c )
265 if( std::isalpha( c ) )
return std::tolower( c, loc );
269 std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
270 sourceCheckSum.begin(), sanitize_cksum );
272 std::transform( targetCheckSum.begin(), targetCheckSum.end(),
273 targetCheckSum.begin(), sanitize_cksum );
278 if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
281 if( sourceCheckSum == targetCheckSum )
290 i.
cksum = sourceCheckSum;
321 "Cannot do a third-party-copy for local file." );
336 "is only supported for root/xroot protocol." );
339 InitTimeoutCalc timeLeft( initTimeout );
349 env->
GetInt(
"SubStreamsPerChannel", nbStrm );
352 if (nbStrm > 0) --nbStrm;
354 bool tpcLiteOnly =
false;
362 if( checkSumType ==
"auto" )
365 if( checkSumType.empty() )
368 log->
Info(
UtilityMsg,
"Using inferred checksum type: %s.", checkSumType.c_str() );
377 URL sourceURL = source;
383 sourceFile.SetProperty(
"ReadRecovery", value );
392 if( sourceURL.GetProtocol() ==
"root" || sourceURL.GetProtocol() ==
"xroot" ||
393 sourceURL.GetProtocol() ==
"roots" || sourceURL.GetProtocol() ==
"xroots" )
395 params = sourceURL.GetParams();
396 params[
"tpc.stage"] =
"placement";
397 sourceURL.SetParams( params );
399 sourceURL.GetObfuscatedURL().c_str() );
408 std::string sourceUrl;
409 sourceFile.GetProperty(
"LastURL", sourceUrl );
410 tpcSource = sourceUrl;
412 VirtualRedirector *redirector = 0;
416 ( size = redirector->GetSize() ) >= 0 )
421 st = sourceFile.Stat(
false, statInfo );
422 if (st.IsOK()) sourceSize = statInfo->GetSize();
440 tpcSource = sourceURL;
445 URL tpcSourceUrl = tpcSource;
449 URL::ParamsMap::const_iterator itr = srcparams.begin();
450 for( ; itr != srcparams.end(); ++itr )
451 tpcsrcparams[itr->first] = itr->second;
452 tpcSourceUrl.SetParams( tpcsrcparams );
456 itr = scgiparams.begin();
457 for( ; itr != scgiparams.end(); ++itr )
458 if( itr->first.compare( 0, 6,
"xrdcl." ) != 0 )
460 if( !scgi.empty() ) scgi +=
'\t';
461 scgi += itr->first +
'=' + itr->second;
464 if( !timeLeft().IsOK() )
467 st = sourceFile.Close( 1 );
471 st = sourceFile.Close( timeLeft );
473 if( !timeLeft().IsOK() )
489 tpcKey = GenerateKey();
491 char *cgiBuff =
new char[2048];
495 0, cgiBuff, 2048, nbStrm,
508 URL cgiURL; cgiURL.SetParams( cgiBuff );
517 std::ostringstream o; o << sourceSize;
518 params[
"oss.asize"] = o.str();
520 params[
"tpc.stage"] =
"copy";
523 if( !scgi.empty() && delegate )
524 params[
"tpc.scgi"] = scgi;
547 st = dstFile.
Open( realTarget.
GetURL(), targetFlags, mode, timeLeft );
554 st.GetErrorMessage().find(
"tpc not supported" ) != std::string::npos )
556 "Destination does not support third-party-copy." );
557 return UpdateErrMsg( st,
"destination" );
560 std::string lastUrl; dstFile.
GetProperty(
"LastURL", lastUrl );
561 realTarget = lastUrl;
563 if( !timeLeft().IsOK() )
566 st = dstFile.
Close( 1 );
577 st = dstFile.
Close( 1 );
579 "Destination does not support third-party-copy.");
586 tpcLite = ( st.code !=
suPartial ) && delegate;
588 if( !tpcLite && tpcLiteOnly )
590 st = dstFile.
Close( 1 );
592 "support delegation." );
598 if( !timeLeft().IsOK() )
601 st = dstFile.
Close( 1 );
614 tpcSource.
GetURL().c_str() );
616 "support third-party-copy" );
619 if( !timeLeft().IsOK() )
622 st = sourceFile.Close( 1 );
627 initTimeout = uint16_t( timeLeft );
629 return XRootDStatus();
635 XRootDStatus ThirdPartyCopyJob::RunTPC( CopyProgressHandler *progress )
642 char *cgiBuff =
new char[2048];
648 log->Error(
UtilityMsg,
"Unable to setup source url: %s", cgiP+1 );
653 URL cgiURL; cgiURL.SetParams( cgiBuff );
657 params[
"tpc.stage"] =
"copy";
663 int closeTimeout = 0;
669 InitTimeoutCalc timeLeft( initTimeout );
670 XRootDStatus st = dstFile.
Sync( timeLeft );
673 log->Error(
UtilityMsg,
"Unable set up rendez-vous: %s",
674 st.ToStr().c_str() );
675 XRootDStatus status = dstFile.
Close( closeTimeout );
676 return UpdateErrMsg( st,
"destination" );
682 if( !timeLeft().IsOK() )
684 XRootDStatus status = dstFile.
Close( closeTimeout );
692 sourceFile.SetProperty(
"ReadRecovery", value );
699 log->Error(
UtilityMsg,
"Unable to open source %s: %s",
701 XRootDStatus status = dstFile.
Close( closeTimeout );
702 return UpdateErrMsg( st,
"source" );
708 TPCStatusHandler statusHandler;
712 uint16_t tpcTimeout = 0;
715 st = dstFile.
Sync( &statusHandler, tpcTimeout );
718 log->Error(
UtilityMsg,
"Unable start the copy: %s",
719 st.ToStr().c_str() );
720 XRootDStatus statusS = sourceFile.Close( closeTimeout );
721 XRootDStatus statusT = dstFile.
Close( closeTimeout );
722 return UpdateErrMsg( st,
"destination" );
728 bool canceled =
false;
735 st = dstFile.
Stat(
true, info );
738 progress->JobProgress(
pJobId, info->GetSize(), sourceSize );
742 bool shouldCancel = progress->ShouldCancel(
pJobId );
745 log->Debug(
UtilityMsg,
"Cancellation requested by progress handler" );
746 Buffer arg, *response = 0; arg.FromString(
"ofs.tpc cancel" );
747 XRootDStatus st = dstFile.
Fcntl( arg, response );
749 log->Debug(
UtilityMsg,
"Error while trying to cancel tpc: %s",
750 st.ToStr().c_str() );
768 st = *statusHandler.GetStatus();
772 log->Error(
UtilityMsg,
"Third party copy from %s to %s failed: %s",
774 st.ToStr().c_str() );
777 XRootDStatus statusS = sourceFile.Close( closeTimeout );
778 XRootDStatus statusT = dstFile.
Close( closeTimeout );
782 XRootDStatus statusS = sourceFile.Close( closeTimeout );
783 XRootDStatus statusT = dstFile.
Close( closeTimeout );
785 if ( !statusS.IsOK() || !statusT.IsOK() )
787 st = (statusS.IsOK() ? statusT : statusS);
788 log->Error(
UtilityMsg,
"Third party copy from %s to %s failed during "
791 (statusS.IsOK() ?
"destination" :
"source"), st.ToStr().c_str() );
792 return UpdateErrMsg( st, statusS.IsOK() ?
"source" :
"destination" );
795 log->Debug(
UtilityMsg,
"Third party copy from %s to %s successful",
800 return XRootDStatus();
803 XRootDStatus ThirdPartyCopyJob::RunLite( CopyProgressHandler *progress )
808 int closeTimeout = 0;
814 InitTimeoutCalc timeLeft( initTimeout );
815 XRootDStatus st = dstFile.
Sync( timeLeft );
818 log->Error(
UtilityMsg,
"Unable set up rendez-vous: %s",
819 st.ToStr().c_str() );
820 XRootDStatus status = dstFile.
Close( closeTimeout );
821 return UpdateErrMsg( st,
"destination" );
827 TPCStatusHandler statusHandler;
831 uint16_t tpcTimeout = 0;
834 st = dstFile.
Sync( &statusHandler, tpcTimeout );
837 log->Error(
UtilityMsg,
"Unable start the copy: %s",
838 st.ToStr().c_str() );
839 XRootDStatus statusT = dstFile.
Close( closeTimeout );
840 return UpdateErrMsg( st,
"destination" );
846 bool canceled =
false;
853 st = dstFile.
Stat(
true, info );
856 progress->JobProgress(
pJobId, info->GetSize(), sourceSize );
860 bool shouldCancel = progress->ShouldCancel(
pJobId );
863 log->Debug(
UtilityMsg,
"Cancellation requested by progress handler" );
864 Buffer arg, *response = 0; arg.FromString(
"ofs.tpc cancel" );
865 XRootDStatus st = dstFile.
Fcntl( arg, response );
867 log->Debug(
UtilityMsg,
"Error while trying to cancel tpc: %s",
868 st.ToStr().c_str() );
886 st = *statusHandler.GetStatus();
890 log->Error(
UtilityMsg,
"Third party copy from %s to %s failed: %s",
892 st.ToStr().c_str() );
895 XRootDStatus statusT = dstFile.
Close( closeTimeout );
899 st = dstFile.
Close( closeTimeout );
903 log->Error(
UtilityMsg,
"Third party copy from %s to %s failed during "
904 "close of %s: %s",
GetSource().GetObfuscatedURL().c_str(),
906 "destination", st.ToStr().c_str() );
907 return UpdateErrMsg( st,
"destination" );
910 log->Debug(
UtilityMsg,
"Third party copy from %s to %s successful",
915 return XRootDStatus();
922 std::string ThirdPartyCopyJob::GenerateKey()
924 static const int _10to9 = 1000000000;
928 auto tp = std::chrono::high_resolution_clock::now();
929 auto d = tp.time_since_epoch();
930 auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>( d );
931 auto s = std::chrono::duration_cast<std::chrono::seconds>( d );
932 uint32_t k1 = ns.count() - s.count() * _10to9;
933 uint32_t k2 = getpid() | (getppid() << 16);
934 uint32_t k3 = s.count();
935 snprintf( tpcKey, 25,
"%08x%08x%08x", k1, k2, k3 );
936 return std::string(tpcKey);
const URL & GetSource() const
Get source.
const URL & GetTarget() const
Get target.
PropertyList * pProperties
Interface for copy progress notification.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
void Enable()
Enable delegation in the environment.
static DlgEnv & Instance()
void Disable()
Disable delegation in the environment.
bool GetString(const std::string &key, std::string &value)
bool GetInt(const std::string &key, int &value)
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
XRootDStatus Fcntl(const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
bool GetProperty(const std::string &name, std::string &value) const
XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
bool SetProperty(const std::string &name, const std::string &value)
XRootDStatus Sync(ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
void Error(uint64_t topic, const char *format,...)
Report an error.
void Info(uint64_t topic, const char *format,...)
Print an info.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
An abstract class to describe the client-side monitoring plugin interface.
@ EvCheckSum
CheckSumInfo: File checksummed.
virtual void Event(EventCode evCode, void *evData)=0
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
virtual XRootDStatus Run(CopyProgressHandler *progress=0)
ThirdPartyCopyJob(uint16_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
Constructor.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
bool IsMetalink() const
Is it a URL to a metalink.
const std::string & GetHostName() const
Get the name of the target host.
std::map< std::string, std::string > ParamsMap
const std::string & GetProtocol() const
Get the protocol.
void SetParams(const std::string ¶ms)
Set params.
std::string GetURL() const
Get the URL.
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
const ParamsMap & GetParams() const
Get the URL params.
const std::string & GetPath() const
Get the path.
static XRootDStatus CheckTPCLite(const std::string &server, uint16_t timeout=0)
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
static XRootDStatus GetRemoteCheckSum(std::string &checkSum, const std::string &checkSumType, const URL &url)
Get a checksum from a remote xrootd server.
static XRootDStatus CheckTPC(const std::string &server, uint16_t timeout=0)
Check if peer supports tpc.
An interface for metadata redirectors.
virtual std::string GetCheckSum(const std::string &type) const =0
void SetErrorMessage(const std::string &message)
Set the error message.
const std::string & GetErrorMessage() const
Get error message.
static const char * cgiC2Dst(const char *cKey, const char *xSrc, const char *xLfn, const char *xCks, char *Buff, int Blen, int strms=0, const char *iHst=0, const char *sprt=0, const char *tprt=0, bool dlgon=false, bool push=false)
static const char * cgiC2Src(const char *cKey, const char *xDst, int xTTL, char *Buff, int Blen)
static void Wait(int milliseconds)
const uint16_t errErrorResponse
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t UtilityMsg
const uint16_t errInvalidArgs
const uint16_t errNotSupported
const uint16_t errCheckSumError
Describe a checksum event.
TransferInfo transfer
The transfer in question.
uint64_t tTime
Microseconds to obtain cksum from target.
bool isOK
True if checksum matched, false otherwise.
std::string cksum
Checksum as "type:value".
uint64_t oTime
Microseconds to obtain cksum from origin.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Update
Open for reading and writing.
bool IsOK() const
We're fine.