1 #ifndef __XRDPFC_RESOURCEMONITOR_HH__
2 #define __XRDPFC_RESOURCEMONITOR_HH__
18 class DirStateElement;
20 class DirPurgeElement;
21 class DataFsPurgeshot;
39 template<
typename ID,
typename RECORD>
46 using queue_type = std::vector<Entry>;
47 using iterator =
typename queue_type::iterator;
51 int write_queue_size()
const {
return m_write_queue.size(); }
52 bool read_queue_empty()
const {
return m_read_queue.empty(); }
53 int read_queue_size()
const {
return m_read_queue.size(); }
56 void push(
ID id, RECORD
stat) { m_write_queue.push_back({ id,
stat }); }
58 RECORD& write_record(
int pos) {
return m_write_queue[pos].record; }
61 int swap_queues() { m_read_queue.clear(); m_write_queue.swap(m_read_queue);
return read_queue_size(); }
62 const queue_type& read_queue()
const {
return m_read_queue; }
63 iterator begin()
const {
return m_read_queue.begin(); }
64 iterator end()
const {
return m_read_queue.end(); }
67 void shrink_read_queue() { m_read_queue.clear(); m_read_queue.shrink_to_fit(); }
70 queue_type m_write_queue, m_read_queue;
74 std::string m_filename;
75 unsigned int m_last_queue_swap_u1 = 0xffffffff;
76 int m_last_write_queue_pos = -1;
77 DirState *m_dir_state =
nullptr;
81 m_last_queue_swap_u1 = 0xffffffff;
82 m_last_write_queue_pos = -1;
83 m_dir_state =
nullptr;
86 std::vector<AccessToken> m_access_tokens;
87 std::vector<int> m_access_tokens_free_slots;
100 long long m_size_in_st_blocks;
104 Queue<int, OpenRecord> m_file_open_q;
105 Queue<int, Stats> m_file_update_stats_q;
106 Queue<int, CloseRecord> m_file_close_q;
107 Queue<DirState*, PurgeRecord> m_file_purge_q1;
108 Queue<std::string, PurgeRecord> m_file_purge_q2;
109 Queue<std::string, long long> m_file_purge_q3;
112 long long m_current_usage_in_st_blocks = 0;
115 unsigned int m_queue_swap_u1 = 0u;
117 DataFsState &m_fs_state;
124 const std::string &f_lfn;
126 bool f_checked =
false;
130 std::list<LfnCondRecord> m_dir_scan_open_requests;
131 int m_dir_scan_check_counter;
132 bool m_dir_scan_in_progress =
false;
134 void process_inter_dir_scan_open_requests(FsTraversal &fst);
135 void cross_check_or_process_oob_lfn(
const std::string &lfn, FsTraversal &fst);
136 long long get_file_usage_bytes_to_remove(
const DataFsPurgeshot &ps,
long long previous_file_usage,
int logLeve);
153 if ( ! m_access_tokens_free_slots.empty()) {
154 token_id = m_access_tokens_free_slots.back();
155 m_access_tokens_free_slots.pop_back();
156 m_access_tokens[token_id].m_filename = filename;
157 m_access_tokens[token_id].m_last_queue_swap_u1 = m_queue_swap_u1 - 1;
159 token_id = (int) m_access_tokens.size();
160 m_access_tokens.push_back({filename, m_queue_swap_u1 - 1});
163 m_file_open_q.push(token_id, {open_timestamp, existing_file});
169 AccessToken &at =
token(token_id);
171 if (at.m_last_queue_swap_u1 != m_queue_swap_u1) {
172 m_file_update_stats_q.push(token_id, stats);
173 at.m_last_queue_swap_u1 = m_queue_swap_u1;
174 at.m_last_write_queue_pos = m_file_update_stats_q.write_queue_size() - 1;
176 Stats &existing_stats = m_file_update_stats_q.write_record(at.m_last_write_queue_pos);
177 existing_stats.
AddUp(stats);
187 m_file_close_q.push(token_id, {close_timestamp, full_stats});
194 m_file_purge_q1.push(target, {size_in_st_blocks, 1});
198 m_file_purge_q1.push(target, {size_in_st_blocks, n_files});
202 m_file_purge_q2.push(target, {size_in_st_blocks, n_files});
206 m_file_purge_q3.push(filename, size_in_st_blocks);
217 AccessToken&
token(
int i) {
return m_access_tokens[i]; }
229 std::vector<DirStateElement> &vec,
234 std::vector<DirPurgeElement> &vec,
int stat(const char *path, struct stat *buf)
bool m_purge_task_complete
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void perform_purge_check(bool purge_cold_files, int tl)
void fill_sshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirStateElement > &vec, int max_depth)
ResourceMonitor(XrdOss &oss)
void perform_purge_task(DataFsPurgeshot &ps)
void register_file_purge(const std::string &filename, long long size_in_st_blocks)
void scan_dir_and_recurse(FsTraversal &fst)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void fill_pshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirPurgeElement > &vec, int max_depth)
XrdSysCondVar m_purge_task_cond
void perform_purge_task_cleanup()
void register_multi_file_purge(DirState *target, long long size_in_st_blocks, int n_files)
void register_file_update_stats(int token_id, const Stats &stats)
void update_vs_and_file_usage_info()
bool perform_initial_scan()
AccessToken & token(int i)
time_t m_purge_task_start
void register_multi_file_purge(const std::string &target, long long size_in_st_blocks, int n_files)
void main_thread_function()
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
void AddUp(const Stats &s)