XRootD
XrdThrottleFileSystemConfig.cc
Go to the documentation of this file.
1 
2 #include <fcntl.h>
3 
4 #include "XrdSys/XrdSysPlugin.hh"
5 #include "XrdOuc/XrdOuca2x.hh"
6 #include "XrdOuc/XrdOucEnv.hh"
7 #include "XrdOuc/XrdOucStream.hh"
8 
11 
12 using namespace XrdThrottle;
13 
14 #define OFS_NAME "libXrdOfs.so"
15 
16 /*
17  * Note nothing in this file is thread-safe.
18  */
19 
20 static XrdSfsFileSystem *
21 LoadFS(const std::string &fslib, XrdSysError &eDest, const std::string &config_file){
22  // Load the library
23  XrdSysPlugin ofsLib(&eDest, fslib.c_str(), "fslib", NULL);
24  XrdSfsFileSystem *fs;
25  if (fslib == OFS_NAME)
26  {
28  XrdSysLogger *lp,
29  const char *configfn,
30  XrdOucEnv *EnvInfo);
31 
32  if (!(fs = XrdSfsGetDefaultFileSystem(0, eDest.logger(), config_file.c_str(), 0)))
33  {
34  eDest.Emsg("Config", "Unable to load OFS filesystem.");
35  }
36  }
37  else
38  {
39  XrdSfsFileSystem *(*ep)(XrdSfsFileSystem *, XrdSysLogger *, const char *);
40  if (!(ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *,XrdSysLogger *,const char *))
41  ofsLib.getPlugin("XrdSfsGetFileSystem")))
42  return NULL;
43  if (!(fs = (*ep)(0, eDest.logger(), config_file.c_str())))
44  {
45  eDest.Emsg("Config", "Unable to create file system object via", fslib.c_str());
46  return NULL;
47  }
48  }
49  ofsLib.Persist();
50 
51  return fs;
52 }
53 
54 namespace XrdThrottle {
57  XrdSysLogger *lp,
58  const char *configfn,
59  XrdOucEnv *envP)
60 {
61  FileSystem* fs = NULL;
62  FileSystem::Initialize(fs, native_fs, lp, configfn, envP);
63  return fs;
64 }
65 }
66 
67 // Export the symbol necessary for this to be dynamically loaded.
68 extern "C" {
71  XrdSysLogger *lp,
72  const char *configfn)
73 {
74  return XrdSfsGetFileSystem_Internal(native_fs, lp, configfn, nullptr);
75 }
76 
79  XrdSysLogger *lp,
80  const char *configfn,
81  XrdOucEnv *envP)
82 {
83  return XrdSfsGetFileSystem_Internal(native_fs, lp, configfn, envP);
84 }
85 }
86 
89 
90 FileSystem* FileSystem::m_instance = 0;
91 
92 FileSystem::FileSystem()
93  : m_eroute(0), m_trace(&m_eroute), m_sfs_ptr(0), m_initialized(false), m_throttle(&m_eroute, &m_trace)
94 {
95  myVersion = &XrdVERSIONINFOVAR(XrdSfsGetFileSystem);
96 }
97 
98 FileSystem::~FileSystem() {}
99 
100 void
101 FileSystem::Initialize(FileSystem *&fs,
102  XrdSfsFileSystem *native_fs,
103  XrdSysLogger *lp,
104  const char *configfn,
105  XrdOucEnv *envP)
106 {
107  fs = NULL;
108  if (m_instance == NULL && !(m_instance = new FileSystem()))
109  {
110  return;
111  }
112  fs = m_instance;
113  if (!fs->m_initialized)
114  {
115  fs->m_config_file = configfn;
116  fs->m_eroute.logger(lp);
117  fs->m_eroute.Say("Initializing a Throttled file system.");
118  if (fs->Configure(fs->m_eroute, native_fs, envP))
119  {
120  fs->m_eroute.Say("Initialization of throttled file system failed.");
121  fs = NULL;
122  return;
123  }
124  fs->m_throttle.Init();
125  fs->m_initialized = true;
126  }
127 }
128 
129 #define TS_Xeq(key, func) NoGo = (strcmp(key, var) == 0) ? func(Config) : 0
130 int
132 {
133  XrdOucEnv myEnv;
134  XrdOucStream Config(&m_eroute, getenv("XRDINSTANCE"), &myEnv, "(Throttle Config)> ");
135  int cfgFD;
136  if (m_config_file.length() == 0)
137  {
138  log.Say("No filename specified.");
139  return 1;
140  }
141  if ((cfgFD = open(m_config_file.c_str(), O_RDONLY)) < 0)
142  {
143  log.Emsg("Config", errno, "Unable to open configuration file", m_config_file.c_str());
144  return 1;
145  }
146  Config.Attach(cfgFD);
147  static const char *cvec[] = { "*** throttle (ofs) plugin config:", 0 };
148  Config.Capture(cvec);
149 
150  std::string fslib = OFS_NAME;
151 
152  char *var, *val;
153  int NoGo = 0;
154  while( (var = Config.GetMyFirstWord()) )
155  {
156  if (strcmp("throttle.fslib", var) == 0)
157  {
158  val = Config.GetWord();
159  if (!val || !val[0]) {log.Emsg("Config", "fslib not specified."); continue;}
160  fslib = val;
161  }
162  TS_Xeq("throttle.max_open_files", xmaxopen);
163  TS_Xeq("throttle.max_active_connections", xmaxconn);
164  TS_Xeq("throttle.throttle", xthrottle);
165  TS_Xeq("throttle.loadshed", xloadshed);
166  TS_Xeq("throttle.trace", xtrace);
167  if (NoGo)
168  {
169  log.Emsg("Config", "Throttle configuration failed.");
170  }
171  }
172 
173  // Load the filesystem object.
174  m_sfs_ptr = native_fs ? native_fs : LoadFS(fslib, m_eroute, m_config_file);
175  if (!m_sfs_ptr) return 1;
176 
177  // Overwrite the environment variable saying that throttling is the fslib.
178  XrdOucEnv::Export("XRDOFSLIB", fslib.c_str());
179 
180  if (envP)
181  {
182  auto gstream = reinterpret_cast<XrdXrootdGStream*>(envP->GetPtr("Throttle.gStream*"));
183  log.Say("Config", "Throttle g-stream has", gstream ? "" : " NOT", " been configured via xrootd.mongstream directive");
184  m_throttle.SetMonitor(gstream);
185  }
186 
187 
188  return 0;
189 }
190 
191 /******************************************************************************/
192 /* x m a x o p e n */
193 /******************************************************************************/
194 
195 /* Function: xmaxopen
196 
197  Purpose: Parse the directive: throttle.max_open_files <limit>
198 
199  <limit> maximum number of open file handles for a unique entity.
200 
201  Output: 0 upon success or !0 upon failure.
202 */
203 int
204 FileSystem::xmaxopen(XrdOucStream &Config)
205 {
206  auto val = Config.GetWord();
207  if (!val || val[0] == '\0')
208  {m_eroute.Emsg("Config", "Max open files not specified! Example usage: throttle.max_open_files 16000");}
209  long long max_open = -1;
210  if (XrdOuca2x::a2sz(m_eroute, "max open files value", val, &max_open, 1)) return 1;
211 
212  m_throttle.SetMaxOpen(max_open);
213  return 0;
214 }
215 
216 
217 /******************************************************************************/
218 /* x m a x c o n n */
219 /******************************************************************************/
220 
221 /* Function: xmaxconn
222 
223  Purpose: Parse the directive: throttle.max_active_connections <limit>
224 
225  <limit> maximum number of connections with at least one open file for a given entity
226 
227  Output: 0 upon success or !0 upon failure.
228 */
229 int
230 FileSystem::xmaxconn(XrdOucStream &Config)
231 {
232  auto val = Config.GetWord();
233  if (!val || val[0] == '\0')
234  {m_eroute.Emsg("Config", "Max active cconnections not specified! Example usage: throttle.max_active_connections 4000");}
235  long long max_conn = -1;
236  if (XrdOuca2x::a2sz(m_eroute, "max active connections value", val, &max_conn, 1)) return 1;
237 
238  m_throttle.SetMaxConns(max_conn);
239  return 0;
240 }
241 
242 
243 /******************************************************************************/
244 /* x t h r o t t l e */
245 /******************************************************************************/
246 
247 /* Function: xthrottle
248 
249  Purpose: To parse the directive: throttle [data <drate>] [iops <irate>] [concurrency <climit>] [interval <rint>]
250 
251  <drate> maximum bytes per second through the server.
252  <irate> maximum IOPS per second through the server.
253  <climit> maximum number of concurrent IO connections.
254  <rint> minimum interval in milliseconds between throttle re-computing.
255 
256  Output: 0 upon success or !0 upon failure.
257 */
258 int
259 FileSystem::xthrottle(XrdOucStream &Config)
260 {
261  long long drate = -1, irate = -1, rint = 1000, climit = -1;
262  char *val;
263 
264  while ((val = Config.GetWord()))
265  {
266  if (strcmp("data", val) == 0)
267  {
268  if (!(val = Config.GetWord()))
269  {m_eroute.Emsg("Config", "data throttle limit not specified."); return 1;}
270  if (XrdOuca2x::a2sz(m_eroute,"data throttle value",val,&drate,1)) return 1;
271  }
272  else if (strcmp("iops", val) == 0)
273  {
274  if (!(val = Config.GetWord()))
275  {m_eroute.Emsg("Config", "IOPS throttle limit not specified."); return 1;}
276  if (XrdOuca2x::a2sz(m_eroute,"IOPS throttle value",val,&irate,1)) return 1;
277  }
278  else if (strcmp("rint", val) == 0)
279  {
280  if (!(val = Config.GetWord()))
281  {m_eroute.Emsg("Config", "recompute interval not specified."); return 1;}
282  if (XrdOuca2x::a2sp(m_eroute,"recompute interval value",val,&rint,10)) return 1;
283  }
284  else if (strcmp("concurrency", val) == 0)
285  {
286  if (!(val = Config.GetWord()))
287  {m_eroute.Emsg("Config", "Concurrency limit not specified."); return 1;}
288  if (XrdOuca2x::a2sz(m_eroute,"Concurrency limit value",val,&climit,1)) return 1;
289  }
290  else
291  {
292  m_eroute.Emsg("Config", "Warning - unknown throttle option specified", val, ".");
293  }
294  }
295 
296  m_throttle.SetThrottles(drate, irate, climit, static_cast<float>(rint)/1000.0);
297  return 0;
298 }
299 
300 /******************************************************************************/
301 /* x l o a d s h e d */
302 /******************************************************************************/
303 
304 /* Function: xloadshed
305 
306  Purpose: To parse the directive: loadshed host <hostname> [port <port>] [frequency <freq>]
307 
308  <hostname> hostname of server to shed load to. Required
309  <port> port of server to shed load to. Defaults to 1094
310  <freq> A value from 1 to 100 specifying how often to shed load
311  (1 = 1% chance; 100 = 100% chance; defaults to 10).
312 
313  Output: 0 upon success or !0 upon failure.
314 */
315 int FileSystem::xloadshed(XrdOucStream &Config)
316 {
317  long long port = 0, freq = 0;
318  char *val;
319  std::string hostname;
320 
321  while ((val = Config.GetWord()))
322  {
323  if (strcmp("host", val) == 0)
324  {
325  if (!(val = Config.GetWord()))
326  {m_eroute.Emsg("Config", "loadshed hostname not specified."); return 1;}
327  hostname = val;
328  }
329  else if (strcmp("port", val) == 0)
330  {
331  if (!(val = Config.GetWord()))
332  {m_eroute.Emsg("Config", "Port number not specified."); return 1;}
333  if (XrdOuca2x::a2sz(m_eroute,"Port number",val,&port,1, 65536)) return 1;
334  }
335  else if (strcmp("frequency", val) == 0)
336  {
337  if (!(val = Config.GetWord()))
338  {m_eroute.Emsg("Config", "Loadshed frequency not specified."); return 1;}
339  if (XrdOuca2x::a2sz(m_eroute,"Loadshed frequency",val,&freq,1,100)) return 1;
340  }
341  else
342  {
343  m_eroute.Emsg("Config", "Warning - unknown loadshed option specified", val, ".");
344  }
345  }
346 
347  if (hostname.empty())
348  {
349  m_eroute.Emsg("Config", "must specify hostname for loadshed parameter.");
350  return 1;
351  }
352 
353  m_throttle.SetLoadShed(hostname, port, freq);
354  return 0;
355 }
356 
357 /******************************************************************************/
358 /* x t r a c e */
359 /******************************************************************************/
360 
361 /* Function: xtrace
362 
363  Purpose: To parse the directive: trace <events>
364 
365  <events> the blank separated list of events to trace. Trace
366  directives are cummalative.
367 
368  Output: 0 upon success or 1 upon failure.
369 */
370 
371 int FileSystem::xtrace(XrdOucStream &Config)
372 {
373  char *val;
374  static const struct traceopts {const char *opname; int opval;} tropts[] =
375  {
376  {"all", TRACE_ALL},
377  {"off", TRACE_NONE},
378  {"none", TRACE_NONE},
379  {"debug", TRACE_DEBUG},
380  {"iops", TRACE_IOPS},
381  {"bandwidth", TRACE_BANDWIDTH},
382  {"ioload", TRACE_IOLOAD},
383  {"files", TRACE_FILES},
384  {"connections",TRACE_CONNS},
385  };
386  int i, neg, trval = 0, numopts = sizeof(tropts)/sizeof(struct traceopts);
387 
388  if (!(val = Config.GetWord()))
389  {
390  m_eroute.Emsg("Config", "trace option not specified");
391  return 1;
392  }
393  while (val)
394  {
395  if (!strcmp(val, "off"))
396  {
397  trval = 0;
398  }
399  else
400  {
401  if ((neg = (val[0] == '-' && val[1])))
402  {
403  val++;
404  }
405  for (i = 0; i < numopts; i++)
406  {
407  if (!strcmp(val, tropts[i].opname))
408  {
409  if (neg)
410  {
411  if (tropts[i].opval) trval &= ~tropts[i].opval;
412  else trval = TRACE_ALL;
413  }
414  else if (tropts[i].opval) trval |= tropts[i].opval;
415  else trval = TRACE_NONE;
416  break;
417  }
418  }
419  if (i >= numopts)
420  {
421  m_eroute.Say("Config warning: ignoring invalid trace option '", val, "'.");
422  }
423  }
424  val = Config.GetWord();
425  }
426  m_trace.What = trval;
427  return 0;
428 }
429 
static XrdSysError eDest(0,"crypto_")
XrdSfsFileSystem * XrdSfsGetDefaultFileSystem(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn, XrdOucEnv *EnvInfo)
Definition: XrdOfsFS.cc:49
int open(const char *path, int oflag,...)
static XrdSfsFileSystem * LoadFS(const std::string &fslib, XrdSysError &eDest, const std::string &config_file)
#define OFS_NAME
XrdSfsFileSystem * XrdSfsGetFileSystem2(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn, XrdOucEnv *envP)
#define TS_Xeq(key, func)
XrdSfsFileSystem * XrdSfsGetFileSystem(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn)
XrdVERSIONINFO(XrdSfsGetFileSystem, FileSystem)
#define TRACE_IOLOAD
#define TRACE_BANDWIDTH
#define TRACE_FILES
#define TRACE_CONNS
#define TRACE_IOPS
#define TRACE_NONE
Definition: XrdTrace.hh:34
#define TRACE_DEBUG
Definition: XrdTrace.hh:36
#define TRACE_ALL
Definition: XrdTrace.hh:35
static int Export(const char *Var, const char *Val)
Definition: XrdOucEnv.cc:188
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:281
static int a2sp(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:213
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:257
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
XrdSysLogger * logger(XrdSysLogger *lp=0)
Definition: XrdSysError.hh:141
void * getPlugin(const char *pname, int optional=0)
void * Persist()
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
void SetMonitor(XrdXrootdGStream *gstream)
void SetMaxConns(unsigned long max_conns)
virtual int Configure(XrdSysError &, XrdSfsFileSystem *native_fs, XrdOucEnv *envP)
XrdVersionInfo myVersion
XrdCmsConfig Config
XrdOucEnv * envP
Definition: XrdPss.cc:109
XrdSfsFileSystem * XrdSfsGetFileSystem_Internal(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn, XrdOucEnv *envP)