XRootD
XrdThrottleManager.cc
Go to the documentation of this file.
1 
2 #include "XrdThrottleManager.hh"
3 
4 #include "XrdOuc/XrdOucEnv.hh"
6 #include "XrdSys/XrdSysTimer.hh"
9 
10 #define XRD_TRACE m_trace->
12 
13 #include <sstream>
14 
// Trace identifier used for all TRACE() output from this translation unit.
const char *
XrdThrottleManager::TraceID = "ThrottleManager";

// Size of the fixed per-user share tables; GetUid() hashes user names into
// [0, m_max_users), so distinct users may share a fairshare slot.
const
int XrdThrottleManager::m_max_users = 1024;

// Clock used by XrdThrottleTimer for IO duration measurements.  Where POSIX
// clocks are available we use a monotonic clock so timings are immune to
// wall-clock adjustments; elsewhere the member is a placeholder.
#if defined(__linux__) || defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__))
clockid_t XrdThrottleTimer::clock_id = CLOCK_MONOTONIC;
#else
int XrdThrottleTimer::clock_id = 0;
#endif
26 
   m_trace(tP),
   m_log(lP),
   m_interval_length_seconds(1.0),
   m_bytes_per_second(-1),            // negative: bandwidth throttle disabled (see Apply)
   m_ops_per_second(-1),              // negative: IOPS throttle disabled (see Apply)
   m_concurrency_limit(-1),           // negative: concurrency throttle disabled (see StartIOTimer)
   m_last_round_allocation(100*1024), // bootstrap each user with 100KB of byte shares
   m_io_active(0),
   m_loadshed_host(""),
   m_loadshed_port(0),                // 0: load shedding disabled (see CheckLoadShed)
   m_loadshed_frequency(0),
   m_loadshed_limit_hit(0)
{
   // Published IO-wait counter starts at zero; RecomputeInternal folds the
   // per-interval accumulator into it each round.
   m_stable_io_wait.tv_sec = 0;
   m_stable_io_wait.tv_nsec = 0;
}
44 
// Initialize the throttle manager: size the per-user share tables, give
// every slot a small bootstrap allocation, zero the IO-wait accumulator,
// and launch the background thread that periodically recomputes fairshares.
void
{
   TRACE(DEBUG, "Initializing the throttle manager.");
   // Initialize all our shares to zero.
   m_primary_bytes_shares.resize(m_max_users);
   m_secondary_bytes_shares.resize(m_max_users);
   m_primary_ops_shares.resize(m_max_users);
   m_secondary_ops_shares.resize(m_max_users);
   // Allocate each user 100KB and 10 ops to bootstrap;
   for (int i=0; i<m_max_users; i++)
   {
      m_primary_bytes_shares[i] = m_last_round_allocation;
      m_secondary_bytes_shares[i] = 0;
      m_primary_ops_shares[i] = 10;
      m_secondary_ops_shares[i] = 0;
   }

   // Per-interval IO wait accumulator (drained by RecomputeInternal).
   m_io_wait.tv_sec = 0;
   m_io_wait.tv_nsec = 0;

   // The recompute thread runs for the life of the process (Recompute never
   // returns); failure to start it is logged but not treated as fatal here.
   int rc;
   pthread_t tid;
   if ((rc = XrdSysThread::Run(&tid, XrdThrottleManager::RecomputeBootstrap, static_cast<void *>(this), 0, "Buffer Manager throttle")))
      m_log->Emsg("ThrottleManager", rc, "create throttle thread");

}
72 
/*
 * Take as many shares as possible to fulfill the request; update
 * request with current remaining value, or zero if satisfied.
 */
inline void
XrdThrottleManager::GetShares(int &shares, int &request)
{
   // Atomically subtract the whole request from the pool, fetching the prior
   // value.  If the pool held anything, credit the request with whatever was
   // actually available.  The pool may be driven negative; the deficit is
   // wiped out when the recompute thread reassigns the share arrays.
   int remaining;
   AtomicFSub(remaining, shares, request);
   if (remaining > 0)
   {
      request -= (remaining < request) ? remaining : request;
   }
}
87 
88 /*
89  * Iterate through all of the secondary shares, attempting
90  * to steal enough to fulfill the request.
91  */
92 void
93 XrdThrottleManager::StealShares(int uid, int &reqsize, int &reqops)
94 {
95  if (!reqsize && !reqops) return;
96  TRACE(BANDWIDTH, "Stealing shares to fill request of " << reqsize << " bytes");
97  TRACE(IOPS, "Stealing shares to fill request of " << reqops << " ops.");
98 
99  for (int i=uid+1; i % m_max_users == uid; i++)
100  {
101  if (reqsize) GetShares(m_secondary_bytes_shares[i % m_max_users], reqsize);
102  if (reqops) GetShares(m_secondary_ops_shares[ i % m_max_users], reqops);
103  }
104 
105  TRACE(BANDWIDTH, "After stealing shares, " << reqsize << " of request bytes remain.");
106  TRACE(IOPS, "After stealing shares, " << reqops << " of request ops remain.");
107 }
108 
109 /*
110  * Increment the number of files held open by a given entity. Returns false
111  * if the user is at the maximum; in this case, the internal counter is not
112  * incremented.
113  */
114 bool
115 XrdThrottleManager::OpenFile(const std::string &entity, std::string &error_message)
116 {
117  if (m_max_open == 0 && m_max_conns == 0) return true;
118 
119  const std::lock_guard<std::mutex> lock(m_file_mutex);
120  auto iter = m_file_counters.find(entity);
121  unsigned long cur_open_files = 0, cur_open_conns;
122  if (m_max_open) {
123  if (iter == m_file_counters.end()) {
124  m_file_counters[entity] = 1;
125  TRACE(FILES, "User " << entity << " has opened their first file");
126  cur_open_files = 1;
127  } else if (iter->second < m_max_open) {
128  iter->second++;
129  cur_open_files = iter->second;
130  } else {
131  std::stringstream ss;
132  ss << "User " << entity << " has hit the limit of " << m_max_open << " open files";
133  TRACE(FILES, ss.str());
134  error_message = ss.str();
135  return false;
136  }
137  }
138 
139  if (m_max_conns) {
140  auto pid = XrdSysThread::Num();
141  auto conn_iter = m_active_conns.find(entity);
142  auto conn_count_iter = m_conn_counters.find(entity);
143  if ((conn_count_iter != m_conn_counters.end()) && (conn_count_iter->second == m_max_conns) &&
144  (conn_iter == m_active_conns.end() || ((*(conn_iter->second))[pid] == 0)))
145  {
146  // note: we are rolling back the increment in open files
147  if (m_max_open) iter->second--;
148  std::stringstream ss;
149  ss << "User " << entity << " has hit the limit of " << m_max_conns <<
150  " open connections";
151  TRACE(CONNS, ss.str());
152  error_message = ss.str();
153  return false;
154  }
155  if (conn_iter == m_active_conns.end()) {
156  std::unique_ptr<std::unordered_map<pid_t, unsigned long>> conn_map(
157  new std::unordered_map<pid_t, unsigned long>());
158  (*conn_map)[pid] = 1;
159  m_active_conns[entity] = std::move(conn_map);
160  if (conn_count_iter == m_conn_counters.end()) {
161  m_conn_counters[entity] = 1;
162  cur_open_conns = 1;
163  } else {
164  m_conn_counters[entity] ++;
165  cur_open_conns = m_conn_counters[entity];
166  }
167  } else {
168  auto pid_iter = conn_iter->second->find(pid);
169  if (pid_iter == conn_iter->second->end() || pid_iter->second == 0) {
170  (*(conn_iter->second))[pid] = 1;
171  conn_count_iter->second++;
172  cur_open_conns = conn_count_iter->second;
173  } else {
174  (*(conn_iter->second))[pid] ++;
175  cur_open_conns = conn_count_iter->second;
176  }
177  }
178  TRACE(CONNS, "User " << entity << " has " << cur_open_conns << " open connections");
179  }
180  if (m_max_open) TRACE(FILES, "User " << entity << " has " << cur_open_files << " open files");
181  return true;
182 }
183 
184 
/*
 * Decrement the number of files held open by a given entity.
 *
 * Returns false if the value would have fallen below zero or
 * if the entity isn't tracked.
 */
