XRootD
XrdPfcConfiguration.cc
Go to the documentation of this file.
1 #include "XrdPfc.hh"
2 #include "XrdPfcTrace.hh"
3 #include "XrdPfcInfo.hh"
4 
6 #include "XrdPfcPurgePin.hh"
7 
8 #include "XrdOss/XrdOss.hh"
9 
10 #include "XrdOuc/XrdOucEnv.hh"
11 #include "XrdOuc/XrdOucUtils.hh"
12 #include "XrdOuc/XrdOucStream.hh"
14 #include "XrdOuc/XrdOuca2x.hh"
15 
16 #include "XrdVersion.hh"
17 #include "XrdOfs/XrdOfsConfigPI.hh"
18 #include "XrdSys/XrdSysXAttr.hh"
19 
20 #include <fcntl.h>
21 
23 
namespace XrdPfc
{
   // Message prefixes used by the pfc tracing code. The entry order mirrors the
   // numeric levels accepted by the pfc.trace directive (error=1 ... dump=5);
   // presumably the array is indexed by trace level -- confirm in XrdPfcTrace.hh.
   const char *trace_what_strings[] =
   {
      "",
      "error ",
      "warning ",
      "info ",
      "debug ",
      "dump "
   };
}
28 
29 using namespace XrdPfc;
30 
32 
34  m_hdfsmode(false),
35  m_allow_xrdpfc_command(false),
36  m_data_space("public"),
37  m_meta_space("public"),
38  m_diskTotalSpace(-1),
39  m_diskUsageLWM(-1),
40  m_diskUsageHWM(-1),
41  m_fileUsageBaseline(-1),
42  m_fileUsageNominal(-1),
43  m_fileUsageMax(-1),
44  m_purgeInterval(300),
45  m_purgeColdFilesAge(-1),
46  m_purgeAgeBasedPeriod(10),
47  m_accHistorySize(20),
48  m_dirStatsMaxDepth(-1),
49  m_dirStatsStoreDepth(0),
50  m_bufferSize(128*1024),
51  m_RamAbsAvailable(0),
52  m_RamKeepStdBlocks(0),
53  m_wqueue_blocks(16),
54  m_wqueue_threads(4),
55  m_prefetch_max_blocks(10),
56  m_hdfsbsize(128*1024*1024),
57  m_flushCnt(2000),
58  m_cs_UVKeep(-1),
59  m_cs_Chk(CSChk_Net),
60  m_cs_ChkTLS(false),
61  m_onlyIfCachedMinSize(1024*1024),
62  m_onlyIfCachedMinFrac(1.0)
63 {}
64 
65 
66 bool Cache::cfg2bytes(const std::string &str, long long &store, long long totalSpace, const char *name)
67 {
68  char errStr[1024];
69  snprintf(errStr, 1024, "ConfigParameters() Error parsing parameter %s", name);
70 
71  if (::isalpha(*(str.rbegin())))
72  {
73  if (XrdOuca2x::a2sz(m_log, errStr, str.c_str(), &store, 0, totalSpace))
74  {
75  return false;
76  }
77  }
78  else
79  {
80  char *eP;
81  errno = 0;
82  double frac = strtod(str.c_str(), &eP);
83  if (errno || eP == str.c_str())
84  {
85  m_log.Emsg(errStr, str.c_str());
86  return false;
87  }
88 
89  store = static_cast<long long>(totalSpace * frac + 0.5);
90  }
91 
92  if (store < 0 || store > totalSpace)
93  {
94  snprintf(errStr, 1024, "ConfigParameters() Error: parameter %s should be between 0 and total available disk space (%lld) - it is %lld (given as %s)",
95  name, totalSpace, store, str.c_str());
96  m_log.Emsg(errStr, "");
97  return false;
98  }
99 
100  return true;
101 }
102 
103 /* Function: xcschk
104 
105  Purpose: To parse the directive: cschk <parms>
106 
107  parms: [[no]net] [[no]tls] [[no]cache] [uvkeep <arg>]
108 
109  all Checksum check on cache & net transfers.
110  cache Checksum check on cache only, 'no' turns it off.
111  net Checksum check on net transfers 'no' turns it off.
112  tls use TLS if server doesn't support checksums 'no' turns it off.
113  uvkeep Maximum amount of time a cached file make be kept if it
114  contains unverified checksums as n[d|h|m|s], where 'n'
115  is a non-negative integer. A value of 0 prohibits disk
116  caching unless the checksum can be verified. You can
117  also specify "lru" which means the standard purge policy
118  is to be used.
119 
120  Output: true upon success or false upon failure.
121  */
122 bool Cache::xcschk(XrdOucStream &Config)
123 {
124  const char *val, *val2;
125  struct cschkopts {const char *opname; int opval;} csopts[] =
126  {
127  {"off", CSChk_None},
128  {"cache", CSChk_Cache},
129  {"net", CSChk_Net},
130  {"tls", CSChk_TLS}
131  };
132  int i, numopts = sizeof(csopts)/sizeof(struct cschkopts);
133  bool isNo;
134 
135  if (! (val = Config.GetWord()))
136  {m_log.Emsg("Config", "cschk parameter not specified"); return false; }
137 
138  while(val)
139  {
140  if ((isNo = strncmp(val, "no", 2) == 0))
141  val2 = val + 2;
142  else
143  val2 = val;
144  for (i = 0; i < numopts; i++)
145  {
146  if (!strcmp(val2, csopts[i].opname))
147  {
148  if (isNo)
149  m_configuration.m_cs_Chk &= ~csopts[i].opval;
150  else if (csopts[i].opval)
151  m_configuration.m_cs_Chk |= csopts[i].opval;
152  else
153  m_configuration.m_cs_Chk = csopts[i].opval;
154  break;
155  }
156  }
157  if (i >= numopts)
158  {
159  if (strcmp(val, "uvkeep"))
160  {
161  m_log.Emsg("Config", "invalid cschk option -", val);
162  return false;
163  }
164  if (!(val = Config.GetWord()))
165  {
166  m_log.Emsg("Config", "cschk uvkeep value not specified");
167  return false;
168  }
169  if (!strcmp(val, "lru"))
170  m_configuration.m_cs_UVKeep = -1;
171  else
172  {
173  int uvkeep;
174  if (XrdOuca2x::a2tm(m_log, "uvkeep time", val, &uvkeep, 0))
175  return false;
176  m_configuration.m_cs_UVKeep = uvkeep;
177  }
178  }
179  val = Config.GetWord();
180  }
181  // Decompose into separate TLS state, it is only passed on to psx
182  m_configuration.m_cs_ChkTLS = m_configuration.m_cs_Chk & CSChk_TLS;
183  m_configuration.m_cs_Chk &= ~CSChk_TLS;
184 
185  m_env->Put("psx.CSNet", m_configuration.is_cschk_net() ? (m_configuration.m_cs_ChkTLS ? "2" : "1") : "0");
186 
187  return true;
188 }
189 
190 
191 /* Function: xdlib
192 
193  Purpose: To parse the directive: decisionlib <path> [<parms>]
194 
195  <path> the path of the decision library to be used.
196  <parms> optional parameters to be passed.
197 
198 
199  Output: true upon success or false upon failure.
200  */
201 bool Cache::xdlib(XrdOucStream &Config)
202 {
203  const char* val;
204 
205  std::string libp;
206  if (! (val = Config.GetWord()) || ! val[0])
207  {
208  TRACE(Info," Cache::Config() decisionlib not specified; always caching files");
209  return true;
210  }
211  else
212  {
213  libp = val;
214  }
215 
216  char params[4096];
217  if (val[0])
218  Config.GetRest(params, 4096);
219  else
220  params[0] = 0;
221 
222  XrdOucPinLoader* myLib = new XrdOucPinLoader(&m_log, 0, "decisionlib",
223  libp.c_str());
224 
225  Decision *(*ep)(XrdSysError&);
226  ep = (Decision *(*)(XrdSysError&))myLib->Resolve("XrdPfcGetDecision");
227  if (! ep) {myLib->Unload(true); return false; }
228 
229  Decision * d = ep(m_log);
230  if (! d)
231  {
232  TRACE(Error, "Config() decisionlib was not able to create a decision object");
233  return false;
234  }
235  if (params[0])
236  d->ConfigDecision(params);
237 
238  m_decisionpoints.push_back(d);
239  return true;
240 }
241 
242 /* Function: xplib
243 
244  Purpose: To parse the directive: purgelib <path> [<parms>]
245 
246  <path> the path of the decision library to be used.
247  <parms> optional parameters to be passed.
248 
249 
250  Output: true upon success or false upon failure.
251  */
252 bool Cache::xplib(XrdOucStream &Config)
253 {
254  const char* val;
255 
256  std::string libp;
257  if (! (val = Config.GetWord()) || ! val[0])
258  {
259  TRACE(Info," Cache::Config() purgelib not specified; will use LRU for purging files");
260  return true;
261  }
262  else
263  {
264  libp = val;
265  }
266 
267  char params[4096];
268  if (val[0])
269  Config.GetRest(params, 4096);
270  else
271  params[0] = 0;
272 
273  XrdOucPinLoader* myLib = new XrdOucPinLoader(&m_log, 0, "purgelib",
274  libp.c_str());
275 
276  PurgePin *(*ep)(XrdSysError&);
277  ep = (PurgePin *(*)(XrdSysError&))myLib->Resolve("XrdPfcGetPurgePin");
278  if (! ep) {myLib->Unload(true); return false; }
279 
280  PurgePin * dp = ep(m_log);
281  if (! dp)
282  {
283  TRACE(Error, "Config() purgelib was not able to create a Purge Plugin object?");
284  return false;
285  }
286  m_purge_pin = dp;
287 
288  if (params[0])
289  m_purge_pin->ConfigPurgePin(params);
290 
291 
292  return true;
293 }
294 
295 /* Function: xtrace
296 
297  Purpose: To parse the directive: trace <level>
298  Output: true upon success or false upon failure.
299  */
300 bool Cache::xtrace(XrdOucStream &Config)
301 {
302  char *val;
303  static struct traceopts {const char *opname; int opval; } tropts[] =
304  {
305  {"none", 0},
306  {"error", 1},
307  {"warning", 2},
308  {"info", 3},
309  {"debug", 4},
310  {"dump", 5},
311  {"dumpxl", 6}
312  };
313  int numopts = sizeof(tropts)/sizeof(struct traceopts);
314 
315  if (! (val = Config.GetWord()))
316  {m_log.Emsg("Config", "trace option not specified"); return 1; }
317 
318  for (int i = 0; i < numopts; i++)
319  {
320  if (! strcmp(val, tropts[i].opname))
321  {
322  m_trace->What = tropts[i].opval;
323  return true;
324  }
325  }
326  m_log.Emsg("Config", "invalid trace option -", val);
327  return false;
328 }
329 
330 // Determine if oss spaces are operational and if they support xattrs.
331 bool Cache::test_oss_basics_and_features()
332 {
333  static const char *epfx = "test_oss_basics_and_features()";
334 
335  const auto &conf = m_configuration;
336  const char *user = conf.m_username.c_str();
337  XrdOucEnv env;
338 
339  auto check_space = [&](const char *space, bool &has_xattr)
340  {
341  std::string fname("__prerun_test_pfc_");
342  fname += space;
343  fname += "_space__";
344  env.Put("oss.cgroup", space);
345 
346  int res = m_oss->Create(user, fname.c_str(), 0600, env, XRDOSS_mkpath);
347  if (res != XrdOssOK) {
348  m_log.Emsg(epfx, "Can not create a file on space", space);
349  return false;
350  }
351  XrdOssDF *oss_file = m_oss->newFile(user);
352  res = oss_file->Open(fname.c_str(), O_RDWR, 0600, env);
353  if (res != XrdOssOK) {
354  m_log.Emsg(epfx, "Can not open a file on space", space);
355  return false;
356  }
357  res = oss_file->Write(fname.data(), 0, fname.length());
358  if (res != (int) fname.length()) {
359  m_log.Emsg(epfx, "Can not write into a file on space", space);
360  return false;
361  }
362 
363  has_xattr = true;
364  long long fsize = fname.length();
365  res = XrdSysXAttrActive->Set("pfc.fsize", &fsize, sizeof(long long), 0, oss_file->getFD(), 0);
366  if (res != 0) {
367  m_log.Emsg(epfx, "Can not write xattr to a file on space", space);
368  has_xattr = false;
369  }
370 
371  oss_file->Close();
372 
373  if (has_xattr) {
374  char pfn[4096];
375  m_oss->Lfn2Pfn(fname.c_str(), pfn, 4096);
376  fsize = -1ll;
377  res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
378  if (res != sizeof(long long) || fsize != (long long) fname.length())
379  {
380  m_log.Emsg(epfx, "Can not read xattr from a file on space", space);
381  has_xattr = false;
382  }
383  }
384 
385  res = m_oss->Unlink(fname.c_str());
386  if (res != XrdOssOK) {
387  m_log.Emsg(epfx, "Can not unlink a file on space", space);
388  return false;
389  }
390 
391  return true;
392  };
393 
394  bool aOK = true;
395  aOK &= check_space(conf.m_data_space.c_str(), m_dataXattr);
396  aOK &= check_space(conf.m_meta_space.c_str(), m_metaXattr);
397 
398  return aOK;
399 }
400 
401 //______________________________________________________________________________
402 /* Function: Config
403 
404  Purpose: To parse configuration file and configure Cache instance.
405  Output: true upon success or false upon failure.
406  */
407 bool Cache::Config(const char *config_filename, const char *parameters)
408 {
409  // Indicate whether or not we are a client instance
410  const char *theINS = getenv("XRDINSTANCE");
411  m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
412 
413  // Tell everyone else we are a caching proxy
414  XrdOucEnv::Export("XRDPFC", 1);
415 
416  XrdOucEnv myEnv;
417  XrdOucStream Config(&m_log, theINS, &myEnv, "=====> ");
418 
419  if (! config_filename || ! *config_filename)
420  {
421  TRACE(Error, "Config() configuration file not specified.");
422  return false;
423  }
424 
425  int fd;
426  if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
427  {
428  TRACE( Error, "Config() can't open configuration file " << config_filename);
429  return false;
430  }
431 
432  Config.Attach(fd);
433  static const char *cvec[] = { "*** pfc plugin config:", 0 };
434  Config.Capture(cvec);
435 
436  // Obtain OFS configurator for OSS plugin.
437  XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
438  &XrdVERSIONINFOVAR(XrdOucGetCache));
439  if (! ofsCfg) return false;
440 
441  TmpConfiguration tmpc;
442 
443  // Adjust default parameters for client/serverless caching
444  if (m_isClient)
445  {
446  m_configuration.m_bufferSize = 128 * 1024; // same as normal.
447  m_configuration.m_wqueue_blocks = 8;
448  m_configuration.m_wqueue_threads = 1;
449  }
450 
451  // If network checksum processing is the default, indicate so.
452  if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
453 
454  // Actual parsing of the config file.
455  bool retval = true, aOK = true;
456  char *var;
457  while ((var = Config.GetMyFirstWord()))
458  {
459  if (! strcmp(var,"pfc.osslib"))
460  {
461  retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
462  }
463  else if (! strcmp(var,"pfc.cschk"))
464  {
465  retval = xcschk(Config);
466  }
467  else if (! strcmp(var,"pfc.decisionlib"))
468  {
469  retval = xdlib(Config);
470  }
471  else if (! strcmp(var,"pfc.purgelib"))
472  {
473  retval = xplib(Config);
474  }
475  else if (! strcmp(var,"pfc.trace"))
476  {
477  retval = xtrace(Config);
478  }
479  else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
480  {
481  m_configuration.m_allow_xrdpfc_command = true;
482  }
483  else if (! strncmp(var,"pfc.", 4))
484  {
485  retval = ConfigParameters(std::string(var+4), Config, tmpc);
486  }
487 
488  if ( ! retval)
489  {
490  TRACE(Error, "Config() error in parsing");
491  aOK = false;
492  }
493  }
494 
495  Config.Close();
496 
497  // Load OSS plugin.
498  myEnv.Put("oss.runmode", "pfc");
499  if (m_configuration.is_cschk_cache())
500  {
501  char csi_conf[128];
502  if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
503  {
504  ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
505  } else {
506  TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
507  return false;
508  }
509  }
510  if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, &myEnv))
511  {
512  ofsCfg->Plugin(m_oss);
513  }
514  else
515  {
516  TRACE(Error, "Config() Unable to create an OSS object");
517  return false;
518  }
519 
520  // Test if OSS is operational, determine optional features.
521  aOK &= test_oss_basics_and_features();
522 
523  // sets default value for disk usage
524  XrdOssVSInfo sP;
525  {
526  if (m_configuration.m_meta_space != m_configuration.m_data_space &&
527  m_oss->StatVS(&sP, m_configuration.m_meta_space.c_str(), 1) < 0)
528  {
529  m_log.Emsg("ConfigParameters()", "error obtaining stat info for meta space ", m_configuration.m_meta_space.c_str());
530  return false;
531  }
532  if (m_configuration.m_meta_space != m_configuration.m_data_space && sP.Total < 10ll << 20)
533  {
534  m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
535  m_configuration.m_meta_space.c_str());
536  return false;
537  }
538  if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
539  {
540  m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
541  return false;
542  }
543  if (sP.Total < 10ll << 20)
544  {
545  m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
546  m_configuration.m_data_space.c_str());
547  return false;
548  }
549 
550  m_configuration.m_diskTotalSpace = sP.Total;
551 
552  if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
553  cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
554  {
555  if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
556  m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
557  aOK = false;
558  }
559  }
560  else aOK = false;
561 
562  if ( ! tmpc.m_fileUsageMax.empty())
563  {
564  if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
565  cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
566  cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
567  {
568  if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
569  m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
570  m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
571  {
572  m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
573  aOK = false;
574  }
575 
576 
577  if (aOK && m_configuration.m_fileUsageMax >= m_configuration.m_diskUsageLWM)
578  {
579  m_log.Emsg("ConfigParameters()", "pfc.diskusage files values must be below lowWatermark");
580  aOK = false;
581  }
582  }
583  else aOK = false;
584  }
585  }
586 
587  // sets flush frequency
588  if ( ! tmpc.m_flushRaw.empty())
589  {
590  if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
591  {
592  if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
593  &m_configuration.m_flushCnt,
594  100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
595  {
596  return false;
597  }
598  m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
599  }
600  else
601  {
602  if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
603  &m_configuration.m_flushCnt, 100, 100000))
604  {
605  return false;
606  }
607  }
608  }
609 
610  // get number of available RAM blocks after process configuration
611  if (m_configuration.m_RamAbsAvailable == 0)
612  {
613  m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
614  char buff[1024];
615  snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
616  m_log.Say("Config info: ", buff);
617  }
618  // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
619  m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
620 
621  // Set tracing to debug if this is set in environment
622  char* cenv = getenv("XRDDEBUG");
623  if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
624 
625  if (aOK)
626  {
627  int loff = 0;
628 // 000 001 010
629  const char *csc[] = {"off", "cache nonet", "nocache net notls",
630 // 011
631  "cache net notls",
632 // 100 101 110
633  "off", "cache nonet", "nocache net tls",
634 // 111
635  "cache net tls"};
636  char buff[8192], uvk[32];
637  if (m_configuration.m_cs_UVKeep < 0)
638  strcpy(uvk, "lru");
639  else
640  sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
641  float rg = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
642  loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
643  " pfc.cschk %s uvkeep %s\n"
644  " pfc.blocksize %lld\n"
645  " pfc.prefetch %d\n"
646  " pfc.ram %.fg\n"
647  " pfc.writequeue %d %d\n"
648  " # Total available disk: %lld\n"
649  " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
650  " pfc.spaces %s %s\n"
651  " pfc.trace %d\n"
652  " pfc.flush %lld\n"
653  " pfc.acchistorysize %d\n"
654  " pfc.onlyIfCachedMinBytes %lld\n"
655  " pfc.onlyIfCachedMinFrac %.2f\n",
656  config_filename,
657  csc[int(m_configuration.m_cs_Chk)], uvk,
658  m_configuration.m_bufferSize,
659  m_configuration.m_prefetch_max_blocks,
660  rg,
661  m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
662  sP.Total,
663  m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
664  m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
665  m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
666  m_configuration.m_data_space.c_str(),
667  m_configuration.m_meta_space.c_str(),
668  m_trace->What,
669  m_configuration.m_flushCnt,
670  m_configuration.m_accHistorySize,
671  m_configuration.m_onlyIfCachedMinSize,
672  m_configuration.m_onlyIfCachedMinFrac);
673 
674  if (m_configuration.is_dir_stat_reporting_on())
675  {
676  loff += snprintf(buff + loff, sizeof(buff) - loff,
677  " pfc.dirstats maxdepth %d ((internal: store_depth %d, size_of_dirlist %d, size_of_globlist %d))\n",
678  m_configuration.m_dirStatsMaxDepth, m_configuration.m_dirStatsStoreDepth,
679  (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
680  loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
681  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
682  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
683  loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
684  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
685  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
686  }
687 
688  if (m_configuration.m_hdfsmode)
689  {
690  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
691  }
692 
693  if (m_configuration.m_username.empty())
694  {
695  char unameBuff[256];
696  XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
697  m_configuration.m_username = unameBuff;
698  }
699  else
700  {
701  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
702  }
703 
704  m_log.Say(buff);
705 
706  m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
707  }
708 
709  // Derived settings
710  m_prefetch_enabled = m_configuration.m_prefetch_max_blocks > 0;
711  Info::s_maxNumAccess = m_configuration.m_accHistorySize;
712 
713  m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
714 
715  m_log.Say(" pfc g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive\n");
716 
717  // Create the ResourceMonitor and get it ready for starting the main thread function.
718  if (aOK)
719  {
720  m_res_mon = new ResourceMonitor(*m_oss);
721  m_res_mon->init_before_main();
722  }
723 
724  m_log.Say("=====> Proxy file cache configuration parsing ", aOK ? "completed" : "failed");
725 
726  if (ofsCfg) delete ofsCfg;
727 
728  // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and valildated.
729  // Building of xrdpfc_print fails when this is enabled.
730 #ifdef XRDPFC_CKSUM_TEST
731  {
732  int xxx = m_configuration.m_cs_Chk;
733 
734  for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
735  {
736  Info::TestCksumStuff();
737  }
738 
739  m_configuration.m_cs_Chk = xxx;
740  }
741 #endif
742 
743  return aOK;
744 }
745 
746 //------------------------------------------------------------------------------
747 
748 bool Cache::ConfigParameters(std::string part, XrdOucStream& config, TmpConfiguration &tmpc)
749 {
750  struct ConfWordGetter
751  {
752  XrdOucStream &m_config;
753  char *m_last_word;
754 
755  ConfWordGetter(XrdOucStream& c) : m_config(c), m_last_word((char*)1) {}
756 
757  const char* GetWord() { if (HasLast()) m_last_word = m_config.GetWord(); return HasLast() ? m_last_word : ""; }
758  bool HasLast() { return (m_last_word != 0); }
759  };
760 
761  ConfWordGetter cwg(config);
762 
763  XrdSysError err(0, "");
764  if ( part == "user" )
765  {
766  m_configuration.m_username = cwg.GetWord();
767  if ( ! cwg.HasLast())
768  {
769  m_log.Emsg("Config", "Error: pfc.user requires a parameter.");
770  return false;
771  }
772  }
773  else if ( part == "diskusage" )
774  {
775  tmpc.m_diskUsageLWM = cwg.GetWord();
776  tmpc.m_diskUsageHWM = cwg.GetWord();
777 
778  if (tmpc.m_diskUsageHWM.empty())
779  {
780  m_log.Emsg("Config", "Error: pfc.diskusage parameter requires at least two arguments.");
781  return false;
782  }
783 
784  const char *p = 0;
785  while ((p = cwg.GetWord()) && cwg.HasLast())
786  {
787  if (strcmp(p, "files") == 0)
788  {
789  tmpc.m_fileUsageBaseline = cwg.GetWord();
790  tmpc.m_fileUsageNominal = cwg.GetWord();
791  tmpc.m_fileUsageMax = cwg.GetWord();
792 
793  if ( ! cwg.HasLast())
794  {
795  m_log.Emsg("Config", "Error: pfc.diskusage files directive requires three arguments.");
796  return false;
797  }
798  }
799  else if (strcmp(p, "sleep") == 0 || strcmp(p, "purgeinterval") == 0)
800  {
801  if (strcmp(p, "sleep") == 0) m_log.Emsg("Config", "warning sleep directive is deprecated in pfc.diskusage. Please use purgeinterval instead.");
802 
803  if (XrdOuca2x::a2tm(m_log, "Error getting purgeinterval", cwg.GetWord(), &m_configuration.m_purgeInterval, 60, 3600))
804  {
805  return false;
806  }
807  }
808  else if (strcmp(p, "purgecoldfiles") == 0)
809  {
810  if (XrdOuca2x::a2tm(m_log, "Error getting purgecoldfiles age", cwg.GetWord(), &m_configuration.m_purgeColdFilesAge, 3600, 3600*24*360))
811  {
812  return false;
813  }
814  if (XrdOuca2x::a2i(m_log, "Error getting purgecoldfiles period", cwg.GetWord(), &m_configuration.m_purgeAgeBasedPeriod, 1, 1000))
815  {
816  return false;
817  }
818  }
819  else
820  {
821  m_log.Emsg("Config", "Error: diskusage stanza contains unknown directive", p);
822  }
823  }
824  }
825  else if ( part == "acchistorysize" )
826  {
827  if ( XrdOuca2x::a2i(m_log, "Error getting access-history-size", cwg.GetWord(), &m_configuration.m_accHistorySize, 20, 200))
828  {
829  return false;
830  }
831  }
832  else if ( part == "dirstats" )
833  {
834  const char *p = 0;
835  while ((p = cwg.GetWord()) && cwg.HasLast())
836  {
837  if (strcmp(p, "maxdepth") == 0)
838  {
839  if (XrdOuca2x::a2i(m_log, "Error getting maxdepth value", cwg.GetWord(), &m_configuration.m_dirStatsMaxDepth, 0, 16))
840  {
841  return false;
842  }
843  m_configuration.m_dirStatsStoreDepth = std::max(m_configuration.m_dirStatsStoreDepth, m_configuration.m_dirStatsMaxDepth);
844  }
845  else if (strcmp(p, "dir") == 0)
846  {
847  p = cwg.GetWord();
848  if (p && p[0] == '/')
849  {
850  // XXX -- should we just store them as sets of PathTokenizer objects, not strings?
851 
852  char d[1024]; d[0] = 0;
853  int depth = 0;
854  { // Compress multiple slashes and "measure" depth
855  const char *pp = p;
856  char *pd = d;
857  *(pd++) = *(pp++);
858  while (*pp != 0)
859  {
860  if (*(pd - 1) == '/')
861  {
862  if (*pp == '/')
863  {
864  ++pp; continue;
865  }
866  ++depth;
867  }
868  *(pd++) = *(pp++);
869  }
870  *(pd--) = 0;
871  // remove trailing but but not leading /
872  if (*pd == '/' && pd != d) *pd = 0;
873  }
874  int ld = strlen(d);
875  if (ld >= 2 && d[ld-1] == '*' && d[ld-2] == '/')
876  {
877  d[ld-2] = 0;
878  ld -= 2;
879  m_configuration.m_dirStatsDirGlobs.insert(d);
880  printf("Glob %s -> %s -- depth = %d\n", p, d, depth);
881  }
882  else
883  {
884  m_configuration.m_dirStatsDirs.insert(d);
885  printf("Dir %s -> %s -- depth = %d\n", p, d, depth);
886  }
887 
888  m_configuration.m_dirStatsStoreDepth = std::max(m_configuration.m_dirStatsStoreDepth, depth);
889  }
890  else
891  {
892  m_log.Emsg("Config", "Error: dirstats dir parameter requires a directory argument starting with a '/'.");
893  return false;
894  }
895  }
896  else
897  {
898  m_log.Emsg("Config", "Error: dirstats stanza contains unknown directive '", p, "'");
899  return false;
900  }
901  }
902  }
903  else if ( part == "blocksize" )
904  {
905  long long minBSize = 4 * 1024;
906  long long maxBSize = 512 * 1024 * 1024;
907  if (XrdOuca2x::a2sz(m_log, "Error reading block-size", cwg.GetWord(), &m_configuration.m_bufferSize, minBSize, maxBSize))
908  {
909  return false;
910  }
911  if (m_configuration.m_bufferSize & 0xFFF)
912  {
913  m_configuration.m_bufferSize &= ~0x0FFF;
914  m_configuration.m_bufferSize += 0x1000;
915  m_log.Emsg("Config", "pfc.blocksize must be a multiple of 4 kB. Rounded up.");
916  }
917  }
918  else if ( part == "prefetch" || part == "nramprefetch" )
919  {
920  if (part == "nramprefetch")
921  {
922  m_log.Emsg("Config", "pfc.nramprefetch is deprecated, please use pfc.prefetch instead. Replacing the directive internally.");
923  }
924 
925  if (XrdOuca2x::a2i(m_log, "Error setting prefetch block count", cwg.GetWord(), &m_configuration.m_prefetch_max_blocks, 0, 128))
926  {
927  return false;
928  }
929 
930  }
931  else if ( part == "nramread" )
932  {
933  m_log.Emsg("Config", "pfc.nramread is deprecated, please use pfc.ram instead. Ignoring this directive.");
934  cwg.GetWord(); // Ignoring argument.
935  }
936  else if ( part == "ram" )
937  {
938  long long minRAM = m_isClient ? 256 * 1024 * 1024 : 1024 * 1024 * 1024;
939  long long maxRAM = 256 * minRAM;
940  if ( XrdOuca2x::a2sz(m_log, "get RAM available", cwg.GetWord(), &m_configuration.m_RamAbsAvailable, minRAM, maxRAM))
941  {
942  return false;
943  }
944  }
945  else if ( part == "writequeue")
946  {
947  if (XrdOuca2x::a2i(m_log, "Error getting pfc.writequeue num-blocks", cwg.GetWord(), &m_configuration.m_wqueue_blocks, 1, 1024))
948  {
949  return false;
950  }
951  if (XrdOuca2x::a2i(m_log, "Error getting pfc.writequeue num-threads", cwg.GetWord(), &m_configuration.m_wqueue_threads, 1, 64))
952  {
953  return false;
954  }
955  }
956  else if ( part == "spaces" )
957  {
958  m_configuration.m_data_space = cwg.GetWord();
959  m_configuration.m_meta_space = cwg.GetWord();
960  if ( ! cwg.HasLast())
961  {
962  m_log.Emsg("Config", "spacenames requires two parameters: <data-space> <metadata-space>.");
963  return false;
964  }
965  }
966  else if ( part == "hdfsmode" )
967  {
968  m_log.Emsg("Config", "pfc.hdfsmode is currently unsupported.");
969  return false;
970 
971  m_configuration.m_hdfsmode = true;
972 
973  const char* params = cwg.GetWord();
974  if (params)
975  {
976  if (! strncmp("hdfsbsize", params, 9))
977  {
978  long long minBlSize = 32 * 1024;
979  long long maxBlSize = 128 * 1024 * 1024;
980  if ( XrdOuca2x::a2sz(m_log, "Error getting file fragment size", cwg.GetWord(), &m_configuration.m_hdfsbsize, minBlSize, maxBlSize))
981  {
982  return false;
983  }
984  }
985  else
986  {
987  m_log.Emsg("Config", "Error setting the fragment size parameter name");
988  return false;
989  }
990  }
991  }
992  else if ( part == "flush" )
993  {
994  tmpc.m_flushRaw = cwg.GetWord();
995  if ( ! cwg.HasLast())
996  {
997  m_log.Emsg("Config", "Error: pfc.flush requires a parameter.");
998  return false;
999  }
1000  }
1001  else if ( part == "onlyifcached" )
1002  {
1003  const char *p = 0;
1004  while ((p = cwg.GetWord()) && cwg.HasLast())
1005  {
1006  if (strcmp(p, "minsize") == 0)
1007  {
1008  std::string minBytes = cwg.GetWord();
1009  long long minBytesTop = 1024 * 1024 * 1024;
1010  if (::isalpha(*(minBytes.rbegin())))
1011  {
1012  if (XrdOuca2x::a2sz(m_log, "Error in parsing minsize value for onlyifcached parameter", minBytes.c_str(), &m_configuration.m_onlyIfCachedMinSize, 0, minBytesTop))
1013  {
1014  return false;
1015  }
1016  }
1017  else
1018  {
1019  if (XrdOuca2x::a2ll(m_log, "Error in parsing numeric minsize value for onlyifcached parameter", minBytes.c_str(),&m_configuration.m_onlyIfCachedMinSize, 0, minBytesTop))
1020  {
1021  return false;
1022  }
1023  }
1024  }
1025  if (strcmp(p, "minfrac") == 0)
1026  {
1027  std::string minFrac = cwg.GetWord();
1028  char *eP;
1029  errno = 0;
1030  double frac = strtod(minFrac.c_str(), &eP);
1031  if (errno || eP == minFrac.c_str())
1032  {
1033  m_log.Emsg("Config", "Error setting fraction for only-if-cached directive");
1034  return false;
1035  }
1036  m_configuration.m_onlyIfCachedMinFrac = frac;
1037  }
1038  else
1039  {
1040  m_log.Emsg("Config", "Error: onlyifcached stanza contains unknown directive", p);
1041  }
1042  }
1043  }
1044  else
1045  {
1046  m_log.Emsg("ConfigParameters() unmatched pfc parameter", part.c_str());
1047  return false;
1048  }
1049 
1050  return true;
1051 }
#define XrdOssOK
Definition: XrdOss.hh:50
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
XrdSysXAttr * XrdSysXAttrActive
Definition: XrdSysFAttr.cc:61
XrdVERSIONINFO(XrdOucGetCache, XrdPfc)
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition: XrdPfc.cc:77
int open(const char *path, int oflag,...)
int isNo(int dflt, const char *Msg1, const char *Msg2, const char *Msg3)
if(Avsz)
#define TRACE(act, x)
Definition: XrdTrace.hh:63
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition: XrdOss.hh:426
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition: XrdOss.hh:345
long long Total
Definition: XrdOssVS.hh:90
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition: XrdOss.hh:873
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition: XrdOss.cc:117
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
static int Export(const char *Var, const char *Val)
Definition: XrdOucEnv.cc:188
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:281
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
void * Resolve(const char *symbl, int mcnt=1)
void Unload(bool dodel=false)
char * GetWord(int lowcase=0)
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition: XrdOuca2x.cc:45
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:257
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:70
static int a2tm(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition: XrdOuca2x.cc:288
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
Base class for selecting which files should be cached.
virtual bool ConfigDecision(const char *params)
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
static size_t s_maxNumAccess
Definition: XrdPfcInfo.hh:311
Base class for requesting directory space to obtain.
virtual bool ConfigPurgePin(const char *params)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0
XrdCmsConfig Config
Definition: XrdPfc.hh:41
const char * trace_what_strings[]
@ CSChk_Both
Definition: XrdPfcTypes.hh:27
@ CSChk_Net
Definition: XrdPfcTypes.hh:27
@ CSChk_TLS
Definition: XrdPfcTypes.hh:28
@ CSChk_Cache
Definition: XrdPfcTypes.hh:27
@ CSChk_None
Definition: XrdPfcTypes.hh:27
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition: XrdPfc.hh:114
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:108
long long m_flushCnt
number of unsynced blocks on disk before flush is called
Definition: XrdPfc.hh:115
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition: XrdPfc.hh:100
int m_wqueue_threads
number of threads writing blocks to disk
Definition: XrdPfc.hh:111
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition: XrdPfc.hh:91
long long m_fileUsageMax
cache purge - files usage maximum
Definition: XrdPfc.hh:96
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition: XrdPfc.hh:94
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition: XrdPfc.hh:105
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition: XrdPfc.hh:85
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition: XrdPfc.hh:93
bool is_cschk_cache() const
Definition: XrdPfc.hh:75
std::set< std::string > m_dirStatsDirGlobs
directory globs for which stat reporting was requested
Definition: XrdPfc.hh:103
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:112
bool m_cs_ChkTLS
Allow TLS.
Definition: XrdPfc.hh:119
long long m_fileUsageNominal
cache purge - files usage nominal
Definition: XrdPfc.hh:95
int m_cs_Chk
Checksum check.
Definition: XrdPfc.hh:118
int m_purgeAgeBasedPeriod
perform cold file / uvkeep purge every this many purge cycles
Definition: XrdPfc.hh:99
bool m_hdfsmode
flag for enabling block-level operation
Definition: XrdPfc.hh:84
int m_purgeColdFilesAge
purge files older than this age
Definition: XrdPfc.hh:98
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:88
std::set< std::string > m_dirStatsDirs
directories for which stat reporting was requested
Definition: XrdPfc.hh:102
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition: XrdPfc.hh:92
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition: XrdPfc.hh:109
long long m_bufferSize
prefetch buffer size, default 128 KiB
Definition: XrdPfc.hh:107
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:89
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition: XrdPfc.hh:110
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:87
bool is_cschk_net() const
Definition: XrdPfc.hh:76
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:122
time_t m_cs_UVKeep
unverified checksum cache keep
Definition: XrdPfc.hh:117
int m_dirStatsMaxDepth
maximum depth for statistics write out
Definition: XrdPfc.hh:104
int m_purgeInterval
sleep interval between cache purges
Definition: XrdPfc.hh:97
long long m_onlyIfCachedMinSize
minimum size of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:121
bool is_dir_stat_reporting_on() const
Definition: XrdPfc.hh:70
std::string m_diskUsageLWM
Definition: XrdPfc.hh:129
std::string m_diskUsageHWM
Definition: XrdPfc.hh:130
std::string m_fileUsageBaseline
Definition: XrdPfc.hh:131
std::string m_fileUsageNominal
Definition: XrdPfc.hh:132
std::string m_flushRaw
Definition: XrdPfc.hh:134
std::string m_fileUsageMax
Definition: XrdPfc.hh:133