XRootD
XrdOfsTPCProg Class Reference

#include <XrdOfsTPCProg.hh>

+ Collaboration diagram for XrdOfsTPCProg:

Public Member Functions

 XrdOfsTPCProg (XrdOfsTPCProg *Prev, int num, int errMon)
 
 ~XrdOfsTPCProg ()
 
void Cancel ()
 
void Run ()
 
int Xeq (bool &isIPv4)
 

Static Public Member Functions

static int Init ()
 
static XrdOfsTPCProg * Start (XrdOfsTPCJob *jP, int &rc)
 

Detailed Description

Definition at line 40 of file XrdOfsTPCProg.hh.

Constructor & Destructor Documentation

◆ XrdOfsTPCProg()

XrdOfsTPCProg::XrdOfsTPCProg ( XrdOfsTPCProg * Prev,
int  num,
int  errMon 
)

Definition at line 122 of file XrdOfsTPCProg.cc.

123  : Prog(&OfsEroute, errMon), // Prog and JobStream both report through OfsEroute
124  JobStream(&OfsEroute),
125  Next(Prev), Job(0) // Prev becomes this object's successor; no job attached yet
126  {snprintf(Pname, sizeof(Pname), "TPC job %d: ", num); // label later passed as the prefix to OfsEroute.Say()
127  Pname[sizeof(Pname)-1] = 0; // explicitly terminate the label buffer
128  }
XrdSysError OfsEroute

Referenced by Init().

+ Here is the caller graph for this function:

◆ ~XrdOfsTPCProg()

XrdOfsTPCProg::~XrdOfsTPCProg ( )
inline

Definition at line 57 of file XrdOfsTPCProg.hh.

57 {} // empty body: the destructor performs no explicit work

Member Function Documentation

◆ Cancel()

void XrdOfsTPCProg::Cancel ( )
inline

Definition at line 44 of file XrdOfsTPCProg.hh.

44 {JobStream.Drain();} // cancelling is implemented solely as XrdOucStream::Drain() on the job's stream

References XrdOucStream::Drain().

Referenced by XrdOfsTPCJob::Del().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Init()

int XrdOfsTPCProg::Init ( )
static

Definition at line 167 of file XrdOfsTPCProg.cc.

168 {
169  int n;
170 
171 // Allocate Cfg.xfrMax copy program objects, each one pushed on the front of
172 // the pgmIdle list. Return 0 as soon as one of them fails its Setup().
173  for (n = 0; n < Cfg.xfrMax; n++)
174  {pgmIdle = new XrdOfsTPCProg(pgmIdle, n, Cfg.errMon);
175  if (pgmIdle->Prog.Setup(Cfg.XfrProg, &OfsEroute)) return 0;
176  }
177 
178 // All done; echoing is also turned on whenever debug tracing is enabled.
179 // Return 1 to indicate every program object was set up.
180  Cfg.doEcho = Cfg.doEcho || GTRACE(debug);
181  return 1;
182 }
#define GTRACE(act)
Definition: XrdBwmTrace.hh:40
XrdOfsTPCProg(XrdOfsTPCProg *Prev, int num, int errMon)
int Setup(const char *prog, XrdSysError *errP=0, int(*Proc)(XrdOucStream *, char **, int)=0)
Definition: XrdOucProg.cc:296

References XrdOfsTPCProg(), XrdNetPMarkConfig::Cfg, GTRACE, OfsEroute, and XrdOucProg::Setup().

Referenced by XrdOfsTPC::Start().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Run()

void XrdOfsTPCProg::Run ( )

Definition at line 188 of file XrdOfsTPCProg.cc.

