XRootD
XrdOssStatsFileSystem.cc
Go to the documentation of this file.
1 
3 #include "XrdOssStatsConfig.hh"
5 #include "XrdOssStatsFile.hh"
9 
10 #include <inttypes.h>
11 #include <stdexcept>
12 #include <thread>
13 
15  XrdOssWrapper(*oss),
16  m_oss(oss),
17  m_env(envP),
18  m_log(lp, "fsstat_"),
19  m_slow_duration(std::chrono::seconds(1))
20 {
21  m_log.Say("------ Initializing the storage statistics plugin.");
22  if (!Config(configfn)) {
23  throw std::runtime_error("Failed to configure the storage statistics plugin.");
24  }
25  pthread_t tid;
26  int rc;
27  if ((rc = XrdSysThread::Run(&tid, StatsFileSystem::AggregateBootstrap, static_cast<void *>(this), 0, "FS Stats Compute Thread"))) {
28  m_log.Emsg("StatsFileSystem", rc, "create stats compute thread");
29  throw std::runtime_error("Failed to create the statistics computing thread.");
30  }
31 
32  // While the plugin _does_ print its activity to the debugging facility (if enabled), its relatively useless
33  // unless the g-stream is available. Hence, if it's _not_ available, we should fail to startup.
34  if (envP) {
35  m_gstream = reinterpret_cast<XrdXrootdGStream*>(envP->GetPtr("oss.gStream*"));
36  if (m_gstream) {
37  m_log.Say("Config", "Stats monitoring has been configured via xrootd.mongstream directive");
38  } else {
39  throw std::runtime_error("XrdOssStats plugin is loaded but it requires the oss monitoring g-stream to also be enabled to be set; try adding `xrootd.mongstream oss ...` to your configuration");
40  }
41  } else {
42  throw std::runtime_error("XrdOssStats plugin invoked without a configured environment; likely an internal error");
43  }
44 }
45 
47 
48 void *
49 StatsFileSystem::AggregateBootstrap(void *me) {
50  auto myself = static_cast<StatsFileSystem*>(me);
51  while (1) {
52  std::this_thread::sleep_for(std::chrono::seconds(1));
53  myself->AggregateStats();
54  }
55  return nullptr;
56 }
57 
58 bool
59 StatsFileSystem::Config(const char *configfn)
60 {
62 
63  XrdOucGatherConf statsConf("fsstats.trace fsstats.slowop", &m_log);
64  int result;
65  if ((result = statsConf.Gather(configfn, XrdOucGatherConf::trim_lines)) < 0) {
66  m_log.Emsg("Config", -result, "parsing config file", configfn);
67  return false;
68  }
69 
70  char *val;
71  while (statsConf.GetLine()) {
72  val = statsConf.GetToken(); // Ignore -- we asked for a single value
73  if (!strcmp(val, "trace")) {
74  m_log.setMsgMask(0);
75  if (!(val = statsConf.GetToken())) {
76  m_log.Emsg("Config", "fsstats.trace requires an argument. Usage: fsstats.trace [all|err|warning|info|debug|none]");
77  return false;
78  }
79  do {
80  if (!strcmp(val, "all")) {m_log.setMsgMask(m_log.getMsgMask() | LogMask::All);}
81  else if (!strcmp(val, "error")) {m_log.setMsgMask(m_log.getMsgMask() | LogMask::Error);}
82  else if (!strcmp(val, "warning")) {m_log.setMsgMask(m_log.getMsgMask() | LogMask::Error | LogMask::Warning);}
83  else if (!strcmp(val, "info")) {m_log.setMsgMask(m_log.getMsgMask() | LogMask::Error | LogMask::Warning | LogMask::Info);}
84  else if (!strcmp(val, "debug")) {m_log.setMsgMask(m_log.getMsgMask() | LogMask::Error | LogMask::Warning | LogMask::Info | LogMask::Debug);}
85  else if (!strcmp(val, "none")) {m_log.setMsgMask(0);}
86  } while ((val = statsConf.GetToken()));
87  } else if (!strcmp(val, "slowop")) {
88  if (!(val = statsConf.GetToken())) {
89  m_log.Emsg("Config", "fsstats.slowop requires an argument. Usage: fsstats.slowop [duration]");
90  return false;
91  }
92  std::string errmsg;
93  if (!ParseDuration(val, m_slow_duration, errmsg)) {
94  m_log.Emsg("Config", "fsstats.slowop couldn't parse duration", val, errmsg.c_str());
95  return false;
96  }
97  }
98  }
99  m_log.Emsg("Config", "Logging levels enabled", LogMaskToString(m_log.getMsgMask()).c_str());
100 
101  return true;
102 }
103 
105 {
106  // Call the underlying OSS newDir
107  std::unique_ptr<XrdOssDF> wrapped(wrapPI.newDir(user));
108  return new StatsDirectory(std::move(wrapped), m_log, *this);
109 }
110 
112 {
113  // Call the underlying OSS newFile
114  std::unique_ptr<XrdOssDF> wrapped(wrapPI.newFile(user));
115  return new StatsFile(std::move(wrapped), m_log, *this);
116 }
117 
118 int StatsFileSystem::Chmod(const char * path, mode_t mode, XrdOucEnv *env)
119 {
120  OpTimer op(m_ops.m_chmod_ops, m_slow_ops.m_chmod_ops, m_times.m_chmod, m_slow_times.m_chmod, m_slow_duration);
121  return wrapPI.Chmod(path, mode, env);
122 }
123 
124 int StatsFileSystem::Rename(const char *oPath, const char *nPath,
125  XrdOucEnv *oEnvP, XrdOucEnv *nEnvP)
126 {
127  OpTimer op(m_ops.m_rename_ops, m_slow_ops.m_rename_ops, m_times.m_rename, m_slow_times.m_rename, m_slow_duration);
128  return wrapPI.Rename(oPath, nPath, oEnvP, nEnvP);
129 }
130 
131 int StatsFileSystem::Stat(const char *path, struct stat *buff,
132  int opts, XrdOucEnv *env)
133 {
134  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
135  return wrapPI.Stat(path, buff, opts, env);
136 }
137 
138 int StatsFileSystem::StatFS(const char *path, char *buff, int &blen,
139  XrdOucEnv *env)
140 {
141  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
142  return wrapPI.StatFS(path, buff, blen, env);
143 }
144 
145 int StatsFileSystem::StatLS(XrdOucEnv &env, const char *path,
146  char *buff, int &blen)
147 {
148  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
149  return wrapPI.StatLS(env, path, buff, blen);
150 }
151 
152 int StatsFileSystem::StatPF(const char *path, struct stat *buff, int opts)
153 {
154  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
155  return wrapPI.StatPF(path, buff, opts);
156 }
157 
158 int StatsFileSystem::StatPF(const char *path, struct stat *buff)
159 {
160  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
161  return wrapPI.StatPF(path, buff, 0);
162 }
163 
164 int StatsFileSystem::StatVS(XrdOssVSInfo *vsP, const char *sname, int updt)
165 {
166  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
167  return wrapPI.StatVS(vsP, sname, updt);
168 }
169 
170 int StatsFileSystem::StatXA(const char *path, char *buff, int &blen,
171  XrdOucEnv *env)
172 {
173  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
174  return wrapPI.StatXA(path, buff, blen, env);
175 }
176 
177 int StatsFileSystem::StatXP(const char *path, unsigned long long &attr,
178  XrdOucEnv *env)
179 {
180  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
181  return wrapPI.StatXP(path, attr, env);
182 }
183 
184 int StatsFileSystem::Truncate(const char *path, unsigned long long fsize,
185  XrdOucEnv *env)
186 {
187  OpTimer op(m_ops.m_truncate_ops, m_slow_ops.m_truncate_ops, m_times.m_truncate, m_slow_times.m_truncate, m_slow_duration);
188  return wrapPI.Truncate(path, fsize, env);
189 }
190 
191 int StatsFileSystem::Unlink(const char *path, int Opts, XrdOucEnv *env)
192 {
193  OpTimer op(m_ops.m_unlink_ops, m_slow_ops.m_unlink_ops, m_times.m_unlink, m_slow_times.m_unlink, m_slow_duration);
194  return wrapPI.Unlink(path, Opts, env);
195 }
196 
197 void StatsFileSystem::AggregateStats()
198 {
199  char buf[1500];
200  auto len = snprintf(buf, 1500,
201  "{"
202  "\"event\":\"oss_stats\"," \
203  "\"reads\":%" PRIu64 ",\"writes\":%" PRIu64 ",\"stats\":%" PRIu64 "," \
204  "\"pgreads\":%" PRIu64 ",\"pgwrites\":%" PRIu64 ",\"readvs\":%" PRIu64 "," \
205  "\"readv_segs\":%" PRIu64 ",\"dirlists\":%" PRIu64 ",\"dirlist_ents\":%" PRIu64 ","
206  "\"truncates\":%" PRIu64 ",\"unlinks\":%" PRIu64 ",\"chmods\":%" PRIu64 ","
207  "\"opens\":%" PRIu64 ",\"renames\":%" PRIu64 ","
208  "\"slow_reads\":%" PRIu64 ",\"slow_writes\":%" PRIu64 ",\"slow_stats\":%" PRIu64 ","
209  "\"slow_pgreads\":%" PRIu64 ",\"slow_pgwrites\":%" PRIu64 ",\"slow_readvs\":%" PRIu64 ","
210  "\"slow_readv_segs\":%" PRIu64 ",\"slow_dirlists\":%" PRIu64 ",\"slow_dirlist_ents\":%" PRIu64 ","
211  "\"slow_truncates\":%" PRIu64 ",\"slow_unlinks\":%" PRIu64 ",\"slow_chmods\":%" PRIu64 ","
212  "\"slow_opens\":%" PRIu64 ",\"slow_renames\":%" PRIu64 ","
213  "\"open_t\":%.4f,\"read_t\":%.4f,\"readv_t\":%.4f,"
214  "\"pgread_t\":%.4f,\"write_t\":%.4f,\"pgwrite_t\":%.4f,"
215  "\"dirlist_t\":%.4f,\"stat_t\":%.4f,\"truncate_t\":%.4f,"
216  "\"unlink_t\":%.4f,\"rename_t\":%.4f,\"chmod_t\":%.4f,"
217  "\"slow_open_t\":%.4f,\"slow_read_t\":%.4f,\"slow_readv_t\":%.4f,"
218  "\"slow_pgread_t\":%.4f,\"slow_write_t\":%.4f,\"slow_pgwrite_t\":%.4f,"
219  "\"slow_dirlist_t\":%.4f,\"slow_stat_t\":%.4f,\"slow_truncate_t\":%.4f,"
220  "\"slow_unlink_t\":%.4f,\"slow_rename_t\":%.4f,\"slow_chmod_t\":%.4f"
221  "}",
222  m_ops.m_read_ops.load(), m_ops.m_write_ops.load(), m_ops.m_stat_ops.load(),
223  m_ops.m_pgread_ops.load(), m_ops.m_pgwrite_ops.load(), m_ops.m_readv_ops.load(),
224  m_ops.m_readv_segs.load(), m_ops.m_dirlist_ops.load(), m_ops.m_dirlist_entries.load(),
225  m_ops.m_truncate_ops.load(), m_ops.m_unlink_ops.load(), m_ops.m_chmod_ops.load(),
226  m_ops.m_open_ops.load(), m_ops.m_rename_ops.load(),
227  m_slow_ops.m_read_ops.load(), m_slow_ops.m_write_ops.load(), m_slow_ops.m_stat_ops.load(),
228  m_slow_ops.m_pgread_ops.load(), m_slow_ops.m_pgwrite_ops.load(), m_slow_ops.m_readv_ops.load(),
229  m_slow_ops.m_readv_segs.load(), m_slow_ops.m_dirlist_ops.load(), m_slow_ops.m_dirlist_entries.load(),
230  m_slow_ops.m_truncate_ops.load(), m_slow_ops.m_unlink_ops.load(), m_slow_ops.m_chmod_ops.load(),
231  m_slow_ops.m_open_ops.load(), m_slow_ops.m_rename_ops.load(),
232  static_cast<float>(m_times.m_open.load())/1e9, static_cast<float>(m_times.m_read.load())/1e9, static_cast<float>(m_times.m_readv.load())/1e9,
233  static_cast<float>(m_times.m_pgread.load())/1e9, static_cast<float>(m_times.m_write.load())/1e9, static_cast<float>(m_times.m_pgwrite.load())/1e9,
234  static_cast<float>(m_times.m_dirlist.load())/1e9, static_cast<float>(m_times.m_stat.load())/1e9, static_cast<float>(m_times.m_truncate.load())/1e9,
235  static_cast<float>(m_times.m_unlink.load())/1e9, static_cast<float>(m_times.m_rename.load())/1e9, static_cast<float>(m_times.m_chmod.load())/1e9,
236  static_cast<float>(m_slow_times.m_open.load())/1e9, static_cast<float>(m_slow_times.m_read.load())/1e9, static_cast<float>(m_slow_times.m_readv.load())/1e9,
237  static_cast<float>(m_slow_times.m_pgread.load())/1e9, static_cast<float>(m_slow_times.m_write.load())/1e9, static_cast<float>(m_slow_times.m_pgwrite.load())/1e9,
238  static_cast<float>(m_slow_times.m_dirlist.load())/1e9, static_cast<float>(m_slow_times.m_stat.load())/1e9, static_cast<float>(m_slow_times.m_truncate.load())/1e9,
239  static_cast<float>(m_slow_times.m_unlink.load())/1e9, static_cast<float>(m_slow_times.m_rename.load())/1e9, static_cast<float>(m_slow_times.m_chmod.load())/1e9
240 
241  );
242  if (len >= 1500) {
243  m_log.Log(LogMask::Error, "Aggregate", "Failed to generate g-stream statistics packet");
244  return;
245  }
246  m_log.Log(LogMask::Debug, "Aggregate", buf);
247  if (m_gstream && !m_gstream->Insert(buf, len + 1)) {
248  m_log.Log(LogMask::Error, "Aggregate", "Failed to send g-stream statistics packet");
249  return;
250  }
251 }
252 
253 StatsFileSystem::OpTimer::OpTimer(std::atomic<uint64_t> &op_count, std::atomic<uint64_t> &slow_op_count, std::atomic<uint64_t> &timing, std::atomic<uint64_t> &slow_timing, std::chrono::steady_clock::duration duration)
254  : m_op_count(op_count),
255  m_slow_op_count(slow_op_count),
256  m_timing(timing),
257  m_slow_timing(slow_timing),
258  m_start(std::chrono::steady_clock::now()),
259  m_slow_duration(duration)
260 {}
261 
262 StatsFileSystem::OpTimer::~OpTimer()
263 {
264  auto dur = std::chrono::steady_clock::now() - m_start;
265  m_op_count++;
266  m_timing += std::chrono::nanoseconds(dur).count();
267  if (dur > m_slow_duration) {
268  m_slow_op_count++;
269  m_slow_timing += std::chrono::nanoseconds(dur).count();
270  }
271 }
bool ParseDuration(const std::string &duration, std::chrono::steady_clock::duration &result, std::string &errmsg)
std::string LogMaskToString(int mask)
@ Info
@ Warning
int stat(const char *path, struct stat *buf)
struct myOpts opts
XrdOssDF * newDir(const char *user=0) override
StatsFileSystem(XrdOss *oss, XrdSysLogger *log, const char *configName, XrdOucEnv *envP)
int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *env=0) override
int StatXP(const char *path, unsigned long long &attr, XrdOucEnv *env=0) override
int StatFS(const char *path, char *buff, int &blen, XrdOucEnv *env=0) override
XrdOssDF * newFile(const char *user=0) override
friend class StatsDirectory
bool Config(const char *configfn)
int StatPF(const char *path, struct stat *buff, int opts) override
int StatLS(XrdOucEnv &env, const char *path, char *buff, int &blen) override
int Unlink(const char *path, int Opts=0, XrdOucEnv *env=0) override
int Chmod(const char *path, mode_t mode, XrdOucEnv *env=0) override
int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0) override
int Truncate(const char *path, unsigned long long fsize, XrdOucEnv *env=0) override
int StatXA(const char *path, char *buff, int &blen, XrdOucEnv *env=0) override
int Rename(const char *oPath, const char *nPath, XrdOucEnv *oEnvP=0, XrdOucEnv *nEnvP=0) override
virtual int StatLS(XrdOucEnv &env, const char *path, char *buff, int &blen)
Definition: XrdOss.cc:97
virtual int StatXA(const char *path, char *buff, int &blen, XrdOucEnv *envP=0)
Definition: XrdOss.cc:127
virtual int StatXP(const char *path, unsigned long long &attr, XrdOucEnv *envP=0)
Definition: XrdOss.cc:137
virtual XrdOssDF * newDir(const char *tident)=0
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0
virtual int StatPF(const char *path, struct stat *buff, int opts)
Definition: XrdOss.cc:107
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition: XrdOss.cc:117
virtual int StatFS(const char *path, char *buff, int &blen, XrdOucEnv *envP=0)
Definition: XrdOss.cc:87
virtual int Rename(const char *oPath, const char *nPath, XrdOucEnv *oEnvP=0, XrdOucEnv *nEnvP=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Truncate(const char *path, unsigned long long fsize, XrdOucEnv *envP=0)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:281
char * GetToken(char **rest=0, int lowcase=0)
int Gather(const char *cfname, Level lvl, const char *parms=0)
@ trim_lines
Prefix trimmed lines.
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
void setMsgMask(int mask)
Definition: XrdSysError.hh:154
int getMsgMask()
Definition: XrdSysError.hh:156
void Log(int mask, const char *esfx, const char *text1, const char *text2=0, const char *text3=0)
Definition: XrdSysError.hh:133
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
bool Insert(const char *data, int dlen)
int Opts
Definition: XrdMpxStats.cc:58
XrdOucEnv * envP
Definition: XrdPss.cc:109