XRootD
XrdFrmTransfer.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d F r m T r a n s f e r . c c */
4 /* */
5 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstring>
32 #include <strings.h>
33 #include <cstdio>
34 #include <fcntl.h>
35 #include <unistd.h>
36 #include <utime.h>
37 #include <sys/param.h>
38 #include <sys/types.h>
39 #include <sys/stat.h>
40 
41 #include "XrdFrc/XrdFrcCID.hh"
42 #include "XrdFrc/XrdFrcRequest.hh"
43 #include "XrdFrc/XrdFrcTrace.hh"
44 #include "XrdFrc/XrdFrcXAttr.hh"
45 #include "XrdFrm/XrdFrmCns.hh"
46 #include "XrdFrm/XrdFrmConfig.hh"
47 #include "XrdFrm/XrdFrmMonitor.hh"
48 #include "XrdFrm/XrdFrmTransfer.hh"
49 #include "XrdFrm/XrdFrmXfrJob.hh"
50 #include "XrdFrm/XrdFrmXfrQueue.hh"
52 #include "XrdOss/XrdOss.hh"
53 #include "XrdOuc/XrdOucEnv.hh"
54 #include "XrdOuc/XrdOucMsubs.hh"
55 #include "XrdOuc/XrdOucProg.hh"
56 #include "XrdOuc/XrdOucSxeq.hh"
57 #include "XrdOuc/XrdOucUtils.hh"
58 #include "XrdOuc/XrdOucXAttr.hh"
59 #include "XrdSys/XrdSysError.hh"
60 #include "XrdSys/XrdSysFD.hh"
61 #include "XrdSys/XrdSysPlatform.hh"
62 
63 using namespace XrdFrc;
64 using namespace XrdFrm;
65 
66 /******************************************************************************/
67 /* L o c a l C l a s s e s */
68 /******************************************************************************/
69 
71 {
75 char *theSrc;
76 char *theDst;
77 char *theINS;
78 char theMDP[8];
79 
81  : theEnv(Env), theCmd(0), theVec(0), theSrc(0),
82  theDst(0), theINS(0)
83  {theMDP[0] = '0'; theMDP[1] = 0;}
85 };
86 
// State shared between Throw(), ThrowOK() and ThrowDone() for one migration.
// The destructor closes lkfd, so the file lock obtained by ThrowOK() is held
// until the object goes out of scope.
//
// NOTE(review): the type header line was missing from this listing and has
// been restored as a struct because the members are used from outside and no
// access specifier is present -- confirm against the repository copy.
//
struct XrdFrmTranChk
{
 struct stat *Stat;   // Caller's stat buffer for the file being migrated
 int          lkfd;   // Locked file descriptor, -1 when none is held
 int          lkfx;   // Non-zero when a ".lock" file was found by ThrowOK()

