XRootD
XrdClReplay.cc File Reference
#include "XrdCl/XrdClOperations.hh"
#include "XrdCl/XrdClUtils.hh"
#include "XrdCl/XrdClFileOperations.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdClAction.hh"
#include "XrdClActionMetrics.hh"
#include "XrdClReplayArgs.hh"
#include <fstream>
#include <vector>
#include <tuple>
#include <unordered_map>
#include <chrono>
#include <iostream>
#include <thread>
#include <iomanip>
#include <atomic>
#include <stdarg.h>
#include <getopt.h>
#include <map>
#include <numeric>
#include <mutex>
#include <condition_variable>
+ Include dependency graph for XrdClReplay.cc:

Go to the source code of this file.

Classes

class  XrdCl::ActionExecutor
 Executes an action registered in the csv file. More...
 
class  XrdCl::barrier_t
 
class  XrdCl::BufferPool
 Buffer pool - to limit memory consumption. More...
 
class  XrdCl::mytimer_t
 Timer helper class. More...
 

Namespaces

 XrdCl
 

Typedefs

using XrdCl::action_list = std::multimap< double, ActionExecutor >
 List of actions: start time - action. More...
 

Functions

bool XrdCl::AssureFile (const std::string &url, uint64_t size, bool viatruncate, bool verify)
 AssureFile creates input data files on the fly if required. More...
 
std::thread XrdCl::ExecuteActions (std::unique_ptr< File > file, action_list &&actions, double t0, double speed, ActionMetrics &metric, bool simulate)
 
int main (int argc, char **argv)
 
std::unordered_map< File *, action_list > XrdCl::ParseInput (const std::string &path, double &t0, double &t1, std::unordered_map< File *, std::string > &filenames, std::unordered_map< File *, double > &synchronicity, std::unordered_map< File *, size_t > &responseerrors, const std::vector< std::string > &option_regex)
 
std::vector< std::string > XrdCl::ToColumns (const std::string &row)
 Split a row into columns. More...
 
void usage ()
 

Function Documentation

◆ main()

int main ( int  argc,
char **  argv 
)

Definition at line 1108 of file XrdClReplay.cc.

