XRootD
XrdCl::XCpCtx Class Reference

#include <XrdClXCpCtx.hh>

+ Collaboration diagram for XrdCl::XCpCtx:

Public Member Functions

 XCpCtx (const std::vector< std::string > &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize)
 
bool AllDone ()
 
void Delete ()
 
std::pair< uint64_t, uint64_t > GetBlock ()
 
XRootDStatus GetChunk (XrdCl::PageInfo &ci)
 
bool GetNextUrl (std::string &url)
 
int64_t GetSize ()
 
XRootDStatus Initialize ()
 
void NotifyIdleSrc ()
 
void NotifyInitExpectant ()
 
void PutChunk (PageInfo *chunk)
 
void RemoveSrc (XCpSrc *src)
 
XCpCtxSelf ()
 
void SetFileSize (int64_t size)
 
XCpSrcWeakestLink (XCpSrc *exclude)
 

Detailed Description

Definition at line 40 of file XrdClXCpCtx.hh.

Constructor & Destructor Documentation

◆ XCpCtx()

XrdCl::XCpCtx::XCpCtx ( const std::vector< std::string > &  urls,
uint64_t  blockSize,
uint8_t  parallelSrc,
uint64_t  chunkSize,
uint64_t  parallelChunks,
int64_t  fileSize 
)

Constructor

Parameters
urls: list of replica urls
blockSize: the default block size
parallelSrc: maximum number of parallel sources
chunkSize: the default chunk size
parallelChunks: the default number of parallel chunks per source
fileSize: the file size if specified in the metalink file (-1 indicates that the file size is not known and a stat should be done)

Definition at line 36 of file XrdClXCpCtx.cc.

36  :
37  pUrls( std::deque<std::string>( urls.begin(), urls.end() ) ), pBlockSize( blockSize ),
38  pParallelSrc( parallelSrc ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ),
39  pOffset( 0 ), pFileSize( -1 ), pFileSizeCV( 0 ), pDataReceived( 0 ), pDone( false ),
40  pDoneCV( 0 ), pRefCount( 1 )
41 {
42  SetFileSize( fileSize );
43 }
void SetFileSize(int64_t size)
Definition: XrdClXCpCtx.cc:104

References SetFileSize().

+ Here is the call graph for this function:

Member Function Documentation

◆ AllDone()

bool XrdCl::XCpCtx::AllDone ( )

Returns true if all chunks have been transferred, otherwise blocks until NotifyIdleSrc is called, or a 1 minute timeout occurs.

Returns
: true is all chunks have been transferred, false otherwise.

Definition at line 177 of file XrdClXCpCtx.cc.

178 {
179  XrdSysCondVarHelper lck( pDoneCV );
180 
181  if( !pDone )
182  pDoneCV.Wait( 60 );
183 
184  return pDone;
185 }

References XrdSysCondVar::Wait().

+ Here is the call graph for this function:

◆ Delete()

void XrdCl::XCpCtx::Delete ( )
inline

Deletes the instance if the reference counter reached 0.

Definition at line 61 of file XrdClXCpCtx.hh.

62  {
63  XrdSysMutexHelper lck( pMtx );
64  --pRefCount;
65  if( !pRefCount )
66  {
67  lck.UnLock();
68  delete this;
69  }
70  }

References XrdSysMutexHelper::UnLock().

Referenced by XrdCl::XCpSrc::Start().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetBlock()

std::pair< uint64_t, uint64_t > XrdCl::XCpCtx::GetBlock ( )

Get next block that has to be transferred

Returns
: pair of offset and block size

Definition at line 92 of file XrdClXCpCtx.cc.

93 {
94  XrdSysMutexHelper lck( pMtx );
95 
96  uint64_t blkSize = pBlockSize, offset = pOffset;
97  if( pOffset + blkSize > uint64_t( pFileSize ) )
98  blkSize = pFileSize - pOffset;
99  pOffset += blkSize;
100 
101  return std::make_pair( offset, blkSize );
102 }

◆ GetChunk()

XRootDStatus XrdCl::XCpCtx::GetChunk ( XrdCl::PageInfo ci)

Gets the next chunk from the sink, if the sink is empty blocks.

Parameters
ci: the chunk retrieved from sink (output parameter)
Returns
: stError if we failed to transfer the file, stOK otherwise, with one of the following codes:
  • suDone : the whole file has been transferred, we are done
  • suContinue : a chunk has been written into ci, continue calling GetChunk in order to retrieve remaining chunks
  • suRetry : a chunk has not been written into ci, try again.

Definition at line 140 of file XrdClXCpCtx.cc.

141 {
142  // if we received all the data we are done here
143  if( pDataReceived == uint64_t( pFileSize ) )
144  {
145  XrdSysCondVarHelper lck( pDoneCV );
146  pDone = true;
147  pDoneCV.Broadcast();
148  return XRootDStatus( stOK, suDone );
149  }
150 
151  // if we don't have active sources it means we failed
152  if( GetRunning() == 0 )
153  {
154  XrdSysCondVarHelper lck( pDoneCV );
155  pDone = true;
156  pDoneCV.Broadcast();
157  return XRootDStatus( stError, errNoMoreReplicas );
158  }
159 
160  PageInfo *chunk = pSink.Get();
161  if( chunk )
162  {
163  pDataReceived += chunk->GetLength();
164  ci = std::move( *chunk );
165  delete chunk;
166  return XRootDStatus( stOK, suContinue );
167  }
168 
169  return XRootDStatus( stOK, suRetry );
170 }
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t suDone
Definition: XrdClStatus.hh:38
const uint16_t suContinue
Definition: XrdClStatus.hh:39
const uint16_t errNoMoreReplicas
No more replicas to try.
Definition: XrdClStatus.hh:65

References XrdSysCondVar::Broadcast(), XrdCl::errNoMoreReplicas, XrdCl::PageInfo::GetLength(), XrdCl::stError, XrdCl::stOK, XrdCl::suContinue, XrdCl::suDone, and XrdCl::suRetry.

+ Here is the call graph for this function:

◆ GetNextUrl()

bool XrdCl::XCpCtx::GetNextUrl ( std::string &  url)

Gets the next URL from the list of file replicas

Parameters
url: the output parameter
Returns
: true if a url has been written to the url parameter, false otherwise

Definition at line 57 of file XrdClXCpCtx.cc.

58 {
59  XrdSysMutexHelper lck( pMtx );
60  if( pUrls.empty() ) return false;
61  url = pUrls.front();
62  pUrls.pop();
63  return true;
64 }

◆ GetSize()

int64_t XrdCl::XCpCtx::GetSize ( )
inline

Get file size. The call blocks until the file size is being set using SetFileSize.

Definition at line 129 of file XrdClXCpCtx.hh.

130  {
131  XrdSysCondVarHelper lck( pFileSizeCV );
132  while( pFileSize < 0 && GetRunning() > 0 ) pFileSizeCV.Wait();
133  return pFileSize;
134  }

References XrdSysCondVar::Wait().

+ Here is the call graph for this function:

◆ Initialize()

XRootDStatus XrdCl::XCpCtx::Initialize ( )

Starts one thread per source, each thread tries to open a file, stat the file if necessary, and then starts reading the file, all chunks read go to the sink.

Returns
Error if we were not able to create any threads

Definition at line 121 of file XrdClXCpCtx.cc.

122 {
123  for( uint8_t i = 0; i < pParallelSrc; ++i )
124  {
125  XCpSrc *src = new XCpSrc( pChunkSize, pParallelChunks, pFileSize, this );
126  pSources.push_back( src );
127  src->Start();
128  }
129 
130  if( pSources.empty() )
131  {
132  Log *log = DefaultEnv::GetLog();
133  log->Error( UtilityMsg, "Failed to initialize (failed to create new threads)" );
134  return XRootDStatus( stError, errInternal, EAGAIN, "XCpCtx: failed to create new threads." );
135  }
136 
137  return XRootDStatus();
138 }
static Log * GetLog()
Get default log.
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint64_t UtilityMsg
XrdSysError Log
Definition: XrdConfig.cc:112

References XrdCl::errInternal, XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::XCpSrc::Start(), XrdCl::stError, and XrdCl::UtilityMsg.

+ Here is the call graph for this function:

◆ NotifyIdleSrc()

void XrdCl::XCpCtx::NotifyIdleSrc ( )

Notify idle sources, used in two case:

  • if one of the sources failed and an idle source needs to take over
  • or if we are done and all idle source should be stopped

Definition at line 172 of file XrdClXCpCtx.cc.

173 {
174  pDoneCV.Broadcast();
175 }

References XrdSysCondVar::Broadcast().

+ Here is the call graph for this function:

◆ NotifyInitExpectant()

void XrdCl::XCpCtx::NotifyInitExpectant ( )
inline

Notify those who are waiting for initialization. In particular the GetSize() caller will be waiting on the result of initialization.

Definition at line 197 of file XrdClXCpCtx.hh.

198  {
199  pFileSizeCV.Broadcast();
200  }

References XrdSysCondVar::Broadcast().

+ Here is the call graph for this function:

◆ PutChunk()

void XrdCl::XCpCtx::PutChunk ( PageInfo chunk)

Put a chunk into the sink

Parameters
chunk: the chunk

Definition at line 87 of file XrdClXCpCtx.cc.

88 {
89  pSink.Put( chunk );
90 }

◆ RemoveSrc()

void XrdCl::XCpCtx::RemoveSrc ( XCpSrc src)
inline

Remove given source

Parameters
src: the source to be removed

Definition at line 167 of file XrdClXCpCtx.hh.

168  {
169  XrdSysMutexHelper lck( pMtx );
170  pSources.remove( src );
171  }

Referenced by XrdCl::XCpSrc::Start().

+ Here is the caller graph for this function:

◆ Self()

XCpCtx* XrdCl::XCpCtx::Self ( )
inline

Increments the reference counter.

Returns
: myself.

Definition at line 77 of file XrdClXCpCtx.hh.

78  {
79  XrdSysMutexHelper lck( pMtx );
80  ++pRefCount;
81  return this;
82  }

◆ SetFileSize()

void XrdCl::XCpCtx::SetFileSize ( int64_t  size)

Set the file size (GetSize will block until SetFileSize will be called). Also calculates the block size.

Parameters
size: file size

Definition at line 104 of file XrdClXCpCtx.cc.

105 {
106  XrdSysMutexHelper lck( pMtx );
107  if( pFileSize < 0 && size >= 0 )
108  {
109  XrdSysCondVarHelper lck( pFileSizeCV );
110  pFileSize = size;
111  pFileSizeCV.Broadcast();
112 
113  if( pBlockSize > uint64_t( pFileSize ) / pParallelSrc )
114  pBlockSize = pFileSize / pParallelSrc;
115 
116  if( pBlockSize < pChunkSize )
117  pBlockSize = pChunkSize;
118  }
119 }

References XrdSysCondVar::Broadcast().

Referenced by XCpCtx().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ WeakestLink()

XCpSrc * XrdCl::XCpCtx::WeakestLink ( XCpSrc exclude)

Get the 'weakest' sources

Parameters
exclude: the source that is excluded from the search
Returns
: the weakest source

Definition at line 66 of file XrdClXCpCtx.cc.

67 {
68  uint64_t transferRate = -1; // set transferRate to max uint64 value
69  XCpSrc *ret = 0;
70 
71  std::list<XCpSrc*>::iterator itr;
72  for( itr = pSources.begin() ; itr != pSources.end() ; ++itr )
73  {
74  XCpSrc *src = *itr;
75  if( src == exclude ) continue;
76  uint64_t tmp = src->TransferRate();
77  if( src->HasData() && tmp < transferRate )
78  {
79  ret = src;
80  transferRate = tmp;
81  }
82  }
83 
84  return ret;
85 }

References XrdCl::XCpSrc::HasData(), and XrdCl::XCpSrc::TransferRate().

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: