XRootD
XrdPfc::Cache Class Reference

Attaches/creates and detaches/deletes cache-io objects for disk based cache. More...

#include <XrdPfc.hh>

+ Inheritance diagram for XrdPfc::Cache:
+ Collaboration diagram for XrdPfc::Cache:

Public Member Functions

 Cache (XrdSysLogger *logger, XrdOucEnv *env)
 Constructor. More...
 
void AddWriteTask (Block *b, bool from_read)
 Add downloaded block to the write queue. More...
 
virtual XrdOucCacheIO * Attach (XrdOucCacheIO *, int Options=0)
 
virtual XrdOucCacheIO * Attach (XrdOucCacheIO *ioP, int opts=0)=0
 Obtain a new IO object that fronts existing XrdOucCacheIO. More...
 
void ClearPurgeProtectedSet ()
 
bool Config (const char *config_filename, const char *parameters)
 Parse configuration file. More...
 
virtual int ConsiderCached (const char *url)
 
bool Decide (XrdOucCacheIO *)
 Makes decision if the original XrdOucCacheIO should be cached. More...
 
bool DecideIfConsideredCached (long long file_size, long long bytes_on_disk)
 
void DeRegisterPrefetchFile (File *)
 
long long DetermineFullFileSize (const std::string &cinfo_fname)
 
void ExecuteCommandUrl (const std::string &command_url)
 
void FileSyncDone (File *, bool high_debug)
 
File * GetFile (const std::string &, IO *, long long off=0, long long filesize=0)
 
XrdXrootdGStream * GetGStream ()
 
XrdSysError * GetLog ()
 
File * GetNextFileToPrefetch ()
 
XrdOss * GetOss () const
 
PurgePin * GetPurgePin () const
 
XrdSysTrace * GetTrace ()
 
bool IsFileActiveOrPurgeProtected (const std::string &) const
 
virtual int LocalFilePath (const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
 
void Prefetch ()
 
virtual int Prepare (const char *url, int oflags, mode_t mode)
 
void ProcessWriteTasks ()
 Separate task which writes blocks from ram to disk. More...
 
const Configuration & RefConfiguration () const
 Reference XrdPfc configuration. More...
 
ResourceMonitor & RefResMon ()
 
void RegisterPrefetchFile (File *)
 
void ReleaseFile (File *, IO *)
 
void ReleaseRAM (char *buf, long long size)
 
void RemoveWriteQEntriesFor (File *f)
 Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction. More...
 
char * RequestRAM (long long size)
 
void ScheduleFileSync (File *f)
 
virtual int Stat (const char *url, struct stat &sbuff)
 
virtual int Unlink (const char *url)
 
int UnlinkFile (const std::string &f_name, bool fail_if_open)
 Remove cinfo and data files from cache. More...
 
void WriteFileSizeXAttr (int cinfo_fd, long long file_size)
 
long long WritesSinceLastCall ()
 
- Public Member Functions inherited from XrdOucCache
 XrdOucCache (const char *ctype)
 
virtual ~XrdOucCache ()
 Destructor. More...
 
virtual int Rename (const char *oldp, const char *newp)
 
virtual int Rmdir (const char *dirp)
 
virtual int Truncate (const char *path, off_t size)
 
virtual int Xeq (XeqCmd cmd, char *arg, int arglen)
 

Static Public Member Functions

static const Configuration & Conf ()
 
static Cache & CreateInstance (XrdSysLogger *logger, XrdOucEnv *env)
 Singleton creation. More...
 
static Cache & GetInstance ()
 Singleton access. More...
 
static ResourceMonitor & ResMon ()
 
static const Cache & TheOne ()
 
static bool VCheck (XrdVersionInfo &urVersion)
 Version check. More...
 

Static Public Attributes

static XrdScheduler * schedP = nullptr
 
- Static Public Attributes inherited from XrdOucCache
static const int optFIS = 0x0001
 File is structured (e.g. root file) More...
 
static const int optNEW = 0x0014
 File is new -> optRW (o/w read or write) More...
 
static const int optRW = 0x0004
 File is read/write (o/w read/only) More...
 
static const int optWIN = 0x0024
 File is new -> optRW use write-in cache. More...
 

Additional Inherited Members

- Public Types inherited from XrdOucCache
enum  LFP_Reason {
  ForAccess =0 ,
  ForInfo ,
  ForPath
}
 
enum  XeqCmd { xeqNoop = 0 }
 
- Public Attributes inherited from XrdOucCache
const char CacheType [8]
 A 1-to-7 character cache type identifier (usually pfc or rmc). More...
 
XrdOucCacheStats Statistics
 

Detailed Description

Attaches/creates and detaches/deletes cache-io objects for disk based cache.

Definition at line 150 of file XrdPfc.hh.

Constructor & Destructor Documentation

◆ Cache()

Cache::Cache ( XrdSysLogger * logger,
XrdOucEnv * env 
)

Constructor.

Definition at line 159 of file XrdPfc.cc.

159  :
160  XrdOucCache("pfc"),
161  m_env(env),
162  m_log(logger, "XrdPfc_"),
163  m_trace(new XrdSysTrace("XrdPfc", logger)),
164  m_traceID("Cache"),
165  m_oss(0),
166  m_gstream(0),
167  m_purge_pin(0),
168  m_prefetch_condVar(0),
169  m_prefetch_enabled(false),
170  m_RAM_used(0),
171  m_RAM_write_queue(0),
172  m_RAM_std_size(0),
173  m_isClient(false),
174  m_active_cond(0)
175 {
176  // Default log level is Warning.
177  m_trace->What = 2;
178 }
XrdOucCache(const char *ctype)
Definition: XrdOucCache.hh:700

References XrdSysTrace::What.

Referenced by CreateInstance().

+ Here is the caller graph for this function:

Member Function Documentation

◆ AddWriteTask()

void Cache::AddWriteTask ( Block * b,
bool  from_read 
)

Add downloaded block to the write queue.

Definition at line 222 of file XrdPfc.cc.

223 {
224  TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
225 
226  {
227  XrdSysMutexHelper lock(&m_RAM_mutex);
228  m_RAM_write_queue += b->get_size();
229  }
230 
231  m_writeQ.condVar.Lock();
232  if (fromRead)
233  m_writeQ.queue.push_back(b);
234  else
235  m_writeQ.queue.push_front(b);
236  m_writeQ.size++;
237  m_writeQ.condVar.Signal();
238  m_writeQ.condVar.UnLock();
239 }
#define TRACE(act, x)
Definition: XrdTrace.hh:63
int get_size() const
Definition: XrdPfcFile.hh:142
File * get_file() const
Definition: XrdPfcFile.hh:146
long long m_offset
Definition: XrdPfcFile.hh:120
const std::string & GetLocalPath() const
Definition: XrdPfcFile.hh:270

References XrdPfc::Block::get_file(), XrdPfc::Block::get_size(), XrdPfc::File::GetLocalPath(), XrdPfc::Block::m_offset, and TRACE.

+ Here is the call graph for this function:

◆ Attach() [1/2]

XrdOucCacheIO * Cache::Attach ( XrdOucCacheIO * io,
int  Options = 0 
)
virtual

Implements XrdOucCache.

Definition at line 180 of file XrdPfc.cc.

181 {
182  const char* tpfx = "Attach() ";
183 
184  if (Cache::GetInstance().Decide(io))
185  {
186  TRACE(Info, tpfx << obfuscateAuth(io->Path()));
187 
188  IO *cio;
189 
190  if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
191  {
192  cio = new IOFileBlock(io, *this);
193  }
194  else
195  {
196  IOFile *iof = new IOFile(io, *this);
197 
198  if ( ! iof->HasFile())
199  {
200  delete iof;
201  // TODO - redirect instead. But this is kind of an awkward place for it.
202  // errno is set during IOFile construction.
203  TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
204  return io;
205  }
206 
207  cio = iof;
208  }
209 
210  TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
211  ((loc && loc[0] != 0) ? loc : "<deferred open>"));
212 
213  return cio;
214  }
215  else
216  {
217  TRACE(Info, tpfx << "decision decline " << io->Path());
218  }
219  return io;
220 }
std::string obfuscateAuth(const std::string &input)
#define TRACE_PC(act, pre_code, x)
Definition: XrdPfcTrace.hh:59
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
Definition: XrdOucCache.hh:161
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:203
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:133
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition: XrdPfc.cc:138
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
Definition: XrdPfcIOFile.hh:39
bool HasFile() const
Check if File was opened successfully.
Definition: XrdPfcIOFile.hh:48
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41

References Macaroons::Debug, Decide(), Macaroons::Error, GetInstance(), XrdPfc::IOFile::HasFile(), XrdOucCacheIO::Location(), obfuscateAuth(), XrdOucCacheIO::Path(), RefConfiguration(), TRACE, and TRACE_PC.

+ Here is the call graph for this function:

◆ Attach() [2/2]

virtual XrdOucCacheIO* XrdOucCache::Attach

Obtain a new IO object that fronts existing XrdOucCacheIO.

◆ ClearPurgeProtectedSet()

void Cache::ClearPurgeProtectedSet ( )

Definition at line 670 of file XrdPfc.cc.

671 {
672  XrdSysCondVarHelper lock(&m_active_cond);
673  m_purge_delay_set.clear();
674 }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), and XrdPfc::ResourceMonitor::perform_purge_task_cleanup().

+ Here is the caller graph for this function:

◆ Conf()

const Configuration & Cache::Conf ( )
static

Definition at line 135 of file XrdPfc.cc.

135 { return m_instance->RefConfiguration(); }

References RefConfiguration().

Referenced by XrdPfc::FPurgeState::CheckFile(), XrdPfc::ResourceMonitor::heart_beat(), XrdPfc::OldStylePurgeDriver(), XrdPfc::ResourceMonitor::perform_purge_check(), Proto_ResourceMonitorHeartBeat(), and XrdPfc::ResourceMonitor::update_vs_and_file_usage_info().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Config()

bool Cache::Config ( const char *  config_filename,
const char *  parameters 
)

Parse configuration file.

Parameters
config_filename — path to configuration file
parameters — optional parameters to be passed
Returns
parse status

Definition at line 407 of file XrdPfcConfiguration.cc.

408 {
409  // Indicate whether or not we are a client instance
410  const char *theINS = getenv("XRDINSTANCE");
411  m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
412 
413  // Tell everyone else we are a caching proxy
414  XrdOucEnv::Export("XRDPFC", 1);
415 
416  XrdOucEnv myEnv;
417  XrdOucStream Config(&m_log, theINS, &myEnv, "=====> ");
418 
419  if (! config_filename || ! *config_filename)
420  {
421  TRACE(Error, "Config() configuration file not specified.");
422  return false;
423  }
424 
425  int fd;
426  if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
427  {
428  TRACE( Error, "Config() can't open configuration file " << config_filename);
429  return false;
430  }
431 
432  Config.Attach(fd);
433  static const char *cvec[] = { "*** pfc plugin config:", 0 };
434  Config.Capture(cvec);
435 
436  // Obtain OFS configurator for OSS plugin.
437  XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
438  &XrdVERSIONINFOVAR(XrdOucGetCache));
439  if (! ofsCfg) return false;
440 
441  TmpConfiguration tmpc;
442 
443  // Adjust default parameters for client/serverless caching
444  if (m_isClient)
445  {
446  m_configuration.m_bufferSize = 128 * 1024; // same as normal.
447  m_configuration.m_wqueue_blocks = 8;
448  m_configuration.m_wqueue_threads = 1;
449  }
450 
451  // If network checksum processing is the default, indicate so.
452  if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
453 
454  // Actual parsing of the config file.
455  bool retval = true, aOK = true;
456  char *var;
457  while ((var = Config.GetMyFirstWord()))
458  {
459  if (! strcmp(var,"pfc.osslib"))
460  {
461  retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
462  }
463  else if (! strcmp(var,"pfc.cschk"))
464  {
465  retval = xcschk(Config);
466  }
467  else if (! strcmp(var,"pfc.decisionlib"))
468  {
469  retval = xdlib(Config);
470  }
471  else if (! strcmp(var,"pfc.purgelib"))
472  {
473  retval = xplib(Config);
474  }
475  else if (! strcmp(var,"pfc.trace"))
476  {
477  retval = xtrace(Config);
478  }
479  else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
480  {
481  m_configuration.m_allow_xrdpfc_command = true;
482  }
483  else if (! strncmp(var,"pfc.", 4))
484  {
485  retval = ConfigParameters(std::string(var+4), Config, tmpc);
486  }
487 
488  if ( ! retval)
489  {
490  TRACE(Error, "Config() error in parsing");
491  aOK = false;
492  }
493  }
494 
495  Config.Close();
496 
497  // Load OSS plugin.
498  myEnv.Put("oss.runmode", "pfc");
499  if (m_configuration.is_cschk_cache())
500  {
501  char csi_conf[128];
502  if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
503  {
504  ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
505  } else {
506  TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
507  return false;
508  }
509  }
510  if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, &myEnv))
511  {
512  ofsCfg->Plugin(m_oss);
513  }
514  else
515  {
516  TRACE(Error, "Config() Unable to create an OSS object");
517  return false;
518  }
519 
520  // Test if OSS is operational, determine optional features.
521  aOK &= test_oss_basics_and_features();
522 
523  // sets default value for disk usage
524  XrdOssVSInfo sP;
525  {
526  if (m_configuration.m_meta_space != m_configuration.m_data_space &&
527  m_oss->StatVS(&sP, m_configuration.m_meta_space.c_str(), 1) < 0)
528  {
529  m_log.Emsg("ConfigParameters()", "error obtaining stat info for meta space ", m_configuration.m_meta_space.c_str());
530  return false;
531  }
532  if (m_configuration.m_meta_space != m_configuration.m_data_space && sP.Total < 10ll << 20)
533  {
534  m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
535  m_configuration.m_meta_space.c_str());
536  return false;
537  }
538  if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
539  {
540  m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
541  return false;
542  }
543  if (sP.Total < 10ll << 20)
544  {
545  m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
546  m_configuration.m_data_space.c_str());
547  return false;
548  }
549 
550  m_configuration.m_diskTotalSpace = sP.Total;
551 
552  if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
553  cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
554  {
555  if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
556  m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
557  aOK = false;
558  }
559  }
560  else aOK = false;
561 
562  if ( ! tmpc.m_fileUsageMax.empty())
563  {
564  if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
565  cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
566  cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
567  {
568  if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
569  m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
570  m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
571  {
572  m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
573  aOK = false;
574  }
575 
576 
577  if (aOK && m_configuration.m_fileUsageMax >= m_configuration.m_diskUsageLWM)
578  {
579  m_log.Emsg("ConfigParameters()", "pfc.diskusage files values must be below lowWatermark");
580  aOK = false;
581  }
582  }
583  else aOK = false;
584  }
585  }
586 
587  // sets flush frequency
588  if ( ! tmpc.m_flushRaw.empty())
589  {
590  if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
591  {
592  if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
593  &m_configuration.m_flushCnt,
594  100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
595  {
596  return false;
597  }
598  m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
599  }
600  else
601  {
602  if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
603  &m_configuration.m_flushCnt, 100, 100000))
604  {
605  return false;
606  }
607  }
608  }
609 
610  // get number of available RAM blocks after process configuration
611  if (m_configuration.m_RamAbsAvailable == 0)
612  {
613  m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
614  char buff[1024];
615  snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
616  m_log.Say("Config info: ", buff);
617  }
618  // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
619  m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
620 
621  // Set tracing to debug if this is set in environment
622  char* cenv = getenv("XRDDEBUG");
623  if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
624 
625  if (aOK)
626  {
627  int loff = 0;
628 // 000 001 010
629  const char *csc[] = {"off", "cache nonet", "nocache net notls",
630 // 011
631  "cache net notls",
632 // 100 101 110
633  "off", "cache nonet", "nocache net tls",
634 // 111
635  "cache net tls"};
636  char buff[8192], uvk[32];
637  if (m_configuration.m_cs_UVKeep < 0)
638  strcpy(uvk, "lru");
639  else
640  sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
641  float rg = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
642  loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
643  " pfc.cschk %s uvkeep %s\n"
644  " pfc.blocksize %lld\n"
645  " pfc.prefetch %d\n"
646  " pfc.ram %.fg\n"
647  " pfc.writequeue %d %d\n"
648  " # Total available disk: %lld\n"
649  " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
650  " pfc.spaces %s %s\n"
651  " pfc.trace %d\n"
652  " pfc.flush %lld\n"
653  " pfc.acchistorysize %d\n"
654  " pfc.onlyIfCachedMinBytes %lld\n"
655  " pfc.onlyIfCachedMinFrac %.2f\n",
656  config_filename,
657  csc[int(m_configuration.m_cs_Chk)], uvk,
658  m_configuration.m_bufferSize,
659  m_configuration.m_prefetch_max_blocks,
660  rg,
661  m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
662  sP.Total,
663  m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
664  m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
665  m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
666  m_configuration.m_data_space.c_str(),
667  m_configuration.m_meta_space.c_str(),
668  m_trace->What,
669  m_configuration.m_flushCnt,
670  m_configuration.m_accHistorySize,
671  m_configuration.m_onlyIfCachedMinSize,
672  m_configuration.m_onlyIfCachedMinFrac);
673 
674  if (m_configuration.is_dir_stat_reporting_on())
675  {
676  loff += snprintf(buff + loff, sizeof(buff) - loff,
677  " pfc.dirstats maxdepth %d ((internal: store_depth %d, size_of_dirlist %d, size_of_globlist %d))\n",
678  m_configuration.m_dirStatsMaxDepth, m_configuration.m_dirStatsStoreDepth,
679  (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
680  loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
681  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
682  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
683  loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
684  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
685  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
686  }
687 
688  if (m_configuration.m_hdfsmode)
689  {
690  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
691  }
692 
693  if (m_configuration.m_username.empty())
694  {
695  char unameBuff[256];
696  XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
697  m_configuration.m_username = unameBuff;
698  }
699  else
700  {
701  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
702  }
703 
704  m_log.Say(buff);
705 
706  m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
707  }
708 
709  // Derived settings
710  m_prefetch_enabled = m_configuration.m_prefetch_max_blocks > 0;
711  Info::s_maxNumAccess = m_configuration.m_accHistorySize;
712 
713  m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
714 
715  m_log.Say(" pfc g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive\n");
716 
717  // Create the ResourceMonitor and get it ready for starting the main thread function.
718  if (aOK)
719  {
720  m_res_mon = new ResourceMonitor(*m_oss);
721  m_res_mon->init_before_main();
722  }
723 
724  m_log.Say("=====> Proxy file cache configuration parsing ", aOK ? "completed" : "failed");
725 
726  if (ofsCfg) delete ofsCfg;
727 
728  // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and validated.
729  // Building of xrdpfc_print fails when this is enabled.
730 #ifdef XRDPFC_CKSUM_TEST
731  {
732  int xxx = m_configuration.m_cs_Chk;
733 
734  for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
735  {
736  Info::TestCksumStuff();
737  }
738 
739  m_configuration.m_cs_Chk = xxx;
740  }
741 #endif
742 
743  return aOK;
744 }
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition: XrdPfc.cc:77
int open(const char *path, int oflag,...)
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
long long Total
Definition: XrdOssVS.hh:90
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition: XrdOss.cc:117
static int Export(const char *Var, const char *Val)
Definition: XrdOucEnv.cc:188
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:281
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:257
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:70
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
static size_t s_maxNumAccess
Definition: XrdPfcInfo.hh:311
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
@ CSChk_Both
Definition: XrdPfcTypes.hh:27
@ CSChk_None
Definition: XrdPfcTypes.hh:27
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition: XrdPfc.hh:114
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:108
long long m_flushCnt
number of unsynced blocks on disk before flush is called
Definition: XrdPfc.hh:115
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition: XrdPfc.hh:100
int m_wqueue_threads
number of threads writing blocks to disk
Definition: XrdPfc.hh:111
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition: XrdPfc.hh:91
long long m_fileUsageMax
cache purge - files usage maximum
Definition: XrdPfc.hh:96
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition: XrdPfc.hh:94
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition: XrdPfc.hh:105
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition: XrdPfc.hh:85
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition: XrdPfc.hh:93
bool is_cschk_cache() const
Definition: XrdPfc.hh:75
std::set< std::string > m_dirStatsDirGlobs
directory globs for which stat reporting was requested
Definition: XrdPfc.hh:103
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:112
bool m_cs_ChkTLS
Allow TLS.
Definition: XrdPfc.hh:119
long long m_fileUsageNominal
cache purge - files usage nominal
Definition: XrdPfc.hh:95
int m_cs_Chk
Checksum check.
Definition: XrdPfc.hh:118
bool m_hdfsmode
flag for enabling block-level operation
Definition: XrdPfc.hh:84
int m_purgeColdFilesAge
purge files older than this age
Definition: XrdPfc.hh:98
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:88
std::set< std::string > m_dirStatsDirs
directories for which stat reporting was requested
Definition: XrdPfc.hh:102
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition: XrdPfc.hh:92
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition: XrdPfc.hh:109
long long m_bufferSize
prefetch buffer size, default 1MB
Definition: XrdPfc.hh:107
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:89
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition: XrdPfc.hh:110
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:87
bool is_cschk_net() const
Definition: XrdPfc.hh:76
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:122
time_t m_cs_UVKeep
unverified checksum cache keep
Definition: XrdPfc.hh:117
int m_dirStatsMaxDepth
maximum depth for statistics write out
Definition: XrdPfc.hh:104
int m_purgeInterval
sleep interval between cache purges
Definition: XrdPfc.hh:97
long long m_onlyIfCachedMinSize
minimum size of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:121
bool is_dir_stat_reporting_on() const
Definition: XrdPfc.hh:70
std::string m_diskUsageLWM
Definition: XrdPfc.hh:129
std::string m_diskUsageHWM
Definition: XrdPfc.hh:130
std::string m_fileUsageBaseline
Definition: XrdPfc.hh:131
std::string m_fileUsageNominal
Definition: XrdPfc.hh:132
std::string m_flushRaw
Definition: XrdPfc.hh:134
std::string m_fileUsageMax
Definition: XrdPfc.hh:133

References XrdOuca2x::a2ll(), XrdOuca2x::a2sz(), XrdPfc::CSChk_Both, XrdPfc::CSChk_None, XrdSysError::Emsg(), Macaroons::Error, XrdOucEnv::Export(), XrdOucEnv::GetPtr(), XrdPfc::ResourceMonitor::init_before_main(), XrdPfc::Configuration::is_cschk_cache(), XrdPfc::Configuration::is_cschk_net(), XrdPfc::Configuration::is_dir_stat_reporting_on(), XrdOfsConfigPI::Load(), XrdPfc::Configuration::m_accHistorySize, XrdPfc::Configuration::m_allow_xrdpfc_command, XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_cs_Chk, XrdPfc::Configuration::m_cs_ChkTLS, XrdPfc::Configuration::m_cs_UVKeep, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_dirStatsDirGlobs, XrdPfc::Configuration::m_dirStatsDirs, XrdPfc::Configuration::m_dirStatsMaxDepth, XrdPfc::Configuration::m_dirStatsStoreDepth, XrdPfc::Configuration::m_diskTotalSpace, XrdPfc::Configuration::m_diskUsageHWM, XrdPfc::TmpConfiguration::m_diskUsageHWM, XrdPfc::Configuration::m_diskUsageLWM, XrdPfc::TmpConfiguration::m_diskUsageLWM, XrdPfc::Configuration::m_fileUsageBaseline, XrdPfc::TmpConfiguration::m_fileUsageBaseline, XrdPfc::Configuration::m_fileUsageMax, XrdPfc::TmpConfiguration::m_fileUsageMax, XrdPfc::Configuration::m_fileUsageNominal, XrdPfc::TmpConfiguration::m_fileUsageNominal, XrdPfc::Configuration::m_flushCnt, XrdPfc::TmpConfiguration::m_flushRaw, XrdPfc::Configuration::m_hdfsbsize, XrdPfc::Configuration::m_hdfsmode, XrdPfc::Configuration::m_meta_space, XrdPfc::Configuration::m_onlyIfCachedMinFrac, XrdPfc::Configuration::m_onlyIfCachedMinSize, XrdPfc::Configuration::m_prefetch_max_blocks, XrdPfc::Configuration::m_purgeColdFilesAge, XrdPfc::Configuration::m_purgeInterval, XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::Configuration::m_RamKeepStdBlocks, XrdPfc::Configuration::m_username, XrdPfc::Configuration::m_wqueue_blocks, XrdPfc::Configuration::m_wqueue_threads, XrdOfsConfigPI::New(), open(), XrdOfsConfigPI::Parse(), XrdOfsConfigPI::Plugin(), XrdOfsConfigPI::Push(), XrdOucEnv::Put(), 
XrdPfc::Info::s_maxNumAccess, XrdSysError::Say(), XrdOss::StatVS(), XrdOfsConfigPI::theOssLib, XrdOssVSInfo::Total, TRACE, XrdOucUtils::UserName(), XrdSysTrace::What, and XrdOucGetCache().

Referenced by XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ConsiderCached()

int Cache::ConsiderCached ( const char *  curl)
virtual
Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supplied.
<0 - the request could not be fulfilled. The return value is -errno describing why.
>0 - Reserved for future use.

Definition at line 985 of file XrdPfc.cc.

986 {
987  static const char* tpfx = "ConsiderCached ";
988 
989  TRACE(Debug, tpfx << curl);
990 
991  XrdCl::URL url(curl);
992  std::string f_name = url.GetPath();
993 
994  File *file = nullptr;
995  {
996  XrdSysCondVarHelper lock(&m_active_cond);
997  auto it = m_active.find(f_name);
998  if (it != m_active.end()) {
999  file = it->second;
1000  inc_ref_cnt(file, false, false);
1001  }
1002  }
1003  if (file) {
1004  struct stat sbuff;
1005  int res = file->Fstat(sbuff);
1006  dec_ref_cnt(file, false);
1007  if (res)
1008  return res;
1009  // DecideIfConsideredCached() already called in File::Fstat().
1010  return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1011  }
1012 
1013  struct stat sbuff;
1014  int res = m_oss->Stat(f_name.c_str(), &sbuff);
1015  if (res != XrdOssOK) {
1016  TRACE(Debug, tpfx << curl << " -> " << res);
1017  return res;
1018  }
1019  if (S_ISDIR(sbuff.st_mode))
1020  {
1021  TRACE(Debug, tpfx << curl << " -> EISDIR");
1022  return -EISDIR;
1023  }
1024 
1025  long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1026  if (file_size < 0) {
1027  TRACE(Debug, tpfx << curl << " -> " << file_size);
1028  return (int) file_size;
1029  }
1030  bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1031 
1032  return is_cached ? 0 : -EREMOTE;
1033 }
#define XrdOssOK
Definition: XrdOss.hh:50
int stat(const char *path, struct stat *buf)
URL representation.
Definition: XrdClURL.hh:31
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition: XrdPfc.cc:911
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition: XrdPfc.cc:952
int Fstat(struct stat &sbuff)
Definition: XrdPfcFile.cc:527
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:309

References Macaroons::Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, stat(), XrdOss::Stat(), TRACE, and XrdOssOK.

Referenced by XrdPfcFSctl::FSctl().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ CreateInstance()

Cache & Cache::CreateInstance ( XrdSysLogger * logger,
XrdOucEnv * env 
)
static

Singleton creation.

Definition at line 126 of file XrdPfc.cc.

127 {
128  assert (m_instance == 0);
129  m_instance = new Cache(logger, env);
130  return *m_instance;
131 }
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition: XrdPfc.cc:159

References Cache().

Referenced by XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Decide()

bool Cache::Decide ( XrdOucCacheIO * io)

Makes decision if the original XrdOucCacheIO should be cached.

Parameters
io - IO object whose path supplies the URL of the file
Returns
decision if IO object will be cached.

Definition at line 138 of file XrdPfc.cc.

139 {
140  if (! m_decisionpoints.empty())
141  {
142  XrdCl::URL url(io->Path());
143  std::string filename = url.GetPath();
144  std::vector<Decision*>::const_iterator it;
145  for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
146  {
147  XrdPfc::Decision *d = *it;
148  if (! d) continue;
149  if (! d->Decide(filename, *m_oss))
150  {
151  return false;
152  }
153  }
154  }
155 
156  return true;
157 }
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0

References XrdPfc::Decision::Decide(), XrdCl::URL::GetPath(), and XrdOucCacheIO::Path().

Referenced by Attach().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DecideIfConsideredCached()

bool Cache::DecideIfConsideredCached ( long long  file_size,
long long  bytes_on_disk 
)

Definition at line 952 of file XrdPfc.cc.

953 {
954  if (file_size == 0 || bytes_on_disk >= file_size)
955  return true;
956 
957  double frac_on_disk = (double) bytes_on_disk / file_size;
958 
959  if (file_size <= m_configuration.m_onlyIfCachedMinSize)
960  {
961  if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
962  return true;
963  }
964  else
965  {
966  if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
967  frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
968  return true;
969  }
970  return false;
971 }

References XrdPfc::Configuration::m_onlyIfCachedMinFrac, and XrdPfc::Configuration::m_onlyIfCachedMinSize.

Referenced by ConsiderCached(), and Stat().

+ Here is the caller graph for this function:

◆ DeRegisterPrefetchFile()

void Cache::DeRegisterPrefetchFile ( File * file)

Definition at line 696 of file XrdPfc.cc.

697 {
698  // Can be called with other locks held.
699 
700  if ( ! m_prefetch_enabled)
701  {
702  return;
703  }
704 
705  m_prefetch_condVar.Lock();
706  for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
707  {
708  if (*it == file)
709  {
710  m_prefetchList.erase(it);
711  break;
712  }
713  }
714  m_prefetch_condVar.UnLock();
715 }

References XrdSysCondVar::Lock(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ DetermineFullFileSize()

long long Cache::DetermineFullFileSize ( const std::string &  cinfo_fname)

Definition at line 911 of file XrdPfc.cc.

912 {
913  if (m_metaXattr) {
914  char pfn[4096];
915  m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
916  long long fsize = -1ll;
917  int res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
918  if (res == sizeof(long long))
919  {
920  return fsize;
921  }
922  else
923  {
924  TRACE(Debug, "DetermineFullFileSize error getting xattr " << res);
925  }
926  }
927 
928  XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
929  XrdOucEnv env;
930  long long ret;
931  int res = infoFile->Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
932  if (res < 0) {
933  ret = res;
934  } else {
935  Info info(m_trace, 0);
936  if ( ! info.Read(infoFile, cinfo_fname.c_str())) {
937  ret = -EBADF;
938  } else {
939  ret = info.GetFileSize();
940  }
941  infoFile->Close();
942  }
943  delete infoFile;
944  return ret;
945 }
XrdSysXAttr * XrdSysXAttrActive
Definition: XrdSysFAttr.cc:61
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition: XrdOss.hh:873
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0

References XrdOssDF::Close(), Macaroons::Debug, XrdSysXAttr::Get(), XrdPfc::Info::GetFileSize(), XrdOss::Lfn2Pfn(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), TRACE, and XrdSysXAttrActive.

Referenced by ConsiderCached(), and Stat().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ExecuteCommandUrl()

void Cache::ExecuteCommandUrl ( const std::string &  command_url)

Definition at line 50 of file XrdPfcCommand.cc.

51 {
52  static const char *top_epfx = "ExecuteCommandUrl ";
53 
54  SplitParser cp(command_url, "/");
55 
56  std::string token = cp.get_token();
57 
58  if (token != "xrdpfc_command")
59  {
60  TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
61  return;
62  }
63 
64  // Get the command
65  token = cp.get_token();
66 
67 
68  //================================================================
69  // create_file
70  //================================================================
71 
72  if (token == "create_file")
73  {
74  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
75  static const char* usage =
76  "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
77  " Creates a cache file with given parameters. Data in file is random.\n"
78  " Useful for cache purge testing.\n"
79  "Notes:\n"
80  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
81  " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
82  " . -t and -d can be given multiple times to record several accesses.\n"
83  " . Negative arguments given to -t are interpreted as relative to now.\n";
84 
85  const Configuration &conf = m_configuration;
86 
87  token = cp.get_token();
88 
89  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
90 
91  std::vector<char*> argv;
92  SplitParser ap(token, " ");
93  int argc = ap.fill_argv(argv);
94 
95  long long file_size = ONE_GB;
96  long long block_size = conf.m_bufferSize;
97  int access_time [MAX_ACCESSES];
98  int access_duration[MAX_ACCESSES];
99  int at_count = 0, ad_count = 0;
100  XrdOucArgs Spec(&m_log, err_prefix, "hvs:b:t:d:",
101  "help", 1, "h",
102  "verbose", 1, "v",
103  "size", 1, "s",
104  "blocksize", 1, "b",
105  "time", 1, "t",
106  "duration", 1, "d",
107  (const char *) 0);
108 
109  time_t time_now = time(0);
110 
111  Spec.Set(argc, &argv[0]);
112  char theOpt;
113 
114  while ((theOpt = Spec.getopt()) != (char) -1)
115  {
116  switch (theOpt)
117  {
118  case 'h': {
119  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
120  return;
121  }
122  case 's': {
123  if (XrdOuca2x::a2sz(m_log, "Error getting filesize", Spec.getarg(),
124  &file_size, 0ll, 32 * ONE_GB))
125  return;
126  break;
127  }
128  case 'b': {
129  if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", Spec.getarg(),
130  &block_size, 0ll, 64 * ONE_MB))
131  return;
132  break;
133  }
134  case 't': {
135  if (XrdOuca2x::a2i(m_log, "Error getting access time", Spec.getarg(),
136  &access_time[at_count++], INT_MIN, INT_MAX))
137  return;
138  break;
139  }
140  case 'd': {
141  if (XrdOuca2x::a2i(m_log, "Error getting access duration", Spec.getarg(),
142  &access_duration[ad_count++], 0, 24 * 3600))
143  return;
144  break;
145  }
146  default: {
147  TRACE(Error, err_prefix << "Unhandled command argument.");
148  return;
149  }
150  }
151  }
152  if (Spec.getarg())
153  {
154  TRACE(Error, err_prefix << "Options must take up all the arguments.");
155  return;
156  }
157 
158  if (at_count < 1) access_time [at_count++] = time_now - 10;
159  if (ad_count < 1) access_duration[ad_count++] = 10;
160 
161  if (at_count != ad_count)
162  {
163  TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
164  return;
165  }
166 
167  std::string file_path (cp.get_reminder_with_delim());
168  std::string cinfo_path(file_path + Info::s_infoExtension);
169 
170  TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
171 
172  // Check if cinfo exists ... bail out if it does.
173  {
174  struct stat infoStat;
175  if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
176  {
177  TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
178  return;
179  }
180  }
181 
182  TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
183 
184  {
185  const char *myUser = conf.m_username.c_str();
186  XrdOucEnv myEnv;
187 
188  // Create the data file.
189 
190  char size_str[32]; sprintf(size_str, "%lld", file_size);
191  myEnv.Put("oss.asize", size_str);
192  myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
193  int cret;
194  if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
195  {
196  TRACE(Error, err_prefix << "Create failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
197  return;
198  }
199 
200  XrdOssDF *myFile = GetOss()->newFile(myUser);
201  if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
202  {
203  TRACE(Error, err_prefix << "Open failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
204  delete myFile;
205  return;
206  }
207 
208  // Create the info file.
209 
210  myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
211  myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
212  if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
213  {
214  TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
215  myFile->Close(); delete myFile;
216  return;
217  }
218 
219  XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
220  if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
221  {
222  TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
223  delete myInfoFile;
224  myFile->Close(); delete myFile;
225  return;
226  }
227 
228  // Allocate space for the data file.
229 
230  if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
231  {
232  TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(cret));
233  }
234 
235  // Fill up cinfo.
236 
237  Info myInfo(m_trace, false);
238  myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
239  myInfo.SetAllBitsSynced();
240 
241  for (int i = 0; i < at_count; ++i)
242  {
243  time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
244 
245  myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
246  }
247 
248  myInfo.Write(myInfoFile, cinfo_path.c_str());
249 
250  // Fake last modified time to the last access_time
251  {
252  time_t last_detach;
253  myInfo.GetLatestDetachTime(last_detach);
254  struct timespec acc_mod_time[2] = { {last_detach, UTIME_OMIT}, {last_detach, 0} };
255 
256  futimens(myInfoFile->getFD(), acc_mod_time);
257  }
258 
259  myInfoFile->Close(); delete myInfoFile;
260  myFile->Close(); delete myFile;
261 
262  TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB.");
263 
264  {
265  XrdSysCondVarHelper lock(&m_writeQ.condVar);
266 
267  m_writeQ.writes_between_purges += file_size;
268  }
269  {
270  int token = m_res_mon->register_file_open(file_path, time_now, false);
271  XrdPfc::Stats stats;
272  stats.m_BytesWritten = file_size;
273  stats.m_StBlocksAdded = (file_size & 0x1ff) ? (file_size >> 9) + 1 : file_size >> 9;
274  m_res_mon->register_file_update_stats(token, stats);
275  m_res_mon->register_file_close(token, time(0), stats);
276  }
277  }
278  }
279 
280  //================================================================
281  // remove_file
282  //================================================================
283 
284  else if (token == "remove_file")
285  {
286  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
287  static const char* usage =
288  "Usage: remove_file/ [-h] /<path>\n"
289  " Removes given file from the cache unless it is currently open.\n"
290  " Useful for removal of stale files or duplicate files in a caching cluster.\n"
291  "Notes:\n"
292  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
293 
294  token = cp.get_token();
295 
296  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
297 
298  std::vector<char*> argv;
299  SplitParser ap(token, " ");
300  int argc = ap.fill_argv(argv);
301 
302  XrdOucArgs Spec(&m_log, err_prefix, "h",
303  "help", 1, "h",
304  (const char *) 0);
305 
306  Spec.Set(argc, &argv[0]);
307  char theOpt;
308 
309  while ((theOpt = Spec.getopt()) != (char) -1)
310  {
311  switch (theOpt)
312  {
313  case 'h': {
314  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
315  return;
316  }
317  default: {
318  TRACE(Error, err_prefix << "Unhandled command argument.");
319  return;
320  }
321  }
322  }
323  if (Spec.getarg())
324  {
325  TRACE(Error, err_prefix << "Options must take up all the arguments.");
326  return;
327  }
328 
329  std::string f_name(cp.get_reminder_with_delim());
330 
331  TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
332 
333  int ret = UnlinkFile(f_name, true);
334 
335  TRACE(Info, err_prefix << "returned with status " << ret);
336  }
337 
338  //================================================================
339  // unknown command
340  //================================================================
341 
342  else
343  {
344  TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
345  }
346 }
void usage()
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
const int MAX_ACCESSES
const long long ONE_GB
const long long ONE_MB
#define ERRNO_AND_ERRSTR(err_code)
Definition: XrdPfcTrace.hh:46
bool Create
virtual int getFD()
Definition: XrdOss.hh:426
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition: XrdOuca2x.cc:45
XrdOss * GetOss() const
Definition: XrdPfc.hh:267
virtual int Stat(const char *url, struct stat &sbuff)
Definition: XrdPfc.cc:1096
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1163
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
Definition: XrdPfcStats.hh:43
long long m_BytesWritten
number of bytes written to disk
Definition: XrdPfcStats.hh:42
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:64

References XrdOuca2x::a2i(), XrdOuca2x::a2sz(), XrdOssDF::Close(), Create, Macaroons::Debug, ERRNO_AND_ERRSTR, Macaroons::Error, XrdPfc::SplitParser::fill_argv(), XrdPfc::SplitParser::get_reminder_with_delim(), XrdPfc::SplitParser::get_token(), XrdOucArgs::getarg(), XrdOssDF::getFD(), XrdPfc::Info::GetLatestDetachTime(), XrdOucArgs::getopt(), GetOss(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Stats::m_BytesWritten, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_meta_space, XrdPfc::Stats::m_StBlocksAdded, XrdPfc::Configuration::m_username, MAX_ACCESSES, XrdOss::newFile(), ONE_GB, ONE_MB, XrdOssDF::Open(), XrdOucEnv::Put(), XrdPfc::ResourceMonitor::register_file_close(), XrdPfc::ResourceMonitor::register_file_open(), XrdPfc::ResourceMonitor::register_file_update_stats(), XrdPfc::Info::s_infoExtension, XrdSysError::Say(), XrdOucArgs::Set(), XrdPfc::Info::SetAllBitsSynced(), XrdPfc::Info::SetBufferSizeFileSizeAndCreationTime(), stat(), Stat(), TRACE, UnlinkFile(), usage(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), XRDOSS_mkpath, and XrdOssOK.

+ Here is the call graph for this function:

◆ FileSyncDone()

void Cache::FileSyncDone ( File * f,
bool  high_debug 
)

Definition at line 543 of file XrdPfc.cc.

544 {
545  dec_ref_cnt(f, high_debug);
546 }

◆ GetFile()

File * Cache::GetFile ( const std::string &  path,
IO * io,
long long  off = 0,
long long  filesize = 0 
)

Definition at line 390 of file XrdPfc.cc.

391 {
392  // Called from virtual IOFile constructor.
393 
394  TRACE(Debug, "GetFile " << path << ", io " << io);
395 
396  ActiveMap_i it;
397 
398  {
399  XrdSysCondVarHelper lock(&m_active_cond);
400 
401  while (true)
402  {
403  it = m_active.find(path);
404 
405  // File is not open or being opened. Mark it as being opened and
406  // proceed to opening it outside of while loop.
407  if (it == m_active.end())
408  {
409  it = m_active.insert(std::make_pair(path, (File*) 0)).first;
410  break;
411  }
412 
413  if (it->second != 0)
414  {
415  it->second->AddIO(io);
416  inc_ref_cnt(it->second, false, true);
417 
418  return it->second;
419  }
420  else
421  {
422  // Wait for some change in m_active, then recheck.
423  m_active_cond.Wait();
424  }
425  }
426  }
427 
428  // This is always true, now that IOFileBlock is unsupported.
429  if (filesize == 0)
430  {
431  struct stat st;
432  int res = io->Fstat(st);
433  if (res < 0) {
434  errno = res;
435  TRACE(Error, "GetFile, could not get valid stat");
436  } else if (res > 0) {
437  errno = ENOTSUP;
438  TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
439  } else {
440  filesize = st.st_size;
441  }
442  }
443 
444  File *file = 0;
445 
446  if (filesize >= 0)
447  {
448  file = File::FileOpen(path, off, filesize);
449  }
450 
451  {
452  XrdSysCondVarHelper lock(&m_active_cond);
453 
454  if (file)
455  {
456  inc_ref_cnt(file, false, true);
457  it->second = file;
458 
459  file->AddIO(io);
460  }
461  else
462  {
463  m_active.erase(it);
464  }
465 
466  m_active_cond.Broadcast();
467  }
468 
469  return file;
470 }
virtual int Fstat(struct stat &sbuff)
Definition: XrdOucCache.hh:148
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:109
void AddIO(IO *io)
Definition: XrdPfcFile.cc:311

References XrdPfc::File::AddIO(), XrdSysCondVar::Broadcast(), Macaroons::Debug, Macaroons::Error, XrdPfc::File::FileOpen(), XrdOucCacheIO::Fstat(), stat(), TRACE, and XrdSysCondVar::Wait().

Referenced by XrdPfc::IOFile::IOFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetGStream()

XrdXrootdGStream* XrdPfc::Cache::GetGStream ( )
inline

Definition at line 285 of file XrdPfc.hh.

285 { return m_gstream; }

◆ GetInstance()

Cache & Cache::GetInstance ( )
static

Singleton access.

Definition at line 133 of file XrdPfc.cc.

133 { return *m_instance; }

Referenced by XrdPfc::IOFile::IOFile(), XrdPfc::IOFileBlock::IOFileBlock(), Attach(), XrdPfc::IOFile::DetachFinalize(), XrdPfc::File::GetLog(), XrdPfc::File::GetTrace(), XrdPfc::ResourceMonitor::perform_purge_check(), XrdPfc::ResourceMonitor::perform_purge_task_cleanup(), PrefetchThread(), ProcessWriteTaskThread(), Proto_ResourceMonitorHeartBeat(), XrdPfc::File::Sync(), and XrdPfc::File::WriteBlockToDisk().

+ Here is the caller graph for this function:

◆ GetLog()

XrdSysError* XrdPfc::Cache::GetLog ( )
inline

Definition at line 281 of file XrdPfc.hh.

281 { return &m_log; }

Referenced by XrdPfc::File::GetLog().

+ Here is the caller graph for this function:

◆ GetNextFileToPrefetch()

File * Cache::GetNextFileToPrefetch ( )

Definition at line 718 of file XrdPfc.cc.

719 {
720  m_prefetch_condVar.Lock();
721  while (m_prefetchList.empty())
722  {
723  m_prefetch_condVar.Wait();
724  }
725 
726  // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
727 
728  size_t l = m_prefetchList.size();
729  int idx = rand() % l;
730  File* f = m_prefetchList[idx];
731 
732  m_prefetch_condVar.UnLock();
733  return f;
734 }

References XrdSysCondVar::Lock(), XrdSysCondVar::UnLock(), and XrdSysCondVar::Wait().

Referenced by Prefetch().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetOss()

XrdOss* XrdPfc::Cache::GetOss ( ) const
inline

Definition at line 267 of file XrdPfc.hh.

267 { return m_oss; }

Referenced by ExecuteCommandUrl().

+ Here is the caller graph for this function:

◆ GetPurgePin()

PurgePin* XrdPfc::Cache::GetPurgePin ( ) const
inline

Definition at line 271 of file XrdPfc.hh.

271 { return m_purge_pin; }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

◆ GetTrace()

XrdSysTrace* XrdPfc::Cache::GetTrace ( )
inline

Definition at line 282 of file XrdPfc.hh.

282 { return m_trace; }

Referenced by XrdPfc::File::GetTrace(), and XrdPfc::IO::GetTrace().

+ Here is the caller graph for this function:

◆ IsFileActiveOrPurgeProtected()

bool Cache::IsFileActiveOrPurgeProtected ( const std::string &  path) const

Definition at line 662 of file XrdPfc.cc.

663 {
664  XrdSysCondVarHelper lock(&m_active_cond);
665 
666  return m_active.find(path) != m_active.end() ||
667  m_purge_delay_set.find(path) != m_purge_delay_set.end();
668 }

◆ LocalFilePath()

int Cache::LocalFilePath ( const char *  curl,
char *  buff = 0,
int  blen = 0,
LFP_Reason  why = ForAccess,
bool  forall = false 
)
virtual

Get the path to a file that is complete in the local cache. By default, the file must be complete in the cache (i.e. no blocks are missing). This can be overridden. This path can be used to access the file on the local node.

Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supplied.
<0 - the request could not be fulfilled. The return value is -errno describing why. If a buffer was supplied and a path could be generated it is returned only if "why" is ForCheck or ForInfo. Otherwise, a null path is returned.
>0 - Reserved for future use.

Reimplemented from XrdOucCache.

Definition at line 779 of file XrdPfc.cc.

781 {
782  static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
783  static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
784  static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
785 
786  TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
787 
788  if (buff && blen > 0) buff[0] = 0;
789 
790  XrdCl::URL url(curl);
791  std::string f_name = url.GetPath();
792  std::string i_name = f_name + Info::s_infoExtension;
793 
794  if (why == ForPath)
795  {
796  int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
797  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
798  return ret;
799  }
800 
801  {
802  XrdSysCondVarHelper lock(&m_active_cond);
803  m_purge_delay_set.insert(f_name);
804  }
805 
806  struct stat sbuff, sbuff2;
807  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
808  m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
809  {
810  if (S_ISDIR(sbuff.st_mode))
811  {
812  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
813  return -EISDIR;
814  }
815  else
816  {
817  bool read_ok = false;
818  bool is_complete = false;
819 
820  // Lock and check if the file is active. If NOT, keep the lock
821  // and add dummy access after successful reading of info file.
822  // If it IS active, just release the lock, this ongoing access will
823  // assure the file continues to exist.
824 
825  // XXXX How can I just loop over the cinfo file when active?
826  // Can I not get is_complete from the existing file?
827  // Do I still want to inject access record?
828  // Oh, it writes only if not active .... still let's try to use existing File.
829 
830  m_active_cond.Lock();
831 
832  bool is_active = m_active.find(f_name) != m_active.end();
833 
834  if (is_active) m_active_cond.UnLock();
835 
836  XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
837  XrdOucEnv myEnv;
838  int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
839  if (res >= 0)
840  {
841  Info info(m_trace, 0);
842  if (info.Read(infoFile, i_name.c_str()))
843  {
844  read_ok = true;
845 
846  is_complete = info.IsComplete();
847 
848  // Add full-size access if reason is for access.
849  if ( ! is_active && is_complete && why == ForAccess)
850  {
851  info.WriteIOStatSingle(info.GetFileSize());
852  info.Write(infoFile, i_name.c_str());
853  }
854  }
855  infoFile->Close();
856  }
857  delete infoFile;
858 
859  if ( ! is_active) m_active_cond.UnLock();
860 
861  if (read_ok)
862  {
863  if ((is_complete || why == ForInfo) && buff != 0)
864  {
865  int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
866  if (res2 < 0)
867  return res2;
868 
869  // Normally, files are owned by us but when direct cache access
870  // is wanted and possible, make sure the file is world readable.
871  if (why == ForAccess)
872  {mode_t mode = (forall ? worldReadable : groupReadable);
873  if (((sbuff.st_mode & worldReadable) != mode)
874  && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
875  {is_complete = false;
876  *buff = 0;
877  }
878  }
879  }
880 
881  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
882  (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
883 
884  return is_complete ? 0 : -EREMOTE;
885  }
886  }
887  }
888 
889  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
890  return -ENOENT;
891 }
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0

References XrdOss::Chmod(), XrdOssDF::Close(), Macaroons::Debug, XrdOucCache::ForAccess, XrdOucCache::ForInfo, XrdOucCache::ForPath, XrdPfc::Info::GetFileSize(), XrdCl::URL::GetPath(), XrdPfc::Info::IsComplete(), XrdOss::Lfn2Pfn(), XrdSysCondVar::Lock(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, stat(), XrdOss::Stat(), TRACE, XrdSysCondVar::UnLock(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), and XrdOssOK.

+ Here is the call graph for this function:

◆ Prefetch()

void Cache::Prefetch ( )

Definition at line 737 of file XrdPfc.cc.

738 {
739  const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
740 
741  while (true)
742  {
743  m_RAM_mutex.Lock();
744  bool doPrefetch = (m_RAM_used < limit_RAM);
745  m_RAM_mutex.UnLock();
746 
747  if (doPrefetch)
748  {
750  f->Prefetch();
751  }
752  else
753  {
755  }
756  }
757 }
File * GetNextFileToPrefetch()
Definition: XrdPfc.cc:718
void Prefetch()
Definition: XrdPfcFile.cc:1516
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227

References GetNextFileToPrefetch(), XrdSysMutex::Lock(), XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::File::Prefetch(), XrdSysMutex::UnLock(), and XrdSysTimer::Wait().

Referenced by PrefetchThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Prepare()

int Cache::Prepare ( const char *  curl,
int  oflags,
mode_t  mode 
)
virtual

Prepare the cache for a file open request. This method is called prior to actually opening a file. This method is meant to allow deferring an open request or implementing the full I/O stack in the cache layer.

Returns
<0 Error has occurred, return value is -errno; fail open request. =0 Continue with open() request. >0 Defer open but treat the file as actually being open. Use the XrdOucCacheIO::Open() method to open the file at a later time.

Reimplemented from XrdOucCache.

Definition at line 1045 of file XrdPfc.cc.

1046 {
1047  XrdCl::URL url(curl);
1048  std::string f_name = url.GetPath();
1049  std::string i_name = f_name + Info::s_infoExtension;
1050 
1051  // Do not allow write access.
1052  if (oflags & (O_WRONLY | O_RDWR | O_APPEND | O_CREAT))
1053  {
1054  TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1055  return -EROFS;
1056  }
1057 
1058  // Intercept xrdpfc_command requests.
1059  if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1060  {
1061  // Schedule a job to process command request.
1062  {
1063  CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1064 
1065  schedP->Schedule(ce);
1066  }
1067 
1068  return -EAGAIN;
1069  }
1070 
1071  {
1072  XrdSysCondVarHelper lock(&m_active_cond);
1073  m_purge_delay_set.insert(f_name);
1074  }
1075 
1076  struct stat sbuff;
1077  if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
1078  {
1079  TRACE(Dump, "Prepare defer open " << f_name);
1080  return 1;
1081  }
1082  else
1083  {
1084  return 0;
1085  }
1086 }
@ Warning
static XrdScheduler * schedP
Definition: XrdPfc.hh:289
void Schedule(XrdJob *jp)

References XrdCl::URL::GetPath(), XrdPfc::Configuration::m_allow_xrdpfc_command, XrdPfc::Info::s_infoExtension, schedP, XrdScheduler::Schedule(), stat(), XrdOss::Stat(), TRACE, Warning, and XrdOssOK.

+ Here is the call graph for this function:

◆ ProcessWriteTasks()

void Cache::ProcessWriteTasks ( )

Separate task which writes blocks from ram to disk.

Definition at line 274 of file XrdPfc.cc.

275 {
276  std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
277 
278  while (true)
279  {
280  m_writeQ.condVar.Lock();
281  while (m_writeQ.size == 0)
282  {
283  m_writeQ.condVar.Wait();
284  }
285 
286  // MT -- optimize to pop several blocks if they are available (or swap the list).
287  // This makes sense especially for smallish block sizes.
288 
289  int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
290  long long sum_size = 0;
291 
292  for (int bi = 0; bi < n_pushed; ++bi)
293  {
294  Block* block = m_writeQ.queue.front();
295  m_writeQ.queue.pop_front();
296  m_writeQ.writes_between_purges += block->get_size();
297  sum_size += block->get_size();
298 
299  blks_to_write[bi] = block;
300 
301  TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
302  }
303  m_writeQ.size -= n_pushed;
304 
305  m_writeQ.condVar.UnLock();
306 
307  {
308  XrdSysMutexHelper lock(&m_RAM_mutex);
309  m_RAM_write_queue -= sum_size;
310  }
311 
312  for (int bi = 0; bi < n_pushed; ++bi)
313  {
314  Block* block = blks_to_write[bi];
315 
316  block->m_file->WriteBlockToDisk(block);
317  }
318  }
319 }
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1501
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:1032

References XrdPfc::Block::get_size(), XrdPfc::File::lPath(), XrdPfc::Block::m_file, XrdPfc::Configuration::m_wqueue_blocks, TRACE, and XrdPfc::File::WriteBlockToDisk().

Referenced by ProcessWriteTaskThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RefConfiguration()

const Configuration& XrdPfc::Cache::RefConfiguration ( ) const
inline

Reference XrdPfc configuration.

Definition at line 203 of file XrdPfc.hh.

203 { return m_configuration; }

Referenced by XrdPfc::IOFileBlock::IOFileBlock(), Attach(), Conf(), XrdPfc::File::WriteBlockToDisk(), and XrdOucGetCache().

+ Here is the caller graph for this function:

◆ RefResMon()

ResourceMonitor& XrdPfc::Cache::RefResMon ( )
inline

Definition at line 284 of file XrdPfc.hh.

284 { return *m_res_mon; }

Referenced by ResMon().

+ Here is the caller graph for this function:

◆ RegisterPrefetchFile()

void Cache::RegisterPrefetchFile ( File * file)

Definition at line 680 of file XrdPfc.cc.

681 {
682  // Can be called with other locks held.
683 
684  if ( ! m_prefetch_enabled)
685  {
686  return;
687  }
688 
689  m_prefetch_condVar.Lock();
690  m_prefetchList.push_back(file);
691  m_prefetch_condVar.Signal();
692  m_prefetch_condVar.UnLock();
693 }

References XrdSysCondVar::Lock(), XrdSysCondVar::Signal(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ ReleaseFile()

void Cache::ReleaseFile ( File * f,
IO * io 
)

Definition at line 472 of file XrdPfc.cc.

473 {
474  // Called from virtual IO::DetachFinalize.
475 
476  TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
477 
478  {
479  XrdSysCondVarHelper lock(&m_active_cond);
480 
481  f->RemoveIO(io);
482  }
483  dec_ref_cnt(f, true);
484 }
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:348

References Macaroons::Debug, XrdPfc::File::GetLocalPath(), XrdPfc::File::RemoveIO(), and TRACE.

Referenced by XrdPfc::IOFile::DetachFinalize(), and XrdPfc::IOFileBlock::DetachFinalize().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ReleaseRAM()

void Cache::ReleaseRAM ( char *  buf,
long long  size 
)

Definition at line 372 of file XrdPfc.cc.

373 {
374  bool std_size = (size == m_configuration.m_bufferSize);
375  {
376  XrdSysMutexHelper lock(&m_RAM_mutex);
377 
378  m_RAM_used -= size;
379 
380  if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
381  {
382  m_RAM_std_blocks.push_back(buf);
383  ++m_RAM_std_size;
384  return;
385  }
386  }
387  free(buf);
388 }

References XrdPfc::Configuration::m_bufferSize, and XrdPfc::Configuration::m_RamKeepStdBlocks.

◆ RemoveWriteQEntriesFor()

void Cache::RemoveWriteQEntriesFor ( File * f)

Remove blocks from the write queue that belong to the given file. This method is used at the time of File destruction.

Definition at line 241 of file XrdPfc.cc.

242 {
243  std::list<Block*> removed_blocks;
244  long long sum_size = 0;
245 
246  m_writeQ.condVar.Lock();
247  std::list<Block*>::iterator i = m_writeQ.queue.begin();
248  while (i != m_writeQ.queue.end())
249  {
250  if ((*i)->m_file == file)
251  {
252  TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
253  std::list<Block*>::iterator j = i++;
254  removed_blocks.push_back(*j);
255  sum_size += (*j)->get_size();
256  m_writeQ.queue.erase(j);
257  --m_writeQ.size;
258  }
259  else
260  {
261  ++i;
262  }
263  }
264  m_writeQ.condVar.UnLock();
265 
266  {
267  XrdSysMutexHelper lock(&m_RAM_mutex);
268  m_RAM_write_queue -= sum_size;
269  }
270 
271  file->BlocksRemovedFromWriteQ(removed_blocks);
272 }

References XrdPfc::File::BlocksRemovedFromWriteQ(), XrdPfc::File::lPath(), and TRACE.

Referenced by UnlinkFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RequestRAM()

char * Cache::RequestRAM ( long long  size)

Definition at line 332 of file XrdPfc.cc.

333 {
334  static const size_t s_block_align = sysconf(_SC_PAGESIZE);
335 
336  bool std_size = (size == m_configuration.m_bufferSize);
337 
338  m_RAM_mutex.Lock();
339 
340  long long total = m_RAM_used + size;
341 
342  if (total <= m_configuration.m_RamAbsAvailable)
343  {
344  m_RAM_used = total;
345  if (std_size && m_RAM_std_size > 0)
346  {
347  char *buf = m_RAM_std_blocks.back();
348  m_RAM_std_blocks.pop_back();
349  --m_RAM_std_size;
350 
351  m_RAM_mutex.UnLock();
352 
353  return buf;
354  }
355  else
356  {
357  m_RAM_mutex.UnLock();
358  char *buf;
359  if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
360  {
361  // Report out of mem? Probably should report it at least the first time,
362  // then periodically.
363  return 0;
364  }
365  return buf;
366  }
367  }
368  m_RAM_mutex.UnLock();
369  return 0;
370 }

References XrdSysMutex::Lock(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_RamAbsAvailable, and XrdSysMutex::UnLock().

+ Here is the call graph for this function:

◆ ResMon()

ResourceMonitor & Cache::ResMon ( )
static

Definition at line 136 of file XrdPfc.cc.

136 { return m_instance->RefResMon(); }
ResourceMonitor & RefResMon()
Definition: XrdPfc.hh:284

References RefResMon().

Referenced by XrdPfc::File::~File(), XrdPfc::ResourceMonitor::perform_purge_check(), ResourceMonitorThread(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ScheduleFileSync()

void XrdPfc::Cache::ScheduleFileSync ( File * f)
inline

Definition at line 277 of file XrdPfc.hh.

277 { schedule_file_sync(f, false, false); }

◆ Stat()

int Cache::Stat ( const char *  curl,
struct stat & sbuff 
)
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information. >0 - Stat could not be done, forward operation to next level.

Reimplemented from XrdOucCache.

Definition at line 1096 of file XrdPfc.cc.

1097 {
1098  const char *tpfx = "Stat ";
1099 
1100  XrdCl::URL url(curl);
1101  std::string f_name = url.GetPath();
1102 
1103  File *file = nullptr;
1104  {
1105  XrdSysCondVarHelper lock(&m_active_cond);
1106  auto it = m_active.find(f_name);
1107  if (it != m_active.end()) {
1108  file = it->second;
1109  inc_ref_cnt(file, false, false);
1110  }
1111  }
1112  if (file) {
1113  int res = file->Fstat(sbuff);
1114  dec_ref_cnt(file, false);
1115  TRACE(Debug, tpfx << "from active file " << curl << " -> " << res);
1116  return res;
1117  }
1118 
1119  int res = m_oss->Stat(f_name.c_str(), &sbuff);
1120  if (res != XrdOssOK) {
1121  TRACE(Debug, tpfx << curl << " -> " << res);
1122  return 1; // res; -- for only-if-cached
1123  }
1124  if (S_ISDIR(sbuff.st_mode))
1125  {
1126  TRACE(Debug, tpfx << curl << " -> EISDIR");
1127  return -EISDIR;
1128  }
1129 
1130  long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1131  if (file_size < 0) {
1132  TRACE(Debug, tpfx << curl << " -> " << file_size);
1133  return 1; // (int) file_size; -- for only-if-cached
1134  }
1135  sbuff.st_size = file_size;
1136  bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1137  if ( ! is_cached) {
1138  sbuff.st_ctime = sbuff.st_mtime = sbuff.st_atime = 0;
1139  }
1140 
1141  TRACE(Debug, tpfx << "from disk " << curl << " -> " << res);
1142 
1143  return 0;
1144 }

References Macaroons::Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, XrdOss::Stat(), TRACE, and XrdOssOK.

Referenced by ExecuteCommandUrl().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ TheOne()

const Cache & Cache::TheOne ( )
static

Definition at line 134 of file XrdPfc.cc.

134 { return *m_instance; }

Referenced by XrdPfc::OldStylePurgeDriver(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the caller graph for this function:

◆ Unlink()

int Cache::Unlink ( const char *  curl)
virtual
Returns
<0 - Unlink failed, value is -errno. =0 - Unlink succeeded, the data file and its cinfo file were removed from the cache.

Reimplemented from XrdOucCache.

Definition at line 1153 of file XrdPfc.cc.

1154 {
1155  XrdCl::URL url(curl);
1156  std::string f_name = url.GetPath();
1157 
1158  // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1159 
1160  return UnlinkFile(f_name, false);
1161 }

References XrdCl::URL::GetPath(), and UnlinkFile().

+ Here is the call graph for this function:

◆ UnlinkFile()

int Cache::UnlinkFile ( const std::string &  f_name,
bool  fail_if_open 
)

Remove cinfo and data files from cache.

Definition at line 1163 of file XrdPfc.cc.

1164 {
1165  static const char* trc_pfx = "UnlinkFile ";
1166  ActiveMap_i it;
1167  File *file = 0;
1168  {
1169  XrdSysCondVarHelper lock(&m_active_cond);
1170 
1171  it = m_active.find(f_name);
1172 
1173  if (it != m_active.end())
1174  {
1175  if (fail_if_open)
1176  {
1177  TRACE(Info, trc_pfx << f_name << ", file currently open and force not requested - denying request");
1178  return -EBUSY;
1179  }
1180 
1181  // Null File* in m_active map means an operation is ongoing, probably
1182  // Attach() with possible File::Open(). Ask for retry.
1183  if (it->second == 0)
1184  {
1185  TRACE(Info, trc_pfx << f_name << ", an operation on this file is ongoing - denying request");
1186  return -EAGAIN;
1187  }
1188 
1189  file = it->second;
1190  file->initiate_emergency_shutdown();
1191  it->second = 0;
1192  }
1193  else
1194  {
1195  it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1196  }
1197  }
1198 
1199  if (file)
1200  {
1201  RemoveWriteQEntriesFor(file);
1202  }
1203 
1204  std::string i_name = f_name + Info::s_infoExtension;
1205 
1206  // Unlink file & cinfo
1207  struct stat f_stat;
1208  bool stat_ok = (m_oss->Stat(f_name.c_str(), &f_stat) == XrdOssOK);
1209  int f_ret = m_oss->Unlink(f_name.c_str());
1210  int i_ret = m_oss->Unlink(i_name.c_str());
1211 
1212  if (stat_ok)
1213  m_res_mon->register_file_purge(f_name, f_stat.st_blocks);
1214 
1215  TRACE(Debug, trc_pfx << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1216 
1217  {
1218  XrdSysCondVarHelper lock(&m_active_cond);
1219 
1220  m_active.erase(it);
1221  }
1222 
1223  return std::min(f_ret, i_ret);
1224 }
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition: XrdPfc.cc:241
void initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:122
void register_file_purge(DirState *target, long long size_in_st_blocks)

References Macaroons::Debug, XrdPfc::File::initiate_emergency_shutdown(), XrdPfc::ResourceMonitor::register_file_purge(), RemoveWriteQEntriesFor(), XrdPfc::Info::s_infoExtension, stat(), XrdOss::Stat(), TRACE, XrdOss::Unlink(), and XrdOssOK.

Referenced by ExecuteCommandUrl(), XrdPfcFSctl::FSctl(), XrdPfc::File::Sync(), and Unlink().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ VCheck()

static bool XrdPfc::Cache::VCheck ( XrdVersionInfo &  urVersion)
inlinestatic

Version check.

Definition at line 232 of file XrdPfc.hh.

232 { return true; }

◆ WriteFileSizeXAttr()

void Cache::WriteFileSizeXAttr ( int  cinfo_fd,
long long  file_size 
)

Definition at line 896 of file XrdPfc.cc.

897 {
898  if (m_metaXattr) {
899  int res = XrdSysXAttrActive->Set("pfc.fsize", &file_size, sizeof(long long), 0, cinfo_fd, 0);
900  if (res != 0) {
901  TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res);
902  }
903  }
904 }
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0

References Macaroons::Debug, XrdSysXAttr::Set(), TRACE, and XrdSysXAttrActive.

+ Here is the call graph for this function:

◆ WritesSinceLastCall()

long long Cache::WritesSinceLastCall ( )

Definition at line 321 of file XrdPfc.cc.

322 {
323  // Called from ResourceMonitor for an alternative estimation of disk writes.
324  XrdSysCondVarHelper lock(&m_writeQ.condVar);
325  long long ret = m_writeQ.writes_between_purges;
326  m_writeQ.writes_between_purges = 0;
327  return ret;
328 }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

Member Data Documentation

◆ schedP

XrdScheduler * Cache::schedP = nullptr
static

The documentation for this class was generated from the following files: