53 #include <sys/types.h>
59 #if __cplusplus < 201103L
68 template<
typename U = std::ratio<1, 1>>
72 mytimer_t() : start( clock_t::now() ){ }
73 void reset(){ start = clock_t::now(); }
74 uint64_t elapsed()
const
76 return std::chrono::duration_cast<unit_t>( clock_t::now() - start ).count();
79 typedef std::chrono::high_resolution_clock clock_t;
80 typedef std::chrono::duration<uint64_t, U> unit_t;
81 std::chrono::time_point<clock_t> start;
84 using timer_sec_t = mytimer_t<>;
85 using timer_nsec_t = mytimer_t<std::nano>;
89 std::vector<XrdCl::xattr_t> &out )
91 std::vector<XrdCl::xattr_t> ret;
92 ret.reserve( in.size() );
93 std::vector<XrdCl::XAttr>::iterator itr = in.begin();
94 for( ; itr != in.end() ; ++itr )
96 if( !itr->status.IsOK() )
return itr->
status;
98 ret.push_back( std::move( xa ) );
108 std::vector<XrdCl::xattr_t> &xattrs )
110 std::vector<XrdCl::XAttr> rsp;
112 if( !st.
IsOK() )
return st;
113 return Translate( rsp, xattrs );
120 std::vector<XrdCl::xattr_t> &xattrs )
124 std::vector<XrdCl::XAttr> rsp;
126 if( !st.
IsOK() )
return st;
127 return Translate( rsp, xattrs );
131 const std::vector<XrdCl::xattr_t> &xattrs )
133 std::vector<XrdCl::XAttrStatus> rsp;
135 std::vector<XrdCl::XAttrStatus>::iterator itr = rsp.begin();
136 for( ; itr != rsp.end() ; ++itr )
137 if( !itr->status.IsOK() )
return itr->
status;
150 Source(
const std::string &checkSumType =
"",
151 const std::vector<std::string> &addcks = std::vector<std::string>() ) :
155 if( !checkSumType.empty() )
158 for(
auto &type : addcks )
165 for(
auto ptr : pAddCksHelpers )
177 virtual int64_t GetSize() = 0;
198 std::string &checkSumType ) = 0;
203 virtual std::vector<std::string> GetAddCks() = 0;
221 std::vector<XrdCl::CheckSumHelper*> pAddCksHelpers;
234 Destination(
const std::string &checkSumType =
"" ):
235 pPosc( false ), pForce( false ), pCoerce( false ), pMakeDir( false ),
236 pContinue( false ), pCkSumHelper( 0 )
238 if( !checkSumType.empty() )
245 virtual ~Destination()
277 std::string &checkSumType ) = 0;
287 virtual int64_t GetSize() = 0;
292 void SetPOSC(
bool posc )
300 void SetForce(
bool force )
308 void SetContinue(
bool continue_ )
310 pContinue = continue_;
316 void SetCoerce(
bool coerce )
324 void SetMakeDir(
bool makedir )
332 virtual const std::string& GetLastURL()
const
334 static const std::string empty;
341 virtual const std::string& GetWrtRecoveryRedir()
const
343 static const std::string empty;
360 class StdInSource:
public Source
366 StdInSource(
const std::string &ckSumType, uint32_t chunkSize,
const std::vector<std::string> &addcks ):
367 Source( ckSumType, addcks ),
369 pChunkSize( chunkSize )
377 virtual ~StdInSource()
389 auto st = pCkSumHelper->Initialize();
390 if( !st.
IsOK() )
return st;
391 for(
auto cksHelper : pAddCksHelpers )
393 st = cksHelper->Initialize();
394 if( !st.
IsOK() )
return st;
403 virtual int64_t GetSize()
414 "Cannot continue from stdin!" );
422 using namespace XrdCl;
423 Log *log = DefaultEnv::GetLog();
425 uint32_t toRead = pChunkSize;
426 char *buffer =
new char[toRead];
428 int64_t bytesRead = 0;
432 int64_t bRead =
read( 0, buffer+offset, toRead );
456 pCkSumHelper->Update( buffer, bytesRead );
458 for(
auto cksHelper : pAddCksHelpers )
459 cksHelper->Update( buffer, bytesRead );
462 pCurrentOffset += bytesRead;
470 std::string &checkSum,
471 std::string &checkSumType )
473 using namespace XrdCl;
475 return cksHelper->
GetCheckSum( checkSum, checkSumType );
483 std::string &checkSumType )
485 return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
491 std::vector<std::string> GetAddCks()
493 std::vector<std::string> ret;
494 for(
auto cksHelper : pAddCksHelpers )
496 std::string type = cksHelper->
GetType();
498 GetCheckSumImpl( cksHelper, cks, type );
499 ret.push_back( type +
":" + cks );
513 StdInSource(
const StdInSource &other);
514 StdInSource &operator = (
const StdInSource &other);
516 uint64_t pCurrentOffset;
523 class XRootDSource:
public Source
527 virtual void Cancel() = 0;
536 template<
typename READER>
537 struct OnConnJob :
public CancellableJob
539 OnConnJob( XRootDSource *
self, READER *reader ) : self( self ), reader( reader )
545 std::unique_lock<std::mutex> lck( mtx );
546 if( !
self || !reader )
return;
548 if( self->pNbConn < self->pMaxNbConn )
549 self->FillQueue( reader );
554 std::unique_lock<std::mutex> lck( mtx );
572 return pFile->TryOtherServer();
580 uint8_t parallelChunks,
581 const std::string &ckSumType,
582 const std::vector<std::string> &addcks,
584 Source( ckSumType, addcks ),
585 pUrl( url ), pFile( new
XrdCl::
File() ), pSize( -1 ),
586 pCurrentOffset( 0 ), pChunkSize( chunkSize ),
587 pParallel( parallelChunks ),
588 pNbConn( 0 ), pUsePgRead( false ),
589 pDoServer( doserver )
593 pMaxNbConn = val - 1;
599 virtual ~XRootDSource()
602 pDataConnCB->Cancel();
605 if( pFile->IsOpen() )
615 using namespace XrdCl;
616 Log *log = DefaultEnv::GetLog();
618 pUrl->GetObfuscatedURL().c_str() );
621 DefaultEnv::GetEnv()->GetString(
"ReadRecovery", value );
622 pFile->SetProperty(
"ReadRecovery", value );
624 XRootDStatus st = pFile->Open( pUrl->GetURL(), OpenFlags::Read );
629 st = pFile->Stat(
false, statInfo );
636 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper && !pContinue )
638 st = pCkSumHelper->Initialize();
639 if( !st.
IsOK() )
return st;
641 for(
auto cksHelper : pAddCksHelpers )
644 if( !st.
IsOK() )
return st;
651 if( !pUrl->IsLocalFile() ||
652 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
654 pFile->GetProperty(
"LastURL", pDataServer );
658 if( ( !pUrl->IsLocalFile() && !pFile->IsSecure() ) ||
659 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
673 if( pDoServer && !pUrl->IsLocalFile() )
676 DefaultEnv::GetPostMaster()->QueryTransport( pDataServer, StreamQuery::IpStack, obj );
677 std::string *ipstack =
nullptr;
679 std::cerr <<
"!-!" << *ipstack << std::endl;
683 SetOnDataConnectHandler( pFile );
691 virtual int64_t GetSize()
701 pCurrentOffset = offset;
716 return GetChunkImpl( pFile, ci );
732 while( !pChunks.empty() )
737 delete [] (
char *)ch->chunk.GetBuffer();
746 std::string &checkSumType )
748 return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
752 std::string &checkSum,
753 std::string &checkSumType )
755 if( pUrl->IsMetalink() )
759 checkSum = redirector->
GetCheckSum( checkSumType );
763 if( pUrl->IsLocalFile() )
770 return cksHelper->
GetCheckSum( checkSum, checkSumType );
775 std::string dataServer; pFile->GetProperty(
"DataServer", dataServer );
776 std::string lastUrl; pFile->GetProperty(
"LastURL", lastUrl );
783 std::vector<std::string> GetAddCks()
785 std::vector<std::string> ret;
786 for(
auto cksHelper : pAddCksHelpers )
788 std::string type = cksHelper->
GetType();
790 GetCheckSumImpl( cksHelper, cks, type );
791 ret.push_back( cks );
797 XRootDSource(
const XRootDSource &other);
798 XRootDSource &operator = (
const XRootDSource &other);
805 template<
typename READER>
806 inline void FillQueue( READER *reader )
811 uint16_t parallel = pParallel;
812 if( pNbConn < pMaxNbConn )
815 NbConnectedStrm( pDataServer );
817 if( pNbConn ) parallel *= pNbConn;
819 while( pChunks.size() < parallel && pCurrentOffset < pSize )
821 uint64_t chunkSize = pChunkSize;
822 if( pCurrentOffset + chunkSize > (uint64_t)pSize )
823 chunkSize = pSize - pCurrentOffset;
825 char *buffer =
new char[chunkSize];
827 ch->status = pUsePgRead
828 ? reader->PgRead( pCurrentOffset, chunkSize, buffer, ch )
829 : reader->Read( pCurrentOffset, chunkSize, buffer, ch );
831 pCurrentOffset += chunkSize;
832 if( !ch->status.IsOK() )
843 template<
typename READER>
844 void SetOnDataConnectHandler( READER *reader )
847 pDataConnCB.reset(
new OnConnJob<READER>(
this, reader ) );
850 if( pDataServer.empty() )
return;
864 template<
typename READER>
870 using namespace XrdCl;
871 Log *log = DefaultEnv::GetLog();
876 std::unique_lock<std::mutex> lck( pDataConnCB->mtx );
882 if( pChunks.empty() )
885 std::unique_ptr<ChunkHandler> ch( pChunks.front() );
891 if( !ch->status.IsOK() )
894 ch->chunk.GetLength(), (
unsigned long long) ch->chunk.GetOffset(),
895 pUrl->GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
896 delete [] (
char *)ch->chunk.GetBuffer();
901 ci = std::move( ch->chunk );
903 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && !pContinue )
908 for(
auto cksHelper : pAddCksHelpers )
926 this->status = *statusval;
930 chunk = ToChunk( response );
941 response->
Get( resp );
942 return std::move( *resp );
947 response->
Get( resp );
961 int64_t pCurrentOffset;
964 std::queue<ChunkHandler*> pChunks;
965 std::string pDataServer;
971 std::shared_ptr<CancellableJob> pDataConnCB;
977 class XRootDSourceZip:
public XRootDSource
983 XRootDSourceZip(
const std::string &filename,
986 uint8_t parallelChunks,
987 const std::string &ckSumType,
988 const std::vector<std::string> &addcks,
990 XRootDSource( archive, chunkSize, parallelChunks, ckSumType,
992 pFilename( filename ),
1000 virtual ~XRootDSourceZip()
1013 using namespace XrdCl;
1014 Log *log = DefaultEnv::GetLog();
1016 pUrl->GetObfuscatedURL().c_str() );
1019 DefaultEnv::GetEnv()->GetString(
"ReadRecovery", value );
1020 pZipArchive->SetProperty(
"ReadRecovery", value );
1026 st = pZipArchive->OpenFile( pFilename );
1031 st = pZipArchive->Stat( info );
1040 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper )
1042 auto st = pCkSumHelper->Initialize();
1043 if( !st.
IsOK() )
return st;
1044 for(
auto cksHelper : pAddCksHelpers )
1047 if( !st.
IsOK() )
return st;
1051 if( ( !pUrl->IsLocalFile() && !pZipArchive->IsSecure() ) ||
1052 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
1054 pZipArchive->GetProperty(
"DataServer", pDataServer );
1063 SetOnDataConnectHandler( pZipArchive );
1079 return GetChunkImpl( pZipArchive, ci );
1086 std::string &checkSumType )
1088 return GetCheckSumImpl( checkSum, checkSumType, pCkSumHelper );
1095 std::string &checkSumType,
1099 if( checkSumType ==
"zcrc32" )
1102 auto st = pZipArchive->GetCRC32( pFilename, cksum );
1103 if( !st.
IsOK() )
return st;
1106 ckSum.
Set(
"zcrc32" );
1107 ckSum.
Set(
reinterpret_cast<void*
>( &cksum ),
sizeof( uint32_t ) );
1108 char cksBuffer[265];
1109 ckSum.
Get( cksBuffer, 256 );
1110 checkSum =
"zcrc32:";
1117 env->
GetInt(
"ZipMtlnCksum", useMtlnCksum );
1118 if( useMtlnCksum && pUrl->IsMetalink() )
1122 checkSum = redirector->
GetCheckSum( checkSumType );
1127 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && cksHelper && !pContinue )
1128 return cksHelper->
GetCheckSum( checkSum, checkSumType );
1137 std::vector<std::string> GetAddCks()
1139 std::vector<std::string> ret;
1140 for(
auto cksHelper : pAddCksHelpers )
1142 std::string type = cksHelper->
GetType();
1144 GetCheckSumImpl( cks, type, cksHelper );
1145 ret.push_back( cks );
1160 XRootDSourceZip(
const XRootDSourceZip &other);
1161 XRootDSourceZip &operator = (
const XRootDSourceZip &other);
1163 const std::string pFilename;
1170 class XRootDSourceDynamic:
public Source
1179 return pFile->TryOtherServer();
1187 const std::string &ckSumType,
1188 const std::vector<std::string> &addcks ):
1189 Source( ckSumType, addcks ),
1190 pUrl( url ), pFile( new
XrdCl::
File() ), pCurrentOffset( 0 ),
1191 pChunkSize( chunkSize ), pDone( false ), pUsePgRead( false )
1198 virtual ~XRootDSourceDynamic()
1209 using namespace XrdCl;
1210 Log *log = DefaultEnv::GetLog();
1212 pUrl->GetObfuscatedURL().c_str() );
1215 DefaultEnv::GetEnv()->GetString(
"ReadRecovery", value );
1216 pFile->SetProperty(
"ReadRecovery", value );
1218 XRootDStatus st = pFile->Open( pUrl->GetURL(), OpenFlags::Read );
1222 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper && !pContinue )
1224 auto st = pCkSumHelper->Initialize();
1225 if( !st.
IsOK() )
return st;
1226 for(
auto cksHelper : pAddCksHelpers )
1229 if( !st.
IsOK() )
return st;
1233 if( ( !pUrl->IsLocalFile() && !pFile->IsSecure() ) ||
1234 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
1236 std::string datasrv;
1237 pFile->GetProperty(
"DataServer", datasrv );
1252 virtual int64_t GetSize()
1262 pCurrentOffset = offset;
1281 using namespace XrdCl;
1289 char *buffer =
new char[pChunkSize];
1290 uint32_t bytesRead = 0;
1292 std::vector<uint32_t> cksums;
1294 ? pFile->PgRead( pCurrentOffset, pChunkSize, buffer, cksums, bytesRead )
1295 : pFile->Read( pCurrentOffset, pChunkSize, buffer, bytesRead );
1309 if( bytesRead < pChunkSize )
1313 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && !pContinue )
1316 pCkSumHelper->Update( buffer, bytesRead );
1318 for(
auto cksHelper : pAddCksHelpers )
1319 cksHelper->
Update( buffer, bytesRead );
1323 pCurrentOffset += bytesRead;
1332 std::string &checkSumType )
1334 return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
1338 std::string &checkSum,
1339 std::string &checkSumType )
1341 if( pUrl->IsMetalink() )
1345 checkSum = redirector->
GetCheckSum( checkSumType );
1349 if( pUrl->IsLocalFile() )
1356 return cksHelper->
GetCheckSum( checkSum, checkSumType );
1361 std::string dataServer; pFile->GetProperty(
"DataServer", dataServer );
1362 std::string lastUrl; pFile->GetProperty(
"LastURL", lastUrl );
1369 std::vector<std::string> GetAddCks()
1371 std::vector<std::string> ret;
1372 for(
auto cksHelper : pAddCksHelpers )
1374 std::string type = cksHelper->
GetType();
1376 GetCheckSumImpl( cksHelper, cks, type );
1377 ret.push_back( cks );
1391 XRootDSourceDynamic(
const XRootDSourceDynamic &other);
1392 XRootDSourceDynamic &operator = (
const XRootDSourceDynamic &other);
1395 int64_t pCurrentOffset;
1396 uint32_t pChunkSize;
1404 class XRootDSourceXCp:
public Source
1410 XRootDSourceXCp(
const XrdCl::URL* url, uint32_t chunkSize, uint16_t parallelChunks, int32_t nbSrc, uint64_t blockSize ):
1411 pXCpCtx( 0 ), pUrl( url ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ), pNbSrc( nbSrc ), pBlockSize( blockSize )
1427 int64_t fileSize = -1;
1429 if( pUrl->IsMetalink() )
1433 fileSize = redirector->
GetSize();
1441 if( !st.
IsOK() )
return st;
1444 for( itr = li->
Begin(); itr != li->
End(); ++itr)
1446 std::string url =
"root://" + itr->GetAddress() +
"/" + pUrl->
GetPath();
1447 pReplicas.push_back( url );
1453 std::stringstream ss;
1454 ss <<
"XCp sources: ";
1456 std::vector<std::string>::iterator itr;
1457 for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
1463 pXCpCtx =
new XrdCl::XCpCtx( pReplicas, pBlockSize, pNbSrc, pChunkSize, pParallelChunks, fileSize );
1465 return pXCpCtx->Initialize();
1471 virtual int64_t GetSize()
1473 return pXCpCtx->GetSize();
1498 st = pXCpCtx->GetChunk( ci );
1508 std::string &checkSumType )
1510 if( pUrl->IsMetalink() )
1514 checkSum = redirector->
GetCheckSum( checkSumType );
1518 std::vector<std::string>::iterator itr;
1519 for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
1523 checkSumType, url );
1524 if( st.
IsOK() )
return st;
1533 std::vector<std::string> GetAddCks()
1535 return std::vector<std::string>();
1544 std::vector<std::string>::iterator itr = pReplicas.begin();
1545 for( ; itr < pReplicas.end() ; ++itr )
1548 if( st.
IsOK() )
return st;
1558 std::vector<std::string> pReplicas;
1559 uint32_t pChunkSize;
1560 uint16_t pParallelChunks;
1562 uint64_t pBlockSize;
1568 class StdOutDestination:
public Destination
1574 StdOutDestination(
const std::string &ckSumType ):
1575 Destination( ckSumType ), pCurrentOffset(0)
1582 virtual ~StdOutDestination()
1593 ENOTSUP,
"Cannot continue to stdout." );
1596 return pCkSumHelper->Initialize();
1616 using namespace XrdCl;
1617 Log *log = DefaultEnv::GetLog();
1622 " %llu, got %llu", (
unsigned long long) pCurrentOffset, (
unsigned long long) ci.
GetOffset() );
1631 wr =
write( 1, cursor, length );
1639 pCurrentOffset += wr;
1663 std::string &checkSumType )
1666 return pCkSumHelper->GetCheckSum( checkSum, checkSumType );
1681 virtual int64_t GetSize()
1687 StdOutDestination(
const StdOutDestination &other);
1688 StdOutDestination &operator = (
const StdOutDestination &other);
1689 uint64_t pCurrentOffset;
1695 class XRootDDestination:
public Destination
1701 XRootDDestination(
const XrdCl::URL &url, uint8_t parallelChunks,
1703 Destination( ckSumType ),
1705 pParallel( parallelChunks ), pSize( -1 ), pUsePgWrt( false ), cpjob( cpjob )
1712 virtual ~XRootDDestination()
1724 if( !cptarget.empty() )
1737 if( pUrl.IsLocalFile() && pPosc && !cpjob.GetResult().IsOK() )
1743 " on failure: %s", st.
ToString().c_str() );
1752 using namespace XrdCl;
1753 Log *log = DefaultEnv::GetLog();
1755 pUrl.GetObfuscatedURL().c_str() );
1758 DefaultEnv::GetEnv()->GetString(
"WriteRecovery", value );
1759 pFile->SetProperty(
"WriteRecovery", value );
1763 flags |= OpenFlags::Delete;
1764 else if( !pContinue )
1765 flags |= OpenFlags::New;
1768 flags |= OpenFlags::POSC;
1774 flags |= OpenFlags::MakePath;
1776 Access::Mode mode = Access::UR|Access::UW|Access::GR|Access::OR;
1782 if( ( !pUrl.IsLocalFile() && !pFile->IsSecure() ) ||
1783 ( pUrl.IsLocalFile() && pUrl.IsMetalink() ) )
1785 std::string datasrv;
1786 pFile->GetProperty(
"DataServer", datasrv );
1797 if( !cptarget.empty() )
1799 std::string targeturl;
1800 pFile->GetProperty(
"LastURL", targeturl );
1802 if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 )
1807 cptarget.c_str(), targeturl.c_str() );
1811 st = pFile->Stat(
false, info );
1817 if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
1818 return pCkSumHelper->Initialize();
1828 return pFile->Close();
1839 using namespace XrdCl;
1840 if( !pFile->IsOpen() )
1849 if( pChunks.size() < pParallel )
1850 return QueueChunk( std::move( ci ) );
1856 std::unique_ptr<ChunkHandler> ch( pChunks.front() );
1859 delete [] (
char*)ch->chunk.GetBuffer();
1860 if( !ch->status.IsOK() )
1862 Log *log = DefaultEnv::GetLog();
1864 ch->chunk.GetLength(), (
unsigned long long) ch->chunk.GetOffset(),
1865 pUrl.GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
1873 return CheckIfRetriable( ch->status );
1876 return QueueChunk( std::move( ci ) );
1882 virtual int64_t GetSize()
1890 void CleanUpChunks()
1892 while( !pChunks.empty() )
1897 delete [] (
char *)ch->chunk.GetBuffer();
1909 if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
1915 ? pFile->PgWrite(ch->chunk.GetOffset(), ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch->chunk.GetCksums(), ch)
1916 : pFile->Write( ch->chunk.GetOffset(), ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch );
1920 delete [] (
char*)ch->chunk.GetBuffer();
1934 while( !pChunks.empty() )
1939 if( !ch->status.IsOK() )
1945 st = CheckIfRetriable( ch->status );
1947 delete [] (
char *)ch->chunk.GetBuffer();
1957 std::string &checkSumType )
1959 if( pUrl.IsLocalFile() )
1966 return pCkSumHelper->GetCheckSum( checkSum, checkSumType );
1971 std::string lastUrl; pFile->GetProperty(
"LastURL", lastUrl );
1987 const std::string& GetLastURL()
const
1995 const std::string& GetWrtRecoveryRedir()
const
1997 return pWrtRecoveryRedir;
2001 XRootDDestination(
const XRootDDestination &other);
2002 XRootDDestination &operator = (
const XRootDDestination &other);
2012 chunk(std::move( ci ) ) {}
2017 this->status = *statusval;
2029 if( status.
IsOK() )
return status;
2036 if( pFile->GetProperty(
"WrtRecoveryRedir", value ) )
2038 pWrtRecoveryRedir = value;
2039 if( pFile->GetProperty(
"LastURL", value ) ) pLastURL = value;
2049 std::queue<ChunkHandler *> pChunks;
2052 std::string pWrtRecoveryRedir;
2053 std::string pLastURL;
2061 class XRootDZipDestination:
public Destination
2067 XRootDZipDestination(
const XrdCl::URL &url,
const std::string &fn,
2069 Destination(
"zcrc32" ),
2071 pParallel( parallelChunks ), pSize( size ), cpjob( cpjob )
2078 virtual ~XRootDZipDestination()
2087 if( pUrl.IsLocalFile() && pPosc && !cpjob.GetResult().IsOK() )
2095 " on failure: %s", st.
ToString().c_str() );
2105 using namespace XrdCl;
2106 Log *log = DefaultEnv::GetLog();
2108 pUrl.GetObfuscatedURL().c_str() );
2111 DefaultEnv::GetEnv()->GetString(
"WriteRecovery", value );
2112 pZip->SetProperty(
"WriteRecovery", value );
2118 auto st = fs.Stat( pUrl.GetPath(), info );
2120 flags |= OpenFlags::New;
2123 flags |= OpenFlags::POSC;
2129 flags |= OpenFlags::MakePath;
2137 if( !cptarget.empty() )
2139 std::string targeturl;
2140 pZip->GetProperty(
"LastURL", targeturl );
2141 if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 )
2150 return pCkSumHelper->Initialize();
2159 auto st = pCkSumHelper->GetRawCheckSum(
"zcrc32", crc32 );
2160 if( !st.
IsOK() )
return st;
2161 pZip->UpdateMetadata( crc32 );
2174 using namespace XrdCl;
2179 if( pChunks.size() < pParallel )
2180 return QueueChunk( std::move( ci ) );
2186 std::unique_ptr<ChunkHandler> ch( pChunks.front() );
2189 delete [] (
char*)ch->chunk.GetBuffer();
2190 if( !ch->status.IsOK() )
2192 Log *log = DefaultEnv::GetLog();
2194 ch->chunk.GetLength(), (
unsigned long long) ch->chunk.GetOffset(),
2195 pUrl.GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
2202 return CheckIfRetriable( ch->status );
2205 return QueueChunk( std::move( ci ) );
2211 virtual int64_t GetSize()
2219 void CleanUpChunks()
2221 while( !pChunks.empty() )
2226 delete [] (
char *)ch->chunk.GetBuffer();
2248 st = pZip->Write( ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch );
2252 delete [] (
char*)ch->chunk.GetBuffer();
2266 while( !pChunks.empty() )
2271 if( !ch->status.IsOK() )
2277 st = CheckIfRetriable( ch->status );
2279 delete [] (
char *)ch->chunk.GetBuffer();
2289 std::string &checkSumType )
2305 const std::string& GetLastURL()
const
2313 const std::string& GetWrtRecoveryRedir()
const
2315 return pWrtRecoveryRedir;
2319 XRootDZipDestination(
const XRootDDestination &other);
2320 XRootDZipDestination &operator = (
const XRootDDestination &other);
2330 chunk( std::move( ci ) ) {}
2335 this->status = *statusval;
2347 if( status.
IsOK() )
return status;
2354 if( pZip->GetProperty(
"WrtRecoveryRedir", value ) )
2356 pWrtRecoveryRedir = value;
2357 if( pZip->GetProperty(
"LastURL", value ) ) pLastURL = value;
2365 std::string pFilename;
2368 std::queue<ChunkHandler *> pChunks;
2371 std::string pWrtRecoveryRedir;
2372 std::string pLastURL;
2382 using namespace std::chrono;
2383 auto since_epoch = high_resolution_clock::now().time_since_epoch();
2384 return duration_cast<nanoseconds>( since_epoch );
2392 return sec * 1000000000;
2400 #if __cplusplus >= 201103L
2401 using namespace std::chrono;
2402 std::this_thread::sleep_for( nanoseconds( nsec ) );
2405 req.tv_sec = nsec /
to_nsec( 1 );
2406 req.tv_nsec = nsec %
to_nsec( 1 );
2407 nanosleep( &req, 0 );
2416 ClassicCopyJob::ClassicCopyJob( uint16_t jobId,
2419 CopyJob( jobId, jobProperties, jobResults )
2433 std::string checkSumMode;
2434 std::string checkSumType;
2435 std::string checkSumPreset;
2436 std::string zipSource;
2437 uint16_t parallelChunks;
2440 bool posc, force, coerce, makeDir, dynamicSource, zip, xcp, preserveXAttr,
2441 rmOnBadCksum, continue_, zipappend, doserver;
2442 int32_t nbXcpSources;
2444 long long xRateThreshold;
2446 std::vector<std::string> addcksums;
2477 if( force && continue_ )
2479 "Invalid argument combination: continue + force." );
2481 if( zipappend && ( continue_ || force ) )
2483 "Invalid argument combination: ( continue | force ) + zip-append." );
2488 std::unique_ptr<timer_sec_t> cptimer;
2489 if( cpTimeout ) cptimer.reset(
new timer_sec_t() );
2494 if( rmOnBadCksum ) posc =
true;
2499 if( checkSumType ==
"auto" )
2502 if( checkSumType.empty() )
2505 log->
Info(
UtilityMsg,
"Using inferred checksum type: %s.", checkSumType.c_str() );
2508 if( cptimer && cptimer->elapsed() > cpTimeout )
2514 std::unique_ptr<Source> src;
2516 src.reset(
new XRootDSourceXCp( &
GetSource(), chunkSize, parallelChunks, nbXcpSources, blockSize ) );
2518 src.reset(
new XRootDSourceZip( zipSource, &
GetSource(), chunkSize, parallelChunks,
2519 checkSumType, addcksums , doserver) );
2520 else if(
GetSource().GetProtocol() ==
"stdio" )
2521 src.reset(
new StdInSource( checkSumType, chunkSize, addcksums ) );
2525 src.reset(
new XRootDSourceDynamic( &
GetSource(), chunkSize, checkSumType, addcksums ) );
2527 src.reset(
new XRootDSource( &
GetSource(), chunkSize, parallelChunks, checkSumType, addcksums, doserver ) );
2531 if( !st.
IsOK() )
return SourceError( st );
2532 uint64_t size = src->GetSize() >= 0 ? src->GetSize() : 0;
2534 if( cptimer && cptimer->elapsed() > cpTimeout )
2537 std::unique_ptr<Destination> dest;
2540 if(
GetTarget().GetProtocol() ==
"stdio" )
2541 dest.reset(
new StdOutDestination( checkSumType ) );
2542 else if( zipappend )
2545 size_t pos = fn.rfind(
'/' );
2546 if( pos != std::string::npos )
2547 fn = fn.substr( pos + 1 );
2548 int64_t size = src->GetSize();
2549 dest.reset(
new XRootDZipDestination( newDestUrl, fn, size, parallelChunks, *
this ) );
2556 if( src->GetSize() >= 0 )
2559 std::ostringstream o; o << src->GetSize();
2560 params[
"oss.asize"] = o.str();
2564 dest.reset(
new XRootDDestination( newDestUrl, parallelChunks, checkSumType, *
this ) );
2567 dest->SetForce( force );
2568 dest->SetPOSC( posc );
2569 dest->SetCoerce( coerce );
2570 dest->SetMakeDir( makeDir );
2571 dest->SetContinue( continue_ );
2572 st = dest->Initialize();
2573 if( !st.
IsOK() )
return DestinationError( st );
2575 if( cptimer && cptimer->elapsed() > cpTimeout )
2583 size -= dest->GetSize();
2585 if( !st.
IsOK() )
return SetResult( st );
2589 uint64_t total_processed = 0;
2590 uint64_t processed = 0;
2592 uint16_t threshold_interval = parallelChunks;
2593 bool threshold_draining =
false;
2594 timer_nsec_t threshold_timer;
2597 st = src->GetChunk( pageInfo );
2599 return SourceError( st);
2604 if( cptimer && cptimer->elapsed() > cpTimeout )
2609 auto elapsed = (
time_nsec() - start ).count();
2610 double transferred = total_processed + pageInfo.
GetLength();
2611 double expected = double( xRate ) /
to_nsec( 1 ) * elapsed;
2617 transferred > expected )
2619 auto nsec = ( transferred / xRate *
to_nsec( 1 ) ) - elapsed;
2624 if( xRateThreshold )
2626 auto elapsed = threshold_timer.elapsed();
2627 double transferred = processed + pageInfo.
GetLength();
2628 double expected = double( xRateThreshold ) /
to_nsec( 1 ) * elapsed;
2634 transferred < expected &&
2635 threshold_interval == 0 )
2637 if( !threshold_draining )
2640 " trying different source!" );
2643 "The transfer rate dropped below "
2644 "requested threshold!" );
2645 threshold_draining =
true;
2651 threshold_timer.reset();
2652 threshold_interval = parallelChunks;
2653 threshold_draining =
false;
2657 threshold_interval = threshold_interval > 0 ? threshold_interval - 1 : parallelChunks;
2660 total_processed += pageInfo.
GetLength();
2663 st = dest->PutChunk( std::move( pageInfo ) );
2669 pResults->
Set(
"WrtRecoveryRedir", dest->GetWrtRecoveryRedir() );
2670 return SetResult( st );
2672 return DestinationError( st );
2685 return DestinationError( st );
2692 std::vector<xattr_t> xattrs;
2693 st = src->GetXAttr( xattrs );
2694 if( !st.
IsOK() )
return SourceError( st );
2695 st = dest->SetXAttr( xattrs );
2696 if( !st.
IsOK() )
return DestinationError( st );
2703 if( src->GetSize() >= 0 && size != total_processed )
2705 log->
Error(
UtilityMsg,
"The declared source size is %llu bytes, but "
2706 "received %llu bytes.", (
unsigned long long) size, (
unsigned long long) total_processed );
2714 st = dest->Finalize();
2716 return DestinationError( st );
2721 if( checkSumMode !=
"none" )
2724 checkSumMode.c_str() );
2725 std::string sourceCheckSum;
2726 std::string targetCheckSum;
2728 if( cptimer && cptimer->elapsed() > cpTimeout )
2734 timeval oStart, oEnd;
2737 if( checkSumMode ==
"end2end" || checkSumMode ==
"source" ||
2738 !checkSumPreset.empty() )
2740 gettimeofday( &oStart, 0 );
2741 if( !checkSumPreset.empty() )
2743 sourceCheckSum = checkSumType +
":";
2749 st = src->GetCheckSum( sourceCheckSum, checkSumType );
2751 gettimeofday( &oEnd, 0 );
2754 return SourceError( st );
2756 pResults->
Set(
"sourceCheckSum", sourceCheckSum );
2759 if( !addcksums.empty() )
2760 pResults->
Set(
"additionalCkeckSum", src->GetAddCks() );
2762 if( cptimer && cptimer->elapsed() > cpTimeout )
2768 timeval tStart, tEnd;
2770 if( checkSumMode ==
"end2end" || checkSumMode ==
"target" )
2772 gettimeofday( &tStart, 0 );
2773 st = dest->GetCheckSum( targetCheckSum, checkSumType );
2775 return DestinationError( st );
2776 gettimeofday( &tEnd, 0 );
2777 pResults->
Set(
"targetCheckSum", targetCheckSum );
2780 if( cptimer && cptimer->elapsed() > cpTimeout )
2786 auto sanitize_cksum = [](
char c )
2789 if( std::isalpha( c ) )
return std::tolower( c, loc );
2793 std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
2794 sourceCheckSum.begin(), sanitize_cksum );
2796 std::transform( targetCheckSum.begin(), targetCheckSum.end(),
2797 targetCheckSum.begin(), sanitize_cksum );
2802 if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
2805 if( sourceCheckSum == targetCheckSum )
2814 i.
cksum = sourceCheckSum;
2830 log->
Info(
UtilityMsg,
"Target file removed due to bad checksum!" );
2833 st = dest->Finalize();
std::chrono::nanoseconds time_nsec()
long long to_nsec(long long sec)
void sleep_nsec(long long nsec)
ssize_t write(int fildes, const void *buf, size_t nbyte)
ssize_t read(int fildes, void *buf, size_t nbyte)
const char * XrdSysE2T(int errcode)
int Set(const char *csName)
int Get(char *Buff, int Blen)
void Get(Type &object)
Retrieve the object being held.
Check sum helper for stdio.
XRootDStatus Initialize()
Initialize.
XRootDStatus GetCheckSum(std::string &checkSum, std::string &checkSumType)
const std::string & GetType()
void Update(const void *buffer, uint32_t size)
virtual XRootDStatus Run(CopyProgressHandler *progress=0)
const URL & GetSource() const
Get source.
const URL & GetTarget() const
Get target.
PropertyList * pProperties
Interface for copy progress notification.
virtual void JobProgress(uint16_t jobNum, uint64_t bytesProcessed, uint64_t bytesTotal)
virtual bool ShouldCancel(uint16_t jobNum)
Determine whether the job should be canceled.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
bool GetInt(const std::string &key, int &value)
Send file/filesystem queries to an XRootD cluster.
XRootDStatus Rm(const std::string &path, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
XRootDStatus ListXAttr(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus SetXAttr(const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
Interface for a job to be run by the job manager.
Iterator Begin()
Get the location begin iterator.
LocationList::iterator Iterator
Iterator over locations.
Iterator End()
Get the location end iterator.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Info(uint64_t topic, const char *format,...)
Print an info.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
An abstract class to describe the client-side monitoring plugin interface.
@ EvCheckSum
CheckSumInfo: File checksummed.
virtual void Event(EventCode evCode, void *evData)=0
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Handle an async response.
uint64_t GetSize() const
Get size (in bytes)
std::map< std::string, std::string > ParamsMap
void SetParams(const std::string ¶ms)
Set params.
std::string GetLocation() const
Get location (protocol://host:port/path)
const ParamsMap & GetParams() const
Get the URL params.
const std::string & GetPath() const
Get the path.
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
static XRootDStatus GetLocalCheckSum(std::string &checkSum, const std::string &checkSumType, const std::string &path)
Get a checksum from local file.
static bool HasXAttr(const XrdCl::URL &url)
static XRootDStatus GetRemoteCheckSum(std::string &checkSum, const std::string &checkSumType, const URL &url)
Get a checksum from a remote xrootd server.
static bool HasPgRW(const XrdCl::URL &url)
An interface for metadata redirectors.
virtual long long GetSize() const =0
virtual const std::vector< std::string > & GetReplicas()=0
Returns a vector with replicas as given in the meatlink file.
virtual std::string GetCheckSum(const std::string &type) const =0
const uint16_t errUninitialized
const uint16_t errErrorResponse
const char *const DefaultCpTarget
const uint16_t errOperationExpired
const uint16_t errNotImplemented
Operation is not implemented.
CloseArchiveImpl< false > CloseArchive(Ctx< ZipArchive > zip, uint16_t timeout=0)
Factory for creating CloseFileImpl objects.
SetXAttrImpl< false > SetXAttr(Ctx< File > file, Arg< std::string > name, Arg< std::string > value)
const uint16_t stError
An error occurred that could potentially be retried.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
GetXAttrImpl< false > GetXAttr(Ctx< File > file, Arg< std::string > name)
const uint16_t stOK
Everything went OK.
const int DefaultSubStreamsPerChannel
const int DefaultCpUsePgWrtRd
const uint16_t errOSError
const uint64_t UtilityMsg
const uint16_t errInvalidArgs
std::tuple< std::string, std::string > xattr_t
Extended attribute key - value pair.
const uint16_t errNotSupported
const uint16_t errRetry
Try again for whatever reason.
const uint16_t errCheckSumError
const uint16_t errThresholdExceeded
const uint16_t errOperationInterrupted
const uint16_t suContinue
const uint16_t errNoMoreReplicas
No more replicas to try.
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
const int DefaultZipMtlnCksum
Describe a data chunk for vector read.
uint64_t GetOffset() const
Get the offset.
uint32_t GetLength() const
Get the data length.
void * GetBuffer()
Get the buffer.
Describe a checksum event.
TransferInfo transfer
The transfer in question.
uint64_t tTime
Microseconds to obtain cksum from target.
bool isOK
True if checksum matched, false otherwise.
std::string cksum
Checksum as "type:value".
uint64_t oTime
Microseconds to obtain cksum from origin.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Write
Open only for writing.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
uint16_t code
Error type, or additional hints on what to do.
uint16_t status
Status of the execution.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.
uint32_t errNo
Errno, if any.