XRootD
XrdThrottleManager Class Reference

#include <XrdThrottleManager.hh>

+ Collaboration diagram for XrdThrottleManager:

Public Member Functions

 XrdThrottleManager (XrdSysError *lP, XrdOucTrace *tP)
 
 ~XrdThrottleManager ()
 
void Apply (int reqsize, int reqops, int uid)
 
bool CheckLoadShed (const std::string &opaque)
 
bool CloseFile (const std::string &entity)
 
void Init ()
 
bool IsThrottling ()
 
bool OpenFile (const std::string &entity, std::string &open_error_message)
 
void PerformLoadShed (const std::string &opaque, std::string &host, unsigned &port)
 
void PrepLoadShed (const char *opaque, std::string &lsOpaque)
 
void SetLoadShed (std::string &hostname, unsigned port, unsigned frequency)
 
void SetMaxConns (unsigned long max_conns)
 
void SetMaxOpen (unsigned long max_open)
 
void SetMonitor (XrdXrootdGStream *gstream)
 
void SetThrottles (float reqbyterate, float reqoprate, int concurrency, float interval_length)
 
XrdThrottleTimer StartIOTimer ()
 

Static Public Member Functions

static int GetUid (const char *username)
 

Protected Member Functions

void StopIOTimer (struct timespec)
 

Friends

class XrdThrottleTimer
 

Detailed Description

Definition at line 45 of file XrdThrottleManager.hh.

Constructor & Destructor Documentation

◆ XrdThrottleManager()

XrdThrottleManager::XrdThrottleManager ( XrdSysError *  lP,
XrdOucTrace *  tP 
)

Definition at line 27 of file XrdThrottleManager.cc.

27  :
28  m_trace(tP),
29  m_log(lP),
30  m_interval_length_seconds(1.0),
31  m_bytes_per_second(-1),
32  m_ops_per_second(-1),
33  m_concurrency_limit(-1),
34  m_last_round_allocation(100*1024),
35  m_io_active(0),
36  m_loadshed_host(""),
37  m_loadshed_port(0),
38  m_loadshed_frequency(0),
39  m_loadshed_limit_hit(0)
40 {
41  m_stable_io_wait.tv_sec = 0;
42  m_stable_io_wait.tv_nsec = 0;
43 }

◆ ~XrdThrottleManager()

XrdThrottleManager::~XrdThrottleManager ( )
inline

Definition at line 89 of file XrdThrottleManager.hh.

89 {} // The buffmanager is never deleted

Member Function Documentation

◆ Apply()

void XrdThrottleManager::Apply ( int  reqsize,
int  reqops,
int  uid 
)

Definition at line 258 of file XrdThrottleManager.cc.

259 {
260  if (m_bytes_per_second < 0)
261  reqsize = 0;
262  if (m_ops_per_second < 0)
263  reqops = 0;
264  while (reqsize || reqops)
265  {
266  // Subtract the requested out of the shares
267  AtomicBeg(m_compute_var);
268  GetShares(m_primary_bytes_shares[uid], reqsize);
269  if (reqsize)
270  {
271  TRACE(BANDWIDTH, "Using secondary shares; request has " << reqsize << " bytes left.");
272  GetShares(m_secondary_bytes_shares[uid], reqsize);
273  TRACE(BANDWIDTH, "Finished with secondary shares; request has " << reqsize << " bytes left.");
274  }
275  else
276  {
277  TRACE(BANDWIDTH, "Filled byte shares out of primary; " << m_primary_bytes_shares[uid] << " left.");
278  }
279  GetShares(m_primary_ops_shares[uid], reqops);
280  if (reqops)
281  {
282  GetShares(m_secondary_ops_shares[uid], reqops);
283  }
284  StealShares(uid, reqsize, reqops);
285  AtomicEnd(m_compute_var);
286 
287  if (reqsize || reqops)
288  {
289  if (reqsize) TRACE(BANDWIDTH, "Sleeping to wait for throttle fairshare.");
290  if (reqops) TRACE(IOPS, "Sleeping to wait for throttle fairshare.");
291  m_compute_var.Wait();
292  AtomicBeg(m_compute_var);
293  AtomicInc(m_loadshed_limit_hit);
294  AtomicEnd(m_compute_var);
295  }
296  }
297 
298 }
#define AtomicInc(x)
#define AtomicBeg(Mtx)
#define AtomicEnd(Mtx)
#define TRACE(act, x)
Definition: XrdTrace.hh:63