189 {
190  XrdXrootdTpcMon::TpcInfo monInfo;
191  struct stat Stat;
192  const char *clID, *at;
193  char *questSrc, *questLfn, *questDst;
194  int rc;
195  bool isIPv4, doMon = Cfg.tpcMon != 0;
196  char clBuff[592];
197 
198 // Run the current job, report its ending status, and possibly pick up
199 // another job to run (Done() hands it back). Note "Job" will always be valid.
200 //
201 do{if (doMon)
202  {monInfo.Init();
203  gettimeofday(&monInfo.begT, 0);
204  }
205  isIPv4 = false; // Xeq() can return before assigning it; never report an indeterminate or stale flag
206  rc = Xeq(isIPv4);
207 
208  if (doMon)
209  {gettimeofday(&monInfo.endT, 0);
210  if ((questSrc = index(Job->Info.Key, '?'))) *questSrc = 0; // cut at '?' for the report; restored below
211  monInfo.srcURL = Job->Info.Key;
212  if ((questLfn = index(Job->Info.Lfn, '?'))) *questLfn = 0; // same temporary cut for the destination
213  monInfo.dstURL = Job->Info.Lfn;
214  monInfo.endRC = rc;
215  if (Job->Info.Str) monInfo.strm = Job->Info.Str;
216  if (isIPv4) monInfo.opts |= XrdXrootdTpcMon::TpcInfo::isIPv4;
217 
218  clID = Job->Info.Org;
219  if (clID && (at = index(clID, '@')) && !index(at+1, '.')) // has '@' but no '.' after it
220  {const char *dName = XrdNetIdentity::Domain();
221  if (dName)
222  {snprintf(clBuff, sizeof(clBuff), "%s%s", clID, dName); // report the ID with the domain appended
223  clID = clBuff;
224  }
225  }
226  monInfo.clID = clID;
227 
228  if ((questDst = index(Job->Info.Dst, '?'))) *questDst = 0;
229  if (!XrdOfsOss->Stat(Job->Info.Dst, &Stat)) monInfo.fSize = Stat.st_size; // size only when Stat() returns 0
230  if (questDst) *questDst = '?';
231  Cfg.tpcMon->Report(monInfo);
232  if (questLfn) *questLfn = '?'; // undo the temporary cuts only after Report() has used the strings
233  if (questSrc) *questSrc = '?';
234  }
235 
236  Job = Job->Done(this, eRec, rc);
237 
238  } while(Job);
239 
240 // No more jobs to run. Place us on the idle queue. Upon return this thread
241 // will end.
242 //
243  pgmMutex.Lock();
244  Next = pgmIdle;
245  pgmIdle = this;
246  pgmMutex.UnLock();
247 }
struct stat Stat
Definition: XrdCks.cc:49
XrdOss * XrdOfsOss
Definition: XrdOfs.cc:163
int stat(const char *path, struct stat *buf)
static const char * Domain(const char **eText=0)
XrdOfsTPCJob * Done(XrdOfsTPCProg *pgmP, const char *eTxt, int rc)
int Xeq(bool &isIPv4)
XrdOfsTPCInfo Info
Definition: XrdOfsTPC.hh:109
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0

References XrdXrootdTpcMon::TpcInfo::begT, XrdNetPMarkConfig::Cfg, XrdXrootdTpcMon::TpcInfo::clID, XrdNetIdentity::Domain(), XrdOfsTPCJob::Done(), XrdOfsTPCInfo::Dst, XrdXrootdTpcMon::TpcInfo::dstURL, XrdXrootdTpcMon::TpcInfo::endRC, XrdXrootdTpcMon::TpcInfo::endT, XrdXrootdTpcMon::TpcInfo::fSize, XrdOfsTPC::Info, XrdXrootdTpcMon::TpcInfo::Init(), XrdXrootdTpcMon::TpcInfo::isIPv4, XrdOfsTPCInfo::Key, XrdOfsTPCInfo::Lfn, XrdSysMutex::Lock(), XrdXrootdTpcMon::TpcInfo::opts, XrdOfsTPCInfo::Org, XrdXrootdTpcMon::TpcInfo::srcURL, Stat, stat(), XrdOss::Stat(), XrdOfsTPCInfo::Str, XrdXrootdTpcMon::TpcInfo::strm, XrdSysMutex::UnLock(), Xeq(), and XrdOfsOss.

Referenced by XrdOfsTPCProgRun().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Start()

XrdOfsTPCProg * XrdOfsTPCProg::Start ( XrdOfsTPCJob * jP,
int &  rc 
)
static

Definition at line 253 of file XrdOfsTPCProg.cc.

254 {
255  XrdSysMutexHelper pgmMon(&pgmMutex);
256  XrdOfsTPCProg *pgmP;
257  pthread_t tid;
258 
259 // Get a new program object, if none left, tell the caller to try later
260 // (rc is set to 0 and a null pointer is returned in that case).
261  if (!(pgmP = pgmIdle)) {rc = 0; return 0;}
262  pgmP->Job = jP;
263 
264 // Start a thread to run the job. On failure rc holds XrdSysThread::Run()'s
265 // result, null is returned, and pgmIdle is left unchanged.
266  if ((rc = XrdSysThread::Run(&tid, XrdOfsTPCProgRun, (void *)pgmP, 0,
267  "TPC job")))
268  return 0;
269 
270 // We are all set, return the program being used. It is unlinked from the
271 // idle list only now, after its thread was started successfully.
272  pgmIdle = pgmP->Next;
273  return pgmP;
274 }
void * XrdOfsTPCProgRun(void *pp)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)

References XrdSysThread::Run(), and XrdOfsTPCProgRun().

Referenced by XrdOfsTPCJob::Sync().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Xeq()

int XrdOfsTPCProg::Xeq ( bool &  isIPv4)

Definition at line 280 of file XrdOfsTPCProg.cc.

