33 #include <netinet/in.h>
75 int verClient(
int dodel=0);
77 void sendResult(
char *lp,
int caned=0,
int erc=0);
79 static const int maxClients = 8;
96 static const int argvnum =
sizeof(theArgs)/
sizeof(theArgs[0]);
// Duplicate each leading non-null entry of args into theArgs with strdup,
// stopping at the first null entry or once argvnum slots have been filled.
131 for (i = 0; i < argvnum && args[i]; i++) theArgs[i] = strdup(args[i]);
// Null out whatever slots remain so that no theArgs entry is left
// uninitialized.
132 for ( ; i < argvnum; i++) theArgs[i] = (
char *)0;
141 addClient(resp,
opts);
// Scan the client table; at the first entry that is not synchronous, call
// sendResult with lp = 0 and caned = 1 a single time and stop scanning.
152 for (i = 0; i < numClients; i++)
153 if (!Client[i].isSync) {sendResult(0, 1);
break;}
// Release every theArgs slot that holds a non-null pointer.
155 for (i = 0; i < argvnum; i++)
156 if (theArgs[i]) free(theArgs[i]);
163 #define jobInfo theJob->JobName<<' '<<(theArgs[1] ? theArgs[1] : "")\
164 <<(theArgs[2] ? " " : "")<<(theArgs[2] ? theArgs[2] : "")
168 static const char *TraceID =
"jobXeq";
170 const char *endStat =
" completed";
179 if ((rc = theJob->theProg->
Run(&jobStream, theArgs[1], theArgs[2],
180 theArgs[3], theArgs[4])))
184 theJob->myMutex.
Lock();
186 else {lp = jobStream.
GetLine();
187 rc = theJob->theProg->
RunDone(jobStream);
188 theJob->myMutex.
Lock();
189 if ((rc && rc != -EPIPE) || (rc == -EPIPE && (!lp || !(*lp))))
193 for (i = 0; i < numClients; i++)
194 if (!Client[i].isSync) {sendResult(lp);
break;}
198 endStat =
" cancelled";
199 theJob->myMutex.
Lock();
211 {
if (theJob->numJobs > theJob->maxJobs) Redrive();
220 jp = theJob->JobTable.
Remove(JobNum);
240 unsigned int Inst = lp->
Inst();
// When the table is already at capacity, run verClient() before looking
// for a slot (presumably so stale entries can be dropped first -- confirm
// against verClient).
245 if (numClients >= maxClients) verClient();
// An entry with the same link pointer and the same instance number is
// already present: return 0 without adding a duplicate.
249 for (i = 0; i < numClients; i++)
250 if (lp == Client[i].Link && Inst == Client[i].Inst)
return 0;
// Still no room: return -1.
254 if (numClients >= maxClients)
return -1;
// Record the link and its instance number in the slot at index numClients.
255 Client[numClients].Link = lp;
256 Client[numClients].Inst = Inst;
258 else {rp->
StreamID(Client[numClients].streamid);
259 Client[numClients].isSync = 0;
273 unsigned int Inst = lp->
Inst();
// Look for the entry whose link pointer and instance number both match.
278 for (i = 0; i < numClients; i++)
279 if (lp == Client[i].Link && Inst == Client[i].Inst)
280 {
// Close the gap by copying each later entry down one slot. Note that the
// copy advances the outer index i (Client[i++]) as it goes.
for (j = i+1; j < numClients; j++) Client[i++] = Client[j];
295 char State, buff[4096], *bp = buff;
305 default: State =
'u';
break;
// Write the status element and open the <conn> element. bp is left
// pointing at the NUL that sprintf stored right after the formatted text.
310 bp = buff + sprintf(buff,
"<s>%c</s><conn>", State);
// Room left for client IDs. The 8 bytes held back match the 7-character
// "</conn>" trailer plus the NUL that strcpy appends below.
311 bsz =
sizeof(buff) - (bp - buff) - 8;
// NOTE(review): as far as the visible lines show, with no clients bp is
// moved one byte forward, past the NUL written by sprintf, so the trailer
// below is stored after that terminator -- confirm the consumer of buff
// does not depend on it being one contiguous C string in this case.
315 if (!numClients) bp++;
316 else for (i = 0; i < numClients; i++)
317 if (Client[i].Link && Client[i].Link->isInstance(Client[i].Inst))
318 {
// Append this client's ID. Under the BSD strlcpy contract the return value
// is the full source length, so k >= bsz means the ID did not fit; stop
// adding clients in that case, or when no room would remain afterwards.
if ((k =
strlcpy(bp, Client[i].Link->ID, bsz)) >= bsz
319 || (bsz -= k) < 1) {bp++;
break;}
// Step over the copied ID and follow it with a single space separator.
320 bp += k; *bp =
' '; bp++; bsz--;
// Drop a trailing separator space, if there is one, then close the element.
325 if (*(bp-1) ==
' ') bp--;
326 strcpy(bp,
"</conn>");
337 int XrdXrootdJob2Do::verClient(
int dodel)
343 for (i = 0; i < numClients; i++)
344 if (!Client[i].Link->isInstance(Client[i].Inst))
346 for (j = i+1; j < numClients && j < maxClients; j++,k++) Client[k] = Client[j];
352 if (!numClients && dodel)
360 snprintf(ebuff,
sizeof(ebuff),
"Unable to find %s job %d;",
361 theJob->JobName, JobNum);
362 eLog.
Emsg(
"Job2Do", ebuff,
"job slot disabled!");
372 void XrdXrootdJob2Do::Redrive()
381 if (jp->verClient(jp->JobMark > 0))
break;
382 else Start = jp->JobNum+1;
398 void XrdXrootdJob2Do::sendResult(
char *lp,
int caned,
int jrc)
400 static const char *TraceID =
"jobSendResult";
403 struct iovec jobVec[6];
405 const char *trc, *tre;
407 int j, i, dlen = 0, n = 1;
412 {jobStat =
kXR_ok; trc =
"ok";
// Lead the payload with theArgs[0] followed by a single space; dlen tracks
// the running byte count of the elements added so far.
414 { jobVec[n].iov_base = theArgs[0];
415 dlen = jobVec[n].iov_len = strlen(theArgs[0]); n++;
416 jobVec[n].iov_base = (
char *)
" ";
417 dlen += jobVec[n].iov_len = 1; n++;
// A positive caned overrides both the error code (Xcan) and the message
// text with fixed values.
421 if (caned > 0) {erc = Xcan; lp = (
char *)
"Cancelled by admin.";}
// Convert the code to network byte order in place; its raw bytes are
// placed in the vector below.
423 erc =
static_cast<kXR_int32>(htonl(erc));
// Fall back to a generic message when none (or an empty one) was supplied.
424 if (!lp || !*lp) lp = (
char *)
"Program failed.";
// First element: the raw bytes of erc. This assignment restarts dlen
// rather than adding to it.
426 jobVec[n].iov_base = (
char *)&erc;
427 dlen = jobVec[n].iov_len =
sizeof(erc); n++;
// Second element: the message text, including its terminating NUL
// (hence strlen(lp)+1).
429 jobVec[n].iov_base = lp;
430 dlen += jobVec[n].iov_len = strlen(lp)+1; n++;
435 for (i = 0; i < numClients; i++)
436 {
if (!Client[i].isSync)
437 {ReqID.
setID(Client[i].streamid,
438 Client[i].Link->FDnum(), Client[i].Link->Inst());
440 ?
"skipped" :
"sent");
441 TRACE(RSP, tre <<
" async " <<trc <<
" to " <<Client[i].Link->ID);
442 }
else if (i != j) Client[j++] = Client[i];
458 :
XrdJob(
"Job Scheduler"),
465 JobName = strdup(jname);
484 if (JobName) free(JobName);
497 int i, jNum, jNext = 0, numcaned = 0;
506 {
if ((jp = JobTable.
Find(jkey)))
508 if (resp) {jp->delClient(resp);
509 if (!jp->numClients) CleanUp(jp);
519 while((jNum = JobTable.
Next(jNext)) >= 0)
520 {jp = JobTable.
Item(jNum);
524 if (i != jp->numClients) numcaned++;
525 if (!jp->numClients) CleanUp(jp);
549 while((jNum = JobTable.
Next(jNext)) >= 0)
551 if ((jp = JobTable.
Item(jNum)))
552 {
if (jp->JobMark) {
if (!jp->verClient()) CleanUp(jp);}
553 else jp->JobMark = 1;
// jkey receives the key of each table item; buff is the scratch area used
// to format one job element header.
571 char *jkey, buff[1024];
572 int tlen, jNum, jNext = 0;
// Walk the job table: the loop continues for as long as Next() returns a
// non-negative slot number.
578 while((jNum = JobTable.
Next(jNext)) >= 0)
// Only slots that hold a job whose lstClient() returns a non-null list
// contribute an element.
580 if ((jp = JobTable.
Item(jNum, &jkey)) && (tp = jp->lstClient()))
// NOTE(review): sprintf is unbounded while buff holds 1024 bytes, and
// nothing visible here limits strlen(JobName) + strlen(jkey) -- consider
// snprintf and a truncation check.
581 {tlen = sprintf(buff,
"<job id=\"%s\">%s", JobName, jkey);
// When tL is set, link the newly allocated element after it.
582 if (tL) tL->next =
new XrdOucTList(buff, tlen, tp);
604 const char *msg =
"Job resources currently not available.";
609 if (!jkey || !(*jkey))
617 {rc = sendResult(resp, args[0], jp);
621 if (jp->addClient(resp,
Opts) < 0) isSync = 1;
622 else msg =
"Job scheduled.";
624 if ((jobNum = JobTable.
Alloc()) < 0) isSync = 1;
626 {JobTable.
Insert(jp, jkey, jobNum);
627 if (numJobs < maxJobs)
632 numJobs++; msg =
"Job Scheduled";
656 int theStatus = jp->
Status;
682 struct iovec jobResp[4];
// Build the response vector starting at element 1 (element 0 is not filled
// in by the visible lines -- presumably reserved for Send; confirm).
// Without a prefix the payload starts empty and the result goes in
// element 1.
688 else {
if (!rpfx) {dlen = 0; i = 1;}
// With a prefix, element 1 carries the prefix text and element 2 a single
// separating space; dlen accumulates their lengths.
689 else { jobResp[1].iov_base = (
char *)rpfx;
690 dlen = jobResp[1].iov_len = strlen(rpfx);
691 jobResp[2].iov_base = (
char *)
" ";
692 dlen += jobResp[2].iov_len = 1;
// Element i carries the stored job result text (without its NUL).
695 jobResp[i].iov_base = job->theResult;
696 dlen += jobResp[i].iov_len = strlen(job->theResult);
// Hand Send the vector, the element count (i+1), and the summed payload
// length.
697 rc = resp->
Send(jobResp, i+1, dlen);
702 job->delClient(resp);
703 if (!job->numClients) CleanUp(job);
XrdSysTrace XrdXrootdTrace
int XrdXrootdJobWaiting(XrdXrootdJob2Do *item, void *arg)
static int mapError(int rc)
unsigned int Inst() const
int RunDone(XrdOucStream &cmd) const
int Run(XrdOucStream *Sp, const char *argV[], int argc=0, const char *envV[]=0) const
T * Find(const char *key, int *Tnum=0)
T * Apply(int(*func)(T *, void *), void *Arg, int Start=0)
T * Item(int Tnum, char **ikey=0)
int Insert(T *Item, const char *key=0, int Tnum=-1)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdXrootdJob2Do(XrdXrootdJob *job, int jnum, const char **args, XrdXrootdResponse *Resp, int opts)
XrdSys::RAtomic< JobStatus > Status
XrdXrootdJob(XrdScheduler *schp, XrdOucProg *pgm, const char *jname, int maxjobs=4)
int Schedule(const char *jkey, const char **args, XrdXrootdResponse *resp, int Opts=0)
friend class XrdXrootdJob2Do
int Cancel(const char *jkey=0, XrdXrootdResponse *resp=0)
void setID(unsigned long long id)
void StreamID(kXR_char *sid)