16 #include "XrdVersion.hh"
35 m_allow_xrdpfc_command(false),
36 m_data_space(
"public"),
37 m_meta_space(
"public"),
41 m_fileUsageBaseline(-1),
42 m_fileUsageNominal(-1),
45 m_purgeColdFilesAge(-1),
46 m_purgeAgeBasedPeriod(10),
48 m_dirStatsInterval(1500),
49 m_dirStatsMaxDepth(-1),
50 m_dirStatsStoreDepth(0),
51 m_bufferSize(128*1024),
53 m_RamKeepStdBlocks(0),
56 m_prefetch_max_blocks(10),
57 m_hdfsbsize(128*1024*1024),
62 m_onlyIfCachedMinSize(1024*1024),
63 m_onlyIfCachedMinFrac(1.0)
67 bool Cache::cfg2bytes(
const std::string &str,
long long &store,
long long totalSpace,
const char *name)
70 snprintf(errStr, 1024,
"ConfigParameters() Error parsing parameter %s", name);
72 if (::isalpha(*(str.rbegin())))
83 double frac = strtod(str.c_str(), &eP);
84 if (errno || eP == str.c_str())
86 m_log.
Emsg(errStr, str.c_str());
90 store =
static_cast<long long>(totalSpace * frac + 0.5);
93 if (store < 0 || store > totalSpace)
95 snprintf(errStr, 1024,
"ConfigParameters() Error: parameter %s should be between 0 and total available disk space (%lld) - it is %lld (given as %s)",
96 name, totalSpace, store, str.c_str());
97 m_log.
Emsg(errStr,
"");
125 const char *val, *val2;
126 struct cschkopts {
const char *opname;
int opval;} csopts[] =
133 int i, numopts =
sizeof(csopts)/
sizeof(
struct cschkopts);
136 if (! (val =
Config.GetWord()))
137 {m_log.
Emsg(
"Config",
"cschk parameter not specified");
return false; }
141 if ((
isNo = strncmp(val,
"no", 2) == 0))
145 for (i = 0; i < numopts; i++)
147 if (!strcmp(val2, csopts[i].opname))
150 m_configuration.
m_cs_Chk &= ~csopts[i].opval;
151 else if (csopts[i].opval)
152 m_configuration.
m_cs_Chk |= csopts[i].opval;
154 m_configuration.
m_cs_Chk = csopts[i].opval;
160 if (strcmp(val,
"uvkeep"))
162 m_log.
Emsg(
"Config",
"invalid cschk option -", val);
165 if (!(val =
Config.GetWord()))
167 m_log.
Emsg(
"Config",
"cschk uvkeep value not specified");
170 if (!strcmp(val,
"lru"))
183 m_configuration.m_cs_ChkTLS = m_configuration.m_cs_Chk &
CSChk_TLS;
186 m_env->Put(
"psx.CSNet", m_configuration.is_cschk_net() ? (m_configuration.m_cs_ChkTLS ?
"2" :
"1") :
"0");
207 if (! (val =
Config.GetWord()) || ! val[0])
209 TRACE(
Info,
" Cache::Config() decisionlib not specified; always caching files");
219 Config.GetRest(params, 4096);
228 if (! ep) {myLib->
Unload(
true);
return false; }
233 TRACE(
Error,
"Config() decisionlib was not able to create a decision object");
239 m_decisionpoints.push_back(d);
258 if (! (val =
Config.GetWord()) || ! val[0])
260 TRACE(
Info,
" Cache::Config() purgelib not specified; will use LRU for purging files");
270 Config.GetRest(params, 4096);
279 if (! ep) {myLib->
Unload(
true);
return false; }
284 TRACE(
Error,
"Config() purgelib was not able to create a Purge Plugin object?");
304 static struct traceopts {
const char *opname;
int opval; } tropts[] =
314 int numopts =
sizeof(tropts)/
sizeof(
struct traceopts);
316 if (! (val =
Config.GetWord()))
317 {m_log.
Emsg(
"Config",
"trace option not specified");
return 1; }
319 for (
int i = 0; i < numopts; i++)
321 if (! strcmp(val, tropts[i].opname))
323 m_trace->
What = tropts[i].opval;
327 m_log.
Emsg(
"Config",
"invalid trace option -", val);
332 bool Cache::test_oss_basics_and_features()
334 static const char *epfx =
"test_oss_basics_and_features()";
336 const auto &conf = m_configuration;
340 auto check_space = [&](
const char *space,
bool &has_xattr)
342 std::string fname(
"__prerun_test_pfc_");
345 env.
Put(
"oss.cgroup", space);
349 m_log.
Emsg(epfx,
"Can not create a file on space", space);
353 res = oss_file->
Open(fname.c_str(), O_RDWR, 0600, env);
355 m_log.
Emsg(epfx,
"Can not open a file on space", space);
358 res = oss_file->
Write(fname.data(), 0, fname.length());
359 if (res != (
int) fname.length()) {
360 m_log.
Emsg(epfx,
"Can not write into a file on space", space);
365 long long fsize = fname.length();
368 m_log.
Emsg(epfx,
"Can not write xattr to a file on space", space);
376 m_oss->
Lfn2Pfn(fname.c_str(), pfn, 4096);
379 if (res !=
sizeof(
long long) || fsize != (
long long) fname.length())
381 m_log.
Emsg(epfx,
"Can not read xattr from a file on space", space);
386 res = m_oss->
Unlink(fname.c_str());
388 m_log.
Emsg(epfx,
"Can not unlink a file on space", space);
396 aOK &= check_space(conf.m_data_space.c_str(), m_dataXattr);
397 aOK &= check_space(conf.m_meta_space.c_str(), m_metaXattr);
411 const char *theINS = getenv(
"XRDINSTANCE");
412 m_isClient = (theINS != 0 && strncmp(
"*client ", theINS, 8) == 0);
420 if (! config_filename || ! *config_filename)
422 TRACE(
Error,
"Config() configuration file not specified.");
427 if ( (fd =
open(config_filename, O_RDONLY, 0)) < 0)
429 TRACE(
Error,
"Config() can't open configuration file " << config_filename);
434 static const char *cvec[] = {
"*** pfc plugin config:", 0 };
440 if (! ofsCfg)
return false;
456 bool retval =
true, aOK =
true;
458 while ((var =
Config.GetMyFirstWord()))
460 if (! strcmp(var,
"pfc.osslib"))
464 else if (! strcmp(var,
"pfc.cschk"))
468 else if (! strcmp(var,
"pfc.decisionlib"))
472 else if (! strcmp(var,
"pfc.purgelib"))
476 else if (! strcmp(var,
"pfc.trace"))
480 else if (! strcmp(var,
"pfc.allow_xrdpfc_command"))
484 else if (! strncmp(var,
"pfc.", 4))
486 retval = ConfigParameters(std::string(var+4),
Config, tmpc);
499 myEnv.
Put(
"oss.runmode",
"pfc");
503 if (snprintf(csi_conf, 128,
"space=%s nofill", m_configuration.
m_meta_space.c_str()) < 128)
507 TRACE(
Error,
"Config() buffer too small for libXrdOssCsi params.");
517 TRACE(
Error,
"Config() Unable to create an OSS object");
522 aOK &= test_oss_basics_and_features();
530 m_log.
Emsg(
"ConfigParameters()",
"error obtaining stat info for meta space ", m_configuration.
m_meta_space.c_str());
535 m_log.
Emsg(
"ConfigParameters()",
"available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
541 m_log.
Emsg(
"ConfigParameters()",
"error obtaining stat info for data space ", m_configuration.
m_data_space.c_str());
544 if (sP.
Total < 10ll << 20)
546 m_log.
Emsg(
"ConfigParameters()",
"available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
557 m_log.
Emsg(
"ConfigParameters()",
"pfc.diskusage should have lowWatermark < highWatermark.");
573 m_log.
Emsg(
"ConfigParameters()",
"pfc.diskusage files should have baseline < nominal < max.");
580 m_log.
Emsg(
"ConfigParameters()",
"pfc.diskusage files values must be below lowWatermark");
614 m_configuration.
m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
616 snprintf(buff,
sizeof(buff),
"RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ?
"256m" :
"1g");
617 m_log.
Say(
"Config info: ", buff);
623 char* cenv = getenv(
"XRDDEBUG");
624 if (cenv && ! strcmp(cenv,
"1") && m_trace->
What < 4) m_trace->
What = 4;
630 const char *csc[] = {
"off",
"cache nonet",
"nocache net notls",
634 "off",
"cache nonet",
"nocache net tls",
637 char buff[8192], uvk[32];
641 sprintf(uvk,
"%lld", (
long long) m_configuration.
m_cs_UVKeep);
643 loff = snprintf(buff,
sizeof(buff),
"Config effective %s pfc configuration:\n"
644 " pfc.cschk %s uvkeep %s\n"
645 " pfc.blocksize %lld\n"
648 " pfc.writequeue %d %d\n"
649 " # Total available disk: %lld\n"
650 " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
651 " pfc.spaces %s %s\n"
654 " pfc.acchistorysize %d\n"
655 " pfc.onlyIfCachedMinBytes %lld\n"
656 " pfc.onlyIfCachedMinFrac %.2f\n",
658 csc[
int(m_configuration.
m_cs_Chk)], uvk,
677 loff += snprintf(buff + loff,
sizeof(buff) - loff,
678 " pfc.dirstats interval %d maxdepth %d ((internal: store_depth %d, size_of_dirlist %d, size_of_globlist %d))\n",
681 loff += snprintf(buff + loff,
sizeof(buff) - loff,
" dirlist:\n");
683 loff += snprintf(buff + loff,
sizeof(buff) - loff,
" %s\n", i->c_str());
684 loff += snprintf(buff + loff,
sizeof(buff) - loff,
" globlist:\n");
686 loff += snprintf(buff + loff,
sizeof(buff) - loff,
" %s/*\n", i->c_str());
691 loff += snprintf(buff + loff,
sizeof(buff) - loff,
" pfc.hdfsmode hdfsbsize %lld\n", m_configuration.
m_hdfsbsize);
702 loff += snprintf(buff + loff,
sizeof(buff) - loff,
" pfc.user %s\n", m_configuration.
m_username.c_str());
707 m_env->
Put(
"XRDPFC.SEGSIZE", std::to_string(m_configuration.
m_bufferSize).c_str());
716 m_log.
Say(
" pfc g-stream has", m_gstream ?
"" :
" NOT",
" been configured via xrootd.monitor directive\n");
725 m_log.
Say(
"=====> Proxy file cache configuration parsing ", aOK ?
"completed" :
"failed");
727 if (ofsCfg)
delete ofsCfg;
731 #ifdef XRDPFC_CKSUM_TEST
737 Info::TestCksumStuff();
751 struct ConfWordGetter
756 ConfWordGetter(
XrdOucStream& c) : m_config(c), m_last_word((char*)1) {}
758 const char* GetWord() {
if (HasLast()) m_last_word = m_config.
GetWord();
return HasLast() ? m_last_word :
""; }
759 bool HasLast() {
return (m_last_word != 0); }
762 ConfWordGetter cwg(config);
765 if ( part ==
"user" )
768 if ( ! cwg.HasLast())
770 m_log.
Emsg(
"Config",
"Error: pfc.user requires a parameter.");
774 else if ( part ==
"diskusage" )
781 m_log.
Emsg(
"Config",
"Error: pfc.diskusage parameter requires at least two arguments.");
786 while ((p = cwg.GetWord()) && cwg.HasLast())
788 if (strcmp(p,
"files") == 0)
794 if ( ! cwg.HasLast())
796 m_log.
Emsg(
"Config",
"Error: pfc.diskusage files directive requires three arguments.");
800 else if (strcmp(p,
"sleep") == 0 || strcmp(p,
"purgeinterval") == 0)
802 if (strcmp(p,
"sleep") == 0) m_log.
Emsg(
"Config",
"warning sleep directive is deprecated in pfc.diskusage. Please use purgeinterval instead.");
809 else if (strcmp(p,
"purgecoldfiles") == 0)
822 m_log.
Emsg(
"Config",
"Error: diskusage stanza contains unknown directive", p);
826 else if ( part ==
"acchistorysize" )
833 else if ( part ==
"dirstats" )
836 while ((p = cwg.GetWord()) && cwg.HasLast())
838 if (strcmp(p,
"interval") == 0)
844 int validIntervals[] = {60, 300, 600, 900, 1800, 3600};
845 int size =
sizeof(validIntervals) /
sizeof(
int);
848 for (
int i = 0; i < size; i++) {
853 vvl += std::to_string(validIntervals[i]);
854 if ((i+1) != size) vvl +=
", ";
858 m_log.
Emsg(
"Config",
"Error: Dirstat interval is not valid. Possible interval values are ", vvl.c_str());
863 else if (strcmp(p,
"maxdepth") == 0)
871 else if (strcmp(p,
"dir") == 0)
874 if (p && p[0] ==
'/')
878 char d[1024]; d[0] = 0;
886 if (*(pd - 1) ==
'/')
898 if (*pd ==
'/' && pd != d) *pd = 0;
901 if (ld >= 2 && d[ld-1] ==
'*' && d[ld-2] ==
'/')
906 printf(
"Glob %s -> %s -- depth = %d\n", p, d, depth);
911 printf(
"Dir %s -> %s -- depth = %d\n", p, d, depth);
918 m_log.
Emsg(
"Config",
"Error: dirstats dir parameter requires a directory argument starting with a '/'.");
924 m_log.
Emsg(
"Config",
"Error: dirstats stanza contains unknown directive '", p,
"'");
929 else if ( part ==
"blocksize" )
931 long long minBSize = 4 * 1024;
932 long long maxBSize = 512 * 1024 * 1024;
941 m_log.
Emsg(
"Config",
"pfc.blocksize must be a multiple of 4 kB. Rounded up.");
944 else if ( part ==
"prefetch" || part ==
"nramprefetch" )
946 if (part ==
"nramprefetch")
948 m_log.
Emsg(
"Config",
"pfc.nramprefetch is deprecated, please use pfc.prefetch instead. Replacing the directive internally.");
957 else if ( part ==
"nramread" )
959 m_log.
Emsg(
"Config",
"pfc.nramread is deprecated, please use pfc.ram instead. Ignoring this directive.");
962 else if ( part ==
"ram" )
964 long long minRAM = m_isClient ? 256 * 1024 * 1024 : 1024 * 1024 * 1024;
965 long long maxRAM = 256 * minRAM;
971 else if ( part ==
"writequeue")
982 else if ( part ==
"spaces" )
986 if ( ! cwg.HasLast())
988 m_log.
Emsg(
"Config",
"spacenames requires two parameters: <data-space> <metadata-space>.");
992 else if ( part ==
"hdfsmode" )
994 m_log.
Emsg(
"Config",
"pfc.hdfsmode is currently unsupported.");
999 const char* params = cwg.GetWord();
1002 if (! strncmp(
"hdfsbsize", params, 9))
1004 long long minBlSize = 32 * 1024;
1005 long long maxBlSize = 128 * 1024 * 1024;
1006 if (
XrdOuca2x::a2sz(m_log,
"Error getting file fragment size", cwg.GetWord(), &m_configuration.
m_hdfsbsize, minBlSize, maxBlSize))
1013 m_log.
Emsg(
"Config",
"Error setting the fragment size parameter name");
1018 else if ( part ==
"flush" )
1021 if ( ! cwg.HasLast())
1023 m_log.
Emsg(
"Config",
"Error: pfc.flush requires a parameter.");
1027 else if ( part ==
"onlyifcached" )
1030 while ((p = cwg.GetWord()) && cwg.HasLast())
1032 if (strcmp(p,
"minsize") == 0)
1034 std::string minBytes = cwg.GetWord();
1035 long long minBytesTop = 1024 * 1024 * 1024;
1036 if (::isalpha(*(minBytes.rbegin())))
1051 if (strcmp(p,
"minfrac") == 0)
1053 std::string minFrac = cwg.GetWord();
1056 double frac = strtod(minFrac.c_str(), &eP);
1057 if (errno || eP == minFrac.c_str())
1059 m_log.
Emsg(
"Config",
"Error setting fraction for only-if-cached directive");
1066 m_log.
Emsg(
"Config",
"Error: onlyifcached stanza contains unknown directive", p);
1072 m_log.
Emsg(
"ConfigParameters() unmatched pfc parameter", part.c_str());
XrdSysXAttr * XrdSysXAttrActive
XrdVERSIONINFO(XrdOucGetCache, XrdPfc)
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
int open(const char *path, int oflag,...)
int isNo(int dflt, const char *Msg1, const char *Msg2, const char *Msg3)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
static int Export(const char *Var, const char *Val)
void * GetPtr(const char *varname)
void Put(const char *varname, const char *value)
void * Resolve(const char *symbl, int mcnt=1)
void Unload(bool dodel=false)
char * GetWord(int lowcase=0)
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
static int a2tm(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
Base class for selecting which files should be cached.
virtual bool ConfigDecision(const char *params)
Status of cached file. Can be read from and written into a binary file.
static size_t s_maxNumAccess
Base class for requesting directory space to obtain.
virtual bool ConfigPurgePin(const char *params)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0
const char * trace_what_strings[]
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
long long m_RamAbsAvailable
available from configuration
long long m_flushCnt
number of unsynced blocks on disk before flush is called
int m_accHistorySize
max number of entries in access history part of cinfo file
int m_wqueue_threads
number of threads writing blocks to disk
long long m_diskTotalSpace
total disk space on configured partition or oss space
long long m_fileUsageMax
cache purge - files usage maximum
long long m_fileUsageBaseline
cache purge - files usage baseline
int m_dirStatsStoreDepth
depth to which statistics should be collected
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
long long m_diskUsageHWM
cache purge - disk usage high water mark
bool is_cschk_cache() const
std::set< std::string > m_dirStatsDirGlobs
directory globs for which stat reporting was requested
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
bool m_cs_ChkTLS
Allow TLS.
long long m_fileUsageNominal
cache purge - files usage nominal
int m_cs_Chk
Checksum check.
int m_purgeAgeBasedPeriod
perform cold file / uvkeep purge every this many purge cycles
bool m_hdfsmode
flag for enabling block-level operation
int m_purgeColdFilesAge
purge files older than this age
std::string m_data_space
oss space for data files
std::set< std::string > m_dirStatsDirs
directories for which stat reporting was requested
long long m_diskUsageLWM
cache purge - disk usage low water mark
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
long long m_bufferSize
prefetch buffer size, default 128 kB
int m_dirStatsInterval
time between resource monitor statistics dump in seconds
std::string m_meta_space
oss space for metadata files (cinfo)
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
std::string m_username
username passed to oss plugin
bool is_cschk_net() const
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
time_t m_cs_UVKeep
unverified checksum cache keep
int m_dirStatsMaxDepth
maximum depth for statistics write out
int m_purgeInterval
sleep interval between cache purges
long long m_onlyIfCachedMinSize
minimum size of downloaded file, used by only-if-cached CGI option
bool is_dir_stat_reporting_on() const
std::string m_diskUsageLWM
std::string m_diskUsageHWM
std::string m_fileUsageBaseline
std::string m_fileUsageNominal
std::string m_fileUsageMax