bool
XrdThrottleManager::CloseFile(const std::string &entity)
{
   if (m_max_open == 0 && m_max_conns == 0) return true;

   bool result = true;
   const std::lock_guard<std::mutex> lock(m_file_mutex);
   if (m_max_open) {
      auto iter = m_file_counters.find(entity);
      if (iter == m_file_counters.end()) {
         // Close without any recorded open for this entity.
         TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin never saw an open file");
         result = false;
      } else if (iter->second == 0) {
         // Counter already at zero; decrementing would underflow.
         TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin thinks all files were already closed");
         result = false;
      } else {
         iter->second--;
      }
      // Note: iter is only dereferenceable here when result is still true.
      if (result) TRACE(FILES, "User " << entity << " closed a file; " << iter->second <<
                        " remain open");
   }

   if (m_max_conns) {
      auto pid = XrdSysThread::Num();
      auto conn_iter = m_active_conns.find(entity);
      auto conn_count_iter = m_conn_counters.find(entity);
      if (conn_iter == m_active_conns.end() || !(conn_iter->second)) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
               " tracking");
         return false;
      }
      auto pid_iter = conn_iter->second->find(pid);
      if (pid_iter == conn_iter->second->end()) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
               " tracking");
         return false;
      }
      if (pid_iter->second == 0) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file on connection the throttle"
               " plugin thinks was idle");
      } else {
         pid_iter->second--;
      }
      if (conn_count_iter == m_conn_counters.end()) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file but the throttle plugin never"
               " observed an open file");
      } else if (pid_iter->second == 0) {
         // That was this thread's last open file for the entity: the
         // connection has gone idle, so drop the active-connection count.
         if (conn_count_iter->second == 0) {
            TRACE(CONNS, "WARNING: User " << entity << " had a connection go idle but the "
                  " throttle plugin already thought all connections were idle");
         } else {
            conn_count_iter->second--;
            TRACE(CONNS, "User " << entity << " had connection on thread " << pid << " go idle; "
                  << conn_count_iter->second << " active connections remain");
         }
      }
   }

   return result;
}
251 
252 
/*
 * Apply the throttle. If there are no limits set, returns immediately. Otherwise,
 * this applies the limits as best possible, stalling the thread if necessary.
 */
