XRootD
XrdPfcResourceMonitor.cc
Go to the documentation of this file.
2 #include "XrdPfc.hh"
4 #include "XrdPfcFsTraversal.hh"
5 #include "XrdPfcDirState.hh"
8 #include "XrdPfcTrace.hh"
9 #include "XrdPfcPurgePin.hh"
10 
11 #include "XrdOss/XrdOss.hh"
12 
13 #include <algorithm>
14 
15 // #define RM_DEBUG
16 #ifdef RM_DEBUG
17 #define dprintf(...) printf(__VA_ARGS__)
18 #else
19 #define dprintf(...) (void(0))
20 #endif
21 
22 using namespace XrdPfc;
23 
24 namespace
25 {
26  XrdSysTrace* GetTrace() { return Cache::GetInstance().GetTrace(); }
27  const char *m_traceID = "ResourceMonitor";
28 }
29 
30 //------------------------------------------------------------------------------
31 
33  m_fs_state(* new DataFsState),
34  m_oss(oss)
35 {}
36 
38 {
39  delete &m_fs_state;
40 }
41 
42 //------------------------------------------------------------------------------
43 // Initial scan
44 //------------------------------------------------------------------------------
45 
47 {
48  m_dir_scan_mutex.Lock();
49  if (m_dir_scan_in_progress) {
50  m_dir_scan_open_requests.push_back({lfn, cond});
51  LfnCondRecord &lcr = m_dir_scan_open_requests.back();
52  cond.Lock();
53  m_dir_scan_mutex.UnLock();
54  while ( ! lcr.f_checked)
55  cond.Wait();
56  cond.UnLock();
57  } else {
58  m_dir_scan_mutex.UnLock();
59  }
60 }
61 
62 void ResourceMonitor::process_inter_dir_scan_open_requests(FsTraversal &fst)
63 {
64  m_dir_scan_mutex.Lock();
65  while ( ! m_dir_scan_open_requests.empty())
66  {
67  LfnCondRecord &lcr = m_dir_scan_open_requests.front();
68  m_dir_scan_mutex.UnLock();
69 
70  cross_check_or_process_oob_lfn(lcr.f_lfn, fst);
71  lcr.f_cond.Lock();
72  lcr.f_checked = true;
73  lcr.f_cond.Signal();
74  lcr.f_cond.UnLock();
75 
76  m_dir_scan_mutex.Lock();
77  m_dir_scan_open_requests.pop_front();
78  }
79  m_dir_scan_mutex.UnLock();
80 }
81 
82 void ResourceMonitor::cross_check_or_process_oob_lfn(const std::string &lfn, FsTraversal &fst)
83 {
84  // Check if lfn has already been processed ... or process it now and mark
85  // the DirState accordingly (partially processed oob).
86  static const char *trc_pfx = "cross_check_or_process_oob_lfn() ";
87 
88  DirState *last_existing_ds = nullptr;
89  DirState *ds = m_fs_state.find_dirstate_for_lfn(lfn, &last_existing_ds);
90  if (ds->m_scanned)
91  return;
92 
93  size_t pos = lfn.find_last_of("/");
94  std::string dir = (pos == std::string::npos) ? "" : lfn.substr(0, pos);
95 
96  XrdOssDF *dhp = m_oss.newDir(trc_pfx);
97  if (dhp->Opendir(dir.c_str(), fst.default_env()) == XrdOssOK)
98  {
99  fst.slurp_dir_ll(*dhp, ds->m_depth, dir.c_str(), trc_pfx);
100 
101  // XXXX clone of function below .... move somewhere? Esp. removal of non-paired files?
102  DirUsage &here = ds->m_here_usage;
103  for (auto it = fst.m_current_files.begin(); it != fst.m_current_files.end(); ++it)
104  {
105  if (it->second.has_data && it->second.has_cinfo) {
106  here.m_StBlocks += it->second.stat_data.st_blocks;
107  here.m_NFiles += 1;
108  }
109  }
110  }
111  delete dhp;
112  ds->m_scanned = true;
113 }
114 
116 {
117  dprintf("In scan_dir_and_recurse for '%s', size of dir_vec = %d, file_stat_map = %d\n",
118  fst.m_current_path.c_str(),
119  (int)fst.m_current_dirs.size(), (int)fst.m_current_files.size());
120 
121  // Breadth first, accumulate into "here", unless it was already scanned via an
122  // OOB open file request.
123  if ( ! fst.m_dir_state->m_scanned)
124  {
125  DirUsage &here = fst.m_dir_state->m_here_usage;
126  for (auto it = fst.m_current_files.begin(); it != fst.m_current_files.end(); ++it)
127  {
128  dprintf("would be doing something with %s ... has_data=%d, has_cinfo=%d\n",
129  it->first.c_str(), it->second.has_data, it->second.has_cinfo);
130 
131  // XXX Make some of these optional?
132  // Remove files that do not have both cinfo and data?
133  // Remove empty directories before even descending?
134  // Leave this for some consistency pass?
135  // Note that FsTraversal supports ignored paths ... some details (config, N2N) to be clarified.
136 
137  if (it->second.has_data && it->second.has_cinfo) {
138  here.m_StBlocks += it->second.stat_data.st_blocks;
139  here.m_NFiles += 1;
140  }
141  }
142  fst.m_dir_state->m_scanned = true;
143  }
144 
145  // Swap-out directories as inter_dir_scan can use the FsTraversal.
146  std::vector<std::string> dirs;
147  dirs.swap(fst.m_current_dirs);
148 
149  if (++m_dir_scan_check_counter >= 100)
150  {
151  process_inter_dir_scan_open_requests(fst);
152  m_dir_scan_check_counter = 0;
153  }
154 
155  // Descend into sub-dirs, do not accumulate into recursive_subdir_usage yet. This is done
156  // in a separate pass to allow for proper accounting of files being opened during the initial scan.
157  for (auto &dname : dirs)
158  {
159  if (fst.cd_down(dname))
160  {
162  fst.cd_up();
163  }
164  // XXX else try to remove it?
165  }
166 }
167 
169 {
170  // Called after PFC configuration is complete, but before full startup of the daemon.
171  // Base line usages are accumulated as part of the file-system, traversal.
172 
174 
175  DirState *root_ds = m_fs_state.get_root();
176  FsTraversal fst(m_oss);
177  fst.m_protected_top_dirs.insert("pfc-stats"); // XXXX This should come from config. Also: N2N?
178 
179  if ( ! fst.begin_traversal(root_ds, "/"))
180  return false;
181 
182  {
183  XrdSysMutexHelper _lock(m_dir_scan_mutex);
184  m_dir_scan_in_progress = true;
185  m_dir_scan_check_counter = 0; // recheck oob file-open requests periodically.
186  }
187 
189 
190  fst.end_traversal();
191 
192  // We have all directories scanned, available in DirState tree, let all remaining files go
193  // and then we shall do the upward propagation of usages.
194  {
195  XrdSysMutexHelper _lock(m_dir_scan_mutex);
196  m_dir_scan_in_progress = false;
197  m_dir_scan_check_counter = 0;
198 
199  while ( ! m_dir_scan_open_requests.empty())
200  {
201  LfnCondRecord &lcr = m_dir_scan_open_requests.front();
202  lcr.f_cond.Lock();
203  lcr.f_checked = true;
204  lcr.f_cond.Signal();
205  lcr.f_cond.UnLock();
206 
207  m_dir_scan_open_requests.pop_front();
208  }
209  }
210 
211  // Do upward propagation of usages.
213  m_current_usage_in_st_blocks = root_ds->m_here_usage.m_StBlocks +
216 
217  return true;
218 }
219 
220 //------------------------------------------------------------------------------
221 // Processing of queues
222 //------------------------------------------------------------------------------
223 
225 {
226  static const char *trc_pfx = "process_queues() ";
227 
228  // Assure that we pick up only entries that are present now.
229  // We really want all open records to be processed before file-stats updates
230  // and all those before the close records.
231  // Purges are sort of tangential as they really just modify bytes / number
232  // of files in a directory and do not deal with any persistent file id tokens.
233 
234  int n_records = 0;
235  {
236  XrdSysMutexHelper _lock(&m_queue_mutex);
237  n_records += m_file_open_q.swap_queues();
238  n_records += m_file_update_stats_q.swap_queues();
239  n_records += m_file_close_q.swap_queues();
240  n_records += m_file_purge_q1.swap_queues();
241  n_records += m_file_purge_q2.swap_queues();
242  n_records += m_file_purge_q3.swap_queues();
243  ++m_queue_swap_u1;
244  }
245 
246  for (auto &i : m_file_open_q.read_queue())
247  {
248  // i.id: LFN, i.record: OpenRecord
249  AccessToken &at = token(i.id);
250  dprintf("process file open for token %d, time %ld -- %s\n",
251  i.id, i.record.m_open_time, at.m_filename.c_str());
252 
253  // Resolve fname into DirState.
254  // We could clear the filename after this ... or keep it, should we need it later on.
255  // For now it is just used for debug printouts.
256  DirState *last_existing_ds = nullptr;
257  DirState *ds = m_fs_state.find_dirstate_for_lfn(at.m_filename, &last_existing_ds);
258  at.m_dir_state = ds;
259  ds->m_here_stats.m_NFilesOpened += 1;
260 
261  // If this is a new file figure out how many new parent dirs got created along the way.
262  if ( ! i.record.m_existing_file) {
263  ds->m_here_stats.m_NFilesCreated += 1;
264  DirState *pp = ds;
265  while (pp != last_existing_ds) {
266  pp = pp->get_parent();
268  }
269  }
270 
271  ds->m_here_usage.m_LastOpenTime = i.record.m_open_time;
272  }
273 
274  for (auto &i : m_file_update_stats_q.read_queue())
275  {
276  // i.id: token, i.record: Stats
277  AccessToken &at = token(i.id);
278  // Stats
279  DirState *ds = at.m_dir_state;
280  dprintf("process file update for token %d, %p -- %s\n",
281  i.id, ds, at.m_filename.c_str());
282 
283  ds->m_here_stats.AddUp(i.record);
284  m_current_usage_in_st_blocks += i.record.m_StBlocksAdded;
285  }
286 
287  for (auto &i : m_file_close_q.read_queue())
288  {
289  // i.id: token, i.record: CloseRecord
290  AccessToken &at = token(i.id);
291  dprintf("process file close for token %d, time %ld -- %s\n",
292  i.id, i.record.m_close_time, at.m_filename.c_str());
293 
294  DirState *ds = at.m_dir_state;
295  ds->m_here_stats.m_NFilesClosed += 1;
296  ds->m_here_usage.m_LastCloseTime = i.record.m_close_time;
297 
298  at.clear();
299  }
300  { // Release the AccessToken slots under lock.
301  XrdSysMutexHelper _lock(&m_queue_mutex);
302  for (auto &i : m_file_close_q.read_queue())
303  m_access_tokens_free_slots.push_back(i.id);
304  }
305 
306  for (auto &i : m_file_purge_q1.read_queue())
307  {
308  // i.id: DirState*, i.record: PurgeRecord
309  DirState *ds = i.id;
310  ds->m_here_stats.m_StBlocksRemoved += i.record.m_size_in_st_blocks;
311  ds->m_here_stats.m_NFilesRemoved += i.record.m_n_files;
312  m_current_usage_in_st_blocks -= i.record.m_size_in_st_blocks;
313  }
314  for (auto &i : m_file_purge_q2.read_queue())
315  {
316  // i.id: directory-path, i.record: PurgeRecord
317  DirState *ds = m_fs_state.get_root()->find_path(i.id, -1, false, false);
318  if ( ! ds) {
319  TRACE(Error, trc_pfx << "DirState not found for directory path '" << i.id << "'.");
320  // find_path can return the last dir found ... but this clearly isn't a valid purge record.
321  continue;
322  }
323  ds->m_here_stats.m_StBlocksRemoved += i.record.m_size_in_st_blocks;
324  ds->m_here_stats.m_NFilesRemoved += i.record.m_n_files;
325  m_current_usage_in_st_blocks -= i.record.m_size_in_st_blocks;
326  }
327  for (auto &i : m_file_purge_q3.read_queue())
328  {
329  // i.id: LFN, i.record: size of file in st_blocks
330  DirState *ds = m_fs_state.get_root()->find_path(i.id, -1, true, false);
331  if ( ! ds) {
332  TRACE(Error, trc_pfx << "DirState not found for LFN path '" << i.id << "'.");
333  continue;
334  }
335  ds->m_here_stats.m_StBlocksRemoved += i.record;
336  ds->m_here_stats.m_NFilesRemoved += 1;
337  m_current_usage_in_st_blocks -= i.record;
338  }
339 
340  // Read queues / vectors are cleared at swap time.
341  // We might consider reducing their capacity by half if, say, their usage is below 25%.
342 
343  return n_records;
344 }
345 
346 //------------------------------------------------------------------------------
347 // Heart beat
348 //------------------------------------------------------------------------------
349 
351 {
352  static const char *tpfx = "heart_beat() ";
353 
354  const Configuration &conf = Cache::Conf();
355 
356  const int s_queue_proc_interval = 10;
357  const int s_sshot_report_interval = conf.m_dirStatsInterval; // 1, 5, 10, 15, 30 or 60 minutes
358  const int s_purge_check_interval = 60;
359  const int s_purge_report_interval = conf.m_purgeInterval;
360  const int s_purge_cold_files_interval = conf.m_purgeInterval * conf.m_purgeAgeBasedPeriod;
361 
362  // initial scan performed as part of config
363 
364  time_t now = time(0);
365  time_t next_queue_proc_time = now + s_queue_proc_interval;
366  time_t next_sshot_report_time = (now / 60) * 60 + 60; // at next full minute
367  time_t next_purge_check_time = now + s_purge_check_interval;
368  time_t next_purge_report_time = now + s_purge_report_interval;
369  time_t next_purge_cold_files_time = now + s_purge_cold_files_interval;
370 
371  while (true)
372  {
373  time_t start = time(0);
374  time_t next_event = std::min({ next_queue_proc_time, next_sshot_report_time,
375  next_purge_check_time, next_purge_report_time, next_purge_cold_files_time });
376 
377  if (next_event > start)
378  {
379  unsigned int t_sleep = next_event - start;
380  TRACE(Debug, tpfx << "sleeping for " << t_sleep << " seconds until the next beat.");
381  sleep(t_sleep);
382  }
383 
384  // Check if purge has been running and has completed yet.
385  // For now this is only used to prevent removal of empty leaf directories
386  // during stat propagation so we do not need to wait for the condition in
387  // the above sleep.
388  if (m_purge_task_active) {
390  if (m_purge_task_complete) {
392  }
393  }
394 
395  time_t queue_swap_time = time(0);
396 
397  // Always process the queues.
398  int n_processed = process_queues();
399  next_queue_proc_time = queue_swap_time + s_queue_proc_interval;
400  TRACE(Debug, tpfx << "process_queues -- n_records=" << n_processed);
401 
402  // Always update basic info on m_fs_state (space, usage, file_usage).
404 
405  now = time(0);
406 
407  // Make planning for fs_state_update, sshot dump and purge task.
408  // Second two require the first, so figure out what is going to happen.
409  bool do_sshot_report = next_sshot_report_time <= now;
410  bool do_purge_check = next_purge_check_time <= now;
411  bool do_purge_report = next_purge_report_time <= now;
412  bool do_purge_cold_files = next_purge_cold_files_time <= now;
413 
414  // Update stats in usages if any secondary activity will happen.
415  if (do_sshot_report || do_purge_check || do_purge_report || do_purge_cold_files)
416  {
417  unlink_func unlink_foo = [&](const std::string &dp)->int {
418  int ret = m_oss.Unlink(dp.c_str());
419  if (ret != 0) {
420  TRACE(Info, tpfx << "Empty dir unlink error: " << ret << " at " << dp);
421  } else {
422  TRACE(Debug, tpfx << "Empty dir unlink success: " << dp);
423  }
424  return ret;
425  };
426 
427  // Potentially prune the empty leaf dirs even less frequently, once per hour, maybe?
428  bool purge_leaf_dirs = do_sshot_report && ! m_purge_task_active;
429  m_fs_state.update_stats_and_usages(queue_swap_time, purge_leaf_dirs, unlink_foo);
430 
431  // This reporting into log/stdout is to be removed.
432  // Meaning of conf.is_dir_stat_reporting_on() etc is to be clarified / improved.
433  if (do_sshot_report && conf.is_dir_stat_reporting_on())
434  {
435  const int store_depth = conf.m_dirStatsStoreDepth;
436  #ifdef RM_DEBUG
437  const DirState &root_ds = *m_fs_state.get_root();
438  dprintf("Snapshot n_dirs=%d, total n_dirs=%d\n", root_ds.count_dirs_to_level(store_depth),
440  #endif
441  m_fs_state.dump_recursively(store_depth);
442  }
443 
444  m_fs_state.reset_stats(queue_swap_time);
445  }
446 
447  if (do_sshot_report)
448  {
449  // Sshot reports are equidistant, at "full" reporting interval.
450  next_sshot_report_time = ((now + 1) / s_sshot_report_interval) * s_sshot_report_interval + s_sshot_report_interval;
451 
452  // This should dump out binary snapshot into /pfc-stats/, if so configured.
453 
454  // json dump to std::out for debug purpose
455  DataFsSnapshot ss(m_fs_state, m_fs_state.m_sshot_stats_reset_time);
456  const DirState &root_ds = *m_fs_state.get_root();
457  const int store_depth = conf.m_dirStatsStoreDepth;
458  const int n_sshot_dirs = root_ds.count_dirs_to_level(store_depth);
459  ss.m_dir_states.reserve(n_sshot_dirs);
460  ss.m_dir_states.emplace_back( DirStateElement(root_ds, -1) );
461  fill_sshot_vec_children(root_ds, 0, ss.m_dir_states, store_depth);
462 
463  // This should really be export to a file (preferably binary, but then bin->json command is needed, too).
464  // ss.dump();
465 
466  const char* dumpfile = "/pfc-stats/DirStat.json";
467  ss.write_json_file(dumpfile, m_oss, false);
468  m_fs_state.reset_sshot_stats(queue_swap_time);
469  }
470 
471  if (do_purge_check || do_purge_report || do_purge_cold_files)
472  {
473  perform_purge_check(do_purge_cold_files, do_purge_report ? TRACE_Info : TRACE_Debug);
474 
475  next_purge_check_time = now + s_purge_check_interval;
476  if (do_purge_report) next_purge_report_time = now + s_purge_report_interval;
477  if (do_purge_cold_files) next_purge_cold_files_time = now + s_purge_cold_files_interval;
478  }
479 
480  } // end while forever
481 }
482 
483 //------------------------------------------------------------------------------
484 // DirState export helpers
485 //------------------------------------------------------------------------------
486 
488  int parent_idx,
489  std::vector<DirStateElement> &vec,
490  int max_depth)
491 {
492  int pos = vec.size();
493  int n_children = parent_ds.m_subdirs.size();
494 
495  DirStateElement &parent_dse = vec[parent_idx];
496  parent_dse.m_daughters_begin = pos;
497  parent_dse.m_daughters_end = pos + n_children;
498 
499  if (n_children == 0) return;
500 
501  for (auto const & [name, child] : parent_ds.m_subdirs)
502  {
503  vec.emplace_back( DirStateElement(child, parent_idx) );
504  }
505 
506  if (parent_ds.m_depth < max_depth)
507  {
508  for (auto const & [name, child] : parent_ds.m_subdirs)
509  {
510  fill_sshot_vec_children(child, pos, vec, max_depth);
511  ++pos;
512  }
513  }
514 }
515 
517  int parent_idx,
518  std::vector<DirPurgeElement> &vec,
519  int max_depth)
520 {
521  int pos = vec.size();
522  int n_children = parent_ds.m_subdirs.size();
523 
524  DirPurgeElement &parent_dpe = vec[parent_idx];
525  parent_dpe.m_daughters_begin = pos;
526  parent_dpe.m_daughters_end = pos + n_children;
527 
528  if (n_children == 0) return;
529 
530  for (auto const & [name, child] : parent_ds.m_subdirs)
531  {
532  vec.emplace_back( DirPurgeElement(child, child.m_here_usage, child.m_recursive_subdir_usage, parent_idx) );
533  }
534 
535  if (parent_ds.m_depth < max_depth)
536  {
537  for (auto const & [name, child] : parent_ds.m_subdirs)
538  {
539  fill_pshot_vec_children(child, pos, vec, max_depth);
540  ++pos;
541  }
542  }
543 }
544 
545 //------------------------------------------------------------------------------
546 // Purge helpers, drivers, etc.
547 //------------------------------------------------------------------------------
548 
550 {
551  static const char *trc_pfx = "update_vs_and_file_usage_info() ";
552 
553  const auto &conf = Cache::Conf();
554  XrdOssVSInfo vsi;
555 
556  // StatVS error (after it succeeded in config) implies a memory corruption (according to Mr. H).
557  if (m_oss.StatVS(&vsi, conf.m_data_space.c_str(), 1) < 0) {
558  TRACE(Error, trc_pfx << "can't get StatVS for oss space '" << conf.m_data_space << "'. This is a fatal error.");
559  _exit(1);
560  }
561  m_fs_state.m_disk_total = vsi.Total;
562  m_fs_state.m_disk_used = vsi.Total - vsi.Free;
563  m_fs_state.m_file_usage = 512ll * m_current_usage_in_st_blocks;
564  if (m_oss.StatVS(&vsi, conf.m_meta_space.c_str(), 1) < 0) {
565  TRACE(Error, trc_pfx << "can't get StatVS for oss space '" << conf.m_meta_space << "'. This is a fatal error.");
566  _exit(1);
567  }
568  m_fs_state.m_meta_total = vsi.Total;
569  m_fs_state.m_meta_used = vsi.Total - vsi.Free;
570 }
571 
572 long long ResourceMonitor::get_file_usage_bytes_to_remove(const DataFsPurgeshot &ps, long long write_estimate, int tl)
573 {
574  // short names from config values
575  const Configuration &conf = Cache::Conf();
576  long long f0 = conf.m_fileUsageBaseline;
577  long long f1 = conf.m_fileUsageNominal;
578  long long f2 = conf.m_fileUsageMax;
579  long long w1 = conf.m_diskUsageLWM;
580  long long w2 = conf.m_diskUsageHWM;
581 
582  // get usage from purge snapshot
583  long long T = ps.m_disk_total;
584  long long x = ps.m_file_usage;
585  long long u = ps.m_disk_used;
586 
587  // get file usage increase from the previous time interval check
588  long long delta = write_estimate;
589  TRACE_INT(tl, "file usage increased since the previous purge interval in bytes: " << delta );
590 
591  long long bytes_to_remove = 0;
592 
593  // helper lambda function
594  auto clamp = [&x, &bytes_to_remove](long long lowval, long long highval)
595  {
596  long long val = x;
597  long long newval = val - bytes_to_remove;
598 
599  // removed too much
600  if (newval < lowval)
601  {
602  return lowval - val;
603  }
604 
605  // removed too little
606  if (newval > highval)
607  {
608  return val - highval;
609  }
610  // keep the original value
611  return bytes_to_remove;
612  };
613 
614  // under file quota, nothing to do
615  if (x < f0)
616  return 0;
617 
618  // total disk usage exceeds highWatermark
619  if (u >= w2)
620  {
621  TRACE_INT(tl, "Disk usage: " << ps.m_disk_used << " exceed highWatermark " << conf.m_diskUsageHWM);
622  float frac_u = static_cast<float>(u - w2) / (T - w2);
623  float frac_x = static_cast<float>(x - f0) / (f1 - f0);
624 
625  if (w2 == T)
626  {
627  bytes_to_remove = u -w1;
628  }
629  else
630  {
631  if (frac_x > frac_u)
632  {
633  // the cache is the reason for going out of w2 range
634  bytes_to_remove = (frac_x - frac_u) * (f1 - f0);
635  bytes_to_remove += delta;
636  bytes_to_remove = clamp(f0, f1);
637  }
638  else
639  {
640  // someone else is filling disk space, go to f1
641  bytes_to_remove = clamp(f0, f2);
642  }
643  return bytes_to_remove;
644  }
645  }
646 
647  // file quota and total disk usage is within normal range, check if this space usage is
648  // proportinal to disk usage and correct it
649  if (u > w1 && x > f1)
650  {
651  float frac_u = static_cast<float>(u - w1) / (w2 - w1);
652  float frac_x = static_cast<float>(x - f1) / (f2 - f1);
653  if (frac_x > frac_u)
654  {
655  TRACE_INT(tl, "Disproportional file quota usage comapared to disc usage (frac_x/frac_u) = " << frac_x << "/"<< frac_u);
656  bytes_to_remove = (frac_x - frac_u) * (f2 - f1);
657  bytes_to_remove += delta;
658  }
659 
660  // check the new x val will not be below f0
661  bytes_to_remove = clamp(f0, f2);
662  return bytes_to_remove;
663  }
664 
665  // final check: disk useage is lower that w1, check if exceed the max file usage f2
666  if (x > f2)
667  {
668  // drop usage to f2
669  // compare with global disk usage in the previous purge cycle (default 300s)
670  // check delta is not overflowing f2, else set numver of bytes to remove according remove to f0
671 
672  TRACE_INT(tl, "File usage exceeds maxim file usage. Total disk usage is under lowWatermark. Clearing to low file usage.");
673  long long f2delta = std::max(f2 - delta, f0);
674  bytes_to_remove = clamp(f0, f2delta);
675  return bytes_to_remove;
676  }
677 
678  return bytes_to_remove;
679 }
680 
681 void ResourceMonitor::perform_purge_check(bool purge_cold_files, int tl)
682 {
683  static const char *trc_pfx = "perform_purge_check() ";
684  const Configuration &conf = Cache::Conf();
685 
686  std::unique_ptr<DataFsPurgeshot> psp( new DataFsPurgeshot(m_fs_state) );
687  DataFsPurgeshot &ps = *psp;
688 
689  ps.m_file_usage = 512ll * m_current_usage_in_st_blocks;
690  // These are potentially wrong as cache might be writing over preallocated byte ranges.
692  // Can have another estimate based on eiter writes or st-blocks from purge-stats, once we have them.
693 
694  TRACE_INT(tl, trc_pfx << "Purge check:");
695 
696  ps.m_bytes_to_remove = 0;
697  if (conf.are_file_usage_limits_set())
698  {
699  ps.m_bytes_to_remove = get_file_usage_bytes_to_remove(ps, ps.m_estimated_writes_from_writeq, tl);
700  }
701  else
702  {
703  if (ps.m_disk_used > conf.m_diskUsageHWM)
704  {
705  TRACE_INT(tl, "Disk usage: " << ps.m_disk_used << " exceed highWatermark.");
707  }
708  }
709 
710  ps.m_space_based_purge = ps.m_bytes_to_remove ? 1 : 0;
711 
712  // Purge precheck -- check if age-based purge is required
713  // We ignore uvkeep time, it requires reading of cinfo files and it is enforced in File::Open() anyway.
714 
715  if (purge_cold_files && conf.is_age_based_purge_in_effect()) // || conf.is_uvkeep_purge_in_effect())
716  {
717  ps.m_age_based_purge = true;
718  }
719 
720  TRACE_INT(tl, "\tbytes_to_remove = " << ps.m_bytes_to_remove << " B");
721  TRACE_INT(tl, "\tspace_based_purge = " << ps.m_space_based_purge);
722  TRACE_INT(tl, "\tage_based_purge = " << ps.m_age_based_purge);
723 
724  bool periodic = Cache::GetInstance().GetPurgePin() ?
726 
727  if ( ! ps.m_space_based_purge && ! ps.m_age_based_purge && !periodic ) {
728  TRACE(Info, trc_pfx << "purge not required.");
730  return;
731  }
732  if (m_purge_task_active) {
733  TRACE(Warning, trc_pfx << "purge required but previous purge task is still active!");
734  return;
735  }
736 
737  TRACE(Info, trc_pfx << "scheduling purge task.");
738 
739  // At this point we have all the information: report, decide on action.
740  // There is still some missing infrastructure, especially as regards to purge-plugin:
741  // - at what point do we start bugging the pu-pin to start coughing up purge lists?
742  // - have a new parameter or just do it "one cycle before full"?
743  // - what if it doesn't -- when do we do the old-stlye scan & purge?
744  // - how do we do age-based purge and uvkeep purge?
745  // - they are really quite different -- and could run separately, registering
746  // files into a purge-candidate list. This has to be rechecked before the actual
747  // deletion -- eg, by comparing stat time of cinfo + doing the is-active / is-purge-protected.
748 
749  const DirState &root_ds = *m_fs_state.get_root();
750  const int n_calc_dirs = 1 + root_ds.m_here_usage.m_NDirectories + root_ds.m_recursive_subdir_usage.m_NDirectories;
751 #ifdef RM_DEBUG
752  const int n_pshot_dirs = root_ds.count_dirs_to_level(9999);
753  dprintf("purge dir count recursive=%d vs from_usage=%d\n", n_pshot_dirs, n_calc_dirs);
754 #endif
755  ps.m_dir_vec.reserve(n_calc_dirs);
756  ps.m_dir_vec.emplace_back( DirPurgeElement(root_ds, root_ds.m_here_usage, root_ds.m_recursive_subdir_usage, -1) );
757  fill_pshot_vec_children(root_ds, 0, ps.m_dir_vec, 9999);
758 
759  m_purge_task_active = true;
760 
761  struct PurgeDriverJob : public XrdJob
762  {
763  DataFsPurgeshot *m_purge_shot_ptr;
764 
765  PurgeDriverJob(DataFsPurgeshot *psp) :
766  XrdJob("XrdPfc::ResourceMonitor::PurgeDriver"),
767  m_purge_shot_ptr(psp)
768  {}
769 
770  void DoIt() override
771  {
772  Cache::ResMon().perform_purge_task(*m_purge_shot_ptr);
774 
775  delete m_purge_shot_ptr;
776  delete this;
777  }
778  };
779 
780  Cache::schedP->Schedule( new PurgeDriverJob(psp.release()) );
781 }
782 
783 namespace XrdPfc
784 {
786 }
787 
789 {
790  // BEWARE: Runs in a dedicated thread - is only to communicate back to the
791  // hear_beat() / data structs via the purge queues and condition variable.
792 
793  // const char *tpfx = "perform_purge_task ";
794 
795  {
797  m_purge_task_start = time(0);
798  }
799 
800  // For now, fall back to the old purge ... to be improved with:
801  // - new scan, following the DataFsPurgeshot;
802  // - usage of cinfo stat mtime for time of last access (touch already done at output);
803  // - use DirState* to report back purged files.
804  // Already changed to report back purged files --- but using the string / path variant.
805  OldStylePurgeDriver(ps); // In XrdPfcPurge.cc
806 }
807 
809 {
810  // Separated out so the purge_task can exit without post-checks.
811 
812  {
814  m_purge_task_end = time(0);
815  m_purge_task_complete = true;
817  }
819 }
820 
821 //==============================================================================
822 // Main thread function, do initial test, then enter heart_beat().
823 //==============================================================================
824 
826 {
827  // setup for in-scan -- this is called from initial setup.
828  MutexHolder _lck(m_dir_scan_mutex);
829  m_dir_scan_in_progress = true;
830 }
831 
833 {
834  const char *tpfx = "main_thread_function ";
835  {
836  time_t is_start = time(0);
837  m_fs_state.init_stat_reset_times(is_start);
838  TRACE(Info, tpfx << "Stating initial directory scan.");
839 
840  if ( ! perform_initial_scan()) {
841  TRACE(Error, tpfx << "Initial directory scan has failed. This is a terminal error, aborting.")
842  _exit(1);
843  }
844  // Reset of m_dir_scan_in_progress is done in perform_initial_scan()
845 
846  time_t is_duration = time(0) - is_start;
847  TRACE(Info, tpfx << "Initial directory scan complete, duration=" << is_duration <<"s");
848 
849  // run first process queues
850  int n_proc_is = process_queues();
851  TRACE(Info, tpfx << "First process_queues finished, n_records=" << n_proc_is);
852 
853  // shrink queues if scan time was longer than 30s.
854  if (is_duration > 30 || n_proc_is > 3000)
855  {
856  m_file_open_q.shrink_read_queue();
857  m_file_update_stats_q.shrink_read_queue();
858  m_file_close_q.shrink_read_queue();
859  m_file_purge_q1.shrink_read_queue();
860  m_file_purge_q2.shrink_read_queue();
861  m_file_purge_q3.shrink_read_queue();
862  }
863  }
864  heart_beat();
865 }
866 
867 //==============================================================================
868 // Old prototype from Cache / Purge, now to go into heart_beat() here, above.
869 //==============================================================================
870 
872 {
873  // static const char *trc_pfx = "ResourceMonitorHeartBeat() ";
874 
875  // Pause before initial run
876  sleep(1);
877 
878  // XXXX Setup initial / constant stats (total RAM, total disk, ???)
879 
882 
883  S.Lock();
884 
886 
888 
889  S.UnLock();
890 
891  // XXXX Schedule initial disk scan, time it!
892  //
893  // TRACE(Info, trc_pfx << "scheduling intial disk scan.");
894  // schedP->Schedule( new ScanAndPurgeJob("XrdPfc::ScanAndPurge") );
895  //
896  // bool scan_and_purge_running = true;
897 
898  // XXXX Could we really hold last-usage for all files in memory?
899 
900  // XXXX Think how to handle disk-full, scan/purge not finishing:
901  // - start dropping things out of write queue, but only when RAM gets near full;
902  // - monitoring this then becomes a high-priority job, inner loop with sleep of,
903  // say, 5 or 10 seconds.
904 
905  while (true)
906  {
907  time_t heartbeat_start = time(0);
908 
909  // TRACE(Info, trc_pfx << "HeartBeat starting ...");
910 
911  // if sumary monitoring configured, pupulate OucCacheStats:
912  S.Lock();
913 
914  // - available / used disk space (files usage calculated elsewhere (maybe))
915 
916  // - RAM usage
917  /* XXXX From Cache
918  { XrdSysMutexHelper lck(&m_RAM_mutex);
919  X.MemUsed = m_RAM_used;
920  X.MemWriteQ = m_RAM_write_queue;
921  }
922  */
923 
924  // - files opened / closed etc
925 
926  // do estimate of available space
927  S.UnLock();
928 
929  // if needed, schedule purge in a different thread.
930  // purge is:
931  // - deep scan + gather FSPurgeState
932  // - actual purge
933  //
934  // this thread can continue running and, if needed, stop writing to disk
935  // if purge is taking too long.
936 
937  // think how data is passed / synchronized between this and purge thread
938 
939  // !!!! think how stat collection is done and propgated upwards;
940  // until now it was done once per purge-interval.
941  // now stats will be added up more often, but purge will be done
942  // only occasionally.
943  // also, do we report cumulative values or deltas? cumulative should
944  // be easier and consistent with summary data.
945  // still, some are state - like disk usage, num of files.
946 
947  // Do we take care of directories that need to be newly added into DirState hierarchy?
948  // I.e., when user creates new directories and these are covered by either full
949  // spec or by root + depth declaration.
950 
951  int heartbeat_duration = time(0) - heartbeat_start;
952 
953  // TRACE(Info, trc_pfx << "HeartBeat finished, heartbeat_duration " << heartbeat_duration);
954 
955  // int sleep_time = m_fs_state..m_purgeInterval - heartbeat_duration;
956  int sleep_time = 60 - heartbeat_duration;
957  if (sleep_time > 0)
958  {
959  sleep(sleep_time);
960  }
961  }
962 }
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
Definition: XrdAccTest.cc:262
static void child()
#define TRACE_Debug
Definition: XrdCmsTrace.hh:37
#define TRACE_Info
@ Warning
#define XrdOssOK
Definition: XrdOss.hh:50
#define dprintf(...)
void Proto_ResourceMonitorHeartBeat()
#define TRACE_INT(act, x)
Definition: XrdPfcTrace.hh:52
#define TRACE(act, x)
Definition: XrdTrace.hh:63
Definition: XrdJob.hh:43
virtual int Opendir(const char *path, XrdOucEnv &env)
Definition: XrdOss.hh:79
long long Total
Definition: XrdOssVS.hh:90
long long Free
Definition: XrdOssVS.hh:91
virtual XrdOssDF * newDir(const char *tident)=0
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition: XrdOss.cc:117
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
struct XrdOucCacheStats::CacheStats X
XrdOucCacheStats Statistics
Definition: XrdOucCache.hh:686
PurgePin * GetPurgePin() const
Definition: XrdPfc.hh:272
static const Configuration & Conf()
Definition: XrdPfc.cc:134
XrdSysTrace * GetTrace()
Definition: XrdPfc.hh:283
static ResourceMonitor & ResMon()
Definition: XrdPfc.cc:135
void ClearPurgeProtectedSet()
Definition: XrdPfc.cc:684
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:132
static XrdScheduler * schedP
Definition: XrdPfc.hh:290
long long WritesSinceLastCall()
Definition: XrdPfc.cc:320
void AddUp(const DirStats &s)
Definition: XrdPfcStats.hh:192
long long m_StBlocksRemoved
Definition: XrdPfcStats.hh:149
std::vector< std::string > m_current_dirs
XrdOucEnv & default_env()
void slurp_dir_ll(XrdOssDF &dh, int dir_level, const char *path, const char *trc_pfx)
bool begin_traversal(DirState *root, const char *root_path)
std::set< std::string > m_protected_top_dirs
bool cd_down(const std::string &dir_name)
std::map< std::string, FilePairStat > m_current_files
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
virtual bool CallPeriodically()
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
void perform_purge_check(bool purge_cold_files, int tl)
void fill_sshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirStateElement > &vec, int max_depth)
void perform_purge_task(DataFsPurgeshot &ps)
void scan_dir_and_recurse(FsTraversal &fst)
void fill_pshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirPurgeElement > &vec, int max_depth)
void Schedule(XrdJob *jp)
Definition: XrdPfc.hh:41
void OldStylePurgeDriver(DataFsPurgeshot &ps)
Definition: XrdPfcPurge.cc:99
std::function< int(const std::string &)> unlink_func
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:64
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:109
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition: XrdPfc.hh:91
long long m_fileUsageMax
cache purge - files usage maximum
Definition: XrdPfc.hh:96
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition: XrdPfc.hh:94
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition: XrdPfc.hh:106
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition: XrdPfc.hh:93
bool are_file_usage_limits_set() const
Definition: XrdPfc.hh:67
long long m_fileUsageNominal
cache purge - files usage nominal
Definition: XrdPfc.hh:95
int m_purgeAgeBasedPeriod
perform cold file / uvkeep purge every this many purge cycles
Definition: XrdPfc.hh:99
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition: XrdPfc.hh:92
int m_dirStatsInterval
time between resource monitor statistics dump in seconds
Definition: XrdPfc.hh:104
bool is_age_based_purge_in_effect() const
Definition: XrdPfc.hh:68
int m_purgeInterval
sleep interval between cache purges
Definition: XrdPfc.hh:97
bool is_dir_stat_reporting_on() const
Definition: XrdPfc.hh:70
std::vector< DirPurgeElement > m_dir_vec
void write_json_file(const std::string &fname, XrdOss &oss, bool include_preamble)
std::vector< DirStateElement > m_dir_states
void reset_stats(time_t last_update)
void dump_recursively(int max_depth) const
void init_stat_reset_times(time_t t)
void update_stats_and_usages(time_t last_update, bool purge_empty_dirs, unlink_func unlink_foo)
DirState * find_dirstate_for_lfn(const std::string &lfn, DirState **last_existing_dir=nullptr)
void reset_sshot_stats(time_t last_update)
DirUsage m_recursive_subdir_usage
DirState * get_parent()
int count_dirs_to_level(int max_depth) const
DirState * find_path(const std::string &path, int max_depth, bool parse_as_lfn, bool create_subdirs, DirState **last_existing_dir=nullptr)
void upward_propagate_initial_scan_usages()