XRootD
XrdPfcResourceMonitor.cc
Go to the documentation of this file.
2 #include "XrdPfc.hh"
4 #include "XrdPfcFsTraversal.hh"
5 #include "XrdPfcDirState.hh"
7 #include "XrdPfcTrace.hh"
8 #include "XrdPfcPurgePin.hh"
9 
10 #include "XrdOss/XrdOss.hh"
11 
12 #include <algorithm>
13 
14 // #define RM_DEBUG
15 #ifdef RM_DEBUG
16 #define dprintf(...) printf(__VA_ARGS__)
17 #else
18 #define dprintf(...) (void(0))
19 #endif
20 
21 using namespace XrdPfc;
22 
23 namespace
24 {
25  XrdSysTrace* GetTrace() { return Cache::GetInstance().GetTrace(); }
26  const char *m_traceID = "ResourceMonitor";
27 }
28 
29 //------------------------------------------------------------------------------
30 
32  m_fs_state(* new DataFsState),
33  m_oss(oss)
34 {}
35 
37 {
38  delete &m_fs_state;
39 }
40 
41 //------------------------------------------------------------------------------
42 // Initial scan
43 //------------------------------------------------------------------------------
44 
46 {
47  m_dir_scan_mutex.Lock();
48  if (m_dir_scan_in_progress) {
49  m_dir_scan_open_requests.push_back({lfn, cond});
50  LfnCondRecord &lcr = m_dir_scan_open_requests.back();
51  cond.Lock();
52  m_dir_scan_mutex.UnLock();
53  while ( ! lcr.f_checked)
54  cond.Wait();
55  cond.UnLock();
56  } else {
57  m_dir_scan_mutex.UnLock();
58  }
59 }
60 
61 void ResourceMonitor::process_inter_dir_scan_open_requests(FsTraversal &fst)
62 {
63  m_dir_scan_mutex.Lock();
64  while ( ! m_dir_scan_open_requests.empty())
65  {
66  LfnCondRecord &lcr = m_dir_scan_open_requests.front();
67  m_dir_scan_mutex.UnLock();
68 
69  cross_check_or_process_oob_lfn(lcr.f_lfn, fst);
70  lcr.f_cond.Lock();
71  lcr.f_checked = true;
72  lcr.f_cond.Signal();
73  lcr.f_cond.UnLock();
74 
75  m_dir_scan_mutex.Lock();
76  m_dir_scan_open_requests.pop_front();
77  }
78  m_dir_scan_mutex.UnLock();
79 }
80 
81 void ResourceMonitor::cross_check_or_process_oob_lfn(const std::string &lfn, FsTraversal &fst)
82 {
83  // Check if lfn has already been processed ... or process it now and mark
84  // the DirState accordingly (partially processed oob).
85  static const char *trc_pfx = "cross_check_or_process_oob_lfn() ";
86 
87  DirState *last_existing_ds = nullptr;
88  DirState *ds = m_fs_state.find_dirstate_for_lfn(lfn, &last_existing_ds);
89  if (ds->m_scanned)
90  return;
91 
92  size_t pos = lfn.find_last_of("/");
93  std::string dir = (pos == std::string::npos) ? "" : lfn.substr(0, pos);
94 
95  XrdOssDF *dhp = m_oss.newDir(trc_pfx);
96  if (dhp->Opendir(dir.c_str(), fst.default_env()) == XrdOssOK)
97  {
98  fst.slurp_dir_ll(*dhp, ds->m_depth, dir.c_str(), trc_pfx);
99 
100  // XXXX clone of function below .... move somewhere? Esp. removal of non-paired files?
101  DirUsage &here = ds->m_here_usage;
102  for (auto it = fst.m_current_files.begin(); it != fst.m_current_files.end(); ++it)
103  {
104  if (it->second.has_data && it->second.has_cinfo) {
105  here.m_StBlocks += it->second.stat_data.st_blocks;
106  here.m_NFiles += 1;
107  }
108  }
109  }
110  delete dhp;
111  ds->m_scanned = true;
112 }
113 
115 {
116  dprintf("In scan_dir_and_recurse for '%s', size of dir_vec = %d, file_stat_map = %d\n",
117  fst.m_current_path.c_str(),
118  (int)fst.m_current_dirs.size(), (int)fst.m_current_files.size());
119 
120  // Breadth first, accumulate into "here", unless it was already scanned via an
121  // OOB open file request.
122  if ( ! fst.m_dir_state->m_scanned)
123  {
124  DirUsage &here = fst.m_dir_state->m_here_usage;
125  for (auto it = fst.m_current_files.begin(); it != fst.m_current_files.end(); ++it)
126  {
127  dprintf("would be doing something with %s ... has_data=%d, has_cinfo=%d\n",
128  it->first.c_str(), it->second.has_data, it->second.has_cinfo);
129 
130  // XXX Make some of these optional?
131  // Remove files that do not have both cinfo and data?
132  // Remove empty directories before even descending?
133  // Leave this for some consistency pass?
134  // Note that FsTraversal supports ignored paths ... some details (config, N2N) to be clarified.
135 
136  if (it->second.has_data && it->second.has_cinfo) {
137  here.m_StBlocks += it->second.stat_data.st_blocks;
138  here.m_NFiles += 1;
139  }
140  }
141  fst.m_dir_state->m_scanned = true;
142  }
143 
144  // Swap-out directories as inter_dir_scan can use the FsTraversal.
145  std::vector<std::string> dirs;
146  dirs.swap(fst.m_current_dirs);
147 
148  if (++m_dir_scan_check_counter >= 100)
149  {
150  process_inter_dir_scan_open_requests(fst);
151  m_dir_scan_check_counter = 0;
152  }
153 
154  // Descend into sub-dirs, do not accumulate into recursive_subdir_usage yet. This is done
155  // in a separate pass to allow for proper accounting of files being opened during the initial scan.
156  for (auto &dname : dirs)
157  {
158  if (fst.cd_down(dname))
159  {
161  fst.cd_up();
162  }
163  // XXX else try to remove it?
164  }
165 }
166 
168 {
169  // Called after PFC configuration is complete, but before full startup of the daemon.
170  // Base line usages are accumulated as part of the file-system, traversal.
171 
173 
174  DirState *root_ds = m_fs_state.get_root();
175  FsTraversal fst(m_oss);
176  fst.m_protected_top_dirs.insert("pfc-stats"); // XXXX This should come from config. Also: N2N?
177 
178  if ( ! fst.begin_traversal(root_ds, "/"))
179  return false;
180 
181  {
182  XrdSysMutexHelper _lock(m_dir_scan_mutex);
183  m_dir_scan_in_progress = true;
184  m_dir_scan_check_counter = 0; // recheck oob file-open requests periodically.
185  }
186 
188 
189  fst.end_traversal();
190 
191  // We have all directories scanned, available in DirState tree, let all remaining files go
192  // and then we shall do the upward propagation of usages.
193  {
194  XrdSysMutexHelper _lock(m_dir_scan_mutex);
195  m_dir_scan_in_progress = false;
196  m_dir_scan_check_counter = 0;
197 
198  while ( ! m_dir_scan_open_requests.empty())
199  {
200  LfnCondRecord &lcr = m_dir_scan_open_requests.front();
201  lcr.f_cond.Lock();
202  lcr.f_checked = true;
203  lcr.f_cond.Signal();
204  lcr.f_cond.UnLock();
205 
206  m_dir_scan_open_requests.pop_front();
207  }
208  }
209 
210  // Do upward propagation of usages.
212  m_current_usage_in_st_blocks = root_ds->m_here_usage.m_StBlocks +
215 
216  return true;
217 }
218 
219 //------------------------------------------------------------------------------
220 // Processing of queues
221 //------------------------------------------------------------------------------
222 
224 {
225  static const char *trc_pfx = "process_queues() ";
226 
227  // Assure that we pick up only entries that are present now.
228  // We really want all open records to be processed before file-stats updates
229  // and all those before the close records.
230  // Purges are sort of tangential as they really just modify bytes / number
231  // of files in a directory and do not deal with any persistent file id tokens.
232 
233  int n_records = 0;
234  {
235  XrdSysMutexHelper _lock(&m_queue_mutex);
236  n_records += m_file_open_q.swap_queues();
237  n_records += m_file_update_stats_q.swap_queues();
238  n_records += m_file_close_q.swap_queues();
239  n_records += m_file_purge_q1.swap_queues();
240  n_records += m_file_purge_q2.swap_queues();
241  n_records += m_file_purge_q3.swap_queues();
242  ++m_queue_swap_u1;
243  }
244 
245  for (auto &i : m_file_open_q.read_queue())
246  {
247  // i.id: LFN, i.record: OpenRecord
248  AccessToken &at = token(i.id);
249  dprintf("process file open for token %d, time %ld -- %s\n",
250  i.id, i.record.m_open_time, at.m_filename.c_str());
251 
252  // Resolve fname into DirState.
253  // We could clear the filename after this ... or keep it, should we need it later on.
254  // For now it is just used for debug printouts.
255  DirState *last_existing_ds = nullptr;
256  DirState *ds = m_fs_state.find_dirstate_for_lfn(at.m_filename, &last_existing_ds);
257  at.m_dir_state = ds;
258  ds->m_here_stats.m_NFilesOpened += 1;
259 
260  // If this is a new file figure out how many new parent dirs got created along the way.
261  if ( ! i.record.m_existing_file) {
262  ds->m_here_stats.m_NFilesCreated += 1;
263  DirState *pp = ds;
264  while (pp != last_existing_ds) {
265  pp = pp->get_parent();
267  }
268  }
269 
270  ds->m_here_usage.m_LastOpenTime = i.record.m_open_time;
271  }
272 
273  for (auto &i : m_file_update_stats_q.read_queue())
274  {
275  // i.id: token, i.record: Stats
276  AccessToken &at = token(i.id);
277  // Stats
278  DirState *ds = at.m_dir_state;
279  dprintf("process file update for token %d, %p -- %s\n",
280  i.id, ds, at.m_filename.c_str());
281 
282  ds->m_here_stats.AddUp(i.record);
283  m_current_usage_in_st_blocks += i.record.m_StBlocksAdded;
284  }
285 
286  for (auto &i : m_file_close_q.read_queue())
287  {
288  // i.id: token, i.record: CloseRecord
289  AccessToken &at = token(i.id);
290  dprintf("process file close for token %d, time %ld -- %s\n",
291  i.id, i.record.m_close_time, at.m_filename.c_str());
292 
293  DirState *ds = at.m_dir_state;
294  ds->m_here_stats.m_NFilesClosed += 1;
295  ds->m_here_usage.m_LastCloseTime = i.record.m_close_time;
296 
297  at.clear();
298  }
299  { // Release the AccessToken slots under lock.
300  XrdSysMutexHelper _lock(&m_queue_mutex);
301  for (auto &i : m_file_close_q.read_queue())
302  m_access_tokens_free_slots.push_back(i.id);
303  }
304 
305  for (auto &i : m_file_purge_q1.read_queue())
306  {
307  // i.id: DirState*, i.record: PurgeRecord
308  DirState *ds = i.id;
309  ds->m_here_stats.m_StBlocksRemoved += i.record.m_size_in_st_blocks;
310  ds->m_here_stats.m_NFilesRemoved += i.record.m_n_files;
311  m_current_usage_in_st_blocks -= i.record.m_size_in_st_blocks;
312  }
313  for (auto &i : m_file_purge_q2.read_queue())
314  {
315  // i.id: directory-path, i.record: PurgeRecord
316  DirState *ds = m_fs_state.get_root()->find_path(i.id, -1, false, false);
317  if ( ! ds) {
318  TRACE(Error, trc_pfx << "DirState not found for directory path '" << i.id << "'.");
319  // find_path can return the last dir found ... but this clearly isn't a valid purge record.
320  continue;
321  }
322  ds->m_here_stats.m_StBlocksRemoved += i.record.m_size_in_st_blocks;
323  ds->m_here_stats.m_NFilesRemoved += i.record.m_n_files;
324  m_current_usage_in_st_blocks -= i.record.m_size_in_st_blocks;
325  }
326  for (auto &i : m_file_purge_q3.read_queue())
327  {
328  // i.id: LFN, i.record: size of file in st_blocks
329  DirState *ds = m_fs_state.get_root()->find_path(i.id, -1, true, false);
330  if ( ! ds) {
331  TRACE(Error, trc_pfx << "DirState not found for LFN path '" << i.id << "'.");
332  continue;
333  }
334  ds->m_here_stats.m_StBlocksRemoved += i.record;
335  ds->m_here_stats.m_NFilesRemoved += 1;
336  m_current_usage_in_st_blocks -= i.record;
337  }
338 
339  // Read queues / vectors are cleared at swap time.
340  // We might consider reducing their capacity by half if, say, their usage is below 25%.
341 
342  return n_records;
343 }
344 
345 //------------------------------------------------------------------------------
346 // Heart beat
347 //------------------------------------------------------------------------------
348 
350 {
351  static const char *tpfx = "heart_beat() ";
352 
353  const Configuration &conf = Cache::Conf();
354 
355  const int s_queue_proc_interval = 10;
356  // const s_stats_up_prop_interval = 60; -- for when we have dedicated purge / stat report structs
357  const int s_sshot_report_interval = 60; // to be bumped (300s?) or made configurable.
358  const int s_purge_check_interval = 60;
359  const int s_purge_report_interval = conf.m_purgeInterval;
360  const int s_purge_cold_files_interval = conf.m_purgeInterval * conf.m_purgeAgeBasedPeriod;
361 
362  // initial scan performed as part of config
363 
364  time_t now = time(0);
365  time_t next_queue_proc_time = now + s_queue_proc_interval;
366  time_t next_sshot_report_time = now + s_sshot_report_interval;
367  time_t next_purge_check_time = now + s_purge_check_interval;
368  time_t next_purge_report_time = now + s_purge_report_interval;
369  time_t next_purge_cold_files_time = now + s_purge_cold_files_interval;
370 
371  // XXXXX On initial entry should reclaim space from queues as they might have grown
372  // very large during the initial scan.
373 
374  while (true)
375  {
376  time_t start = time(0);
377  time_t next_event = std::min({ next_queue_proc_time, next_sshot_report_time,
378  next_purge_check_time, next_purge_report_time, next_purge_cold_files_time });
379 
380  if (next_event > start)
381  {
382  unsigned int t_sleep = next_event - start;
383  TRACE(Debug, tpfx << "sleeping for " << t_sleep << " seconds until the next beat.");
384  sleep(t_sleep);
385  }
386 
387  // Check if purge has been running and has completed yet.
388  // For now this is only used to prevent removal of empty leaf directories
389  // during stat propagation so we do not need to wait for the condition in
390  // the above sleep.
391  if (m_purge_task_active) {
393  if (m_purge_task_complete) {
395  }
396  }
397 
398  // Always process the queues.
399  int n_processed = process_queues();
400  next_queue_proc_time += s_queue_proc_interval;
401  TRACE(Debug, tpfx << "process_queues -- n_records=" << n_processed);
402 
403  // Always update basic info on m_fs_state (space, usage, file_usage).
405 
406  now = time(0);
407  if (next_sshot_report_time <= now)
408  {
409  next_sshot_report_time += s_sshot_report_interval;
410 
411  // XXXX pass in m_purge_task_active as control over "should empty dirs be purged";
412  // Or should this be separate pass or variant in purge?
414 
415  m_fs_state.apply_stats_to_usages();
416 
417  // Dump statistics before actual purging so maximum usage values get recorded.
418  // This should dump out binary snapshot into /pfc-stats/, if so configured.
419  // Also, optionally, json.
420  // Could also go to gstream but this easily gets too large.
421  if (conf.is_dir_stat_reporting_on())
422  {
423  const int store_depth = conf.m_dirStatsStoreDepth;
424  #ifdef RM_DEBUG
425  const DirState &root_ds = *m_fs_state.get_root();
426  dprintf("Snapshot n_dirs=%d, total n_dirs=%d\n", root_ds.count_dirs_to_level(store_depth),
428  #endif
429  m_fs_state.dump_recursively(store_depth);
430 
431  /*
432  // json dump to std::out for debug purpose
433  DataFsSnapshot ss(m_fs_state);
434  ss.m_dir_states.reserve(n_sshot_dirs);
435 
436  ss.m_dir_states.emplace_back( DirStateElement(root_ds, -1) );
437  fill_sshot_vec_children(root_ds, 0, ss.m_dir_states, store_depth);
438 
439  // This should really be export to a file (preferably binary, but then bin->json command is needed, too).
440  ss.dump();
441  */
442  }
443 
444  m_fs_state.reset_stats();
445 
446  now = time(0);
447  }
448 
449  bool do_purge_check = next_purge_check_time <= now;
450  bool do_purge_report = next_purge_report_time <= now;
451  bool do_purge_cold_files = next_purge_cold_files_time <= now;
452  if (do_purge_check || do_purge_report || do_purge_cold_files)
453  {
454  perform_purge_check(do_purge_cold_files, do_purge_report ? TRACE_Info : TRACE_Debug);
455 
456  next_purge_check_time = now + s_purge_check_interval;
457  if (do_purge_report) next_purge_report_time = now + s_purge_report_interval;
458  if (do_purge_cold_files) next_purge_cold_files_time = now + s_purge_cold_files_interval;
459  }
460 
461  } // end while forever
462 }
463 
464 //------------------------------------------------------------------------------
465 // DirState export helpers
466 //------------------------------------------------------------------------------
467 
469  int parent_idx,
470  std::vector<DirStateElement> &vec,
471  int max_depth)
472 {
473  int pos = vec.size();
474  int n_children = parent_ds.m_subdirs.size();
475 
476  for (auto const & [name, child] : parent_ds.m_subdirs)
477  {
478  vec.emplace_back( DirStateElement(child, parent_idx) );
479  }
480 
481  if (parent_ds.m_depth < max_depth)
482  {
483  DirStateElement &parent_dse = vec[parent_idx];
484  parent_dse.m_daughters_begin = pos;
485  parent_dse.m_daughters_end = pos + n_children;
486 
487  for (auto const & [name, child] : parent_ds.m_subdirs)
488  {
489  if (n_children > 0)
490  fill_sshot_vec_children(child, pos, vec, max_depth);
491  ++pos;
492  }
493  }
494 }
495 
497  int parent_idx,
498  std::vector<DirPurgeElement> &vec,
499  int max_depth)
500 {
501  int pos = vec.size();
502  int n_children = parent_ds.m_subdirs.size();
503 
504  for (auto const & [name, child] : parent_ds.m_subdirs)
505  {
506  vec.emplace_back( DirPurgeElement(child, parent_idx) );
507  }
508 
509  if (parent_ds.m_depth < max_depth)
510  {
511  DirPurgeElement &parent_dpe = vec[parent_idx];
512  parent_dpe.m_daughters_begin = pos;
513  parent_dpe.m_daughters_end = pos + n_children;
514 
515  for (auto const & [name, child] : parent_ds.m_subdirs)
516  {
517  if (n_children > 0)
518  fill_pshot_vec_children(child, pos, vec, max_depth);
519  ++pos;
520  }
521  }
522 }
523 
524 //------------------------------------------------------------------------------
525 // Purge helpers, drivers, etc.
526 //------------------------------------------------------------------------------
527 
529 {
530  static const char *trc_pfx = "update_vs_and_file_usage_info() ";
531 
532  const auto &conf = Cache::Conf();
533  XrdOssVSInfo vsi;
534 
535  // StatVS error (after it succeeded in config) implies a memory corruption (according to Mr. H).
536  if (m_oss.StatVS(&vsi, conf.m_data_space.c_str(), 1) < 0) {
537  TRACE(Error, trc_pfx << "can't get StatVS for oss space '" << conf.m_data_space << "'. This is a fatal error.");
538  _exit(1);
539  }
540  m_fs_state.m_disk_total = vsi.Total;
541  m_fs_state.m_disk_used = vsi.Total - vsi.Free;
542  m_fs_state.m_file_usage = 512ll * m_current_usage_in_st_blocks;
543  if (m_oss.StatVS(&vsi, conf.m_meta_space.c_str(), 1) < 0) {
544  TRACE(Error, trc_pfx << "can't get StatVS for oss space '" << conf.m_meta_space << "'. This is a fatal error.");
545  _exit(1);
546  }
547  m_fs_state.m_meta_total = vsi.Total;
548  m_fs_state.m_meta_used = vsi.Total - vsi.Free;
549 }
550 
551 long long ResourceMonitor::get_file_usage_bytes_to_remove(const DataFsPurgeshot &ps, long long write_estimate, int tl)
552 {
553  // short names from config values
554  const Configuration &conf = Cache::Conf();
555  long long f0 = conf.m_fileUsageBaseline;
556  long long f1 = conf.m_fileUsageNominal;
557  long long f2 = conf.m_fileUsageMax;
558  long long w1 = conf.m_diskUsageLWM;
559  long long w2 = conf.m_diskUsageHWM;
560 
561  // get usage from purge snapshot
562  long long T = ps.m_disk_total;
563  long long x = ps.m_file_usage;
564  long long u = ps.m_disk_used;
565 
566  // get file usage increase from the previous time interval check
567  long long delta = write_estimate;
568  TRACE_INT(tl, "file usage increased since the previous purge interval in bytes: " << delta );
569 
570  long long bytes_to_remove = 0;
571 
572  // helper lambda function
573  auto clamp = [&x, &bytes_to_remove](long long lowval, long long highval)
574  {
575  long long val = x;
576  long long newval = val - bytes_to_remove;
577 
578  // removed too much
579  if (newval < lowval)
580  {
581  return lowval - val;
582  }
583 
584  // removed too little
585  if (newval > highval)
586  {
587  return highval - val;
588  }
589  // keep the original value
590  return bytes_to_remove;
591  };
592 
593  // under file quota, nothing to do
594  if (x < f0)
595  return 0;
596 
597  // total disk usage exceeds highWatermark
598  if (u >= w2)
599  {
600  TRACE_INT(tl, "Disk usage: " << ps.m_disk_used << " exceed highWatermark " << conf.m_diskUsageHWM);
601  float frac_u = static_cast<float>(u - w2) / (T - w2);
602  float frac_x = static_cast<float>(x - f0) / (f1 - f0);
603 
604  if (w2 == T)
605  {
606  bytes_to_remove = u -w1;
607  }
608  else
609  {
610  if (frac_x > frac_u)
611  {
612  // the cache is the reason for going out of w2 range
613  bytes_to_remove = (frac_x - frac_u) * (f1 - f0);
614  bytes_to_remove += delta;
615  bytes_to_remove = clamp(f0, f1);
616  }
617  else
618  {
619  // someone else is filling disk space, go to f1
620  bytes_to_remove = clamp(f0, f2);
621  }
622  return bytes_to_remove;
623  }
624  }
625 
626  // file quota and total disk usage is within normal range, check if this space usage is
627  // proportinal to disk usage and correct it
628  if (u > w1 && x > f1)
629  {
630  float frac_u = static_cast<float>(u - w1) / (w2 - w1);
631  float frac_x = static_cast<float>(x - f1) / (f2 - f1);
632  if (frac_x > frac_u)
633  {
634  TRACE_INT(tl, "Disproportional file quota usage comapared to disc usage (frac_x/frac_u) = " << frac_x << "/"<< frac_u);
635  bytes_to_remove = (frac_x - frac_u) * (f2 - f1);
636  bytes_to_remove += delta;
637  }
638 
639  // check the new x val will not be below f0
640  bytes_to_remove = clamp(f0, f2);
641  return bytes_to_remove;
642  }
643 
644  // final check: disk useage is lower that w1, check if exceed the max file usage f2
645  if (x > f2)
646  {
647  // drop usage to f2
648  // compare with global disk usage in the previous purge cycle (default 300s)
649  // check delta is not overflowing f2, else set numver of bytes to remove according remove to f0
650 
651  TRACE_INT(tl, "File usage exceeds maxim file usage. Total disk usage is under lowWatermark. Clearing to low file usage.");
652  long long f2delta = std::max(f2 - delta, f0);
653  bytes_to_remove = clamp(f0, f2delta);
654  return bytes_to_remove;
655  }
656 
657  return bytes_to_remove;
658 }
659 
660 void ResourceMonitor::perform_purge_check(bool purge_cold_files, int tl)
661 {
662  static const char *trc_pfx = "perform_purge_check() ";
663  const Configuration &conf = Cache::Conf();
664 
665  std::unique_ptr<DataFsPurgeshot> psp( new DataFsPurgeshot(m_fs_state) );
666  DataFsPurgeshot &ps = *psp;
667 
668  ps.m_file_usage = 512ll * m_current_usage_in_st_blocks;
669  // These are potentially wrong as cache might be writing over preallocated byte ranges.
671  // Can have another estimate based on eiter writes or st-blocks from purge-stats, once we have them.
672 
673  TRACE_INT(tl, trc_pfx << "Purge check:");
674 
675  ps.m_bytes_to_remove = 0;
676  if (conf.are_file_usage_limits_set())
677  {
678  ps.m_bytes_to_remove = get_file_usage_bytes_to_remove(ps, ps.m_estimated_writes_from_writeq, tl);
679  }
680  else
681  {
682  if (ps.m_disk_used > conf.m_diskUsageHWM)
683  {
684  TRACE_INT(tl, "Disk usage: " << ps.m_disk_used << " exceed highWatermark.");
686  }
687  }
688 
689  ps.m_space_based_purge = ps.m_bytes_to_remove ? 1 : 0;
690 
691  // Purge precheck -- check if age-based purge is required
692  // We ignore uvkeep time, it requires reading of cinfo files and it is enforced in File::Open() anyway.
693 
694  if (purge_cold_files && conf.is_age_based_purge_in_effect()) // || conf.is_uvkeep_purge_in_effect())
695  {
696  ps.m_age_based_purge = true;
697  }
698 
699  TRACE_INT(tl, "\tbytes_to_remove = " << ps.m_bytes_to_remove << " B");
700  TRACE_INT(tl, "\tspace_based_purge = " << ps.m_space_based_purge);
701  TRACE_INT(tl, "\tage_based_purge = " << ps.m_age_based_purge);
702 
703  bool periodic = Cache::GetInstance().GetPurgePin() ?
705 
706  if ( ! ps.m_space_based_purge && ! ps.m_age_based_purge && !periodic ) {
707  TRACE(Info, trc_pfx << "purge not required.");
709  return;
710  }
711  if (m_purge_task_active) {
712  TRACE(Warning, trc_pfx << "purge required but previous purge task is still active!");
713  return;
714  }
715 
716  TRACE(Info, trc_pfx << "scheduling purge task.");
717 
718  // At this point we have all the information: report, decide on action.
719  // There is still some missing infrastructure, especially as regards to purge-plugin:
720  // - at what point do we start bugging the pu-pin to start coughing up purge lists?
721  // - have a new parameter or just do it "one cycle before full"?
722  // - what if it doesn't -- when do we do the old-stlye scan & purge?
723  // - how do we do age-based purge and uvkeep purge?
724  // - they are really quite different -- and could run separately, registering
725  // files into a purge-candidate list. This has to be rechecked before the actual
726  // deletion -- eg, by comparing stat time of cinfo + doing the is-active / is-purge-protected.
727 
728  const DirState &root_ds = *m_fs_state.get_root();
729  const int n_calc_dirs = 1 + root_ds.m_here_usage.m_NDirectories + root_ds.m_recursive_subdir_usage.m_NDirectories;
730 #ifdef RM_DEBUG
731  const int n_pshot_dirs = root_ds.count_dirs_to_level(9999);
732  dprintf("purge dir count recursive=%d vs from_usage=%d\n", n_pshot_dirs, n_calc_dirs);
733 #endif
734  ps.m_dir_vec.reserve(n_calc_dirs);
735  ps.m_dir_vec.emplace_back( DirPurgeElement(root_ds, -1) );
736  fill_pshot_vec_children(root_ds, 0, ps.m_dir_vec, 9999);
737 
738  m_purge_task_active = true;
739 
740  struct PurgeDriverJob : public XrdJob
741  {
742  DataFsPurgeshot *m_purge_shot_ptr;
743 
744  PurgeDriverJob(DataFsPurgeshot *psp) :
745  XrdJob("XrdPfc::ResourceMonitor::PurgeDriver"),
746  m_purge_shot_ptr(psp)
747  {}
748 
749  void DoIt() override
750  {
751  Cache::ResMon().perform_purge_task(*m_purge_shot_ptr);
753 
754  delete m_purge_shot_ptr;
755  delete this;
756  }
757  };
758 
759  Cache::schedP->Schedule( new PurgeDriverJob(psp.release()) );
760 }
761 
762 namespace XrdPfc
763 {
765 }
766 
768 {
769  // BEWARE: Runs in a dedicated thread - is only to communicate back to the
770  // hear_beat() / data structs via the purge queues and condition variable.
771 
772  // const char *tpfx = "perform_purge_task ";
773 
774  {
776  m_purge_task_start = time(0);
777  }
778 
779  // For now, fall back to the old purge ... to be improved with:
780  // - new scan, following the DataFsPurgeshot;
781  // - usage of cinfo stat mtime for time of last access (touch already done at output);
782  // - use DirState* to report back purged files.
783  // Already changed to report back purged files --- but using the string / path variant.
784  OldStylePurgeDriver(ps); // In XrdPfcPurge.cc
785 }
786 
788 {
789  // Separated out so the purge_task can exit without post-checks.
790 
791  {
793  m_purge_task_end = time(0);
794  m_purge_task_complete = true;
796  }
798 }
799 
800 //==============================================================================
801 // Main thread function, do initial test, then enter heart_beat().
802 //==============================================================================
803 
805 {
806  // setup for in-scan -- this is called from initial setup.
807  MutexHolder _lck(m_dir_scan_mutex);
808  m_dir_scan_in_progress = true;
809 }
810 
812 {
813  const char *tpfx = "main_thread_function ";
814  {
815  time_t is_start = time(0);
816  TRACE(Info, tpfx << "Stating initial directory scan.");
817 
818  if ( ! perform_initial_scan()) {
819  TRACE(Error, tpfx << "Initial directory scan has failed. This is a terminal error, aborting.")
820  _exit(1);
821  }
822  // Reset of m_dir_scan_in_progress is done in perform_initial_scan()
823 
824  time_t is_duration = time(0) - is_start;
825  TRACE(Info, tpfx << "Initial directory scan complete, duration=" << is_duration <<"s");
826 
827  // run first process queues
828  int n_proc_is = process_queues();
829  TRACE(Info, tpfx << "First process_queues finished, n_records=" << n_proc_is);
830 
831  // shrink queues if scan time was longer than 30s.
832  if (is_duration > 30 || n_proc_is > 3000)
833  {
834  m_file_open_q.shrink_read_queue();
835  m_file_update_stats_q.shrink_read_queue();
836  m_file_close_q.shrink_read_queue();
837  m_file_purge_q1.shrink_read_queue();
838  m_file_purge_q2.shrink_read_queue();
839  m_file_purge_q3.shrink_read_queue();
840  }
841  }
842  heart_beat();
843 }
844 
845 //==============================================================================
846 // Old prototype from Cache / Purge, now to go into heart_beat() here, above.
847 //==============================================================================
848 
850 {
851  // static const char *trc_pfx = "ResourceMonitorHeartBeat() ";
852 
853  // Pause before initial run
854  sleep(1);
855 
856  // XXXX Setup initial / constant stats (total RAM, total disk, ???)
857 
860 
861  S.Lock();
862 
864 
866 
867  S.UnLock();
868 
869  // XXXX Schedule initial disk scan, time it!
870  //
871  // TRACE(Info, trc_pfx << "scheduling intial disk scan.");
872  // schedP->Schedule( new ScanAndPurgeJob("XrdPfc::ScanAndPurge") );
873  //
874  // bool scan_and_purge_running = true;
875 
876  // XXXX Could we really hold last-usage for all files in memory?
877 
878  // XXXX Think how to handle disk-full, scan/purge not finishing:
879  // - start dropping things out of write queue, but only when RAM gets near full;
880  // - monitoring this then becomes a high-priority job, inner loop with sleep of,
881  // say, 5 or 10 seconds.
882 
883  while (true)
884  {
885  time_t heartbeat_start = time(0);
886 
887  // TRACE(Info, trc_pfx << "HeartBeat starting ...");
888 
889  // if sumary monitoring configured, pupulate OucCacheStats:
890  S.Lock();
891 
892  // - available / used disk space (files usage calculated elsewhere (maybe))
893 
894  // - RAM usage
895  /* XXXX From Cache
896  { XrdSysMutexHelper lck(&m_RAM_mutex);
897  X.MemUsed = m_RAM_used;
898  X.MemWriteQ = m_RAM_write_queue;
899  }
900  */
901 
902  // - files opened / closed etc
903 
904  // do estimate of available space
905  S.UnLock();
906 
907  // if needed, schedule purge in a different thread.
908  // purge is:
909  // - deep scan + gather FSPurgeState
910  // - actual purge
911  //
912  // this thread can continue running and, if needed, stop writing to disk
913  // if purge is taking too long.
914 
915  // think how data is passed / synchronized between this and purge thread
916 
917  // !!!! think how stat collection is done and propgated upwards;
918  // until now it was done once per purge-interval.
919  // now stats will be added up more often, but purge will be done
920  // only occasionally.
921  // also, do we report cumulative values or deltas? cumulative should
922  // be easier and consistent with summary data.
923  // still, some are state - like disk usage, num of files.
924 
925  // Do we take care of directories that need to be newly added into DirState hierarchy?
926  // I.e., when user creates new directories and these are covered by either full
927  // spec or by root + depth declaration.
928 
929  int heartbeat_duration = time(0) - heartbeat_start;
930 
931  // TRACE(Info, trc_pfx << "HeartBeat finished, heartbeat_duration " << heartbeat_duration);
932 
933  // int sleep_time = m_fs_state..m_purgeInterval - heartbeat_duration;
934  int sleep_time = 60 - heartbeat_duration;
935  if (sleep_time > 0)
936  {
937  sleep(sleep_time);
938  }
939  }
940 }
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
Definition: XrdAccTest.cc:262
static void child()
#define TRACE_Debug
Definition: XrdCmsTrace.hh:37
#define TRACE_Info
#define XrdOssOK
Definition: XrdOss.hh:50
#define dprintf(...)
void Proto_ResourceMonitorHeartBeat()
#define TRACE_INT(act, x)
Definition: XrdPfcTrace.hh:52
@ Warning
#define TRACE(act, x)
Definition: XrdTrace.hh:63
Definition: XrdJob.hh:43
virtual int Opendir(const char *path, XrdOucEnv &env)
Definition: XrdOss.hh:79
long long Total
Definition: XrdOssVS.hh:90
long long Free
Definition: XrdOssVS.hh:91
virtual XrdOssDF * newDir(const char *tident)=0
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition: XrdOss.cc:117
struct XrdOucCacheStats::CacheStats X
XrdOucCacheStats Statistics
Definition: XrdOucCache.hh:686
PurgePin * GetPurgePin() const
Definition: XrdPfc.hh:271
static const Configuration & Conf()
Definition: XrdPfc.cc:135
XrdSysTrace * GetTrace()
Definition: XrdPfc.hh:282
static ResourceMonitor & ResMon()
Definition: XrdPfc.cc:136
void ClearPurgeProtectedSet()
Definition: XrdPfc.cc:670
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:133
static XrdScheduler * schedP
Definition: XrdPfc.hh:289
long long WritesSinceLastCall()
Definition: XrdPfc.cc:321
void AddUp(const DirStats &s)
Definition: XrdPfcStats.hh:187
long long m_StBlocksRemoved
Definition: XrdPfcStats.hh:144
std::vector< std::string > m_current_dirs
XrdOucEnv & default_env()
void slurp_dir_ll(XrdOssDF &dh, int dir_level, const char *path, const char *trc_pfx)
bool begin_traversal(DirState *root, const char *root_path)
std::set< std::string > m_protected_top_dirs
bool cd_down(const std::string &dir_name)
std::map< std::string, FilePairStat > m_current_files
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
virtual bool CallPeriodically()
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
void perform_purge_check(bool purge_cold_files, int tl)
void fill_sshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirStateElement > &vec, int max_depth)
void perform_purge_task(DataFsPurgeshot &ps)
void scan_dir_and_recurse(FsTraversal &fst)
void fill_pshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirPurgeElement > &vec, int max_depth)
void Schedule(XrdJob *jp)
Definition: XrdPfc.hh:41
void OldStylePurgeDriver(DataFsPurgeshot &ps)
Definition: XrdPfcPurge.cc:99
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:64
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:108
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition: XrdPfc.hh:91
long long m_fileUsageMax
cache purge - files usage maximum
Definition: XrdPfc.hh:96
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition: XrdPfc.hh:94
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition: XrdPfc.hh:105
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition: XrdPfc.hh:93
bool are_file_usage_limits_set() const
Definition: XrdPfc.hh:67
long long m_fileUsageNominal
cache purge - files usage nominal
Definition: XrdPfc.hh:95
int m_purgeAgeBasedPeriod
perform cold file / uvkeep purge every this many purge cycles
Definition: XrdPfc.hh:99
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition: XrdPfc.hh:92
bool is_age_based_purge_in_effect() const
Definition: XrdPfc.hh:68
int m_purgeInterval
sleep interval between cache purges
Definition: XrdPfc.hh:97
bool is_dir_stat_reporting_on() const
Definition: XrdPfc.hh:70
std::vector< DirPurgeElement > m_dir_vec
void dump_recursively(int max_depth) const
void upward_propagate_stats_and_times()
DirState * find_dirstate_for_lfn(const std::string &lfn, DirState **last_existing_dir=nullptr)
DirUsage m_recursive_subdir_usage
DirState * get_parent()
int count_dirs_to_level(int max_depth) const
DirState * find_path(const std::string &path, int max_depth, bool parse_as_lfn, bool create_subdirs, DirState **last_existing_dir=nullptr)
void upward_propagate_initial_scan_usages()
long long m_StBlocks