43 pSrc( src->Self() ), pOffset( offset ), pSize( size ), pBuffer( buffer ), pHandle( handle ), pUsePgRead( usepgrd )
58 ToPgInfo( response, chunk );
62 if( !chunk && status->
IsOK() )
79 pSrc->ReportResponse( status, chunk, pHandle );
90 response->
Get( chunk );
91 response->
Set( (
int* )0 );
95 ChunkInfo *rsp =
nullptr;
97 chunk =
new PageInfo( rsp->offset, rsp->length, rsp->buffer );
111 pChunkSize( chunkSize ), pParallel( parallel ), pFileSize( fileSize ), pThread(),
112 pCtx( ctx->Self() ), pFile( 0 ), pCurrentOffset( 0 ), pBlkEnd( 0 ), pDataTransfered( 0 ), pRefCount( 1 ),
113 pRunning( false ), pStartTime( 0 ), pTransferTime( 0 ), pUsePgRead( false )
126 int rc = pthread_create( &pThread, 0, Run,
this );
135 void* XCpSrc::Run(
void* arg )
138 me->StartDownloading();
143 void XCpSrc::StartDownloading()
145 XRootDStatus st = Initialize();
161 pStartTime = time( 0 );
170 if( GetWork().IsOK() )
continue;
172 else if( st.IsOK() && st.code ==
suDone )
176 if( GetWork().IsOK() )
continue;
178 pTransferTime += time( 0 ) - pStartTime;
189 pStartTime = time( 0 );
198 XRootDStatus *status = pReports.Get();
199 if( !status->IsOK() )
202 std::string myHost = URL( pUrl ).GetHostName();
203 log->Error(
UtilityMsg,
"Failed to read chunk from %s: %s", myHost.c_str(), status->GetErrorMessage().c_str() );
205 if( !Recover().IsOK() )
227 XRootDStatus XCpSrc::Initialize()
236 log->Error(
UtilityMsg,
"Failed to initialize XCp source, no more replicas to try" );
237 return XRootDStatus(
stError );
240 log->Debug(
UtilityMsg,
"Opening %s for reading", pUrl.c_str() );
251 log->Warning(
UtilityMsg,
"Failed to open %s for reading: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
257 if( ( !url.IsLocalFile() && !pFile->
IsSecure() ) ||
258 ( url.IsLocalFile() && url.IsMetalink() ) )
272 StatInfo *statInfo = 0;
273 st = pFile->
Stat(
false, statInfo );
276 log->Warning(
UtilityMsg,
"Failed to stat %s: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
280 pFileSize = statInfo->GetSize();
287 std::pair<uint64_t, uint64_t> p = pCtx->
GetBlock();
288 pCurrentOffset = p.first;
289 pBlkEnd = p.second + p.first;
294 XRootDStatus XCpSrc::Recover()
303 log->Error(
UtilityMsg,
"Failed to initialize XCp source, no more replicas to try" );
304 return XRootDStatus(
stError );
307 log->Debug(
UtilityMsg,
"Opening %s for reading", pUrl.c_str() );
319 log->Warning(
UtilityMsg,
"Failed to open %s for reading: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
323 if( ( !url.IsLocalFile() && pFile->
IsSecure() ) ||
324 ( url.IsLocalFile() && url.IsMetalink() ) )
338 pRecovered.insert( pOngoing.begin(), pOngoing.end() );
344 pStartTime = time( 0 );
350 XRootDStatus XCpSrc::ReadChunks()
354 while( pOngoing.size() < pParallel && !pRecovered.empty() )
356 std::pair<uint64_t, uint64_t> p;
357 std::map<uint64_t, uint64_t>::iterator itr = pRecovered.begin();
359 pOngoing.insert( p );
360 pRecovered.erase( itr );
362 char *buffer =
new char[p.second];
364 XRootDStatus st = pUsePgRead
365 ? pFile->
PgRead( p.first, p.second, buffer, handler )
366 : pFile->
Read( p.first, p.second, buffer, handler );
371 ReportResponse(
new XRootDStatus( st ), 0, pFile );
376 while( pOngoing.size() < pParallel && pCurrentOffset < pBlkEnd )
378 uint64_t chunkSize = pChunkSize;
379 if( pCurrentOffset + chunkSize > pBlkEnd )
380 chunkSize = pBlkEnd - pCurrentOffset;
381 pOngoing[pCurrentOffset] = chunkSize;
382 char *buffer =
new char[chunkSize];
384 XRootDStatus st = pUsePgRead
385 ? pFile->
PgRead( pCurrentOffset, chunkSize, buffer, handler )
386 : pFile->
Read( pCurrentOffset, chunkSize, buffer, handler );
387 pCurrentOffset += chunkSize;
392 ReportResponse(
new XRootDStatus( st ), 0, pFile );
397 if( pOngoing.empty() )
return XRootDStatus(
stOK,
suDone );
399 if( pRecovered.empty() && pCurrentOffset >= pBlkEnd )
return XRootDStatus(
stOK,
suPartial );
404 void XCpSrc::ReportResponse( XRootDStatus *status, PageInfo *chunk,
File *handle )
416 ignore = !pOngoing.erase( chunk->GetOffset() );
418 else if( FilesEqual( pFile, handle ) )
425 pFailed[pFile] = pOngoing.size();
431 if( !FilesEqual( pFile, handle ) )
438 if( pFailed[handle] == 0 )
442 pFailed.erase( handle );
443 XRootDStatus st = handle->Close();
450 if( status ) pReports.Put( status );
460 pDataTransfered += chunk->GetLength();
465 void XCpSrc::Steal( XCpSrc *src )
472 std::string myHost = URL( pUrl ).GetHostName(), srcHost = URL( src->pUrl ).GetHostName();
478 pRecovered.insert( src->pOngoing.begin(), src->pOngoing.end() );
479 pRecovered.insert( src->pRecovered.begin(), src->pRecovered.end() );
480 pCurrentOffset = src->pCurrentOffset;
481 pBlkEnd = src->pBlkEnd;
483 src->pOngoing.clear();
484 src->pRecovered.clear();
485 src->pCurrentOffset = 0;
493 log->Debug(
UtilityMsg,
"%s: Stealing everything from %s", myHost.c_str(), srcHost.c_str() );
500 uint64_t myTransferRate =
TransferRate(), srcTransferRate = src->TransferRate();
501 if( myTransferRate == 0 )
return;
502 double fraction = double( myTransferRate ) / double( myTransferRate + srcTransferRate );
504 if( src->pCurrentOffset < src->pBlkEnd )
507 uint64_t blkSize = src->pBlkEnd - src->pCurrentOffset;
508 uint64_t steal =
static_cast<uint64_t
>( round( fraction * blkSize ) );
511 if( blkSize - steal <= pChunkSize )
514 pCurrentOffset = src->pBlkEnd - steal;
515 pBlkEnd = src->pBlkEnd;
516 src->pBlkEnd -= steal;
518 log->Debug(
UtilityMsg,
"%s: Stealing fraction (%f) of block from %s", myHost.c_str(), fraction, srcHost.c_str() );
523 if( !src->pRecovered.empty() )
525 size_t count =
static_cast<size_t>( round( fraction * src->pRecovered.size() ) );
528 std::map<uint64_t, uint64_t>::iterator itr = src->pRecovered.begin();
529 pRecovered.insert( *itr );
530 src->pRecovered.erase( itr );
533 log->Debug(
UtilityMsg,
"%s: Stealing fraction (%f) of recovered chunks from %s", myHost.c_str(), fraction, srcHost.c_str() );
544 if( !src->pOngoing.empty() && fraction > 0.7 )
546 size_t count =
static_cast<size_t>( round( fraction * src->pOngoing.size() ) );
549 std::map<uint64_t, uint64_t>::iterator itr = src->pOngoing.begin();
550 pRecovered.insert( *itr );
551 src->pOngoing.erase( itr );
554 log->Debug(
UtilityMsg,
"%s: Stealing fraction (%f) of ongoing chunks from %s", myHost.c_str(), fraction, srcHost.c_str() );
558 XRootDStatus XCpSrc::GetWork()
560 std::pair<uint64_t, uint64_t> p = pCtx->
GetBlock();
565 pCurrentOffset = p.first;
566 pBlkEnd = p.first + p.second;
569 std::string myHost = URL( pUrl ).GetHostName();
570 log->Debug(
UtilityMsg,
"%s got next block", myHost.c_str() );
572 return XRootDStatus();
579 if( pCurrentOffset < pBlkEnd || !pRecovered.empty() )
return XRootDStatus();
586 time_t duration = pTransferTime + time( 0 ) - pStartTime;
587 return pDataTransfered / ( duration + 1 );
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
ChunkHandler(XCpSrc *src, uint64_t offset, uint64_t size, char *buffer, File *handle, bool usepgrd)
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
bool GetInt(const std::string &key, int &value)
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
bool IsSecure() const
Check if the file is using an encrypted connection.
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
bool GetProperty(const std::string &name, std::string &value) const
XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
bool SetProperty(const std::string &name, const std::string &value)
Handle an async response.
static bool HasPgRW(const XrdCl::URL &url)
void NotifyInitExpectant()
bool GetNextUrl(std::string &url)
void RemoveSrc(XCpSrc *src)
XCpSrc * WeakestLink(XCpSrc *exclude)
void PutChunk(PageInfo *chunk)
void SetFileSize(int64_t size)
std::pair< uint64_t, uint64_t > GetBlock()
friend class ChunkHandler
static void DeleteChunk(PageInfo *&chunk)
XCpSrc(uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx)
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidOp
const int DefaultCpUsePgWrtRd
const uint64_t UtilityMsg
const uint16_t suContinue
@ Read
Open only for reading.
uint32_t GetLength() const
Get the data length.
bool IsOK() const
We're fine.