1109 {
1110  XrdCl::ReplayArgs opt(argc, argv);
1111  int rc = 0;
1112 
1113  try
1114  {
1115  double t0 = 0;
1116  double t1 = 0;
1117  std::unordered_map<XrdCl::File*, std::string> filenames;
1118  std::unordered_map<XrdCl::File*, double> synchronicity;
1119  std::unordered_map<XrdCl::File*, size_t> responseerrors;
1120  auto actions = XrdCl::ParseInput(opt.path(),
1121  t0,
1122  t1,
1123  filenames,
1124  synchronicity,
1125  responseerrors,
1126  opt.regex()); // parse the input file
1127  std::vector<std::thread> threads;
1128  std::unordered_map<XrdCl::File*, XrdCl::ActionMetrics> metrics;
1129  threads.reserve(actions.size());
1130  double toffset = XrdCl::Action::timeNow() - t0;
1131  XrdCl::mytimer_t timer;
1132  XrdCl::ActionMetrics summetric;
1133  bool sampling_error = false;
1134 
1135  for (auto& action : actions)
1136  {
1137  metrics[action.first].fname = filenames[action.first];
1138  metrics[action.first].synchronicity = synchronicity[action.first];
1139  metrics[action.first].errors = responseerrors[action.first];
1140  if (metrics[action.first].errors)
1141  {
1142  sampling_error = true;
1143  }
1144  }
1145 
1146  if (sampling_error)
1147  {
1148  std::cerr << "Warning: IO file contains unsuccessful samples!" << std::endl;
1149  if (!opt.suppress_error())
1150  {
1151  std::cerr << "... run with [-f] or [--suppress] option to suppress unsuccessful IO events!"
1152  << std::endl;
1153  exit(-1);
1154  }
1155  }
1156 
1157 
1158  if (opt.print())
1159  toffset = 0; // indicate not to follow timing
1160 
1161  for (auto& action : actions)
1162  {
1163  // execute list of actions against file object
1164  threads.emplace_back(ExecuteActions(std::unique_ptr<XrdCl::File>(action.first),
1165  std::move(action.second),
1166  toffset,
1167  opt.speed(),
1168  metrics[action.first],
1169  opt.print()));
1170  }
1171 
1172  for (auto& t : threads) // wait until we are done
1173  t.join();
1174 
1175  if (opt.json())
1176  {
1177  std::cout << "{" << std::endl;
1178  if (opt.longformat())
1179  std::cout << " \"metrics\": [" << std::endl;
1180  }
1181 
1182  for (auto& metric : metrics)
1183  {
1184  if (opt.longformat())
1185  {
1186  std::cout << metric.second.Dump(opt.json());
1187  }
1188  summetric.add(metric.second);
1189  }
1190 
1191  if (opt.summary())
1192  std::cout << summetric.Dump(opt.json());
1193 
1194  if (opt.json())
1195  {
1196  if (opt.longformat())
1197  std::cout << " ]," << std::endl;
1198  }
1199 
1200  double tbench = timer.elapsed();
1201 
1202  if (opt.json())
1203  {
1204  {
1205  std::cout << " \"iosummary\": { " << std::endl;
1206  if (!opt.print())
1207  {
1208  std::cout << " \"player::runtime\": " << tbench << "," << std::endl;
1209  }
1210  std::cout << " \"player::speed\": " << opt.speed() << "," << std::endl;
1211  std::cout << " \"sampled::runtime\": " << t1 - t0 << "," << std::endl;
1212  std::cout << " \"volume::totalread\": " << summetric.getBytesRead() << "," << std::endl;
1213  std::cout << " \"volume::totalwrite\": " << summetric.getBytesWritten() << ","
1214  << std::endl;
1215  std::cout << " \"volume::read\": " << summetric.ios["Read::b"] << "," << std::endl;
1216  std::cout << " \"volume::write\": " << summetric.ios["Write::b"] << "," << std::endl;
1217  std::cout << " \"volume::pgread\": " << summetric.ios["PgRead::b"] << "," << std::endl;
1218  std::cout << " \"volume::pgwrite\": " << summetric.ios["PgWrite::b"] << "," << std::endl;
1219  std::cout << " \"volume::vectorread\": " << summetric.ios["VectorRead::b"] << ","
1220  << std::endl;
1221  std::cout << " \"volume::vectorwrite\": " << summetric.ios["VectorWrite::b"] << ","
1222  << std::endl;
1223  std::cout << " \"iops::read\": " << summetric.ios["Read::n"] << "," << std::endl;
1224  std::cout << " \"iops::write\": " << summetric.ios["Write::n"] << "," << std::endl;
1225  std::cout << " \"iops::pgread\": " << summetric.ios["PgRead::n"] << "," << std::endl;
1226  std::cout << " \"iops::pgwrite\": " << summetric.ios["PgRead::n"] << "," << std::endl;
1227  std::cout << " \"iops::vectorread\": " << summetric.ios["VectorRead::n"] << ","
1228  << std::endl;
1229  std::cout << " \"iops::vectorwrite\": " << summetric.ios["VectorRead::n"] << ","
1230  << std::endl;
1231  std::cout << " \"files::read\": " << summetric.ios["OpenR::n"] << "," << std::endl;
1232  std::cout << " \"files::write\": " << summetric.ios["OpenW::n"] << "," << std::endl;
1233  std::cout << " \"datasetsize::read\": " << summetric.ios["Read::o"] << "," << std::endl;
1234  std::cout << " \"datasetsize::write\": " << summetric.ios["Write::o"] << "," << std::endl;
1235  if (!opt.print())
1236  {
1237  std::cout << " \"bandwidth::mb::read\": "
1238  << summetric.getBytesRead() / tbench / 1000000.0 << "," << std::endl;
1239  std::cout << " \"bandwdith::mb::write\": "
1240  << summetric.getBytesWritten() / tbench / 1000000.0 << "," << std::endl;
1241  std::cout << " \"performancemark\": " << (100.0 * (t1 - t0) / tbench) << ","
1242  << std::endl;
1243  std::cout << " \"gain::read\":"
1244  << (100.0 * summetric.delays["Read::tnomi"] / summetric.delays["Read::tmeas"])
1245  << "," << std::endl;
1246  std::cout << " \"gain::write\":"
1247  << (100.0 * summetric.delays["Write::tnomi"] / summetric.delays["Write::tmeas"])
1248  << std::endl;
1249  }
1250  std::cout << " \"synchronicity::read\":"
1251  << summetric.aggregated_synchronicity.ReadSynchronicity() << "," << std::endl;
1252  std::cout << " \"synchronicity::write\":"
1253  << summetric.aggregated_synchronicity.WriteSynchronicity() << "," << std::endl;
1254  std::cout << " \"response::error:\":" << summetric.ios["All::e"] << std::endl;
1255  std::cout << " }" << std::endl;
1256  std::cout << "}" << std::endl;
1257  }
1258  }
1259  else
1260  {
1261  std::cout << "# =============================================" << std::endl;
1262  if (!opt.print())
1263  std::cout << "# IO Summary" << std::endl;
1264  else
1265  std::cout << "# IO Summary (print mode)" << std::endl;
1266  std::cout << "# =============================================" << std::endl;
1267  if (!opt.print())
1268  {
1269  std::cout << "# Total Runtime : " << std::fixed << tbench << " s" << std::endl;
1270  }
1271  std::cout << "# Sampled Runtime : " << std::fixed << t1 - t0 << " s" << std::endl;
1272  std::cout << "# Playback Speed : " << std::fixed << std::setprecision(2) << opt.speed()
1273  << std::endl;
1274  std::cout << "# IO Volume (R) : " << std::fixed
1276  << " [ std:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["Read::b"])
1277  << " vec:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["VectorRead::b"])
1278  << " page:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["PgRead::b"])
1279  << " ] " << std::endl;
1280  std::cout << "# IO Volume (W) : " << std::fixed
1282  << " [ std:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["Write::b"])
1283  << " vec:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["VectorWrite::b"])
1284  << " page:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["PgWrite::b"])
1285  << " ] " << std::endl;
1286  std::cout << "# IOPS (R) : " << std::fixed << summetric.getIopsRead()
1287  << " [ std:" << summetric.ios["Read::n"]
1288  << " vec:" << summetric.ios["VectorRead::n"]
1289  << " page:" << summetric.ios["PgRead::n"] << " ] " << std::endl;
1290  std::cout << "# IOPS (W) : " << std::fixed << summetric.getIopsWrite()
1291  << " [ std:" << summetric.ios["Write::n"]
1292  << " vec:" << summetric.ios["VectorWrite::n"]
1293  << " page:" << summetric.ios["PgWrite::n"] << " ] " << std::endl;
1294  std::cout << "# Files (R) : " << std::fixed << summetric.ios["OpenR::n"] << std::endl;
1295  std::cout << "# Files (W) : " << std::fixed << summetric.ios["OpenW::n"] << std::endl;
1296  std::cout << "# Datasize (R) : " << std::fixed
1297  << XrdCl::ActionMetrics::humanreadable(summetric.ios["Read::o"]) << std::endl;
1298  std::cout << "# Datasize (W) : " << std::fixed
1299  << XrdCl::ActionMetrics::humanreadable(summetric.ios["Write::o"]) << std::endl;
1300  if (!opt.print())
1301  {
1302  std::cout << "# IO BW (R) : " << std::fixed << std::setprecision(2)
1303  << summetric.getBytesRead() / tbench / 1000000.0 << " MB/s" << std::endl;
1304  std::cout << "# IO BW (W) : " << std::fixed << std::setprecision(2)
1305  << summetric.getBytesRead() / tbench / 1000000.0 << " MB/s" << std::endl;
1306  }
1307  std::cout << "# ---------------------------------------------" << std::endl;
1308  std::cout << "# Quality Estimation" << std::endl;
1309  std::cout << "# ---------------------------------------------" << std::endl;
1310  if (!opt.print())
1311  {
1312  std::cout << "# Performance Mark : " << std::fixed << std::setprecision(2)
1313  << (100.0 * (t1 - t0) / tbench) << "%" << std::endl;
1314  std::cout << "# Gain Mark(R) : " << std::fixed << std::setprecision(2)
1315  << (100.0 * summetric.delays["Read::tnomi"] / summetric.delays["Read::tmeas"])
1316  << "%" << std::endl;
1317  std::cout << "# Gain Mark(W) : " << std::fixed << std::setprecision(2)
1318  << (100.0 * summetric.delays["Write::tnomi"] / summetric.delays["Write::tmeas"])
1319  << "%" << std::endl;
1320  }
1321  std::cout << "# Synchronicity(R) : " << std::fixed << std::setprecision(2)
1322  << summetric.aggregated_synchronicity.ReadSynchronicity() << "%" << std::endl;
1323  std::cout << "# Synchronicity(W) : " << std::fixed << std::setprecision(2)
1324  << summetric.aggregated_synchronicity.WriteSynchronicity() << "%" << std::endl;
1325  if (!opt.print())
1326  {
1327  std::cout << "# ---------------------------------------------" << std::endl;
1328  std::cout << "# Response Errors : " << std::fixed << summetric.ios["All::e"] << std::endl;
1329  std::cout << "# =============================================" << std::endl;
1330  if (summetric.ios["All::e"])
1331  {
1332  std::cerr << "Error: replay job failed with IO errors!" << std::endl;
1333  rc = -5;
1334  }
1335  }
1336  if (opt.create() || opt.verify())
1337  {
1338  std::cout << "# ---------------------------------------------" << std::endl;
1339  if (opt.create())
1340  {
1341  std::cout << "# Creating Dataset ..." << std::endl;
1342  }
1343  else
1344  {
1345  std::cout << "# Verifying Dataset ..." << std::endl;
1346  }
1347  uint64_t created_sofar = 0;
1348  for (auto& metric : metrics)
1349  {
1350  if (metric.second.getBytesRead() && !metric.second.getBytesWritten())
1351  {
1352  std::cout << "# ............................................." << std::endl;
1353  std::cout << "# file: " << metric.second.fname << std::endl;
1354  std::cout << "# size: "
1355  << XrdCl::ActionMetrics::humanreadable(metric.second.ios["Read::o"]) << " [ "
1356  << XrdCl::ActionMetrics::humanreadable(created_sofar) << " out of "
1357  << XrdCl::ActionMetrics::humanreadable(summetric.ios["Read::o"]) << " ] "
1358  << std::setprecision(2) << " ( "
1359  << 100.0 * created_sofar / summetric.ios["Read::o"] << "% )" << std::endl;
1360  if (!XrdCl::AssureFile(
1361  metric.second.fname, metric.second.ios["Read::o"], opt.truncate(), opt.verify()))
1362  {
1363  if (opt.verify())
1364  {
1365  rc = -5;
1366  }
1367  else
1368  {
1369  std::cerr << "Error: failed to assure that file " << metric.second.fname
1370  << " is stored with a size of "
1371  << XrdCl::ActionMetrics::humanreadable(metric.second.ios["Read::o"])
1372  << " !!!";
1373  rc = -5;
1374  }
1375  }
1376  }
1377  }
1378  }
1379  }
1380  }
1381  catch (const std::invalid_argument& ex)
1382  {
1383  std::cout << ex.what() << std::endl; // print parsing errors
1384  return 1;
1385  }
1386 
1387  return rc;
1388 }
Argument parser for XrdClReplay.
Timer helper class.
Definition: XrdClReplay.cc:156
double elapsed() const
Definition: XrdClReplay.cc:174
std::unordered_map< File *, action_list > ParseInput(const std::string &path, double &t0, double &t1, std::unordered_map< File *, std::string > &filenames, std::unordered_map< File *, double > &synchronicity, std::unordered_map< File *, size_t > &responseerrors, const std::vector< std::string > &option_regex)
Definition: XrdClReplay.cc:904
std::thread ExecuteActions(std::unique_ptr< File > file, action_list &&actions, double t0, double speed, ActionMetrics &metric, bool simulate)
bool AssureFile(const std::string &url, uint64_t size, bool viatruncate, bool verify)
AssureFile creates input data files on the fly if required.
Definition: XrdClReplay.cc:216
Metrics struct storing all timing and IO information of an action.
synchronicity_t aggregated_synchronicity
void add(const ActionMetrics &other)
std::map< std::string, uint64_t > ios
std::map< std::string, double > delays
size_t getBytesWritten() const
std::string Dump(bool json) const
static std::string humanreadable(uint64_t insize)
static double timeNow()
Get the current Unix time with ns precision as a double.
Definition: XrdClAction.hh:78

References XrdCl::ActionMetrics::add(), XrdCl::ActionMetrics::aggregated_synchronicity, XrdCl::AssureFile(), XrdCl::ReplayArgs::create(), XrdCl::ActionMetrics::delays, XrdCl::ActionMetrics::Dump(), XrdCl::mytimer_t::elapsed(), XrdCl::ExecuteActions(), XrdCl::ActionMetrics::fname, XrdCl::ActionMetrics::getBytesRead(), XrdCl::ActionMetrics::getBytesWritten(), XrdCl::ActionMetrics::getIopsRead(), XrdCl::ActionMetrics::getIopsWrite(), XrdCl::ActionMetrics::humanreadable(), XrdCl::ActionMetrics::ios, XrdCl::ReplayArgs::json(), XrdCl::ReplayArgs::longformat(), XrdCl::ParseInput(), XrdCl::ReplayArgs::path(), XrdCl::ReplayArgs::print(), XrdCl::ActionMetrics::synchronicity_t::ReadSynchronicity(), XrdCl::ReplayArgs::regex(), XrdCl::ReplayArgs::speed(), XrdCl::ReplayArgs::summary(), XrdCl::ReplayArgs::suppress_error(), XrdCl::Action::timeNow(), XrdCl::ReplayArgs::truncate(), XrdCl::ReplayArgs::verify(), and XrdCl::ActionMetrics::synchronicity_t::WriteSynchronicity().

