XRootD
XrdPfcConfiguration.cc
Go to the documentation of this file.
1 #include "XrdPfc.hh"
2 #include "XrdPfcTrace.hh"
3 #include "XrdPfcInfo.hh"
4 
6 #include "XrdPfcPurgePin.hh"
7 
8 #include "XrdOss/XrdOss.hh"
9 
10 #include "XrdOuc/XrdOucEnv.hh"
11 #include "XrdOuc/XrdOucUtils.hh"
12 #include "XrdOuc/XrdOucStream.hh"
14 #include "XrdOuc/XrdOuca2x.hh"
15 
16 #include "XrdVersion.hh"
17 #include "XrdOfs/XrdOfsConfigPI.hh"
18 #include "XrdSys/XrdSysXAttr.hh"
19 
20 #include <fcntl.h>
21 
23 
24 namespace XrdPfc
25 {
26  const char *trace_what_strings[] = {"","error ","warning ","info ","debug ","dump "};
27 }
28 
29 using namespace XrdPfc;
30 
32 
34  m_hdfsmode(false),
35  m_allow_xrdpfc_command(false),
36  m_data_space("public"),
37  m_meta_space("public"),
38  m_diskTotalSpace(-1),
39  m_diskUsageLWM(-1),
40  m_diskUsageHWM(-1),
41  m_fileUsageBaseline(-1),
42  m_fileUsageNominal(-1),
43  m_fileUsageMax(-1),
44  m_purgeInterval(300),
45  m_purgeColdFilesAge(-1),
46  m_purgeAgeBasedPeriod(10),
47  m_accHistorySize(20),
48  m_dirStatsInterval(1500),
49  m_dirStatsMaxDepth(-1),
50  m_dirStatsStoreDepth(0),
51  m_bufferSize(128*1024),
52  m_RamAbsAvailable(0),
53  m_RamKeepStdBlocks(0),
54  m_wqueue_blocks(16),
55  m_wqueue_threads(4),
56  m_prefetch_max_blocks(10),
57  m_hdfsbsize(128*1024*1024),
58  m_flushCnt(2000),
59  m_cs_UVKeep(-1),
60  m_cs_Chk(CSChk_Net),
61  m_cs_ChkTLS(false),
62  m_onlyIfCachedMinSize(1024*1024),
63  m_onlyIfCachedMinFrac(1.0)
64 {}
65 
66 
67 bool Cache::cfg2bytes(const std::string &str, long long &store, long long totalSpace, const char *name)
68 {
69  char errStr[1024];
70  snprintf(errStr, 1024, "ConfigParameters() Error parsing parameter %s", name);
71 
72  if (::isalpha(*(str.rbegin())))
73  {
74  if (XrdOuca2x::a2sz(m_log, errStr, str.c_str(), &store, 0, totalSpace))
75  {
76  return false;
77  }
78  }
79  else
80  {
81  char *eP;
82  errno = 0;
83  double frac = strtod(str.c_str(), &eP);
84  if (errno || eP == str.c_str())
85  {
86  m_log.Emsg(errStr, str.c_str());
87  return false;
88  }
89 
90  store = static_cast<long long>(totalSpace * frac + 0.5);
91  }
92 
93  if (store < 0 || store > totalSpace)
94  {
95  snprintf(errStr, 1024, "ConfigParameters() Error: parameter %s should be between 0 and total available disk space (%lld) - it is %lld (given as %s)",
96  name, totalSpace, store, str.c_str());
97  m_log.Emsg(errStr, "");
98  return false;
99  }
100 
101  return true;
102 }
103 
104 /* Function: xcschk
105 
106  Purpose: To parse the directive: cschk <parms>
107 
108  parms: [[no]net] [[no]tls] [[no]cache] [uvkeep <arg>]
109 
110  all Checksum check on cache & net transfers.
111  cache Checksum check on cache only, 'no' turns it off.
112  net Checksum check on net transfers 'no' turns it off.
113  tls use TLS if server doesn't support checksums 'no' turns it off.
114  uvkeep Maximum amount of time a cached file make be kept if it
115  contains unverified checksums as n[d|h|m|s], where 'n'
116  is a non-negative integer. A value of 0 prohibits disk
117  caching unless the checksum can be verified. You can
118  also specify "lru" which means the standard purge policy
119  is to be used.
120 
121  Output: true upon success or false upon failure.
122  */
123 bool Cache::xcschk(XrdOucStream &Config)
124 {
125  const char *val, *val2;
126  struct cschkopts {const char *opname; int opval;} csopts[] =
127  {
128  {"off", CSChk_None},
129  {"cache", CSChk_Cache},
130  {"net", CSChk_Net},
131  {"tls", CSChk_TLS}
132  };
133  int i, numopts = sizeof(csopts)/sizeof(struct cschkopts);
134  bool isNo;
135 
136  if (! (val = Config.GetWord()))
137  {m_log.Emsg("Config", "cschk parameter not specified"); return false; }
138 
139  while(val)
140  {
141  if ((isNo = strncmp(val, "no", 2) == 0))
142  val2 = val + 2;
143  else
144  val2 = val;
145  for (i = 0; i < numopts; i++)
146  {
147  if (!strcmp(val2, csopts[i].opname))
148  {
149  if (isNo)
150  m_configuration.m_cs_Chk &= ~csopts[i].opval;
151  else if (csopts[i].opval)
152  m_configuration.m_cs_Chk |= csopts[i].opval;
153  else
154  m_configuration.m_cs_Chk = csopts[i].opval;
155  break;
156  }
157  }
158  if (i >= numopts)
159  {
160  if (strcmp(val, "uvkeep"))
161  {
162  m_log.Emsg("Config", "invalid cschk option -", val);
163  return false;
164  }
165  if (!(val = Config.GetWord()))
166  {
167  m_log.Emsg("Config", "cschk uvkeep value not specified");
168  return false;
169  }
170  if (!strcmp(val, "lru"))
171  m_configuration.m_cs_UVKeep = -1;
172  else
173  {
174  int uvkeep;
175  if (XrdOuca2x::a2tm(m_log, "uvkeep time", val, &uvkeep, 0))
176  return false;
177  m_configuration.m_cs_UVKeep = uvkeep;
178  }
179  }
180  val = Config.GetWord();
181  }
182  // Decompose into separate TLS state, it is only passed on to psx
183  m_configuration.m_cs_ChkTLS = m_configuration.m_cs_Chk & CSChk_TLS;
184  m_configuration.m_cs_Chk &= ~CSChk_TLS;
185 
186  m_env->Put("psx.CSNet", m_configuration.is_cschk_net() ? (m_configuration.m_cs_ChkTLS ? "2" : "1") : "0");
187 
188  return true;
189 }
190 
191 
192 /* Function: xdlib
193 
194  Purpose: To parse the directive: decisionlib <path> [<parms>]
195 
196  <path> the path of the decision library to be used.
197  <parms> optional parameters to be passed.
198 
199 
200  Output: true upon success or false upon failure.
201  */
202 bool Cache::xdlib(XrdOucStream &Config)
203 {
204  const char* val;
205 
206  std::string libp;
207  if (! (val = Config.GetWord()) || ! val[0])
208  {
209  TRACE(Info," Cache::Config() decisionlib not specified; always caching files");
210  return true;
211  }
212  else
213  {
214  libp = val;
215  }
216 
217  char params[4096];
218  if (val[0])
219  Config.GetRest(params, 4096);
220  else
221  params[0] = 0;
222 
223  XrdOucPinLoader* myLib = new XrdOucPinLoader(&m_log, 0, "decisionlib",
224  libp.c_str());
225 
226  Decision *(*ep)(XrdSysError&);
227  ep = (Decision *(*)(XrdSysError&))myLib->Resolve("XrdPfcGetDecision");
228  if (! ep) {myLib->Unload(true); return false; }
229 
230  Decision * d = ep(m_log);
231  if (! d)
232  {
233  TRACE(Error, "Config() decisionlib was not able to create a decision object");
234  return false;
235  }
236  if (params[0])
237  d->ConfigDecision(params);
238 
239  m_decisionpoints.push_back(d);
240  return true;
241 }
242 
243 /* Function: xplib
244 
245  Purpose: To parse the directive: purgelib <path> [<parms>]
246 
247  <path> the path of the decision library to be used.
248  <parms> optional parameters to be passed.
249 
250 
251  Output: true upon success or false upon failure.
252  */
253 bool Cache::xplib(XrdOucStream &Config)
254 {
255  const char* val;
256 
257  std::string libp;
258  if (! (val = Config.GetWord()) || ! val[0])
259  {
260  TRACE(Info," Cache::Config() purgelib not specified; will use LRU for purging files");
261  return true;
262  }
263  else
264  {
265  libp = val;
266  }
267 
268  char params[4096];
269  if (val[0])
270  Config.GetRest(params, 4096);
271  else
272  params[0] = 0;
273 
274  XrdOucPinLoader* myLib = new XrdOucPinLoader(&m_log, 0, "purgelib",
275  libp.c_str());
276 
277  PurgePin *(*ep)(XrdSysError&);
278  ep = (PurgePin *(*)(XrdSysError&))myLib->Resolve("XrdPfcGetPurgePin");
279  if (! ep) {myLib->Unload(true); return false; }
280 
281  PurgePin * dp = ep(m_log);
282  if (! dp)
283  {
284  TRACE(Error, "Config() purgelib was not able to create a Purge Plugin object?");
285  return false;
286  }
287  m_purge_pin = dp;
288 
289  if (params[0])
290  m_purge_pin->ConfigPurgePin(params);
291 
292 
293  return true;
294 }
295 
296 /* Function: xtrace
297 
298  Purpose: To parse the directive: trace <level>
299  Output: true upon success or false upon failure.
300  */
301 bool Cache::xtrace(XrdOucStream &Config)
302 {
303  char *val;
304  static struct traceopts {const char *opname; int opval; } tropts[] =
305  {
306  {"none", 0},
307  {"error", 1},
308  {"warning", 2},
309  {"info", 3},
310  {"debug", 4},
311  {"dump", 5},
312  {"dumpxl", 6}
313  };
314  int numopts = sizeof(tropts)/sizeof(struct traceopts);
315 
316  if (! (val = Config.GetWord()))
317  {m_log.Emsg("Config", "trace option not specified"); return 1; }
318 
319  for (int i = 0; i < numopts; i++)
320  {
321  if (! strcmp(val, tropts[i].opname))
322  {
323  m_trace->What = tropts[i].opval;
324  return true;
325  }
326  }
327  m_log.Emsg("Config", "invalid trace option -", val);
328  return false;
329 }
330 
331 // Determine if oss spaces are operational and if they support xattrs.
332 bool Cache::test_oss_basics_and_features()
333 {
334  static const char *epfx = "test_oss_basics_and_features()";
335 
336  const auto &conf = m_configuration;
337  const char *user = conf.m_username.c_str();
338  XrdOucEnv env;
339 
340  auto check_space = [&](const char *space, bool &has_xattr)
341  {
342  std::string fname("__prerun_test_pfc_");
343  fname += space;
344  fname += "_space__";
345  env.Put("oss.cgroup", space);
346 
347  int res = m_oss->Create(user, fname.c_str(), 0600, env, XRDOSS_mkpath);
348  if (res != XrdOssOK) {
349  m_log.Emsg(epfx, "Can not create a file on space", space);
350  return false;
351  }
352  XrdOssDF *oss_file = m_oss->newFile(user);
353  res = oss_file->Open(fname.c_str(), O_RDWR, 0600, env);
354  if (res != XrdOssOK) {
355  m_log.Emsg(epfx, "Can not open a file on space", space);
356  return false;
357  }
358  res = oss_file->Write(fname.data(), 0, fname.length());
359  if (res != (int) fname.length()) {
360  m_log.Emsg(epfx, "Can not write into a file on space", space);
361  return false;
362  }
363 
364  has_xattr = true;
365  long long fsize = fname.length();
366  res = XrdSysXAttrActive->Set("pfc.fsize", &fsize, sizeof(long long), 0, oss_file->getFD(), 0);
367  if (res != 0) {
368  m_log.Emsg(epfx, "Can not write xattr to a file on space", space);
369  has_xattr = false;
370  }
371 
372  oss_file->Close();
373 
374  if (has_xattr) {
375  char pfn[4096];
376  m_oss->Lfn2Pfn(fname.c_str(), pfn, 4096);
377  fsize = -1ll;
378  res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
379  if (res != sizeof(long long) || fsize != (long long) fname.length())
380  {
381  m_log.Emsg(epfx, "Can not read xattr from a file on space", space);
382  has_xattr = false;
383  }
384  }
385 
386  res = m_oss->Unlink(fname.c_str());
387  if (res != XrdOssOK) {
388  m_log.Emsg(epfx, "Can not unlink a file on space", space);
389  return false;
390  }
391 
392  return true;
393  };
394 
395  bool aOK = true;
396  aOK &= check_space(conf.m_data_space.c_str(), m_dataXattr);
397  aOK &= check_space(conf.m_meta_space.c_str(), m_metaXattr);
398 
399  return aOK;
400 }
401 
402 //______________________________________________________________________________
403 /* Function: Config
404 
405  Purpose: To parse configuration file and configure Cache instance.
406  Output: true upon success or false upon failure.
407  */
408 bool Cache::Config(const char *config_filename, const char *parameters)
409 {
410  // Indicate whether or not we are a client instance
411  const char *theINS = getenv("XRDINSTANCE");
412  m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
413 
414  // Tell everyone else we are a caching proxy
415  XrdOucEnv::Export("XRDPFC", 1);
416 
417  XrdOucEnv myEnv;
418  XrdOucStream Config(&m_log, theINS, &myEnv, "=====> ");
419 
420  if (! config_filename || ! *config_filename)
421  {
422  TRACE(Error, "Config() configuration file not specified.");
423  return false;
424  }
425 
426  int fd;
427  if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
428  {
429  TRACE( Error, "Config() can't open configuration file " << config_filename);
430  return false;
431  }
432 
433  Config.Attach(fd);
434  static const char *cvec[] = { "*** pfc plugin config:", 0 };
435  Config.Capture(cvec);
436 
437  // Obtain OFS configurator for OSS plugin.
438  XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
439  &XrdVERSIONINFOVAR(XrdOucGetCache));
440  if (! ofsCfg) return false;
441 
442  TmpConfiguration tmpc;
443 
444  // Adjust default parameters for client/serverless caching
445  if (m_isClient)
446  {
447  m_configuration.m_bufferSize = 128 * 1024; // same as normal.
448  m_configuration.m_wqueue_blocks = 8;
449  m_configuration.m_wqueue_threads = 1;
450  }
451 
452  // If network checksum processing is the default, indicate so.
453  if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
454 
455  // Actual parsing of the config file.
456  bool retval = true, aOK = true;
457  char *var;
458  while ((var = Config.GetMyFirstWord()))
459  {
460  if (! strcmp(var,"pfc.osslib"))
461  {
462  retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
463  }
464  else if (! strcmp(var,"pfc.cschk"))
465  {
466  retval = xcschk(Config);
467  }
468  else if (! strcmp(var,"pfc.decisionlib"))
469  {
470  retval = xdlib(Config);
471  }
472  else if (! strcmp(var,"pfc.purgelib"))
473  {
474  retval = xplib(Config);
475  }
476  else if (! strcmp(var,"pfc.trace"))
477  {
478  retval = xtrace(Config);
479  }
480  else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
481  {
482  m_configuration.m_allow_xrdpfc_command = true;
483  }
484  else if (! strncmp(var,"pfc.", 4))
485  {
486  retval = ConfigParameters(std::string(var+4), Config, tmpc);
487  }
488 
489  if ( ! retval)
490  {
491  TRACE(Error, "Config() error in parsing");
492  aOK = false;
493  }
494  }
495 
496  Config.Close();
497 
498  // Load OSS plugin.
499  myEnv.Put("oss.runmode", "pfc");
500  if (m_configuration.is_cschk_cache())
501  {
502  char csi_conf[128];
503  if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
504  {
505  ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
506  } else {
507  TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
508  return false;
509  }
510  }
511  if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, &myEnv))
512  {
513  ofsCfg->Plugin(m_oss);
514  }
515  else
516  {
517  TRACE(Error, "Config() Unable to create an OSS object");
518  return false;
519  }
520 
521  // Test if OSS is operational, determine optional features.
522  aOK &= test_oss_basics_and_features();
523 
524  // sets default value for disk usage
525  XrdOssVSInfo sP;
526  {
527  if (m_configuration.m_meta_space != m_configuration.m_data_space &&
528  m_oss->StatVS(&sP, m_configuration.m_meta_space.c_str(), 1) < 0)
529  {
530  m_log.Emsg("ConfigParameters()", "error obtaining stat info for meta space ", m_configuration.m_meta_space.c_str());
531  return false;
532  }
533  if (m_configuration.m_meta_space != m_configuration.m_data_space && sP.Total < 10ll << 20)
534  {
535  m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
536  m_configuration.m_meta_space.c_str());
537  return false;
538  }
539  if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
540  {
541  m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
542  return false;
543  }
544  if (sP.Total < 10ll << 20)
545  {
546  m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
547  m_configuration.m_data_space.c_str());
548  return false;
549  }
550 
551  m_configuration.m_diskTotalSpace = sP.Total;
552 
553  if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
554  cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
555  {
556  if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
557  m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
558  aOK = false;
559  }
560  }
561  else aOK = false;
562 
563  if ( ! tmpc.m_fileUsageMax.empty())
564  {
565  if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
566  cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
567  cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
568  {
569  if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
570  m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
571  m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
572  {
573  m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
574  aOK = false;
575  }
576 
577 
578  if (aOK && m_configuration.m_fileUsageMax >= m_configuration.m_diskUsageLWM)
579  {
580  m_log.Emsg("ConfigParameters()", "pfc.diskusage files values must be below lowWatermark");
581  aOK = false;
582  }
583  }
584  else aOK = false;
585  }
586  }
587 
588  // sets flush frequency
589  if ( ! tmpc.m_flushRaw.empty())
590  {
591  if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
592  {
593  if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
594  &m_configuration.m_flushCnt,
595  100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
596  {
597  return false;
598  }
599  m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
600  }
601  else
602  {
603  if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
604  &m_configuration.m_flushCnt, 100, 100000))
605  {
606  return false;
607  }
608  }
609  }
610 
611  // get number of available RAM blocks after process configuration
612  if (m_configuration.m_RamAbsAvailable == 0)
613  {
614  m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
615  char buff[1024];
616  snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
617  m_log.Say("Config info: ", buff);
618  }
619  // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
620  m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
621 
622  // Set tracing to debug if this is set in environment
623  char* cenv = getenv("XRDDEBUG");
624  if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
625 
626  if (aOK)
627  {
628  int loff = 0;
629 // 000 001 010
630  const char *csc[] = {"off", "cache nonet", "nocache net notls",
631 // 011
632  "cache net notls",
633 // 100 101 110
634  "off", "cache nonet", "nocache net tls",
635 // 111
636  "cache net tls"};
637  char buff[8192], uvk[32];
638  if (m_configuration.m_cs_UVKeep < 0)
639  strcpy(uvk, "lru");
640  else
641  sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
642  float rg = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
643  loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
644  " pfc.cschk %s uvkeep %s\n"
645  " pfc.blocksize %lld\n"
646  " pfc.prefetch %d\n"
647  " pfc.ram %.fg\n"
648  " pfc.writequeue %d %d\n"
649  " # Total available disk: %lld\n"
650  " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
651  " pfc.spaces %s %s\n"
652  " pfc.trace %d\n"
653  " pfc.flush %lld\n"
654  " pfc.acchistorysize %d\n"
655  " pfc.onlyIfCachedMinBytes %lld\n"
656  " pfc.onlyIfCachedMinFrac %.2f\n",
657  config_filename,
658  csc[int(m_configuration.m_cs_Chk)], uvk,
659  m_configuration.m_bufferSize,
660  m_configuration.m_prefetch_max_blocks,
661  rg,
662  m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
663  sP.Total,
664  m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
665  m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
666  m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
667  m_configuration.m_data_space.c_str(),
668  m_configuration.m_meta_space.c_str(),
669  m_trace->What,
670  m_configuration.m_flushCnt,
671  m_configuration.m_accHistorySize,
672  m_configuration.m_onlyIfCachedMinSize,
673  m_configuration.m_onlyIfCachedMinFrac);
674 
675  if (m_configuration.is_dir_stat_reporting_on())
676  {
677  loff += snprintf(buff + loff, sizeof(buff) - loff,
678  " pfc.dirstats interval %d maxdepth %d ((internal: store_depth %d, size_of_dirlist %d, size_of_globlist %d))\n",
679  m_configuration.m_dirStatsInterval, m_configuration.m_dirStatsMaxDepth, m_configuration.m_dirStatsStoreDepth,
680  (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
681  loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
682  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
683  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
684  loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
685  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
686  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
687  }
688 
689  if (m_configuration.m_hdfsmode)
690  {
691  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
692  }
693 
694  if (m_configuration.m_username.empty())
695  {
696  char unameBuff[256];
697  XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
698  m_configuration.m_username = unameBuff;
699  }
700  else
701  {
702  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
703  }
704 
705  m_log.Say(buff);
706 
707  m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
708  }
709 
710  // Derived settings
711  m_prefetch_enabled = m_configuration.m_prefetch_max_blocks > 0;
712  Info::s_maxNumAccess = m_configuration.m_accHistorySize;
713 
714  m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
715 
716  m_log.Say(" pfc g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive\n");
717 
718  // Create the ResourceMonitor and get it ready for starting the main thread function.
719  if (aOK)
720  {
721  m_res_mon = new ResourceMonitor(*m_oss);
722  m_res_mon->init_before_main();
723  }
724 
725  m_log.Say("=====> Proxy file cache configuration parsing ", aOK ? "completed" : "failed");
726 
727  if (ofsCfg) delete ofsCfg;
728 
729  // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and valildated.
730  // Building of xrdpfc_print fails when this is enabled.
731 #ifdef XRDPFC_CKSUM_TEST
732  {
733  int xxx = m_configuration.m_cs_Chk;
734 
735  for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
736  {
737  Info::TestCksumStuff();
738  }
739 
740  m_configuration.m_cs_Chk = xxx;
741  }
742 #endif
743 
744  return aOK;
745 }
746 
747 //------------------------------------------------------------------------------
748 
749 bool Cache::ConfigParameters(std::string part, XrdOucStream& config, TmpConfiguration &tmpc)
750 {
751  struct ConfWordGetter
752  {
753  XrdOucStream &m_config;
754  char *m_last_word;
755 
756  ConfWordGetter(XrdOucStream& c) : m_config(c), m_last_word((char*)1) {}
757 
758  const char* GetWord() { if (HasLast()) m_last_word = m_config.GetWord(); return HasLast() ? m_last_word : ""; }
759  bool HasLast() { return (m_last_word != 0); }
760  };
761 
762  ConfWordGetter cwg(config);
763 
764  XrdSysError err(0, "");
765  if ( part == "user" )
766  {
767  m_configuration.m_username = cwg.GetWord();
768  if ( ! cwg.HasLast())
769  {
770  m_log.Emsg("Config", "Error: pfc.user requires a parameter.");
771  return false;
772  }
773  }
774  else if ( part == "diskusage" )
775  {
776  tmpc.m_diskUsageLWM = cwg.GetWord();
777  tmpc.m_diskUsageHWM = cwg.GetWord();
778 
779  if (tmpc.m_diskUsageHWM.empty())
780  {
781  m_log.Emsg("Config", "Error: pfc.diskusage parameter requires at least two arguments.");
782  return false;
783  }
784 
785  const char *p = 0;
786  while ((p = cwg.GetWord()) && cwg.HasLast())
787  {
788  if (strcmp(p, "files") == 0)
789  {
790  tmpc.m_fileUsageBaseline = cwg.GetWord();
791  tmpc.m_fileUsageNominal = cwg.GetWord();
792  tmpc.m_fileUsageMax = cwg.GetWord();
793 
794  if ( ! cwg.HasLast())
795  {
796  m_log.Emsg("Config", "Error: pfc.diskusage files directive requires three arguments.");
797  return false;
798  }
799  }
800  else if (strcmp(p, "sleep") == 0 || strcmp(p, "purgeinterval") == 0)
801  {
802  if (strcmp(p, "sleep") == 0) m_log.Emsg("Config", "warning sleep directive is deprecated in pfc.diskusage. Please use purgeinterval instead.");
803 
804  if (XrdOuca2x::a2tm(m_log, "Error getting purgeinterval", cwg.GetWord(), &m_configuration.m_purgeInterval, 60, 3600))
805  {
806  return false;
807  }
808  }
809  else if (strcmp(p, "purgecoldfiles") == 0)
810  {
811  if (XrdOuca2x::a2tm(m_log, "Error getting purgecoldfiles age", cwg.GetWord(), &m_configuration.m_purgeColdFilesAge, 3600, 3600*24*360))
812  {
813  return false;
814  }
815  if (XrdOuca2x::a2i(m_log, "Error getting purgecoldfiles period", cwg.GetWord(), &m_configuration.m_purgeAgeBasedPeriod, 1, 1000))
816  {
817  return false;
818  }
819  }
820  else
821  {
822  m_log.Emsg("Config", "Error: diskusage stanza contains unknown directive", p);
823  }
824  }
825  }
826  else if ( part == "acchistorysize" )
827  {
828  if ( XrdOuca2x::a2i(m_log, "Error getting access-history-size", cwg.GetWord(), &m_configuration.m_accHistorySize, 20, 200))
829  {
830  return false;
831  }
832  }
833  else if ( part == "dirstats" )
834  {
835  const char *p = 0;
836  while ((p = cwg.GetWord()) && cwg.HasLast())
837  {
838  if (strcmp(p, "interval") == 0)
839  {
840  if (XrdOuca2x::a2i(m_log, "Error getting dirstsat interval", cwg.GetWord(), &m_configuration.m_dirStatsInterval, 0, 7 * 24 * 3600))
841  {
842  return false;
843  }
844  int validIntervals[] = {60, 300, 600, 900, 1800, 3600};
845  int size = sizeof(validIntervals) / sizeof(int);
846  bool match = false;
847  std::string vvl;
848  for (int i = 0; i < size; i++) {
849  if (validIntervals[i] == m_configuration.m_dirStatsInterval) {
850  match = true;
851  break;
852  }
853  vvl += std::to_string(validIntervals[i]);
854  if ((i+1) != size) vvl += ", ";
855  }
856 
857  if (!match) {
858  m_log.Emsg("Config", "Error: Dirstat interval is not valid. Possible interval values are ", vvl.c_str());
859  return false;
860  }
861 
862  }
863  else if (strcmp(p, "maxdepth") == 0)
864  {
865  if (XrdOuca2x::a2i(m_log, "Error getting maxdepth value", cwg.GetWord(), &m_configuration.m_dirStatsMaxDepth, 0, 16))
866  {
867  return false;
868  }
869  m_configuration.m_dirStatsStoreDepth = std::max(m_configuration.m_dirStatsStoreDepth, m_configuration.m_dirStatsMaxDepth);
870  }
871  else if (strcmp(p, "dir") == 0)
872  {
873  p = cwg.GetWord();
874  if (p && p[0] == '/')
875  {
876  // XXX -- should we just store them as sets of PathTokenizer objects, not strings?
877 
878  char d[1024]; d[0] = 0;
879  int depth = 0;
880  { // Compress multiple slashes and "measure" depth
881  const char *pp = p;
882  char *pd = d;
883  *(pd++) = *(pp++);
884  while (*pp != 0)
885  {
886  if (*(pd - 1) == '/')
887  {
888  if (*pp == '/')
889  {
890  ++pp; continue;
891  }
892  ++depth;
893  }
894  *(pd++) = *(pp++);
895  }
896  *(pd--) = 0;
897  // remove trailing but but not leading /
898  if (*pd == '/' && pd != d) *pd = 0;
899  }
900  int ld = strlen(d);
901  if (ld >= 2 && d[ld-1] == '*' && d[ld-2] == '/')
902  {
903  d[ld-2] = 0;
904  ld -= 2;
905  m_configuration.m_dirStatsDirGlobs.insert(d);
906  printf("Glob %s -> %s -- depth = %d\n", p, d, depth);
907  }
908  else
909  {
910  m_configuration.m_dirStatsDirs.insert(d);
911  printf("Dir %s -> %s -- depth = %d\n", p, d, depth);
912  }
913 
914  m_configuration.m_dirStatsStoreDepth = std::max(m_configuration.m_dirStatsStoreDepth, depth);
915  }
916  else
917  {
918  m_log.Emsg("Config", "Error: dirstats dir parameter requires a directory argument starting with a '/'.");
919  return false;
920  }
921  }
922  else
923  {
924  m_log.Emsg("Config", "Error: dirstats stanza contains unknown directive '", p, "'");
925  return false;
926  }
927  }
928  }
929  else if ( part == "blocksize" )
930  {
931  long long minBSize = 4 * 1024;
932  long long maxBSize = 512 * 1024 * 1024;
933  if (XrdOuca2x::a2sz(m_log, "Error reading block-size", cwg.GetWord(), &m_configuration.m_bufferSize, minBSize, maxBSize))
934  {
935  return false;
936  }
937  if (m_configuration.m_bufferSize & 0xFFF)
938  {
939  m_configuration.m_bufferSize &= ~0x0FFF;
940  m_configuration.m_bufferSize += 0x1000;
941  m_log.Emsg("Config", "pfc.blocksize must be a multiple of 4 kB. Rounded up.");
942  }
943  }
944  else if ( part == "prefetch" || part == "nramprefetch" )
945  {
946  if (part == "nramprefetch")
947  {
948  m_log.Emsg("Config", "pfc.nramprefetch is deprecated, please use pfc.prefetch instead. Replacing the directive internally.");
949  }
950 
951  if (XrdOuca2x::a2i(m_log, "Error setting prefetch block count", cwg.GetWord(), &m_configuration.m_prefetch_max_blocks, 0, 128))
952  {
953  return false;
954  }
955 
956  }
957  else if ( part == "nramread" )
958  {
959  m_log.Emsg("Config", "pfc.nramread is deprecated, please use pfc.ram instead. Ignoring this directive.");
960  cwg.GetWord(); // Ignoring argument.
961  }
962  else if ( part == "ram" )
963  {
964  long long minRAM = m_isClient ? 256 * 1024 * 1024 : 1024 * 1024 * 1024;
965  long long maxRAM = 256 * minRAM;
966  if ( XrdOuca2x::a2sz(m_log, "get RAM available", cwg.GetWord(), &m_configuration.m_RamAbsAvailable, minRAM, maxRAM))
967  {
968  return false;
969  }
970  }
971  else if ( part == "writequeue")
972  {
973  if (XrdOuca2x::a2i(m_log, "Error getting pfc.writequeue num-blocks", cwg.GetWord(), &m_configuration.m_wqueue_blocks, 1, 1024))
974  {
975  return false;
976  }
977  if (XrdOuca2x::a2i(m_log, "Error getting pfc.writequeue num-threads", cwg.GetWord(), &m_configuration.m_wqueue_threads, 1, 64))
978  {
979  return false;
980  }
981  }
982  else if ( part == "spaces" )
983  {
984  m_configuration.m_data_space = cwg.GetWord();
985  m_configuration.m_meta_space = cwg.GetWord();
986  if ( ! cwg.HasLast())
987  {
988  m_log.Emsg("Config", "spacenames requires two parameters: <data-space> <metadata-space>.");
989  return false;
990  }
991  }
992  else if ( part == "hdfsmode" )
993  {
994  m_log.Emsg("Config", "pfc.hdfsmode is currently unsupported.");
995  return false;
996 
997  m_configuration.m_hdfsmode = true;
998 
999  const char* params = cwg.GetWord();
1000  if (params)
1001  {
1002  if (! strncmp("hdfsbsize", params, 9))
1003  {
1004  long long minBlSize = 32 * 1024;
1005  long long maxBlSize = 128 * 1024 * 1024;
1006  if ( XrdOuca2x::a2sz(m_log, "Error getting file fragment size", cwg.GetWord(), &m_configuration.m_hdfsbsize, minBlSize, maxBlSize))
1007  {
1008  return false;
1009  }
1010  }
1011  else
1012  {
1013  m_log.Emsg("Config", "Error setting the fragment size parameter name");
1014  return false;
1015  }
1016  }
1017  }
1018  else if ( part == "flush" )
1019  {
1020  tmpc.m_flushRaw = cwg.GetWord();
1021  if ( ! cwg.HasLast())
1022  {
1023  m_log.Emsg("Config", "Error: pfc.flush requires a parameter.");
1024  return false;
1025  }
1026  }
1027  else if ( part == "onlyifcached" )
1028  {
1029  const char *p = 0;
1030  while ((p = cwg.GetWord()) && cwg.HasLast())
1031  {
1032  if (strcmp(p, "minsize") == 0)
1033  {
1034  std::string minBytes = cwg.GetWord();
1035  long long minBytesTop = 1024 * 1024 * 1024;
1036  if (::isalpha(*(minBytes.rbegin())))
1037  {
1038  if (XrdOuca2x::a2sz(m_log, "Error in parsing minsize value for onlyifcached parameter", minBytes.c_str(), &m_configuration.m_onlyIfCachedMinSize, 0, minBytesTop))
1039  {
1040  return false;
1041  }
1042  }
1043  else
1044  {
1045  if (XrdOuca2x::a2ll(m_log, "Error in parsing numeric minsize value for onlyifcached parameter", minBytes.c_str(),&m_configuration.m_onlyIfCachedMinSize, 0, minBytesTop))
1046  {
1047  return false;
1048  }
1049  }
1050  }
1051  if (strcmp(p, "minfrac") == 0)
1052  {
1053  std::string minFrac = cwg.GetWord();
1054  char *eP;
1055  errno = 0;
1056  double frac = strtod(minFrac.c_str(), &eP);
1057  if (errno || eP == minFrac.c_str())
1058  {
1059  m_log.Emsg("Config", "Error setting fraction for only-if-cached directive");
1060  return false;
1061  }
1062  m_configuration.m_onlyIfCachedMinFrac = frac;
1063  }
1064  else
1065  {
1066  m_log.Emsg("Config", "Error: onlyifcached stanza contains unknown directive", p);
1067  }
1068  }
1069  }
1070  else
1071  {
1072  m_log.Emsg("ConfigParameters() unmatched pfc parameter", part.c_str());
1073  return false;
1074  }
1075 
1076  return true;
1077 }
#define XrdOssOK
Definition: XrdOss.hh:50
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
XrdSysXAttr * XrdSysXAttrActive
Definition: XrdSysFAttr.cc:61
XrdVERSIONINFO(XrdOucGetCache, XrdPfc)
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition: XrdPfc.cc:76
int open(const char *path, int oflag,...)
int isNo(int dflt, const char *Msg1, const char *Msg2, const char *Msg3)
if(Avsz)
#define TRACE(act, x)
Definition: XrdTrace.hh:63
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition: XrdOss.hh:426
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition: XrdOss.hh:345
long long Total
Definition: XrdOssVS.hh:90
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition: XrdOss.hh:873
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition: XrdOss.cc:117
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
static int Export(const char *Var, const char *Val)
Definition: XrdOucEnv.cc:188
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:281
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
void * Resolve(const char *symbl, int mcnt=1)
void Unload(bool dodel=false)
char * GetWord(int lowcase=0)
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition: XrdOuca2x.cc:45
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:257
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:70
static int a2tm(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition: XrdOuca2x.cc:288
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
Base class for selecting which files should be cached.
virtual bool ConfigDecision(const char *params)
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
static size_t s_maxNumAccess
Definition: XrdPfcInfo.hh:311
Base class for reguesting directory space to obtain.
virtual bool ConfigPurgePin(const char *params)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0
XrdCmsConfig Config
Definition: XrdPfc.hh:41
const char * trace_what_strings[]
@ CSChk_Both
Definition: XrdPfcTypes.hh:27
@ CSChk_Net
Definition: XrdPfcTypes.hh:27
@ CSChk_TLS
Definition: XrdPfcTypes.hh:28
@ CSChk_Cache
Definition: XrdPfcTypes.hh:27
@ CSChk_None
Definition: XrdPfcTypes.hh:27
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition: XrdPfc.hh:115
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:109
long long m_flushCnt
nuber of unsynced blcoks on disk before flush is called
Definition: XrdPfc.hh:116
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition: XrdPfc.hh:100
int m_wqueue_threads
number of threads writing blocks to disk
Definition: XrdPfc.hh:112
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition: XrdPfc.hh:91
long long m_fileUsageMax
cache purge - files usage maximum
Definition: XrdPfc.hh:96
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition: XrdPfc.hh:94
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition: XrdPfc.hh:106
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition: XrdPfc.hh:85
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition: XrdPfc.hh:93
bool is_cschk_cache() const
Definition: XrdPfc.hh:75
std::set< std::string > m_dirStatsDirGlobs
directory globs for which stat reporting was requested
Definition: XrdPfc.hh:103
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:113
bool m_cs_ChkTLS
Allow TLS.
Definition: XrdPfc.hh:120
long long m_fileUsageNominal
cache purge - files usage nominal
Definition: XrdPfc.hh:95
int m_cs_Chk
Checksum check.
Definition: XrdPfc.hh:119
int m_purgeAgeBasedPeriod
peform cold file / uvkeep purge every this many purge cycles
Definition: XrdPfc.hh:99
bool m_hdfsmode
flag for enabling block-level operation
Definition: XrdPfc.hh:84
int m_purgeColdFilesAge
purge files older than this age
Definition: XrdPfc.hh:98
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:88
std::set< std::string > m_dirStatsDirs
directories for which stat reporting was requested
Definition: XrdPfc.hh:102
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition: XrdPfc.hh:92
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition: XrdPfc.hh:110
long long m_bufferSize
prefetch buffer size, default 1MB
Definition: XrdPfc.hh:108
int m_dirStatsInterval
time between resource monitor statistics dump in seconds
Definition: XrdPfc.hh:104
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:89
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition: XrdPfc.hh:111
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:87
bool is_cschk_net() const
Definition: XrdPfc.hh:76
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:123
time_t m_cs_UVKeep
unverified checksum cache keep
Definition: XrdPfc.hh:118
int m_dirStatsMaxDepth
maximum depth for statistics write out
Definition: XrdPfc.hh:105
int m_purgeInterval
sleep interval between cache purges
Definition: XrdPfc.hh:97
long long m_onlyIfCachedMinSize
minumum size of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:122
bool is_dir_stat_reporting_on() const
Definition: XrdPfc.hh:70
std::string m_diskUsageLWM
Definition: XrdPfc.hh:130
std::string m_diskUsageHWM
Definition: XrdPfc.hh:131
std::string m_fileUsageBaseline
Definition: XrdPfc.hh:132
std::string m_fileUsageNominal
Definition: XrdPfc.hh:133
std::string m_flushRaw
Definition: XrdPfc.hh:135
std::string m_fileUsageMax
Definition: XrdPfc.hh:134