// File name of the OFS shared library. Presumably the default filesystem
// plugin to load; the places where this macro is used are outside this excerpt.
14 #define OFS_NAME "libXrdOfs.so"
// Excerpt from the filesystem-loading path; statements between the numbered
// lines are not shown.
// Error text for a failed load of the OFS filesystem (the guarding condition
// is outside this excerpt).
34 eDest.
Emsg(
"Config",
"Unable to load OFS filesystem.");
// Call the entry point ep with 0 as its first argument, the logger returned by
// eDest.logger() and the configuration file path; a null result is a failure.
43 if (!(fs = (*ep)(0,
eDest.
logger(), config_file.c_str())))
// The failure message names the library (fslib) whose entry point returned null.
45 eDest.
Emsg(
"Config",
"Unable to create file system object via", fslib.c_str());
// Hand fs, the native filesystem, the logger, the configuration file name and
// the environment pointer to FileSystem::Initialize.
62 FileSystem::Initialize(fs, native_fs, lp, configfn,
envP);
// Constructor initializer list (the body is not part of this excerpt):
// m_eroute is built from 0, m_trace receives a pointer to m_eroute, m_sfs_ptr
// starts at 0, m_initialized starts false, and m_throttle receives pointers to
// both m_eroute and m_trace.
92 FileSystem::FileSystem()
93 : m_eroute(0), m_trace(&m_eroute), m_sfs_ptr(0), m_initialized(false), m_throttle(&m_eroute, &m_trace)
// Destructor with an empty body; nothing is released explicitly here.
98 FileSystem::~FileSystem() {}
// Excerpt, presumably from FileSystem::Initialize; the rest of the parameter
// list and the statements between the numbered lines are not shown.
104 const char *configfn,
// Create the shared instance when none exists yet.
// NOTE(review): an ordinary new expression reports failure by throwing, so the
// null test on its result is presumably never true -- confirm whether a
// nothrow allocation was intended.
108 if (m_instance == NULL && !(m_instance =
new FileSystem()))
// The setup below is guarded by the m_initialized flag.
113 if (!fs->m_initialized)
// Store the configuration file name; the configuration code reads it back
// from m_config_file.
115 fs->m_config_file = configfn;
117 fs->m_eroute.
Say(
"Initializing a Throttled file system.");
// Failure message; the call whose result triggers it is outside this excerpt.
120 fs->m_eroute.
Say(
"Initialization of throttled file system failed.");
// Initialize m_throttle, then set the m_initialized flag.
124 fs->m_throttle.
Init();
125 fs->m_initialized =
true;
// Directive dispatch helper: when key equals var, call func(Config) and store
// its result in NoGo; otherwise store 0 in NoGo. It expects var, Config and
// NoGo to be in scope at the point of use.
// NOTE(review): NoGo is assigned on every expansion, so a non-matching key
// overwrites a nonzero result left by an earlier expansion -- confirm this is
// intended where several expansions are chained.
129 #define TS_Xeq(key, func) NoGo = (strcmp(key, var) == 0) ? func(Config) : 0
// Excerpt from the configuration routine; statements between the numbered
// lines are not shown.
// A configuration file name must have been stored beforehand.
136 if (m_config_file.length() == 0)
138 log.
Say(
"No filename specified.");
// Open the configuration file read-only; on failure errno is passed to Emsg
// together with the file name.
141 if ((cfgFD =
open(m_config_file.c_str(), O_RDONLY)) < 0)
143 log.
Emsg(
"Config", errno,
"Unable to open configuration file", m_config_file.c_str());
// Banner text terminated by a 0 entry; its consumer is outside this excerpt.
147 static const char *cvec[] = {
"*** throttle (ofs) plugin config:", 0 };
// Loop while GetMyFirstWord yields a word; var is compared against the
// directive names below.
154 while( (var =
Config.GetMyFirstWord()) )
// throttle.fslib is tested directly rather than through TS_Xeq.
156 if (strcmp(
"throttle.fslib", var) == 0)
// A missing or empty value is reported and the loop moves on.
159 if (!val || !val[0]) {log.
Emsg(
"Config",
"fslib not specified.");
continue;}
// Offer var to each handler in turn.
// NOTE(review): TS_Xeq assigns NoGo on every use (0 when the key does not
// match), so after this chain NoGo holds only the throttle.trace outcome; a
// failure returned by an earlier handler looks like it is discarded -- confirm.
162 TS_Xeq(
"throttle.max_open_files", xmaxopen);
163 TS_Xeq(
"throttle.max_active_connections", xmaxconn);
164 TS_Xeq(
"throttle.throttle", xthrottle);
165 TS_Xeq(
"throttle.loadshed", xloadshed);
166 TS_Xeq(
"throttle.trace", xtrace);
// Failure report; its guard is outside this excerpt.
169 log.
Emsg(
"Config",
"Throttle configuration failed.");
// Use the filesystem supplied by the caller when there is one; otherwise load
// the library named by fslib. A null result makes the routine return 1.
174 m_sfs_ptr = native_fs ? native_fs :
LoadFS(fslib, m_eroute, m_config_file);
175 if (!m_sfs_ptr)
return 1;
// Report whether a g-stream pointer is available.
// NOTE(review): the Say signature listed in this file takes text arguments
// only, so the leading Config literal is presumably emitted as message text
// rather than as a message prefix -- confirm.
183 log.
Say(
"Config",
"Throttle g-stream has", gstream ?
"" :
" NOT",
" been configured via xrootd.mongstream directive");
// Excerpt from the throttle.max_open_files handler.
// Fetch the directive value; a missing or empty value is a configuration error.
206 auto val =
Config.GetWord();
207 if (!val || val[0] ==
'\0')
208 {m_eroute.
Emsg(
"Config",
"Max open files not specified! Example usage: throttle.max_open_files 16000");
// Fail right away, as the other option handlers in this file do, instead of
// handing a null or empty value on to the parser.
return 1;}
// Parse the value with a lower bound of 1; a nonzero result from a2sz is
// treated as failure.
209 long long max_open = -1;
210 if (
XrdOuca2x::a2sz(m_eroute,
"max open files value", val, &max_open, 1))
return 1;
// Excerpt from the throttle.max_active_connections handler.
// Fetch the directive value; a missing or empty value is a configuration error.
232 auto val =
Config.GetWord();
233 if (!val || val[0] ==
'\0')
234 {m_eroute.
Emsg(
"Config",
"Max active connections not specified! Example usage: throttle.max_active_connections 4000");
// Fail right away, as the other option handlers in this file do, instead of
// handing a null or empty value on to the parser.
return 1;}
// Parse the value with a lower bound of 1; a nonzero result from a2sz is
// treated as failure.
235 long long max_conn = -1;
236 if (
XrdOuca2x::a2sz(m_eroute,
"max active connections value", val, &max_conn, 1))
return 1;
// Excerpt from the throttle.throttle handler; statements between the numbered
// lines are not shown.
// Starting values: -1 for the data rate, the IOPS rate and the concurrency
// limit, and 1000 for the recompute interval.
261 long long drate = -1, irate = -1, rint = 1000, climit = -1;
// Consume keyword/value pairs until GetWord returns nothing more.
264 while ((val =
Config.GetWord()))
// data <value>: parsed by a2sz with a lower bound of 1.
266 if (strcmp(
"data", val) == 0)
268 if (!(val =
Config.GetWord()))
269 {m_eroute.
Emsg(
"Config",
"data throttle limit not specified.");
return 1;}
270 if (
XrdOuca2x::a2sz(m_eroute,
"data throttle value",val,&drate,1))
return 1;
// iops <value>: parsed by a2sz with a lower bound of 1.
272 else if (strcmp(
"iops", val) == 0)
274 if (!(val =
Config.GetWord()))
275 {m_eroute.
Emsg(
"Config",
"IOPS throttle limit not specified.");
return 1;}
276 if (
XrdOuca2x::a2sz(m_eroute,
"IOPS throttle value",val,&irate,1))
return 1;
// rint <value>: parsed by a2sp (not a2sz) with a lower bound of 10.
278 else if (strcmp(
"rint", val) == 0)
280 if (!(val =
Config.GetWord()))
281 {m_eroute.
Emsg(
"Config",
"recompute interval not specified.");
return 1;}
282 if (
XrdOuca2x::a2sp(m_eroute,
"recompute interval value",val,&rint,10))
return 1;
// concurrency <value>: parsed by a2sz with a lower bound of 1.
284 else if (strcmp(
"concurrency", val) == 0)
286 if (!(val =
Config.GetWord()))
287 {m_eroute.
Emsg(
"Config",
"Concurrency limit not specified.");
return 1;}
288 if (
XrdOuca2x::a2sz(m_eroute,
"Concurrency limit value",val,&climit,1))
return 1;
// Any other keyword produces a warning message; no return statement is
// visible on this path.
292 m_eroute.
Emsg(
"Config",
"Warning - unknown throttle option specified", val,
".");
// Apply the collected settings. rint is divided by 1000.0 before it is passed
// as the interval length (presumably milliseconds to seconds -- confirm).
// NOTE(review): the SetThrottles signature listed in this file takes
// (float, float, int, float), so the long long values are converted implicitly.
296 m_throttle.
SetThrottles(drate, irate, climit,
static_cast<float>(rint)/1000.0);
// Excerpt from the throttle.loadshed handler; statements between the numbered
// lines are not shown.
// port and freq start at 0 and hostname starts empty.
317 long long port = 0, freq = 0;
319 std::string hostname;
// Consume keyword/value pairs until GetWord returns nothing more.
321 while ((val =
Config.GetWord()))
// host <name>: a missing name is a configuration error.
323 if (strcmp(
"host", val) == 0)
325 if (!(val =
Config.GetWord()))
326 {m_eroute.
Emsg(
"Config",
"loadshed hostname not specified.");
return 1;}
// port <number>: accepted range is 1 through 65535, the largest value a
// 16-bit port number can hold.
329 else if (strcmp(
"port", val) == 0)
331 if (!(val =
Config.GetWord()))
332 {m_eroute.
Emsg(
"Config",
"Port number not specified.");
return 1;}
333 if (
XrdOuca2x::a2sz(m_eroute,
"Port number",val,&port,1, 65535))
return 1;
// frequency <number>: accepted range is 1 through 100.
335 else if (strcmp(
"frequency", val) == 0)
337 if (!(val =
Config.GetWord()))
338 {m_eroute.
Emsg(
"Config",
"Loadshed frequency not specified.");
return 1;}
339 if (
XrdOuca2x::a2sz(m_eroute,
"Loadshed frequency",val,&freq,1,100))
return 1;
// Any other keyword produces a warning message; no return statement is
// visible on this path.
343 m_eroute.
Emsg(
"Config",
"Warning - unknown loadshed option specified", val,
".");
// A host name is mandatory for this directive.
347 if (hostname.empty())
349 m_eroute.
Emsg(
"Config",
"must specify hostname for loadshed parameter.");
// Excerpt from the throttle.trace handler; statements between the numbered
// lines are not shown.
// Table element type: an option name paired with an integer value. The table
// entries themselves are outside this excerpt.
374 static const struct traceopts {
const char *opname;
int opval;} tropts[] =
// numopts is the number of elements in tropts; trval starts at 0 and collects
// the selected option values.
386 int i, neg, trval = 0, numopts =
sizeof(tropts)/
sizeof(
struct traceopts);
// At least one option word is required.
388 if (!(val =
Config.GetWord()))
390 m_eroute.
Emsg(
"Config",
"trace option not specified");
// The word off is tested separately; its branch body is not shown.
395 if (!strcmp(val,
"off"))
// neg is nonzero for a word that starts with - and has at least one more
// character after it.
401 if ((neg = (val[0] ==
'-' && val[1])))
// Look the word up by name in the option table.
405 for (i = 0; i < numopts; i++)
407 if (!strcmp(val, tropts[i].opname))
// Clear the bits of the option from trval (presumably the negated case; the
// enclosing condition is not shown) ...
411 if (tropts[i].opval) trval &= ~tropts[i].opval;
// ... or set them. A table entry with a zero opval changes nothing in either
// statement.
414 else if (tropts[i].opval) trval |= tropts[i].opval;
// Warning for an option word that is not accepted; its guard is not shown.
421 m_eroute.
Say(
"Config warning: ignoring invalid trace option '", val,
"'.");
// Store the collected value in the What member of m_trace.
426 m_trace.
What = trval;
// Signatures of symbols referenced by the excerpts above. This is a listing
// only: no function bodies and no terminating semicolons appear here.
static XrdSysError eDest(0,"crypto_")
XrdSfsFileSystem * XrdSfsGetDefaultFileSystem(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn, XrdOucEnv *EnvInfo)
int open(const char *path, int oflag,...)
static XrdSfsFileSystem * LoadFS(const std::string &fslib, XrdSysError &eDest, const std::string &config_file)
XrdSfsFileSystem * XrdSfsGetFileSystem2(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn, XrdOucEnv *envP)
#define TS_Xeq(key, func)
XrdSfsFileSystem * XrdSfsGetFileSystem(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn)
XrdVERSIONINFO(XrdSfsGetFileSystem, FileSystem)
static int Export(const char *Var, const char *Val)
void * GetPtr(const char *varname)
static int a2sp(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
XrdSysLogger * logger(XrdSysLogger *lp=0)
void * getPlugin(const char *pname, int optional=0)
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
void SetMonitor(XrdXrootdGStream *gstream)
void SetMaxConns(unsigned long max_conns)
virtual int Configure(XrdSysError &, XrdSfsFileSystem *native_fs, XrdOucEnv *envP)
XrdSfsFileSystem * XrdSfsGetFileSystem_Internal(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn, XrdOucEnv *envP)