281 {
282  EPNAME("Xeq");
283  credFile cFile(Job);
284  const char *Args[6], *eVec[6], **envArg;
285  char *lP, *Colon, *cksVal, sBuff[8], *tident = Job->Info.Org;
286  char *Quest = index(Job->Info.Key, '?');
287  int i, rc, aNum = 0;
288 
289 // If we have credentials, write them out to a file
290 //
291  if (cFile.Path && (rc = ExportCreds(cFile.Path)))
292  {strcpy(eRec, "Copy failed; unable to pass credentials.");
293  return rc;
294  }
295 
296 // Echo out what we are doing if so desired
297 //
298  if (Cfg.doEcho)
299  {if (Quest) *Quest = 0;
300  OfsEroute.Say(Pname,tident," copying ",Job->Info.Key," to ",Job->Info.Dst);
301  if (Quest) *Quest = '?';
302  }
303 
304 // Determine checksum option
305 //
306  cksVal = (Job->Info.Cks ? Job->Info.Cks : Cfg.cksType);
307  if (cksVal)
308  {Args[aNum++] = "-C";
309  Args[aNum++] = cksVal;
310  }
311 
312 // Set streams option if need be
313 //
314  if (Job->Info.Str)
315  {sprintf(sBuff, "%d", static_cast<int>(Job->Info.Str));
316  Args[aNum++] = "-S";
317  Args[aNum++] = sBuff;
318  }
319 
320 // Set remaining arguments
321 //
322  Args[aNum++] = Job->Info.Key;
323  Args[aNum++] = Job->Info.Dst;
324 
325 // Always export the trace identifier of the original issuer
326 //
327  char tidBuff[512];
328  snprintf(tidBuff, sizeof(tidBuff), "XRD_TIDENT=%s", tident);
329  eVec[0] = tidBuff;
330  envArg = eVec;
331  i = 1;
332 
333 // Export source protocol if present
334 //
335  char sprBuff[128];
336  if (Job->Info.Spr)
337  {snprintf(sprBuff, sizeof(sprBuff), "XRDTPC_SPROT=%s", Job->Info.Spr);
338  eVec[i++] = sprBuff;
339  }
340 
341 // Export target protocol if present
342 //
343  char tprBuff[128];
344  if (Job->Info.Tpr)
345  {snprintf(tprBuff, sizeof(tprBuff), "XRDTPC_TPROT=%s", Job->Info.Tpr);
346  eVec[i++] = tprBuff;
347  }
348 
349 // If we need to reproxy, export the path
350 //
351  char rpxBuff[1024];
352  if (Job->Info.Rpx)
353  {snprintf(rpxBuff, sizeof(rpxBuff), "XRD_CPTARGET=%s", Job->Info.Rpx);
354  eVec[i++] = rpxBuff;
355  }
356 
357 // Determine if credentials are being passed, If so, pass where it is.
358 //
359  if (cFile.Path) eVec[i++] = cFile.pEnv;
360  eVec[i] = 0;
361 
362 // Start the job.
363 //
364  if ((rc = Prog.Run(&JobStream, Args, aNum, envArg)))
365  {strcpy(eRec, "Copy failed; unable to start job.");
366  OfsEroute.Emsg("TPC", Job->Info.Org, Job->Info.Lfn, eRec);
367  return rc;
368  }
369 
370 // Now we drain the output looking for an end of run line. This line should
371 // be printed as an error message should the copy fail.
372 //
373  *eRec = 0;
374  isIPv4 = false;
375  while((lP = JobStream.GetLine()))
376  {if (!strcmp(lP, "!-!IPv4")) isIPv4 = true;
377  if ((Colon = index(lP, ':')) && *(Colon+1) == ' ')
378  {strncpy(eRec, Colon+2, sizeof(eRec)-1);
379  eRec[sizeof(eRec)-1] = 0;
380  }
381  if (Cfg.doEcho && *lP) OfsEroute.Say(Pname, lP);
382  }
383 
384 // The job has completed. So, we must get the ending status.
385 //
386  if ((rc = Prog.RunDone(JobStream)) < 0) rc = -rc;
387  DEBUG(Pname <<"ended with rc=" <<rc);
388 
389 // Check if we should generate a message
390 //
391  if (rc && !(*eRec)) sprintf(eRec, "Copy failed with return code %d", rc);
392 
393 // Log failures and optionally remove the file (Info would do that as well
394 // but much later on, so we do it now).
395 //
396  if (rc)
397  {OfsEroute.Emsg("TPC", Job->Info.Org, Job->Info.Lfn, eRec);
398  if (Cfg.autoRM) XrdOfsOss->Unlink(Job->Info.Lfn);
399  } else Job->Info.Success();
400 
401 // All done
402 //
403  return rc;
404 }
#define tident
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
int RunDone(XrdOucStream &cmd) const
Definition: XrdOucProg.cc:257
int Run(XrdOucStream *Sp, const char *argV[], int argc=0, const char *envV[]=0) const
Definition: XrdOucProg.cc:108
char * GetLine()
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141

References XrdNetPMarkConfig::Cfg, XrdOfsTPCInfo::Cks, DEBUG, XrdOfsTPCInfo::Dst, XrdSysError::Emsg(), EPNAME, XrdOucStream::GetLine(), XrdOfsTPC::Info, XrdOfsTPCInfo::Key, XrdOfsTPCInfo::Lfn, OfsEroute, XrdOfsTPCInfo::Org, XrdOfsTPCInfo::Rpx, XrdOucProg::Run(), XrdOucProg::RunDone(), XrdSysError::Say(), XrdOfsTPCInfo::Spr, XrdOfsTPCInfo::Str, XrdOfsTPCInfo::Success(), tident, XrdOfsTPCInfo::Tpr, XrdOss::Unlink(), and XrdOfsOss.

Referenced by Run().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: