XRootD
XrdThrottleManager.hh
Go to the documentation of this file.
1 
2 /*
3  * XrdThrottleManager
4  *
5  * This class provides an implementation of a throttle manager.
6  * The throttled manager purposely pause if the bandwidth, IOPS
7  * rate, or number of outstanding IO requests is sustained above
8  * a certain level.
9  *
10  * The XrdThrottleManager is user-aware and provides fairshare.
11  *
12  * This works by having a separate thread periodically refilling
13  * each user's shares.
14  *
15  * Note that we do not actually keep close track of users, but rather
16  * put them into a hash. This way, we can pretend there's a constant
17  * number of users and use a lock-free algorithm.
18  */
19 
20 #ifndef __XrdThrottleManager_hh_
21 #define __XrdThrottleManager_hh_
22 
23 #ifdef __GNUC__
24 #define likely(x) __builtin_expect(!!(x), 1)
25 #define unlikely(x) __builtin_expect(!!(x), 0)
26 #else
27 #define likely(x) x
28 #define unlikely(x) x
29 #endif
30 
31 #include <string>
32 #include <vector>
33 #include <ctime>
34 #include <mutex>
35 #include <unordered_map>
36 #include <memory>
37 
38 #include "XrdSys/XrdSysPthread.hh"
39 
40 class XrdSysError;
41 class XrdOucTrace;
42 class XrdThrottleTimer;
43 class XrdXrootdGStream;
44 
46 {
47 
48 friend class XrdThrottleTimer;
49 
50 public:
51 
52 void Init();
53 
54 bool OpenFile(const std::string &entity, std::string &open_error_message);
55 bool CloseFile(const std::string &entity);
56 
57 void Apply(int reqsize, int reqops, int uid);
58 
59 bool IsThrottling() {return (m_ops_per_second > 0) || (m_bytes_per_second > 0);}
60 
61 void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
62  {m_interval_length_seconds = interval_length; m_bytes_per_second = reqbyterate;
63  m_ops_per_second = reqoprate; m_concurrency_limit = concurrency;}
64 
65 void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
66  {m_loadshed_host = hostname; m_loadshed_port = port; m_loadshed_frequency = frequency;}
67 
68 void SetMaxOpen(unsigned long max_open) {m_max_open = max_open;}
69 
70 void SetMaxConns(unsigned long max_conns) {m_max_conns = max_conns;}
71 
72 void SetMonitor(XrdXrootdGStream *gstream) {m_gstream = gstream;}
73 
74 //int Stats(char *buff, int blen, int do_sync=0) {return m_pool.Stats(buff, blen, do_sync);}
75 
76 static
77 int GetUid(const char *username);
78 
80 
81 void PrepLoadShed(const char *opaque, std::string &lsOpaque);
82 
83 bool CheckLoadShed(const std::string &opaque);
84 
85 void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port);
86 
88 
89  ~XrdThrottleManager() {} // The buffmanager is never deleted
90 
91 protected:
92 
93 void StopIOTimer(struct timespec);
94 
95 private:
96 
97 void Recompute();
98 
99 void RecomputeInternal();
100 
101 static
102 void * RecomputeBootstrap(void *pp);
103 
104 int WaitForShares();
105 
106 void GetShares(int &shares, int &request);
107 
108 void StealShares(int uid, int &reqsize, int &reqops);
109 
110 XrdOucTrace * m_trace;
111 XrdSysError * m_log;
112 
113 XrdSysCondVar m_compute_var;
114 
115 // Controls for the various rates.
116 float m_interval_length_seconds;
117 float m_bytes_per_second;
118 float m_ops_per_second;
119 int m_concurrency_limit;
120 
121 // Maintain the shares
122 static const
123 int m_max_users;
124 std::vector<int> m_primary_bytes_shares;
125 std::vector<int> m_secondary_bytes_shares;
126 std::vector<int> m_primary_ops_shares;
127 std::vector<int> m_secondary_ops_shares;
128 int m_last_round_allocation;
129 
130 // Active IO counter
131 int m_io_active;
132 struct timespec m_io_wait;
133 unsigned m_io_total{0};
134 // Stable IO counters - must hold m_compute_var lock when reading/writing;
135 int m_stable_io_active;
136 int m_stable_io_total{0}; // It would take ~3 years to overflow a 32-bit unsigned integer at 100Hz of IO operations.
137 struct timespec m_stable_io_wait;
138 
139 // Load shed details
140 std::string m_loadshed_host;
141 unsigned m_loadshed_port;
142 unsigned m_loadshed_frequency;
143 int m_loadshed_limit_hit;
144 
145 // Maximum number of open files
146 unsigned long m_max_open{0};
147 unsigned long m_max_conns{0};
148 std::unordered_map<std::string, unsigned long> m_file_counters;
149 std::unordered_map<std::string, unsigned long> m_conn_counters;
150 std::unordered_map<std::string, std::unique_ptr<std::unordered_map<pid_t, unsigned long>>> m_active_conns;
151 std::mutex m_file_mutex;
152 
153 // Monitoring handle, if configured
154 XrdXrootdGStream* m_gstream{nullptr};
155 
156 static const char *TraceID;
157 
158 };
159 
161 {
162 
163 friend class XrdThrottleManager;
164 
165 public:
166 
167 void StopTimer()
168 {
169  struct timespec end_timer = {0, 0};
170 #if defined(__linux__) || defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__))
171  int retval = clock_gettime(clock_id, &end_timer);
172 #else
173  int retval = -1;
174 #endif
175  if (likely(retval == 0))
176  {
177  end_timer.tv_sec -= m_timer.tv_sec;
178  end_timer.tv_nsec -= m_timer.tv_nsec;
179  if (end_timer.tv_nsec < 0)
180  {
181  end_timer.tv_sec--;
182  end_timer.tv_nsec += 1000000000;
183  }
184  }
185  if (m_timer.tv_nsec != -1)
186  {
187  m_manager.StopIOTimer(end_timer);
188  }
189  m_timer.tv_sec = 0;
190  m_timer.tv_nsec = -1;
191 }
192 
194 {
195  if (!((m_timer.tv_sec == 0) && (m_timer.tv_nsec == -1)))
196  {
197  StopTimer();
198  }
199 }
200 
201 protected:
202 
204  m_manager(manager)
205 {
206 #if defined(__linux__) || defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__))
207  int retval = clock_gettime(clock_id, &m_timer);
208 #else
209  int retval = -1;
210 #endif
211  if (unlikely(retval == -1))
212  {
213  m_timer.tv_sec = 0;
214  m_timer.tv_nsec = 0;
215  }
216 }
217 
218 private:
219 XrdThrottleManager &m_manager;
220 struct timespec m_timer;
221 
222 static clockid_t clock_id;
223 };
224 
225 #endif
#define likely(x)
#define unlikely(x)
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void Apply(int reqsize, int reqops, int uid)
void StopIOTimer(struct timespec)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
void SetMonitor(XrdXrootdGStream *gstream)
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
XrdThrottleTimer StartIOTimer()
void SetMaxConns(unsigned long max_conns)
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
static int GetUid(const char *username)
XrdThrottleTimer(XrdThrottleManager &manager)