              XrdFrmTranChk(struct stat *sP) : Stat(sP), lkfd(-1), lkfx(0) {}
             ~XrdFrmTranChk() {if (lkfd >= 0) close(lkfd);}
};
95 
96 /******************************************************************************/
97 /* S t a t i c s */
98 /******************************************************************************/
99 
// pTab holds the directory paths recorded by TrackDC(); pMutex is held around
// every lookup and insertion made on pTab in this file.
//
XrdSysMutex XrdFrmTransfer::pMutex;
XrdOucHash<char> XrdFrmTransfer::pTab;
102 
103 /******************************************************************************/
104 /* C o n s t r u c t o r */
105 /******************************************************************************/
106 
108 {
109  int i;
110 
111 // Construct program objects
112 //
113  for (i = 0; i < 4; i++)
114  xfrCmd[i] = (Config.xfrCmd[i].theVec ? new XrdOucProg(&Say) : 0);
115 }
116 
117 /******************************************************************************/
118 /* Public: c h e c k F F */
119 /******************************************************************************/
120 
121 const char *XrdFrmTransfer::checkFF(const char *Path)
122 {
123  EPNAME("checkFF");
124  struct stat buf;
125 
126 // Check for a fail file
127 //
128  if (!stat(Path, &buf))
129  {if (buf.st_ctime+Config.FailHold >= time(0))
130  return "request previously failed";
131  if (Config.Test) {DEBUG("would have removed '" <<Path <<"'");}
132  else {unlink(Path);
133  DEBUG("removed '" <<Path <<"'");
134  }
135  }
136 
137 // Return all is well
138 //
139  return 0;
140 }
141 
142 /******************************************************************************/
143 /* F e t c h */
144 /******************************************************************************/
145 
146 const char *XrdFrmTransfer::Fetch()
147 {
148  EPNAME("Fetch");
149  static const mode_t fMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
150  static const int crOpts = (O_CREAT|O_TRUNC)<<8|XRDOSS_mkpath;
151 
152  XrdOucEnv myEnv(xfrP->reqData.Opaque?xfrP->reqData.LFN+xfrP->reqData.Opaque:0);
153  XrdFrmTranArg cmdArg(&myEnv);
154  struct stat pfnStat;
155  time_t xfrET;
156  const char *eTxt, *retMsg = 0;
157  char lfnpath[MAXPATHLEN+1024+512+8], *Lfn, Rfn[MAXPATHLEN+256], *theSrc;
158  char pdBuff[1024];
159  int iXfr, pdSZ, lfnEnd, rc, isURL = 0, doRM = 0;
160  long long fSize = 0;
161 
162 // The remote source is either the url-lfn or a translated lfn
163 //
164  if ((isURL = xfrP->reqData.LFO)) theSrc = xfrP->reqData.LFN;
165  else {if (!Config.RemotePath(xfrP->reqData.LFN, Rfn, sizeof(Rfn)))
166  return "lfn2rfn failed";
167  theSrc = Rfn;
168  isURL = (*Rfn != '/');
169  }
170 
171 // Check if we can actually handle this transfer
172 //
173  if (isURL)
174  {if (xfrCmd[2]) iXfr = 2;
175  else return "url copies not configured";
176  } else {
177  if (xfrCmd[0]) iXfr = 0;
178  else return "non-url copies not configured";
179  }
180 
181 // Check for a fail file
182 //
183  if ((eTxt = ffCheck())) return eTxt;
184 
185 // Check if the file exists
186 //
187  Lfn = (xfrP->reqData.LFN)+xfrP->reqData.LFO;
188  if (!Config.Stat(Lfn, xfrP->PFN, &pfnStat))
189  {DEBUG(xfrP->PFN <<" exists; not fetched.");
190  return 0;
191  }
192 
193 // Construct the file name to which to we originally transfer the data. This is
194 // the lfn if we do not pre-allocate files and "lfn.anew" otherwise.
195 //
196  lfnEnd = strlen(Lfn);
197  strlcpy(lfnpath, Lfn, sizeof(lfnpath)-8);
198  if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
199  {strcpy(&lfnpath[lfnEnd], ".anew");
200  strcpy(&xfrP->PFN[xfrP->pfnEnd], ".anew");
201  }
202 
203 // Setup the command
204 //
205  cmdArg.theCmd = xfrCmd[iXfr];
206  cmdArg.theVec = Config.xfrCmd[iXfr].theVec;
207  cmdArg.theSrc = theSrc;
208  cmdArg.theDst = xfrP->PFN;
209  cmdArg.theINS = xfrP->reqData.iName;
210  if (!SetupCmd(&cmdArg)) return "incoming transfer setup failed";
211 
212 // If the copycmd needs a placeholder in the filesystem for this transfer, we
213 // must create one. We first remove any existing "anew" file because we will
214 // over-write it. The create process will create a lock file if need be.
215 // However, we can ignore it as we are the only ones actually using it.
216 //
217  if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
218  {Config.ossFS->Unlink(lfnpath);
219  rc = Config.ossFS->Create(xfrP->reqData.User,lfnpath,fMode,myEnv,crOpts);
220  if (rc)
221  {Say.Emsg("Fetch", rc, "create placeholder for", lfnpath);
222  return "create failed";
223  }
224  doRM = 1;
225  } else doRM = Config.xfrCmd[iXfr].Opts & Config.cmdRME;
226 
227 // Setup program monitoring data
228 //
229  pdSZ = (Config.xfrCmd[iXfr].Opts & Config.cmdXPD ? sizeof(pdBuff) : 0);
230 
231 // Now run the command to get the file and make sure the file is there
232 // If it is, make sure that if a lock file exists its date/time is greater than
233 // the file we just fetched; then rename it to be the correct name.
234 //
235  xfrET = time(0);
236  if (!(rc = cmdArg.theCmd->Run(pdBuff, pdSZ)))
237  {if ((rc = Config.Stat(lfnpath, xfrP->PFN, &pfnStat)))
238  {Say.Emsg("Fetch", lfnpath, "fetched but not resident!"); fSize = 0;}
239  else {fSize = pfnStat.st_size;
240  if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
241  FetchDone(lfnpath, pfnStat, rc);
242  }
243  }
244 
245 // Clean up if we failed otherwise tell the cmsd that we have a new file. Upon
246 // failure we issue a a remove as we don't want the temp file to exist.
247 //
248  xfrP->PFN[xfrP->pfnEnd] = '\0';
249  if (rc)
250  {if (doRM) Config.ossFS->Unlink(lfnpath);
251  ffMake(rc == -2);
252  if (rc == -2) {xfrP->RetCode = 2; retMsg = "file not found";}
253  else retMsg = "fetch failed";
254  } else if (Config.cmsPath) Config.cmsPath->Have(Lfn);
255 
256 // We completed, see if we need to do statistics
257 //
258  if ((Config.xfrCmd[iXfr].Opts & Config.cmdStats) || XrdFrmMonitor::monSTAGE
259  || (Trace.What & TRACE_Debug))
260  {time_t eNow = time(0);
261  int inqT, xfrT;
262  inqT = static_cast<int>(xfrET - time_t(xfrP->reqData.addTOD));
263  if ((xfrT = static_cast<int>(eNow - xfrET)) <= 0) xfrT = 1;
264  if (((Config.xfrCmd[iXfr].Opts & Config.cmdStats)
265  || (Trace.What & TRACE_Debug)) && !retMsg)
266  {char sbuff[80];
267  sprintf(sbuff, "Got: %lld qt: %d xt: %d up: ", fSize, inqT, xfrT);
268  lfnpath[lfnEnd] = '\0';
269  Say.Say(0, sbuff, xfrP->reqData.User, " ", lfnpath);
270  }
272  {if (rc < 0) rc = -rc;
273  snprintf(lfnpath+lfnEnd, sizeof(lfnpath)-lfnEnd-1,
274  "\n&tod=%lld&sz=%lld&qt=%d&tm=%d&op=%c&rc=%d%s%s",
275  static_cast<long long>(eNow), fSize, inqT, xfrT,
276  xfrP->Act, rc, (pdSZ ? "&pd=" : ""), (pdSZ ? pdBuff : ""));
277  XrdFrmMonitor::Map(XROOTD_MON_MAPSTAG,xfrP->reqData.User,lfnpath);
278  }
279  }
280 
281 // All done
282 //
283  return retMsg;
284 }
285 
286 /******************************************************************************/
287 /* F e t c h D o n e */
288 /******************************************************************************/
289 
290 const char *XrdFrmTransfer::FetchDone(char *lfnpath, struct stat &Stat, int &rc)
291 {
292 
293 // If we are running in new mode, update file attributes
294 //
295  rc = 0;
296  if (Config.runNew && Config.NeedsCTA(lfnpath))
298  cpyInfo.Attr.cpyTime = static_cast<long long>(Stat.st_mtime);
299  if ((rc = cpyInfo.Set(xfrP->PFN)))
300  Say.Emsg("Fetch", rc, "set copy time xattr on", xfrP->PFN);
301  }
302 
303 // Check for a lock file and if we have one, reset it's time or delete it
304 //
305  if (Config.runOld && Config.NeedsCTA(lfnpath))
306  {struct stat lkfStat;
307  strcpy(&xfrP->PFN[xfrP->pfnEnd+5], ".lock");
308  if (!stat(xfrP->PFN, &lkfStat))
309  {if (Config.runNew && !rc) unlink(xfrP->PFN);
310  else {struct utimbuf tbuff;
311  tbuff.actime = tbuff.modtime = Stat.st_mtime;
312  if ((rc = utime(xfrP->PFN, &tbuff)))
313  Say.Emsg("Fetch", rc, "set utime on", xfrP->PFN);
314  }
315  }
316  }
317 
318 // Now rename the lfn to be what it needs to be in the end
319 //
320  if (!rc && (rc=Config.ossFS->Rename(lfnpath,xfrP->reqData.LFN)))
321  Say.Emsg("Fetch", rc, "rename", lfnpath);
322  else XrdFrmCns::Add(xfrP->reqData.User, xfrP->reqData.LFN,
323  Stat.st_size, Stat.st_mode);
324 
325 // Done
326 //
327  return (rc ? "Failed" : 0);
328 }
329 
330 /******************************************************************************/
331 /* Private: f f C h e c k */
332 /******************************************************************************/
333 
334 const char *XrdFrmTransfer::ffCheck()
335 {
336  const char *eTxt;
337 
338 // Generate proper fail file path and check if it exists
339 //
340  if (Config.xfrFdir)
341  {char ffPath[MAXPATHLEN+8];
342  if (Config.xfrFdln+xfrP->pfnEnd+5 >= int(sizeof(ffPath))) return 0;
343  strcpy(ffPath, Config.xfrFdir);
344  strcpy(ffPath+Config.xfrFdln, xfrP->PFN);
345  strcpy(ffPath+Config.xfrFdln+xfrP->pfnEnd, ".fail");
346  eTxt = checkFF(ffPath);
347  } else {
348  strcpy(&xfrP->PFN[xfrP->pfnEnd], ".fail");
349  eTxt = checkFF(xfrP->PFN);
350  xfrP->PFN[xfrP->pfnEnd] = '\0';
351  }
352 
353 // Determine result
354 //
355  if (eTxt) xfrP->RetCode = 1;
356  return eTxt;
357 }
358 
359 /******************************************************************************/
360 /* Private: f f M a k e */
361 /******************************************************************************/
362 
363 void XrdFrmTransfer::ffMake(int nofile){
364  static const mode_t fMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
365  static const mode_t dMode = S_IXUSR|S_IWGRP|S_IXGRP|S_IXOTH | fMode;
366  char ffPath[MAXPATHLEN+8], *ffP;
367  int myFD;
368 
369 // Generate fail file path
370 //
371  if (Config.xfrFdir)
372  {if (Config.xfrFdln+xfrP->pfnEnd+5 >= int(sizeof(ffPath))) return;
373  strcpy(ffPath, Config.xfrFdir);
374  strcpy(ffPath+Config.xfrFdln, xfrP->PFN);
375  strcpy(ffPath+Config.xfrFdln+xfrP->pfnEnd, ".fail");
376  XrdOucUtils::makePath(ffPath, dMode);
377  ffP = ffPath;
378  } else {
379  strcpy(&xfrP->PFN[xfrP->pfnEnd], ".fail");
380  ffP = xfrP->PFN;
381  }
382 
383 // Create a fail file and if failure is due to "file not found" set the mtime
384 // to 2 so that the oss layer picks up the same error in the future.
385 //
386  myFD = open(ffP, O_CREAT, fMode);
387  if (myFD >= 0)
388  {close(myFD);
389  if (nofile)
390  {struct utimbuf tbuff;
391  tbuff.actime = time(0); tbuff.modtime = 2;
392  utime(ffP, &tbuff);
393  }
394  }
395  if (!Config.xfrFdir) xfrP->PFN[xfrP->pfnEnd] = '\0';
396 }
397 
398 /******************************************************************************/
399 /* I n i t */
400 /******************************************************************************/
401 
402 void *InitXfer(void *parg)
403 { XrdFrmTransfer *xP = new XrdFrmTransfer;
404  if (parg) xP->Start(*(int *)parg);
405  return (void *)0;
406 }
407 
409 {
410  static int anyQ = XrdFrmXfrQueue::useAnyQ;
411  static int inpQ = XrdFrmXfrQueue::useInpQ;
412  static int outQ = XrdFrmXfrQueue::useOutQ;
413  void *qWant;
414  pthread_t tid;
415  int retc, n;
416 
417 // Initialize the cluster identification object first
418 //
419  CID.Init(Config.QPath);
420 
421 // Initialize the transfer queue first
422 //
423  if (!XrdFrmXfrQueue::Init()) return 0;
424 
425 // Start the required number of transfer threads. Note we can split these
426 // as dedicated in threads and dedicated out threads.
427 //
428  n = Config.xfrMax;
429  while(n--)
430  { if (Config.xfrMaxIn)
431  { qWant = (void *)&inpQ; Config.xfrMaxIn--;}
432  else if (Config.xfrMaxOt)
433  { qWant = (void *)&outQ; Config.xfrMaxOt--;}
434  else qWant = (void *)&anyQ;
435 
436  if ((retc = XrdSysThread::Run(&tid, InitXfer, qWant,
437  XRDSYSTHREAD_BIND, "transfer")))
438  {Say.Emsg("main", retc, "create xfr thread"); return 0;}
439  }
440 
441 // All done
442 //
443  return 1;
444 }
445 
446 /******************************************************************************/
447 /* Private: S e t u p C m d */
448 /******************************************************************************/
449 
450 int XrdFrmTransfer::SetupCmd(XrdFrmTranArg *argP)
451 {
452  char *pdata[XrdOucMsubs::maxElem + 2], *cP;
453  int pdlen[XrdOucMsubs::maxElem + 2], i, k, n;
454 
456  Info(xfrP->reqData.User, argP->theEnv, Config.the_N2N,
457  xfrP->reqData.LFN+xfrP->reqData.LFO,
458  argP->theSrc, xfrP->reqData.Prty,
459  xfrP->reqData.Options & XrdFrcRequest::makeRW?O_RDWR:O_RDONLY,
460  argP->theMDP, xfrP->reqData.ID, xfrP->PFN, argP->theDst);
461 
462 // We must establish the host, cluster and instance name if we have one
463 //
464  if (argP->theEnv)
465  {argP->theEnv->Put(SEC_HOST, Config.myName);
466  if (argP->theINS)
467  {CID.Get(argP->theINS, CMS_CID, argP->theEnv);
468  argP->theEnv->Put(XRD_INS, argP->theINS);
469  }
470  }
471 
472 // Substitute in the parameters
473 //
474  k = argP->theVec->Subs(Info, pdata, pdlen);
475 
476 // Catenate all of the arguments
477 //
478  *cmdBuff = '\0'; n = sizeof(cmdBuff) - 4; cP = cmdBuff;
479  for (i = 0; i < k; i++)
480  {n -= pdlen[i];
481  if (n < 0)
482  {Say.Emsg("Setup",E2BIG,"build command line for", xfrP->reqData.LFN);
483  return 0;
484  }
485  strcpy(cP, pdata[i]); cP += pdlen[i];
486  }
487 
488 // Now setup the command
489 //
490  return (argP->theCmd->Setup(cmdBuff, &Say) == 0);
491 }
492 
493 /******************************************************************************/
494 /* Public: S t a r t */
495 /******************************************************************************/
496 
497 void XrdFrmTransfer::Start(int ioqType)
498 {
499  EPNAME("Transfer"); // Wrong but looks better
500  const char *Msg;
501 
502 // Prime I/O queue selection
503 
504 // Endless loop looking for transfer jobs
505 //
506  while(1)
507  {xfrP = XrdFrmXfrQueue::Get(ioqType);
508 
509  DEBUG(xfrP->Type <<" starting " <<xfrP->reqData.LFN
510  <<" for " <<xfrP->reqData.User);
511 
512  Msg = (xfrP->qNum & XrdFrcRequest::outQ ? Throw() : Fetch());
513  if (Msg && !(xfrP->RetCode)) xfrP->RetCode = 1;
514  xfrP->PFN[xfrP->pfnEnd] = 0;
515 
516  if (xfrP->RetCode || Config.Verbose)
517  {char buff1[280], buff2[80];
518  sprintf(buff1, "%s for %s", xfrP->RetCode ? "failed" : "complete",
519  xfrP->reqData.User);
520  if (xfrP->RetCode == 0) *buff2 = 0;
521  else sprintf(buff2, "; %s", (Msg ? Msg : "reason unknown"));
522  Say.Say(0, xfrP->Type, buff1, xfrP->reqData.LFN,buff2);
523  } else {
524  DEBUG(xfrP->Type
525  <<(xfrP->RetCode ? " failed " : " complete ")
526  << xfrP->reqData.LFN <<" rc=" <<xfrP->RetCode
527  <<' ' <<(Msg ? Msg : ""));
528  }
529 
530  XrdFrmXfrQueue::Done(xfrP, Msg);
531  }
532 }
533 
534 /******************************************************************************/
535 /* Private: T r a c k D C */
536 /******************************************************************************/
537 
538 int XrdFrmTransfer::TrackDC(char *Lfn, char *Mdp, char *Rfn)
539 {
540  (void)Lfn;
541  char *FName, *Slash, *Slush = 0, *begRfn = Rfn;
542  int n = -1;
543 
544 // If this is a url, then don't back space into the url part
545 //
546  if (*Rfn != '/'
547  && (Slash = index(Rfn, '/')) && *(Slash+1) == '/'
548  && (Slash = index(Slash+2, '/')) && *(Slash+1) == '/') begRfn = Slash+1;
549 
550 // Discard the filename component
551 //
552  if (!(FName = rindex(begRfn, '/')) || FName == begRfn) return 0;
553  *FName = 0; Slash = Slush = FName;
554 
555 // Try to find the created directory path
556 //
557  pMutex.Lock();
558  while(Slash != begRfn && !pTab.Find(Rfn))
559  {do {Slash--;} while(Slash != begRfn && *Slash != '/');
560  if (Slush) *Slush = '/';
561  *Slash = 0; Slush = Slash;
562  n++;
563  }
564  pMutex.UnLock();
565 
566 // Compute offset of uncreated part
567 //
568  *Slash = '/';
569  if (Slash == begRfn) n = 0;
570  else n = (n >= 0 ? Slash - begRfn : FName - begRfn);
571  sprintf(Mdp, "%d", n);
572 
573 // All done
574 //
575  return n;
576 }
577 
578 /******************************************************************************/
579 
580 int XrdFrmTransfer::TrackDC(char *Rfn)
581 {
582  char *Slash;
583 
584 // Trim off the trailing end
585 //
586  if (!(Slash = rindex(Rfn, '/')) || Slash == Rfn) return 0;
587  *Slash = 0;
588 
589 // The path has been added, do insert it into the table of created paths
590 //
591  pMutex.Lock();
592  pTab.Add(Rfn, 0, 0, Hash_data_is_key);
593  pMutex.UnLock();
594  *Slash = '/';
595  return 0;
596 }
597 
598 /******************************************************************************/
599 /* T h r o w */
600 /******************************************************************************/
601 
602 const char *XrdFrmTransfer::Throw()
603 {
604  XrdOucEnv myEnv(xfrP->reqData.Opaque?xfrP->reqData.LFN+xfrP->reqData.Opaque:0);
605  XrdFrmTranArg cmdArg(&myEnv);
606  struct stat begStat, endStat;
607  XrdFrmTranChk Chk(&begStat);
608  time_t xfrET;
609  const char *eTxt, *retMsg = 0;
610  char Rfn[MAXPATHLEN+256], *lfnpath = xfrP->reqData.LFN, *theDest;
611  char pdBuff[1024];
612  int isMigr = xfrP->reqData.Options & XrdFrcRequest::Migrate;
613  int iXfr, isURL, pdSZ, rc, mDP = -1;
614 
615 // The remote source is either the url-lfn or a translated lfn
616 //
617  if ((isURL = xfrP->reqData.LFO)) theDest = xfrP->reqData.LFN;
618  else {if (!Config.RemotePath(xfrP->reqData.LFN, Rfn, sizeof(Rfn)))
619  return "lfn2rfn failed";
620  theDest = Rfn;
621  isURL = (*Rfn != '/');
622  }
623 
624 // Check if we can actually handle this transfer
625 //
626  if (isURL)
627  {if (xfrCmd[3]) iXfr = 3;
628  else return "url copies not configured";
629  } else {
630  if (xfrCmd[1]) iXfr = 1;
631  else return "non-url copies not configured";
632  }
633 
634 // Check if the file exists (we only copy resident files)
635 //
636  if (Config.Stat(lfnpath+xfrP->reqData.LFO, xfrP->PFN, &begStat))
637  return (xfrP->reqFQ ? "file not found" : 0);
638 
639 // Check for a fail file
640 //
641  if ((eTxt = ffCheck())) return eTxt;
642 
643 // If this is an mss migration request, then recheck if the file can and
644 // need to be migrated based on the lock file. This also obtains a directory
645 // lock and lock file lock, as needed. If the file need not be migrated but
646 // should be purge, we will get a null string error.
647 //
648  if (isMigr && (eTxt = ThrowOK(&Chk)))
649  {if (*eTxt) return eTxt;
650  if (!(xfrP->reqData.Options & XrdFrcRequest::Purge)) return "logic error";
651  Throwaway();
652  return 0;
653  }
654 
655 // Setup the command, including directory tracking, as needed
656 //
657  cmdArg.theCmd = xfrCmd[iXfr];
658  cmdArg.theVec = Config.xfrCmd[iXfr].theVec;
659  cmdArg.theDst = theDest;
660  cmdArg.theSrc = xfrP->PFN;
661  cmdArg.theINS = xfrP->reqData.iName;
662  if (Config.xfrCmd[iXfr].Opts & Config.cmdMDP)
663  mDP = TrackDC(lfnpath+xfrP->reqData.LFO, cmdArg.theMDP, Rfn);
664  if (!SetupCmd(&cmdArg)) return "outgoing transfer setup failed";
665 
666 // Setup program monitoring data
667 //
668  pdSZ = (Config.xfrCmd[iXfr].Opts & Config.cmdXPD ? sizeof(pdBuff) : 0);
669 
670 // Now run the command to put the file. If the command fails and this is a
671 // migration request, cretae a fail file if one does not exist.
672 //
673  xfrET = time(0);
674  if ((rc = cmdArg.theCmd->Run(pdBuff, pdSZ)))
675  {if (isMigr) ffMake(rc == -2);
676  retMsg = "copy failed";
677  }
678 
679 // Track directory creations if we need to track them
680 //
681  if (!rc && mDP >= 0) TrackDC(Rfn);
682 
683 // Obtain state of the file after the copy and make sure the file was not
684 // modified during the copy. This is an error for queued requests but
685 // internally generated requests will simply be retried.
686 //
687  if (!rc)
688  {if ((rc = Config.Stat(lfnpath+xfrP->reqData.LFO, xfrP->PFN, &endStat)))
689  {Say.Emsg("Throw", lfnpath, "transferred but not found!");
690  retMsg = "unable to verify copy";
691  } else {
692  if (begStat.st_mtime != endStat.st_mtime
693  || begStat.st_size != endStat.st_size)
694  {Say.Emsg("Throw", lfnpath, "modified during transfer!");
695  retMsg = "file modified during copy"; rc = 1;
696  }
697  }
698  }
699 
700 // Purge the file if so wanted. Otherwise, if this is a migration request,
701 // make sure that if a lock file exists its date/time is equal to the file
702 // we just copied to prevent the file from being copied again (we have a lock).
703 //
704  if (!rc)
705  {if (xfrP->reqData.Options & XrdFrcRequest::Purge) Throwaway();
706  else if (isMigr) ThrowDone(&Chk, endStat.st_mtime);
707  }
708 
709 // Do statistics if so wanted
710 //
711  if ((Config.xfrCmd[iXfr].Opts & Config.cmdStats) || XrdFrmMonitor::monMIGR
712  || (Trace.What & TRACE_Debug))
713  {time_t eNow = time(0);
714  int inqT, xfrT;
715  long long Fsize = begStat.st_size;
716  inqT = static_cast<int>(xfrET - time_t(xfrP->reqData.addTOD));
717  if ((xfrT = static_cast<int>(eNow - xfrET)) <= 0) xfrT = 1;
718  if (((Config.xfrCmd[iXfr].Opts & Config.cmdStats)
719  || (Trace.What & TRACE_Debug)) && !rc)
720  {char sbuff[80];
721  sprintf(sbuff, "Put: %lld qt: %d xt: %d up: ",Fsize,inqT,xfrT);
722  Say.Say(0, sbuff, xfrP->reqData.User, " ", xfrP->reqData.LFN);
723  }
725  {char monBuff[MAXPATHLEN+1024+512+8];
726  if (rc < 0) rc = -rc;
727  snprintf(monBuff, sizeof(monBuff),
728  "%s\n&tod=%lld&sz=%lld&qt=%d&tm=%d&op=%c&rc=%d%s%s",
729  xfrP->reqData.LFN, static_cast<long long>(eNow), Fsize,
730  inqT, xfrT, xfrP->Act, rc,
731  (pdSZ ? "&pd=" : ""), (pdSZ ? pdBuff : ""));
732  XrdFrmMonitor::Map(XROOTD_MON_MAPMIGR,xfrP->reqData.User,monBuff);
733  }
734  }
735 
736 // All done
737 //
738  return retMsg;
739 }
740 
741 /******************************************************************************/
742 /* Private: T h r o w a w a y */
743 /******************************************************************************/
744 
745 void XrdFrmTransfer::Throwaway()
746 {
747  EPNAME("Throwaway");
748 
749 // Purge the file. We do this via the pfn but also indicate we want all
750 // migration support suffixes removed it they exist. Notify the cmsd & cnsd.
751 //
752  if (Config.Test) {DEBUG("Would have removed '" <<xfrP->PFN <<"'");}
753  else {Config.ossFS->Unlink(xfrP->PFN, XRDOSS_isPFN|XRDOSS_isMIG);
754  DEBUG("removed '" <<xfrP->PFN <<"'");
755  if (Config.cmsPath) Config.cmsPath->Gone(xfrP->PFN);
756  XrdFrmCns::Rm(xfrP->PFN);
757  }
758 }
759 
760 /******************************************************************************/
761 /* Private: T h r o w D o n e */
762 /******************************************************************************/
763 
764 void XrdFrmTransfer::ThrowDone(XrdFrmTranChk *cP, time_t endTime)
765 {
766 
767 // Update file attributes if we are running in new mode, otherwise do
768 //
769  if (Config.runNew)
771  cpyInfo.Attr.cpyTime = static_cast<long long>(endTime);
772  if (cpyInfo.Set(xfrP->PFN, cP->lkfd))
773  Say.Emsg("Throw", "Unable to set copy time xattr for", xfrP->PFN);
774  else if (cP->lkfx)
775  {strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
776  unlink(xfrP->PFN);
777  xfrP->PFN[xfrP->pfnEnd] = '\0';
778  }
779  } else {
780  struct stat Stat;
781  strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
782  if (!stat(xfrP->PFN, &Stat))
783  {struct utimbuf tbuff;
784  tbuff.actime = tbuff.modtime = endTime;
785  if (utime(xfrP->PFN, &tbuff))
786  Say.Emsg("Throw", errno, "set utime for", xfrP->PFN);
787  }
788  xfrP->PFN[xfrP->pfnEnd] = '\0';
789  }
790 }
791 
792 /******************************************************************************/
793 /* Private: T h r o w O K */
794 /******************************************************************************/
795 
796 const char *XrdFrmTransfer::ThrowOK(XrdFrmTranChk *cP)
797 {
798  class fdClose
799  {public:
800  int Num;
801  fdClose() : Num(-1) {}
802  ~fdClose() {if (Num >= 0) close(Num);}
803  } fnFD;
804 
806  struct stat lokStat;
807  int statRC;
808 
809 // Check if the file is in use by checking if we got an exclusive lock
810 //
811  if ((fnFD.Num = XrdSysFD_Open(xfrP->PFN, O_RDWR)) < 0)
812  return "unable to open file";
813  if (XrdOucSxeq::Serialize(fnFD.Num,XrdOucSxeq::noWait)) return "file in use";
814 
815 // Get the info on the lock file (enabled if old mode is in effect
816 //
817  if (Config.runOld)
818  {strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
819  statRC = stat(xfrP->PFN, &lokStat);
820  xfrP->PFN[xfrP->pfnEnd] = '\0';
821  } else statRC = 1;
822  if (statRC && !Config.runNew) return "missing lock file";
823 
824 // If running in new mode then we must get the extended attribute for this file
825 // unless we got the lock file time which takes precendence.
826 //
827  if (Config.runNew)
828  {if (!statRC)
829  cpyInfo.Attr.cpyTime = static_cast<long long>(lokStat.st_mtime);
830  else if (cpyInfo.Get(xfrP->PFN, fnFD.Num) <= 0)
831  return "unable to get copy time xattr";
832  }
833 
834 // Verify the information
835 //
836  if (cpyInfo.Attr.cpyTime >= static_cast<long long>(cP->Stat->st_mtime))
837  {if (xfrP->reqData.Options & XrdFrcRequest::Purge) return "";
838  return "already migrated";
839  }
840 
841 // Keep the lock on the base file until we are through. No one is allowed to
842 // modify this file until we have migrate it.
843 //
844  cP->lkfd = fnFD.Num;
845  cP->lkfx = statRC == 0;
846  fnFD.Num = -1;
847  return 0;
848 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
struct stat Stat
Definition: XrdCks.cc:49
#define TRACE_Debug
Definition: XrdCmsTrace.hh:37
void * InitXfer(void *parg)
@ Info
#define XRDOSS_isPFN
Definition: XrdOss.hh:469
#define XRDOSS_isMIG
Definition: XrdOss.hh:470
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
@ Hash_data_is_key
Definition: XrdOucHash.hh:52
#define SEC_HOST
Definition: XrdOucMsubs.hh:45
#define CMS_CID
Definition: XrdOucMsubs.hh:43
#define XRD_INS
Definition: XrdOucMsubs.hh:47
int stat(const char *path, struct stat *buf)
int open(const char *path, int oflag,...)
int unlink(const char *path)
#define close(a)
Definition: XrdPosix.hh:43
XrdOucString Path
if(Avsz)
size_t strlcpy(char *dst, const char *src, size_t sz)
#define XRDSYSTHREAD_BIND
const kXR_char XROOTD_MON_MAPMIGR
const kXR_char XROOTD_MON_MAPSTAG
XrdOss * ossFS
const char * myName
int Get(const char *iName, char *buff, int blen)
Definition: XrdFrcCID.cc:124
int Init(const char *qPath)
Definition: XrdFrcCID.cc:159
static const int Purge
static const int makeRW
static const int outQ
static const int Migrate
long long cpyTime
Definition: XrdFrcXAttr.hh:55
static void Rm(const char *Path, int islfn=0)
Definition: XrdFrmCns.hh:53
static void Add(const char *tID, const char *Path, long long Size, mode_t Mode)
Definition: XrdFrmCns.cc:67
static kXR_unt32 Map(char code, const char *uname, const char *path)
static char monMIGR
static char monSTAGE
static int Init()
static const char * checkFF(const char *Path)
void Start(int ioqType)
static const int useInpQ
static void Done(XrdFrmXfrJob *xP, const char *Msg)
static XrdFrmXfrJob * Get(int ioQType)
static int Init()
static const int useOutQ
static const int useAnyQ
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual int Rename(const char *oPath, const char *nPath, XrdOucEnv *oEnvP=0, XrdOucEnv *nEnvP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
int Subs(XrdOucMsubsInfo &Info, char **Data, int *Dlen)
Definition: XrdOucMsubs.cc:146
static const int maxElem
Definition: XrdOucMsubs.hh:94
int Setup(const char *prog, XrdSysError *errP=0, int(*Proc)(XrdOucStream *, char **, int)=0)
Definition: XrdOucProg.cc:296
int Serialize(int Opts=0)
Definition: XrdOucSxeq.cc:165
static const int noWait
Definition: XrdOucSxeq.hh:37
static int makePath(char *path, mode_t mode, bool reset=false)
Definition: XrdOucUtils.cc:917
int Get(const char *Path, int fd=-1)
Definition: XrdOucXAttr.hh:128
int Set(const char *Path, int fd=-1)
Definition: XrdOucXAttr.hh:139
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
XrdCmsConfig Config
XrdOucEnv theEnv
XrdOucTrace Trace
XrdSysError Say
XrdFrcCID CID
Definition: XrdFrcCID.cc:56
XrdOucEnv * theEnv
XrdFrmTranArg(XrdOucEnv *Env)
XrdOucMsubs * theVec
XrdOucProg * theCmd
struct stat * Stat
XrdFrmTranChk(struct stat *sP)