34 #include <sys/resource.h>
36 #include <sys/types.h>
39 #include <AvailabilityMacros.h>
48 #define XRD_TRACE XrdTrace->
55 const char *XrdScheduler::TraceID =
"Sched";
98 int minw,
int maxw,
int maxi)
99 :
XrdJob(
"underused thread monitor"),
100 XrdTraceOld(0), WorkAvail(0,
"sched work")
102 Boot(eP, tP, minw, maxw, maxi);
109 int minw,
int maxw,
int maxi)
110 :
XrdJob(
"underused thread monitor"),
111 XrdTraceOld(tP), WorkAvail(0,
"sched work")
124 :
XrdJob(
"underused thread monitor"),
125 XrdTraceOld(0), WorkAvail(0,
"sched work")
132 #if ( defined(__linux__) || defined(__GNU__) ) && defined(F_DUPFD_CLOEXEC)
133 eFD =
fcntl(STDERR_FILENO, F_DUPFD_CLOEXEC, 0);
135 eFD = dup(STDERR_FILENO);
136 fcntl(eFD, F_SETFD, FD_CLOEXEC);
151 Init(minw, maxw, maxi);
159 int minw,
int maxw,
int maxi)
165 Init(minw, maxw, maxi);
169 #if ( defined(__linux__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) ) && defined(RLIMIT_NPROC)
177 if ((pdFD =
open(
"/proc/sys/kernel/pid_max", O_RDONLY)) >= 0)
179 if ((rdsz =
read(pdFD, pmBuff,
sizeof(pmBuff))) > 0)
180 {rdsz = atoi(pmBuff);
181 if (rdsz < 16384) theMax = 16384;
183 theMax =
static_cast<rlim_t
>(rdsz-2000);
191 if (!getrlimit(RLIMIT_NPROC, &rlim))
192 {
if (rlim.rlim_max == RLIM_INFINITY || rlim.rlim_max > theMax)
193 {rlim.rlim_cur = theMax;
194 setrlimit(RLIMIT_NPROC, &rlim);
196 if (rlim.rlim_cur != rlim.rlim_max)
197 {rlim.rlim_cur = rlim.rlim_max;
198 setrlimit(RLIMIT_NPROC, &rlim);
205 if (!getrlimit(RLIMIT_NPROC, &rlim))
206 {
if (rlim.rlim_cur == RLIM_INFINITY || rlim.rlim_cur > theMax)
207 max_Workers =
static_cast<int>(theMax);
208 else max_Workers =
static_cast<int>(rlim.rlim_cur);
237 while(p && p != jp) {pp = p; p = p->
NextJob;}
244 TRACE(SCHED,
"time event " <<jp->
Comment <<
" cancelled");
258 int num_kill, num_idle;
263 {DispatchMutex.
Lock(); num_idle = idl_Workers; DispatchMutex.
UnLock();
264 num_kill = num_idle - min_Workers;
265 TRACE(SCHED, num_Workers <<
" threads; " <<num_idle <<
" idle");
267 {
if (num_kill > 1) num_kill = num_kill/2;
269 num_Layoffs = num_kill;
270 while(num_kill--) WorkAvail.
Post();
277 if (max_Workidl > 0)
Schedule((
XrdJob *)
this, max_Workidl+time(0));
288 static int retc, ReaperStarted = 0;
294 if ((pid = fork()) < 0)
295 {
XrdLog->
Emsg(
"Scheduler",errno,
"fork to handle",
id);
298 if (!pid)
return pid;
304 retc = ReaperStarted;
312 0,
"Process reaper")))
313 {
XrdLog->
Emsg(
"Scheduler", retc,
"create reaper thread");
329 #if defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_5)
330 struct timespec ts = { 1, 0 };
338 sigaddset(&Sset, SIGCHLD);
343 do {ReaperMutex.
Lock();
344 tp = firstPID; ptp = 0;
346 {
do {pid = waitpid(tp->
pid, &status, WNOHANG);}
347 while (pid < 0 && errno == EINTR);
350 xtp = tp; tp = tp->
next;
351 if (ptp) ptp->
next = tp;
354 }
else {ptp = tp; tp = tp->
next;}
357 #if defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_5)
359 }
while (nanosleep(&ts, 0) <= 0);
361 }
while(sigwait(&Sset, &signum) >= 0);
377 do {
do {DispatchMutex.
Lock(); idl_Workers++;DispatchMutex.
UnLock();
379 DispatchMutex.
Lock();waiting = --idl_Workers;DispatchMutex.
UnLock();
381 if ((jp = WorkFirst))
382 {
if (!(WorkFirst = jp->
NextJob)) WorkLast = 0;
383 if (num_JobsinQ) num_JobsinQ--;
384 else XrdLog->
Emsg(
"Scheduler",
"Job queue count underflow!");
391 TRACE(SCHED,
"terminating thread; workers=" <<num_Workers);
403 if (!waiting) hireWorker();
405 {
TRACE(SCHED,
"running " <<jp->
Comment <<
" inq=" <<num_JobsinQ);}
466 num_JobsinQ += numjobs;
471 while(numjobs--) WorkAvail.
Post();
491 {
TRACE(SCHED,
"scheduling " <<jp->
Comment <<
" in " <<atime-time(0) <<
" seconds");}
492 jp->SchedTime = atime;
498 while(p && p->SchedTime <= atime) {pp = p; p = p->
NextJob;}
504 else {TimerQueue = jp; TimerRings.
Signal();}
517 static int isSet = 0;
522 if (once && isSet) {SchedMutex.
UnLock();
return;}
527 if (maxw <= 0) maxw = max_Workers;
528 if (minw < 0) minw = min_Workers;
529 if (minw > maxw) minw = maxw;
530 if (avlw < 0) avlw = maxw/4*3;
531 else if (avlw > maxw) avlw = maxw;
537 stk_Workers = maxw - avlw;
538 if (maxi >=0) max_Workidl = maxi;
553 TRACE(SCHED,
"Set min_Workers=" <<min_Workers <<
" max_Workers=" <<max_Workers);
554 TRACE(SCHED,
"Set stk_Workers=" <<stk_Workers <<
" max_Workidl=" <<max_Workidl);
575 XrdLog->
Emsg(
"Scheduler", retc,
"create time scheduler thread");
579 if (max_Workidl > 0)
Schedule((
XrdJob *)
this, (time_t)max_Workidl+time(0));
583 if (!(numw = min_Workers/3)) numw = 2;
584 while(numw--) hireWorker(0);
588 TRACE(SCHED,
"Starting with " <<num_Workers <<
" workers" );
597 int cnt_Jobs, cnt_JobsinQ, xam_QLength, cnt_Workers, cnt_idl;
598 int cnt_TCreate, cnt_TDestroy, cnt_Limited;
599 static char statfmt[] =
"<stats id=\"sched\"><jobs>%d</jobs>"
600 "<inq>%d</inq><maxinq>%d</maxinq>"
601 "<threads>%d</threads><idle>%d</idle>"
602 "<tcr>%d</tcr><tde>%d</tde>"
603 "<tlimr>%d</tlimr></stats>";
607 if (!buff)
return sizeof(statfmt) + 16*8;
611 if (do_sync) DispatchMutex.
Lock();
612 cnt_idl = idl_Workers;
613 if (do_sync) DispatchMutex.
UnLock();
617 if (do_sync) SchedMutex.
Lock();
618 cnt_Workers = num_Workers;
620 cnt_JobsinQ = num_JobsinQ;
625 if (do_sync) SchedMutex.
UnLock();
629 return snprintf(buff, blen, statfmt, cnt_Jobs, cnt_JobsinQ, xam_QLength,
630 cnt_Workers, cnt_idl, cnt_TCreate, cnt_TDestroy,
645 do {TimerMutex.
Lock();
646 if (TimerQueue) wtime = TimerQueue->SchedTime-time(0);
650 TimerRings.
Wait(wtime);
667 void XrdScheduler::hireWorker(
int dotrace)
675 if (num_Workers >= max_Workers)
678 XrdLog->
Emsg(
"Scheduler",
"Thread limit has been reached!");
694 {
XrdLog->
Emsg(
"Scheduler", retc,
"create worker thread");
698 max_Workers = num_Workers;
699 min_Workers = (max_Workers/10 ? max_Workers/10 : 1);
700 stk_Workers = max_Workers/4*3;
702 }
else if (dotrace)
TRACE(SCHED,
"Now have " <<num_Workers <<
" workers" );
709 void XrdScheduler::Init(
int minw,
int maxw,
int maxi)
716 stk_Workers = maxw - (maxw/4*3);
725 WorkFirst = WorkLast = TimerQueue = 0;
732 void XrdScheduler::traceExit(pid_t pid,
int status)
736 if (WIFEXITED(status))
737 {retc = WEXITSTATUS(status);
738 why =
" exited with rc=";
739 }
else if (WIFSIGNALED(status))
740 {retc = WTERMSIG(status);
741 why =
" killed with signal ";
743 why =
" changed state ";
745 TRACE(SCHED,
"Process " <<pid <<why <<retc);
XrdSysError XrdLog(0, "")
int open(const char *path, int oflag,...)
int fcntl(int fd, int cmd,...)
ssize_t read(int fildes, void *buf, size_t nbyte)
void * XrdStartTSched(void *carg)
void * XrdStartWorking(void *carg)
void * XrdStartReaper(void *carg)
#define XRDSYSTHREAD_BIND
friend class XrdScheduler
XrdSchedulerPID(pid_t newpid, XrdSchedulerPID *prev)
int Stats(char *buff, int blen, int do_sync=0)
void Schedule(XrdJob *jp)
void setParms(int minw, int maxw, int avlt, int maxi, int once=0)
pid_t Fork(const char *id)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysLogger * logger(XrdSysLogger *lp=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)