16 #define dprintf(...) printf(__VA_ARGS__)
18 #define dprintf(...) (void(0))
26 const char *m_traceID =
"ResourceMonitor";
47 m_dir_scan_mutex.
Lock();
48 if (m_dir_scan_in_progress) {
49 m_dir_scan_open_requests.push_back({lfn, cond});
50 LfnCondRecord &lcr = m_dir_scan_open_requests.back();
53 while ( ! lcr.f_checked)
61 void ResourceMonitor::process_inter_dir_scan_open_requests(
FsTraversal &fst)
63 m_dir_scan_mutex.
Lock();
64 while ( ! m_dir_scan_open_requests.empty())
66 LfnCondRecord &lcr = m_dir_scan_open_requests.front();
69 cross_check_or_process_oob_lfn(lcr.f_lfn, fst);
75 m_dir_scan_mutex.
Lock();
76 m_dir_scan_open_requests.pop_front();
81 void ResourceMonitor::cross_check_or_process_oob_lfn(
const std::string &lfn,
FsTraversal &fst)
85 static const char *trc_pfx =
"cross_check_or_process_oob_lfn() ";
87 DirState *last_existing_ds =
nullptr;
92 size_t pos = lfn.find_last_of(
"/");
93 std::string dir = (pos == std::string::npos) ?
"" : lfn.substr(0, pos);
104 if (it->second.has_data && it->second.has_cinfo) {
105 here.
m_StBlocks += it->second.stat_data.st_blocks;
116 dprintf(
"In scan_dir_and_recurse for '%s', size of dir_vec = %d, file_stat_map = %d\n",
127 dprintf(
"would be doing something with %s ... has_data=%d, has_cinfo=%d\n",
128 it->first.c_str(), it->second.has_data, it->second.has_cinfo);
136 if (it->second.has_data && it->second.has_cinfo) {
137 here.
m_StBlocks += it->second.stat_data.st_blocks;
145 std::vector<std::string> dirs;
148 if (++m_dir_scan_check_counter >= 100)
150 process_inter_dir_scan_open_requests(fst);
151 m_dir_scan_check_counter = 0;
156 for (
auto &dname : dirs)
183 m_dir_scan_in_progress =
true;
184 m_dir_scan_check_counter = 0;
195 m_dir_scan_in_progress =
false;
196 m_dir_scan_check_counter = 0;
198 while ( ! m_dir_scan_open_requests.empty())
200 LfnCondRecord &lcr = m_dir_scan_open_requests.front();
202 lcr.f_checked =
true;
206 m_dir_scan_open_requests.pop_front();
225 static const char *trc_pfx =
"process_queues() ";
236 n_records += m_file_open_q.swap_queues();
237 n_records += m_file_update_stats_q.swap_queues();
238 n_records += m_file_close_q.swap_queues();
239 n_records += m_file_purge_q1.swap_queues();
240 n_records += m_file_purge_q2.swap_queues();
241 n_records += m_file_purge_q3.swap_queues();
245 for (
auto &i : m_file_open_q.read_queue())
248 AccessToken &at =
token(i.id);
249 dprintf(
"process file open for token %d, time %ld -- %s\n",
250 i.id, i.record.m_open_time, at.m_filename.c_str());
255 DirState *last_existing_ds =
nullptr;
261 if ( ! i.record.m_existing_file) {
264 while (pp != last_existing_ds) {
273 for (
auto &i : m_file_update_stats_q.read_queue())
276 AccessToken &at =
token(i.id);
279 dprintf(
"process file update for token %d, %p -- %s\n",
280 i.id, ds, at.m_filename.c_str());
283 m_current_usage_in_st_blocks += i.record.m_StBlocksAdded;
286 for (
auto &i : m_file_close_q.read_queue())
289 AccessToken &at =
token(i.id);
290 dprintf(
"process file close for token %d, time %ld -- %s\n",
291 i.id, i.record.m_close_time, at.m_filename.c_str());
301 for (
auto &i : m_file_close_q.read_queue())
302 m_access_tokens_free_slots.push_back(i.id);
305 for (
auto &i : m_file_purge_q1.read_queue())
311 m_current_usage_in_st_blocks -= i.record.m_size_in_st_blocks;
313 for (
auto &i : m_file_purge_q2.read_queue())
318 TRACE(
Error, trc_pfx <<
"DirState not found for directory path '" << i.id <<
"'.");
324 m_current_usage_in_st_blocks -= i.record.m_size_in_st_blocks;
326 for (
auto &i : m_file_purge_q3.read_queue())
331 TRACE(
Error, trc_pfx <<
"DirState not found for LFN path '" << i.id <<
"'.");
336 m_current_usage_in_st_blocks -= i.record;
351 static const char *tpfx =
"heart_beat() ";
355 const int s_queue_proc_interval = 10;
357 const int s_sshot_report_interval = 60;
358 const int s_purge_check_interval = 60;
364 time_t now = time(0);
365 time_t next_queue_proc_time = now + s_queue_proc_interval;
366 time_t next_sshot_report_time = now + s_sshot_report_interval;
367 time_t next_purge_check_time = now + s_purge_check_interval;
368 time_t next_purge_report_time = now + s_purge_report_interval;
369 time_t next_purge_cold_files_time = now + s_purge_cold_files_interval;
376 time_t start = time(0);
377 time_t next_event = std::min({ next_queue_proc_time, next_sshot_report_time,
378 next_purge_check_time, next_purge_report_time, next_purge_cold_files_time });
380 if (next_event > start)
382 unsigned int t_sleep = next_event - start;
383 TRACE(
Debug, tpfx <<
"sleeping for " << t_sleep <<
" seconds until the next beat.");
400 next_queue_proc_time += s_queue_proc_interval;
401 TRACE(
Debug, tpfx <<
"process_queues -- n_records=" << n_processed);
407 if (next_sshot_report_time <= now)
409 next_sshot_report_time += s_sshot_report_interval;
449 bool do_purge_check = next_purge_check_time <= now;
450 bool do_purge_report = next_purge_report_time <= now;
451 bool do_purge_cold_files = next_purge_cold_files_time <= now;
452 if (do_purge_check || do_purge_report || do_purge_cold_files)
456 next_purge_check_time = now + s_purge_check_interval;
457 if (do_purge_report) next_purge_report_time = now + s_purge_report_interval;
458 if (do_purge_cold_files) next_purge_cold_files_time = now + s_purge_cold_files_interval;
470 std::vector<DirStateElement> &vec,
473 int pos = vec.size();
474 int n_children = parent_ds.
m_subdirs.size();
481 if (parent_ds.
m_depth < max_depth)
498 std::vector<DirPurgeElement> &vec,
501 int pos = vec.size();
502 int n_children = parent_ds.
m_subdirs.size();
509 if (parent_ds.
m_depth < max_depth)
530 static const char *trc_pfx =
"update_vs_and_file_usage_info() ";
536 if (m_oss.
StatVS(&vsi, conf.m_data_space.c_str(), 1) < 0) {
537 TRACE(
Error, trc_pfx <<
"can't get StatVS for oss space '" << conf.m_data_space <<
"'. This is a fatal error.");
542 m_fs_state.
m_file_usage = 512ll * m_current_usage_in_st_blocks;
543 if (m_oss.
StatVS(&vsi, conf.m_meta_space.c_str(), 1) < 0) {
544 TRACE(
Error, trc_pfx <<
"can't get StatVS for oss space '" << conf.m_meta_space <<
"'. This is a fatal error.");
551 long long ResourceMonitor::get_file_usage_bytes_to_remove(
const DataFsPurgeshot &ps,
long long write_estimate,
int tl)
567 long long delta = write_estimate;
568 TRACE_INT(tl,
"file usage increased since the previous purge interval in bytes: " << delta );
570 long long bytes_to_remove = 0;
573 auto clamp = [&x, &bytes_to_remove](
long long lowval,
long long highval)
576 long long newval = val - bytes_to_remove;
585 if (newval > highval)
587 return highval - val;
590 return bytes_to_remove;
601 float frac_u =
static_cast<float>(u - w2) / (T - w2);
602 float frac_x =
static_cast<float>(x - f0) / (f1 - f0);
606 bytes_to_remove = u -w1;
613 bytes_to_remove = (frac_x - frac_u) * (f1 - f0);
614 bytes_to_remove += delta;
615 bytes_to_remove = clamp(f0, f1);
620 bytes_to_remove = clamp(f0, f2);
622 return bytes_to_remove;
628 if (u > w1 && x > f1)
630 float frac_u =
static_cast<float>(u - w1) / (w2 - w1);
631 float frac_x =
static_cast<float>(x - f1) / (f2 - f1);
634 TRACE_INT(tl,
"Disproportional file quota usage comapared to disc usage (frac_x/frac_u) = " << frac_x <<
"/"<< frac_u);
635 bytes_to_remove = (frac_x - frac_u) * (f2 - f1);
636 bytes_to_remove += delta;
640 bytes_to_remove = clamp(f0, f2);
641 return bytes_to_remove;
651 TRACE_INT(tl,
"File usage exceeds maxim file usage. Total disk usage is under lowWatermark. Clearing to low file usage.");
652 long long f2delta = std::max(f2 - delta, f0);
653 bytes_to_remove = clamp(f0, f2delta);
654 return bytes_to_remove;
657 return bytes_to_remove;
662 static const char *trc_pfx =
"perform_purge_check() ";
665 std::unique_ptr<DataFsPurgeshot> psp(
new DataFsPurgeshot(m_fs_state) );
673 TRACE_INT(tl, trc_pfx <<
"Purge check:");
707 TRACE(
Info, trc_pfx <<
"purge not required.");
712 TRACE(
Warning, trc_pfx <<
"purge required but previous purge task is still active!");
716 TRACE(
Info, trc_pfx <<
"scheduling purge task.");
732 dprintf(
"purge dir count recursive=%d vs from_usage=%d\n", n_pshot_dirs, n_calc_dirs);
740 struct PurgeDriverJob :
public XrdJob
745 XrdJob(
"XrdPfc::ResourceMonitor::PurgeDriver"),
746 m_purge_shot_ptr(psp)
754 delete m_purge_shot_ptr;
808 m_dir_scan_in_progress =
true;
813 const char *tpfx =
"main_thread_function ";
815 time_t is_start = time(0);
816 TRACE(
Info, tpfx <<
"Stating initial directory scan.");
819 TRACE(
Error, tpfx <<
"Initial directory scan has failed. This is a terminal error, aborting.")
824 time_t is_duration = time(0) - is_start;
825 TRACE(
Info, tpfx <<
"Initial directory scan complete, duration=" << is_duration <<
"s");
829 TRACE(
Info, tpfx <<
"First process_queues finished, n_records=" << n_proc_is);
832 if (is_duration > 30 || n_proc_is > 3000)
834 m_file_open_q.shrink_read_queue();
835 m_file_update_stats_q.shrink_read_queue();
836 m_file_close_q.shrink_read_queue();
837 m_file_purge_q1.shrink_read_queue();
838 m_file_purge_q2.shrink_read_queue();
839 m_file_purge_q3.shrink_read_queue();
885 time_t heartbeat_start = time(0);
929 int heartbeat_duration = time(0) - heartbeat_start;
934 int sleep_time = 60 - heartbeat_duration;
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
void Proto_ResourceMonitorHeartBeat()
#define TRACE_INT(act, x)
virtual int Opendir(const char *path, XrdOucEnv &env)
virtual XrdOssDF * newDir(const char *tident)=0
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
struct XrdOucCacheStats::CacheStats X
XrdOucCacheStats Statistics
PurgePin * GetPurgePin() const
static const Configuration & Conf()
static ResourceMonitor & ResMon()
void ClearPurgeProtectedSet()
static Cache & GetInstance()
Singleton access.
static XrdScheduler * schedP
long long WritesSinceLastCall()
void AddUp(const DirStats &s)
int m_NDirectoriesCreated
long long m_StBlocksRemoved
std::vector< std::string > m_current_dirs
std::string m_current_path
XrdOucEnv & default_env()
void slurp_dir_ll(XrdOssDF &dh, int dir_level, const char *path, const char *trc_pfx)
bool begin_traversal(DirState *root, const char *root_path)
std::set< std::string > m_protected_top_dirs
bool cd_down(const std::string &dir_name)
std::map< std::string, FilePairStat > m_current_files
Status of cached file. Can be read from and written into a binary file.
virtual bool CallPeriodically()
bool m_purge_task_complete
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
void perform_purge_check(bool purge_cold_files, int tl)
void fill_sshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirStateElement > &vec, int max_depth)
ResourceMonitor(XrdOss &oss)
void perform_purge_task(DataFsPurgeshot &ps)
void scan_dir_and_recurse(FsTraversal &fst)
void fill_pshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirPurgeElement > &vec, int max_depth)
XrdSysCondVar m_purge_task_cond
void perform_purge_task_cleanup()
void update_vs_and_file_usage_info()
bool perform_initial_scan()
AccessToken & token(int i)
time_t m_purge_task_start
void main_thread_function()
void Schedule(XrdJob *jp)
void OldStylePurgeDriver(DataFsPurgeshot &ps)
Contains parameters configurable from the xrootd config file.
long long m_RamAbsAvailable
available from configuration
long long m_diskTotalSpace
total disk space on configured partition or oss space
long long m_fileUsageMax
cache purge - files usage maximum
long long m_fileUsageBaseline
cache purge - files usage baseline
int m_dirStatsStoreDepth
depth to which statistics should be collected
long long m_diskUsageHWM
cache purge - disk usage high water mark
bool are_file_usage_limits_set() const
long long m_fileUsageNominal
cache purge - files usage nominal
int m_purgeAgeBasedPeriod
perform cold file / uvkeep purge every this many purge cycles
long long m_diskUsageLWM
cache purge - disk usage low water mark
bool is_age_based_purge_in_effect() const
int m_purgeInterval
sleep interval between cache purges
bool is_dir_stat_reporting_on() const
std::vector< DirPurgeElement > m_dir_vec
long long m_bytes_to_remove
long long m_estimated_writes_from_writeq
void apply_stats_to_usages()
void dump_recursively(int max_depth) const
void upward_propagate_stats_and_times()
DirState * find_dirstate_for_lfn(const std::string &lfn, DirState **last_existing_dir=nullptr)
DirUsage m_recursive_subdir_usage
int count_dirs_to_level(int max_depth) const
DirState * find_path(const std::string &path, int max_depth, bool parse_as_lfn, bool create_subdirs, DirState **last_existing_dir=nullptr)
void upward_propagate_initial_scan_usages()