36 #include <unordered_map>
48 #include <condition_variable>
73 std::shared_ptr<std::vector<char>>
Allocate(
size_t length )
75 std::unique_lock<std::mutex> lck( mtx );
76 cv.wait( lck, [
this, length]{
return available >= length; } );
79 std::shared_ptr<std::vector<char>> buffer(
new std::vector<char>( length,
'A' ), del );
88 void Reclaim(
size_t length )
90 std::unique_lock<std::mutex> lck(mtx);
101 void operator()( std::vector<char> *buff )
108 static const size_t KB = 1024;
109 static const size_t MB = 1024 * KB;
110 static const size_t GB = 1024 * MB;
116 BufferPool() : mtx(), cv()
118 const char *maxsize = getenv(
"XRD_MAXBUFFERSIZE" );
121 size_t len = strlen( maxsize );
123 available = std::stoul( maxsize, &pos );
124 std::string sufix( len != pos ? maxsize + len - 2 :
"" );
125 std::transform( sufix.begin(), sufix.end(), sufix.begin(), ::toupper );
130 else if( sufix ==
"MB" )
132 else if( sufix ==
"GB" )
137 available = std::numeric_limits<size_t>::max();
140 BufferPool(
const BufferPool& ) =
delete;
141 BufferPool( BufferPool&& ) =
delete;
143 BufferPool& operator=(
const BufferPool& ) =
delete;
144 BufferPool& operator=( BufferPool& ) =
delete;
149 std::condition_variable cv;
162 : start(clock_t::now())
169 void reset() { start = clock_t::now(); }
177 * (std::chrono::duration_cast<std::chrono::nanoseconds>(clock_t::now() - start).count())
182 using clock_t = std::chrono::high_resolution_clock;
183 std::chrono::time_point<clock_t> start;
216 bool AssureFile(
const std::string& url, uint64_t size,
bool viatruncate,
bool verify)
220 uint16_t timeout = 60;
224 auto file = std::make_unique<XrdCl::File>(
false);
225 XRootDStatus status = file->Open(url, flags, mode, timeout);
230 status = file->Stat(
false, statinfo, timeout);
233 if (statinfo->
GetSize() < size)
236 <<
"Error: file size is not sufficient, but I won't touch the file - aborting ...";
241 std::cout <<
"# ---> info: file exists and has sufficient size" << std::endl;
250 std::cerr <<
"Verify: file is missing or inaccessible: " << url << std::endl;
258 auto file = std::make_unique<XrdCl::File>(
false);
259 XRootDStatus status = file->Open(url, wflags, wmode, timeout);
265 status = file->Truncate(size, timeout);
268 std::cerr <<
"Error: " << status.
ToString() <<
" - empty file might be left behind!"
277 using buffer_t = std::vector<uint64_t>;
281 while (nbytes < size)
283 size_t towrite = size - nbytes;
284 if (towrite > (buffer.size() *
sizeof(uint64_t)))
285 towrite = buffer.size() *
sizeof(uint64_t);
286 for (
size_t i = 0; i < buffer.size(); ++i)
289 buffer[i] = nbytes /
sizeof(uint64_t) + i;
291 status = file->Write(nbytes, towrite, buffer.data(), timeout);
294 std::cerr <<
"Error: " << status.
ToString() <<
" - failed to write file at offset "
295 << nbytes <<
" - incomplete file might be left behind!" << std::endl;
305 std::cerr <<
"Error: " << status.
ToString() <<
" - failed to create file!" << std::endl;
316 using buffer_t = std::shared_ptr<std::vector<char>>;
329 const std::string& action,
330 const std::string& args,
331 const std::string& orgststr,
332 const std::string& resp,
333 const double& duration)
338 , nominalduration(duration)
346 void Execute(std::shared_ptr<barrier_t>& ending,
347 std::shared_ptr<barrier_t>& closing,
351 if (action ==
"Open")
357 std::tie(url, flags, mode, timeout) = GetOpenArgs();
362 metric.
ios[
"OpenW::n"]++;
366 metric.
ios[
"OpenR::n"]++;
369 metric.
ios[
"Open::n"]++;
375 [
this, orgststr{ orgststr }, ending, closing, timer, &metric](
XRootDStatus& s)
mutable
377 metric.
addIos(
"Open",
"e", HandleStatus(s, orgststr,
"Open"));
388 else if (action ==
"Close")
390 uint16_t timeout = GetCloseArgs();
395 auto& sem = closing->get();
400 metric.
ios[
"Close::n"]++;
404 [
this, orgststr{ orgststr }, ending, timer, &metric](
XRootDStatus& s)
mutable
406 metric.
addIos(
"Close",
"e", HandleStatus(s, orgststr,
"Close"));
415 else if (action ==
"Stat")
419 std::tie(force, timeout) = GetStatArgs();
420 metric.
ios[
"Stat::n"]++;
425 [
this, orgststr{ orgststr }, ending, closing, timer, &metric](
XRootDStatus& s,
StatInfo& r)
mutable
427 metric.
addIos(
"Stat",
"e", HandleStatus(s, orgststr,
"Stat"));
438 else if (action ==
"Read")
443 std::tie(offset, buffer, timeout) = GetReadArgs();
444 metric.
ios[
"Read::n"]++;
445 metric.
ios[
"Read::b"] += buffer->size();
446 if ((offset + buffer->size()) > metric.
ios[
"Read::o"])
447 metric.
ios[
"Read::o"] = offset + buffer->size();
451 Async(
Read(file, offset, buffer->size(), buffer->data(), timeout) >>
452 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](
XRootDStatus& s,
455 metric.addIos(
"Read",
"e", HandleStatus(s, orgststr,
"Read"));
456 metric.addDelays(
"Read",
"tmeas", timer.elapsed());
468 else if (action ==
"PgRead")
473 std::tie(offset, buffer, timeout) = GetPgReadArgs();
474 metric.
ios[
"PgRead::n"]++;
475 metric.
ios[
"PgRead::b"] += buffer->size();
476 if ((offset + buffer->size()) > metric.
ios[
"Read::o"])
477 metric.
ios[
"Read::o"] = offset + buffer->size();
480 Async(
PgRead(file, offset, buffer->size(), buffer->data(), timeout) >>
481 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](
XRootDStatus& s,
484 metric.addIos(
"PgRead",
"e", HandleStatus(s, orgststr,
"PgRead"));
485 metric.addDelays(
"PgRead",
"tmeas", timer.elapsed());
497 else if (action ==
"Write")
502 std::tie(offset, buffer, timeout) = GetWriteArgs();
503 metric.
ios[
"Write::n"]++;
504 metric.
ios[
"Write::b"] += buffer->size();
505 if ((offset + buffer->size()) > metric.
ios[
"Write::o"])
506 metric.
ios[
"Write::o"] = offset + buffer->size();
511 Write(file, offset, buffer->size(), buffer->data(), timeout) >>
512 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](
XRootDStatus& s)
mutable
514 metric.addIos(
"Write",
"e", HandleStatus(s, orgststr,
"Write"));
515 metric.addDelays(
"Write",
"tmeas", timer.elapsed());
527 else if (action ==
"PgWrite")
532 std::tie(offset, buffer, timeout) = GetPgWriteArgs();
533 metric.
ios[
"PgWrite::n"]++;
534 metric.
ios[
"PgWrite::b"] += buffer->size();
535 if ((offset + buffer->size()) > metric.
ios[
"Write::o"])
536 metric.
ios[
"Write::o"] = offset + buffer->size();
540 PgWrite(file, offset, buffer->size(), buffer->data(), timeout) >>
541 [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](
XRootDStatus& s)
mutable
543 metric.addIos(
"PgWrite",
"e", HandleStatus(s, orgststr,
"PgWrite"));
544 metric.addDelays(
"PgWrite",
"tmeas", timer.elapsed());
556 else if (action ==
"Sync")
558 uint16_t timeout = GetSyncArgs();
559 metric.
ios[
"Sync::n"]++;
563 [
this, orgststr{ orgststr }, ending, closing, timer, &metric](
XRootDStatus& s)
mutable
565 metric.
addIos(
"Sync",
"e", HandleStatus(s, orgststr,
"Sync"));
576 else if (action ==
"Truncate")
580 std::tie(size, timeout) = GetTruncateArgs();
581 metric.
ios[
"Truncate::n"]++;
582 if (size > metric.
ios[
"Truncate::o"])
583 metric.
ios[
"Truncate::o"] = size;
588 [
this, orgststr{ orgststr }, ending, closing, timer, &metric](
XRootDStatus& s)
mutable
590 metric.
addIos(
"Truncate",
"e", HandleStatus(s, orgststr,
"Truncate"));
601 else if (action ==
"VectorRead")
605 std::vector<buffer_t> buffers;
606 std::tie(chunks, timeout, buffers) = GetVectorReadArgs();
607 metric.
ios[
"VectorRead::n"]++;
608 for (
auto& ch : chunks)
610 metric.
ios[
"VectorRead::b"] += ch.GetLength();
611 if ((ch.GetOffset() + ch.GetLength()) > metric.
ios[
"Read::o"])
612 metric.
ios[
"Read::o"] = ch.GetOffset() + ch.GetLength();
621 metric.
addIos(
"VectorRead",
"e", HandleStatus(s, orgststr,
"VectorRead"));
634 else if (action ==
"VectorWrite")
638 std::vector<buffer_t> buffers;
639 std::tie(chunks, timeout, buffers) = GetVectorWriteArgs();
640 metric.
ios[
"VectorWrite::n"]++;
641 for (
auto& ch : chunks)
643 metric.
ios[
"VectorWrite::b"] += ch.GetLength();
644 if ((ch.GetOffset() + ch.GetLength()) > metric.
ios[
"Write::o"])
645 metric.
ios[
"Write::o"] = ch.GetOffset() + ch.GetLength();
650 [
this, orgststr{ orgststr }, buffers, ending, closing, timer, &metric](
XRootDStatus& s)
mutable
652 metric.
addIos(
"VectorWrite",
"e", HandleStatus(s, orgststr,
"VectorWrite"));
679 std::string
Name()
const {
return action; }
685 static bool HandleStatus(
XRootDStatus& response,
const std::string& orgstr,
const std::string where=
"unknown")
687 std::string rspstr = response.
ToString();
688 rspstr.erase(remove(rspstr.begin(), rspstr.end(),
' '), rspstr.end());
690 if (rspstr != orgstr)
693 "We were expecting status: %s, but "
694 "received: %s from: %s",
709 std::tuple<std::string, OpenFlags::Flags, Access::Mode, uint16_t> GetOpenArgs()
711 std::vector<std::string> tokens;
713 if (tokens.size() != 4)
714 throw std::invalid_argument(
"Failed to parse open arguments.");
715 std::string url = tokens[0];
718 uint16_t timeout =
static_cast<uint16_t
>(std::stoul(tokens[3]));
719 return std::make_tuple(url, flags, mode, timeout);
725 uint16_t GetCloseArgs() {
return static_cast<uint16_t
>(std::stoul(args)); }
727 std::tuple<bool, uint16_t> GetStatArgs()
729 std::vector<std::string> tokens;
731 if (tokens.size() != 2)
732 throw std::invalid_argument(
"Failed to parse stat arguments.");
733 bool force = (tokens[0] ==
"true");
734 uint16_t timeout =
static_cast<uint16_t
>(std::stoul(tokens[1]));
735 return std::make_tuple(force, timeout);
741 std::tuple<uint64_t, buffer_t, uint16_t> GetReadArgs()
743 std::vector<std::string> tokens;
745 if (tokens.size() != 3)
746 throw std::invalid_argument(
"Failed to parse read arguments.");
748 uint32_t length = std::stoul(tokens[1]);
750 uint16_t timeout =
static_cast<uint16_t
>(std::stoul(tokens[2]));
751 return std::make_tuple(offset, buffer, timeout);
757 inline std::tuple<uint64_t, buffer_t, uint16_t> GetPgReadArgs() {
return GetReadArgs(); }
762 inline std::tuple<uint64_t, buffer_t, uint16_t> GetWriteArgs() {
return GetReadArgs(); }
767 inline std::tuple<uint64_t, buffer_t, uint16_t> GetPgWriteArgs() {
return GetReadArgs(); }
772 uint16_t GetSyncArgs() {
return static_cast<uint16_t
>(std::stoul(args)); }
777 std::tuple<uint64_t, uint16_t> GetTruncateArgs()
779 std::vector<std::string> tokens;
781 if (tokens.size() != 2)
782 throw std::invalid_argument(
"Failed to parse truncate arguments.");
784 uint16_t timeout =
static_cast<uint16_t
>(std::stoul(tokens[1]));
785 return std::make_tuple(size, timeout);
791 std::tuple<ChunkList, uint16_t, std::vector<buffer_t>> GetVectorReadArgs()
793 std::vector<std::string> tokens;
796 chunks.reserve( tokens.size() - 1 );
797 std::vector<buffer_t> buffers;
798 buffers.reserve( tokens.size() - 1 );
799 for (
size_t i = 0; i < tokens.size() - 1; i += 2)
802 uint32_t length = std::stoul(tokens[i + 1]);
804 chunks.emplace_back(offset, length, buffer->data());
805 buffers.emplace_back( std::move( buffer ) );
807 uint16_t timeout =
static_cast<uint16_t
>(std::stoul(tokens.back()));
808 return std::make_tuple(std::move(chunks), timeout, std::move(buffers));
814 inline std::tuple<ChunkList, uint16_t, std::vector<buffer_t>> GetVectorWriteArgs() {
return GetVectorReadArgs(); }
817 const std::string action;
818 const std::string args;
819 std::string orgststr;
820 double nominalduration;
826 std::vector<std::string>
ToColumns(
const std::string &row )
828 std::vector<std::string> columns;
834 while( pos != std::string::npos && pos < row.size() )
836 if( row[pos] ==
'"' )
840 if( pos + 1 < row.size() && row[pos + 1] !=
',' )
841 throw std::runtime_error(
"Parsing error: missing comma" );
849 auto b = std::next( row.begin(), pos + 1 );
850 size_t posend = row.find(
"\",", pos + 1 );
851 if( posend == std::string::npos && row[row.size() - 1] ==
'"' )
852 posend = row.size() - 1;
853 else if( posend == std::string::npos )
854 throw std::runtime_error(
"Parsing error: missing closing quote" );
855 auto e = std::next( row.begin(), posend );
856 columns.emplace_back( b, e );
861 else if( row[pos] ==
',' )
863 if( pos + 1 < row.size() && row[pos + 1] ==
'"' )
868 auto b = std::next( row.begin(), pos + 1 );
869 size_t posend = row.find(
',', pos + 1 );
870 if( posend == std::string::npos )
872 auto e = std::next( row.begin(), posend );
873 columns.emplace_back( b, e );
879 size_t posend = row.find(
',', pos + 1 );
880 if( posend == std::string::npos )
882 auto end = std::next( row.begin(), posend );
883 columns.emplace_back( row.begin(), end );
889 throw std::runtime_error(
"Parsing error: invalid input file." );
904 std::unordered_map<File*, action_list>
ParseInput(
const std::string& path,
907 std::unordered_map<File*, std::string>& filenames,
908 std::unordered_map<File*, double>& synchronicity,
909 std::unordered_map<File*, size_t>& responseerrors,
910 const std::vector<std::string>& option_regex)
912 std::unordered_map<File*, action_list> result;
913 std::unique_ptr<std::ifstream> fin( path.empty() ?
nullptr :
new std::ifstream( path, std::ifstream::in ) );
914 std::istream &input = path.empty() ? std::cin : *fin;
916 std::unordered_map<uint64_t, File*> files;
917 std::unordered_map<uint64_t, double> last_stop;
918 std::unordered_map<uint64_t, double> overlaps;
919 std::unordered_map<uint64_t, double> overlaps_cnt;
928 std::vector<std::string> tokens =
ToColumns( line );
929 if (tokens.size() == 6)
930 tokens.emplace_back();
931 if (tokens.size() != 7)
933 throw std::invalid_argument(
"Invalid input file format.");
937 std::string action = tokens[1];
938 double start = std::stod(tokens[2]);
939 std::string args = tokens[3];
940 double stop = std::stod(tokens[4]);
941 std::string status = tokens[5];
942 std::string resp = tokens[6];
944 if (option_regex.size())
946 for (
auto& v : option_regex)
948 std::vector<std::string> tokens;
950 std::regex src(tokens[0]);
951 if (tokens.size() != 2)
954 <<
"Error: invalid regex for argument replacement - must be format like <oldstring>:=<newstring>"
961 args = std::regex_replace(args, src, tokens[1]);
971 if (!files.count(
id))
973 files[id] =
new File(
false);
974 files[id]->SetProperty(
"BundledClose",
"true");
975 filenames[files[id]] = args;
976 filenames[files[id]].erase(args.find(
";"));
978 overlaps_cnt[id] = 0;
979 last_stop[id] = stop;
984 if (start > last_stop[
id])
988 last_stop[id] = stop;
991 last_stop[id] = stop;
992 double nominal_duration = stop - start;
994 if (status !=
"[SUCCESS]")
996 responseerrors[files[id]]++;
1000 result[files[id]].emplace(
1001 start,
ActionExecutor(*files[
id], action, args, status, resp, nominal_duration));
1005 for (
auto& it : overlaps)
1008 synchronicity[files[it.first]] = 100.0 * (it.second / overlaps_cnt[it.first]);
1028 [file{ std::move(file) },
1029 actions{ std::move(actions) },
1037 auto ending = std::make_shared<barrier_t>(endsem);
1038 auto closing = std::make_shared<barrier_t>(closesem);
1040 for (
auto& p : actions)
1042 auto& action = p.second;
1048 metric.
delays[action.Name() +
"::tloss"] += tdelay;
1049 std::this_thread::sleep_for(std::chrono::milliseconds((
int) (tdelay * 1000)));
1053 metric.
delays[action.Name() +
"::tgain"] += tdelay;
1057 action.Execute(ending, closing, metric, simulate);
1058 metric.
addDelays(action.Name(),
"tnomi", action.NominalDuration());
1064 file->GetProperty(
"LastURL", metric.
url);
1075 <<
"usage: xrdreplay [-p|--print] [-c|--create-data] [t|--truncate-data] [-l|--long] [-s|--summary] [-h|--help] [-r|--replace <arg>:=<newarg>] [-f|--suppress] <recordfilename>\n"
1077 std::cerr <<
" -h | --help : show this help" << std::endl;
1079 <<
" -f | --suppress : force to run all IO with all successful result status - suppress all others"
1082 <<
" - by default the player won't run with an unsuccessfully recorded IO"
1084 std::cerr << std::endl;
1086 <<
" -p | --print : print only mode - shows all the IO for the given replay file without actually running any IO"
1089 <<
" -s | --summary : print summary - shows all the aggregated IO counter summed for all files"
1092 <<
" -l | --long : print long - show all file IO counter for each individual file"
1095 <<
" -r | --replace <a>:=<b> : replace in the argument list the string <a> with <b> "
1098 <<
" - option is usable several times e.g. to change storage prefixes or filenames"
1100 std::cerr << std::endl;
1102 <<
"example: ... --replace file:://localhost:=root://xrootd.eu/ : redirect local file to remote"
1104 std::cerr << std::endl;
1117 std::unordered_map<XrdCl::File*, std::string> filenames;
1118 std::unordered_map<XrdCl::File*, double> synchronicity;
1119 std::unordered_map<XrdCl::File*, size_t> responseerrors;
1127 std::vector<std::thread> threads;
1128 std::unordered_map<XrdCl::File*, XrdCl::ActionMetrics> metrics;
1129 threads.reserve(actions.size());
1133 bool sampling_error =
false;
1135 for (
auto& action : actions)
1137 metrics[action.first].
fname = filenames[action.first];
1138 metrics[action.first].synchronicity = synchronicity[action.first];
1139 metrics[action.first].errors = responseerrors[action.first];
1140 if (metrics[action.first].errors)
1142 sampling_error =
true;
1148 std::cerr <<
"Warning: IO file contains unsuccessful samples!" << std::endl;
1151 std::cerr <<
"... run with [-f] or [--suppress] option to suppress unsuccessful IO events!"
1161 for (
auto& action : actions)
1164 threads.emplace_back(
ExecuteActions(std::unique_ptr<XrdCl::File>(action.first),
1165 std::move(action.second),
1168 metrics[action.first],
1172 for (
auto& t : threads)
1177 std::cout <<
"{" << std::endl;
1179 std::cout <<
" \"metrics\": [" << std::endl;
1182 for (
auto& metric : metrics)
1186 std::cout << metric.second.Dump(opt.
json());
1188 summetric.
add(metric.second);
1192 std::cout << summetric.
Dump(opt.
json());
1197 std::cout <<
" ]," << std::endl;
1200 double tbench = timer.
elapsed();
1205 std::cout <<
" \"iosummary\": { " << std::endl;
1208 std::cout <<
" \"player::runtime\": " << tbench <<
"," << std::endl;
1210 std::cout <<
" \"player::speed\": " << opt.
speed() <<
"," << std::endl;
1211 std::cout <<
" \"sampled::runtime\": " << t1 - t0 <<
"," << std::endl;
1212 std::cout <<
" \"volume::totalread\": " << summetric.
getBytesRead() <<
"," << std::endl;
1213 std::cout <<
" \"volume::totalwrite\": " << summetric.
getBytesWritten() <<
","
1215 std::cout <<
" \"volume::read\": " << summetric.
ios[
"Read::b"] <<
"," << std::endl;
1216 std::cout <<
" \"volume::write\": " << summetric.
ios[
"Write::b"] <<
"," << std::endl;
1217 std::cout <<
" \"volume::pgread\": " << summetric.
ios[
"PgRead::b"] <<
"," << std::endl;
1218 std::cout <<
" \"volume::pgwrite\": " << summetric.
ios[
"PgWrite::b"] <<
"," << std::endl;
1219 std::cout <<
" \"volume::vectorread\": " << summetric.
ios[
"VectorRead::b"] <<
","
1221 std::cout <<
" \"volume::vectorwrite\": " << summetric.
ios[
"VectorWrite::b"] <<
","
1223 std::cout <<
" \"iops::read\": " << summetric.
ios[
"Read::n"] <<
"," << std::endl;
1224 std::cout <<
" \"iops::write\": " << summetric.
ios[
"Write::n"] <<
"," << std::endl;
1225 std::cout <<
" \"iops::pgread\": " << summetric.
ios[
"PgRead::n"] <<
"," << std::endl;
1226 std::cout <<
" \"iops::pgwrite\": " << summetric.
ios[
"PgRead::n"] <<
"," << std::endl;
1227 std::cout <<
" \"iops::vectorread\": " << summetric.
ios[
"VectorRead::n"] <<
","
1229 std::cout <<
" \"iops::vectorwrite\": " << summetric.
ios[
"VectorRead::n"] <<
","
1231 std::cout <<
" \"files::read\": " << summetric.
ios[
"OpenR::n"] <<
"," << std::endl;
1232 std::cout <<
" \"files::write\": " << summetric.
ios[
"OpenW::n"] <<
"," << std::endl;
1233 std::cout <<
" \"datasetsize::read\": " << summetric.
ios[
"Read::o"] <<
"," << std::endl;
1234 std::cout <<
" \"datasetsize::write\": " << summetric.
ios[
"Write::o"] <<
"," << std::endl;
1237 std::cout <<
" \"bandwidth::mb::read\": "
1238 << summetric.
getBytesRead() / tbench / 1000000.0 <<
"," << std::endl;
1239 std::cout <<
" \"bandwdith::mb::write\": "
1240 << summetric.
getBytesWritten() / tbench / 1000000.0 <<
"," << std::endl;
1241 std::cout <<
" \"performancemark\": " << (100.0 * (t1 - t0) / tbench) <<
","
1243 std::cout <<
" \"gain::read\":"
1244 << (100.0 * summetric.
delays[
"Read::tnomi"] / summetric.
delays[
"Read::tmeas"])
1245 <<
"," << std::endl;
1246 std::cout <<
" \"gain::write\":"
1247 << (100.0 * summetric.
delays[
"Write::tnomi"] / summetric.
delays[
"Write::tmeas"])
1250 std::cout <<
" \"synchronicity::read\":"
1252 std::cout <<
" \"synchronicity::write\":"
1254 std::cout <<
" \"response::error:\":" << summetric.
ios[
"All::e"] << std::endl;
1255 std::cout <<
" }" << std::endl;
1256 std::cout <<
"}" << std::endl;
1261 std::cout <<
"# =============================================" << std::endl;
1263 std::cout <<
"# IO Summary" << std::endl;
1265 std::cout <<
"# IO Summary (print mode)" << std::endl;
1266 std::cout <<
"# =============================================" << std::endl;
1269 std::cout <<
"# Total Runtime : " << std::fixed << tbench <<
" s" << std::endl;
1271 std::cout <<
"# Sampled Runtime : " << std::fixed << t1 - t0 <<
" s" << std::endl;
1272 std::cout <<
"# Playback Speed : " << std::fixed << std::setprecision(2) << opt.
speed()
1274 std::cout <<
"# IO Volume (R) : " << std::fixed
1279 <<
" ] " << std::endl;
1280 std::cout <<
"# IO Volume (W) : " << std::fixed
1285 <<
" ] " << std::endl;
1286 std::cout <<
"# IOPS (R) : " << std::fixed << summetric.
getIopsRead()
1287 <<
" [ std:" << summetric.
ios[
"Read::n"]
1288 <<
" vec:" << summetric.
ios[
"VectorRead::n"]
1289 <<
" page:" << summetric.
ios[
"PgRead::n"] <<
" ] " << std::endl;
1290 std::cout <<
"# IOPS (W) : " << std::fixed << summetric.
getIopsWrite()
1291 <<
" [ std:" << summetric.
ios[
"Write::n"]
1292 <<
" vec:" << summetric.
ios[
"VectorWrite::n"]
1293 <<
" page:" << summetric.
ios[
"PgWrite::n"] <<
" ] " << std::endl;
1294 std::cout <<
"# Files (R) : " << std::fixed << summetric.
ios[
"OpenR::n"] << std::endl;
1295 std::cout <<
"# Files (W) : " << std::fixed << summetric.
ios[
"OpenW::n"] << std::endl;
1296 std::cout <<
"# Datasize (R) : " << std::fixed
1298 std::cout <<
"# Datasize (W) : " << std::fixed
1302 std::cout <<
"# IO BW (R) : " << std::fixed << std::setprecision(2)
1303 << summetric.
getBytesRead() / tbench / 1000000.0 <<
" MB/s" << std::endl;
1304 std::cout <<
"# IO BW (W) : " << std::fixed << std::setprecision(2)
1305 << summetric.
getBytesRead() / tbench / 1000000.0 <<
" MB/s" << std::endl;
1307 std::cout <<
"# ---------------------------------------------" << std::endl;
1308 std::cout <<
"# Quality Estimation" << std::endl;
1309 std::cout <<
"# ---------------------------------------------" << std::endl;
1312 std::cout <<
"# Performance Mark : " << std::fixed << std::setprecision(2)
1313 << (100.0 * (t1 - t0) / tbench) <<
"%" << std::endl;
1314 std::cout <<
"# Gain Mark(R) : " << std::fixed << std::setprecision(2)
1315 << (100.0 * summetric.
delays[
"Read::tnomi"] / summetric.
delays[
"Read::tmeas"])
1316 <<
"%" << std::endl;
1317 std::cout <<
"# Gain Mark(W) : " << std::fixed << std::setprecision(2)
1318 << (100.0 * summetric.
delays[
"Write::tnomi"] / summetric.
delays[
"Write::tmeas"])
1319 <<
"%" << std::endl;
1321 std::cout <<
"# Synchronicity(R) : " << std::fixed << std::setprecision(2)
1323 std::cout <<
"# Synchronicity(W) : " << std::fixed << std::setprecision(2)
1327 std::cout <<
"# ---------------------------------------------" << std::endl;
1328 std::cout <<
"# Response Errors : " << std::fixed << summetric.
ios[
"All::e"] << std::endl;
1329 std::cout <<
"# =============================================" << std::endl;
1330 if (summetric.
ios[
"All::e"])
1332 std::cerr <<
"Error: replay job failed with IO errors!" << std::endl;
1338 std::cout <<
"# ---------------------------------------------" << std::endl;
1341 std::cout <<
"# Creating Dataset ..." << std::endl;
1345 std::cout <<
"# Verifying Dataset ..." << std::endl;
1347 uint64_t created_sofar = 0;
1348 for (
auto& metric : metrics)
1350 if (metric.second.getBytesRead() && !metric.second.getBytesWritten())
1352 std::cout <<
"# ............................................." << std::endl;
1353 std::cout <<
"# file: " << metric.second.fname << std::endl;
1354 std::cout <<
"# size: "
1358 << std::setprecision(2) <<
" ( "
1359 << 100.0 * created_sofar / summetric.
ios[
"Read::o"] <<
"% )" << std::endl;
1361 metric.second.fname, metric.second.ios[
"Read::o"], opt.
truncate(), opt.
verify()))
1369 std::cerr <<
"Error: failed to assure that file " << metric.second.fname
1370 <<
" is stored with a size of "
1381 catch (
const std::invalid_argument& ex)
1383 std::cout << ex.what() << std::endl;
static unsigned long long int stoull(const std::string &s)
simple integer parsing, to be replaced by std::stoll when C++11 can be used
int main(int argc, char **argv)
void getline(uchar *buff, int blen)
Executes an action registered in the csv file.
double NominalDuration() const
Get nominal duration variable.
void Execute(std::shared_ptr< barrier_t > &ending, std::shared_ptr< barrier_t > &closing, ActionMetrics &metric, bool simulate)
std::string Name() const
Get action name.
ActionExecutor(File &file, const std::string &action, const std::string &args, const std::string &orgststr, const std::string &resp, const double &duration)
Buffer pool - to limit memory consumption.
std::shared_ptr< std::vector< char > > Allocate(size_t length)
static BufferPool & Instance()
Single instance access.
static Log * GetLog()
Get default log.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Argument parser for XrdClReplay.
std::vector< std::string > & regex()
uint64_t GetSize() const
Get size (in bytes)
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
barrier_t(XrdSysSemaphore &sem)
mytimer_t()
Constructor (record start time)
void reset()
Reset the start time.
VectorWriteImpl< false > VectorWrite(Ctx< File > file, Arg< ChunkList > chunks, uint16_t timeout=0)
Factory for creating VectorWriteImpl objects.
std::vector< std::string > ToColumns(const std::string &row)
Split a row into columns.
StatImpl< false > Stat(Ctx< File > file, Arg< bool > force, uint16_t timeout=0)
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating OpenImpl objects.
std::multimap< double, ActionExecutor > action_list
List of actions: start time - action.
PgWriteImpl< false > PgWrite(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, Arg< std::vector< uint32_t >> cksums, uint16_t timeout=0)
Factory for creating PgWriteImpl objects.
SyncImpl< false > Sync(Ctx< File > file, uint16_t timeout=0)
Factory for creating SyncImpl objects.
VectorReadImpl< false > VectorRead(Ctx< File > file, Arg< ChunkList > chunks, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating VectorReadImpl objects.
std::vector< ChunkInfo > ChunkList
List of chunks.
std::unordered_map< File *, action_list > ParseInput(const std::string &path, double &t0, double &t1, std::unordered_map< File *, std::string > &filenames, std::unordered_map< File *, double > &synchronicity, std::unordered_map< File *, size_t > &responseerrors, const std::vector< std::string > &option_regex)
std::thread ExecuteActions(std::unique_ptr< File > file, action_list &&actions, double t0, double speed, ActionMetrics &metric, bool simulate)
TruncateImpl< false > Truncate(Ctx< File > file, Arg< uint64_t > size, uint16_t timeout)
PgReadImpl< false > PgRead(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating PgReadImpl objects.
bool AssureFile(const std::string &url, uint64_t size, bool viatruncate, bool verify)
AssureFile creates input data files on the fly if required.
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
std::vector< char > buffer_t
@ UX
owner executable/browsable
double ReadSynchronicity() const
double WriteSynchronicity() const
Metrics struct storing all timing and IO information of an action.
synchronicity_t aggregated_synchronicity
void add(const ActionMetrics &other)
std::map< std::string, uint64_t > ios
size_t getBytesRead() const
void addIos(const std::string &action, const std::string &field, double value)
std::map< std::string, double > delays
size_t getBytesWritten() const
void addDelays(const std::string &action, const std::string &field, double value)
std::string Dump(bool json) const
size_t getIopsWrite() const
size_t getIopsRead() const
static std::string humanreadable(uint64_t insize)
static double timeNow()
Get current Unix time in ns precision as a double.
Describe a data chunk for vector read.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Write
Open only for writing.
@ Update
Open for reading and writing.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.