37 #include <sys/param.h>
38 #include <sys/types.h>
77 const char *xfrType = xfrName(*rP, qNum);
78 char *Lfn, lclpath[MAXPATHLEN];
84 {sprintf(lclpath,
"%d", qNum);
85 Say.
Emsg(
"Queue", lclpath,
" is an invalid queue; skipping", rP->
LFN);
86 if (reqFQ) reqFQ->
Del(rP);
94 Lfn = (Outgoing ? rP->
LFN : (rP->
LFN)+rP->
LFO);
96 if ((xP = hTab.Find(Lfn)))
104 {sprintf(lclpath,
" in progress; %s skipped for ", xfrType);
107 if (reqFQ) reqFQ->
Del(rP);
114 if (!
Config.LocalPath((rP->
LFN)+rP->
LFO, lclpath,
sizeof(lclpath)-16))
115 {
if (reqFQ) reqFQ->
Del(rP);
116 return Notify(rP, qNum, 1,
"Unable to generate pfn");
125 Say.
Say(0, xfrType,
"skipped; ",lclpath,
" not resident.");
126 if (reqFQ) reqFQ->
Del(rP);
127 return Notify(rP, qNum, 2,
"file not resident");
132 Say.
Say(0, xfrType,
"skipped; ", lclpath,
" exists.");
133 if (reqFQ) reqFQ->
Del(rP);
134 return Notify(rP, qNum, 0);
141 if ((xP = xfrQ[qNum].Free))
break;
143 xfrQ[qNum].Avail.Wait();
145 xfrQ[qNum].Free = xP->
Next;
155 strcpy(xP->
PFN, lclpath);
156 xP->
pfnEnd = strlen(lclpath);
160 xP->
Type = xfrType+1;
171 if (xfrQ[qNum].Last) {xfrQ[qNum].Last->Next = xP; xfrQ[qNum].Last = xP;}
172 else xfrQ[qNum].Last = xfrQ[qNum].First = xP;
205 hMutex.Lock(); hTab.Del(xP->
reqFile); hMutex.UnLock();
211 xfrQ[xP->
qNum].Free = xP;
212 xfrQ[xP->
qNum].Avail.Post();
226 do {qReady.Wait();}
while(!(xfrP = Pull(ioQType)));
241 static const char *StopFN[] = {
"STAGE",
"MIGR",
"COPYIN",
"COPYOUT"};
242 static const char *StopQN[] = {
"stage",
"migr",
"copyin",
"copyout"};
245 char StopFile[1024], *fnSfx;
250 strcpy(StopFile,
Config.AdminPath);
251 strcat(StopFile,
"STOP");
252 fnSfx = StopFile + strlen(StopFile);
261 strcpy(fnSfx, StopFN[qNum]);
262 xfrQ[qNum].File = strdup(StopFile);
263 xfrQ[qNum].Name = StopQN[qNum];
264 xfrQ[qNum].qNum = qNum;
270 {
Say.
Emsg(
"main", retc,
"create stopfile thread");
return 0;}
279 xP->
Next = xfrQ[qNum].Free;
280 xfrQ[qNum].Free = xP;
281 xfrQ[qNum].Avail.Post();
296 static bool ioX =
false, prevQ[2] = {0,0};
298 int pikQ, theQ, Q1, Q2, nSel = 1;
303 do{
if (!ioQType) ioX = !ioX;
304 else {ioX = (ioQType < 0 ? 1 : 0); nSel = 0;}
315 if (xfrQ[Q1].First && xfrQ[Q2].First)
316 {
if (xfrQ[Q1].First->reqData.addTOD < xfrQ[Q2].First->reqData.addTOD)
318 else if (xfrQ[Q1].First->reqData.addTOD > xfrQ[Q2].First->reqData.addTOD)
320 else theQ = (prevQ[pikQ] == Q1 ? Q2 : Q1);
321 }
else theQ = (xfrQ[Q1].First ? Q1 : Q2);
325 if ((xfrP = xfrQ[theQ].First)
326 && !(xfrQ[theQ].First = xfrP->
Next)) xfrQ[theQ].Last = 0;
327 }
while(!xfrP && nSel--);
340 int XrdFrmXfrQueue::Notify(
XrdFrcRequest *rP,
int qNum,
int rc,
const char *msg)
342 static const char *isFile =
"file:///";
343 static const int lnFile = 8;
344 static const char *isUDP =
"udp://";
345 static const int lnUDP = 6;
346 static const char *qOpr[] = {
"stage",
"migr",
"get",
"put"};
347 char msgbuff[4096], *nP, *mP = rP->
Notify;
358 do{
if ((nP = index(rP->
Notify,
'\r'))) *nP++ =
'\0';
362 if (!strncmp(mP, isFile, lnFile))
363 {
if (rc) n = sprintf(msgbuff,
"%s %s %s %s\n", qOpr[qNum],
364 (rc > 1 ?
"ENOENT":
"BAD"), rP->
LFN, (msg ? msg:
"?"));
365 else n = sprintf(msgbuff,
"stage OK %s\n", rP->
LFN);
366 Send2File(mP+lnFile, msgbuff, n);
371 else if (!strncmp(mP, isUDP, lnUDP))
372 {
char *txtP, *dstP = mP+lnUDP;
373 if ((txtP = index(dstP,
'/'))) *txtP++ =
'\0';
374 else txtP = (
char *)
"";
375 n = sprintf(msgbuff,
"%s %s %s %s", (rc ?
"unprep" :
"ready"),
376 rP->
ID, txtP, rP->
LFN);
377 Send2UDP(dstP, msgbuff, n);
383 Say.
Emsg(
"Notify",
"Unsupported notification path '", mP,
"'.");
395 void XrdFrmXfrQueue::Send2File(
char *Dest,
char *Msg,
int Mln)
402 DEBUG(
"sending '" <<Msg <<
"' via " <<Dest);
406 if ((FD = XrdSysFD_Open(Dest, O_WRONLY)) < 0)
407 {
Say.
Emsg(
"Notify", errno,
"send notification via", Dest);
return;}
411 if (
write(FD, Msg, Mln) < 0)
412 Say.
Emsg(
"Notify", errno,
"send notification via", Dest);
420 void XrdFrmXfrQueue::Send2UDP(
char *Dest,
char *Msg,
int Mln)
427 DEBUG(
"sending '" <<Msg <<
"' via " <<Dest);
431 Relay.Send(Msg, Mln, Dest);
440 struct theQueue *monQ = (
struct theQueue *)parg;
448 sprintf(theMsg,
"exists; %s transfers suspended.", monQ->Name);
455 while(!
stat(monQ->File, &buf))
456 {
if (!Cnt--) {
Say.
Emsg(
"StopMon", monQ->File, theMsg); Cnt = 12;}
462 while(xP) {qReady.Post(); xP = xP->
Next;}
471 int XrdFrmXfrQueue::Stopped(
int qNum)
478 if (
stat(xfrQ[qNum].
File, &buf))
return 0;
479 if (!xfrQ[qNum].Stop) {xfrQ[qNum].Stop = 1; xfrQ[qNum].Alert.Post();}
487 const char *XrdFrmXfrQueue::xfrName(
XrdFrcRequest &reqData,
int qNum)
504 "3Migr+rm ":
"2Migrate ");
508 "5Copy+rm " :
"4CopyOut ");
void * InitStop(void *parg)
int stat(const char *path, struct stat *buf)
ssize_t write(int fildes, const void *buf, size_t nbyte)
#define XRDSYSTHREAD_BIND
void Del(XrdFrcRequest *rP)
static void Done(XrdFrmXfrJob *xP, const char *Msg)
static int Add(XrdFrcRequest *rP, XrdFrcReqFile *reqF, int theQ)
static void StopMon(void *parg)
static XrdFrmXfrJob * Get(int ioQType)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Snooze(int seconds)