XRootD
XrdThrottleManager.cc
#include "XrdThrottleManager.hh"

#include "XrdOuc/XrdOucEnv.hh"
#include "XrdSec/XrdSecEntity.hh"
#include "XrdSys/XrdSysAtomics.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdSys/XrdSysTimer.hh"
#include "XrdXrootd/XrdXrootdGStream.hh"

#define XRD_TRACE m_trace->
#include "XrdThrottleTrace.hh"

#include <algorithm>
#include <array>
#include <cmath>
#include <random>
#include <sstream>
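
// GetTimerListHash() selects which of the m_timer_list_size timer-list shards
// the calling thread should use. On Linux we key off the current CPU so that
// concurrent I/O threads tend to land on different lists (and different
// mutexes); on other platforms everything falls back to shard 0.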

#if defined(__linux__)

#include <sched.h>
unsigned XrdThrottleManager::GetTimerListHash() {
    int cpu = sched_getcpu();
    if (cpu < 0) {
        return 0;
    }
    return cpu % m_timer_list_size;
}

#else

unsigned XrdThrottleManager::GetTimerListHash() {
    return 0;
}

#endif

const char *
XrdThrottleManager::TraceID = "ThrottleManager";

XrdThrottleManager::XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP) :
    m_trace(tP),
    m_log(lP),
    m_interval_length_seconds(1.0),
    m_bytes_per_second(-1),
    m_ops_per_second(-1),
    m_concurrency_limit(-1),
    m_last_round_allocation(100*1024),
    m_loadshed_host(""),
    m_loadshed_port(0),
    m_loadshed_frequency(0)
{
}

void
XrdThrottleManager::FromConfig(XrdThrottle::Configuration &config)
{
    auto max_open = config.GetMaxOpen();
    if (max_open != -1) SetMaxOpen(max_open);
    auto max_conn = config.GetMaxConn();
    if (max_conn != -1) SetMaxConns(max_conn);
    auto max_wait = config.GetMaxWait();
    if (max_wait != -1) SetMaxWait(max_wait);

    SetThrottles(config.GetThrottleDataRate(),
                 config.GetThrottleIOPSRate(),
                 config.GetThrottleConcurrency(),
                 static_cast<float>(config.GetThrottleRecomputeIntervalMS())/1000.0);

    m_trace->What = config.GetTraceLevels();

    auto loadshed_host = config.GetLoadshedHost();
    auto loadshed_port = config.GetLoadshedPort();
    auto loadshed_freq = config.GetLoadshedFreq();
    if (!loadshed_host.empty() && loadshed_port > 0 && loadshed_freq > 0)
    {
        // Loadshed target specified, so enable load shedding.
        SetLoadShed(loadshed_host, loadshed_port, loadshed_freq);
    }
}

void
XrdThrottleManager::Init()
{
    TRACE(DEBUG, "Initializing the throttle manager.");
    // Initialize all our shares to zero.
    m_primary_bytes_shares.resize(m_max_users);
    m_secondary_bytes_shares.resize(m_max_users);
    m_primary_ops_shares.resize(m_max_users);
    m_secondary_ops_shares.resize(m_max_users);
    for (auto & waiter : m_waiter_info) {
        waiter.m_manager = this;
    }

    // Allocate each user 100KB and 10 ops to bootstrap.
    for (int i=0; i<m_max_users; i++)
    {
        m_primary_bytes_shares[i] = m_last_round_allocation;
        m_secondary_bytes_shares[i] = 0;
        m_primary_ops_shares[i] = 10;
        m_secondary_ops_shares[i] = 0;
    }

    int rc;
    pthread_t tid;
    if ((rc = XrdSysThread::Run(&tid, XrdThrottleManager::RecomputeBootstrap, static_cast<void *>(this), 0, "Buffer Manager throttle")))
        m_log->Emsg("ThrottleManager", rc, "create throttle thread");
}
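
// Example resolution order: a token-bearing request with subject "abc123" from
// virtual organization "cms" maps to "cms:abc123" (values illustrative); with
// no token subject, a non-empty "request.name" attribute is used; otherwise we
// fall back to the entity's name, or "nobody" if none is set. The uid is a
// hash bucket in [0, m_max_users), so distinct users may share a bucket.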

std::tuple<std::string, uint16_t>
XrdThrottleManager::GetUserInfo(const XrdSecEntity *client) {
    // Try various potential "names" associated with the request, from the most
    // specific to most generic.
    std::string user;

    if (client->eaAPI && client->eaAPI->Get("token.subject", user)) {
        if (client->vorg) user = std::string(client->vorg) + ":" + user;
    } else if (client->eaAPI) {
        std::string request_name;
        if (client->eaAPI->Get("request.name", request_name) && !request_name.empty()) user = request_name;
    }
    if (user.empty()) {user = client->name ? client->name : "nobody";}
    uint16_t uid = GetUid(user.c_str());
    return std::make_tuple(user, uid);
}

/*
 * Take as many shares as possible to fulfill the request; update
 * request with current remaining value, or zero if satisfied.
 */
inline void
XrdThrottleManager::GetShares(int &shares, int &request)
{
    int remaining;
    AtomicFSub(remaining, shares, request);
    if (remaining > 0)
    {
        request -= (remaining < request) ? remaining : request;
    }
}
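
// Worked example: if shares holds 100 and request is 150, the atomic
// fetch-and-subtract leaves shares at -50, remaining is 100, and request drops
// to 50 (still unsatisfied). Had shares held 200, request would drop to 0 and
// the leftover 50 shares would remain available to other requests.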

/*
 * Iterate through all of the secondary shares, attempting
 * to steal enough to fulfill the request.
 */
void
XrdThrottleManager::StealShares(int uid, int &reqsize, int &reqops)
{
    if (!reqsize && !reqops) return;
    TRACE(BANDWIDTH, "Stealing shares to fill request of " << reqsize << " bytes");
    TRACE(IOPS, "Stealing shares to fill request of " << reqops << " ops.");

    for (int i=uid+1; i % m_max_users != uid; i++)
    {
        if (reqsize) GetShares(m_secondary_bytes_shares[i % m_max_users], reqsize);
        if (reqops)  GetShares(m_secondary_ops_shares[i % m_max_users], reqops);
    }

    TRACE(BANDWIDTH, "After stealing shares, " << reqsize << " of request bytes remain.");
    TRACE(IOPS, "After stealing shares, " << reqops << " of request ops remain.");
}

/*
 * Increment the number of files held open by a given entity. Returns false
 * if the user is at the maximum; in this case, the internal counter is not
 * incremented.
 */
bool
XrdThrottleManager::OpenFile(const std::string &entity, std::string &error_message)
{
    if (m_max_open == 0 && m_max_conns == 0) return true;

    const std::lock_guard<std::mutex> lock(m_file_mutex);
    auto iter = m_file_counters.find(entity);
    unsigned long cur_open_files = 0, cur_open_conns = 0;
    if (m_max_open) {
        if (iter == m_file_counters.end()) {
            m_file_counters[entity] = 1;
            TRACE(FILES, "User " << entity << " has opened their first file");
            cur_open_files = 1;
        } else if (iter->second < m_max_open) {
            iter->second++;
            cur_open_files = iter->second;
        } else {
            std::stringstream ss;
            ss << "User " << entity << " has hit the limit of " << m_max_open << " open files";
            TRACE(FILES, ss.str());
            error_message = ss.str();
            return false;
        }
    }

    if (m_max_conns) {
        auto pid = XrdSysThread::Num();
        auto conn_iter = m_active_conns.find(entity);
        auto conn_count_iter = m_conn_counters.find(entity);
        if ((conn_count_iter != m_conn_counters.end()) && (conn_count_iter->second == m_max_conns) &&
            (conn_iter == m_active_conns.end() || ((*(conn_iter->second))[pid] == 0)))
        {
            // Note: we are rolling back the increment in open files.
            if (m_max_open) iter->second--;
            std::stringstream ss;
            ss << "User " << entity << " has hit the limit of " << m_max_conns <<
                " open connections";
            TRACE(CONNS, ss.str());
            error_message = ss.str();
            return false;
        }
        if (conn_iter == m_active_conns.end()) {
            std::unique_ptr<std::unordered_map<pid_t, unsigned long>> conn_map(
                new std::unordered_map<pid_t, unsigned long>());
            (*conn_map)[pid] = 1;
            m_active_conns[entity] = std::move(conn_map);
            if (conn_count_iter == m_conn_counters.end()) {
                m_conn_counters[entity] = 1;
                cur_open_conns = 1;
            } else {
                m_conn_counters[entity]++;
                cur_open_conns = m_conn_counters[entity];
            }
        } else {
            auto pid_iter = conn_iter->second->find(pid);
            if (pid_iter == conn_iter->second->end() || pid_iter->second == 0) {
                (*(conn_iter->second))[pid] = 1;
                conn_count_iter->second++;
                cur_open_conns = conn_count_iter->second;
            } else {
                (*(conn_iter->second))[pid]++;
                cur_open_conns = conn_count_iter->second;
            }
        }
        TRACE(CONNS, "User " << entity << " has " << cur_open_conns << " open connections");
    }
    if (m_max_open) TRACE(FILES, "User " << entity << " has " << cur_open_files << " open files");
    return true;
}


/*
 * Decrement the number of files held open by a given entity.
 *
 * Returns false if the value would have fallen below zero or
 * if the entity isn't tracked.
 */
bool
XrdThrottleManager::CloseFile(const std::string &entity)
{
    if (m_max_open == 0 && m_max_conns == 0) return true;

    bool result = true;
    const std::lock_guard<std::mutex> lock(m_file_mutex);
    if (m_max_open) {
        auto iter = m_file_counters.find(entity);
        if (iter == m_file_counters.end()) {
            TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin never saw an open file");
            result = false;
        } else if (iter->second == 0) {
            TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin thinks all files were already closed");
            result = false;
        } else {
            iter->second--;
        }
        if (result) TRACE(FILES, "User " << entity << " closed a file; " << iter->second <<
            " remain open");
    }

    if (m_max_conns) {
        auto pid = XrdSysThread::Num();
        auto conn_iter = m_active_conns.find(entity);
        auto conn_count_iter = m_conn_counters.find(entity);
        if (conn_iter == m_active_conns.end() || !(conn_iter->second)) {
            TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
                " tracking");
            return false;
        }
        auto pid_iter = conn_iter->second->find(pid);
        if (pid_iter == conn_iter->second->end()) {
            TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
                " tracking");
            return false;
        }
        if (pid_iter->second == 0) {
            TRACE(CONNS, "WARNING: User " << entity << " closed a file on connection the throttle"
                " plugin thinks was idle");
        } else {
            pid_iter->second--;
        }
        if (conn_count_iter == m_conn_counters.end()) {
            TRACE(CONNS, "WARNING: User " << entity << " closed a file but the throttle plugin never"
                " observed an open file");
        } else if (pid_iter->second == 0) {
            if (conn_count_iter->second == 0) {
                TRACE(CONNS, "WARNING: User " << entity << " had a connection go idle but the"
                    " throttle plugin already thought all connections were idle");
            } else {
                conn_count_iter->second--;
                TRACE(CONNS, "User " << entity << " had connection on thread " << pid << " go idle; "
                    << conn_count_iter->second << " active connections remain");
            }
        }
    }

    return result;
}


/*
 * Apply the throttle. If there are no limits set, returns immediately. Otherwise,
 * this applies the limits as best possible, stalling the thread if necessary.
 */
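// Example: with a 100 MB/s bandwidth limit, a uid holding 1 MB of primary byte
// shares that issues a 3 MB read first drains its primary share, then its
// secondary share, then tries to steal other users' secondary shares; whatever
// remains unfulfilled puts the thread to sleep on m_compute_var until
// RecomputeInternal() hands out fresh shares and broadcasts.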
void
XrdThrottleManager::Apply(int reqsize, int reqops, int uid)
{
    if (m_bytes_per_second < 0)
        reqsize = 0;
    if (m_ops_per_second < 0)
        reqops = 0;
    while (reqsize || reqops)
    {
        // Subtract the requested amount out of the shares.
        AtomicBeg(m_compute_var);
        GetShares(m_primary_bytes_shares[uid], reqsize);
        if (reqsize)
        {
            TRACE(BANDWIDTH, "Using secondary shares; request has " << reqsize << " bytes left.");
            GetShares(m_secondary_bytes_shares[uid], reqsize);
            TRACE(BANDWIDTH, "Finished with secondary shares; request has " << reqsize << " bytes left.");
        }
        else
        {
            TRACE(BANDWIDTH, "Filled byte shares out of primary; " << m_primary_bytes_shares[uid] << " left.");
        }
        GetShares(m_primary_ops_shares[uid], reqops);
        if (reqops)
        {
            GetShares(m_secondary_ops_shares[uid], reqops);
        }
        StealShares(uid, reqsize, reqops);
        AtomicEnd(m_compute_var);

        if (reqsize || reqops)
        {
            if (reqsize) TRACE(BANDWIDTH, "Sleeping to wait for throttle fairshare.");
            if (reqops) TRACE(IOPS, "Sleeping to wait for throttle fairshare.");
            m_compute_var.Wait();
            m_loadshed_limit_hit++;
        }
    }
}

void
XrdThrottleManager::UserIOAccounting()
{
    std::chrono::steady_clock::duration::rep total_active_time = 0;
    for (size_t idx = 0; idx < m_timer_list.size(); idx++) {
        auto &timerList = m_timer_list[idx];
        std::unique_lock<std::mutex> lock(timerList.m_mutex);
        auto timer = timerList.m_first;
        while (timer) {
            auto next = timer->m_next;
            auto uid = timer->m_owner;
            auto &waiter = m_waiter_info[uid];
            auto recent_duration = timer->Reset();
            waiter.m_io_time += recent_duration.count();

            total_active_time += recent_duration.count();
            timer = next;
        }
    }
    m_io_active_time += total_active_time;
}

void
XrdThrottleManager::ComputeWaiterOrder()
{
    // Update the IO time for long-running I/O operations. This prevents,
    // for example, a 2-minute I/O operation from causing a spike in
    // concurrency because it's otherwise only reported at the end.
    UserIOAccounting();

    auto now = std::chrono::steady_clock::now();
    auto elapsed = now - m_last_waiter_recompute_time;
    m_last_waiter_recompute_time = now;
    std::chrono::duration<double> elapsed_secs = elapsed;
    // Alpha is the decay factor for the exponential moving average. One window is
    // 10 seconds, so over 10 seconds the prior average decays by 1/e (that is,
    // newer data carries about 63% of the total weight). This means the
    // contribution of I/O load from a minute ago is roughly 0.25% of the total.

    // The moving average will be used to determine how close the user is to their
    // "fair share" of the concurrency limit among the users that are waiting.
    auto alpha = 1 - std::exp(-1 * elapsed_secs.count() / 10.0);
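    // For example, with the typical 1-second recompute interval this evaluates
    // to alpha = 1 - exp(-0.1), roughly 0.095, so each second's measurement
    // contributes about 10% to the moving average.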

    std::vector<double> share;
    share.resize(m_max_users);
    size_t users_with_waiters = 0;
    // For each user, compute their current concurrency and determine how many
    // waiting users total there are.
    for (int i = 0; i < m_max_users; i++)
    {
        auto &waiter = m_waiter_info[i];
        auto io_duration_rep = waiter.m_io_time.exchange(std::chrono::steady_clock::duration(0).count());
        std::chrono::steady_clock::duration io_duration = std::chrono::steady_clock::duration(io_duration_rep);
        std::chrono::duration<double> io_duration_secs = io_duration;
        auto prev_concurrency = io_duration_secs.count() / elapsed_secs.count();
        float new_concurrency = waiter.m_concurrency;

        new_concurrency = (1 - alpha) * new_concurrency + alpha * prev_concurrency;
        waiter.m_concurrency = new_concurrency;
        if (new_concurrency > 0) {
            TRACE(DEBUG, "User " << i << " has concurrency of " << new_concurrency);
        }
        unsigned waiting;
        {
            std::lock_guard<std::mutex> lock(waiter.m_mutex);
            waiting = waiter.m_waiting;
        }
        if (waiting > 0)
        {
            share[i] = new_concurrency;
            TRACE(DEBUG, "User " << i << " has concurrency of " << share[i] << " and is waiting for " << waiting);
            // Handle the division-by-zero case; if we have no history of usage
            // whatsoever, we should pretend we have at least some minimal load.
            if (share[i] == 0) {
                share[i] = 0.1;
            }
            users_with_waiters++;
        }
        else
        {
            share[i] = 0;
        }
    }
    auto fair_share = static_cast<double>(m_concurrency_limit) / static_cast<double>(users_with_waiters);
    std::vector<uint16_t> waiter_order;
    waiter_order.resize(m_max_users);

    // Calculate the share for each user. We assume the user should get a share
    // proportional to how far above or below the fair share they are. So, a user
    // with concurrency of 20 when the fairshare is 10 will get 0.5 shares; a user
    // with concurrency of 5 when the fairshare is 10 will get 2.0 shares.
    double shares_sum = 0;
    for (int idx = 0; idx < m_max_users; idx++)
    {
        if (share[idx]) {
            shares_sum += fair_share / share[idx];
        }
    }
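
    // Worked example (assuming the wake order holds m_max_users == 1024 entries,
    // per the quantization comment below): with a concurrency limit of 20 and
    // two waiting users averaging 20 and 5 concurrent I/Os, fair_share is 10,
    // the users get 0.5 and 2.0 shares, and shares_sum is 2.5. scale_factor is
    // then (1024 - 2) / 2.5 = 408.8, so the heavy user receives
    // floor(408.8 * 0.5) + 1 = 205 wakeup slots and the light user
    // floor(408.8 * 2.0) + 1 = 818: roughly a 1:4 wakeup ratio.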

    // We must quantize the overall shares into an array of 1024 elements. We do
    // this by scaling up (or down) based on the total number of shares computed
    // above. Note this quantization can lead to an over-provisioned user being
    // assigned zero shares; thus, we scale based on (1024 - #users) so we can
    // give one extra share to each user.
    auto scale_factor = (static_cast<double>(m_max_users) - static_cast<double>(users_with_waiters)) / shares_sum;
    size_t offset = 0;
    for (int uid = 0; uid < m_max_users; uid++) {
        if (share[uid] > 0) {
            auto shares = static_cast<unsigned>(scale_factor * fair_share / share[uid]) + 1;
            TRACE(DEBUG, "User " << uid << " has " << shares << " shares");
            for (unsigned idx = 0; idx < shares; idx++)
            {
                waiter_order[offset % m_max_users] = uid;
                offset++;
            }
        }
    }
    if (offset < m_max_users) {
        for (size_t idx = offset; idx < m_max_users; idx++) {
            waiter_order[idx] = -1;
        }
    }
    // Shuffle the order to randomize the wakeup order.
    std::shuffle(waiter_order.begin(), waiter_order.end(), std::default_random_engine());

    // Copy the order to the inactive array. We do not shuffle in-place because
    // RAtomics are not move constructible, which is a requirement for std::shuffle.
    auto &waiter_order_to_modify = (m_wake_order_active == 0) ? m_wake_order_1 : m_wake_order_0;
    std::copy(waiter_order.begin(), waiter_order.end(), waiter_order_to_modify.begin());

    // Set the array we just modified to be the active one. Since this is a relaxed
    // write, it could take some time for other CPUs to see the change; that's OK
    // as this is all stochastic anyway.
    m_wake_order_active = (m_wake_order_active + 1) % 2;

    m_waiter_offset = 0;

    // If we find ourselves below the concurrency limit because we woke up too few
    // operations in the last interval, try waking up enough operations to fill the
    // gap. If we race with new incoming operations, the threads will just go back
    // to sleep.
    if (users_with_waiters) {
        m_waiting_users = users_with_waiters;
        auto io_active = m_io_active.load(std::memory_order_acquire);
        for (size_t idx = io_active; idx < static_cast<size_t>(m_concurrency_limit); idx++) {
            NotifyOne();
        }
    }
}

void *
XrdThrottleManager::RecomputeBootstrap(void *instance)
{
    XrdThrottleManager *manager = static_cast<XrdThrottleManager*>(instance);
    manager->Recompute();
    return NULL;
}

void
XrdThrottleManager::Recompute()
{
    while (1)
    {
        // The connection counter can accumulate a number of known-idle connections.
        // We have no need to keep long-term memory of idle ones. Take this chance
        // to garbage collect old connection counters.
        if (m_max_open || m_max_conns) {
            const std::lock_guard<std::mutex> lock(m_file_mutex);
            for (auto iter = m_active_conns.begin(); iter != m_active_conns.end();)
            {
                auto &conn_count = *iter;
                if (!conn_count.second) {
                    iter = m_active_conns.erase(iter);
                    continue;
                }
                for (auto iter2 = conn_count.second->begin(); iter2 != conn_count.second->end();) {
                    if (iter2->second == 0) {
                        iter2 = conn_count.second->erase(iter2);
                    } else {
                        iter2++;
                    }
                }
                if (!conn_count.second->size()) {
                    iter = m_active_conns.erase(iter);
                } else {
                    iter++;
                }
            }
            for (auto iter = m_conn_counters.begin(); iter != m_conn_counters.end();) {
                if (!iter->second) {
                    iter = m_conn_counters.erase(iter);
                } else {
                    iter++;
                }
            }
            for (auto iter = m_file_counters.begin(); iter != m_file_counters.end();) {
                if (!iter->second) {
                    iter = m_file_counters.erase(iter);
                } else {
                    iter++;
                }
            }
        }

        TRACE(DEBUG, "Recomputing fairshares for throttle.");
        RecomputeInternal();
        ComputeWaiterOrder();
        TRACE(DEBUG, "Finished recomputing fairshares for throttle; sleeping for " << m_interval_length_seconds << " seconds.");
        XrdSysTimer::Wait(static_cast<int>(1000*m_interval_length_seconds));
    }
}

/*
 * The heart of the manager approach.
 *
 * This routine periodically recomputes the shares of each current user.
 * Each user has a "primary" and a "secondary" share. At the end of each
 * time interval, the remaining primary share is moved to secondary.
 * A user can utilize both shares; if both are gone, they must block until
 * the next recompute interval.
 *
 * The secondary share can be "stolen" by any other user; so, if a user
 * is idle or under-utilizing, their share can be used by someone else.
 * However, they can never be completely starved, as no one can steal
 * primary share.
 *
 * In this way, we may violate the throttle for an interval, but we never
 * starve a user.
 */
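
/*
 * Worked example: with a 100 MB/s byte limit, a 1-second interval, and two
 * active users, each user is allocated 50 MB of primary share per round. A
 * user who reads only 10 MB has the remaining 40 MB rolled into their
 * secondary share at the next recompute, where a busier user may steal it;
 * the quiet user's fresh 50 MB primary share remains untouchable.
 */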
void
XrdThrottleManager::RecomputeInternal()
{
    // Compute total shares for this interval.
    float intervals_per_second = 1.0/m_interval_length_seconds;
    float total_bytes_shares = m_bytes_per_second / intervals_per_second;
    float total_ops_shares = m_ops_per_second / intervals_per_second;

    // Compute the number of active users; a user is active if they used
    // any primary share during the last interval.
    AtomicBeg(m_compute_var);
    float active_users = 0;
    long bytes_used = 0;
    for (int i=0; i<m_max_users; i++)
    {
        int primary = AtomicFAZ(m_primary_bytes_shares[i]);
        if (primary != m_last_round_allocation)
        {
            active_users++;
            if (primary >= 0)
                m_secondary_bytes_shares[i] = primary;
            // Tally the bytes consumed before `primary` is reused for the ops shares.
            bytes_used += (primary < 0) ? m_last_round_allocation : (m_last_round_allocation-primary);
            primary = AtomicFAZ(m_primary_ops_shares[i]);
            if (primary >= 0)
                m_secondary_ops_shares[i] = primary;
        }
    }

    if (active_users == 0)
    {
        active_users++;
    }

    // Note we allocate the same number of shares to *all* users, not
    // just the active ones. If a new user becomes active in the next
    // interval, we'll go over our bandwidth budget just a bit.
    m_last_round_allocation = static_cast<int>(total_bytes_shares / active_users);
    int ops_shares = static_cast<int>(total_ops_shares / active_users);
    TRACE(BANDWIDTH, "Round byte allocation " << m_last_round_allocation << " ; last round used " << bytes_used << ".");
    TRACE(IOPS, "Round ops allocation " << ops_shares);
    for (int i=0; i<m_max_users; i++)
    {
        m_primary_bytes_shares[i] = m_last_round_allocation;
        m_primary_ops_shares[i] = ops_shares;
    }

    AtomicEnd(m_compute_var);

    // Reset the loadshed limit counter.
    int limit_hit = m_loadshed_limit_hit.exchange(0);
    TRACE(DEBUG, "Throttle limit hit " << limit_hit << " times during last interval.");

    // Update the IO counters.
    m_compute_var.Lock();
    m_stable_io_active = m_io_active.load(std::memory_order_acquire);
    auto io_active = m_stable_io_active;
    m_stable_io_total = m_io_total;
    auto io_total = m_stable_io_total;
    auto io_wait_rep = m_io_active_time.exchange(std::chrono::steady_clock::duration(0).count());
    m_stable_io_wait += std::chrono::steady_clock::duration(io_wait_rep);
    m_compute_var.UnLock();

    auto io_wait_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_stable_io_wait).count();
    TRACE(IOLOAD, "Current IO counter is " << io_active << "; total IO active time is " << io_wait_ms << "ms.");
    if (m_gstream)
    {
        char buf[128];
        auto len = snprintf(buf, 128,
            R"({"event":"throttle_update","io_wait":%.4f,"io_active":%d,"io_total":%llu})",
            static_cast<double>(io_wait_ms) / 1000.0, io_active, static_cast<long long unsigned>(io_total));
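        // A resulting record looks like (values illustrative):
        //   {"event":"throttle_update","io_wait":0.1234,"io_active":3,"io_total":42}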
        auto suc = (len < 128) ? m_gstream->Insert(buf, len + 1) : false;
        if (!suc)
        {
            TRACE(IOLOAD, "Failed g-stream insertion of throttle_update record (len=" << len << "): " << buf);
        }
    }
    m_compute_var.Broadcast();
}

/*
 * Do a simple hash across the username.
 */
uint16_t
XrdThrottleManager::GetUid(const std::string &username)
{
    std::hash<std::string> hash_fn;
    auto hash = hash_fn(username);
    auto uid = static_cast<uint16_t>(hash % m_max_users);
    TRACE(DEBUG, "Mapping user " << username << " to UID " << uid);
    return uid;
}

/*
 * Notify a single waiter thread that it can proceed.
 */
void
XrdThrottleManager::NotifyOne()
{
    auto &wake_order = (m_wake_order_active == 0) ? m_wake_order_0 : m_wake_order_1;

    for (size_t idx = 0; idx < m_max_users; ++idx)
    {
        auto offset = m_waiter_offset.fetch_add(1, std::memory_order_acq_rel);
        int16_t uid = wake_order[offset % m_max_users];
        if (uid < 0)
        {
            continue;
        }
        auto &waiter_info = m_waiter_info[uid];
        std::unique_lock<std::mutex> lock(waiter_info.m_mutex);
        if (waiter_info.m_waiting) {
            waiter_info.NotifyOne(std::move(lock));
            return;
        }
    }
}

/*
 * Create an IO timer object; increment the number of outstanding IOs.
 */
XrdThrottleTimer
XrdThrottleManager::StartIOTimer(uint16_t uid, bool &ok)
{
    int cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
    m_io_total++;

    while (m_concurrency_limit >= 0 && cur_counter >= m_concurrency_limit)
    {
        // If the user has essentially no concurrency, then we let them
        // temporarily exceed the limit. This prevents potential waits for
        // every single read for an infrequent user.
        if (m_waiter_info[uid].m_concurrency < 1)
        {
            break;
        }
        m_loadshed_limit_hit++;
        m_io_active.fetch_sub(1, std::memory_order_acq_rel);
        TRACE(DEBUG, "ThrottleManager (user=" << uid << "): IO concurrency limit hit; waiting for other IOs to finish.");
        ok = m_waiter_info[uid].Wait();
        if (!ok) {
            TRACE(DEBUG, "ThrottleManager (user=" << uid << "): timed out waiting for other IOs to finish.");
            return XrdThrottleTimer();
        }
        cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
    }

    ok = true;
    return XrdThrottleTimer(this, uid);
}
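
// Sketch of the expected caller-side pattern (illustrative, not part of this
// file):
//
//     bool ok;
//     auto timer = manager.StartIOTimer(uid, ok);
//     if (!ok) { /* the wait timed out; fail or retry the request */ }
//     // ... perform the I/O; the XrdThrottleTimer accounts the elapsed time
//     // back to the manager (see StopIOTimer below) when the operation ends.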

/*
 * Finish recording an IO timer.
 */
void
XrdThrottleManager::StopIOTimer(std::chrono::steady_clock::duration &event_duration, uint16_t uid)
{
    m_io_active_time += event_duration.count();
    auto old_active = m_io_active.fetch_sub(1, std::memory_order_acq_rel);
    m_waiter_info[uid].m_io_time += event_duration.count();
    if (old_active == static_cast<unsigned>(m_concurrency_limit))
    {
        // If we are below the concurrency limit threshold and have another waiter
        // for our user, then execute it immediately. Otherwise, we will give
        // someone else a chance to run (as we have gotten more than our share
        // recently).
        unsigned waiting_users = m_waiting_users;
        if (waiting_users == 0) waiting_users = 1;
        if (m_waiter_info[uid].m_concurrency < m_concurrency_limit / waiting_users)
        {
            std::unique_lock<std::mutex> lock(m_waiter_info[uid].m_mutex);
            if (m_waiter_info[uid].m_waiting > 0)
            {
                m_waiter_info[uid].NotifyOne(std::move(lock));
                return;
            }
        }
        NotifyOne();
    }
}

/*
 * Check the counters to see if we have hit any throttle limits in the
 * current time period. If so, shed the client randomly.
 *
 * If the client has already been load-shedded once and reconnected to this
 * server, then do not load-shed it again.
 */
bool
XrdThrottleManager::CheckLoadShed(const std::string &opaque)
{
    if (m_loadshed_port == 0)
    {
        return false;
    }
    if (m_loadshed_limit_hit == 0)
    {
        return false;
    }
    if (static_cast<unsigned>(rand()) % 100 > m_loadshed_frequency)
    {
        return false;
    }
    if (opaque.empty())
    {
        return false;
    }
    return true;
}

void
XrdThrottleManager::PrepLoadShed(const char *opaque, std::string &lsOpaque)
{
    if (m_loadshed_port == 0)
    {
        return;
    }
    if (opaque && opaque[0])
    {
        XrdOucEnv env(opaque);
        // Do not load shed the client if it has already been done once.
        if (env.Get("throttle.shed") != 0)
        {
            return;
        }
        lsOpaque = opaque;
        lsOpaque += "&throttle.shed=1";
    }
    else
    {
        lsOpaque = "throttle.shed=1";
    }
}

void
XrdThrottleManager::PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
{
    host = m_loadshed_host;
    host += "?";
    host += opaque;
    port = m_loadshed_port;
}

bool
XrdThrottleManager::Waiter::Wait()
{
    auto timeout = std::chrono::steady_clock::now() + m_manager->m_max_wait_time;
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_waiting++;
        m_cv.wait_until(lock, timeout,
            [&] { return m_manager->m_io_active.load(std::memory_order_acquire) < static_cast<unsigned>(m_manager->m_concurrency_limit) || std::chrono::steady_clock::now() >= timeout; });
        m_waiting--;
    }
    if (std::chrono::steady_clock::now() > timeout) {
        return false;
    }
    return true;
}