37 #include <sys/param.h>
38 #include <sys/types.h>
81 :
theEnv(Env), theCmd(0), theVec(0), theSrc(0),
83 {theMDP[0] =
'0'; theMDP[1] = 0;}
113 for (i = 0; i < 4; i++)
129 {
if (buf.st_ctime+
Config.FailHold >= time(0))
130 return "request previously failed";
146 const char *XrdFrmTransfer::Fetch()
149 static const mode_t fMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
150 static const int crOpts = (O_CREAT|O_TRUNC)<<8|
XRDOSS_mkpath;
152 XrdOucEnv myEnv(xfrP->reqData.Opaque?xfrP->reqData.LFN+xfrP->reqData.Opaque:0);
156 const char *eTxt, *retMsg = 0;
157 char lfnpath[MAXPATHLEN+1024+512+8], *Lfn, Rfn[MAXPATHLEN+256], *theSrc;
159 int iXfr, pdSZ, lfnEnd, rc, isURL = 0, doRM = 0;
164 if ((isURL = xfrP->reqData.LFO)) theSrc = xfrP->reqData.LFN;
165 else {
if (!
Config.RemotePath(xfrP->reqData.LFN, Rfn,
sizeof(Rfn)))
166 return "lfn2rfn failed";
168 isURL = (*Rfn !=
'/');
174 {
if (xfrCmd[2]) iXfr = 2;
175 else return "url copies not configured";
177 if (xfrCmd[0]) iXfr = 0;
178 else return "non-url copies not configured";
183 if ((eTxt = ffCheck()))
return eTxt;
187 Lfn = (xfrP->reqData.LFN)+xfrP->reqData.LFO;
188 if (!
Config.Stat(Lfn, xfrP->PFN, &pfnStat))
189 {
DEBUG(xfrP->PFN <<
" exists; not fetched.");
196 lfnEnd = strlen(Lfn);
197 strlcpy(lfnpath, Lfn,
sizeof(lfnpath)-8);
199 {strcpy(&lfnpath[lfnEnd],
".anew");
200 strcpy(&xfrP->PFN[xfrP->pfnEnd],
".anew");
205 cmdArg.theCmd = xfrCmd[iXfr];
206 cmdArg.theVec =
Config.xfrCmd[iXfr].theVec;
207 cmdArg.theSrc = theSrc;
208 cmdArg.theDst = xfrP->PFN;
209 cmdArg.theINS = xfrP->reqData.iName;
210 if (!SetupCmd(&cmdArg))
return "incoming transfer setup failed";
221 {
Say.
Emsg(
"Fetch", rc,
"create placeholder for", lfnpath);
222 return "create failed";
229 pdSZ = (
Config.xfrCmd[iXfr].Opts &
Config.cmdXPD ?
sizeof(pdBuff) : 0);
236 if (!(rc = cmdArg.theCmd->Run(pdBuff, pdSZ)))
237 {
if ((rc =
Config.Stat(lfnpath, xfrP->PFN, &pfnStat)))
238 {
Say.
Emsg(
"Fetch", lfnpath,
"fetched but not resident!"); fSize = 0;}
239 else {fSize = pfnStat.st_size;
241 FetchDone(lfnpath, pfnStat, rc);
248 xfrP->PFN[xfrP->pfnEnd] =
'\0';
252 if (rc == -2) {xfrP->RetCode = 2; retMsg =
"file not found";}
253 else retMsg =
"fetch failed";
260 {time_t eNow = time(0);
262 inqT =
static_cast<int>(xfrET - time_t(xfrP->reqData.addTOD));
263 if ((xfrT =
static_cast<int>(eNow - xfrET)) <= 0) xfrT = 1;
267 sprintf(sbuff,
"Got: %lld qt: %d xt: %d up: ", fSize, inqT, xfrT);
268 lfnpath[lfnEnd] =
'\0';
269 Say.
Say(0, sbuff, xfrP->reqData.User,
" ", lfnpath);
272 {
if (rc < 0) rc = -rc;
273 snprintf(lfnpath+lfnEnd,
sizeof(lfnpath)-lfnEnd-1,
274 "\n&tod=%lld&sz=%lld&qt=%d&tm=%d&op=%c&rc=%d%s%s",
275 static_cast<long long>(eNow), fSize, inqT, xfrT,
276 xfrP->Act, rc, (pdSZ ?
"&pd=" :
""), (pdSZ ? pdBuff :
""));
290 const char *XrdFrmTransfer::FetchDone(
char *lfnpath,
struct stat &
Stat,
int &rc)
299 if ((rc = cpyInfo.
Set(xfrP->PFN)))
300 Say.
Emsg(
"Fetch", rc,
"set copy time xattr on", xfrP->PFN);
306 {
struct stat lkfStat;
307 strcpy(&xfrP->PFN[xfrP->pfnEnd+5],
".lock");
308 if (!
stat(xfrP->PFN, &lkfStat))
310 else {
struct utimbuf tbuff;
311 tbuff.actime = tbuff.modtime =
Stat.st_mtime;
312 if ((rc = utime(xfrP->PFN, &tbuff)))
313 Say.
Emsg(
"Fetch", rc,
"set utime on", xfrP->PFN);
321 Say.
Emsg(
"Fetch", rc,
"rename", lfnpath);
327 return (rc ?
"Failed" : 0);
334 const char *XrdFrmTransfer::ffCheck()
341 {
char ffPath[MAXPATHLEN+8];
342 if (
Config.xfrFdln+xfrP->pfnEnd+5 >=
int(
sizeof(ffPath)))
return 0;
343 strcpy(ffPath,
Config.xfrFdir);
344 strcpy(ffPath+
Config.xfrFdln, xfrP->PFN);
345 strcpy(ffPath+
Config.xfrFdln+xfrP->pfnEnd,
".fail");
346 eTxt = checkFF(ffPath);
348 strcpy(&xfrP->PFN[xfrP->pfnEnd],
".fail");
349 eTxt = checkFF(xfrP->PFN);
350 xfrP->PFN[xfrP->pfnEnd] =
'\0';
355 if (eTxt) xfrP->RetCode = 1;
363 void XrdFrmTransfer::ffMake(
int nofile){
364 static const mode_t fMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
365 static const mode_t dMode = S_IXUSR|S_IWGRP|S_IXGRP|S_IXOTH | fMode;
366 char ffPath[MAXPATHLEN+8], *ffP;
372 {
if (
Config.xfrFdln+xfrP->pfnEnd+5 >=
int(
sizeof(ffPath)))
return;
373 strcpy(ffPath,
Config.xfrFdir);
374 strcpy(ffPath+
Config.xfrFdln, xfrP->PFN);
375 strcpy(ffPath+
Config.xfrFdln+xfrP->pfnEnd,
".fail");
379 strcpy(&xfrP->PFN[xfrP->pfnEnd],
".fail");
386 myFD =
open(ffP, O_CREAT, fMode);
390 {
struct utimbuf tbuff;
391 tbuff.actime = time(0); tbuff.modtime = 2;
395 if (!
Config.xfrFdir) xfrP->PFN[xfrP->pfnEnd] =
'\0';
404 if (parg) xP->
Start(*(
int *)parg);
431 { qWant = (
void *)&inpQ;
Config.xfrMaxIn--;}
433 { qWant = (
void *)&outQ;
Config.xfrMaxOt--;}
434 else qWant = (
void *)&anyQ;
438 {
Say.
Emsg(
"main", retc,
"create xfr thread");
return 0;}
457 xfrP->reqData.LFN+xfrP->reqData.LFO,
458 argP->
theSrc, xfrP->reqData.Prty,
460 argP->
theMDP, xfrP->reqData.ID, xfrP->PFN, argP->
theDst);
478 *cmdBuff =
'\0'; n =
sizeof(cmdBuff) - 4; cP = cmdBuff;
479 for (i = 0; i < k; i++)
482 {
Say.
Emsg(
"Setup",E2BIG,
"build command line for", xfrP->reqData.LFN);
485 strcpy(cP, pdata[i]); cP += pdlen[i];
509 DEBUG(xfrP->Type <<
" starting " <<xfrP->reqData.LFN
510 <<
" for " <<xfrP->reqData.User);
513 if (Msg && !(xfrP->RetCode)) xfrP->RetCode = 1;
514 xfrP->PFN[xfrP->pfnEnd] = 0;
516 if (xfrP->RetCode ||
Config.Verbose)
517 {
char buff1[280], buff2[80];
518 sprintf(buff1,
"%s for %s", xfrP->RetCode ?
"failed" :
"complete",
520 if (xfrP->RetCode == 0) *buff2 = 0;
521 else sprintf(buff2,
"; %s", (Msg ? Msg :
"reason unknown"));
522 Say.
Say(0, xfrP->Type, buff1, xfrP->reqData.LFN,buff2);
525 <<(xfrP->RetCode ?
" failed " :
" complete ")
526 << xfrP->reqData.LFN <<
" rc=" <<xfrP->RetCode
527 <<
' ' <<(Msg ? Msg :
""));
538 int XrdFrmTransfer::TrackDC(
char *Lfn,
char *Mdp,
char *Rfn)
541 char *FName, *Slash, *Slush = 0, *begRfn = Rfn;
547 && (Slash = index(Rfn,
'/')) && *(Slash+1) ==
'/'
548 && (Slash = index(Slash+2,
'/')) && *(Slash+1) ==
'/') begRfn = Slash+1;
552 if (!(FName = rindex(begRfn,
'/')) || FName == begRfn)
return 0;
553 *FName = 0; Slash = Slush = FName;
558 while(Slash != begRfn && !pTab.Find(Rfn))
559 {
do {Slash--;}
while(Slash != begRfn && *Slash !=
'/');
560 if (Slush) *Slush =
'/';
561 *Slash = 0; Slush = Slash;
569 if (Slash == begRfn) n = 0;
570 else n = (n >= 0 ? Slash - begRfn : FName - begRfn);
571 sprintf(Mdp,
"%d", n);
580 int XrdFrmTransfer::TrackDC(
char *Rfn)
586 if (!(Slash = rindex(Rfn,
'/')) || Slash == Rfn)
return 0;
602 const char *XrdFrmTransfer::Throw()
604 XrdOucEnv myEnv(xfrP->reqData.Opaque?xfrP->reqData.LFN+xfrP->reqData.Opaque:0);
606 struct stat begStat, endStat;
609 const char *eTxt, *retMsg = 0;
610 char Rfn[MAXPATHLEN+256], *lfnpath = xfrP->reqData.LFN, *theDest;
613 int iXfr, isURL, pdSZ, rc, mDP = -1;
617 if ((isURL = xfrP->reqData.LFO)) theDest = xfrP->reqData.LFN;
618 else {
if (!
Config.RemotePath(xfrP->reqData.LFN, Rfn,
sizeof(Rfn)))
619 return "lfn2rfn failed";
621 isURL = (*Rfn !=
'/');
627 {
if (xfrCmd[3]) iXfr = 3;
628 else return "url copies not configured";
630 if (xfrCmd[1]) iXfr = 1;
631 else return "non-url copies not configured";
636 if (
Config.Stat(lfnpath+xfrP->reqData.LFO, xfrP->PFN, &begStat))
637 return (xfrP->reqFQ ?
"file not found" : 0);
641 if ((eTxt = ffCheck()))
return eTxt;
648 if (isMigr && (eTxt = ThrowOK(&Chk)))
649 {
if (*eTxt)
return eTxt;
657 cmdArg.theCmd = xfrCmd[iXfr];
658 cmdArg.theVec =
Config.xfrCmd[iXfr].theVec;
659 cmdArg.theDst = theDest;
660 cmdArg.theSrc = xfrP->PFN;
661 cmdArg.theINS = xfrP->reqData.iName;
663 mDP = TrackDC(lfnpath+xfrP->reqData.LFO, cmdArg.theMDP, Rfn);
664 if (!SetupCmd(&cmdArg))
return "outgoing transfer setup failed";
668 pdSZ = (
Config.xfrCmd[iXfr].Opts &
Config.cmdXPD ?
sizeof(pdBuff) : 0);
674 if ((rc = cmdArg.theCmd->Run(pdBuff, pdSZ)))
675 {
if (isMigr) ffMake(rc == -2);
676 retMsg =
"copy failed";
681 if (!rc && mDP >= 0) TrackDC(Rfn);
688 {
if ((rc =
Config.Stat(lfnpath+xfrP->reqData.LFO, xfrP->PFN, &endStat)))
689 {
Say.
Emsg(
"Throw", lfnpath,
"transferred but not found!");
690 retMsg =
"unable to verify copy";
692 if (begStat.st_mtime != endStat.st_mtime
693 || begStat.st_size != endStat.st_size)
694 {
Say.
Emsg(
"Throw", lfnpath,
"modified during transfer!");
695 retMsg =
"file modified during copy"; rc = 1;
706 else if (isMigr) ThrowDone(&Chk, endStat.st_mtime);
713 {time_t eNow = time(0);
715 long long Fsize = begStat.st_size;
716 inqT =
static_cast<int>(xfrET - time_t(xfrP->reqData.addTOD));
717 if ((xfrT =
static_cast<int>(eNow - xfrET)) <= 0) xfrT = 1;
721 sprintf(sbuff,
"Put: %lld qt: %d xt: %d up: ",Fsize,inqT,xfrT);
722 Say.
Say(0, sbuff, xfrP->reqData.User,
" ", xfrP->reqData.LFN);
725 {
char monBuff[MAXPATHLEN+1024+512+8];
726 if (rc < 0) rc = -rc;
727 snprintf(monBuff,
sizeof(monBuff),
728 "%s\n&tod=%lld&sz=%lld&qt=%d&tm=%d&op=%c&rc=%d%s%s",
729 xfrP->reqData.LFN,
static_cast<long long>(eNow), Fsize,
730 inqT, xfrT, xfrP->Act, rc,
731 (pdSZ ?
"&pd=" :
""), (pdSZ ? pdBuff :
""));
745 void XrdFrmTransfer::Throwaway()
752 if (
Config.Test) {
DEBUG(
"Would have removed '" <<xfrP->PFN <<
"'");}
754 DEBUG(
"removed '" <<xfrP->PFN <<
"'");
764 void XrdFrmTransfer::ThrowDone(
XrdFrmTranChk *cP, time_t endTime)
771 cpyInfo.
Attr.
cpyTime =
static_cast<long long>(endTime);
772 if (cpyInfo.
Set(xfrP->PFN, cP->
lkfd))
773 Say.
Emsg(
"Throw",
"Unable to set copy time xattr for", xfrP->PFN);
775 {strcpy(&xfrP->PFN[xfrP->pfnEnd],
".lock");
777 xfrP->PFN[xfrP->pfnEnd] =
'\0';
781 strcpy(&xfrP->PFN[xfrP->pfnEnd],
".lock");
783 {
struct utimbuf tbuff;
784 tbuff.actime = tbuff.modtime = endTime;
785 if (utime(xfrP->PFN, &tbuff))
786 Say.
Emsg(
"Throw", errno,
"set utime for", xfrP->PFN);
788 xfrP->PFN[xfrP->pfnEnd] =
'\0';
801 fdClose() : Num(-1) {}
802 ~fdClose() {
if (Num >= 0)
close(Num);}
811 if ((fnFD.Num = XrdSysFD_Open(xfrP->PFN, O_RDWR)) < 0)
812 return "unable to open file";
818 {strcpy(&xfrP->PFN[xfrP->pfnEnd],
".lock");
819 statRC =
stat(xfrP->PFN, &lokStat);
820 xfrP->PFN[xfrP->pfnEnd] =
'\0';
822 if (statRC && !
Config.runNew)
return "missing lock file";
829 cpyInfo.
Attr.
cpyTime =
static_cast<long long>(lokStat.st_mtime);
830 else if (cpyInfo.
Get(xfrP->PFN, fnFD.Num) <= 0)
831 return "unable to get copy time xattr";
836 if (cpyInfo.
Attr.
cpyTime >=
static_cast<long long>(cP->
Stat->st_mtime))
838 return "already migrated";
845 cP->
lkfx = statRC == 0;
void * InitXfer(void *parg)
int stat(const char *path, struct stat *buf)
int open(const char *path, int oflag,...)
int unlink(const char *path)
#define XRDSYSTHREAD_BIND
const kXR_char XROOTD_MON_MAPMIGR
const kXR_char XROOTD_MON_MAPSTAG
int Get(const char *iName, char *buff, int blen)
int Init(const char *qPath)
static void Rm(const char *Path, int islfn=0)
static void Add(const char *tID, const char *Path, long long Size, mode_t Mode)
static kXR_unt32 Map(char code, const char *uname, const char *path)
static const char * checkFF(const char *Path)
static void Done(XrdFrmXfrJob *xP, const char *Msg)
static XrdFrmXfrJob * Get(int ioQType)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual int Rename(const char *oPath, const char *nPath, XrdOucEnv *oEnvP=0, XrdOucEnv *nEnvP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void Put(const char *varname, const char *value)
int Subs(XrdOucMsubsInfo &Info, char **Data, int *Dlen)
int Setup(const char *prog, XrdSysError *errP=0, int(*Proc)(XrdOucStream *, char **, int)=0)
int Serialize(int Opts=0)
static int makePath(char *path, mode_t mode, bool reset=false)
int Get(const char *Path, int fd=-1)
int Set(const char *Path, int fd=-1)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
XrdFrmTranArg(XrdOucEnv *Env)
XrdFrmTranChk(struct stat *sP)