54 pJob(job), pProgress(progress), pCurrentJob(currentJob),
55 pTotalJobs(totalJobs), pSem(sem),
68 virtual void Run(
void * )
77 pProgress->BeginJob( pCurrentJob, pTotalJobs,
89 gettimeofday( &bTOD, 0 );
97 st = pJob->Run( pProgress );
104 pJob->GetResults()->Get(
"LastURL", url );
107 auto itr = cgi.find(
"tried" );
108 if( itr != cgi.end() )
110 std::string tried = itr->second;
111 if( tried[tried.size() - 1] !=
',' ) tried +=
',';
112 tried += lastURL.GetHostName();
113 cgi[
"tried"] = tried;
116 cgi[
"tried"] = lastURL.GetHostName();
118 std::string recoveryRedir;
119 pJob->GetResults()->Get(
"WrtRecoveryRedir", recoveryRedir );
123 pJob->GetProperties()->Get(
"target", target );
125 trgURL.SetHostName( recRedirURL.GetHostName() );
126 trgURL.SetPort( recRedirURL.GetPort() );
127 trgURL.SetProtocol( recRedirURL.GetProtocol() );
128 trgURL.SetParams( cgi );
129 pJob->GetProperties()->Set(
"target", trgURL.GetURL() );
139 if( !st.
IsOK() && pRetryCnt > 0 &&
144 if( pRetryPolicy ==
"continue" )
146 pJob->GetProperties()->Set(
"force",
false );
147 pJob->GetProperties()->Set(
"continue",
true );
151 pJob->GetProperties()->Set(
"force",
true );
152 pJob->GetProperties()->Set(
"continue",
false );
162 pJob->GetResults()->Set(
"status", st );
169 std::vector<std::string> sources;
170 pJob->GetResults()->Get(
"sources", sources );
176 gettimeofday( &i.
eTOD, 0 );
182 pProgress->EndJob( pCurrentJob, pJob->GetResults() );
191 uint16_t pCurrentJob;
196 std::string pRetryPolicy;
237 properties.
Get<std::string>(
"jobType" ) ==
"configuration" )
241 pImpl->
pJobProperties.rbegin()->Get<std::string>(
"jobType" ) ==
"configuration" )
244 PropertyList::PropertyMap::const_iterator it;
245 for( it = properties.
begin(); it != properties.
end(); ++it )
246 config.
Set( it->first, it->second );
265 const char *bools[] = {
"target",
"force",
"posc",
"coerce",
"makeDir",
266 "zipArchive",
"xcp",
"preserveXAttr",
"rmOnBadCksum",
267 "continue",
"zipAppend",
"doServer", 0};
268 for(
int i = 0; bools[i]; ++i )
270 p.
Set( bools[i],
false );
273 p.
Set(
"thirdParty",
"none" );
276 p.
Set(
"checkSumMode",
"none" );
283 "checkSumType not specified" );
290 std::string checkSumType;
291 p.
Get(
"checkSumType", checkSumType );
292 std::transform(checkSumType.begin(), checkSumType.end(),
293 checkSumType.begin(), ::tolower);
294 p.
Set(
"checkSumType", checkSumType );
301 env->
GetInt(
"CPParallelChunks", val );
302 p.
Set(
"parallelChunks", val );
308 env->
GetInt(
"CPChunkSize", val );
309 p.
Set(
"chunkSize", val );
315 env->
GetInt(
"XCpBlockSize", val );
316 p.
Set(
"xcpBlockSize", val );
322 env->
GetInt(
"CPInitTimeout", val );
323 p.
Set(
"initTimeout", val );
329 env->
GetInt(
"CPTPCTimeout", val );
330 p.
Set(
"tpcTimeout", val );
336 env->
GetInt(
"CPTimeout", val );
337 p.
Set(
"cpTimeout", val );
341 p.
Set(
"dynamicSource",
false );
346 if( !p.
HasProperty(
"xrateThreshold" ) || p.
Get<
long long>(
"xrateThreshold" ) == 0 )
349 env->
GetInt(
"XRateThreshold", val );
350 p.
Set(
"xrateThreshold", val );
369 std::vector<PropertyList>::iterator it;
374 std::map<std::string, uint32_t> targetFlags;
381 props.
Get<std::string>(
"jobType" ) ==
"configuration" )
387 props.
Get(
"source", tmp );
399 if( !st.
IsOK() )
return st;
404 URL::ParamsMap::const_iterator itr = cgi.find(
"xrdcl.unzip" );
405 if( itr != cgi.end() )
407 props.
Set(
"zipArchive",
true );
408 props.
Set(
"zipSource", itr->second );
411 props.
Get(
"target", tmp );
419 bool targetIsDir =
false;
420 props.
Get(
"targetIsDir", targetIsDir );
424 std::string path = target.
GetPath() +
'/';
428 props.
Get(
"zipArchive", isZip );
431 props.
Get(
"zipSource", fn );
444 size_t pos = fn.rfind(
'/' );
445 if( pos != std::string::npos )
446 fn = fn.substr( pos + 1 );
454 props.
Get(
"thirdParty", tmp );
467 res->
Set(
"status", st );
477 res->
Set(
"status", st );
494 pImpl->
pJobs.push_back( job );
507 uint8_t parallelThreads = 1;
510 pImpl->
pJobProperties.rbegin()->Get<std::string>(
"jobType" ) ==
"configuration" )
514 parallelThreads = (uint8_t)config.
Get<
int>(
"parallel" );
520 std::vector<CopyJob *>::iterator it;
521 uint16_t currentJob = 1;
522 uint16_t totalJobs = pImpl->
pJobs.size();
527 if( parallelThreads == 1 )
531 for( it = pImpl->
pJobs.begin(); it != pImpl->
pJobs.end(); ++it )
533 QueuedCopyJob j( *it, progress, currentJob, totalJobs );
544 if( !err.
IsOK() )
return err;
551 uint16_t workers = std::min( (uint16_t)parallelThreads,
552 (uint16_t)pImpl->
pJobs.size() );
557 "Unable to start job manager" );
560 std::vector<QueuedCopyJob*> queued;
561 for( it = pImpl->
pJobs.begin(); it != pImpl->
pJobs.end(); ++it )
563 QueuedCopyJob *j =
new QueuedCopyJob( *it, progress, currentJob,
566 queued.push_back( j );
571 std::vector<QueuedCopyJob*>::iterator itQ;
572 for( itQ = queued.begin(); itQ != queued.end(); ++itQ )
578 "Unable to stop job manager" );
580 for( itQ = queued.begin(); itQ != queued.end(); ++itQ )
583 for( it = pImpl->
pJobs.begin(); it != pImpl->
pJobs.end(); ++it )
586 if( !st.
IsOK() )
return st;
592 void CopyProcess::CleanUpJobs()
594 std::vector<CopyJob*>::iterator itJ;
595 for( itJ = pImpl->
pJobs.begin(); itJ != pImpl->
pJobs.end(); ++itJ )
606 pImpl->
pJobs.clear();
const URL & GetSource() const
Get source.
virtual ~CopyProcess()
Destructor.
CopyProcess()
Constructor.
XRootDStatus Run(CopyProgressHandler *handler)
Run the copy jobs.
XRootDStatus AddJob(const PropertyList &properties, PropertyList *results)
Interface for copy progress notification.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
bool GetInt(const std::string &key, int &value)
bool Finalize()
Finalize the job manager, clear the queues.
bool Start()
Start the workers.
bool Initialize()
Initialize the job manager.
bool Stop()
Stop the workers.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Interface for a job to be run by the job manager.
virtual void Run(void *arg)=0
The job logic.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
An abstract class to describe the client-side monitoring plugin interface.
TransferInfo transfer
The transfer in question.
@ EvCopyBeg
CopyBInfo: Copy operation started.
@ EvCopyEnd
CopyEInfo: Copy operation ended.
virtual void Event(EventCode evCode, void *evData)=0
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
PropertyMap::const_iterator end() const
Get the end iterator.
bool Get(const std::string &name, Item &item) const
bool HasProperty(const std::string &name) const
Check if we now about the given name.
PropertyMap::const_iterator begin() const
Get the begin iterator.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
XRootDStatus RegisterAndWait(const URL &url)
Creates a new virtual redirector and registers it (sync).
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
bool IsMetalink() const
Is it a URL to a metalink.
std::map< std::string, std::string > ParamsMap
const std::string & GetProtocol() const
Get the protocol.
std::string GetURL() const
Get the URL.
void SetPath(const std::string &path)
Set the path.
const ParamsMap & GetParams() const
Get the URL params.
const std::string & GetPath() const
Get the path.
bool IsValid() const
Is the url valid.
static void LogPropertyList(Log *log, uint64_t topic, const char *format, const PropertyList &list)
Log property list.
An interface for metadata redirectors.
virtual std::string GetTargetName() const =0
Gets the file name as specified in the metalink.
const int DefaultCPInitTimeout
const int DefaultXRateThreshold
const uint16_t errOperationExpired
const int DefaultCPChunkSize
const uint16_t stError
An error occurred that could potentially be retried.
const int DefaultRetryWrtAtLBLimit
std::vector< PropertyList * > pJobResults
const int DefaultCPParallelChunks
const uint16_t errOSError
const int DefaultXCpBlockSize
const uint64_t UtilityMsg
const int DefaultCPTimeout
const uint16_t errInvalidArgs
std::vector< PropertyList > pJobProperties
const uint16_t errRetry
Try again for whatever reason.
const uint16_t errThresholdExceeded
const char *const DefaultCpRetryPolicy
const int DefaultCPTPCTimeout
std::vector< CopyJob * > pJobs
Describe an end of copy event.
TransferInfo transfer
The transfer in question.
int sources
Number of sources used for the copy.
timeval bTOD
Copy start time.
const XRootDStatus * status
Status of the copy.
timeval eTOD
Copy end time.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static bool IsSocketError(uint16_t code)