17 #define dprintf(...) printf(__VA_ARGS__)
19 #define dprintf(...) (void(0))
27 const char *m_traceID =
"ResourceMonitor";
48 m_dir_scan_mutex.
Lock();
49 if (m_dir_scan_in_progress) {
50 m_dir_scan_open_requests.push_back({lfn, cond});
51 LfnCondRecord &lcr = m_dir_scan_open_requests.back();
54 while ( ! lcr.f_checked)
62 void ResourceMonitor::process_inter_dir_scan_open_requests(
FsTraversal &fst)
64 m_dir_scan_mutex.
Lock();
65 while ( ! m_dir_scan_open_requests.empty())
67 LfnCondRecord &lcr = m_dir_scan_open_requests.front();
70 cross_check_or_process_oob_lfn(lcr.f_lfn, fst);
76 m_dir_scan_mutex.
Lock();
77 m_dir_scan_open_requests.pop_front();
82 void ResourceMonitor::cross_check_or_process_oob_lfn(
const std::string &lfn,
FsTraversal &fst)
86 static const char *trc_pfx =
"cross_check_or_process_oob_lfn() ";
88 DirState *last_existing_ds =
nullptr;
93 size_t pos = lfn.find_last_of(
"/");
94 std::string dir = (pos == std::string::npos) ?
"" : lfn.substr(0, pos);
105 if (it->second.has_data && it->second.has_cinfo) {
106 here.
m_StBlocks += it->second.stat_data.st_blocks;
117 dprintf(
"In scan_dir_and_recurse for '%s', size of dir_vec = %d, file_stat_map = %d\n",
128 dprintf(
"would be doing something with %s ... has_data=%d, has_cinfo=%d\n",
129 it->first.c_str(), it->second.has_data, it->second.has_cinfo);
137 if (it->second.has_data && it->second.has_cinfo) {
138 here.
m_StBlocks += it->second.stat_data.st_blocks;
146 std::vector<std::string> dirs;
149 if (++m_dir_scan_check_counter >= 100)
151 process_inter_dir_scan_open_requests(fst);
152 m_dir_scan_check_counter = 0;
157 for (
auto &dname : dirs)
184 m_dir_scan_in_progress =
true;
185 m_dir_scan_check_counter = 0;
196 m_dir_scan_in_progress =
false;
197 m_dir_scan_check_counter = 0;
199 while ( ! m_dir_scan_open_requests.empty())
201 LfnCondRecord &lcr = m_dir_scan_open_requests.front();
203 lcr.f_checked =
true;
207 m_dir_scan_open_requests.pop_front();
226 static const char *trc_pfx =
"process_queues() ";
237 n_records += m_file_open_q.swap_queues();
238 n_records += m_file_update_stats_q.swap_queues();
239 n_records += m_file_close_q.swap_queues();
240 n_records += m_file_purge_q1.swap_queues();
241 n_records += m_file_purge_q2.swap_queues();
242 n_records += m_file_purge_q3.swap_queues();
246 for (
auto &i : m_file_open_q.read_queue())
249 AccessToken &at =
token(i.id);
250 dprintf(
"process file open for token %d, time %ld -- %s\n",
251 i.id, i.record.m_open_time, at.m_filename.c_str());
256 DirState *last_existing_ds =
nullptr;
262 if ( ! i.record.m_existing_file) {
265 while (pp != last_existing_ds) {
274 for (
auto &i : m_file_update_stats_q.read_queue())
277 AccessToken &at =
token(i.id);
280 dprintf(
"process file update for token %d, %p -- %s\n",
281 i.id, ds, at.m_filename.c_str());
284 m_current_usage_in_st_blocks += i.record.m_StBlocksAdded;
287 for (
auto &i : m_file_close_q.read_queue())
290 AccessToken &at =
token(i.id);
291 dprintf(
"process file close for token %d, time %ld -- %s\n",
292 i.id, i.record.m_close_time, at.m_filename.c_str());
302 for (
auto &i : m_file_close_q.read_queue())
303 m_access_tokens_free_slots.push_back(i.id);
306 for (
auto &i : m_file_purge_q1.read_queue())
312 m_current_usage_in_st_blocks -= i.record.m_size_in_st_blocks;
314 for (
auto &i : m_file_purge_q2.read_queue())
319 TRACE(
Error, trc_pfx <<
"DirState not found for directory path '" << i.id <<
"'.");
325 m_current_usage_in_st_blocks -= i.record.m_size_in_st_blocks;
327 for (
auto &i : m_file_purge_q3.read_queue())
332 TRACE(
Error, trc_pfx <<
"DirState not found for LFN path '" << i.id <<
"'.");
337 m_current_usage_in_st_blocks -= i.record;
352 static const char *tpfx =
"heart_beat() ";
356 const int s_queue_proc_interval = 10;
358 const int s_purge_check_interval = 60;
364 time_t now = time(0);
365 time_t next_queue_proc_time = now + s_queue_proc_interval;
366 time_t next_sshot_report_time = (now / 60) * 60 + 60;
367 time_t next_purge_check_time = now + s_purge_check_interval;
368 time_t next_purge_report_time = now + s_purge_report_interval;
369 time_t next_purge_cold_files_time = now + s_purge_cold_files_interval;
373 time_t start = time(0);
374 time_t next_event = std::min({ next_queue_proc_time, next_sshot_report_time,
375 next_purge_check_time, next_purge_report_time, next_purge_cold_files_time });
377 if (next_event > start)
379 unsigned int t_sleep = next_event - start;
380 TRACE(
Debug, tpfx <<
"sleeping for " << t_sleep <<
" seconds until the next beat.");
395 time_t queue_swap_time = time(0);
399 next_queue_proc_time = queue_swap_time + s_queue_proc_interval;
400 TRACE(
Debug, tpfx <<
"process_queues -- n_records=" << n_processed);
409 bool do_sshot_report = next_sshot_report_time <= now;
410 bool do_purge_check = next_purge_check_time <= now;
411 bool do_purge_report = next_purge_report_time <= now;
412 bool do_purge_cold_files = next_purge_cold_files_time <= now;
415 if (do_sshot_report || do_purge_check || do_purge_report || do_purge_cold_files)
417 unlink_func unlink_foo = [&](
const std::string &dp)->
int {
418 int ret = m_oss.
Unlink(dp.c_str());
420 TRACE(
Info, tpfx <<
"Empty dir unlink error: " << ret <<
" at " << dp);
422 TRACE(
Debug, tpfx <<
"Empty dir unlink success: " << dp);
450 next_sshot_report_time = ((now + 1) / s_sshot_report_interval) * s_sshot_report_interval + s_sshot_report_interval;
466 const char* dumpfile =
"/pfc-stats/DirStat.json";
471 if (do_purge_check || do_purge_report || do_purge_cold_files)
475 next_purge_check_time = now + s_purge_check_interval;
476 if (do_purge_report) next_purge_report_time = now + s_purge_report_interval;
477 if (do_purge_cold_files) next_purge_cold_files_time = now + s_purge_cold_files_interval;
489 std::vector<DirStateElement> &vec,
492 int pos = vec.size();
493 int n_children = parent_ds.
m_subdirs.size();
499 if (n_children == 0)
return;
506 if (parent_ds.
m_depth < max_depth)
518 std::vector<DirPurgeElement> &vec,
521 int pos = vec.size();
522 int n_children = parent_ds.
m_subdirs.size();
528 if (n_children == 0)
return;
535 if (parent_ds.
m_depth < max_depth)
551 static const char *trc_pfx =
"update_vs_and_file_usage_info() ";
557 if (m_oss.
StatVS(&vsi, conf.m_data_space.c_str(), 1) < 0) {
558 TRACE(
Error, trc_pfx <<
"can't get StatVS for oss space '" << conf.m_data_space <<
"'. This is a fatal error.");
563 m_fs_state.
m_file_usage = 512ll * m_current_usage_in_st_blocks;
564 if (m_oss.
StatVS(&vsi, conf.m_meta_space.c_str(), 1) < 0) {
565 TRACE(
Error, trc_pfx <<
"can't get StatVS for oss space '" << conf.m_meta_space <<
"'. This is a fatal error.");
572 long long ResourceMonitor::get_file_usage_bytes_to_remove(
const DataFsPurgeshot &ps,
long long write_estimate,
int tl)
588 long long delta = write_estimate;
589 TRACE_INT(tl,
"file usage increased since the previous purge interval in bytes: " << delta );
591 long long bytes_to_remove = 0;
594 auto clamp = [&x, &bytes_to_remove](
long long lowval,
long long highval)
597 long long newval = val - bytes_to_remove;
606 if (newval > highval)
608 return val - highval;
611 return bytes_to_remove;
622 float frac_u =
static_cast<float>(u - w2) / (T - w2);
623 float frac_x =
static_cast<float>(x - f0) / (f1 - f0);
627 bytes_to_remove = u -w1;
634 bytes_to_remove = (frac_x - frac_u) * (f1 - f0);
635 bytes_to_remove += delta;
636 bytes_to_remove = clamp(f0, f1);
641 bytes_to_remove = clamp(f0, f2);
643 return bytes_to_remove;
649 if (u > w1 && x > f1)
651 float frac_u =
static_cast<float>(u - w1) / (w2 - w1);
652 float frac_x =
static_cast<float>(x - f1) / (f2 - f1);
655 TRACE_INT(tl,
"Disproportional file quota usage comapared to disc usage (frac_x/frac_u) = " << frac_x <<
"/"<< frac_u);
656 bytes_to_remove = (frac_x - frac_u) * (f2 - f1);
657 bytes_to_remove += delta;
661 bytes_to_remove = clamp(f0, f2);
662 return bytes_to_remove;
672 TRACE_INT(tl,
"File usage exceeds maxim file usage. Total disk usage is under lowWatermark. Clearing to low file usage.");
673 long long f2delta = std::max(f2 - delta, f0);
674 bytes_to_remove = clamp(f0, f2delta);
675 return bytes_to_remove;
678 return bytes_to_remove;
683 static const char *trc_pfx =
"perform_purge_check() ";
686 std::unique_ptr<DataFsPurgeshot> psp(
new DataFsPurgeshot(m_fs_state) );
694 TRACE_INT(tl, trc_pfx <<
"Purge check:");
728 TRACE(
Info, trc_pfx <<
"purge not required.");
733 TRACE(
Warning, trc_pfx <<
"purge required but previous purge task is still active!");
737 TRACE(
Info, trc_pfx <<
"scheduling purge task.");
753 dprintf(
"purge dir count recursive=%d vs from_usage=%d\n", n_pshot_dirs, n_calc_dirs);
761 struct PurgeDriverJob :
public XrdJob
766 XrdJob(
"XrdPfc::ResourceMonitor::PurgeDriver"),
767 m_purge_shot_ptr(psp)
775 delete m_purge_shot_ptr;
829 m_dir_scan_in_progress =
true;
834 const char *tpfx =
"main_thread_function ";
836 time_t is_start = time(0);
838 TRACE(
Info, tpfx <<
"Stating initial directory scan.");
841 TRACE(
Error, tpfx <<
"Initial directory scan has failed. This is a terminal error, aborting.")
846 time_t is_duration = time(0) - is_start;
847 TRACE(
Info, tpfx <<
"Initial directory scan complete, duration=" << is_duration <<
"s");
851 TRACE(
Info, tpfx <<
"First process_queues finished, n_records=" << n_proc_is);
854 if (is_duration > 30 || n_proc_is > 3000)
856 m_file_open_q.shrink_read_queue();
857 m_file_update_stats_q.shrink_read_queue();
858 m_file_close_q.shrink_read_queue();
859 m_file_purge_q1.shrink_read_queue();
860 m_file_purge_q2.shrink_read_queue();
861 m_file_purge_q3.shrink_read_queue();
907 time_t heartbeat_start = time(0);
951 int heartbeat_duration = time(0) - heartbeat_start;
956 int sleep_time = 60 - heartbeat_duration;
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
void Proto_ResourceMonitorHeartBeat()
#define TRACE_INT(act, x)
virtual int Opendir(const char *path, XrdOucEnv &env)
virtual XrdOssDF * newDir(const char *tident)=0
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
struct XrdOucCacheStats::CacheStats X
XrdOucCacheStats Statistics
PurgePin * GetPurgePin() const
static const Configuration & Conf()
static ResourceMonitor & ResMon()
void ClearPurgeProtectedSet()
static Cache & GetInstance()
Singleton access.
static XrdScheduler * schedP
long long WritesSinceLastCall()
void AddUp(const DirStats &s)
int m_NDirectoriesCreated
long long m_StBlocksRemoved
std::vector< std::string > m_current_dirs
std::string m_current_path
XrdOucEnv & default_env()
void slurp_dir_ll(XrdOssDF &dh, int dir_level, const char *path, const char *trc_pfx)
bool begin_traversal(DirState *root, const char *root_path)
std::set< std::string > m_protected_top_dirs
bool cd_down(const std::string &dir_name)
std::map< std::string, FilePairStat > m_current_files
Status of cached file. Can be read from and written into a binary file.
virtual bool CallPeriodically()
bool m_purge_task_complete
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
void perform_purge_check(bool purge_cold_files, int tl)
void fill_sshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirStateElement > &vec, int max_depth)
ResourceMonitor(XrdOss &oss)
void perform_purge_task(DataFsPurgeshot &ps)
void scan_dir_and_recurse(FsTraversal &fst)
void fill_pshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirPurgeElement > &vec, int max_depth)
XrdSysCondVar m_purge_task_cond
void perform_purge_task_cleanup()
void update_vs_and_file_usage_info()
bool perform_initial_scan()
AccessToken & token(int i)
time_t m_purge_task_start
void main_thread_function()
void Schedule(XrdJob *jp)
void OldStylePurgeDriver(DataFsPurgeshot &ps)
std::function< int(const std::string &)> unlink_func
Contains parameters configurable from the xrootd config file.
long long m_RamAbsAvailable
available from configuration
long long m_diskTotalSpace
total disk space on configured partition or oss space
long long m_fileUsageMax
cache purge - files usage maximum
long long m_fileUsageBaseline
cache purge - files usage baseline
int m_dirStatsStoreDepth
depth to which statistics should be collected
long long m_diskUsageHWM
cache purge - disk usage high water mark
bool are_file_usage_limits_set() const
long long m_fileUsageNominal
cache purge - files usage nominal
int m_purgeAgeBasedPeriod
perform cold file / uvkeep purge every this many purge cycles
long long m_diskUsageLWM
cache purge - disk usage low water mark
int m_dirStatsInterval
time between resource monitor statistics dumps, in seconds
bool is_age_based_purge_in_effect() const
int m_purgeInterval
sleep interval between cache purges
bool is_dir_stat_reporting_on() const
std::vector< DirPurgeElement > m_dir_vec
long long m_bytes_to_remove
long long m_estimated_writes_from_writeq
void write_json_file(const std::string &fname, XrdOss &oss, bool include_preamble)
std::vector< DirStateElement > m_dir_states
void reset_stats(time_t last_update)
void dump_recursively(int max_depth) const
void init_stat_reset_times(time_t t)
time_t m_sshot_stats_reset_time
void update_stats_and_usages(time_t last_update, bool purge_empty_dirs, unlink_func unlink_foo)
DirState * find_dirstate_for_lfn(const std::string &lfn, DirState **last_existing_dir=nullptr)
void reset_sshot_stats(time_t last_update)
DirUsage m_recursive_subdir_usage
int count_dirs_to_level(int max_depth) const
DirState * find_path(const std::string &path, int max_depth, bool parse_as_lfn, bool create_subdirs, DirState **last_existing_dir=nullptr)
void upward_propagate_initial_scan_usages()