34 #include <sys/types.h>
81 resetcnt = scrub2rst = 3;
105 if (rc)
Say.
Emsg(
"Add", rc,
"prepare", pargs.
path);
106 else {PTMutex.
Lock();
116 if (!prepif || !prepSched.
isAlive())
117 {
Say.
Emsg(
"Add",
"No prepare manager; prepare",pargs.
reqid,
"ignored.");
125 {*pP++ = pargs.
prty[0]; *pP =
'\0';
126 pdata[0] = (
char *)
"+ "; pdlen[0] = 2;
127 pdata[1] = pargs.
reqid; pdlen[1] = strlen(pargs.
reqid);
128 pdata[2] = (
char *)
" "; pdlen[2] = 1;
129 pdata[3] = pargs.
notify; pdlen[3] = strlen(pargs.
notify);
130 pdata[4] = (
char *)
" "; pdlen[4] = 1;
131 pdata[5] = prtybuff; pdlen[5] = strlen(prtybuff);
132 pdata[6] = (
char *)
" "; pdlen[6] = 1;
133 pdata[7] = pargs.
mode; pdlen[7] = strlen(pargs.
mode);
134 pdata[8] = (
char *)
" "; pdlen[8] = 1;
135 pdata[9] = pargs.
path; pdlen[9] = strlen(pargs.
path);
136 pdata[10] = (
char *)
"\n"; pdlen[10] = 1;
137 pdata[11]= 0; pdlen[11]= 0;
138 if (!(rc = prepSched.
Put((
const char **)pdata, (
const int *)pdlen)))
141 int Oflag = (index(pargs.
mode, (
int)
'w') ? O_RDWR : 0);
142 mode_t Prty = atoi(pargs.
prty);
146 int k = prepMsg->
Subs(
Info, pdata, pdlen);
147 pdata[k] = (
char *)
"\n"; pdlen[k++] = 1;
148 pdata[k] = 0; pdlen[k] = 0;
149 if (!(rc = prepSched.
Put((
const char **)pdata, (
const int *)pdlen)))
171 {
if ((rc = PrepFrm->
Del(
'-', reqid)))
172 Say.
Emsg(
"Del", rc,
"unprepare", reqid);
179 if (!prepif || !prepSched.
isAlive())
180 {
Say.
Emsg(
"Del",
"No prepare manager; unprepare",reqid,
"ignored.");
187 pdata[0] = (
char *)
"- ";
190 pdlen[1] = strlen(reqid);
191 pdata[2] = (
char *)
"\n";
193 pdata[3] = (
char *)0;
195 rc = prepSched.
Put((
const char **)pdata, (
const int *)pdlen);
226 Found = (NumFiles ? PTable.
Find(path) != 0 : 0);
247 if (NumFiles > 0 && PTable.
Del(path) == 0) NumFiles--;
266 if (!index(pargs->
mode, (
int)
'n')
267 || strncmp(
"udp://", pargs->
notify, 6)
277 if ((minfo = index(mdest, (
int)
'/')))
278 {*minfo =
'\0'; minfo++;}
279 if (!minfo || !*minfo) minfo = (
char *)
"*";
280 DEBUG(
"Sending " <<mdest <<
": " <<cmd <<
' '<<pargs->
reqid <<
' ' <<minfo);
284 Msg[0].iov_base = (
char *)cmd; Msg[0].iov_len = strlen(cmd);
285 Msg[1].iov_base = (
char *)
" "; Msg[1].iov_len = 1;
286 Msg[2].iov_base = pargs->
reqid; Msg[2].iov_len = strlen(pargs->
reqid);
287 Msg[3].iov_base = (
char *)
" "; Msg[3].iov_len = 1;
288 Msg[4].iov_base = minfo; Msg[4].iov_len = strlen(minfo);
289 Msg[5].iov_base = (
char *)
" "; Msg[5].iov_len = 1;
290 Msg[6].iov_base = pargs->
path; Msg[6].iov_len = (pargs->
pathlen)-1;
291 Msg[7].iov_base = (
char *)
"\n"; Msg[7].iov_len = 1;
295 Relay->
Send(Msg, 8, mdest);
320 if (!(rc = isOnline(pargs->
path)))
331 if (rc > 0)
Inform(
"avail", pargs);
341 char baseAP[1024], *Slash;
354 DEBUG(
"Initializing internal FRM prepare interface.");
355 strcpy(baseAP, aPath); baseAP[strlen(baseAP)-1] =
'\0';
356 if ((Slash = rindex(baseAP,
'/'))) *Slash =
'\0';
358 {
Say.
Emsg(
"Reset",
"Built-in prepare init failed; prepare disabled.");
375 {
if (rcnt > 0) resetcnt = scrub2rst = rcnt;
376 if (stime > 0) scrubtime = stime;
383 {
const char *Slash = rindex(ifpgm,
'/');
384 if (prepif) free(prepif);
385 if (Slash && !strcmp(Slash+1,
"frm_xfragent")) ifpgm =
"";
386 prepif = strdup(ifpgm);
389 {
if (prepMsg)
delete prepMsg;
391 if (!(prepMsg->
Parse(
"prepmsg", ifmsg)))
392 {
delete prepMsg; prepMsg = 0;
return 1;}
404 int XrdCmsPrepare::isOnline(
char *path)
425 char *lp, *pdata[] = {(
char *)
"?\n", 0};
426 int pdlen[] = {2, 0};
434 {PTable.
Purge(); NumFiles = 0;
435 while(PrepFrm->
List(State, Buff,
sizeof(Buff)))
437 if (doEcho)
Say.
Emsg(
"Reset",
"Prepare pending for",Buff);
446 {
Say.
Emsg(
"Reset",
"Prepare program not specified; prepare disabled.");
452 if (!prepSched.
isAlive() && !startIF())
return;
453 if (prepSched.
Put((
const char **)pdata, (
const int *)pdlen))
455 prepSched.
Drain(); prepOK = 0;
457 else {PTable.
Purge(); NumFiles = 0;
458 while((lp = prepSched.
GetLine()) && *lp)
460 if (doEcho)
Say.
Emsg(
"Reset",
"Prepare pending for",lp);
469 void XrdCmsPrepare::Scrub()
474 scrub2rst = resetcnt;
479 if (!PrepFrm && !prepSched.
isAlive()) startIF();
487 int XrdCmsPrepare::startIF()
493 if (PrepFrm) return prepOK;
498 {
Say.
Emsg(
"startIF",
"Prepare program not specified; prepare disabled.");
504 DEBUG(
"Prepare: Starting " <<prepif);
505 if (prepSched.
Exec(prepif, 1))
506 {time_t eNow = time(0);
508 if ((eNow - lastemsg) >= 60)
int XrdCmsScrubScan(const char *key, char *cip, void *xargp)
int stat(const char *path, struct stat *buf)
void Prepare(XrdCmsPrepArgs *pargs)
int setParms(int rcnt, int stime, int deco=0)
void Inform(const char *cmd, XrdCmsPrepArgs *pargs)
int Add(XrdCmsPrepArgs &pargs)
void Reset(const char *iName, const char *aPath, int aMode)
int Add(char Opc, const char *Lfn, const char *Opq, const char *Usr, const char *Rid, const char *Nop, const char *Pop, int Prty=1)
int Init(int opX, const char *aPath, int aMode, const char *qPath=0)
int List(Queues &State, char *Buff, int Bsz)
int Del(char Opc, const char *Rid)
int Send(const char *buff, int blen=0, const char *dest=0, int tmo=-1)
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
int Del(const char *KeyVal, XrdOucHash_Options opt=Hash_default)
T * Apply(int(*func)(const char *, T *, void *), void *Arg)
T * Add(const char *KeyVal, T *KeyData, const int LifeTime=0, XrdOucHash_Options opt=Hash_default)
T * Find(const char *KeyVal, time_t *KeyTime=0)
int Parse(const char *oname, char *msg)
int Subs(XrdOucMsubsInfo &Info, char **Data, int *Dlen)
int Put(const char *data, const int dlen)
int Exec(const char *, int inrd=0, int efd=0)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysLogger * logger(XrdSysLogger *lp=0)