XRootD
XrdOfsTPCProg.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d O f s T P C P r o g . c c */
4 /* */
5 /* (c) 2011 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdio>
32 #include <strings.h>
33 #include <unistd.h>
34 #include <sys/stat.h>
35 
36 #include "XrdNet/XrdNetIdentity.hh"
37 #include "XrdOfs/XrdOfsTPC.hh"
39 #include "XrdOfs/XrdOfsTPCJob.hh"
40 #include "XrdOfs/XrdOfsTPCProg.hh"
41 #include "XrdOfs/XrdOfsTrace.hh"
42 #include "XrdOss/XrdOss.hh"
43 #include "XrdOuc/XrdOucCallBack.hh"
44 #include "XrdOuc/XrdOucProg.hh"
45 #include "XrdSys/XrdSysError.hh"
46 #include "XrdSys/XrdSysFD.hh"
47 #include "XrdSys/XrdSysHeaders.hh"
48 
50 
51 /******************************************************************************/
52 /* G l o b a l O b j e c t s */
53 /******************************************************************************/
54 
// Objects defined elsewhere that this file relies on.
//
// OfsEroute - error/message router; used below for Emsg() and Say() calls and
//             handed to the program and stream objects at construction.
// OfsTrace  - trace object; not referenced directly in the visible code
//             (presumably consumed by the DEBUG/GTRACE macros - confirm).
// XrdOfsOss - storage system interface; used below for Stat() and Unlink().
//
55 extern XrdSysError OfsEroute;
56 extern XrdSysTrace OfsTrace;
57 extern XrdOss *XrdOfsOss;
58 
// TPC configuration. The visible code reads xfrMax, errMon, XfrProg, doEcho,
// tpcMon, cksType and autoRM from it, and Init() may set doEcho.
//
59 namespace XrdOfsTPCParms
60 {
61 extern XrdOfsTPCConfig Cfg;
62 }
63 
64 using namespace XrdOfsTPCParms;
65 
66 /******************************************************************************/
67 /* S t a t i c V a r i a b l e s */
68 /******************************************************************************/
69 
// pgmMutex - held by Run() and Start() while they read or update pgmIdle.
// pgmIdle  - head of the list of idle program objects, chained through each
//            object's Next pointer; 0 means no idle object is available.
//
70 XrdSysMutex XrdOfsTPCProg::pgmMutex;
71 XrdOfsTPCProg *XrdOfsTPCProg::pgmIdle = 0;
72 
73 /******************************************************************************/
74 /* E x t e r n a l L i n k a g e s */
75 /******************************************************************************/
76 
77 void *XrdOfsTPCProgRun(void *pp)
78 {
79  XrdOfsTPCProg *theProg = (XrdOfsTPCProg *)pp;
80  theProg->Run();
81  return (void *)0;
82 }
83 
84 /******************************************************************************/
85 /* L o c a l C l a s s e s */
86 /******************************************************************************/
87 
88 namespace
89 {
90 class credFile
91 {
92 public:
93 
94 char *Path;
95 char pEnv[MAXPATHLEN+65];
96 
97  credFile(XrdOfsTPCJob *jP)
98  {if (jP->Info.Csz > 0 && jP->Info.Crd && jP->Info.Env)
99  {int n;
100  csMutex.Lock(); n = cSeq++; csMutex.UnLock();
101  snprintf(pEnv, sizeof(pEnv), "%s=%s%s#%d.creds",
102  jP->Info.Env, jP->credPath(), jP->Info.Org, n);
103  Path = index(pEnv,'=')+1;
104  } else Path = 0;
105  }
106 
107  ~credFile() {if (Path) unlink(Path);}
108 
109 private:
110 static XrdSysMutex csMutex;
111 static int cSeq;
112 };
113 
114 XrdSysMutex credFile::csMutex;
115 int credFile::cSeq = 0;
116 }
117 
118 /******************************************************************************/
119 /* C o n s t r u c t o r */
120 /******************************************************************************/
121 
122 XrdOfsTPCProg::XrdOfsTPCProg(XrdOfsTPCProg *Prev, int num, int errMon)
123  : Prog(&OfsEroute, errMon),
124  JobStream(&OfsEroute),
125  Next(Prev), Job(0)
126  {snprintf(Pname, sizeof(Pname), "TPC job %d: ", num);
127  Pname[sizeof(Pname)-1] = 0;
128  }
129 
130 /******************************************************************************/
131 /* E x p o r t C r e d s */
132 /******************************************************************************/
133 
134 int XrdOfsTPCProg::ExportCreds(const char *path)
135 {
136 static const int oOpts = (O_CREAT | O_TRUNC | O_WRONLY);
137 static const mode_t oMode = (S_IRUSR | S_IWUSR);
138 
139 int fd, rc;
140 
141 // Open the file as if it were new
142 //
143  fd = XrdSysFD_Open(path, oOpts, oMode);
144  if (fd < 0)
145  {rc = errno;
146  OfsEroute.Emsg("TPC", rc, "create credentials file", path);
147  return -rc;
148  }
149 
150 // Write out the credentials
151 //
152  if (write(fd, Job->Info.Crd, Job->Info.Csz) < 0)
153  {rc = errno;
154  OfsEroute.Emsg("TPC", rc, "write credentials file", path);
155  } else rc = 0;
156 
157 // Close the file and return (we ignore close errors)
158 //
159  close(fd);
160  return rc;
161 }
162 
163 /******************************************************************************/
164 /* I n i t */
165 /******************************************************************************/
166 
168 {
// NOTE(review): the signature line of this function is missing from this
// listing; the trailing cross-reference shows "static int Init()".
//
// Creates Cfg.xfrMax program objects and leaves them chained on pgmIdle.
// Returns 1 when every object was set up and 0 as soon as one Setup() call
// reports a non-zero result.
//
169  int n;
170 
171 // Allocate copy program objects. Each new object is constructed with the
// current pgmIdle as its predecessor and then becomes the new pgmIdle.
172 //
173  for (n = 0; n < Cfg.xfrMax; n++)
174  {pgmIdle = new XrdOfsTPCProg(pgmIdle, n, Cfg.errMon);
175  if (pgmIdle->Prog.Setup(Cfg.XfrProg, &OfsEroute)) return 0;
176  }
177 
178 // All done. Echoing is turned on when it was already configured or when
// GTRACE(debug) evaluates true.
179 //
180  Cfg.doEcho = Cfg.doEcho || GTRACE(debug);
181  return 1;
182 }
183 
184 /******************************************************************************/
185 /* R u n */
186 /******************************************************************************/
187 
189 {
// NOTE(review): the signature line of this function is missing from this
// listing; it is invoked as theProg->Run() by XrdOfsTPCProgRun().
//
// Executes jobs one after another, starting with the one already placed in
// Job, until Job->Done() returns null; the object then puts itself back on
// the idle list. When Cfg.tpcMon is set, a monitoring record is reported for
// each job.
//
190  XrdXrootdTpcMon::TpcInfo monInfo;
191  struct stat Stat;
192  const char *clID, *at;
193  char *questSrc, *questLfn, *questDst;
194  int rc;
195  bool isIPv4, doMon = Cfg.tpcMon != 0;
196  char clBuff[592];
197 
198 // Run the current job and indicate its ending status, possibly getting
199 // another job to run. Note "Job" will always be valid.
200 //
201 do{if (doMon)
202  {monInfo.Init();
203  gettimeofday(&monInfo.begT, 0);
204  }
205 
// NOTE(review): isIPv4 is declared without an initial value and is read below
// when monitoring is on. In the visible Xeq() it is assigned only after the
// copy program was started - confirm it cannot be read unassigned here.
//
206  rc = Xeq(isIPv4);
207 
208  if (doMon)
209  {gettimeofday(&monInfo.endT, 0);
// Temporarily cut the key and lfn at the first '?' so that only the part
// before it is reported; the '?' characters are put back after Report().
210  if ((questSrc = index(Job->Info.Key, '?'))) *questSrc = 0;
211  monInfo.srcURL = Job->Info.Key;
212  if ((questLfn = index(Job->Info.Lfn, '?'))) *questLfn = 0;
213  monInfo.dstURL = Job->Info.Lfn;
214  monInfo.endRC = rc;
215  if (Job->Info.Str) monInfo.strm = Job->Info.Str;
216  if (isIPv4) monInfo.opts |= XrdXrootdTpcMon::TpcInfo::isIPv4;
217 
// When the client id has an '@' with no '.' anywhere after it, append the
// value returned by XrdNetIdentity::Domain() (if any) to it.
218  clID = Job->Info.Org;
219  if (clID && (at = index(clID, '@')) && !index(at+1, '.'))
220  {const char *dName = XrdNetIdentity::Domain();
221  if (dName)
222  {snprintf(clBuff, sizeof(clBuff), "%s%s", clID, dName);
223  clID = clBuff;
224  }
225  }
226  monInfo.clID = clID;
227 
// Stat the destination with anything from '?' onward removed; the file size
// is recorded only when Stat() returns zero.
228  if ((questDst = index(Job->Info.Dst, '?'))) *questDst = 0;
229  if (!XrdOfsOss->Stat(Job->Info.Dst, &Stat)) monInfo.fSize = Stat.st_size;
230  if (questDst) *questDst = '?';
231  Cfg.tpcMon->Report(monInfo);
232  if (questLfn) *questLfn = '?';
233  if (questSrc) *questSrc = '?';
234  }
235 
// Hand the result to the job; whatever Done() returns is the next job to run.
236  Job = Job->Done(this, eRec, rc);
237 
238  } while(Job);
239 
240 // No more jobs to run. Place us on the idle queue. Upon return this thread
241 // will end.
242 //
243  pgmMutex.Lock();
244  Next = pgmIdle;
245  pgmIdle = this;
246  pgmMutex.UnLock();
247 }
248 
249 /******************************************************************************/
250 /* S t a r t */
251 /******************************************************************************/
252 
254 {
// NOTE(review): the signature line of this function is missing from this
// listing; the trailing cross-reference shows
// "static XrdOfsTPCProg * Start(XrdOfsTPCJob *jP, int &rc)".
//
// Takes the program object at the head of the idle list, assigns jP to it and
// starts a thread running it. Returns the object used, or null with rc set to
// 0 when no idle object exists, or null with rc set to the non-zero result of
// XrdSysThread::Run() when the thread could not be started.
//
255  XrdSysMutexHelper pgmMon(&pgmMutex);
256  XrdOfsTPCProg *pgmP;
257  pthread_t tid;
258 
259 // Get a new program object, if none left, tell the caller to try later
260 //
261  if (!(pgmP = pgmIdle)) {rc = 0; return 0;}
262  pgmP->Job = jP;
263 
264 // Start a thread to run the job. On failure the object has not yet been
// unlinked from the idle list, so it simply stays there.
265 //
266  if ((rc = XrdSysThread::Run(&tid, XrdOfsTPCProgRun, (void *)pgmP, 0,
267  "TPC job")))
268  return 0;
269 
270 // We are all set, remove the object from the idle list and return the
// program being used
271 //
272  pgmIdle = pgmP->Next;
273  return pgmP;
274 }
275 
276 /******************************************************************************/
277 /* X e q */
278 /******************************************************************************/
279 
280 int XrdOfsTPCProg::Xeq(bool &isIPv4)
281 {
282  EPNAME("Xeq");
283  credFile cFile(Job);
284  const char *Args[6], *eVec[6], **envArg;
285  char *lP, *Colon, *cksVal, sBuff[8], *tident = Job->Info.Org;
286  char *Quest = index(Job->Info.Key, '?');
287  int i, rc, aNum = 0;
288 
289 // If we have credentials, write them out to a file
290 //
291  if (cFile.Path && (rc = ExportCreds(cFile.Path)))
292  {strcpy(eRec, "Copy failed; unable to pass credentials.");
293  return rc;
294  }
295 
296 // Echo out what we are doing if so desired
297 //
298  if (Cfg.doEcho)
299  {if (Quest) *Quest = 0;
300  OfsEroute.Say(Pname,tident," copying ",Job->Info.Key," to ",Job->Info.Dst);
301  if (Quest) *Quest = '?';
302  }
303 
304 // Determine checksum option
305 //
306  cksVal = (Job->Info.Cks ? Job->Info.Cks : Cfg.cksType);
307  if (cksVal)
308  {Args[aNum++] = "-C";
309  Args[aNum++] = cksVal;
310  }
311 
312 // Set streams option if need be
313 //
314  if (Job->Info.Str)
315  {sprintf(sBuff, "%d", static_cast<int>(Job->Info.Str));
316  Args[aNum++] = "-S";
317  Args[aNum++] = sBuff;
318  }
319 
320 // Set remaining arguments
321 //
322  Args[aNum++] = Job->Info.Key;
323  Args[aNum++] = Job->Info.Dst;
324 
325 // Always export the trace identifier of the original issuer
326 //
327  char tidBuff[512];
328  snprintf(tidBuff, sizeof(tidBuff), "XRD_TIDENT=%s", tident);
329  eVec[0] = tidBuff;
330  envArg = eVec;
331  i = 1;
332 
333 // Export source protocol if present
334 //
335  char sprBuff[128];
336  if (Job->Info.Spr)
337  {snprintf(sprBuff, sizeof(sprBuff), "XRDTPC_SPROT=%s", Job->Info.Spr);
338  eVec[i++] = sprBuff;
339  }
340 
341 // Export target protocol if present
342 //
343  char tprBuff[128];
344  if (Job->Info.Tpr)
345  {snprintf(tprBuff, sizeof(tprBuff), "XRDTPC_TPROT=%s", Job->Info.Tpr);
346  eVec[i++] = tprBuff;
347  }
348 
349 // If we need to reproxy, export the path
350 //
351  char rpxBuff[1024];
352  if (Job->Info.Rpx)
353  {snprintf(rpxBuff, sizeof(rpxBuff), "XRD_CPTARGET=%s", Job->Info.Rpx);
354  eVec[i++] = rpxBuff;
355  }
356 
357 // Determine if credentials are being passed, If so, pass where it is.
358 //
359  if (cFile.Path) eVec[i++] = cFile.pEnv;
360  eVec[i] = 0;
361 
362 // Start the job.
363 //
364  if ((rc = Prog.Run(&JobStream, Args, aNum, envArg)))
365  {strcpy(eRec, "Copy failed; unable to start job.");
366  OfsEroute.Emsg("TPC", Job->Info.Org, Job->Info.Lfn, eRec);
367  return rc;
368  }
369 
370 // Now we drain the output looking for an end of run line. This line should
371 // be printed as an error message should the copy fail.
372 //
373  *eRec = 0;
374  isIPv4 = false;
375  while((lP = JobStream.GetLine()))
376  {if (!strcmp(lP, "!-!IPv4")) isIPv4 = true;
377  if ((Colon = index(lP, ':')) && *(Colon+1) == ' ')
378  {strncpy(eRec, Colon+2, sizeof(eRec)-1);
379  eRec[sizeof(eRec)-1] = 0;
380  }
381  if (Cfg.doEcho && *lP) OfsEroute.Say(Pname, lP);
382  }
383 
384 // The job has completed. So, we must get the ending status.
385 //
386  if ((rc = Prog.RunDone(JobStream)) < 0) rc = -rc;
387  DEBUG(Pname <<"ended with rc=" <<rc);
388 
389 // Check if we should generate a message
390 //
391  if (rc && !(*eRec)) sprintf(eRec, "Copy failed with return code %d", rc);
392 
393 // Log failures and optionally remove the file (Info would do that as well
394 // but much later on, so we do it now).
395 //
396  if (rc)
397  {OfsEroute.Emsg("TPC", Job->Info.Org, Job->Info.Lfn, eRec);
398  if (Cfg.autoRM) XrdOfsOss->Unlink(Job->Info.Lfn);
399  } else Job->Info.Success();
400 
401 // All done
402 //
403  return rc;
404 }
#define tident
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
#define GTRACE(act)
Definition: XrdBwmTrace.hh:40
struct stat Stat
Definition: XrdCks.cc:49
XrdOss * XrdOfsOss
Definition: XrdOfs.cc:163
XrdSysTrace OfsTrace
XrdSysError OfsEroute
void * XrdOfsTPCProgRun(void *pp)
int stat(const char *path, struct stat *buf)
int unlink(const char *path)
ssize_t write(int fildes, const void *buf, size_t nbyte)
#define close(a)
Definition: XrdPosix.hh:43
XrdOucString Path
static const char * Domain(const char **eText=0)
const char * Env
XrdOfsTPCJob * Done(XrdOfsTPCProg *pgmP, const char *eTxt, int rc)
static XrdOfsTPCProg * Start(XrdOfsTPCJob *jP, int &rc)
int Xeq(bool &isIPv4)
XrdOfsTPCProg(XrdOfsTPCProg *Prev, int num, int errMon)
static int Init()
XrdOfsTPCInfo Info
Definition: XrdOfsTPC.hh:109
static const char * credPath()
Definition: XrdOfsTPC.hh:77
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
int RunDone(XrdOucStream &cmd) const
Definition: XrdOucProg.cc:257
int Run(XrdOucStream *Sp, const char *argV[], int argc=0, const char *envV[]=0) const
Definition: XrdOucProg.cc:108
int Setup(const char *prog, XrdSysError *errP=0, int(*Proc)(XrdOucStream *, char **, int)=0)
Definition: XrdOucProg.cc:296
char * GetLine()
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
XrdOfsTPCConfig Cfg
Definition: XrdOfsTPC.cc:85