XRootD
XrdBuffer.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d B u f f e r . c c */
4 /* */
5 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* Produced by Andrew Hanushevsky for Stanford University under contract */
7 /* DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 #include <ctime>
31 #include <unistd.h>
32 #include <cstdio>
33 #include <cstdlib>
34 #include <sys/types.h>
35 
36 #include "XrdOuc/XrdOucUtils.hh"
37 #include "XrdSys/XrdSysError.hh"
38 #include "XrdSys/XrdSysPlatform.hh"
39 #include "XrdSys/XrdSysTimer.hh"
40 #include "Xrd/XrdBuffer.hh"
41 #include "Xrd/XrdBuffXL.hh"
42 #include "Xrd/XrdTrace.hh"
43 
44 /******************************************************************************/
45 /* E x t e r n a l L i n k a g e s */
46 /******************************************************************************/
47 
48 void *XrdReshaper(void *pp)
49 {
50  XrdBuffManager *bmp = (XrdBuffManager *)pp;
51  bmp->Reshape();
52  return (void *)0;
53 }
54 
55 /******************************************************************************/
56 /* G l o b a l s */
57 /******************************************************************************/
58 
// Name under which this class identifies itself; the Init() body in this file
// passes the same literal text to Log.Emsg(). Presumably also consumed by the
// TRACE macro from XrdTrace.hh -- confirm there.
59 const char *XrdBuffManager::TraceID = "BuffManager";
60 
61 namespace
62 {
// Size in bytes of the smallest pooled buffer. Obtain() and Recalc() compute
// the size served by bucket i as (minBuffSz << i).
63 static const int minBuffSz = 1 << XRD_BUSHIFT;
64 }
65 
66 namespace XrdGlobal
67 {
// NOTE(review): original line 68 is absent from this listing; the symbol
// cross-reference at the end of the page places "XrdBuffXL xlBuff" there.
// Log is the error logger defined elsewhere; it is used by Init() below.
69 extern XrdSysError Log;
70 }
71 
72 using namespace XrdGlobal;
73 
74 /******************************************************************************/
75 /* C o n s t r u c t o r */
76 /******************************************************************************/
77 
// Constructor: member initializers and body.
// NOTE(review): the signature line (original line 78) is missing from this
// listing; the cross-reference at the end of the page gives the declaration as
// XrdBuffManager(int minrst=20*60).
//
// Initializers, in order: the number of buckets, log2 of the smallest buffer
// size, the system page size, the largest pooled buffer size (the size served
// by the last bucket, i.e. the smallest size shifted left by XRD_BUCKETS-1)
// and the Reshaper synchronization object that the other methods use through
// Lock/UnLock/Wait/Signal.
79  slots(XRD_BUCKETS),
80  shift(XRD_BUSHIFT),
81  pagsz(getpagesize()),
82  maxsz(1<<(XRD_BUSHIFT+XRD_BUCKETS-1)),
83  Reshaper(0, "buff reshaper")
84 {
85 
86 // Zero the usage counters: buffers in existence, requests seen, bytes
87 // allocated and buffers freed by the reshaper
88  totbuf = 0;
89  totreq = 0;
90  totalo = 0;
91  totadj = 0;
// Memory ceiling in bytes that triggers a reshape (compared against totalo in
// Obtain() and Reshape()). Where the physical page count is available it is
// (page size / 8) * physical pages, i.e. about one eighth of physical memory;
// the division is done first, in integer arithmetic.
92 #ifdef _SC_PHYS_PAGES
93  maxalo = static_cast<long long>(pagsz)/8
94  * static_cast<long long>(sysconf(_SC_PHYS_PAGES));
95 #else
// Fallback ceiling: 0x7ffffff is 2^27-1, just under 128 MiB.
96  maxalo = 0x7ffffff;
97 #endif
// No reshape has been requested yet.
98  rsinprog = 0;
// Minimum interval between reshapes; Reshape() compares it with time()
// differences, so the unit is seconds. minrst is presumably the constructor
// argument (see the note at the top of this block).
99  minrsw = minrst;
// Start with every bucket empty (null free-list head, zero counters).
100  memset(static_cast<void *>(bucket), 0, sizeof(bucket));
101 }
102 
103 /******************************************************************************/
104 /* D e s t r u c t o r */
105 /******************************************************************************/
106 
// Destructor body.
// NOTE(review): the signature line (original line 107) is missing from this
// listing.
//
// Walks every bucket and deletes each XrdBuffer object still chained on that
// bucket's free list, then resets the bucket's buffer count. Only buffers that
// are currently on a free list are reachable here; buffers that were handed
// out by Obtain() and not yet passed to Release() are not touched. The
// Reshaper lock is not taken. Whether deleting an XrdBuffer also releases the
// memory it wraps depends on XrdBuffer's destructor, which is not visible in
// this file.
108 {
109  XrdBuffer *bP;
110 
111  for (int i = 0; i < XRD_BUCKETS; i++)
// Pop the list head until the list is empty.
112  {while((bP = bucket[i].bnext))
113  {bucket[i].bnext = bP->next;
114  delete bP;
115  }
116  bucket[i].numbuf = 0;
117  }
118 }
119 
120 /******************************************************************************/
121 /* I n i t */
122 /******************************************************************************/
123 
// Init body.
// NOTE(review): the signature line (original line 124) is missing from this
// listing; judging by the section banner this is presumably
// XrdBuffManager::Init() -- confirm against XrdBuffer.hh.
//
// Starts the background thread that runs XrdReshaper() with this object as its
// argument. A non-zero return code from XrdSysThread::Run() is reported
// through Log.Emsg(); nothing is returned to the caller and no retry is made.
125 {
126  pthread_t tid;
127  int rc;
128 
129 // Start the reshaper thread
130 //
131  if ((rc = XrdSysThread::Run(&tid, XrdReshaper, static_cast<void *>(this), 0,
132  "Buffer Manager reshaper")))
133  Log.Emsg("BuffManager", rc, "create reshaper thread");
134 }
135 
136 /******************************************************************************/
137 /* O b t a i n */
138 /******************************************************************************/
139 
// Obtain body: hand out a buffer of at least sz bytes.
// NOTE(review): the signature line (original line 140) is missing from this
// listing; the cross-reference at the end of the page gives the declaration as
// XrdBuffer *Obtain(int bsz), while the body below names the argument sz.
//
// Returns a pointer to an XrdBuffer, or 0 when sz is not positive, when the
// aligned allocation fails, or when the computed bucket index is out of range.
// Requests larger than maxsz are forwarded to xlBuff.Obtain(). A buffer is
// taken from the matching bucket's free list when one is available; otherwise
// a new one is allocated and accounted for.
141 {
142  XrdBuffer *bp;
143  char *memp;
144  int mk, pk, bindex;
145 
146 // Make sure the request is within our limits
147 //
148  if (sz <= 0) return 0;
149  if (sz > maxsz) return xlBuff.Obtain(sz);
150 
151 // Calculate bucket index: mk ends up as the size served by bucket bindex,
152 // rounded up one bucket when the first estimate is smaller than sz.
// Assumes XrdOucUtils::Log2 returns the floor of the base-2 logarithm and a
// usable value for an argument of 0 -- TODO confirm in XrdOucUtils.cc.
153  mk = sz >> shift;
154  bindex = XrdOucUtils::Log2(mk);
155  mk = minBuffSz << bindex;
156  if (mk < sz) {bindex++; mk = mk << 1;}
157  if (bindex >= slots) return 0; // Should never happen!
158 
159 // Obtain a lock on the bucket array and try to give away an existing buffer.
160 // The request is counted here whether or not a buffer ends up being returned.
161  Reshaper.Lock();
162  totreq++;
163  bucket[bindex].numreq++;
// Pop the head of the free list, if there is one.
164  if ((bp = bucket[bindex].bnext))
165  {bucket[bindex].bnext = bp->next; bucket[bindex].numbuf--;}
166  Reshaper.UnLock();
167 
168 // Check if we really allocated a buffer
169 //
170  if (bp) return bp;
171 
172 // Allocate a chunk of aligned memory (done without holding the lock). The
173 // alignment is the buffer size itself, capped at the page size.
174  pk = (mk < pagsz ? mk : pagsz);
175  if (posix_memalign((void **)&memp, pk, mk)) return 0;
176 
177 // Wrap the memory with a buffer object
178 //
// NOTE(review): an ordinary throwing operator new never yields a null pointer,
// so this check only has an effect if XrdBuffer supplies a non-throwing
// allocation function -- confirm in XrdBuffer.hh.
179  if (!(bp = new XrdBuffer(memp, mk, bindex))) {free(memp); return 0;}
180 
181 // Update statistics; wake the reshaper when the bytes allocated now exceed
182 // the ceiling and no reshape request is already pending.
183  Reshaper.Lock();
184  totbuf++;
185  if ((totalo += mk) > maxalo && !rsinprog)
186  {rsinprog = 1; Reshaper.Signal();}
187  Reshaper.UnLock();
188  return bp;
189 }
190 
191 /******************************************************************************/
192 /* R e c a l c */
193 /******************************************************************************/
194 
// Recalc body: report the size Obtain() would allocate for a request.
// NOTE(review): the signature line (original line 195) is missing from this
// listing; the cross-reference at the end of the page gives the declaration as
// int Recalc(int bsz), while the body below names the argument sz.
//
// Returns 0 when sz is not positive or when the computed bucket index is out
// of range, the result of xlBuff.Recalc() when sz exceeds maxsz, and otherwise
// the size of the bucket that would serve the request. No lock is taken and no
// state of this object is modified here.
196 {
197  int mk, bindex;
198 
199 // Make sure the request is within our limits
200 //
201  if (sz <= 0) return 0;
202  if (sz > maxsz) return xlBuff.Recalc(sz);
203 
204 // Calculate bucket index using the same steps as Obtain(): first estimate
205 // from Log2, then round up one bucket if that size is smaller than sz.
206  mk = sz >> shift;
207  bindex = XrdOucUtils::Log2(mk);
208  mk = minBuffSz << bindex;
209  if (mk < sz) {bindex++; mk = mk << 1;}
210  if (bindex >= slots) return 0; // Should never happen!
211 
212 // All done, return the actual size we would have allocated
213 //
214  return mk;
215 }
216 
217 /******************************************************************************/
218 /* R e l e a s e */
219 /******************************************************************************/
220 
// Release body: give a buffer back to the pool.
// NOTE(review): the signature line (original line 221) is missing from this
// listing; the cross-reference at the end of the page gives the declaration as
// void Release(XrdBuffer *bp).
//
// bp is dereferenced immediately, so it must not be null. A buffer whose
// bucket index is at or beyond the number of buckets is passed on to
// xlBuff.Release(); any other buffer is pushed onto the head of its bucket's
// free list while the Reshaper lock is held. The memory itself is not freed
// here.
222 {
223  int bindex = bp->bindex;
224 
225 // Check if we should release this via the big buffer object
226 //
227  if (bindex >= slots) {xlBuff.Release(bp); return;}
228 
229 // Obtain a lock on the bucket array and reclaim the buffer
230 //
231  Reshaper.Lock();
// bp->bindex and the local copy bindex refer to the same bucket.
232  bp->next = bucket[bp->bindex].bnext;
233  bucket[bp->bindex].bnext = bp;
234  bucket[bindex].numbuf++;
235  Reshaper.UnLock();
236 }
237 
238 /******************************************************************************/
239 /* R e s h a p e */
240 /******************************************************************************/
241 
// Reshape body: endless loop that trims the free lists.
// NOTE(review): the signature line (original line 242) is missing from this
// listing; the cross-reference at the end of the page gives the declaration as
// void Reshape(). It is invoked from XrdReshaper() on its own thread.
//
// Each pass waits on the Reshaper object, enforces a minimum interval of
// minrsw seconds since the previous pass, derives a per-bucket buffer quota
// from the share of requests each bucket received, and then deletes free
// buffers above that quota, largest bucket first, until the running byte count
// is no longer above memtarget. It finishes by calling xlBuff.Trim().
243 {
244 int i, bufprof[XRD_BUCKETS], numfreed;
245 time_t delta, lastshape = time(0);
// memtarget is 80% of maxalo as it stands when this function is entered; it is
// not recomputed inside the loop, so a later change of maxalo through Set()
// is not reflected in it.
246 long long memslot, memhave, memtarget = (long long)(.80*(float)maxalo);
247 XrdSysTimer Timer;
248 float requests, buffers;
249 XrdBuffer *bp;
250 
251 // This is an endless loop to periodically reshape the buffer pool
252 //
253 while(1)
254  {Reshaper.Lock();
// Keep waiting while Wait() returns non-zero and the allocation total is still
// within the ceiling. Presumably Wait() returns non-zero on timeout and zero
// when signalled -- confirm in the condition variable class.
255  while(Reshaper.Wait(minrsw) && totalo <= maxalo)
256  {TRACE(MEM, "Reshaper has " <<(totalo>>10) <<"K; target " <<(memtarget>>10) <<"K");}
// Too soon after the previous pass: sleep out the remainder of the interval
// with the lock released (Timer.Wait takes milliseconds per its declaration).
257  if ((delta = (time(0) - lastshape)) < minrsw)
258  {Reshaper.UnLock();
259  Timer.Wait((minrsw-delta)*1000);
260  Reshaper.Lock();
261  }
262 
263  // We have the lock so compute the request profile: each bucket's quota is
264  // the total buffer count scaled by that bucket's share of all requests.
// When too few requests were seen, memhave is set to 0, which keeps the trim
// loop below from running as long as memtarget is not negative; bufprof is
// left unset in that case and is not read.
265  if (totreq > slots)
266  {requests = (float)totreq;
267  buffers = (float)totbuf;
268  for (i = 0; i < slots; i++)
269  {bufprof[i] = (int)(buffers*(((float)bucket[i].numreq)/requests));
270  bucket[i].numreq = 0;
271  }
272  totreq = 0; memhave = totalo;
273  } else memhave = 0;
274  Reshaper.UnLock();
275 
276  // Reshape the buffer pool to agree with the request profile. memslot tracks
277  // the buffer size of the bucket being visited: it starts at maxsz for the
  // last bucket and is halved for each lower one. The lock is taken and
  // released once per bucket.
278  memslot = maxsz; numfreed = 0;
279  for (i = slots-1; i >= 0 && memhave > memtarget; i--)
280  {Reshaper.Lock();
281  while(bucket[i].numbuf > bufprof[i])
282  if ((bp = bucket[i].bnext))
283  {bucket[i].bnext = bp->next;
284  delete bp;
285  bucket[i].numbuf--; numfreed++;
286  memhave -= memslot; totalo -= memslot;
287  totbuf--;
  // Free list is empty although the count said otherwise: reset the count
  // and move on to the next bucket.
288  } else {bucket[i].numbuf = 0; break;}
289  Reshaper.UnLock();
290  memslot = memslot>>1;
291  }
292 
293  // All done; totadj is updated here without holding the lock.
294  //
295  totadj += numfreed;
296  TRACE(MEM, "Pool reshaped; " <<numfreed <<" freed; have " <<(memhave>>10) <<"K; target " <<(memtarget>>10) <<"K");
297  lastshape = time(0);
298  rsinprog = 0; // No need to lock, we're the only ones now setting it
299 
300  xlBuff.Trim(); // Trim big buffers
301  }
302 }
303 
304 /******************************************************************************/
305 /* S e t */
306 /******************************************************************************/
307 
308 void XrdBuffManager::Set(int maxmem, int minw)
309 {
310 
311 // Obtain a lock and set the values
312 //
313  Reshaper.Lock();
314  if (maxmem > 0) maxalo = (long long)maxmem;
315  if (minw > 0) minrsw = minw;
316  Reshaper.UnLock();
317 }
318 
319 /******************************************************************************/
320 /* S t a t s */
321 /******************************************************************************/
322 
323 int XrdBuffManager::Stats(char *buff, int blen, int do_sync)
324 {
325  static char statfmt[] = "<stats id=\"buff\"><reqs>%d</reqs>"
326  "<mem>%lld</mem><buffs>%d</buffs><adj>%d</adj>%s</stats>";
327  char xlStats[1024];
328  int nlen;
329 
330 // If only size wanted, return it
331 //
332  if (!buff) return sizeof(statfmt) + 16*4 + xlBuff.Stats(0,0);
333 
334 // Return formatted stats
335 //
336  if (do_sync) Reshaper.Lock();
337  xlBuff.Stats(xlStats, sizeof(xlStats), do_sync);
338  nlen = snprintf(buff,blen,statfmt,totreq,totalo,totbuf,totadj,xlStats);
339  if (do_sync) Reshaper.UnLock();
340  return nlen;
341 }
void * XrdReshaper(void *pp)
Definition: XrdBuffer.cc:48
#define XRD_BUSHIFT
Definition: XrdBuffer.hh:67
#define XRD_BUCKETS
Definition: XrdBuffer.hh:66
#define TRACE(act, x)
Definition: XrdTrace.hh:63
void Reshape()
Definition: XrdBuffer.cc:242
int Stats(char *buff, int blen, int do_sync=0)
Definition: XrdBuffer.cc:323
void Release(XrdBuffer *bp)
Definition: XrdBuffer.cc:221
int Recalc(int bsz)
Definition: XrdBuffer.cc:195
XrdBuffer * Obtain(int bsz)
Definition: XrdBuffer.cc:140
XrdBuffManager(int minrst=20 *60)
Definition: XrdBuffer.cc:78
void Set(int maxmem=-1, int minw=-1)
Definition: XrdBuffer.cc:308
void Release(XrdBuffer *bp)
Definition: XrdBuffXL.cc:182
void Trim()
Definition: XrdBuffXL.cc:221
int Recalc(int bsz)
Definition: XrdBuffXL.cc:155
int Stats(char *buff, int blen, int do_sync=0)
Definition: XrdBuffXL.cc:199
XrdBuffer * Obtain(int bsz)
Definition: XrdBuffXL.cc:99
static int Log2(unsigned long long n)
Definition: XrdOucUtils.cc:818
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227
XrdSysError Log
Definition: XrdConfig.cc:112
XrdBuffXL xlBuff
Definition: XrdBuffer.cc:68