#define XRD_TRACE m_trace->

const char *XrdThrottleManager::TraceID = "ThrottleManager";

int XrdThrottleManager::m_max_users = 1024;

#if defined(__linux__) || defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__))
clockid_t XrdThrottleTimer::clock_id = CLOCK_MONOTONIC;
#else
int XrdThrottleTimer::clock_id = 0;
#endif
   m_interval_length_seconds(1.0),
   m_bytes_per_second(-1),
   m_concurrency_limit(-1),
   m_last_round_allocation(100*1024),
   m_loadshed_frequency(0),
   m_loadshed_limit_hit(0)
{
   m_stable_io_wait.tv_sec = 0;
   m_stable_io_wait.tv_nsec = 0;
}
void
XrdThrottleManager::Init()
{
   TRACE(DEBUG, "Initializing the throttle manager.");

   m_primary_bytes_shares.resize(m_max_users);
   m_secondary_bytes_shares.resize(m_max_users);
   m_primary_ops_shares.resize(m_max_users);
   m_secondary_ops_shares.resize(m_max_users);

   for (int i=0; i<m_max_users; i++)
   {
      m_primary_bytes_shares[i] = m_last_round_allocation;
      m_secondary_bytes_shares[i] = 0;
      m_primary_ops_shares[i] = 10;
      m_secondary_ops_shares[i] = 0;
   }

   m_io_wait.tv_nsec = 0;
   int rc;
   pthread_t tid;
   if ((rc = XrdSysThread::Run(&tid, XrdThrottleManager::RecomputeBootstrap,
                               static_cast<void *>(this), 0, "Buffer Manager throttle")))
      m_log->Emsg("ThrottleManager", rc, "create throttle thread");
}
void
XrdThrottleManager::GetShares(int &shares, int &request)
{
   int remaining;
   AtomicFSub(remaining, shares, request);
   if (remaining > 0)
      request -= (remaining < request) ? remaining : request;
}
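
// StealShares: once a user's own primary and secondary shares are exhausted,
// walk the other users' leftover (secondary) shares and take from them until
// the outstanding byte/op request is satisfied or every user has been visited.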
void
XrdThrottleManager::StealShares(int uid, int &reqsize, int &reqops)
{
   if (!reqsize && !reqops) return;
   TRACE(BANDWIDTH, "Stealing shares to fill request of " << reqsize << " bytes");
   TRACE(IOPS, "Stealing shares to fill request of " << reqops << " ops.");

   for (int i=uid+1; i % m_max_users != uid; i++)
   {
      if (reqsize) GetShares(m_secondary_bytes_shares[i % m_max_users], reqsize);
      if (reqops)  GetShares(m_secondary_ops_shares[ i % m_max_users], reqops);
   }

   TRACE(BANDWIDTH, "After stealing shares, " << reqsize << " of request bytes remain.");
   TRACE(IOPS, "After stealing shares, " << reqops << " of request ops remain.");
}
bool
XrdThrottleManager::OpenFile(const std::string &entity, std::string &error_message)
{
   if (m_max_open == 0 && m_max_conns == 0) return true;

   const std::lock_guard<std::mutex> lock(m_file_mutex);
   auto iter = m_file_counters.find(entity);
   unsigned long cur_open_files = 0, cur_open_conns;
   if (m_max_open) {
      if (iter == m_file_counters.end()) {
         m_file_counters[entity] = 1;
         TRACE(FILES, "User " << entity << " has opened their first file");
         cur_open_files = 1;
      } else if (iter->second < m_max_open) {
         iter->second++;
         cur_open_files = iter->second;
      } else {
         std::stringstream ss;
         ss << "User " << entity << " has hit the limit of " << m_max_open << " open files";
         TRACE(FILES, ss.str());
         error_message = ss.str();
         return false;
      }
   }
   auto conn_iter = m_active_conns.find(entity);
   auto conn_count_iter = m_conn_counters.find(entity);
   if ((conn_count_iter != m_conn_counters.end()) && (conn_count_iter->second == m_max_conns) &&
       (conn_iter == m_active_conns.end() || ((*(conn_iter->second))[pid] == 0)))
   {
      // Undo the file-count increment above before rejecting the open.
      if (m_max_open) iter->second--;
      std::stringstream ss;
      ss << "User " << entity << " has hit the limit of " << m_max_conns << " open connections";
      TRACE(CONNS, ss.str());
      error_message = ss.str();
      return false;
   }
   if (conn_iter == m_active_conns.end()) {
      std::unique_ptr<std::unordered_map<pid_t, unsigned long>> conn_map(
         new std::unordered_map<pid_t, unsigned long>());
      (*conn_map)[pid] = 1;
      m_active_conns[entity] = std::move(conn_map);
      if (conn_count_iter == m_conn_counters.end()) {
         m_conn_counters[entity] = 1;
      } else {
         m_conn_counters[entity]++;
      }
      cur_open_conns = m_conn_counters[entity];
   } else {
      auto pid_iter = conn_iter->second->find(pid);
      if (pid_iter == conn_iter->second->end() || pid_iter->second == 0) {
         (*(conn_iter->second))[pid] = 1;
         conn_count_iter->second++;
         cur_open_conns = conn_count_iter->second;
      } else {
         (*(conn_iter->second))[pid]++;
         cur_open_conns = conn_count_iter->second;
      }
   }
   TRACE(CONNS, "User " << entity << " has " << cur_open_conns << " open connections");

   if (m_max_open) TRACE(FILES, "User " << entity << " has " << cur_open_files << " open files");
   return true;
}
bool
XrdThrottleManager::CloseFile(const std::string &entity)
{
   if (m_max_open == 0 && m_max_conns == 0) return true;

   bool result = true;
   const std::lock_guard<std::mutex> lock(m_file_mutex);

   auto iter = m_file_counters.find(entity);
   if (iter == m_file_counters.end()) {
      TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin never saw an open file");
      result = false;
   } else if (iter->second == 0) {
      TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin thinks all files were already closed");
      result = false;
   } else {
      iter->second--;
   }
   if (result) TRACE(FILES, "User " << entity << " closed a file; " << iter->second);
   auto conn_iter = m_active_conns.find(entity);
   auto conn_count_iter = m_conn_counters.find(entity);
   if (conn_iter == m_active_conns.end() || !(conn_iter->second)) {
      TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not");
      return false;
   }
   auto pid_iter = conn_iter->second->find(pid);
   if (pid_iter == conn_iter->second->end()) {
      TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not");
      return false;
   }
   if (pid_iter->second == 0) {
      TRACE(CONNS, "WARNING: User " << entity << " closed a file on connection the throttle"
            " plugin thinks was idle");
   } else {
      pid_iter->second--;
   }
   if (conn_count_iter == m_conn_counters.end()) {
      TRACE(CONNS, "WARNING: User " << entity << " closed a file but the throttle plugin never"
            " observed an open file");
   } else if (pid_iter->second == 0) {
      if (conn_count_iter->second == 0) {
         TRACE(CONNS, "WARNING: User " << entity << " had a connection go idle but the"
               " throttle plugin already thought all connections were idle");
      } else {
         conn_count_iter->second--;
         TRACE(CONNS, "User " << entity << " had connection on thread " << pid << " go idle; "
               << conn_count_iter->second << " active connections remain");
      }
   }

   return result;
}
void
XrdThrottleManager::Apply(int reqsize, int reqops, int uid)
{
   if (m_bytes_per_second < 0)
      reqsize = 0;
   if (m_ops_per_second < 0)
      reqops = 0;
   while (reqsize || reqops)
   {
      // Try to satisfy the request out of this user's own shares first.
      GetShares(m_primary_bytes_shares[uid], reqsize);
      if (reqsize)
      {
         TRACE(BANDWIDTH, "Using secondary shares; request has " << reqsize << " bytes left.");
         GetShares(m_secondary_bytes_shares[uid], reqsize);
         TRACE(BANDWIDTH, "Finished with secondary shares; request has " << reqsize << " bytes left.");
      }
      else
      {
         TRACE(BANDWIDTH, "Filled byte shares out of primary; " << m_primary_bytes_shares[uid] << " left.");
      }
      GetShares(m_primary_ops_shares[uid], reqops);
      if (reqops)
         GetShares(m_secondary_ops_shares[uid], reqops);
      StealShares(uid, reqsize, reqops);

      if (reqsize || reqops)
      {
         if (reqsize) TRACE(BANDWIDTH, "Sleeping to wait for throttle fairshare.");
         if (reqops) TRACE(IOPS, "Sleeping to wait for throttle fairshare.");
         m_compute_var.Wait();
      }
   }
}
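
// Thread bootstrap: XrdSysThread::Run needs a plain function pointer, so this
// static trampoline recovers the manager instance and enters its recompute loop.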
void *
XrdThrottleManager::RecomputeBootstrap(void *instance)
{
   XrdThrottleManager *manager = static_cast<XrdThrottleManager*>(instance);
   manager->Recompute();
   return NULL;
}
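
// Recompute: runs forever in the background thread started by Init().  Each
// interval it prunes idle connection/file counters and then recomputes the
// per-user fairshare allocations.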
void
XrdThrottleManager::Recompute()
{
   while (true)
   {
      // Garbage-collect counters for users with no remaining activity.
      if (m_max_open || m_max_conns) {
         const std::lock_guard<std::mutex> lock(m_file_mutex);
         for (auto iter = m_active_conns.begin(); iter != m_active_conns.end();)
         {
            auto & conn_count = *iter;
            if (!conn_count.second) {
               iter = m_active_conns.erase(iter);
               continue;
            }
            for (auto iter2 = conn_count.second->begin(); iter2 != conn_count.second->end();) {
               if (iter2->second == 0) {
                  iter2 = conn_count.second->erase(iter2);
               } else {
                  iter2++;
               }
            }
            if (!conn_count.second->size()) {
               iter = m_active_conns.erase(iter);
            } else {
               iter++;
            }
         }
         for (auto iter = m_conn_counters.begin(); iter != m_conn_counters.end();) {
            if (iter->second == 0) {
               iter = m_conn_counters.erase(iter);
            } else {
               iter++;
            }
         }
         for (auto iter = m_file_counters.begin(); iter != m_file_counters.end();) {
            if (iter->second == 0) {
               iter = m_file_counters.erase(iter);
            } else {
               iter++;
            }
         }
      }

      TRACE(DEBUG, "Recomputing fairshares for throttle.");
      RecomputeInternal();
      TRACE(DEBUG, "Finished recomputing fairshares for throttle; sleeping for " << m_interval_length_seconds << " seconds.");
      XrdSysTimer::Wait(static_cast<int>(1000*m_interval_length_seconds));
   }
}
void
XrdThrottleManager::RecomputeInternal()
{
   // Convert the per-second limits into per-interval share budgets.
   float intervals_per_second = 1.0/m_interval_length_seconds;
   float total_bytes_shares = m_bytes_per_second / intervals_per_second;
   float total_ops_shares = m_ops_per_second / intervals_per_second;

   // Tally the shares used during the last interval.
   float active_users = 0;
   long bytes_used = 0;
   for (int i=0; i<m_max_users; i++)
   {
      int primary = AtomicFAZ(m_primary_bytes_shares[i]);
      if (primary != m_last_round_allocation)
      {
         active_users++;
         if (primary >= 0)
            m_secondary_bytes_shares[i] = primary;
         primary = AtomicFAZ(m_primary_ops_shares[i]);
         if (primary >= 0)
            m_secondary_ops_shares[i] = primary;
         bytes_used += (primary < 0) ? m_last_round_allocation : (m_last_round_allocation-primary);
      }
   }

   if (active_users == 0)
      active_users = 1;

   // Hand out the next round of shares evenly across the active users.
   m_last_round_allocation = static_cast<int>(total_bytes_shares / active_users);
   int ops_shares = static_cast<int>(total_ops_shares / active_users);
   TRACE(BANDWIDTH, "Round byte allocation " << m_last_round_allocation << " ; last round used " << bytes_used << ".");
   TRACE(IOPS, "Round ops allocation " << ops_shares);
   for (int i=0; i<m_max_users; i++)
   {
      m_primary_bytes_shares[i] = m_last_round_allocation;
      m_primary_ops_shares[i] = ops_shares;
   }

   // Note how often the concurrency limit was hit during the last interval.
   int limit_hit = AtomicFAZ(m_loadshed_limit_hit);
   TRACE(DEBUG, "Throttle limit hit " << limit_hit << " times during last interval.");
   m_compute_var.Lock();
   m_stable_io_active = AtomicGet(m_io_active);
   auto io_active = m_stable_io_active;
   m_stable_io_total = static_cast<unsigned>(AtomicGet(m_io_total));
   auto io_total = m_stable_io_total;
   time_t secs; AtomicFZAP(secs, m_io_wait.tv_sec);
   long nsecs; AtomicFZAP(nsecs, m_io_wait.tv_nsec);
   m_stable_io_wait.tv_sec += static_cast<long>(secs * intervals_per_second);
   m_stable_io_wait.tv_nsec += static_cast<long>(nsecs * intervals_per_second);
   while (m_stable_io_wait.tv_nsec > 1000000000)
   {
      m_stable_io_wait.tv_nsec -= 1000000000;
      m_stable_io_wait.tv_sec++;
   }
   struct timespec io_wait_ts;
   io_wait_ts.tv_sec = m_stable_io_wait.tv_sec;
   io_wait_ts.tv_nsec = m_stable_io_wait.tv_nsec;

   uint64_t io_wait_ms = io_wait_ts.tv_sec*1000 + io_wait_ts.tv_nsec/1000000;
   TRACE(IOLOAD, "Current IO counter is " << io_active << "; total IO wait time is " << io_wait_ms << "ms.");
   if (m_gstream)
   {
      char buf[128];
      auto len = snprintf(buf, 128,
                          R"({"event":"throttle_update","io_wait":%.4f,"io_active":%d,"io_total":%d})",
                          static_cast<double>(io_wait_ms) / 1000.0, io_active, io_total);
      auto suc = (len < 128) ? m_gstream->Insert(buf, len + 1) : false;
      if (!suc)
      {
         TRACE(IOLOAD, "Failed g-stream insertion of throttle_update record (len=" << len << "): " << buf);
      }
   }
}
int
XrdThrottleManager::GetUid(const char *username)
{
   const char *cur = username;
   while (cur && *cur && *cur != '@' && *cur != '.')
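
// StartIOTimer / StopIOTimer bracket each I/O operation.  Starting an I/O
// bumps the active-I/O counter and, when a concurrency limit is configured,
// waits on m_compute_var until the operation can be admitted; stopping an I/O
// adds the elapsed time recorded by the XrdThrottleTimer to the m_io_wait
// accumulators consumed by RecomputeInternal().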
   int cur_counter = AtomicInc(m_io_active);

   while (m_concurrency_limit >= 0 && cur_counter > m_concurrency_limit)
      m_compute_var.Wait();

   AtomicAdd(m_io_wait.tv_sec, timer.tv_sec);
   AtomicAdd(m_io_wait.tv_nsec, timer.tv_nsec);
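
// CheckLoadShed: decide whether an incoming request should be shed to the
// configured load-shed host.  Shedding requires a load-shed port, evidence
// that the concurrency limit was hit during the last interval, and a random
// draw that falls within the configured shedding frequency.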
bool
XrdThrottleManager::CheckLoadShed(const std::string &opaque)
{
   if (m_loadshed_port == 0)
      return false;
   if (AtomicGet(m_loadshed_limit_hit) == 0)
      return false;
   if (static_cast<unsigned>(rand()) % 100 > m_loadshed_frequency)
      return false;
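
// PrepLoadShed: prepare the opaque data used when redirecting a client.  The
// "throttle.shed=1" tag marks a request that has already been shed once, and
// a request carrying the tag is never shed a second time.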
void
XrdThrottleManager::PrepLoadShed(const char *opaque, std::string &lsOpaque)
{
   if (m_loadshed_port == 0) return;
   if (opaque && opaque[0])
   {
      XrdOucEnv env(opaque);
      if (env.Get("throttle.shed") != 0) return;
      lsOpaque = opaque;
      lsOpaque += "&throttle.shed=1";
   }
   else
   {
      lsOpaque = "throttle.shed=1";
   }
}

void
XrdThrottleManager::PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
{
   host = m_loadshed_host;
   port = m_loadshed_port;
}
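
// Rough call sequence from the throttle wrappers that drive this manager
// (the wrappers live outside this file; listed here only for orientation):
//    OpenFile(entity, err)         admission check at open time
//    Apply(reqsize, reqops, uid)   block until the fairshare covers the request
//    timer = StartIOTimer()        concurrency gate + wait-time accounting
//    ... perform the I/O ...       the timer's teardown reports the elapsed
//                                  time back via StopIOTimer()
//    CloseFile(entity)             release the per-user counters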