XRootD
XrdClReplay.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2021 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 // Co-Author: Andreas-Joachim Peters <andreas.joachim.peters@cern.ch>
5 //------------------------------------------------------------------------------
6 // This file is part of the XRootD software suite.
7 //
8 // XRootD is free software: you can redistribute it and/or modify
9 // it under the terms of the GNU Lesser General Public License as published by
10 // the Free Software Foundation, either version 3 of the License, or
11 // (at your option) any later version.
12 //
13 // XRootD is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public License
19 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20 //
21 // In applying this licence, CERN does not waive the privileges and immunities
22 // granted to it by virtue of its status as an Intergovernmental Organization
23 // or submit itself to any jurisdiction.
24 //------------------------------------------------------------------------------
25 
26 #include "XrdCl/XrdClOperations.hh"
27 #include "XrdCl/XrdClUtils.hh"
29 #include "XrdSys/XrdSysPthread.hh"
30 #include "XrdClAction.hh"
31 #include "XrdClActionMetrics.hh"
32 #include "XrdClReplayArgs.hh"
33 #include <fstream>
34 #include <vector>
35 #include <tuple>
36 #include <unordered_map>
37 #include <chrono>
38 #include <iostream>
39 #include <thread>
40 #include <iomanip>
41 #include <atomic>
42 #include <stdarg.h>
43 #include <getopt.h>
44 #include <map>
45 #include <vector>
46 #include <numeric>
47 #include <mutex>
48 #include <condition_variable>
49 
50 namespace XrdCl
51 {
52 
53 //------------------------------------------------------------------------------
55 //------------------------------------------------------------------------------
57 {
58  public:
59 
60  //--------------------------------------------------------------------------
62  //--------------------------------------------------------------------------
63  static BufferPool& Instance()
64  {
65  static BufferPool instance;
66  return instance;
67  }
68 
69  //--------------------------------------------------------------------------
72  //--------------------------------------------------------------------------
73  std::shared_ptr<std::vector<char>> Allocate( size_t length )
74  {
75  std::unique_lock<std::mutex> lck( mtx );
76  cv.wait( lck, [this, length]{ return available >= length; } );
77  available -= length;
78  BufferDeleter del;
79  std::shared_ptr<std::vector<char>> buffer( new std::vector<char>( length, 'A' ), del );
80  return buffer;
81  }
82 
83  private:
84 
85  //--------------------------------------------------------------------------
87  //--------------------------------------------------------------------------
88  void Reclaim( size_t length )
89  {
90  std::unique_lock<std::mutex> lck(mtx);
91  available += length;
92  cv.notify_all();
93  }
94 
95  //--------------------------------------------------------------------------
98  //--------------------------------------------------------------------------
99  struct BufferDeleter
100  {
101  void operator()( std::vector<char> *buff )
102  {
103  BufferPool::Instance().Reclaim( buff->size() );
104  delete buff;
105  }
106  };
107 
// binary size units, expressed in bytes
static constexpr size_t KB = 1024;
static constexpr size_t MB = 1024 * KB;
static constexpr size_t GB = 1024 * MB;
111 
112  //--------------------------------------------------------------------------
115  //--------------------------------------------------------------------------
116  BufferPool() : mtx(), cv()
117  {
118  const char *maxsize = getenv( "XRD_MAXBUFFERSIZE" );
119  if( maxsize )
120  {
121  size_t len = strlen( maxsize );
122  size_t pos;
123  available = std::stoul( maxsize, &pos );
124  std::string sufix( len != pos ? maxsize + len - 2 : "" );
125  std::transform( sufix.begin(), sufix.end(), sufix.begin(), ::toupper );
126  if( !sufix.empty() )
127  {
128  if( sufix == "KB" )
129  available *= KB;
130  else if( sufix == "MB" )
131  available *= MB;
132  else if( sufix == "GB" )
133  available *= GB;
134  }
135  return;
136  }
137  available = std::numeric_limits<size_t>::max();
138  }
139 
140  BufferPool( const BufferPool& ) = delete;
141  BufferPool( BufferPool&& ) = delete;
142 
143  BufferPool& operator=( const BufferPool& ) = delete;
144  BufferPool& operator=( BufferPool& ) = delete;
145 
146 
147  size_t available;
148  std::mutex mtx;
149  std::condition_variable cv;
150 };
151 
152 //------------------------------------------------------------------------------
154 //------------------------------------------------------------------------------
156 {
157  public:
158  //--------------------------------------------------------------------------
160  //--------------------------------------------------------------------------
162  : start(clock_t::now())
163  {
164  }
165 
166  //--------------------------------------------------------------------------
168  //--------------------------------------------------------------------------
169  void reset() { start = clock_t::now(); }
170 
171  //--------------------------------------------------------------------------
173  //--------------------------------------------------------------------------
174  double elapsed() const
175  {
176  return (1.0
177  * (std::chrono::duration_cast<std::chrono::nanoseconds>(clock_t::now() - start).count())
178  / 1000000000.0);
179  }
180 
181  private:
182  using clock_t = std::chrono::high_resolution_clock;
183  std::chrono::time_point<clock_t> start; //< registered start time
184 };
185 
186 //------------------------------------------------------------------------------
189 //------------------------------------------------------------------------------
191 {
192  public:
193  //------------------------------------------------------------------------
196  //------------------------------------------------------------------------
198  : sem(sem)
199  {
200  }
201 
202  //------------------------------------------------------------------------
204  //------------------------------------------------------------------------
205  ~barrier_t() { sem.Post(); }
206 
207  inline XrdSysSemaphore& get() { return sem; }
208 
209  private:
210  XrdSysSemaphore& sem; //< the semaphore to be posted
211 };
212 
213 //------------------------------------------------------------------------------
215 //------------------------------------------------------------------------------
216 bool AssureFile(const std::string& url, uint64_t size, bool viatruncate, bool verify)
217 {
219  Access::Mode mode = Access::None;
220  uint16_t timeout = 60;
221 
222  {
223  // deal with existing files
224  auto file = std::make_unique<XrdCl::File>(false);
225  XRootDStatus status = file->Open(url, flags, mode, timeout);
226  if (status.IsOK())
227  {
228  StatInfo* statinfo;
229  // file exists already, verify the size
230  status = file->Stat(false, statinfo, timeout);
231  if (status.IsOK())
232  {
233  if (statinfo->GetSize() < size)
234  {
235  std::cerr
236  << "Error: file size is not sufficient, but I won't touch the file - aborting ...";
237  return false;
238  }
239  else
240  {
241  std::cout << "# ---> info: file exists and has sufficient size" << std::endl;
242  return true;
243  }
244  }
245  }
246  }
247 
248  if (verify)
249  {
250  std::cerr << "Verify: file is missing or inaccessible: " << url << std::endl;
251  return false;
252  }
253 
254  {
255  // deal with non-existing file
258  auto file = std::make_unique<XrdCl::File>(false);
259  XRootDStatus status = file->Open(url, wflags, wmode, timeout);
260  if (status.IsOK())
261  {
262  if (viatruncate)
263  {
264  // create a file via truncation
265  status = file->Truncate(size, timeout);
266  if (!status.IsOK())
267  {
268  std::cerr << "Error: " << status.ToString() << " - empty file might be left behind!"
269  << std::endl;
270  return false;
271  }
272  return true;
273  }
274  else
275  {
276  // create a file via writes
277  using buffer_t = std::vector<uint64_t>; //< data buffer
278  buffer_t buffer(32768);
279  size_t nbytes = 0;
280 
281  while (nbytes < size)
282  {
283  size_t towrite = size - nbytes;
284  if (towrite > (buffer.size() * sizeof(uint64_t)))
285  towrite = buffer.size() * sizeof(uint64_t);
286  for (size_t i = 0; i < buffer.size(); ++i)
287  {
288  // we write the offset in this buffer
289  buffer[i] = nbytes / sizeof(uint64_t) + i;
290  }
291  status = file->Write(nbytes, towrite, buffer.data(), timeout);
292  if (!status.IsOK())
293  {
294  std::cerr << "Error: " << status.ToString() << " - failed to write file at offset "
295  << nbytes << " - incomplete file might be left behind!" << std::endl;
296  return false;
297  }
298  nbytes += towrite;
299  }
300  }
301  return true;
302  }
303  else
304  {
305  std::cerr << "Error: " << status.ToString() << " - failed to create file!" << std::endl;
306  }
307  }
308  return false;
309 }
310 
311 //------------------------------------------------------------------------------
313 //------------------------------------------------------------------------------
315 {
316  using buffer_t = std::shared_ptr<std::vector<char>>; //< data buffer
317 
318  public:
319  //--------------------------------------------------------------------------
327  //--------------------------------------------------------------------------
329  const std::string& action,
330  const std::string& args,
331  const std::string& orgststr,
332  const std::string& resp,
333  const double& duration)
334  : file(file)
335  , action(action)
336  , args(args)
337  , orgststr(orgststr)
338  , nominalduration(duration)
339  {
340  }
341 
342  //--------------------------------------------------------------------------
345  //--------------------------------------------------------------------------
346  void Execute(std::shared_ptr<barrier_t>& ending,
347  std::shared_ptr<barrier_t>& closing,
348  ActionMetrics& metric,
349  bool simulate)
350  {
351  if (action == "Open") // open action
352  {
353  std::string url;
354  OpenFlags::Flags flags;
355  Access::Mode mode;
356  uint16_t timeout;
357  std::tie(url, flags, mode, timeout) = GetOpenArgs();
358 
359  std::string lmetric;
360  if ((flags & OpenFlags::Update) || (flags & OpenFlags::Write))
361  {
362  metric.ios["OpenW::n"]++;
363  }
364  else
365  {
366  metric.ios["OpenR::n"]++;
367  }
368 
369  metric.ios["Open::n"]++;
370 
371  mytimer_t timer;
372 
373  if (!simulate)
374  WaitFor(Open(file, url, flags, mode, timeout) >>
375  [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
376  {
377  metric.addIos("Open", "e", HandleStatus(s, orgststr, "Open"));
378  metric.addDelays("Open", "tmeas", timer.elapsed());
379  ending.reset();
380  closing.reset();
381  });
382  else
383  {
384  ending.reset();
385  closing.reset();
386  }
387  }
388  else if (action == "Close") // close action
389  {
390  uint16_t timeout = GetCloseArgs();
391  mytimer_t timer;
392 
393  if (closing)
394  {
395  auto& sem = closing->get();
396  closing.reset();
397  sem.Wait();
398  }
399 
400  metric.ios["Close::n"]++;
401 
402  if (!simulate)
403  Async(Close(file, timeout) >>
404  [this, orgststr{ orgststr }, ending, timer, &metric](XRootDStatus& s) mutable
405  {
406  metric.addIos("Close", "e", HandleStatus(s, orgststr, "Close"));
407  metric.addDelays("Close", "tmeas", timer.elapsed());
408  ending.reset();
409  });
410  else
411  {
412  ending.reset();
413  }
414  }
415  else if (action == "Stat") // stat action
416  {
417  bool force;
418  uint16_t timeout;
419  std::tie(force, timeout) = GetStatArgs();
420  metric.ios["Stat::n"]++;
421  mytimer_t timer;
422 
423  if (!simulate)
424  Async(Stat(file, force, timeout) >>
425  [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s, StatInfo& r) mutable
426  {
427  metric.addIos("Stat", "e", HandleStatus(s, orgststr, "Stat"));
428  metric.addDelays("Stat", "tmeas", timer.elapsed());
429  ending.reset();
430  closing.reset();
431  });
432  else
433  {
434  ending.reset();
435  closing.reset();
436  }
437  }
438  else if (action == "Read") // read action
439  {
440  uint64_t offset;
441  buffer_t buffer;
442  uint16_t timeout;
443  std::tie(offset, buffer, timeout) = GetReadArgs();
444  metric.ios["Read::n"]++;
445  metric.ios["Read::b"] += buffer->size();
446  if ((offset + buffer->size()) > metric.ios["Read::o"])
447  metric.ios["Read::o"] = offset + buffer->size();
448 
449  mytimer_t timer;
450  if (!simulate)
451  Async(Read(file, offset, buffer->size(), buffer->data(), timeout) >>
452  [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s,
453  ChunkInfo& r) mutable
454  {
455  metric.addIos("Read", "e", HandleStatus(s, orgststr, "Read"));
456  metric.addDelays("Read", "tmeas", timer.elapsed());
457  buffer.reset();
458  ending.reset();
459  closing.reset();
460  });
461  else
462  {
463  buffer.reset();
464  ending.reset();
465  closing.reset();
466  }
467  }
468  else if (action == "PgRead") // pgread action
469  {
470  uint64_t offset;
471  buffer_t buffer;
472  uint16_t timeout;
473  std::tie(offset, buffer, timeout) = GetPgReadArgs();
474  metric.ios["PgRead::n"]++;
475  metric.ios["PgRead::b"] += buffer->size();
476  if ((offset + buffer->size()) > metric.ios["Read::o"])
477  metric.ios["Read::o"] = offset + buffer->size();
478  mytimer_t timer;
479  if (!simulate)
480  Async(PgRead(file, offset, buffer->size(), buffer->data(), timeout) >>
481  [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s,
482  PageInfo& r) mutable
483  {
484  metric.addIos("PgRead", "e", HandleStatus(s, orgststr, "PgRead"));
485  metric.addDelays("PgRead", "tmeas", timer.elapsed());
486  buffer.reset();
487  ending.reset();
488  closing.reset();
489  });
490  else
491  {
492  buffer.reset();
493  ending.reset();
494  closing.reset();
495  }
496  }
497  else if (action == "Write") // write action
498  {
499  uint64_t offset;
500  buffer_t buffer;
501  uint16_t timeout;
502  std::tie(offset, buffer, timeout) = GetWriteArgs();
503  metric.ios["Write::n"]++;
504  metric.ios["Write::b"] += buffer->size();
505  if ((offset + buffer->size()) > metric.ios["Write::o"])
506  metric.ios["Write::o"] = offset + buffer->size();
507  mytimer_t timer;
508 
509  if (!simulate)
510  Async(
511  Write(file, offset, buffer->size(), buffer->data(), timeout) >>
512  [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
513  {
514  metric.addIos("Write", "e", HandleStatus(s, orgststr, "Write"));
515  metric.addDelays("Write", "tmeas", timer.elapsed());
516  buffer.reset();
517  ending.reset();
518  closing.reset();
519  });
520  else
521  {
522  buffer.reset();
523  ending.reset();
524  closing.reset();
525  }
526  }
527  else if (action == "PgWrite") // pgwrite action
528  {
529  uint64_t offset;
530  buffer_t buffer;
531  uint16_t timeout;
532  std::tie(offset, buffer, timeout) = GetPgWriteArgs();
533  metric.ios["PgWrite::n"]++;
534  metric.ios["PgWrite::b"] += buffer->size();
535  if ((offset + buffer->size()) > metric.ios["Write::o"])
536  metric.ios["Write::o"] = offset + buffer->size();
537  mytimer_t timer;
538  if (!simulate)
539  Async(
540  PgWrite(file, offset, buffer->size(), buffer->data(), timeout) >>
541  [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
542  {
543  metric.addIos("PgWrite", "e", HandleStatus(s, orgststr, "PgWrite"));
544  metric.addDelays("PgWrite", "tmeas", timer.elapsed());
545  buffer.reset();
546  ending.reset();
547  closing.reset();
548  });
549  else
550  {
551  buffer.reset();
552  ending.reset();
553  closing.reset();
554  }
555  }
556  else if (action == "Sync") // sync action
557  {
558  uint16_t timeout = GetSyncArgs();
559  metric.ios["Sync::n"]++;
560  mytimer_t timer;
561  if (!simulate)
562  Async(Sync(file, timeout) >>
563  [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
564  {
565  metric.addIos("Sync", "e", HandleStatus(s, orgststr, "Sync"));
566  metric.addDelays("Sync", "tmeas", timer.elapsed());
567  ending.reset();
568  closing.reset();
569  });
570  else
571  {
572  ending.reset();
573  closing.reset();
574  }
575  }
576  else if (action == "Truncate") // truncate action
577  {
578  uint64_t size;
579  uint16_t timeout;
580  std::tie(size, timeout) = GetTruncateArgs();
581  metric.ios["Truncate::n"]++;
582  if (size > metric.ios["Truncate::o"])
583  metric.ios["Truncate::o"] = size;
584 
585  mytimer_t timer;
586  if (!simulate)
587  Async(Truncate(file, size, timeout) >>
588  [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
589  {
590  metric.addIos("Truncate", "e", HandleStatus(s, orgststr, "Truncate"));
591  metric.addDelays("Truncate", "tmeas", timer.elapsed());
592  ending.reset();
593  closing.reset();
594  });
595  else
596  {
597  ending.reset();
598  closing.reset();
599  }
600  }
601  else if (action == "VectorRead") // vector read action
602  {
603  ChunkList chunks;
604  uint16_t timeout;
605  std::vector<buffer_t> buffers;
606  std::tie(chunks, timeout, buffers) = GetVectorReadArgs();
607  metric.ios["VectorRead::n"]++;
608  for (auto& ch : chunks)
609  {
610  metric.ios["VectorRead::b"] += ch.GetLength();
611  if ((ch.GetOffset() + ch.GetLength()) > metric.ios["Read::o"])
612  metric.ios["Read::o"] = ch.GetOffset() + ch.GetLength();
613  }
614 
615  mytimer_t timer;
616  if (!simulate)
617  Async(
618  VectorRead(file, chunks, timeout) >>
619  [this, orgststr{ orgststr }, buffers, ending, closing, timer, &metric](XRootDStatus& s, VectorReadInfo& r) mutable
620  {
621  metric.addIos("VectorRead", "e", HandleStatus(s, orgststr, "VectorRead"));
622  metric.addDelays("VectorRead", "tmeas", timer.elapsed());
623  buffers.clear();
624  ending.reset();
625  closing.reset();
626  });
627  else
628  {
629  buffers.clear();
630  ending.reset();
631  closing.reset();
632  }
633  }
634  else if (action == "VectorWrite") // vector write
635  {
636  ChunkList chunks;
637  uint16_t timeout;
638  std::vector<buffer_t> buffers;
639  std::tie(chunks, timeout, buffers) = GetVectorWriteArgs();
640  metric.ios["VectorWrite::n"]++;
641  for (auto& ch : chunks)
642  {
643  metric.ios["VectorWrite::b"] += ch.GetLength();
644  if ((ch.GetOffset() + ch.GetLength()) > metric.ios["Write::o"])
645  metric.ios["Write::o"] = ch.GetOffset() + ch.GetLength();
646  }
647  mytimer_t timer;
648  if (!simulate)
649  Async(VectorWrite(file, chunks, timeout) >>
650  [this, orgststr{ orgststr }, buffers, ending, closing, timer, &metric](XRootDStatus& s) mutable
651  {
652  metric.addIos("VectorWrite", "e", HandleStatus(s, orgststr, "VectorWrite"));
653  metric.addDelays("VectorWrite", "tmeas", timer.elapsed());
654  buffers.clear();
655  ending.reset();
656  closing.reset();
657  });
658  else
659  {
660  buffers.clear();
661  ending.reset();
662  closing.reset();
663  }
664  }
665  else
666  {
667  DefaultEnv::GetLog()->Warning(AppMsg, "Cannot replyt %s action.", action.c_str());
668  }
669  }
670 
671  //--------------------------------------------------------------------------
673  //--------------------------------------------------------------------------
674  double NominalDuration() const { return nominalduration; }
675 
676  //--------------------------------------------------------------------------
678  //--------------------------------------------------------------------------
679  std::string Name() const { return action; }
680 
681  private:
682  //--------------------------------------------------------------------------
684  //--------------------------------------------------------------------------
685  static bool HandleStatus(XRootDStatus& response, const std::string& orgstr, const std::string where="unknown")
686  {
687  std::string rspstr = response.ToString();
688  rspstr.erase(remove(rspstr.begin(), rspstr.end(), ' '), rspstr.end());
689 
690  if (rspstr != orgstr)
691  {
693  "We were expecting status: %s, but "
694  "received: %s from: %s",
695  orgstr.c_str(),
696  rspstr.c_str(),
697  where.c_str());
698  return true;
699  }
700  else
701  {
702  return false;
703  }
704  }
705 
706  //--------------------------------------------------------------------------
708  //--------------------------------------------------------------------------
709  std::tuple<std::string, OpenFlags::Flags, Access::Mode, uint16_t> GetOpenArgs()
710  {
711  std::vector<std::string> tokens;
712  Utils::splitString(tokens, args, ";");
713  if (tokens.size() != 4)
714  throw std::invalid_argument("Failed to parse open arguments.");
715  std::string url = tokens[0];
716  OpenFlags::Flags flags = static_cast<OpenFlags::Flags>(std::stoul(tokens[1]));
717  Access::Mode mode = static_cast<Access::Mode>(std::stoul(tokens[2]));
718  uint16_t timeout = static_cast<uint16_t>(std::stoul(tokens[3]));
719  return std::make_tuple(url, flags, mode, timeout);
720  }
721 
722  //--------------------------------------------------------------------------
724  //--------------------------------------------------------------------------
725  uint16_t GetCloseArgs() { return static_cast<uint16_t>(std::stoul(args)); }
726 
727  std::tuple<bool, uint16_t> GetStatArgs()
728  {
729  std::vector<std::string> tokens;
730  Utils::splitString(tokens, args, ";");
731  if (tokens.size() != 2)
732  throw std::invalid_argument("Failed to parse stat arguments.");
733  bool force = (tokens[0] == "true");
734  uint16_t timeout = static_cast<uint16_t>(std::stoul(tokens[1]));
735  return std::make_tuple(force, timeout);
736  }
737 
738  //--------------------------------------------------------------------------
740  //--------------------------------------------------------------------------
741  std::tuple<uint64_t, buffer_t, uint16_t> GetReadArgs()
742  {
743  std::vector<std::string> tokens;
744  Utils::splitString(tokens, args, ";");
745  if (tokens.size() != 3)
746  throw std::invalid_argument("Failed to parse read arguments.");
747  uint64_t offset = std::stoull(tokens[0]);
748  uint32_t length = std::stoul(tokens[1]);
749  auto buffer = BufferPool::Instance().Allocate( length );
750  uint16_t timeout = static_cast<uint16_t>(std::stoul(tokens[2]));
751  return std::make_tuple(offset, buffer, timeout);
752  }
753 
754  //--------------------------------------------------------------------------
756  //--------------------------------------------------------------------------
757  inline std::tuple<uint64_t, buffer_t, uint16_t> GetPgReadArgs() { return GetReadArgs(); }
758 
759  //--------------------------------------------------------------------------
761  //--------------------------------------------------------------------------
762  inline std::tuple<uint64_t, buffer_t, uint16_t> GetWriteArgs() { return GetReadArgs(); }
763 
764  //--------------------------------------------------------------------------
766  //--------------------------------------------------------------------------
767  inline std::tuple<uint64_t, buffer_t, uint16_t> GetPgWriteArgs() { return GetReadArgs(); }
768 
769  //--------------------------------------------------------------------------
771  //--------------------------------------------------------------------------
772  uint16_t GetSyncArgs() { return static_cast<uint16_t>(std::stoul(args)); }
773 
774  //--------------------------------------------------------------------------
776  //--------------------------------------------------------------------------
777  std::tuple<uint64_t, uint16_t> GetTruncateArgs()
778  {
779  std::vector<std::string> tokens;
780  Utils::splitString(tokens, args, ";");
781  if (tokens.size() != 2)
782  throw std::invalid_argument("Failed to parse truncate arguments.");
783  uint64_t size = std::stoull(tokens[0]);
784  uint16_t timeout = static_cast<uint16_t>(std::stoul(tokens[1]));
785  return std::make_tuple(size, timeout);
786  }
787 
788  //--------------------------------------------------------------------------
790  //--------------------------------------------------------------------------
791  std::tuple<ChunkList, uint16_t, std::vector<buffer_t>> GetVectorReadArgs()
792  {
793  std::vector<std::string> tokens;
794  Utils::splitString(tokens, args, ";");
795  ChunkList chunks;
796  chunks.reserve( tokens.size() - 1 );
797  std::vector<buffer_t> buffers;
798  buffers.reserve( tokens.size() - 1 );
799  for (size_t i = 0; i < tokens.size() - 1; i += 2)
800  {
801  uint64_t offset = std::stoull(tokens[i]);
802  uint32_t length = std::stoul(tokens[i + 1]);
803  auto buffer = BufferPool::Instance().Allocate( length );
804  chunks.emplace_back(offset, length, buffer->data());
805  buffers.emplace_back( std::move( buffer ) );
806  }
807  uint16_t timeout = static_cast<uint16_t>(std::stoul(tokens.back()));
808  return std::make_tuple(std::move(chunks), timeout, std::move(buffers));
809  }
810 
811  //--------------------------------------------------------------------------
813  //--------------------------------------------------------------------------
814  inline std::tuple<ChunkList, uint16_t, std::vector<buffer_t>> GetVectorWriteArgs() { return GetVectorReadArgs(); }
815 
816  File& file; //< the file object
817  const std::string action; //< the action to be executed
818  const std::string args; //< arguments for the operation
819  std::string orgststr; //< the original response status of the action
820  double nominalduration; //< the original duration of execution
821 };
822 
823 //------------------------------------------------------------------------------
825 //------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// Split a row of comma separated values into its columns.
//
// A column may be enclosed in double quotes, in which case it may contain
// commas; the quotes are not part of the resulting column. A quoted column
// ends at the first '",' sequence or at a quote that is the last character of
// the row. A trailing comma yields a trailing empty column.
//
// NOTE: a row that starts with a comma does not yield a leading empty column.
//
// @param row : the row to be split
// @return    : the columns of the row
// @throws    : std::runtime_error if the row is malformed
//------------------------------------------------------------------------------
std::vector<std::string> ToColumns( const std::string &row )
{
  std::vector<std::string> columns;
  bool   inquotes = false; // true between an opening and its closing quote
  size_t pos      = 0;

  while( pos < row.size() )
  {
    if( row[pos] == '"' ) // we are handling a quoted column
    {
      if( inquotes ) // this is a closing quote
      {
        // if it is not the last character in the row it should be followed by a comma
        if( pos + 1 < row.size() && row[pos + 1] != ',' )
          throw std::runtime_error( "Parsing error: missing comma" );
        inquotes = false;
        ++pos; // move to the comma or end of row
        continue;
      }

      // this is an opening quote: look for the end of the column
      size_t posend = row.find( "\",", pos + 1 );
      if( posend == std::string::npos )
      {
        // The closing quote may be the last character of the row, but it must
        // not be the opening quote itself (that would be an inverted range).
        if( row.back() != '"' || row.size() - 1 == pos )
          throw std::runtime_error( "Parsing error: missing closing quote" );
        posend = row.size() - 1;
      }
      inquotes = true;
      columns.push_back( row.substr( pos + 1, posend - pos - 1 ) );
      pos = posend; // continue with the closing quote
    }
    else if( row[pos] == ',' ) // we are handling a column separator
    {
      if( pos + 1 < row.size() && row[pos + 1] == '"' )
      {
        ++pos; // the column is quoted, handled by the branch above
        continue;
      }
      size_t posend = row.find( ',', pos + 1 );
      if( posend == std::string::npos )
        posend = row.size();
      columns.push_back( row.substr( pos + 1, posend - pos - 1 ) );
      pos = posend; // move to the next column
    }
    else if( pos == 0 ) // we are handling the 1st column if not quoted
    {
      size_t posend = row.find( ',', pos + 1 );
      if( posend == std::string::npos )
        posend = row.size();
      columns.push_back( row.substr( 0, posend ) );
      pos = posend; // move to the next column
    }
    else
    {
      throw std::runtime_error( "Parsing error: invalid input file." );
    }
  }
  return columns;
}
894 
895 //------------------------------------------------------------------------------
897 //------------------------------------------------------------------------------
898 using action_list = std::multimap<double, ActionExecutor>;
899 
900 //------------------------------------------------------------------------------
903 //------------------------------------------------------------------------------
904 std::unordered_map<File*, action_list> ParseInput(const std::string& path,
905  double& t0,
906  double& t1,
907  std::unordered_map<File*, std::string>& filenames,
908  std::unordered_map<File*, double>& synchronicity,
909  std::unordered_map<File*, size_t>& responseerrors,
910  const std::vector<std::string>& option_regex)
911 {
912  std::unordered_map<File*, action_list> result;
913  std::unique_ptr<std::ifstream> fin( path.empty() ? nullptr : new std::ifstream( path, std::ifstream::in ) );
914  std::istream &input = path.empty() ? std::cin : *fin;
915  std::string line;
916  std::unordered_map<uint64_t, File*> files;
917  std::unordered_map<uint64_t, double> last_stop;
918  std::unordered_map<uint64_t, double> overlaps;
919  std::unordered_map<uint64_t, double> overlaps_cnt;
920 
921  t0 = 10e99;
922  t1 = 0;
923  while (input.good())
924  {
925  std::getline(input, line);
926  if (line.empty())
927  continue;
928  std::vector<std::string> tokens = ToColumns( line );
929  if (tokens.size() == 6)
930  tokens.emplace_back();
931  if (tokens.size() != 7)
932  {
933  throw std::invalid_argument("Invalid input file format.");
934  }
935 
936  uint64_t id = std::stoull(tokens[0]); // file object ID
937  std::string action = tokens[1]; // action name (e.g. Open)
938  double start = std::stod(tokens[2]); // start time
939  std::string args = tokens[3]; // operation arguments
940  double stop = std::stod(tokens[4]); // stop time
941  std::string status = tokens[5]; // operation status
942  std::string resp = tokens[6]; // server response
943 
944  if (option_regex.size())
945  {
946  for (auto& v : option_regex)
947  {
948  std::vector<std::string> tokens;
949  Utils::splitString(tokens, v, ":=");
950  std::regex src(tokens[0]);
951  if (tokens.size() != 2)
952  {
953  std::cerr
954  << "Error: invalid regex for argument replacement - must be format like <oldstring>:=<newstring>"
955  << std::endl;
956  exit(EINVAL);
957  }
958  else
959  {
960  // write the results to an output iterator
961  args = std::regex_replace(args, src, tokens[1]);
962  }
963  }
964  }
965 
966  if (start < t0)
967  t0 = start;
968  if (stop > t1)
969  t1 = stop;
970 
971  if (!files.count(id))
972  {
973  files[id] = new File(false);
974  files[id]->SetProperty("BundledClose", "true");
975  filenames[files[id]] = args;
976  filenames[files[id]].erase(args.find(";"));
977  overlaps[id] = 0;
978  overlaps_cnt[id] = 0;
979  last_stop[id] = stop;
980  }
981  else
982  {
983  overlaps_cnt[id]++;
984  if (start > last_stop[id])
985  {
986  overlaps[id]++;
987  }
988  last_stop[id] = stop;
989  }
990 
991  last_stop[id] = stop;
992  double nominal_duration = stop - start;
993 
994  if (status != "[SUCCESS]")
995  {
996  responseerrors[files[id]]++;
997  }
998  else
999  {
1000  result[files[id]].emplace(
1001  start, ActionExecutor(*files[id], action, args, status, resp, nominal_duration));
1002  }
1003  }
1004 
1005  for (auto& it : overlaps)
1006  {
1007  // compute the synchronicity of requests
1008  synchronicity[files[it.first]] = 100.0 * (it.second / overlaps_cnt[it.first]);
1009  }
1010  return result;
1011 }
1012 
1013 //------------------------------------------------------------------------------
1019 //------------------------------------------------------------------------------
1020 std::thread ExecuteActions(std::unique_ptr<File> file,
1021  action_list&& actions,
1022  double t0,
1023  double speed,
1024  ActionMetrics& metric,
1025  bool simulate)
1026 {
1027  std::thread t(
1028  [file{ std::move(file) },
1029  actions{ std::move(actions) },
1030  t0,
1031  &metric,
1032  simulate,
1033  &speed]() mutable
1034  {
1035  XrdSysSemaphore endsem(0);
1036  XrdSysSemaphore closesem(0);
1037  auto ending = std::make_shared<barrier_t>(endsem);
1038  auto closing = std::make_shared<barrier_t>(closesem);
1039 
1040  for (auto& p : actions)
1041  {
1042  auto& action = p.second;
1043 
1044  auto tdelay = t0 ? ((p.first + t0) - XrdCl::Action::timeNow()) : 0;
1045  if (tdelay > 0)
1046  {
1047  tdelay /= speed;
1048  metric.delays[action.Name() + "::tloss"] += tdelay;
1049  std::this_thread::sleep_for(std::chrono::milliseconds((int) (tdelay * 1000)));
1050  }
1051  else
1052  {
1053  metric.delays[action.Name() + "::tgain"] += tdelay;
1054  }
1055 
1056  mytimer_t timer;
1057  action.Execute(ending, closing, metric, simulate);
1058  metric.addDelays(action.Name(), "tnomi", action.NominalDuration());
1059  metric.addDelays(action.Name(), "texec", timer.elapsed());
1060  }
1061  ending.reset();
1062  closing.reset();
1063  endsem.Wait();
1064  file->GetProperty("LastURL", metric.url);
1065  file.reset();
1066  });
1067  return t;
1068 }
1069 
1070 }
1071 
//------------------------------------------------------------------------------
//! Print the command line help to stderr and terminate the process with
//! exit code -1.
//------------------------------------------------------------------------------
void usage()
{
  // synopsis: every short option is spelled with its leading dash
  std::cerr
    << "usage: xrdreplay [-p|--print] [-c|--create-data] [-t|--truncate-data] [-l|--long] [-s|--summary] [-h|--help] [-r|--replace <arg>:=<newarg>] [-f|--suppress] <recordfilename>\n"
    << std::endl;
  std::cerr << " -h | --help : show this help" << std::endl;
  std::cerr
    << " -f | --suppress : force to run all IO with all successful result status - suppress all others"
    << std::endl;
  std::cerr
    << " - by default the player won't run with an unsuccessfully recorded IO"
    << std::endl;
  std::cerr << std::endl;
  std::cerr
    << " -p | --print : print only mode - shows all the IO for the given replay file without actually running any IO"
    << std::endl;
  std::cerr
    << " -s | --summary : print summary - shows all the aggregated IO counter summed for all files"
    << std::endl;
  std::cerr
    << " -l | --long : print long - show all file IO counter for each individual file"
    << std::endl;
  std::cerr
    << " -r | --replace <a>:=<b> : replace in the argument list the string <a> with <b> "
    << std::endl;
  std::cerr
    << " - option is usable several times e.g. to change storage prefixes or filenames"
    << std::endl;
  std::cerr << std::endl;
  // NOTE(review): "file:://" in the example looks like a typo for "file://" - confirm
  std::cerr
    << "example: ... --replace file:://localhost:=root://xrootd.eu/ : redirect local file to remote"
    << std::endl;
  std::cerr << std::endl;
  exit(-1);
}
1107 
1108 int main(int argc, char** argv)
1109 {
1110  XrdCl::ReplayArgs opt(argc, argv);
1111  int rc = 0;
1112 
1113  try
1114  {
1115  double t0 = 0;
1116  double t1 = 0;
1117  std::unordered_map<XrdCl::File*, std::string> filenames;
1118  std::unordered_map<XrdCl::File*, double> synchronicity;
1119  std::unordered_map<XrdCl::File*, size_t> responseerrors;
1120  auto actions = XrdCl::ParseInput(opt.path(),
1121  t0,
1122  t1,
1123  filenames,
1124  synchronicity,
1125  responseerrors,
1126  opt.regex()); // parse the input file
1127  std::vector<std::thread> threads;
1128  std::unordered_map<XrdCl::File*, XrdCl::ActionMetrics> metrics;
1129  threads.reserve(actions.size());
1130  double toffset = XrdCl::Action::timeNow() - t0;
1131  XrdCl::mytimer_t timer;
1132  XrdCl::ActionMetrics summetric;
1133  bool sampling_error = false;
1134 
1135  for (auto& action : actions)
1136  {
1137  metrics[action.first].fname = filenames[action.first];
1138  metrics[action.first].synchronicity = synchronicity[action.first];
1139  metrics[action.first].errors = responseerrors[action.first];
1140  if (metrics[action.first].errors)
1141  {
1142  sampling_error = true;
1143  }
1144  }
1145 
1146  if (sampling_error)
1147  {
1148  std::cerr << "Warning: IO file contains unsuccessful samples!" << std::endl;
1149  if (!opt.suppress_error())
1150  {
1151  std::cerr << "... run with [-f] or [--suppress] option to suppress unsuccessful IO events!"
1152  << std::endl;
1153  exit(-1);
1154  }
1155  }
1156 
1157 
1158  if (opt.print())
1159  toffset = 0; // indicate not to follow timing
1160 
1161  for (auto& action : actions)
1162  {
1163  // execute list of actions against file object
1164  threads.emplace_back(ExecuteActions(std::unique_ptr<XrdCl::File>(action.first),
1165  std::move(action.second),
1166  toffset,
1167  opt.speed(),
1168  metrics[action.first],
1169  opt.print()));
1170  }
1171 
1172  for (auto& t : threads) // wait until we are done
1173  t.join();
1174 
1175  if (opt.json())
1176  {
1177  std::cout << "{" << std::endl;
1178  if (opt.longformat())
1179  std::cout << " \"metrics\": [" << std::endl;
1180  }
1181 
1182  for (auto& metric : metrics)
1183  {
1184  if (opt.longformat())
1185  {
1186  std::cout << metric.second.Dump(opt.json());
1187  }
1188  summetric.add(metric.second);
1189  }
1190 
1191  if (opt.summary())
1192  std::cout << summetric.Dump(opt.json());
1193 
1194  if (opt.json())
1195  {
1196  if (opt.longformat())
1197  std::cout << " ]," << std::endl;
1198  }
1199 
1200  double tbench = timer.elapsed();
1201 
1202  if (opt.json())
1203  {
1204  {
1205  std::cout << " \"iosummary\": { " << std::endl;
1206  if (!opt.print())
1207  {
1208  std::cout << " \"player::runtime\": " << tbench << "," << std::endl;
1209  }
1210  std::cout << " \"player::speed\": " << opt.speed() << "," << std::endl;
1211  std::cout << " \"sampled::runtime\": " << t1 - t0 << "," << std::endl;
1212  std::cout << " \"volume::totalread\": " << summetric.getBytesRead() << "," << std::endl;
1213  std::cout << " \"volume::totalwrite\": " << summetric.getBytesWritten() << ","
1214  << std::endl;
1215  std::cout << " \"volume::read\": " << summetric.ios["Read::b"] << "," << std::endl;
1216  std::cout << " \"volume::write\": " << summetric.ios["Write::b"] << "," << std::endl;
1217  std::cout << " \"volume::pgread\": " << summetric.ios["PgRead::b"] << "," << std::endl;
1218  std::cout << " \"volume::pgwrite\": " << summetric.ios["PgWrite::b"] << "," << std::endl;
1219  std::cout << " \"volume::vectorread\": " << summetric.ios["VectorRead::b"] << ","
1220  << std::endl;
1221  std::cout << " \"volume::vectorwrite\": " << summetric.ios["VectorWrite::b"] << ","
1222  << std::endl;
1223  std::cout << " \"iops::read\": " << summetric.ios["Read::n"] << "," << std::endl;
1224  std::cout << " \"iops::write\": " << summetric.ios["Write::n"] << "," << std::endl;
1225  std::cout << " \"iops::pgread\": " << summetric.ios["PgRead::n"] << "," << std::endl;
1226  std::cout << " \"iops::pgwrite\": " << summetric.ios["PgRead::n"] << "," << std::endl;
1227  std::cout << " \"iops::vectorread\": " << summetric.ios["VectorRead::n"] << ","
1228  << std::endl;
1229  std::cout << " \"iops::vectorwrite\": " << summetric.ios["VectorRead::n"] << ","
1230  << std::endl;
1231  std::cout << " \"files::read\": " << summetric.ios["OpenR::n"] << "," << std::endl;
1232  std::cout << " \"files::write\": " << summetric.ios["OpenW::n"] << "," << std::endl;
1233  std::cout << " \"datasetsize::read\": " << summetric.ios["Read::o"] << "," << std::endl;
1234  std::cout << " \"datasetsize::write\": " << summetric.ios["Write::o"] << "," << std::endl;
1235  if (!opt.print())
1236  {
1237  std::cout << " \"bandwidth::mb::read\": "
1238  << summetric.getBytesRead() / tbench / 1000000.0 << "," << std::endl;
1239  std::cout << " \"bandwdith::mb::write\": "
1240  << summetric.getBytesWritten() / tbench / 1000000.0 << "," << std::endl;
1241  std::cout << " \"performancemark\": " << (100.0 * (t1 - t0) / tbench) << ","
1242  << std::endl;
1243  std::cout << " \"gain::read\":"
1244  << (100.0 * summetric.delays["Read::tnomi"] / summetric.delays["Read::tmeas"])
1245  << "," << std::endl;
1246  std::cout << " \"gain::write\":"
1247  << (100.0 * summetric.delays["Write::tnomi"] / summetric.delays["Write::tmeas"])
1248  << std::endl;
1249  }
1250  std::cout << " \"synchronicity::read\":"
1251  << summetric.aggregated_synchronicity.ReadSynchronicity() << "," << std::endl;
1252  std::cout << " \"synchronicity::write\":"
1253  << summetric.aggregated_synchronicity.WriteSynchronicity() << "," << std::endl;
1254  std::cout << " \"response::error:\":" << summetric.ios["All::e"] << std::endl;
1255  std::cout << " }" << std::endl;
1256  std::cout << "}" << std::endl;
1257  }
1258  }
1259  else
1260  {
1261  std::cout << "# =============================================" << std::endl;
1262  if (!opt.print())
1263  std::cout << "# IO Summary" << std::endl;
1264  else
1265  std::cout << "# IO Summary (print mode)" << std::endl;
1266  std::cout << "# =============================================" << std::endl;
1267  if (!opt.print())
1268  {
1269  std::cout << "# Total Runtime : " << std::fixed << tbench << " s" << std::endl;
1270  }
1271  std::cout << "# Sampled Runtime : " << std::fixed << t1 - t0 << " s" << std::endl;
1272  std::cout << "# Playback Speed : " << std::fixed << std::setprecision(2) << opt.speed()
1273  << std::endl;
1274  std::cout << "# IO Volume (R) : " << std::fixed
1276  << " [ std:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["Read::b"])
1277  << " vec:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["VectorRead::b"])
1278  << " page:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["PgRead::b"])
1279  << " ] " << std::endl;
1280  std::cout << "# IO Volume (W) : " << std::fixed
1282  << " [ std:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["Write::b"])
1283  << " vec:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["VectorWrite::b"])
1284  << " page:" << XrdCl::ActionMetrics::humanreadable(summetric.ios["PgWrite::b"])
1285  << " ] " << std::endl;
1286  std::cout << "# IOPS (R) : " << std::fixed << summetric.getIopsRead()
1287  << " [ std:" << summetric.ios["Read::n"]
1288  << " vec:" << summetric.ios["VectorRead::n"]
1289  << " page:" << summetric.ios["PgRead::n"] << " ] " << std::endl;
1290  std::cout << "# IOPS (W) : " << std::fixed << summetric.getIopsWrite()
1291  << " [ std:" << summetric.ios["Write::n"]
1292  << " vec:" << summetric.ios["VectorWrite::n"]
1293  << " page:" << summetric.ios["PgWrite::n"] << " ] " << std::endl;
1294  std::cout << "# Files (R) : " << std::fixed << summetric.ios["OpenR::n"] << std::endl;
1295  std::cout << "# Files (W) : " << std::fixed << summetric.ios["OpenW::n"] << std::endl;
1296  std::cout << "# Datasize (R) : " << std::fixed
1297  << XrdCl::ActionMetrics::humanreadable(summetric.ios["Read::o"]) << std::endl;
1298  std::cout << "# Datasize (W) : " << std::fixed
1299  << XrdCl::ActionMetrics::humanreadable(summetric.ios["Write::o"]) << std::endl;
1300  if (!opt.print())
1301  {
1302  std::cout << "# IO BW (R) : " << std::fixed << std::setprecision(2)
1303  << summetric.getBytesRead() / tbench / 1000000.0 << " MB/s" << std::endl;
1304  std::cout << "# IO BW (W) : " << std::fixed << std::setprecision(2)
1305  << summetric.getBytesRead() / tbench / 1000000.0 << " MB/s" << std::endl;
1306  }
1307  std::cout << "# ---------------------------------------------" << std::endl;
1308  std::cout << "# Quality Estimation" << std::endl;
1309  std::cout << "# ---------------------------------------------" << std::endl;
1310  if (!opt.print())
1311  {
1312  std::cout << "# Performance Mark : " << std::fixed << std::setprecision(2)
1313  << (100.0 * (t1 - t0) / tbench) << "%" << std::endl;
1314  std::cout << "# Gain Mark(R) : " << std::fixed << std::setprecision(2)
1315  << (100.0 * summetric.delays["Read::tnomi"] / summetric.delays["Read::tmeas"])
1316  << "%" << std::endl;
1317  std::cout << "# Gain Mark(W) : " << std::fixed << std::setprecision(2)
1318  << (100.0 * summetric.delays["Write::tnomi"] / summetric.delays["Write::tmeas"])
1319  << "%" << std::endl;
1320  }
1321  std::cout << "# Synchronicity(R) : " << std::fixed << std::setprecision(2)
1322  << summetric.aggregated_synchronicity.ReadSynchronicity() << "%" << std::endl;
1323  std::cout << "# Synchronicity(W) : " << std::fixed << std::setprecision(2)
1324  << summetric.aggregated_synchronicity.WriteSynchronicity() << "%" << std::endl;
1325  if (!opt.print())
1326  {
1327  std::cout << "# ---------------------------------------------" << std::endl;
1328  std::cout << "# Response Errors : " << std::fixed << summetric.ios["All::e"] << std::endl;
1329  std::cout << "# =============================================" << std::endl;
1330  if (summetric.ios["All::e"])
1331  {
1332  std::cerr << "Error: replay job failed with IO errors!" << std::endl;
1333  rc = -5;
1334  }
1335  }
1336  if (opt.create() || opt.verify())
1337  {
1338  std::cout << "# ---------------------------------------------" << std::endl;
1339  if (opt.create())
1340  {
1341  std::cout << "# Creating Dataset ..." << std::endl;
1342  }
1343  else
1344  {
1345  std::cout << "# Verifying Dataset ..." << std::endl;
1346  }
1347  uint64_t created_sofar = 0;
1348  for (auto& metric : metrics)
1349  {
1350  if (metric.second.getBytesRead() && !metric.second.getBytesWritten())
1351  {
1352  std::cout << "# ............................................." << std::endl;
1353  std::cout << "# file: " << metric.second.fname << std::endl;
1354  std::cout << "# size: "
1355  << XrdCl::ActionMetrics::humanreadable(metric.second.ios["Read::o"]) << " [ "
1356  << XrdCl::ActionMetrics::humanreadable(created_sofar) << " out of "
1357  << XrdCl::ActionMetrics::humanreadable(summetric.ios["Read::o"]) << " ] "
1358  << std::setprecision(2) << " ( "
1359  << 100.0 * created_sofar / summetric.ios["Read::o"] << "% )" << std::endl;
1360  if (!XrdCl::AssureFile(
1361  metric.second.fname, metric.second.ios["Read::o"], opt.truncate(), opt.verify()))
1362  {
1363  if (opt.verify())
1364  {
1365  rc = -5;
1366  }
1367  else
1368  {
1369  std::cerr << "Error: failed to assure that file " << metric.second.fname
1370  << " is stored with a size of "
1371  << XrdCl::ActionMetrics::humanreadable(metric.second.ios["Read::o"])
1372  << " !!!";
1373  rc = -5;
1374  }
1375  }
1376  }
1377  }
1378  }
1379  }
1380  }
1381  catch (const std::invalid_argument& ex)
1382  {
1383  std::cout << ex.what() << std::endl; // print parsing errors
1384  return 1;
1385  }
1386 
1387  return rc;
1388 }
static unsigned long long int stoull(const std::string &s)
simple integer parsing, to be replaced by std::stoll when C++11 can be used
void usage()
int main(int argc, char **argv)
XrdOucString File
void getline(uchar *buff, int blen)
Executes an action registered in the csv file.
Definition: XrdClReplay.cc:315
double NominalDuration() const
Get nominal duration variable.
Definition: XrdClReplay.cc:674
void Execute(std::shared_ptr< barrier_t > &ending, std::shared_ptr< barrier_t > &closing, ActionMetrics &metric, bool simulate)
Definition: XrdClReplay.cc:346
std::string Name() const
Get action name.
Definition: XrdClReplay.cc:679
ActionExecutor(File &file, const std::string &action, const std::string &args, const std::string &orgststr, const std::string &resp, const double &duration)
Definition: XrdClReplay.cc:328
Buffer pool - to limit memory consumption.
Definition: XrdClReplay.cc:57
std::shared_ptr< std::vector< char > > Allocate(size_t length)
Definition: XrdClReplay.cc:73
static BufferPool & Instance()
Single instance access.
Definition: XrdClReplay.cc:63
static Log * GetLog()
Get default log.
A file.
Definition: XrdClFile.hh:46
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
Args parse for XrdClReplay.
std::vector< std::string > & regex()
std::string & path()
Object stat info.
uint64_t GetSize() const
Get size (in bytes)
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:56
~barrier_t()
Destructor.
Definition: XrdClReplay.cc:205
XrdSysSemaphore & get()
Definition: XrdClReplay.cc:207
barrier_t(XrdSysSemaphore &sem)
Definition: XrdClReplay.cc:197
Timer helper class.
Definition: XrdClReplay.cc:156
double elapsed() const
Definition: XrdClReplay.cc:174
mytimer_t()
Constructor (record start time)
Definition: XrdClReplay.cc:161
void reset()
Reset the start time.
Definition: XrdClReplay.cc:169
VectorWriteImpl< false > VectorWrite(Ctx< File > file, Arg< ChunkList > chunks, uint16_t timeout=0)
Factory for creating VectorWriteImpl objects.
std::vector< std::string > ToColumns(const std::string &row)
Split a row into columns.
Definition: XrdClReplay.cc:826
StatImpl< false > Stat(Ctx< File > file, Arg< bool > force, uint16_t timeout=0)
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
const uint64_t AppMsg
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating OpenImpl objects.
std::multimap< double, ActionExecutor > action_list
List of actions: start time - action.
Definition: XrdClReplay.cc:898
PgWriteImpl< false > PgWrite(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, Arg< std::vector< uint32_t >> cksums, uint16_t timeout=0)
Factory for creating PgWriteImpl objects.
SyncImpl< false > Sync(Ctx< File > file, uint16_t timeout=0)
Factory for creating SyncImpl objects.
VectorReadImpl< false > VectorRead(Ctx< File > file, Arg< ChunkList > chunks, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating VectorReadImpl objects.
std::vector< ChunkInfo > ChunkList
List of chunks.
std::unordered_map< File *, action_list > ParseInput(const std::string &path, double &t0, double &t1, std::unordered_map< File *, std::string > &filenames, std::unordered_map< File *, double > &synchronicity, std::unordered_map< File *, size_t > &responseerrors, const std::vector< std::string > &option_regex)
Definition: XrdClReplay.cc:904
std::thread ExecuteActions(std::unique_ptr< File > file, action_list &&actions, double t0, double speed, ActionMetrics &metric, bool simulate)
TruncateImpl< false > Truncate(Ctx< File > file, Arg< uint64_t > size, uint16_t timeout)
PgReadImpl< false > PgRead(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating PgReadImpl objects.
bool AssureFile(const std::string &url, uint64_t size, bool viatruncate, bool verify)
AssureFile creates input data files on the fly if required.
Definition: XrdClReplay.cc:216
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
std::vector< char > buffer_t
Definition: XrdZipUtils.hh:56
Mode
Access mode.
@ UR
owner readable
@ UW
owner writable
@ UX
owner executable/browsable
Metrics struct storing all timing and IO information of an action.
synchronicity_t aggregated_synchronicity
void add(const ActionMetrics &other)
std::map< std::string, uint64_t > ios
void addIos(const std::string &action, const std::string &field, double value)
std::map< std::string, double > delays
size_t getBytesWritten() const
void addDelays(const std::string &action, const std::string &field, double value)
std::string Dump(bool json) const
static std::string humanreadable(uint64_t insize)
static double timeNow()
Get current unix time in ns precision as a double.
Definition: XrdClAction.hh:78
Describe a data chunk for vector read.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Write
Open only for writing.
@ Update
Open for reading and writing.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97