+ Here is the call graph for this function:

◆ usage()

void usage ( )

Definition at line 1072 of file XrdClReplay.cc.

/// Print the command line help of xrdreplay to stderr and terminate.
///
/// Never returns: the process exits with status -1 after the help text
/// has been written.
void usage()
{
  // synopsis: every short option carries its leading dash (the original
  // text showed "[t|--truncate-data]")
  std::cerr
    << "usage: xrdreplay [-p|--print] [-c|--create-data] [-t|--truncate-data] [-l|--long] [-s|--summary] [-h|--help] [-r|--replace <arg>:=<newarg>] [-f|--suppress] <recordfilename>\n"
    << std::endl;
  std::cerr << " -h | --help : show this help" << std::endl;
  std::cerr
    << " -f | --suppress : force to run all IO with all successful result status - suppress all others"
    << std::endl;
  std::cerr
    << " - by default the player won't run with an unsuccessfully recorded IO"
    << std::endl;
  std::cerr << std::endl;
  std::cerr
    << " -p | --print : print only mode - shows all the IO for the given replay file without actually running any IO"
    << std::endl;
  std::cerr
    << " -s | --summary : print summary - shows all the aggregated IO counter summed for all files"
    << std::endl;
  std::cerr
    << " -l | --long : print long - show all file IO counter for each individual file"
    << std::endl;
  std::cerr
    << " -r | --replace <a>:=<b> : replace in the argument list the string <a> with <b> "
    << std::endl;
  std::cerr
    << " - option is usable several times e.g. to change storage prefixes or filenames"
    << std::endl;
  std::cerr << std::endl;
  // example uses "<a>:=<b>" with <a> = file://localhost (the original text
  // had a stray extra colon: "file:://localhost")
  std::cerr
    << "example: ... --replace file://localhost:=root://xrootd.eu/ : redirect local file to remote"
    << std::endl;
  std::cerr << std::endl;
  exit(-1);
}

Referenced by XrdPfc::Cache::ExecuteCommandUrl(), XrdSysXSLock::Lock(), main(), and XrdSysXSLock::UnLock().

+ Here is the caller graph for this function: