34 #include <sys/types.h>
// Defines XrdBuffManager::TraceID as the string "BuffManager".
// NOTE(review): presumably the label picked up by the TRACE macro used
// further down in this file -- confirm against the trace header.
59 const char *XrdBuffManager::TraceID =
"BuffManager";
// Fragment: constructs Reshaper with the arguments (0, "buff reshaper").
// NOTE(review): presumably a member initializer of the XrdBuffManager
// constructor -- the enclosing header is not visible in this excerpt.
83 Reshaper(0,
"buff reshaper")
// Default allocation ceiling: (pagsz / 8) multiplied by the number of
// physical pages reported by sysconf(_SC_PHYS_PAGES). Both operands are
// widened to long long before the arithmetic; the integer division by 8
// happens first.
// NOTE(review): sysconf() returns -1 on failure, which would make maxalo
// negative here -- confirm whether that case is handled elsewhere.
93 maxalo =
static_cast<long long>(pagsz)/8
94 *
static_cast<long long>(sysconf(_SC_PHYS_PAGES));
// Zero-fill sizeof(bucket) bytes starting at bucket.
100 memset(
static_cast<void *
>(bucket), 0,
sizeof(bucket));
// Fragment: drains bucket[i]'s bnext list by repeatedly unlinking its head
// (bP) until the list is empty; the bucket's numbuf is then set to 0.
// The statements that dispose of bP (original lines 114-115) and the loop
// over i are not visible in this excerpt.
// NOTE(review): presumably part of the destructor -- confirm.
112 {
while((bP = bucket[i].bnext))
113 {bucket[i].bnext = bP->next;
116 bucket[i].numbuf = 0;
// Fragment: tail of a call whose final argument is the description
// "Buffer Manager reshaper"; the start of the call and the condition it
// belongs to are not visible. When that condition holds, Log.Emsg is
// invoked with "BuffManager", rc, and the text "create reshaper thread".
// NOTE(review): presumably rc is the result of starting the reshaper
// thread -- confirm against the missing lines.
132 "Buffer Manager reshaper")))
133 Log.
Emsg(
"BuffManager", rc,
"create reshaper thread");
// Fragments of a buffer-allocation routine (presumably
// XrdBuffManager::Obtain -- the function header is not visible here).
// A non-positive size request yields 0.
148 if (sz <= 0)
return 0;
// mk is minBuffSz shifted left by bindex. If that is still smaller than sz,
// move up one bucket and double mk. A bucket index at or beyond slots
// yields 0.
155 mk = minBuffSz << bindex;
156 if (mk < sz) {bindex++; mk = mk << 1;}
157 if (bindex >= slots)
return 0;
// Count the request against the chosen bucket. If the bucket's bnext list
// is non-empty, unlink its head into bp and decrement the bucket's numbuf.
163 bucket[bindex].numreq++;
164 if ((bp = bucket[bindex].bnext))
165 {bucket[bindex].bnext = bp->next; bucket[bindex].numbuf--;}
// Alignment pk is mk when mk is below pagsz, otherwise pagsz.
// posix_memalign returns non-zero on failure, in which case 0 is returned.
174 pk = (mk < pagsz ? mk : pagsz);
175 if (posix_memalign((
void **)&memp, pk, mk))
return 0;
// Wrap the aligned memory in an XrdBuffer; if bp ends up null the raw
// memory is freed and 0 is returned.
// NOTE(review): a plain new-expression throws std::bad_alloc instead of
// yielding null unless exceptions are disabled or XrdBuffer provides its
// own operator new -- confirm this cleanup branch is reachable; otherwise
// memp would leak on allocation failure.
179 if (!(bp =
new XrdBuffer(memp, mk, bindex))) {free(memp);
return 0;}
// Add mk to totalo; if the new total exceeds maxalo and rsinprog is not
// already set, set rsinprog and signal Reshaper.
185 if ((totalo += mk) > maxalo && !rsinprog)
186 {rsinprog = 1; Reshaper.
Signal();}
// Fragments of a size-rounding routine (presumably XrdBuffManager::Recalc
// -- header and return statement for the success path are not visible).
// A non-positive size yields 0.
201 if (sz <= 0)
return 0;
// mk is minBuffSz shifted left by bindex; if still smaller than sz, step to
// the next bucket and double mk. A bucket index at or beyond slots yields 0.
208 mk = minBuffSz << bindex;
209 if (mk < sz) {bindex++; mk = mk << 1;}
210 if (bindex >= slots)
return 0;
// Fragments of a buffer-return routine (presumably XrdBuffManager::Release;
// original lines 224-231 are not visible).
// bindex is a local copy of bp->bindex.
223 int bindex = bp->bindex;
// Push bp onto the front of its bucket's bnext list, then increment that
// bucket's numbuf.
232 bp->next = bucket[bp->bindex].bnext;
233 bucket[bp->bindex].bnext = bp;
234 bucket[bindex].numbuf++;
// Fragments of the pool-reshaping routine (presumably
// XrdBuffManager::Reshape -- header and several lines are not visible).
// lastshape records the time at entry. memtarget is 80% of maxalo, computed
// in float and converted back to long long.
// NOTE(review): float carries roughly 24 bits of precision, so memtarget is
// only approximate for large maxalo -- confirm that is acceptable.
245 time_t delta, lastshape = time(0);
246 long long memslot, memhave, memtarget = (
long long)(.80*(
float)maxalo);
248 float requests, buffers;
// Keep waiting while Reshaper.Wait(minrsw) returns non-zero and totalo is
// still within maxalo; each pass traces the current and target sizes,
// shifted right by 10 (i.e. divided by 1024).
// NOTE(review): presumably a non-zero Wait result means the wait timed out
// -- confirm against the condition-variable class.
255 while(Reshaper.
Wait(minrsw) && totalo <= maxalo)
256 {
TRACE(MEM,
"Reshaper has " <<(totalo>>10) <<
"K; target " <<(memtarget>>10) <<
"K");}
// If fewer than minrsw seconds have passed since lastshape, wait out the
// remainder; the argument is scaled by 1000 before being passed to
// Timer.Wait.
257 if ((delta = (time(0) - lastshape)) < minrsw)
259 Timer.
Wait((minrsw-delta)*1000);
// Per-bucket target counts: bufprof[i] is totbuf scaled by bucket i's share
// of totreq, truncated to int. Each bucket's numreq is reset, then totreq;
// memhave starts from totalo.
// NOTE(review): this divides by requests; any guard against totreq == 0 is
// outside the visible lines -- confirm.
266 {requests = (float)totreq;
267 buffers = (float)totbuf;
268 for (i = 0; i < slots; i++)
269 {bufprof[i] = (int)(buffers*(((
float)bucket[i].numreq)/requests));
270 bucket[i].numreq = 0;
272 totreq = 0; memhave = totalo;
// Walk buckets from slots-1 downward while memhave exceeds memtarget.
// memslot starts at maxsz and is halved (original line 290) as the walk
// proceeds. While a bucket's numbuf exceeds bufprof[i], the head of its
// bnext list is unlinked; each unlink decrements numbuf, increments
// numfreed, and subtracts memslot from both memhave and totalo. If the list
// is empty while numbuf says otherwise, numbuf is reset to 0 and the inner
// loop stops. The statements that dispose of bp are not visible here.
278 memslot = maxsz; numfreed = 0;
279 for (i = slots-1; i >= 0 && memhave > memtarget; i--)
281 while(bucket[i].numbuf > bufprof[i])
282 if ((bp = bucket[i].bnext))
283 {bucket[i].bnext = bp->next;
285 bucket[i].numbuf--; numfreed++;
286 memhave -= memslot; totalo -= memslot;
288 }
else {bucket[i].numbuf = 0;
break;}
290 memslot = memslot>>1;
// Trace the outcome: number of buffers freed plus current and target sizes
// shifted right by 10.
296 TRACE(MEM,
"Pool reshaped; " <<numfreed <<
" freed; have " <<(memhave>>10) <<
"K; target " <<(memtarget>>10) <<
"K");
// Fragments of the tuning setter (presumably XrdBuffManager::Set).
// A positive maxmem replaces maxalo and a positive minw replaces minrsw;
// non-positive arguments leave the current values unchanged.
314 if (maxmem > 0) maxalo = (
long long)maxmem;
315 if (minw > 0) minrsw = minw;
// Fragments of the statistics formatter (presumably XrdBuffManager::Stats).
// statfmt takes, in order: an int, a long long, two ints, and a string.
325 static char statfmt[] =
"<stats id=\"buff\"><reqs>%d</reqs>"
326 "<mem>%lld</mem><buffs>%d</buffs><adj>%d</adj>%s</stats>";
// With a null buff, return a size estimate instead of formatting:
// sizeof(statfmt), plus 16 bytes for each of the four numeric fields, plus
// the value reported by xlBuff.Stats(0,0).
332 if (!buff)
return sizeof(statfmt) + 16*4 +
xlBuff.
Stats(0,0);
// When do_sync is set, hold Reshaper's lock across the formatting.
336 if (do_sync) Reshaper.
Lock();
// Format totreq, totalo, totbuf, totadj and xlStats into buff, limited to
// blen bytes; nlen receives snprintf's return value.
// NOTE(review): snprintf returns the length that would have been written,
// which can exceed blen on truncation -- confirm nlen is checked in the
// lines not visible here.
338 nlen = snprintf(buff,blen,statfmt,totreq,totalo,totbuf,totadj,xlStats);
339 if (do_sync) Reshaper.
UnLock();
void * XrdReshaper(void *pp)
int Stats(char *buff, int blen, int do_sync=0)
void Release(XrdBuffer *bp)
XrdBuffer * Obtain(int bsz)
XrdBuffManager(int minrst=20 *60)
void Set(int maxmem=-1, int minw=-1)
void Release(XrdBuffer *bp)
int Stats(char *buff, int blen, int do_sync=0)
XrdBuffer * Obtain(int bsz)
static int Log2(unsigned long long n)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)