XRootD
XrdPfcResourceMonitor.hh
Go to the documentation of this file.
1 #ifndef __XRDPFC_RESOURCEMONITOR_HH__
2 #define __XRDPFC_RESOURCEMONITOR_HH__
3 
4 #include "XrdPfcStats.hh"
5 
7 
8 #include <string>
9 #include <vector>
10 #include <list>
11 
12 class XrdOss;
13 
14 namespace XrdPfc {
15 
16 class DataFsState;
17 class DirState;
18 class DirStateElement;
19 class DataFsSnapshot;
20 class DirPurgeElement;
21 class DataFsPurgeshot;
22 class FsTraversal;
23 
24 //==============================================================================
25 // ResourceMonitor
26 //==============================================================================
27 
// Encapsulates local variables used within the previous mega-function Purge().
//
// This will be used within the continuously/periodically run heart-beat / breath
// function ... and then parts of it will be passed to invoked FS scan and purge
// jobs (which will be controlled through this as well).
33 
// Andy: XRDADMINPATH is the directory for administrative files (i.e. all.adminpath)
// Also: XrdOucEnv::Export("XRDLOGDIR", logParms.logfn); (in XrdOucLogging::configLog)
36 
38 {
// Double-buffered event queue.
// Producers append to the write queue; the consumer calls swap_queues() to turn
// everything accumulated so far into the read queue and then iterates over it.
// Queue performs no locking of its own -- callers must synchronize access
// (the register_* functions of the owner hold m_queue_mutex while pushing).
template<typename ID, typename RECORD>
class Queue {
public:
   struct Entry {
      ID     id;
      RECORD record;
   };
   using queue_type     = std::vector<Entry>;
   using iterator       = typename queue_type::iterator;
   using const_iterator = typename queue_type::const_iterator;

   Queue() = default;

   int  write_queue_size() const { return static_cast<int>(m_write_queue.size()); }
   bool read_queue_empty() const { return m_read_queue.empty(); }
   int  read_queue_size()  const { return static_cast<int>(m_read_queue.size()); }

   // Writer / producer access: append a new entry to the write queue.
   void push(ID id, RECORD stat) { m_write_queue.push_back({ id, stat }); }
   // Existing entry access for updating Stats; pos must be a valid write-queue index.
   RECORD& write_record(int pos) { return m_write_queue[pos].record; }

   // Reader / consumer access.
   // Discards the previous read queue, makes the accumulated write entries the new
   // read queue and returns their count; the write queue is left empty.
   int swap_queues() { m_read_queue.clear(); m_write_queue.swap(m_read_queue); return read_queue_size(); }
   const queue_type& read_queue() const { return m_read_queue; }

   // Iteration over the read queue.
   // The const overloads must return const_iterator: inside a const member function
   // m_read_queue.begin() yields a const_iterator, which does not convert to iterator.
   iterator       begin()       { return m_read_queue.begin(); }
   iterator       end()         { return m_read_queue.end(); }
   const_iterator begin() const { return m_read_queue.begin(); }
   const_iterator end()   const { return m_read_queue.end(); }

   // Shrinkage of overgrown queues: drop the read entries and request release of their storage.
   void shrink_read_queue() { m_read_queue.clear(); m_read_queue.shrink_to_fit(); }

private:
   queue_type m_write_queue, m_read_queue;
};
72 
73  struct AccessToken {
74  std::string m_filename;
75  unsigned int m_last_queue_swap_u1 = 0xffffffff;
76  int m_last_write_queue_pos = -1;
77  DirState *m_dir_state = nullptr;
78 
79  void clear() {
80  m_filename.clear();
81  m_last_queue_swap_u1 = 0xffffffff;
82  m_last_write_queue_pos = -1;
83  m_dir_state = nullptr;
84  }
85  };
86  std::vector<AccessToken> m_access_tokens;
87  std::vector<int> m_access_tokens_free_slots;
88 
// Payload queued by register_file_open().
struct OpenRecord {
   time_t m_open_time;      // open timestamp passed by the caller
   bool   m_existing_file;  // existing_file flag passed by the caller
};
93 
94  struct CloseRecord {
95  time_t m_close_time;
96  Stats m_full_stats;
97  };
98 
// Payload of the DirState*-keyed and string-keyed purge events
// (register_file_purge / register_multi_file_purge).
struct PurgeRecord {
   long long m_size_in_st_blocks; // removed size, counted in st_blocks as the name indicates
   int       m_n_files;           // number of files covered by this record
};
103 
104  Queue<int, OpenRecord> m_file_open_q;
105  Queue<int, Stats> m_file_update_stats_q;
106  Queue<int, CloseRecord> m_file_close_q;
107  Queue<DirState*, PurgeRecord> m_file_purge_q1;
108  Queue<std::string, PurgeRecord> m_file_purge_q2;
109  Queue<std::string, long long> m_file_purge_q3;
110  // DirPurge queue -- not needed? But we do need last-change timestamp in DirState.
111 
112  long long m_current_usage_in_st_blocks = 0; // aggregate disk usage by files
113 
114  XrdSysMutex m_queue_mutex; // mutex shared between queues
115  unsigned int m_queue_swap_u1 = 0u; // identifier of current swap cycle
116 
117  DataFsState &m_fs_state;
118  XrdOss &m_oss;
119 
120  // Requests for File opens during name-space scans. Such LFNs are processed
121  // with some priority
122  struct LfnCondRecord
123  {
124  const std::string &f_lfn;
125  XrdSysCondVar &f_cond;
126  bool f_checked = false;
127  };
128 
129  XrdSysMutex m_dir_scan_mutex;
130  std::list<LfnCondRecord> m_dir_scan_open_requests;
131  int m_dir_scan_check_counter;
132  bool m_dir_scan_in_progress = false;
133 
134  void process_inter_dir_scan_open_requests(FsTraversal &fst);
135  void cross_check_or_process_oob_lfn(const std::string &lfn, FsTraversal &fst);
136  long long get_file_usage_bytes_to_remove(const DataFsPurgeshot &ps, long long previous_file_usage, int logLeve);
137 
138 public:
139  ResourceMonitor(XrdOss& oss);
141 
142  // --- Initial scan, building of DirState tree
143 
144  void scan_dir_and_recurse(FsTraversal &fst);
145  bool perform_initial_scan();
146 
147  // --- Event registration
148 
149  int register_file_open(const std::string& filename, time_t open_timestamp, bool existing_file) {
150  // Simply return a token, we will resolve it in the actual processing of the queue.
151  XrdSysMutexHelper _lock(&m_queue_mutex);
152  int token_id;
153  if ( ! m_access_tokens_free_slots.empty()) {
154  token_id = m_access_tokens_free_slots.back();
155  m_access_tokens_free_slots.pop_back();
156  m_access_tokens[token_id].m_filename = filename;
157  m_access_tokens[token_id].m_last_queue_swap_u1 = m_queue_swap_u1 - 1;
158  } else {
159  token_id = (int) m_access_tokens.size();
160  m_access_tokens.push_back({filename, m_queue_swap_u1 - 1});
161  }
162 
163  m_file_open_q.push(token_id, {open_timestamp, existing_file});
164  return token_id;
165  }
166 
167  void register_file_update_stats(int token_id, const Stats& stats) {
168  XrdSysMutexHelper _lock(&m_queue_mutex);
169  AccessToken &at = token(token_id);
170  // Check if this is the first update within this queue swap cycle.
171  if (at.m_last_queue_swap_u1 != m_queue_swap_u1) {
172  m_file_update_stats_q.push(token_id, stats);
173  at.m_last_queue_swap_u1 = m_queue_swap_u1;
174  at.m_last_write_queue_pos = m_file_update_stats_q.write_queue_size() - 1;
175  } else {
176  Stats &existing_stats = m_file_update_stats_q.write_record(at.m_last_write_queue_pos);
177  existing_stats.AddUp(stats);
178  }
179  // Optionally, one could return "scaler" to moodify stat-reporting
180  // frequency in the file ... if it comes too often or too rarely.
181  // See also the logic for determining reporting interval (in N_bytes_read)
182  // in File::Open().
183  }
184 
185  void register_file_close(int token_id, time_t close_timestamp, const Stats& full_stats) {
186  XrdSysMutexHelper _lock(&m_queue_mutex);
187  m_file_close_q.push(token_id, {close_timestamp, full_stats});
188  }
189 
190  // deletions can come from purge and from direct requests (Cache::UnlinkFile), the latter
191  // also covering the emergency shutdown of a file.
192  void register_file_purge(DirState* target, long long size_in_st_blocks) {
193  XrdSysMutexHelper _lock(&m_queue_mutex);
194  m_file_purge_q1.push(target, {size_in_st_blocks, 1});
195  }
196  void register_multi_file_purge(DirState* target, long long size_in_st_blocks, int n_files) {
197  XrdSysMutexHelper _lock(&m_queue_mutex);
198  m_file_purge_q1.push(target, {size_in_st_blocks, n_files});
199  }
200  void register_multi_file_purge(const std::string& target, long long size_in_st_blocks, int n_files) {
201  XrdSysMutexHelper _lock(&m_queue_mutex);
202  m_file_purge_q2.push(target, {size_in_st_blocks, n_files});
203  }
204  void register_file_purge(const std::string& filename, long long size_in_st_blocks) {
205  XrdSysMutexHelper _lock(&m_queue_mutex);
206  m_file_purge_q3.push(filename, size_in_st_blocks);
207  }
208 
209  // void register_dir_purge(DirState* target);
210  // target assumed to be empty at this point, triggered by a file_purge removing the last file in it.
// hmmh, this is actually tricky ... who will purge the dirs? we should know at export-to-vector time
// and can prune leaf directories. This might fail if a file has been created in there in the meantime, which is ok.
213  // However, is there a race condition between rmdir and creation of a new file in that dir? Ask Andy.
214 
215  // --- Helpers for event processing and actions
216 
217  AccessToken& token(int i) { return m_access_tokens[i]; }
218 
219  // --- Actions
220 
221  int process_queues();
222 
223  void heart_beat();
224 
225  // --- Helpers for export of DirState vector snapshot.
226 
227  void fill_sshot_vec_children(const DirState &parent_ds,
228  int parent_idx,
229  std::vector<DirStateElement> &vec,
230  int max_depth);
231 
232  void fill_pshot_vec_children(const DirState &parent_ds,
233  int parent_idx,
234  std::vector<DirPurgeElement> &vec,
235  int max_depth);
236 
237  // Interface to other part of XCache -- note the CamelCase() notation.
238  void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond);
239 
240  // main function, steers startup then enters heart_beat. does not die.
241  void init_before_main(); // called from startup thread / configuration processing
242  void main_thread_function(); // run in dedicated thread
243 
// The following variables are set under the above lock, purge task signals to heart_beat.
time_t m_purge_task_start    = 0;
time_t m_purge_task_end      = 0;
bool   m_purge_task_active   = false; // from the perspective of heart-beat, set only in heartbeat
bool   m_purge_task_complete = false; // from the perspective of the task, reset in heartbeat, set in task
// When m_purge_task_active == true, DirState entries are not removed from the tree to
// allow purge thread to report cleared files directly via DirState ptr.
// Note, DirState removal happens during stat propagation traversal.

// Purge helpers etc.
void perform_purge_check(bool purge_cold_files, int tl);
257 
260 };
261 
262 }
263 
264 #endif
int stat(const char *path, struct stat *buf)
#define ID
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void perform_purge_check(bool purge_cold_files, int tl)
void fill_sshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirStateElement > &vec, int max_depth)
void perform_purge_task(DataFsPurgeshot &ps)
void register_file_purge(const std::string &filename, long long size_in_st_blocks)
void scan_dir_and_recurse(FsTraversal &fst)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void fill_pshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirPurgeElement > &vec, int max_depth)
void register_multi_file_purge(DirState *target, long long size_in_st_blocks, int n_files)
void register_file_update_stats(int token_id, const Stats &stats)
void register_multi_file_purge(const std::string &target, long long size_in_st_blocks, int n_files)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
void AddUp(const Stats &s)
Definition: XrdPfcStats.hh:119
Definition: XrdPfc.hh:41
XrdPosixStats Stats
Definition: XrdPosixFile.cc:64