XRootD
XrdCl::ActionExecutor Class Reference

Executes an action registered in the csv file. More...

+ Collaboration diagram for XrdCl::ActionExecutor:

Public Member Functions

 ActionExecutor (File &file, const std::string &action, const std::string &args, const std::string &orgststr, const std::string &resp, const double &duration)
 
void Execute (std::shared_ptr< barrier_t > &ending, std::shared_ptr< barrier_t > &closing, ActionMetrics &metric, bool simulate)
 
std::string Name () const
 Get aciton name. More...
 
double NominalDuration () const
 Get nominal duration variable. More...
 

Detailed Description

Executes an action registered in the csv file.

Definition at line 314 of file XrdClReplay.cc.

Constructor & Destructor Documentation

◆ ActionExecutor()

XrdCl::ActionExecutor::ActionExecutor ( File file,
const std::string &  action,
const std::string &  args,
const std::string &  orgststr,
const std::string &  resp,
const double &  duration 
)
inline

Constructor

Parameters
file: the file that should be the context of the action
action: the action to be executed
args: arguments for the action
orgststr: original status
resp: original response
duration: nominal duration of this action

Definition at line 328 of file XrdClReplay.cc.

334  : file(file)
335  , action(action)
336  , args(args)
337  , orgststr(orgststr)
338  , nominalduration(duration)
339  {
340  }

Member Function Documentation

◆ Execute()

void XrdCl::ActionExecutor::Execute ( std::shared_ptr< barrier_t > &  ending,
std::shared_ptr< barrier_t > &  closing,
ActionMetrics metric,
bool  simulate 
)
inline

Execute the action

Parameters
ending: synchronization object for ending the execution

Definition at line 346 of file XrdClReplay.cc.

350  {
351  if (action == "Open") // open action
352  {
353  std::string url;
354  OpenFlags::Flags flags;
355  Access::Mode mode;
356  uint16_t timeout;
357  std::tie(url, flags, mode, timeout) = GetOpenArgs();
358 
359  std::string lmetric;
360  if ((flags & OpenFlags::Update) || (flags & OpenFlags::Write))
361  {
362  metric.ios["OpenW::n"]++;
363  }
364  else
365  {
366  metric.ios["OpenR::n"]++;
367  }
368 
369  metric.ios["Open::n"]++;
370 
371  mytimer_t timer;
372 
373  if (!simulate)
374  WaitFor(Open(file, url, flags, mode, timeout) >>
375  [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
376  {
377  metric.addIos("Open", "e", HandleStatus(s, orgststr, "Open"));
378  metric.addDelays("Open", "tmeas", timer.elapsed());
379  ending.reset();
380  closing.reset();
381  });
382  else
383  {
384  ending.reset();
385  closing.reset();
386  }
387  }
388  else if (action == "Close") // close action
389  {
390  uint16_t timeout = GetCloseArgs();
391  mytimer_t timer;
392 
393  if (closing)
394  {
395  auto& sem = closing->get();
396  closing.reset();
397  sem.Wait();
398  }
399 
400  metric.ios["Close::n"]++;
401 
402  if (!simulate)
403  Async(Close(file, timeout) >>
404  [this, orgststr{ orgststr }, ending, timer, &metric](XRootDStatus& s) mutable
405  {
406  metric.addIos("Close", "e", HandleStatus(s, orgststr, "Close"));
407  metric.addDelays("Close", "tmeas", timer.elapsed());
408  ending.reset();
409  });
410  else
411  {
412  ending.reset();
413  }
414  }
415  else if (action == "Stat") // stat action
416  {
417  bool force;
418  uint16_t timeout;
419  std::tie(force, timeout) = GetStatArgs();
420  metric.ios["Stat::n"]++;
421  mytimer_t timer;
422 
423  if (!simulate)
424  Async(Stat(file, force, timeout) >>
425  [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s, StatInfo& r) mutable
426  {
427  metric.addIos("Stat", "e", HandleStatus(s, orgststr, "Stat"));
428  metric.addDelays("Stat", "tmeas", timer.elapsed());
429  ending.reset();
430  closing.reset();
431  });
432  else
433  {
434  ending.reset();
435  closing.reset();
436  }
437  }
438  else if (action == "Read") // read action
439  {
440  uint64_t offset;
441  buffer_t buffer;
442  uint16_t timeout;
443  std::tie(offset, buffer, timeout) = GetReadArgs();
444  metric.ios["Read::n"]++;
445  metric.ios["Read::b"] += buffer->size();
446  if ((offset + buffer->size()) > metric.ios["Read::o"])
447  metric.ios["Read::o"] = offset + buffer->size();
448 
449  mytimer_t timer;
450  if (!simulate)
451  Async(Read(file, offset, buffer->size(), buffer->data(), timeout) >>
452  [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s,
453  ChunkInfo& r) mutable
454  {
455  metric.addIos("Read", "e", HandleStatus(s, orgststr, "Read"));
456  metric.addDelays("Read", "tmeas", timer.elapsed());
457  buffer.reset();
458  ending.reset();
459  closing.reset();
460  });
461  else
462  {
463  buffer.reset();
464  ending.reset();
465  closing.reset();
466  }
467  }
468  else if (action == "PgRead") // pgread action
469  {
470  uint64_t offset;
471  buffer_t buffer;
472  uint16_t timeout;
473  std::tie(offset, buffer, timeout) = GetPgReadArgs();
474  metric.ios["PgRead::n"]++;
475  metric.ios["PgRead::b"] += buffer->size();
476  if ((offset + buffer->size()) > metric.ios["Read::o"])
477  metric.ios["Read::o"] = offset + buffer->size();
478  mytimer_t timer;
479  if (!simulate)
480  Async(PgRead(file, offset, buffer->size(), buffer->data(), timeout) >>
481  [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s,
482  PageInfo& r) mutable
483  {
484  metric.addIos("PgRead", "e", HandleStatus(s, orgststr, "PgRead"));
485  metric.addDelays("PgRead", "tmeas", timer.elapsed());
486  buffer.reset();
487  ending.reset();
488  closing.reset();
489  });
490  else
491  {
492  buffer.reset();
493  ending.reset();
494  closing.reset();
495  }
496  }
497  else if (action == "Write") // write action
498  {
499  uint64_t offset;
500  buffer_t buffer;
501  uint16_t timeout;
502  std::tie(offset, buffer, timeout) = GetWriteArgs();
503  metric.ios["Write::n"]++;
504  metric.ios["Write::b"] += buffer->size();
505  if ((offset + buffer->size()) > metric.ios["Write::o"])
506  metric.ios["Write::o"] = offset + buffer->size();
507  mytimer_t timer;
508 
509  if (!simulate)
510  Async(
511  Write(file, offset, buffer->size(), buffer->data(), timeout) >>
512  [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
513  {
514  metric.addIos("Write", "e", HandleStatus(s, orgststr, "Write"));
515  metric.addDelays("Write", "tmeas", timer.elapsed());
516  buffer.reset();
517  ending.reset();
518  closing.reset();
519  });
520  else
521  {
522  buffer.reset();
523  ending.reset();
524  closing.reset();
525  }
526  }
527  else if (action == "PgWrite") // pgwrite action
528  {
529  uint64_t offset;
530  buffer_t buffer;
531  uint16_t timeout;
532  std::tie(offset, buffer, timeout) = GetPgWriteArgs();
533  metric.ios["PgWrite::n"]++;
534  metric.ios["PgWrite::b"] += buffer->size();
535  if ((offset + buffer->size()) > metric.ios["Write::o"])
536  metric.ios["Write::o"] = offset + buffer->size();
537  mytimer_t timer;
538  if (!simulate)
539  Async(
540  PgWrite(file, offset, buffer->size(), buffer->data(), timeout) >>
541  [buffer, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
542  {
543  metric.addIos("PgWrite", "e", HandleStatus(s, orgststr, "PgWrite"));
544  metric.addDelays("PgWrite", "tmeas", timer.elapsed());
545  buffer.reset();
546  ending.reset();
547  closing.reset();
548  });
549  else
550  {
551  buffer.reset();
552  ending.reset();
553  closing.reset();
554  }
555  }
556  else if (action == "Sync") // sync action
557  {
558  uint16_t timeout = GetSyncArgs();
559  metric.ios["Sync::n"]++;
560  mytimer_t timer;
561  if (!simulate)
562  Async(Sync(file, timeout) >>
563  [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
564  {
565  metric.addIos("Sync", "e", HandleStatus(s, orgststr, "Sync"));
566  metric.addDelays("Sync", "tmeas", timer.elapsed());
567  ending.reset();
568  closing.reset();
569  });
570  else
571  {
572  ending.reset();
573  closing.reset();
574  }
575  }
576  else if (action == "Truncate") // truncate action
577  {
578  uint64_t size;
579  uint16_t timeout;
580  std::tie(size, timeout) = GetTruncateArgs();
581  metric.ios["Truncate::n"]++;
582  if (size > metric.ios["Truncate::o"])
583  metric.ios["Truncate::o"] = size;
584 
585  mytimer_t timer;
586  if (!simulate)
587  Async(Truncate(file, size, timeout) >>
588  [this, orgststr{ orgststr }, ending, closing, timer, &metric](XRootDStatus& s) mutable
589  {
590  metric.addIos("Truncate", "e", HandleStatus(s, orgststr, "Truncate"));
591  metric.addDelays("Truncate", "tmeas", timer.elapsed());
592  ending.reset();
593  closing.reset();
594  });
595  else
596  {
597  ending.reset();
598  closing.reset();
599  }
600  }
601  else if (action == "VectorRead") // vector read action
602  {
603  ChunkList chunks;
604  uint16_t timeout;
605  std::vector<buffer_t> buffers;
606  std::tie(chunks, timeout, buffers) = GetVectorReadArgs();
607  metric.ios["VectorRead::n"]++;
608  for (auto& ch : chunks)
609  {
610  metric.ios["VectorRead::b"] += ch.GetLength();
611  if ((ch.GetOffset() + ch.GetLength()) > metric.ios["Read::o"])
612  metric.ios["Read::o"] = ch.GetOffset() + ch.GetLength();
613  }
614 
615  mytimer_t timer;
616  if (!simulate)
617  Async(
618  VectorRead(file, chunks, timeout) >>
619  [this, orgststr{ orgststr }, buffers, ending, closing, timer, &metric](XRootDStatus& s, VectorReadInfo& r) mutable
620  {
621  metric.addIos("VectorRead", "e", HandleStatus(s, orgststr, "VectorRead"));
622  metric.addDelays("VectorRead", "tmeas", timer.elapsed());
623  buffers.clear();
624  ending.reset();
625  closing.reset();
626  });
627  else
628  {
629  buffers.clear();
630  ending.reset();
631  closing.reset();
632  }
633  }
634  else if (action == "VectorWrite") // vector write
635  {
636  ChunkList chunks;
637  uint16_t timeout;
638  std::vector<buffer_t> buffers;
639  std::tie(chunks, timeout, buffers) = GetVectorWriteArgs();
640  metric.ios["VectorWrite::n"]++;
641  for (auto& ch : chunks)
642  {
643  metric.ios["VectorWrite::b"] += ch.GetLength();
644  if ((ch.GetOffset() + ch.GetLength()) > metric.ios["Write::o"])
645  metric.ios["Write::o"] = ch.GetOffset() + ch.GetLength();
646  }
647  mytimer_t timer;
648  if (!simulate)
649  Async(VectorWrite(file, chunks, timeout) >>
650  [this, orgststr{ orgststr }, buffers, ending, closing, timer, &metric](XRootDStatus& s) mutable
651  {
652  metric.addIos("VectorWrite", "e", HandleStatus(s, orgststr, "VectorWrite"));
653  metric.addDelays("VectorWrite", "tmeas", timer.elapsed());
654  buffers.clear();
655  ending.reset();
656  closing.reset();
657  });
658  else
659  {
660  buffers.clear();
661  ending.reset();
662  closing.reset();
663  }
664  }
665  else
666  {
667  DefaultEnv::GetLog()->Warning(AppMsg, "Cannot replyt %s action.", action.c_str());
668  }
669  }
static Log * GetLog()
Get default log.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
VectorWriteImpl< false > VectorWrite(Ctx< File > file, Arg< ChunkList > chunks, uint16_t timeout=0)
Factory for creating VectorWriteImpl objects.
StatImpl< false > Stat(Ctx< File > file, Arg< bool > force, uint16_t timeout=0)
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
const uint64_t AppMsg
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
PgWriteImpl< false > PgWrite(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, Arg< std::vector< uint32_t >> cksums, uint16_t timeout=0)
Factory for creating PgReadImpl objects.
SyncImpl< false > Sync(Ctx< File > file, uint16_t timeout=0)
Factory for creating SyncImpl objects.
VectorReadImpl< false > VectorRead(Ctx< File > file, Arg< ChunkList > chunks, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating VectorReadImpl objects.
std::vector< ChunkInfo > ChunkList
List of chunks.
TruncateImpl< false > Truncate(Ctx< File > file, Arg< uint64_t > size, uint16_t timeout)
PgReadImpl< false > PgRead(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating PgReadImpl objects.
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
Mode
Access mode.
Flags
Open flags, may be or'd when appropriate.
@ Write
Open only for writing.
@ Update
Open for reading and writing.

References XrdCl::ActionMetrics::addDelays(), XrdCl::ActionMetrics::addIos(), XrdCl::AppMsg, XrdCl::Async(), XrdCl::Close(), XrdCl::mytimer_t::elapsed(), XrdCl::DefaultEnv::GetLog(), XrdCl::ActionMetrics::ios, XrdCl::Open(), XrdCl::PgRead(), XrdCl::PgWrite(), XrdCl::Read(), XrdCl::mytimer_t::reset(), XrdCl::Stat(), XrdCl::Sync(), XrdCl::Truncate(), XrdCl::OpenFlags::Update, XrdCl::VectorRead(), XrdCl::VectorWrite(), XrdCl::WaitFor(), XrdCl::Log::Warning(), XrdCl::Write(), and XrdCl::OpenFlags::Write.

+ Here is the call graph for this function:

◆ Name()

std::string XrdCl::ActionExecutor::Name ( ) const
inline

Get aciton name.

Definition at line 679 of file XrdClReplay.cc.

679 { return action; }

◆ NominalDuration()

double XrdCl::ActionExecutor::NominalDuration ( ) const
inline

Get nominal duration variable.

Definition at line 674 of file XrdClReplay.cc.

674 { return nominalduration; }

The documentation for this class was generated from the following file: