XRootD
XrdClXCpSrc.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #include "XrdCl/XrdClXCpSrc.hh"
26 #include "XrdCl/XrdClXCpCtx.hh"
27 #include "XrdCl/XrdClLog.hh"
28 #include "XrdCl/XrdClDefaultEnv.hh"
29 #include "XrdCl/XrdClConstants.hh"
30 #include "XrdCl/XrdClUtils.hh"
31 
32 #include <cmath>
33 #include <cstdlib>
34 
35 namespace XrdCl
36 {
37 
39 {
40  public:
41 
42  ChunkHandler( XCpSrc *src, uint64_t offset, uint64_t size, char *buffer, File *handle, bool usepgrd ) :
43  pSrc( src->Self() ), pOffset( offset ), pSize( size ), pBuffer( buffer ), pHandle( handle ), pUsePgRead( usepgrd )
44  {
45 
46  }
47 
48  virtual ~ChunkHandler()
49  {
50  pSrc->Delete();
51  }
52 
53  virtual void HandleResponse( XRootDStatus *status, AnyObject *response )
54  {
55  PageInfo *chunk = 0;
56  if( response ) // get the response
57  {
58  ToPgInfo( response, chunk );
59  delete response;
60  }
61 
62  if( !chunk && status->IsOK() ) // if the response is not there make sure the status is error
63  {
64  *status = XRootDStatus( stError, errInternal );
65  }
66 
67  if( status->IsOK() && chunk->GetLength() != pSize ) // the file size on the server is different
68  { // than the one specified in metalink file
69  *status = XRootDStatus( stError, errDataError );
70  }
71 
72  if( !status->IsOK() )
73  {
74  delete[] pBuffer;
75  delete chunk;
76  chunk = 0;
77  }
78 
79  pSrc->ReportResponse( status, chunk, pHandle );
80 
81  delete this;
82  }
83 
84  private:
85 
86  void ToPgInfo( AnyObject *response, PageInfo *&chunk )
87  {
88  if( pUsePgRead )
89  {
90  response->Get( chunk );
91  response->Set( ( int* )0 );
92  }
93  else
94  {
95  ChunkInfo *rsp = nullptr;
96  response->Get( rsp );
97  chunk = new PageInfo( rsp->offset, rsp->length, rsp->buffer );
98  }
99  }
100 
101  XCpSrc *pSrc;
102  uint64_t pOffset;
103  uint64_t pSize;
104  char *pBuffer;
105  File *pHandle;
106  bool pUsePgRead;
107 };
108 
109 
110 XCpSrc::XCpSrc( uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx ) :
111  pChunkSize( chunkSize ), pParallel( parallel ), pFileSize( fileSize ), pThread(),
112  pCtx( ctx->Self() ), pFile( 0 ), pCurrentOffset( 0 ), pBlkEnd( 0 ), pDataTransfered( 0 ), pRefCount( 1 ),
113  pRunning( false ), pStartTime( 0 ), pTransferTime( 0 ), pUsePgRead( false )
114 {
115 }
116 
117 XCpSrc::~XCpSrc()
118 {
119  pCtx->RemoveSrc( this );
120  pCtx->Delete();
121 }
122 
124 {
125  pRunning = true;
126  int rc = pthread_create( &pThread, 0, Run, this );
127  if( rc )
128  {
129  pRunning = false;
130  pCtx->RemoveSrc( this );
131  pCtx->Delete();
132  }
133 }
134 
135 void* XCpSrc::Run( void* arg )
136 {
137  XCpSrc *me = static_cast<XCpSrc*>( arg );
138  me->StartDownloading();
139  me->Delete();
140  return 0;
141 }
142 
143 void XCpSrc::StartDownloading()
144 {
145  XRootDStatus st = Initialize();
146  if( !st.IsOK() )
147  {
148  pRunning = false;
149  // notify those who wait for the file
150  // size, they won't get it from this
151  // source
152  pCtx->NotifyInitExpectant();
153  // put a null chunk so we are sure
154  // the main thread doesn't get stuck
155  // at the sync queue
156  pCtx->PutChunk( 0 );
157  return;
158  }
159 
160  // start counting transfer time
161  pStartTime = time( 0 );
162 
163  while( pRunning )
164  {
165  st = ReadChunks();
166  if( st.IsOK() && st.code == suPartial )
167  {
168  // we have only ongoing transfers
169  // so we can already ask for new block
170  if( GetWork().IsOK() ) continue;
171  }
172  else if( st.IsOK() && st.code == suDone )
173  {
174  // if we are done, try to get more work,
175  // if successful continue
176  if( GetWork().IsOK() ) continue;
177  // keep track of the time before we go idle
178  pTransferTime += time( 0 ) - pStartTime;
179  // check if the overall download process is
180  // done, this makes the thread wait until
181  // either the download is done, or a source
182  // went to error, or a 60s timeout has been
183  // reached (the timeout is there so we can
184  // check if a source degraded in the meanwhile
185  // and now we can steal from it)
186  if( !pCtx->AllDone() )
187  {
188  // reset start time after pause
189  pStartTime = time( 0 );
190  continue;
191  }
192  // stop counting
193  // otherwise we are done here
194  pRunning = false;
195  return;
196  }
197 
198  XRootDStatus *status = pReports.Get();
199  if( !status->IsOK() )
200  {
201  Log *log = DefaultEnv::GetLog();
202  std::string myHost = URL( pUrl ).GetHostName();
203  log->Error( UtilityMsg, "Failed to read chunk from %s: %s", myHost.c_str(), status->GetErrorMessage().c_str() );
204 
205  if( !Recover().IsOK() )
206  {
207  delete status;
208  pRunning = false;
209  // notify idle sources, they might be
210  // interested in taking over my workload
211  pCtx->NotifyIdleSrc();
212  // put a null chunk so we are sure
213  // the main thread doesn't get stuck
214  // at the sync queue
215  pCtx->PutChunk( 0 );
216  // if we have data we need to wait for someone to take over
217  // unless the extreme copy is over, in this case we don't care
218  while( HasData() && !pCtx->AllDone() );
219 
220  return;
221  }
222  }
223  delete status;
224  }
225 }
226 
227 XRootDStatus XCpSrc::Initialize()
228 {
229  Log *log = DefaultEnv::GetLog();
230  XRootDStatus st;
231 
232  do
233  {
234  if( !pCtx->GetNextUrl( pUrl ) )
235  {
236  log->Error( UtilityMsg, "Failed to initialize XCp source, no more replicas to try" );
237  return XRootDStatus( stError );
238  }
239 
240  log->Debug( UtilityMsg, "Opening %s for reading", pUrl.c_str() );
241 
242  std::string value;
243  DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
244 
245  pFile = new File();
246  pFile->SetProperty( "ReadRecovery", value );
247 
248  st = pFile->Open( pUrl, OpenFlags::Read );
249  if( !st.IsOK() )
250  {
251  log->Warning( UtilityMsg, "Failed to open %s for reading: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
252  DeletePtr( pFile );
253  continue;
254  }
255 
256  URL url( pUrl );
257  if( ( !url.IsLocalFile() && !pFile->IsSecure() ) ||
258  ( url.IsLocalFile() && url.IsMetalink() ) )
259  {
260  std::string datasrv;
261  pFile->GetProperty( "DataServer", datasrv );
262  //--------------------------------------------------------------------
263  // Decide whether we can use PgRead
264  //--------------------------------------------------------------------
265  int val = XrdCl::DefaultCpUsePgWrtRd;
266  XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
267  pUsePgRead = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
268  }
269 
270  if( pFileSize < 0 )
271  {
272  StatInfo *statInfo = 0;
273  st = pFile->Stat( false, statInfo );
274  if( !st.IsOK() )
275  {
276  log->Warning( UtilityMsg, "Failed to stat %s: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
277  DeletePtr( pFile );
278  continue;
279  }
280  pFileSize = statInfo->GetSize();
281  pCtx->SetFileSize( pFileSize );
282  delete statInfo;
283  }
284  }
285  while( !st.IsOK() );
286 
287  std::pair<uint64_t, uint64_t> p = pCtx->GetBlock();
288  pCurrentOffset = p.first;
289  pBlkEnd = p.second + p.first;
290 
291  return st;
292 }
293 
294 XRootDStatus XCpSrc::Recover()
295 {
296  Log *log = DefaultEnv::GetLog();
297  XRootDStatus st;
298 
299  do
300  {
301  if( !pCtx->GetNextUrl( pUrl ) )
302  {
303  log->Error( UtilityMsg, "Failed to initialize XCp source, no more replicas to try" );
304  return XRootDStatus( stError );
305  }
306 
307  log->Debug( UtilityMsg, "Opening %s for reading", pUrl.c_str() );
308 
309  std::string value;
310  DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
311 
312  pFile = new File();
313  pFile->SetProperty( "ReadRecovery", value );
314 
315  st = pFile->Open( pUrl, OpenFlags::Read );
316  if( !st.IsOK() )
317  {
318  DeletePtr( pFile );
319  log->Warning( UtilityMsg, "Failed to open %s for reading: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
320  }
321 
322  URL url( pUrl );
323  if( ( !url.IsLocalFile() && pFile->IsSecure() ) ||
324  ( url.IsLocalFile() && url.IsMetalink() ) )
325  {
326  std::string datasrv;
327  pFile->GetProperty( "DataServer", datasrv );
328  //--------------------------------------------------------------------
329  // Decide whether we can use PgRead
330  //--------------------------------------------------------------------
331  int val = XrdCl::DefaultCpUsePgWrtRd;
332  XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
333  pUsePgRead = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
334  }
335  }
336  while( !st.IsOK() );
337 
338  pRecovered.insert( pOngoing.begin(), pOngoing.end() );
339  pOngoing.clear();
340 
341  // since we have a brand new source, we need
342  // to restart transfer rate statistics
343  pTransferTime = 0;
344  pStartTime = time( 0 );
345  pDataTransfered = 0;
346 
347  return st;
348 }
349 
350 XRootDStatus XCpSrc::ReadChunks()
351 {
352  XrdSysMutexHelper lck( pMtx );
353 
354  while( pOngoing.size() < pParallel && !pRecovered.empty() )
355  {
356  std::pair<uint64_t, uint64_t> p;
357  std::map<uint64_t, uint64_t>::iterator itr = pRecovered.begin();
358  p = *itr;
359  pOngoing.insert( p );
360  pRecovered.erase( itr );
361 
362  char *buffer = new char[p.second];
363  ChunkHandler *handler = new ChunkHandler( this, p.first, p.second, buffer, pFile, pUsePgRead );
364  XRootDStatus st = pUsePgRead
365  ? pFile->PgRead( p.first, p.second, buffer, handler )
366  : pFile->Read( p.first, p.second, buffer, handler );
367  if( !st.IsOK() )
368  {
369  delete[] buffer;
370  delete handler;
371  ReportResponse( new XRootDStatus( st ), 0, pFile );
372  return st;
373  }
374  }
375 
376  while( pOngoing.size() < pParallel && pCurrentOffset < pBlkEnd )
377  {
378  uint64_t chunkSize = pChunkSize;
379  if( pCurrentOffset + chunkSize > pBlkEnd )
380  chunkSize = pBlkEnd - pCurrentOffset;
381  pOngoing[pCurrentOffset] = chunkSize;
382  char *buffer = new char[chunkSize];
383  ChunkHandler *handler = new ChunkHandler( this, pCurrentOffset, chunkSize, buffer, pFile, pUsePgRead );
384  XRootDStatus st = pUsePgRead
385  ? pFile->PgRead( pCurrentOffset, chunkSize, buffer, handler )
386  : pFile->Read( pCurrentOffset, chunkSize, buffer, handler );
387  pCurrentOffset += chunkSize;
388  if( !st.IsOK() )
389  {
390  delete[] buffer;
391  delete handler;
392  ReportResponse( new XRootDStatus( st ), 0, pFile );
393  return st;
394  }
395  }
396 
397  if( pOngoing.empty() ) return XRootDStatus( stOK, suDone );
398 
399  if( pRecovered.empty() && pCurrentOffset >= pBlkEnd ) return XRootDStatus( stOK, suPartial );
400 
401  return XRootDStatus( stOK, suContinue );
402 }
403 
404 void XCpSrc::ReportResponse( XRootDStatus *status, PageInfo *chunk, File *handle )
405 {
406  XrdSysMutexHelper lck( pMtx );
407  bool ignore = false;
408 
409  if( status->IsOK() )
410  {
411  // if the status is OK remove it from
412  // the list of ongoing transfers, if it
413  // was not on the list we ignore the
414  // response (this could happen due to
415  // source change or stealing)
416  ignore = !pOngoing.erase( chunk->GetOffset() );
417  }
418  else if( FilesEqual( pFile, handle ) )
419  {
420  // if the status is NOT OK, and pFile
421  // match the handle it means that we see
422  // an error for the first time, map the
423  // broken file to the number of outstanding
424  // asynchronous operations and reset the pointer
425  pFailed[pFile] = pOngoing.size();
426  pFile = 0;
427  }
428  else
429  DeletePtr( status );
430 
431  if( !FilesEqual( pFile, handle ) )
432  {
433  // if the pFile does not match the handle,
434  // it means that this response came from
435  // a broken source, decrement the count of
436  // outstanding async operations for this src,
437  --pFailed[handle];
438  if( pFailed[handle] == 0 )
439  {
440  // if this was the last outstanding operation
441  // close the file and delete it
442  pFailed.erase( handle );
443  XRootDStatus st = handle->Close();
444  delete handle;
445  }
446  }
447 
448  lck.UnLock();
449 
450  if( status ) pReports.Put( status );
451 
452  if( ignore )
453  {
454  DeleteChunk( chunk );
455  return;
456  }
457 
458  if( chunk )
459  {
460  pDataTransfered += chunk->GetLength();
461  pCtx->PutChunk( chunk );
462  }
463 }
464 
465 void XCpSrc::Steal( XCpSrc *src )
466 {
467  if( !src ) return;
468 
469  XrdSysMutexHelper lck1( pMtx ), lck2( src->pMtx );
470 
471  Log *log = DefaultEnv::GetLog();
472  std::string myHost = URL( pUrl ).GetHostName(), srcHost = URL( src->pUrl ).GetHostName();
473 
474  if( !src->pRunning )
475  {
476  // the source we are stealing from is in error state, we can have everything
477 
478  pRecovered.insert( src->pOngoing.begin(), src->pOngoing.end() );
479  pRecovered.insert( src->pRecovered.begin(), src->pRecovered.end() );
480  pCurrentOffset = src->pCurrentOffset;
481  pBlkEnd = src->pBlkEnd;
482 
483  src->pOngoing.clear();
484  src->pRecovered.clear();
485  src->pCurrentOffset = 0;
486  src->pBlkEnd = 0;
487 
488  // a broken source might be waiting for
489  // someone to take over his data, so we
490  // need to notify
491  pCtx->NotifyIdleSrc();
492 
493  log->Debug( UtilityMsg, "%s: Stealing everything from %s", myHost.c_str(), srcHost.c_str() );
494 
495  return;
496  }
497 
498  // the source we are stealing from is just slower, only take part of its work
499  // so we want a fraction of its work we want for ourself
500  uint64_t myTransferRate = TransferRate(), srcTransferRate = src->TransferRate();
501  if( myTransferRate == 0 ) return;
502  double fraction = double( myTransferRate ) / double( myTransferRate + srcTransferRate );
503 
504  if( src->pCurrentOffset < src->pBlkEnd )
505  {
506  // the source still has a block of data
507  uint64_t blkSize = src->pBlkEnd - src->pCurrentOffset;
508  uint64_t steal = static_cast<uint64_t>( round( fraction * blkSize ) );
509  // if after stealing there will be less than one chunk
510  // take everything
511  if( blkSize - steal <= pChunkSize )
512  steal = blkSize;
513 
514  pCurrentOffset = src->pBlkEnd - steal;
515  pBlkEnd = src->pBlkEnd;
516  src->pBlkEnd -= steal;
517 
518  log->Debug( UtilityMsg, "%s: Stealing fraction (%f) of block from %s", myHost.c_str(), fraction, srcHost.c_str() );
519 
520  return;
521  }
522 
523  if( !src->pRecovered.empty() )
524  {
525  size_t count = static_cast<size_t>( round( fraction * src->pRecovered.size() ) );
526  while( count-- )
527  {
528  std::map<uint64_t, uint64_t>::iterator itr = src->pRecovered.begin();
529  pRecovered.insert( *itr );
530  src->pRecovered.erase( itr );
531  }
532 
533  log->Debug( UtilityMsg, "%s: Stealing fraction (%f) of recovered chunks from %s", myHost.c_str(), fraction, srcHost.c_str() );
534 
535  return;
536  }
537 
538  // * a fraction < 0.5 means that we are actually slower (so it does
539  // not make sense to steal ongoing's from someone who's faster)
540  // * a fraction ~ 0.5 means that we have more or less the same transfer
541  // rate (similarly, it doesn't make sense to steal)
542  // * the source needs to be really faster (though, this is an arbitrary
543  // choice) to actually steal something
544  if( !src->pOngoing.empty() && fraction > 0.7 )
545  {
546  size_t count = static_cast<size_t>( round( fraction * src->pOngoing.size() ) );
547  while( count-- )
548  {
549  std::map<uint64_t, uint64_t>::iterator itr = src->pOngoing.begin();
550  pRecovered.insert( *itr );
551  src->pOngoing.erase( itr );
552  }
553 
554  log->Debug( UtilityMsg, "%s: Stealing fraction (%f) of ongoing chunks from %s", myHost.c_str(), fraction, srcHost.c_str() );
555  }
556 }
557 
558 XRootDStatus XCpSrc::GetWork()
559 {
560  std::pair<uint64_t, uint64_t> p = pCtx->GetBlock();
561 
562  if( p.second > 0 )
563  {
564  XrdSysMutexHelper lck( pMtx );
565  pCurrentOffset = p.first;
566  pBlkEnd = p.first + p.second;
567 
568  Log *log = DefaultEnv::GetLog();
569  std::string myHost = URL( pUrl ).GetHostName();
570  log->Debug( UtilityMsg, "%s got next block", myHost.c_str() );
571 
572  return XRootDStatus();
573  }
574 
575  XCpSrc *wLink = pCtx->WeakestLink( this );
576  Steal( wLink );
577 
578  // if we managed to steal something declare success
579  if( pCurrentOffset < pBlkEnd || !pRecovered.empty() ) return XRootDStatus();
580  // otherwise return an error
581  return XRootDStatus( stError, errInvalidOp );
582 }
583 
585 {
586  time_t duration = pTransferTime + time( 0 ) - pStartTime;
587  return pDataTransfered / ( duration + 1 ); // add one to avoid floating point exception
588 }
589 
590 } /* namespace XrdCl */
XrdOucString File
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Definition: XrdClXCpSrc.cc:53
ChunkHandler(XCpSrc *src, uint64_t offset, uint64_t size, char *buffer, File *handle, bool usepgrd)
Definition: XrdClXCpSrc.cc:42
virtual ~ChunkHandler()
Definition: XrdClXCpSrc.cc:48
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
A file.
Definition: XrdClFile.hh:46
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:206
bool IsSecure() const
Check if the file is using an encrypted connection.
Definition: XrdClFile.cc:857
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:99
bool GetProperty(const std::string &name, std::string &value) const
Definition: XrdClFile.cc:878
XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:245
XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:177
bool SetProperty(const std::string &name, const std::string &value)
Definition: XrdClFile.cc:867
Handle an async response.
static bool HasPgRW(const XrdCl::URL &url)
Definition: XrdClUtils.hh:267
void NotifyInitExpectant()
Definition: XrdClXCpCtx.hh:197
void NotifyIdleSrc()
Definition: XrdClXCpCtx.cc:172
bool GetNextUrl(std::string &url)
Definition: XrdClXCpCtx.cc:57
void RemoveSrc(XCpSrc *src)
Definition: XrdClXCpCtx.hh:167
XCpSrc * WeakestLink(XCpSrc *exclude)
Definition: XrdClXCpCtx.cc:66
void PutChunk(PageInfo *chunk)
Definition: XrdClXCpCtx.cc:87
void SetFileSize(int64_t size)
Definition: XrdClXCpCtx.cc:104
std::pair< uint64_t, uint64_t > GetBlock()
Definition: XrdClXCpCtx.cc:92
friend class ChunkHandler
Definition: XrdClXCpSrc.hh:39
static void DeleteChunk(PageInfo *&chunk)
Definition: XrdClXCpSrc.hh:126
XCpSrc(uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx)
Definition: XrdClXCpSrc.cc:110
uint64_t TransferRate()
Definition: XrdClXCpSrc.cc:584
const uint16_t suPartial
Definition: XrdClStatus.hh:41
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const int DefaultCpUsePgWrtRd
const uint64_t UtilityMsg
const uint16_t suDone
Definition: XrdClStatus.hh:38
const uint16_t suContinue
Definition: XrdClStatus.hh:39
XrdSysError Log
Definition: XrdConfig.cc:112
@ Read
Open only for reading.
uint32_t GetLength() const
Get the data length.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124