void
XrdThrottleManager::Apply(int reqsize, int reqops, int uid)
{
   // A negative limit means that dimension is unthrottled; zero out the
   // corresponding request so the loop below ignores it.
   if (m_bytes_per_second < 0)
      reqsize = 0;
   if (m_ops_per_second < 0)
      reqops = 0;
   while (reqsize || reqops)
   {
      // Subtract the requested out of the shares
      AtomicBeg(m_compute_var);
      GetShares(m_primary_bytes_shares[uid], reqsize);
      if (reqsize)
      {
         // Primary exhausted; fall back to this user's secondary pool.
         TRACE(BANDWIDTH, "Using secondary shares; request has " << reqsize << " bytes left.");
         GetShares(m_secondary_bytes_shares[uid], reqsize);
         TRACE(BANDWIDTH, "Finished with secondary shares; request has " << reqsize << " bytes left.");
      }
      else
      {
         TRACE(BANDWIDTH, "Filled byte shares out of primary; " << m_primary_bytes_shares[uid] << " left.");
      }
      GetShares(m_primary_ops_shares[uid], reqops);
      if (reqops)
      {
         GetShares(m_secondary_ops_shares[uid], reqops);
      }
      // Last resort: raid other users' secondary (stealable) shares.
      StealShares(uid, reqsize, reqops);
      AtomicEnd(m_compute_var);

      if (reqsize || reqops)
      {
         // Nothing left anywhere this interval: record the stall for the
         // load-shedding heuristic and block until the recompute thread
         // replenishes shares and broadcasts.
         if (reqsize) TRACE(BANDWIDTH, "Sleeping to wait for throttle fairshare.");
         if (reqops) TRACE(IOPS, "Sleeping to wait for throttle fairshare.");
         m_compute_var.Wait();
         AtomicBeg(m_compute_var);
         AtomicInc(m_loadshed_limit_hit);
         AtomicEnd(m_compute_var);
      }
   }

}
299 
300 void *
301 XrdThrottleManager::RecomputeBootstrap(void *instance)
302 {
303  XrdThrottleManager * manager = static_cast<XrdThrottleManager*>(instance);
304  manager->Recompute();
305  return NULL;
306 }
307 
// Background loop: garbage-collect idle tracking state, recompute the
// fairshares, then sleep one interval.  Runs forever on the thread spawned
// by Init().
void
XrdThrottleManager::Recompute()
{
   while (1)
   {
      // The connection counter can accumulate a number of known-idle connections.
      // We only need to keep long-term memory of idle ones. Take this chance to garbage
      // collect old connection counters.
      if (m_max_open || m_max_conns) {
         const std::lock_guard<std::mutex> lock(m_file_mutex);
         for (auto iter = m_active_conns.begin(); iter != m_active_conns.end();)
         {
            auto & conn_count = *iter;
            // Drop entities whose per-thread map pointer is null.
            if (!conn_count.second) {
               iter = m_active_conns.erase(iter);
               continue;
            }
            // Drop idle (zero-count) per-thread entries within the entity.
            for (auto iter2 = conn_count.second->begin(); iter2 != conn_count.second->end();) {
               if (iter2->second == 0) {
                  iter2 = conn_count.second->erase(iter2);
               } else {
                  iter2++;
               }
            }
            // Drop the entity entirely once no threads remain.
            if (!conn_count.second->size()) {
               iter = m_active_conns.erase(iter);
            } else {
               iter++;
            }
         }
         // Prune zeroed connection counters so the map stays small.
         for (auto iter = m_conn_counters.begin(); iter != m_conn_counters.end();) {
            if (!iter->second) {
               iter = m_conn_counters.erase(iter);
            } else {
               iter++;
            }
         }
         // Likewise prune zeroed open-file counters.
         for (auto iter = m_file_counters.begin(); iter != m_file_counters.end();) {
            if (!iter->second) {
               iter = m_file_counters.erase(iter);
            } else {
               iter++;
            }
         }
      }

      TRACE(DEBUG, "Recomputing fairshares for throttle.");
      RecomputeInternal();
      TRACE(DEBUG, "Finished recomputing fairshares for throttle; sleeping for " << m_interval_length_seconds << " seconds.");
      XrdSysTimer::Wait(static_cast<int>(1000*m_interval_length_seconds));
   }
}
360 
361 /*
362  * The heart of the manager approach.
363  *
364  * This routine periodically recomputes the shares of each current user.
365  * Each user has a "primary" and a "secondary" share. At the end of the
366  * each time interval, the remaining primary share is moved to secondary.
367  * A user can utilize both shares; if both are gone, they must block until
368  * the next recompute interval.
369  *
370  * The secondary share can be "stolen" by any other user; so, if a user
371  * is idle or under-utilizing, their share can be used by someone else.
372  * However, they can never be completely starved, as no one can steal
373  * primary share.
374  *
375  * In this way, we violate the throttle for an interval, but never starve.
376  *
377  */
378 void
379 XrdThrottleManager::RecomputeInternal()
380 {
381  // Compute total shares for this interval;
382  float intervals_per_second = 1.0/m_interval_length_seconds;
383  float total_bytes_shares = m_bytes_per_second / intervals_per_second;
384  float total_ops_shares = m_ops_per_second / intervals_per_second;
385 
386  // Compute the number of active users; a user is active if they used
387  // any primary share during the last interval;
388  AtomicBeg(m_compute_var);
389  float active_users = 0;
390  long bytes_used = 0;
391  for (int i=0; i<m_max_users; i++)
392  {
393  int primary = AtomicFAZ(m_primary_bytes_shares[i]);
394  if (primary != m_last_round_allocation)
395  {
396  active_users++;
397  if (primary >= 0)
398  m_secondary_bytes_shares[i] = primary;
399  primary = AtomicFAZ(m_primary_ops_shares[i]);
400  if (primary >= 0)
401  m_secondary_ops_shares[i] = primary;
402  bytes_used += (primary < 0) ? m_last_round_allocation : (m_last_round_allocation-primary);
403  }
404  }
405 
406  if (active_users == 0)
407  {
408  active_users++;
409  }
410 
411  // Note we allocate the same number of shares to *all* users, not
412  // just the active ones. If a new user becomes active in the next
413  // interval, we'll go over our bandwidth budget just a bit.
414  m_last_round_allocation = static_cast<int>(total_bytes_shares / active_users);
415  int ops_shares = static_cast<int>(total_ops_shares / active_users);
416  TRACE(BANDWIDTH, "Round byte allocation " << m_last_round_allocation << " ; last round used " << bytes_used << ".");
417  TRACE(IOPS, "Round ops allocation " << ops_shares);
418  for (int i=0; i<m_max_users; i++)
419  {
420  m_primary_bytes_shares[i] = m_last_round_allocation;
421  m_primary_ops_shares[i] = ops_shares;
422  }
423 
424  // Reset the loadshed limit counter.
425  int limit_hit = AtomicFAZ(m_loadshed_limit_hit);
426  TRACE(DEBUG, "Throttle limit hit " << limit_hit << " times during last interval.");
427 
428  AtomicEnd(m_compute_var);
429 
430  // Update the IO counters
431  m_compute_var.Lock();
432  m_stable_io_active = AtomicGet(m_io_active);
433  auto io_active = m_stable_io_active;
434  m_stable_io_total = static_cast<unsigned>(AtomicGet(m_io_total));
435  auto io_total = m_stable_io_total;
436  time_t secs; AtomicFZAP(secs, m_io_wait.tv_sec);
437  long nsecs; AtomicFZAP(nsecs, m_io_wait.tv_nsec);
438  m_stable_io_wait.tv_sec += static_cast<long>(secs * intervals_per_second);
439  m_stable_io_wait.tv_nsec += static_cast<long>(nsecs * intervals_per_second);
440  while (m_stable_io_wait.tv_nsec > 1000000000)
441  {
442  m_stable_io_wait.tv_nsec -= 1000000000;
443  m_stable_io_wait.tv_sec ++;
444  }
445  struct timespec io_wait_ts;
446  io_wait_ts.tv_sec = m_stable_io_wait.tv_sec;
447  io_wait_ts.tv_nsec = m_stable_io_wait.tv_nsec;
448 
449  m_compute_var.UnLock();
450  uint64_t io_wait_ms = io_wait_ts.tv_sec*1000+io_wait_ts.tv_nsec/1000000;
451  TRACE(IOLOAD, "Current IO counter is " << io_active << "; total IO wait time is " << io_wait_ms << "ms.");
452  if (m_gstream)
453  {
454  char buf[128];
455  auto len = snprintf(buf, 128,
456  R"({"event":"throttle_update","io_wait":%.4f,"io_active":%d,"io_total":%d})",
457  static_cast<double>(io_wait_ms) / 1000.0, io_active, io_total);
458  auto suc = (len < 128) ? m_gstream->Insert(buf, len + 1) : false;
459  if (!suc)
460  {
461  TRACE(IOLOAD, "Failed g-stream insertion of throttle_update record (len=" << len << "): " << buf);
462  }
463  }
464  m_compute_var.Broadcast();
465 }
466 
467 /*
468  * Do a simple hash across the username.
469  */
470 int
471 XrdThrottleManager::GetUid(const char *username)
472 {
473  const char *cur = username;
474  int hval = 0;
475  while (cur && *cur && *cur != '@' && *cur != '.')
476  {
477  hval += *cur;
478  hval %= m_max_users;
479  cur++;
480  }
481  //std::cerr << "Calculated UID " << hval << " for " << username << std::endl;
482  return hval;
483 }
484 
/*
 * Create an IO timer object; increment the number of outstanding IOs.
 */
{
   // Count this IO as active and bump the lifetime total.
   AtomicBeg(m_compute_var);
   int cur_counter = AtomicInc(m_io_active);
   AtomicInc(m_io_total);
   AtomicEnd(m_compute_var);
   // If a concurrency limit is configured and exceeded, back out the active
   // count, record a limit hit (which feeds the load-shedding heuristic),
   // and block until the recompute thread broadcasts; then retry.
   while (m_concurrency_limit >= 0 && cur_counter > m_concurrency_limit)
   {
      AtomicBeg(m_compute_var);
      AtomicInc(m_loadshed_limit_hit);
      AtomicDec(m_io_active);
      AtomicEnd(m_compute_var);
      m_compute_var.Wait();
      AtomicBeg(m_compute_var);
      cur_counter = AtomicInc(m_io_active);
      AtomicEnd(m_compute_var);
   }
   // The returned timer's destructor/stop path reports the measured
   // duration back via StopIOTimer.
   return XrdThrottleTimer(*this);
}
508 
/*
 * Finish recording an IO timer.
 *
 * Decrements the active-IO count and folds the measured duration into the
 * per-interval IO-wait accumulator drained by RecomputeInternal.
 */
void
XrdThrottleManager::StopIOTimer(struct timespec timer)
{
   AtomicBeg(m_compute_var);
   AtomicDec(m_io_active);
   AtomicAdd(m_io_wait.tv_sec, timer.tv_sec);
   // Note this may result in tv_nsec > 1e9
   AtomicAdd(m_io_wait.tv_nsec, timer.tv_nsec);
   AtomicEnd(m_compute_var);
}
522 
523 /*
524  * Check the counters to see if we have hit any throttle limits in the
525  * current time period. If so, shed the client randomly.
526  *
527  * If the client has already been load-shedded once and reconnected to this
528  * server, then do not load-shed it again.
529  */
530 bool
531 XrdThrottleManager::CheckLoadShed(const std::string &opaque)
532 {
533  if (m_loadshed_port == 0)
534  {
535  return false;
536  }
537  if (AtomicGet(m_loadshed_limit_hit) == 0)
538  {
539  return false;
540  }
541  if (static_cast<unsigned>(rand()) % 100 > m_loadshed_frequency)
542  {
543  return false;
544  }
545  if (opaque.empty())
546  {
547  return false;
548  }
549  return true;
550 }
551 
552 void
553 XrdThrottleManager::PrepLoadShed(const char * opaque, std::string &lsOpaque)
554 {
555  if (m_loadshed_port == 0)
556  {
557  return;
558  }
559  if (opaque && opaque[0])
560  {
561  XrdOucEnv env(opaque);
562  // Do not load shed client if it has already been done once.
563  if (env.Get("throttle.shed") != 0)
564  {
565  return;
566  }
567  lsOpaque = opaque;
568  lsOpaque += "&throttle.shed=1";
569  }
570  else
571  {
572  lsOpaque = "throttle.shed=1";
573  }
574 }
575 
576 void
577 XrdThrottleManager::PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
578 {
579  host = m_loadshed_host;
580  host += "?";
581  host += opaque;
582  port = m_loadshed_port;
583 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define AtomicFSub(w, x, y)
#define AtomicInc(x)
#define AtomicFAZ(x)
#define AtomicBeg(Mtx)
#define AtomicFZAP(w, x)
#define AtomicDec(x)
#define AtomicGet(x)
#define AtomicEnd(Mtx)
#define AtomicAdd(x, y)
#define TRACE(act, x)
Definition: XrdTrace.hh:63
char * Get(const char *varname)
Definition: XrdOucEnv.hh:69
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static unsigned long Num(void)
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227
void Apply(int reqsize, int reqops, int uid)
void StopIOTimer(struct timespec)
friend class XrdThrottleTimer
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
XrdThrottleTimer StartIOTimer()
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
static int GetUid(const char *username)
bool Insert(const char *data, int dlen)