XRootD
XrdClThirdPartyCopyJob.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClConstants.hh"
27 #include "XrdCl/XrdClLog.hh"
28 #include "XrdCl/XrdClDefaultEnv.hh"
29 #include "XrdCl/XrdClUtils.hh"
31 #include "XrdCl/XrdClMonitor.hh"
33 #include "XrdCl/XrdClDlgEnv.hh"
34 #include "XrdOuc/XrdOucTPC.hh"
35 #include "XrdSys/XrdSysPthread.hh"
36 #include "XrdSys/XrdSysTimer.hh"
37 
38 #include <iostream>
39 #include <chrono>
40 
41 #include <cctype>
42 #include <sstream>
43 #include <cstdlib>
44 #include <cstdio>
45 #include <sys/time.h>
46 #include <sys/types.h>
47 #include <unistd.h>
48 
49 namespace
50 {
51  //----------------------------------------------------------------------------
53  //----------------------------------------------------------------------------
54  class TPCStatusHandler: public XrdCl::ResponseHandler
55  {
56  public:
57  //------------------------------------------------------------------------
58  // Constructor
59  //------------------------------------------------------------------------
60  TPCStatusHandler():
61  pSem( new XrdSysSemaphore(0) ), pStatus(0)
62  {
63  }
64 
65  //------------------------------------------------------------------------
66  // Destructor
67  //------------------------------------------------------------------------
68  virtual ~TPCStatusHandler()
69  {
70  delete pStatus;
71  delete pSem;
72  }
73 
74  //------------------------------------------------------------------------
75  // Handle Response
76  //------------------------------------------------------------------------
77  virtual void HandleResponse( XrdCl::XRootDStatus *status,
78  XrdCl::AnyObject *response )
79  {
80  delete response;
81  pStatus = status;
82  pSem->Post();
83  }
84 
85  //------------------------------------------------------------------------
86  // Get Mutex
87  //------------------------------------------------------------------------
88  XrdSysSemaphore *GetXrdSysSemaphore()
89  {
90  return pSem;
91  }
92 
93  //------------------------------------------------------------------------
94  // Get status
95  //------------------------------------------------------------------------
96  XrdCl::XRootDStatus *GetStatus()
97  {
98  return pStatus;
99  }
100 
101  private:
102  TPCStatusHandler(const TPCStatusHandler &other);
103  TPCStatusHandler &operator = (const TPCStatusHandler &other);
104 
105  XrdSysSemaphore *pSem;
106  XrdCl::XRootDStatus *pStatus;
107  };
108 
109  class InitTimeoutCalc
110  {
111  public:
112 
113  InitTimeoutCalc( uint16_t timeLeft ) :
114  hasInitTimeout( timeLeft ), start( time( 0 ) ), timeLeft( timeLeft )
115  {
116 
117  }
118 
119  XrdCl::XRootDStatus operator()()
120  {
121  if( !hasInitTimeout ) return XrdCl::XRootDStatus();
122 
123  time_t now = time( 0 );
124  if( now - start > timeLeft )
126 
127  timeLeft -= ( now - start );
128  return XrdCl::XRootDStatus();
129  }
130 
131  operator uint16_t()
132  {
133  return timeLeft;
134  }
135 
136  private:
137  bool hasInitTimeout;
138  time_t start;
139  uint16_t timeLeft;
140  };
141 
142  static XrdCl::XRootDStatus& UpdateErrMsg( XrdCl::XRootDStatus &status, const std::string &str )
143  {
144  std::string msg = status.GetErrorMessage();
145  msg += " (" + str + ")";
146  status.SetErrorMessage( msg );
147  return status;
148  }
149 }
150 
151 namespace XrdCl
152 {
153  //----------------------------------------------------------------------------
154  // Constructor
155  //----------------------------------------------------------------------------
157  PropertyList *jobProperties,
158  PropertyList *jobResults ):
159  CopyJob( jobId, jobProperties, jobResults ),
160  dstFile( File::DisableVirtRedirect ),
161  sourceSize( 0 ),
162  initTimeout( 0 ),
163  force( false ),
164  coerce( false ),
165  delegate( false ),
166  nbStrm( 0 ),
167  tpcLite( false )
168  {
169  Log *log = DefaultEnv::GetLog();
170  log->Debug( UtilityMsg, "Creating a third party copy job, from %s to %s",
171  GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
172  }
173 
174  //----------------------------------------------------------------------------
175  // Run the copy job
176  //----------------------------------------------------------------------------
178  {
179  Log *log = DefaultEnv::GetLog();
180 
181  XRootDStatus st = CanDo();
182  if( !st.IsOK() ) return st;
183 
184  if( tpcLite )
185  {
186  //------------------------------------------------------------------------
187  // Run TPC-lite algorithm
188  //------------------------------------------------------------------------
189  XRootDStatus st = RunLite( progress );
190  if( !st.IsOK() ) return st;
191  }
192  else
193  {
194  //------------------------------------------------------------------------
195  // Run vanilla TPC algorithm
196  //------------------------------------------------------------------------
197  XRootDStatus st = RunTPC( progress );
198  if( !st.IsOK() ) return st;
199  }
200 
201  //--------------------------------------------------------------------------
202  // Verify the checksums if needed
203  //--------------------------------------------------------------------------
204  if( checkSumMode != "none" )
205  {
206  log->Debug( UtilityMsg, "Attempting checksum calculation." );
207  std::string sourceCheckSum;
208  std::string targetCheckSum;
209 
210  //------------------------------------------------------------------------
211  // Get the check sum at source
212  //------------------------------------------------------------------------
213  timeval oStart, oEnd;
214  XRootDStatus st;
215  if( checkSumMode == "end2end" || checkSumMode == "source" ||
216  !checkSumPreset.empty() )
217  {
218  gettimeofday( &oStart, 0 );
219  if( !checkSumPreset.empty() )
220  {
221  sourceCheckSum = checkSumType + ":";
222  sourceCheckSum += Utils::NormalizeChecksum( checkSumType,
223  checkSumPreset );
224  }
225  else
226  {
227  VirtualRedirector *redirector = 0;
228  std::string vrCheckSum;
229  if( GetSource().IsMetalink() &&
230  ( redirector = RedirectorRegistry::Instance().Get( GetSource() ) ) &&
231  !( vrCheckSum = redirector->GetCheckSum( checkSumType ) ).empty() )
232  sourceCheckSum = vrCheckSum;
233  else
234  st = Utils::GetRemoteCheckSum( sourceCheckSum, checkSumType, tpcSource );
235  }
236  gettimeofday( &oEnd, 0 );
237  if( !st.IsOK() )
238  return UpdateErrMsg( st, "source" );
239 
240  pResults->Set( "sourceCheckSum", sourceCheckSum );
241  }
242 
243  //------------------------------------------------------------------------
244  // Get the check sum at destination
245  //------------------------------------------------------------------------
246  timeval tStart, tEnd;
247 
248  if( checkSumMode == "end2end" || checkSumMode == "target" )
249  {
250  gettimeofday( &tStart, 0 );
251  st = Utils::GetRemoteCheckSum( targetCheckSum, checkSumType, realTarget );
252 
253  gettimeofday( &tEnd, 0 );
254  if( !st.IsOK() )
255  return UpdateErrMsg( st, "destination" );
256  pResults->Set( "targetCheckSum", targetCheckSum );
257  }
258 
259  //------------------------------------------------------------------------
260  // Make sure the checksums are both lower case
261  //------------------------------------------------------------------------
262  auto sanitize_cksum = []( char c )
263  {
264  std::locale loc;
265  if( std::isalpha( c ) ) return std::tolower( c, loc );
266  return c;
267  };
268 
269  std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
270  sourceCheckSum.begin(), sanitize_cksum );
271 
272  std::transform( targetCheckSum.begin(), targetCheckSum.end(),
273  targetCheckSum.begin(), sanitize_cksum );
274 
275  //------------------------------------------------------------------------
276  // Compare and inform monitoring
277  //------------------------------------------------------------------------
278  if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
279  {
280  bool match = false;
281  if( sourceCheckSum == targetCheckSum )
282  match = true;
283 
285  if( mon )
286  {
288  i.transfer.origin = &GetSource();
289  i.transfer.target = &GetTarget();
290  i.cksum = sourceCheckSum;
291  i.oTime = Utils::GetElapsedMicroSecs( oStart, oEnd );
292  i.tTime = Utils::GetElapsedMicroSecs( tStart, tEnd );
293  i.isOK = match;
294  mon->Event( Monitor::EvCheckSum, &i );
295  }
296 
297  if( !match )
298  return XRootDStatus( stError, errCheckSumError, 0 );
299 
300  log->Info(UtilityMsg, "Checksum verification: succeeded." );
301  }
302  }
303 
304  return XRootDStatus();
305  }
306 
307  //----------------------------------------------------------------------------
308  // Check whether doing a third party copy is feasible for given
309  // job descriptor
310  //----------------------------------------------------------------------------
311  XRootDStatus ThirdPartyCopyJob::CanDo()
312  {
313  const URL &source = GetSource();
314  const URL &target = GetTarget();
315 
316  //--------------------------------------------------------------------------
317  // We can only do a TPC if both source and destination are remote files
318  //--------------------------------------------------------------------------
319  if( source.IsLocalFile() || target.IsLocalFile() )
321  "Cannot do a third-party-copy for local file." );
322 
323  //--------------------------------------------------------------------------
324  // Check the initial settings
325  //--------------------------------------------------------------------------
326  Log *log = DefaultEnv::GetLog();
327  log->Debug( UtilityMsg, "Check if third party copy between %s and %s "
328  "is possible", source.GetObfuscatedURL().c_str(),
329  target.GetObfuscatedURL().c_str() );
330 
331  if( target.GetProtocol() != "root" &&
332  target.GetProtocol() != "xroot" &&
333  target.GetProtocol() != "roots" &&
334  target.GetProtocol() != "xroots" )
335  return XRootDStatus( stError, errNotSupported, 0, "Third-party-copy "
336  "is only supported for root/xroot protocol." );
337 
338  pProperties->Get( "initTimeout", initTimeout );
339  InitTimeoutCalc timeLeft( initTimeout );
340 
341  pProperties->Get( "checkSumMode", checkSumMode );
342  pProperties->Get( "checkSumType", checkSumType );
343  pProperties->Get( "checkSumPreset", checkSumPreset );
344  pProperties->Get( "force", force );
345  pProperties->Get( "coerce", coerce );
346  pProperties->Get( "delegate", delegate );
347 
349  env->GetInt( "SubStreamsPerChannel", nbStrm );
350 
351  // account for the control stream
352  if (nbStrm > 0) --nbStrm;
353 
354  bool tpcLiteOnly = false;
355 
356  if( !delegate )
357  log->Info( UtilityMsg, "We are NOT using delegation" );
358 
359  //--------------------------------------------------------------------------
360  // Resolve the 'auto' checksum type.
361  //--------------------------------------------------------------------------
362  if( checkSumType == "auto" )
363  {
364  checkSumType = Utils::InferChecksumType( GetSource(), GetTarget() );
365  if( checkSumType.empty() )
366  log->Info( UtilityMsg, "Could not infer checksum type." );
367  else
368  log->Info( UtilityMsg, "Using inferred checksum type: %s.", checkSumType.c_str() );
369  }
370 
371  //--------------------------------------------------------------------------
372  // Check if we can open the source. Note in TPC-lite scenario it is optional
373  // for this step to be successful.
374  //--------------------------------------------------------------------------
375  File sourceFile;
376  XRootDStatus st;
377  URL sourceURL = source;
378  URL::ParamsMap params;
379 
380  // set ReadRecovery property
381  std::string value;
382  DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
383  sourceFile.SetProperty( "ReadRecovery", value );
384 
385  // save the original opaque parameter list as specified by the user for later
386  const URL::ParamsMap &srcparams = sourceURL.GetParams();
387 
388  //--------------------------------------------------------------------------
389  // Do the facultative step at source only if the protocol is root/xroot,
390  // otherwise don't bother
391  //--------------------------------------------------------------------------
392  if( sourceURL.GetProtocol() == "root" || sourceURL.GetProtocol() == "xroot" ||
393  sourceURL.GetProtocol() == "roots" || sourceURL.GetProtocol() == "xroots" )
394  {
395  params = sourceURL.GetParams();
396  params["tpc.stage"] = "placement";
397  sourceURL.SetParams( params );
398  log->Debug( UtilityMsg, "Trying to open %s for reading",
399  sourceURL.GetObfuscatedURL().c_str() );
400  st = sourceFile.Open( sourceURL.GetURL(), OpenFlags::Read, Access::None,
401  timeLeft );
402  }
403  else
404  st = XRootDStatus( stError, errNotSupported );
405 
406  if( st.IsOK() )
407  {
408  std::string sourceUrl;
409  sourceFile.GetProperty( "LastURL", sourceUrl );
410  tpcSource = sourceUrl;
411 
412  VirtualRedirector *redirector = 0;
413  long long size = -1;
414  if( source.IsMetalink() &&
415  ( redirector = RedirectorRegistry::Instance().Get( tpcSource ) ) &&
416  ( size = redirector->GetSize() ) >= 0 )
417  sourceSize = size;
418  else
419  {
420  StatInfo *statInfo;
421  st = sourceFile.Stat( false, statInfo );
422  if (st.IsOK()) sourceSize = statInfo->GetSize();
423  delete statInfo;
424  }
425  }
426  else
427  {
428  log->Info( UtilityMsg, "Cannot open source file %s: %s",
429  source.GetObfuscatedURL().c_str(), st.ToStr().c_str() );
430  if( !delegate )
431  {
432  //----------------------------------------------------------------------
433  // If we cannot contact the source and there is no credential to delegate
434  // it cannot possibly work
435  //----------------------------------------------------------------------
436  st.status = stFatal;
437  return st;
438  }
439 
440  tpcSource = sourceURL;
441  tpcLiteOnly = true;
442  }
443 
444  // get the opaque parameters as returned by the redirector
445  URL tpcSourceUrl = tpcSource;
446  URL::ParamsMap tpcsrcparams = tpcSourceUrl.GetParams();
447  // merge the original cgi with the one returned by the redirector,
448  // the original values take precedence
449  URL::ParamsMap::const_iterator itr = srcparams.begin();
450  for( ; itr != srcparams.end(); ++itr )
451  tpcsrcparams[itr->first] = itr->second;
452  tpcSourceUrl.SetParams( tpcsrcparams );
453  // save the merged opaque parameter list for later
454  std::string scgi;
455  const URL::ParamsMap &scgiparams = tpcSourceUrl.GetParams();
456  itr = scgiparams.begin();
457  for( ; itr != scgiparams.end(); ++itr )
458  if( itr->first.compare( 0, 6, "xrdcl." ) != 0 )
459  {
460  if( !scgi.empty() ) scgi += '\t';
461  scgi += itr->first + '=' + itr->second;
462  }
463 
464  if( !timeLeft().IsOK() )
465  {
466  // we still want to send a close, but we time it out quickly
467  st = sourceFile.Close( 1 );
468  return XRootDStatus( stError, errOperationExpired );
469  }
470 
471  st = sourceFile.Close( timeLeft );
472 
473  if( !timeLeft().IsOK() )
474  return XRootDStatus( stError, errOperationExpired );
475 
476  //--------------------------------------------------------------------------
477  // Now we need to check the destination !!!
478  //--------------------------------------------------------------------------
479  if( delegate )
481  else
483 
484  //--------------------------------------------------------------------------
485  // Generate the destination CGI
486  //--------------------------------------------------------------------------
487  log->Debug( UtilityMsg, "Generating the destination TPC URL" );
488 
489  tpcKey = GenerateKey();
490 
491  char *cgiBuff = new char[2048];
492  const char *cgiP = XrdOucTPC::cgiC2Dst( tpcKey.c_str(),
493  tpcSource.GetHostId().c_str(),
494  tpcSource.GetPath().c_str(),
495  0, cgiBuff, 2048, nbStrm,
496  GetSource().GetHostId().c_str(),
497  GetSource().GetProtocol().c_str(),
498  GetTarget().GetProtocol().c_str(),
499  delegate );
500 
501  if( *cgiP == '!' )
502  {
503  log->Error( UtilityMsg, "Unable to setup target url: %s", cgiP+1 );
504  delete [] cgiBuff;
505  return XRootDStatus( stError, errNotSupported );
506  }
507 
508  URL cgiURL; cgiURL.SetParams( cgiBuff );
509  delete [] cgiBuff;
510 
511  realTarget = GetTarget();
512  params = realTarget.GetParams();
513  MessageUtils::MergeCGI( params, cgiURL.GetParams(), true );
514 
515  if( !tpcLiteOnly ) // we only append oss.asize if it source file size is actually known
516  {
517  std::ostringstream o; o << sourceSize;
518  params["oss.asize"] = o.str();
519  }
520  params["tpc.stage"] = "copy";
521 
522  // forward source cgi info to the destination in case we are going to do delegation
523  if( !scgi.empty() && delegate )
524  params["tpc.scgi"] = scgi;
525 
526  realTarget.SetParams( params );
527 
528  log->Debug( UtilityMsg, "Target url is: %s", realTarget.GetObfuscatedURL().c_str() );
529 
530  //--------------------------------------------------------------------------
531  // Open the target file
532  //--------------------------------------------------------------------------
533  // set WriteRecovery property
534  DefaultEnv::GetEnv()->GetString( "WriteRecovery", value );
535  dstFile.SetProperty( "WriteRecovery", value );
536 
537  OpenFlags::Flags targetFlags = OpenFlags::Update;
538  if( force )
539  targetFlags |= OpenFlags::Delete;
540  else
541  targetFlags |= OpenFlags::New;
542 
543  if( coerce )
544  targetFlags |= OpenFlags::Force;
545 
547  st = dstFile.Open( realTarget.GetURL(), targetFlags, mode, timeLeft );
548  if( !st.IsOK() )
549  {
550  log->Error( UtilityMsg, "Unable to open target %s: %s",
551  realTarget.GetObfuscatedURL().c_str(), st.ToStr().c_str() );
552  if( st.code == errErrorResponse &&
553  st.errNo == kXR_FSError &&
554  st.GetErrorMessage().find( "tpc not supported" ) != std::string::npos )
555  return XRootDStatus( stError, errNotSupported, 0, // the open failed due to lack of TPC support on the server side
556  "Destination does not support third-party-copy." );
557  return UpdateErrMsg( st, "destination" );
558  }
559 
560  std::string lastUrl; dstFile.GetProperty( "LastURL", lastUrl );
561  realTarget = lastUrl;
562 
563  if( !timeLeft().IsOK() )
564  {
565  // we still want to send a close, but we time it out fast
566  st = dstFile.Close( 1 );
567  return XRootDStatus( stError, errOperationExpired );
568  }
569 
570  //--------------------------------------------------------------------------
571  // Verify if the destination supports TPC / TPC-lite
572  //--------------------------------------------------------------------------
573  st = Utils::CheckTPCLite( realTarget.GetURL() );
574  if( !st.IsOK() )
575  {
576  // we still want to send a close, but we time it out fast
577  st = dstFile.Close( 1 );
578  return XRootDStatus( stError, errNotSupported, 0, // doesn't support TPC
579  "Destination does not support third-party-copy.");
580  }
581 
582  //--------------------------------------------------------------------------
583  // if target supports TPC-lite and we have a credential to delegate we can
584  // go ahead and use TPC-lite
585  //--------------------------------------------------------------------------
586  tpcLite = ( st.code != suPartial ) && delegate;
587 
588  if( !tpcLite && tpcLiteOnly ) // doesn't support TPC-lite and it was our only hope
589  {
590  st = dstFile.Close( 1 );
591  return XRootDStatus( stError, errNotSupported, 0, "Destination does not "
592  "support delegation." );
593  }
594 
595  //--------------------------------------------------------------------------
596  // adjust the InitTimeout
597  //--------------------------------------------------------------------------
598  if( !timeLeft().IsOK() )
599  {
600  // we still want to send a close, but we time it out fast
601  st = dstFile.Close( 1 );
602  return XRootDStatus( stError, errOperationExpired );
603  }
604 
605  //--------------------------------------------------------------------------
606  // If we don't use delegation the source has to support TPC
607  //--------------------------------------------------------------------------
608  if( !tpcLite )
609  {
610  st = Utils::CheckTPC( URL( tpcSource ).GetURL(), timeLeft );
611  if( !st.IsOK() )
612  {
613  log->Error( UtilityMsg, "Source (%s) does not support TPC",
614  tpcSource.GetURL().c_str() );
615  return XRootDStatus( stError, errNotSupported, 0, "Source does not "
616  "support third-party-copy" );
617  }
618 
619  if( !timeLeft().IsOK() )
620  {
621  // we still want to send a close, but we time it out quickly
622  st = sourceFile.Close( 1 );
623  return XRootDStatus( stError, errOperationExpired );
624  }
625  }
626 
627  initTimeout = uint16_t( timeLeft );
628 
629  return XRootDStatus();
630  }
631 
632  //----------------------------------------------------------------------------
633  // Run vanilla copy job
634  //----------------------------------------------------------------------------
635  XRootDStatus ThirdPartyCopyJob::RunTPC( CopyProgressHandler *progress )
636  {
637  Log *log = DefaultEnv::GetLog();
638 
639  //--------------------------------------------------------------------------
640  // Generate the source CGI
641  //--------------------------------------------------------------------------
642  char *cgiBuff = new char[2048];
643  const char *cgiP = XrdOucTPC::cgiC2Src( tpcKey.c_str(),
644  realTarget.GetHostName().c_str(), -1, cgiBuff,
645  2048 );
646  if( *cgiP == '!' )
647  {
648  log->Error( UtilityMsg, "Unable to setup source url: %s", cgiP+1 );
649  delete [] cgiBuff;
650  return XRootDStatus( stError, errInvalidArgs );
651  }
652 
653  URL cgiURL; cgiURL.SetParams( cgiBuff );
654  delete [] cgiBuff;
655  URL::ParamsMap params = tpcSource.GetParams();
656  MessageUtils::MergeCGI( params, cgiURL.GetParams(), true );
657  params["tpc.stage"] = "copy";
658  tpcSource.SetParams( params );
659 
660  log->Debug( UtilityMsg, "Source url is: %s", tpcSource.GetObfuscatedURL().c_str() );
661 
662  // Set the close timeout to the default value of the stream timeout
663  int closeTimeout = 0;
664  (void) DefaultEnv::GetEnv()->GetInt( "StreamTimeout", closeTimeout);
665 
666  //--------------------------------------------------------------------------
667  // Set up the rendez-vous and open the source
668  //--------------------------------------------------------------------------
669  InitTimeoutCalc timeLeft( initTimeout );
670  XRootDStatus st = dstFile.Sync( timeLeft );
671  if( !st.IsOK() )
672  {
673  log->Error( UtilityMsg, "Unable set up rendez-vous: %s",
674  st.ToStr().c_str() );
675  XRootDStatus status = dstFile.Close( closeTimeout );
676  return UpdateErrMsg( st, "destination" );
677  }
678 
679  //--------------------------------------------------------------------------
680  // Calculate the time we have left to perform source open
681  //--------------------------------------------------------------------------
682  if( !timeLeft().IsOK() )
683  {
684  XRootDStatus status = dstFile.Close( closeTimeout );
685  return XRootDStatus( stError, errOperationExpired );
686  }
687 
688  File sourceFile( File::DisableVirtRedirect );
689  // set ReadRecovery property
690  std::string value;
691  DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
692  sourceFile.SetProperty( "ReadRecovery", value );
693 
694  st = sourceFile.Open( tpcSource.GetURL(), OpenFlags::Read, Access::None,
695  timeLeft );
696 
697  if( !st.IsOK() )
698  {
699  log->Error( UtilityMsg, "Unable to open source %s: %s",
700  tpcSource.GetObfuscatedURL().c_str(), st.ToStr().c_str() );
701  XRootDStatus status = dstFile.Close( closeTimeout );
702  return UpdateErrMsg( st, "source" );
703  }
704 
705  //--------------------------------------------------------------------------
706  // Do the copy and follow progress
707  //--------------------------------------------------------------------------
708  TPCStatusHandler statusHandler;
709  XrdSysSemaphore *sem = statusHandler.GetXrdSysSemaphore();
710  StatInfo *info = 0;
711 
712  uint16_t tpcTimeout = 0;
713  pProperties->Get( "tpcTimeout", tpcTimeout );
714 
715  st = dstFile.Sync( &statusHandler, tpcTimeout );
716  if( !st.IsOK() )
717  {
718  log->Error( UtilityMsg, "Unable start the copy: %s",
719  st.ToStr().c_str() );
720  XRootDStatus statusS = sourceFile.Close( closeTimeout );
721  XRootDStatus statusT = dstFile.Close( closeTimeout );
722  return UpdateErrMsg( st, "destination" );
723  }
724 
725  //--------------------------------------------------------------------------
726  // Stat the file every second until sync returns
727  //--------------------------------------------------------------------------
728  bool canceled = false;
729  while( 1 )
730  {
731  XrdSysTimer::Wait( 2500 );
732 
733  if( progress )
734  {
735  st = dstFile.Stat( true, info );
736  if( st.IsOK() )
737  {
738  progress->JobProgress( pJobId, info->GetSize(), sourceSize );
739  delete info;
740  info = 0;
741  }
742  bool shouldCancel = progress->ShouldCancel( pJobId );
743  if( shouldCancel )
744  {
745  log->Debug( UtilityMsg, "Cancellation requested by progress handler" );
746  Buffer arg, *response = 0; arg.FromString( "ofs.tpc cancel" );
747  XRootDStatus st = dstFile.Fcntl( arg, response );
748  if( !st.IsOK() )
749  log->Debug( UtilityMsg, "Error while trying to cancel tpc: %s",
750  st.ToStr().c_str() );
751 
752  delete response;
753  canceled = true;
754  break;
755  }
756  }
757 
758  if( sem->CondWait() )
759  break;
760  }
761 
762  //--------------------------------------------------------------------------
763  // Sync has returned so we can check if it was successful
764  //--------------------------------------------------------------------------
765  if( canceled )
766  sem->Wait();
767 
768  st = *statusHandler.GetStatus();
769 
770  if( !st.IsOK() )
771  {
772  log->Error( UtilityMsg, "Third party copy from %s to %s failed: %s",
773  GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str(),
774  st.ToStr().c_str() );
775 
776  // Ignore close response
777  XRootDStatus statusS = sourceFile.Close( closeTimeout );
778  XRootDStatus statusT = dstFile.Close( closeTimeout );
779  return st;
780  }
781 
782  XRootDStatus statusS = sourceFile.Close( closeTimeout );
783  XRootDStatus statusT = dstFile.Close( closeTimeout );
784 
785  if ( !statusS.IsOK() || !statusT.IsOK() )
786  {
787  st = (statusS.IsOK() ? statusT : statusS);
788  log->Error( UtilityMsg, "Third party copy from %s to %s failed during "
789  "close of %s: %s", GetSource().GetObfuscatedURL().c_str(),
790  GetTarget().GetObfuscatedURL().c_str(),
791  (statusS.IsOK() ? "destination" : "source"), st.ToStr().c_str() );
792  return UpdateErrMsg( st, statusS.IsOK() ? "source" : "destination" );
793  }
794 
795  log->Debug( UtilityMsg, "Third party copy from %s to %s successful",
796  GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
797 
798  pResults->Set( "size", sourceSize );
799 
800  return XRootDStatus();
801  }
802 
803  XRootDStatus ThirdPartyCopyJob::RunLite( CopyProgressHandler *progress )
804  {
805  Log *log = DefaultEnv::GetLog();
806 
807  // Set the close timeout to the default value of the stream timeout
808  int closeTimeout = 0;
809  (void) DefaultEnv::GetEnv()->GetInt( "StreamTimeout", closeTimeout);
810 
811  //--------------------------------------------------------------------------
812  // Set up the rendez-vous
813  //--------------------------------------------------------------------------
814  InitTimeoutCalc timeLeft( initTimeout );
815  XRootDStatus st = dstFile.Sync( timeLeft );
816  if( !st.IsOK() )
817  {
818  log->Error( UtilityMsg, "Unable set up rendez-vous: %s",
819  st.ToStr().c_str() );
820  XRootDStatus status = dstFile.Close( closeTimeout );
821  return UpdateErrMsg( st, "destination" );
822  }
823 
824  //--------------------------------------------------------------------------
825  // Do the copy and follow progress
826  //--------------------------------------------------------------------------
827  TPCStatusHandler statusHandler;
828  XrdSysSemaphore *sem = statusHandler.GetXrdSysSemaphore();
829  StatInfo *info = 0;
830 
831  uint16_t tpcTimeout = 0;
832  pProperties->Get( "tpcTimeout", tpcTimeout );
833 
834  st = dstFile.Sync( &statusHandler, tpcTimeout );
835  if( !st.IsOK() )
836  {
837  log->Error( UtilityMsg, "Unable start the copy: %s",
838  st.ToStr().c_str() );
839  XRootDStatus statusT = dstFile.Close( closeTimeout );
840  return UpdateErrMsg( st, "destination" );
841  }
842 
843  //--------------------------------------------------------------------------
844  // Stat the file every second until sync returns
845  //--------------------------------------------------------------------------
846  bool canceled = false;
847  while( 1 )
848  {
849  XrdSysTimer::Wait( 2500 );
850 
851  if( progress )
852  {
853  st = dstFile.Stat( true, info );
854  if( st.IsOK() )
855  {
856  progress->JobProgress( pJobId, info->GetSize(), sourceSize );
857  delete info;
858  info = 0;
859  }
860  bool shouldCancel = progress->ShouldCancel( pJobId );
861  if( shouldCancel )
862  {
863  log->Debug( UtilityMsg, "Cancellation requested by progress handler" );
864  Buffer arg, *response = 0; arg.FromString( "ofs.tpc cancel" );
865  XRootDStatus st = dstFile.Fcntl( arg, response );
866  if( !st.IsOK() )
867  log->Debug( UtilityMsg, "Error while trying to cancel tpc: %s",
868  st.ToStr().c_str() );
869 
870  delete response;
871  canceled = true;
872  break;
873  }
874  }
875 
876  if( sem->CondWait() )
877  break;
878  }
879 
880  //--------------------------------------------------------------------------
881  // Sync has returned so we can check if it was successful
882  //--------------------------------------------------------------------------
883  if( canceled )
884  sem->Wait();
885 
886  st = *statusHandler.GetStatus();
887 
888  if( !st.IsOK() )
889  {
890  log->Error( UtilityMsg, "Third party copy from %s to %s failed: %s",
891  GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str(),
892  st.ToStr().c_str() );
893 
894  // Ignore close response
895  XRootDStatus statusT = dstFile.Close( closeTimeout );
896  return st;
897  }
898 
899  st = dstFile.Close( closeTimeout );
900 
901  if ( !st.IsOK() )
902  {
903  log->Error( UtilityMsg, "Third party copy from %s to %s failed during "
904  "close of %s: %s", GetSource().GetObfuscatedURL().c_str(),
905  GetTarget().GetObfuscatedURL().c_str(),
906  "destination", st.ToStr().c_str() );
907  return UpdateErrMsg( st, "destination" );
908  }
909 
910  log->Debug( UtilityMsg, "Third party copy from %s to %s successful",
911  GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
912 
913  pResults->Set( "size", sourceSize );
914 
915  return XRootDStatus();
916  }
917 
918 
919  //----------------------------------------------------------------------------
920  // Generate a rendez-vous key
921  //----------------------------------------------------------------------------
922  std::string ThirdPartyCopyJob::GenerateKey()
923  {
924  static const int _10to9 = 1000000000;
925 
926  char tpcKey[25];
927 
928  auto tp = std::chrono::high_resolution_clock::now();
929  auto d = tp.time_since_epoch();
930  auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>( d );
931  auto s = std::chrono::duration_cast<std::chrono::seconds>( d );
932  uint32_t k1 = ns.count() - s.count() * _10to9;
933  uint32_t k2 = getpid() | (getppid() << 16);
934  uint32_t k3 = s.count();
935  snprintf( tpcKey, 25, "%08x%08x%08x", k1, k2, k3 );
936  return std::string(tpcKey);
937  }
938 }
@ kXR_FSError
Definition: XProtocol.hh:995
XrdOucString File
PropertyList * pResults
const URL & GetSource() const
Get source.
Definition: XrdClCopyJob.hh:94
const URL & GetTarget() const
Get target.
PropertyList * pProperties
Interface for copy progress notification.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
void Enable()
Enable delegation in the environment.
Definition: XrdClDlgEnv.hh:47
static DlgEnv & Instance()
Definition: XrdClDlgEnv.hh:28
void Disable()
Disable delegation in the environment.
Definition: XrdClDlgEnv.hh:55
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
A file.
Definition: XrdClFile.hh:46
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:151
@ DisableVirtRedirect
Definition: XrdClFile.hh:52
XRootDStatus Fcntl(const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:610
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:99
bool GetProperty(const std::string &name, std::string &value) const
Definition: XrdClFile.cc:878
XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:177
bool SetProperty(const std::string &name, const std::string &value)
Definition: XrdClFile.cc:867
XRootDStatus Sync(ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition: XrdClFile.cc:414
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
An abstract class to describe the client-side monitoring plugin interface.
Definition: XrdClMonitor.hh:56
@ EvCheckSum
CheckSumInfo: File checksummed.
virtual void Event(EventCode evCode, void *evData)=0
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
virtual XRootDStatus Run(CopyProgressHandler *progress=0)
ThirdPartyCopyJob(uint16_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
Constructor.
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:458
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:395
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
Definition: XrdClURL.cc:491
bool IsLocalFile() const
Definition: XrdClURL.cc:467
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
static XRootDStatus CheckTPCLite(const std::string &server, uint16_t timeout=0)
Definition: XrdClUtils.cc:426
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
Definition: XrdClUtils.cc:648
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
Definition: XrdClUtils.cc:771
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
Definition: XrdClUtils.cc:269
static XRootDStatus GetRemoteCheckSum(std::string &checkSum, const std::string &checkSumType, const URL &url)
Get a checksum from a remote xrootd server.
Definition: XrdClUtils.cc:279
static XRootDStatus CheckTPC(const std::string &server, uint16_t timeout=0)
Check if peer supports tpc.
Definition: XrdClUtils.cc:382
An interface for metadata redirectors.
virtual std::string GetCheckSum(const std::string &type) const =0
void SetErrorMessage(const std::string &message)
Set the error message.
const std::string & GetErrorMessage() const
Get error message.
static const char * cgiC2Dst(const char *cKey, const char *xSrc, const char *xLfn, const char *xCks, char *Buff, int Blen, int strms=0, const char *iHst=0, const char *sprt=0, const char *tprt=0, bool dlgon=false, bool push=false)
Definition: XrdOucTPC.cc:62
static const char * cgiC2Src(const char *cKey, const char *xDst, int xTTL, char *Buff, int Blen)
Definition: XrdOucTPC.cc:136
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227
const uint16_t suPartial
Definition: XrdClStatus.hh:41
const uint16_t errErrorResponse
Definition: XrdClStatus.hh:105
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t UtilityMsg
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
const uint16_t errCheckSumError
Definition: XrdClStatus.hh:101
XrdSysError Log
Definition: XrdConfig.cc:112
Mode
Access mode.
@ UR
owner readable
@ GR
group readable
@ UW
owner writable
@ OR
world readable
Describe a checksum event.
TransferInfo transfer
The transfer in question.
uint64_t tTime
Microseconds to obtain cksum from target.
bool isOK
True if checksum matched, false otherwise.
std::string cksum
Checksum as "type:value".
uint64_t oTime
Microseconds to obtain cksum from origin.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Update
Open for reading and writing.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124