References AtomicBeg, AtomicEnd, AtomicInc, TRACE, and XrdSysCondVar::Wait().

+ Here is the call graph for this function:

◆ CheckLoadShed()

bool XrdThrottleManager::CheckLoadShed ( const std::string &  opaque)

Definition at line 531 of file XrdThrottleManager.cc.

532 {
533  if (m_loadshed_port == 0)
534  {
535  return false;
536  }
537  if (AtomicGet(m_loadshed_limit_hit) == 0)
538  {
539  return false;
540  }
541  if (static_cast<unsigned>(rand()) % 100 > m_loadshed_frequency)
542  {
543  return false;
544  }
545  if (opaque.empty())
546  {
547  return false;
548  }
549  return true;
550 }
#define AtomicGet(x)

References AtomicGet.

◆ CloseFile()

bool XrdThrottleManager::CloseFile ( const std::string &  entity)

Definition at line 192 of file XrdThrottleManager.cc.

193 {
194  if (m_max_open == 0 && m_max_conns == 0) return true;
195 
196  bool result = true;
197  const std::lock_guard<std::mutex> lock(m_file_mutex);
198  if (m_max_open) {
199  auto iter = m_file_counters.find(entity);
200  if (iter == m_file_counters.end()) {
201  TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin never saw an open file");
202  result = false;
203  } else if (iter->second == 0) {
204  TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin thinks all files were already closed");
205  result = false;
206  } else {
207  iter->second--;
208  }
209  if (result) TRACE(FILES, "User " << entity << " closed a file; " << iter->second <<
210  " remain open");
211  }
212 
213  if (m_max_conns) {
214  auto pid = XrdSysThread::Num();
215  auto conn_iter = m_active_conns.find(entity);
216  auto conn_count_iter = m_conn_counters.find(entity);
217  if (conn_iter == m_active_conns.end() || !(conn_iter->second)) {
218  TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
219  " tracking");
220  return false;
221  }
222  auto pid_iter = conn_iter->second->find(pid);
223  if (pid_iter == conn_iter->second->end()) {
224  TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
225  " tracking");
226  return false;
227  }
228  if (pid_iter->second == 0) {
229  TRACE(CONNS, "WARNING: User " << entity << " closed a file on connection the throttle"
230  " plugin thinks was idle");
231  } else {
232  pid_iter->second--;
233  }
234  if (conn_count_iter == m_conn_counters.end()) {
235  TRACE(CONNS, "WARNING: User " << entity << " closed a file but the throttle plugin never"
236  " observed an open file");
237  } else if (pid_iter->second == 0) {
238  if (conn_count_iter->second == 0) {
239  TRACE(CONNS, "WARNING: User " << entity << " had a connection go idle but the "
240  " throttle plugin already thought all connections were idle");
241  } else {
242  conn_count_iter->second--;
243  TRACE(CONNS, "User " << entity << " had connection on thread " << pid << " go idle; "
244  << conn_count_iter->second << " active connections remain");
245  }
246  }
247  }
248 
249  return result;
250 }
static unsigned long Num(void)

References XrdSysThread::Num(), and TRACE.

Referenced by XrdThrottle::File::close(), and XrdThrottle::File::open().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetUid()

int XrdThrottleManager::GetUid ( const char *  username)
static

Definition at line 471 of file XrdThrottleManager.cc.

472 {
473  const char *cur = username;
474  int hval = 0;
475  while (cur && *cur && *cur != '@' && *cur != '.')
476  {
477  hval += *cur;
478  hval %= m_max_users;
479  cur++;
480  }
481  //std::cerr << "Calculated UID " << hval << " for " << username << std::endl;
482  return hval;
483 }

Referenced by XrdThrottle::File::open().

+ Here is the caller graph for this function:

◆ Init()

void XrdThrottleManager::Init ( )

Definition at line 46 of file XrdThrottleManager.cc.

47 {
48  TRACE(DEBUG, "Initializing the throttle manager.");
49  // Initialize all our shares to zero.
50  m_primary_bytes_shares.resize(m_max_users);
51  m_secondary_bytes_shares.resize(m_max_users);
52  m_primary_ops_shares.resize(m_max_users);
53  m_secondary_ops_shares.resize(m_max_users);
54  // Allocate each user 100KB and 10 ops to bootstrap;
55  for (int i=0; i<m_max_users; i++)
56  {
57  m_primary_bytes_shares[i] = m_last_round_allocation;
58  m_secondary_bytes_shares[i] = 0;
59  m_primary_ops_shares[i] = 10;
60  m_secondary_ops_shares[i] = 0;
61  }
62 
63  m_io_wait.tv_sec = 0;
64  m_io_wait.tv_nsec = 0;
65 
66  int rc;
67  pthread_t tid;
68  if ((rc = XrdSysThread::Run(&tid, XrdThrottleManager::RecomputeBootstrap, static_cast<void *>(this), 0, "Buffer Manager throttle")))
69  m_log->Emsg("ThrottleManager", rc, "create throttle thread");
70 
71 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)

References DEBUG, XrdSysError::Emsg(), XrdSysThread::Run(), and TRACE.

+ Here is the call graph for this function:

◆ IsThrottling()

bool XrdThrottleManager::IsThrottling ( )
inline

Definition at line 59 of file XrdThrottleManager.hh.

59 {return (m_ops_per_second > 0) || (m_bytes_per_second > 0);}

◆ OpenFile()

bool XrdThrottleManager::OpenFile ( const std::string &  entity,
std::string &  open_error_message 
)

Definition at line 115 of file XrdThrottleManager.cc.

116 {
117  if (m_max_open == 0 && m_max_conns == 0) return true;
118 
119  const std::lock_guard<std::mutex> lock(m_file_mutex);
120  auto iter = m_file_counters.find(entity);
121  unsigned long cur_open_files = 0, cur_open_conns;
122  if (m_max_open) {
123  if (iter == m_file_counters.end()) {
124  m_file_counters[entity] = 1;
125  TRACE(FILES, "User " << entity << " has opened their first file");
126  cur_open_files = 1;
127  } else if (iter->second < m_max_open) {
128  iter->second++;
129  cur_open_files = iter->second;
130  } else {
131  std::stringstream ss;
132  ss << "User " << entity << " has hit the limit of " << m_max_open << " open files";
133  TRACE(FILES, ss.str());
134  error_message = ss.str();
135  return false;
136  }
137  }
138 
139  if (m_max_conns) {
140  auto pid = XrdSysThread::Num();
141  auto conn_iter = m_active_conns.find(entity);
142  auto conn_count_iter = m_conn_counters.find(entity);
143  if ((conn_count_iter != m_conn_counters.end()) && (conn_count_iter->second == m_max_conns) &&
144  (conn_iter == m_active_conns.end() || ((*(conn_iter->second))[pid] == 0)))
145  {
146  // note: we are rolling back the increment in open files
147  if (m_max_open) iter->second--;
148  std::stringstream ss;
149  ss << "User " << entity << " has hit the limit of " << m_max_conns <<
150  " open connections";
151  TRACE(CONNS, ss.str());
152  error_message = ss.str();
153  return false;
154  }
155  if (conn_iter == m_active_conns.end()) {
156  std::unique_ptr<std::unordered_map<pid_t, unsigned long>> conn_map(
157  new std::unordered_map<pid_t, unsigned long>());
158  (*conn_map)[pid] = 1;
159  m_active_conns[entity] = std::move(conn_map);
160  if (conn_count_iter == m_conn_counters.end()) {
161  m_conn_counters[entity] = 1;
162  cur_open_conns = 1;
163  } else {
164  m_conn_counters[entity] ++;
165  cur_open_conns = m_conn_counters[entity];
166  }
167  } else {
168  auto pid_iter = conn_iter->second->find(pid);
169  if (pid_iter == conn_iter->second->end() || pid_iter->second == 0) {
170  (*(conn_iter->second))[pid] = 1;
171  conn_count_iter->second++;
172  cur_open_conns = conn_count_iter->second;
173  } else {
174  (*(conn_iter->second))[pid] ++;
175  cur_open_conns = conn_count_iter->second;
176  }
177  }
178  TRACE(CONNS, "User " << entity << " has " << cur_open_conns << " open connections");
179  }
180  if (m_max_open) TRACE(FILES, "User " << entity << " has " << cur_open_files << " open files");
181  return true;
182 }

References XrdSysThread::Num(), and TRACE.

Referenced by XrdThrottle::File::open().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ PerformLoadShed()

void XrdThrottleManager::PerformLoadShed ( const std::string &  opaque,
std::string &  host,
unsigned &  port 
)

Definition at line 577 of file XrdThrottleManager.cc.

578 {
579  host = m_loadshed_host;
580  host += "?";
581  host += opaque;
582  port = m_loadshed_port;
583 }

◆ PrepLoadShed()

void XrdThrottleManager::PrepLoadShed ( const char *  opaque,
std::string &  lsOpaque 
)

Definition at line 553 of file XrdThrottleManager.cc.

554 {
555  if (m_loadshed_port == 0)
556  {
557  return;
558  }
559  if (opaque && opaque[0])
560  {
561  XrdOucEnv env(opaque);
562  // Do not load shed client if it has already been done once.
563  if (env.Get("throttle.shed") != 0)
564  {
565  return;
566  }
567  lsOpaque = opaque;
568  lsOpaque += "&throttle.shed=1";
569  }
570  else
571  {
572  lsOpaque = "throttle.shed=1";
573  }
574 }

References XrdOucEnv::Get().

Referenced by XrdThrottle::File::open().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetLoadShed()

void XrdThrottleManager::SetLoadShed ( std::string &  hostname,
unsigned  port,
unsigned  frequency 
)
inline

Definition at line 65 of file XrdThrottleManager.hh.

66  {m_loadshed_host = hostname; m_loadshed_port = port; m_loadshed_frequency = frequency;}

◆ SetMaxConns()

void XrdThrottleManager::SetMaxConns ( unsigned long  max_conns)
inline

Definition at line 70 of file XrdThrottleManager.hh.

70 {m_max_conns = max_conns;}

◆ SetMaxOpen()

void XrdThrottleManager::SetMaxOpen ( unsigned long  max_open)
inline

Definition at line 68 of file XrdThrottleManager.hh.

68 {m_max_open = max_open;}

◆ SetMonitor()

void XrdThrottleManager::SetMonitor ( XrdXrootdGStream gstream)
inline

Definition at line 72 of file XrdThrottleManager.hh.

72 {m_gstream = gstream;}

Referenced by XrdThrottle::FileSystem::Configure().

+ Here is the caller graph for this function:

◆ SetThrottles()

void XrdThrottleManager::SetThrottles ( float  reqbyterate,
float  reqoprate,
int  concurrency,
float  interval_length 
)
inline

Definition at line 61 of file XrdThrottleManager.hh.

62  {m_interval_length_seconds = interval_length; m_bytes_per_second = reqbyterate;
63  m_ops_per_second = reqoprate; m_concurrency_limit = concurrency;}

◆ StartIOTimer()

XrdThrottleTimer XrdThrottleManager::StartIOTimer ( )

Definition at line 489 of file XrdThrottleManager.cc.

490 {
491  AtomicBeg(m_compute_var);
492  int cur_counter = AtomicInc(m_io_active);
493  AtomicInc(m_io_total);
494  AtomicEnd(m_compute_var);
495  while (m_concurrency_limit >= 0 && cur_counter > m_concurrency_limit)
496  {
497  AtomicBeg(m_compute_var);
498  AtomicInc(m_loadshed_limit_hit);
499  AtomicDec(m_io_active);
500  AtomicEnd(m_compute_var);
501  m_compute_var.Wait();
502  AtomicBeg(m_compute_var);
503  cur_counter = AtomicInc(m_io_active);
504  AtomicEnd(m_compute_var);
505  }
506  return XrdThrottleTimer(*this);
507 }
#define AtomicDec(x)
friend class XrdThrottleTimer

References AtomicBeg, AtomicDec, AtomicEnd, AtomicInc, XrdSysCondVar::Wait(), and XrdThrottleTimer.

+ Here is the call graph for this function:

◆ StopIOTimer()

void XrdThrottleManager::StopIOTimer ( struct timespec  timer)
protected

Definition at line 513 of file XrdThrottleManager.cc.

514 {
515  AtomicBeg(m_compute_var);
516  AtomicDec(m_io_active);
517  AtomicAdd(m_io_wait.tv_sec, timer.tv_sec);
518  // Note this may result in tv_nsec > 1e9
519  AtomicAdd(m_io_wait.tv_nsec, timer.tv_nsec);
520  AtomicEnd(m_compute_var);
521 }
#define AtomicAdd(x, y)

References AtomicAdd, AtomicBeg, AtomicDec, and AtomicEnd.

Referenced by XrdThrottleTimer::StopTimer().

+ Here is the caller graph for this function:

Friends And Related Function Documentation

◆ XrdThrottleTimer

friend class XrdThrottleTimer
friend

Definition at line 48 of file XrdThrottleManager.hh.

Referenced by StartIOTimer().


The documentation for this class was generated from the following files: