25 #define TRACE_PURGE(x)
38 long long m_usage_extra;
39 long long m_usage_purged;
50 typedef std::map<std::string, DirState> DsMap_t;
51 typedef DsMap_t::iterator DsMap_i;
62 DirState* create_child(
const std::string &dir)
64 std::pair<DsMap_i, bool> ir = m_subdirs.insert(std::make_pair(dir,
DirState(
this)));
65 return & ir.first->second;
72 DsMap_i i = m_subdirs.find(pt.
m_dirs[pos]);
76 if (i != m_subdirs.end())
80 if (create_subdirs && m_depth < m_max_depth)
82 ds = create_child(pt.
m_dirs[pos]);
84 if (ds)
return ds->find_path_tok(pt, pos + 1, create_subdirs);
91 DirState(
int max_depth) : m_parent(0), m_depth(0), m_max_depth(max_depth)
103 void set_usage(
long long u) { m_usage = u; m_usage_extra = 0; }
107 DirState*
find_path(
const std::string &path,
int max_depth,
bool parse_as_lfn,
bool create_subdirs)
111 return find_path_tok(pt, 0, create_subdirs);
116 DsMap_i i = m_subdirs.find(dir);
118 if (i != m_subdirs.end())
return & i->second;
120 if (create_subdirs && m_depth < m_max_depth)
return create_child(dir);
129 for (DsMap_i i = m_subdirs.begin(); i != m_subdirs.end(); ++i)
131 i->second.reset_stats();
137 for (DsMap_i i = m_subdirs.begin(); i != m_subdirs.end(); ++i)
139 i->second.upward_propagate_stats();
141 m_stats.
AddUp(i->second.m_stats);
149 for (DsMap_i i = m_subdirs.begin(); i != m_subdirs.end(); ++i)
151 m_usage_purged += i->second.upward_propagate_usage_purged();
153 m_usage -= m_usage_purged;
155 long long ret = m_usage_purged;
162 printf(
"%*d %s usage=%lld usage_extra=%lld usage_total=%lld num_ios=%d duration=%d b_hit=%lld b_miss=%lld b_byps=%lld b_wrtn=%lld\n",
163 2 + 2*m_depth, m_depth, name, m_usage, m_usage_extra, m_usage + m_usage_extra,
166 for (DsMap_i i = m_subdirs.begin(); i != m_subdirs.end(); ++i)
168 i->second.dump_recursively(i->first.c_str());
186 m_max_depth (
Cache::Conf().m_dirStatsStoreDepth ),
187 m_root ( m_max_depth ),
188 m_prev_time ( time(0) )
197 return m_root.
find_path(lfn, m_max_depth,
true,
true);
206 time_t now = time(0);
208 printf(
"DataFsState::dump_recursively epoch = %lld delta_t = %lld max_depth = %d\n",
209 (
long long) now, (
long long) (now - m_prev_time), m_max_depth);
232 FS(
const std::string &dname,
const char *fname,
long long n, time_t t,
DirState *ds) :
237 typedef std::multimap<time_t, FS>
map_t;
360 m_fmap.insert(std::make_pair(i->time, *i));
393 static const char *trc_pfx =
"FPurgeState::CheckFile ";
400 TRACE(
Debug, trc_pfx <<
"could not get access time for " <<
m_current_path << fname <<
", using mtime from stat instead.");
401 atime =
fstat.st_mtime;
449 static const char *trc_pfx =
"FPurgeState::TraverseNamespace ";
461 int rc = iOssDF->
Readdir(fname, 256);
464 TRACE_PURGE(
" Skipping ENOENT dir entry [" << fname <<
"].");
478 if (fname[0] ==
'.' && (fname[1] == 0 || (fname[1] ==
'.' && fname[2] == 0))) {
479 TRACE_PURGE(
" Skipping here or parent dir [" << fname <<
"]. Continue loop.");
483 size_t fname_len = strlen(fname);
486 if (S_ISDIR(
fstat.st_mode))
517 TRACE_PURGE(
" Ignoring [" << fname <<
"], not a dir or cinfo.");
551 class ScanAndPurgeJob :
public XrdJob
554 ScanAndPurgeJob(
const char *desc =
"") :
XrdJob(desc) {}
565 void Cache::copy_out_active_stats_and_update_data_fs_state()
567 static const char *trc_pfx =
"copy_out_active_stats_and_update_data_fs_state() ";
574 updates.swap( m_closed_files_stats );
576 for (ActiveMap_i i = m_active.begin(); i != m_active.end(); ++i)
580 updates.insert(std::make_pair(i->first, i->second->DeltaStatsFromLastCall()));
587 for (StatsMMap_i i = updates.begin(); i != updates.end(); ++i)
593 TRACE(
Error, trc_pfx <<
"Failed finding DirState for file '" << i->first <<
"'.");
642 time_t heartbeat_start = time(0);
683 int heartbeat_duration = time(0) - heartbeat_start;
688 int sleep_time = 60 - heartbeat_duration;
700 static const char *trc_pfx =
"Purge() ";
703 long long disk_usage;
716 int age_based_purge_countdown = 0;
717 bool is_first =
true;
721 time_t purge_start = time(0);
732 long long bytesToRemove_d = 0, bytesToRemove_f = 0;
744 TRACE(
Debug, trc_pfx <<
"used disk space " << disk_usage <<
" bytes.");
755 long long estimated_writes_since_last_purge;
759 estimated_writes_since_last_purge = m_writeQ.writes_between_purges;
760 m_writeQ.writes_between_purges = 0;
762 estimated_file_usage += estimated_writes_since_last_purge;
764 TRACE(
Debug, trc_pfx <<
"estimated usage by files " << estimated_file_usage <<
" bytes.");
766 bytesToRemove_f = std::max(estimated_file_usage - m_configuration.
m_fileUsageNominal, 0ll);
769 double frac_du = 0, frac_fu = 0;
772 if (frac_fu > 1.0 - frac_du)
774 bytesToRemove_f = std::max(bytesToRemove_f, disk_usage - m_configuration.
m_diskUsageLWM);
778 long long bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
780 bool enforce_age_based_purge =
false;
785 if (--age_based_purge_countdown <= 0)
787 enforce_age_based_purge =
true;
792 bool enforce_traversal_for_usage_collection = is_first;
795 copy_out_active_stats_and_update_data_fs_state();
798 TRACE(
Debug,
"\tbytes_to_remove_disk = " << bytesToRemove_d <<
" B");
799 TRACE(
Debug,
"\tbytes_to remove_files = " << bytesToRemove_f <<
" B (" << (is_first ?
"max possible for initial run" :
"estimated") <<
")");
800 TRACE(
Debug,
"\tbytes_to_remove = " << bytesToRemove <<
" B");
801 TRACE(
Debug,
"\tenforce_age_based_purge = " << enforce_age_based_purge);
804 long long bytesToRemove_at_start = 0;
805 int deleted_file_count = 0;
807 bool purge_required = (bytesToRemove > 0 || enforce_age_based_purge);
813 if (purge_required || enforce_traversal_for_usage_collection)
841 TRACE(
Debug, trc_pfx <<
"actual usage by files " << estimated_file_usage <<
" bytes.");
847 bytesToRemove_f = std::max(estimated_file_usage - m_configuration.
m_fileUsageNominal, 0ll);
849 double frac_du = 0, frac_fu = 0;
852 if (frac_fu > 1.0 - frac_du)
854 bytesToRemove = std::max(bytesToRemove_f, disk_usage - m_configuration.
m_diskUsageLWM);
855 bytesToRemove = std::min(bytesToRemove, estimated_file_usage - m_configuration.
m_fileUsageBaseline);
859 bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
864 bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
866 bytesToRemove_at_start = bytesToRemove;
869 TRACE(
Debug,
"\tbytes_to_remove_disk = " << bytesToRemove_d <<
" B");
870 TRACE(
Debug,
"\tbytes_to remove_files = " << bytesToRemove_f <<
" B (measured)");
871 TRACE(
Debug,
"\tbytes_to_remove = " << bytesToRemove <<
" B");
872 TRACE(
Debug,
"\tenforce_age_based_purge = " << enforce_age_based_purge);
875 if (enforce_age_based_purge)
893 int protected_cnt = 0;
894 long long protected_sum = 0;
899 if (bytesToRemove <= 0 && ! (enforce_age_based_purge && it->first == 0))
904 std::string &infoPath = it->second.path;
905 std::string dataPath = infoPath.substr(0, infoPath.size() - info_ext_len);
910 protected_sum += it->second.nBytes;
911 TRACE(
Debug, trc_pfx <<
"File is active or purge-protected: " << dataPath <<
" size: " << it->second.nBytes);
924 m_oss->
Unlink(infoPath.c_str());
925 TRACE(Dump, trc_pfx <<
"Removed file: '" << infoPath <<
"' size: " <<
fstat.st_size);
931 bytesToRemove -= it->second.nBytes;
932 estimated_file_usage -= it->second.nBytes;
933 ++deleted_file_count;
935 m_oss->
Unlink(dataPath.c_str());
936 TRACE(Dump, trc_pfx <<
"Removed file: '" << dataPath <<
"' size: " << it->second.nBytes <<
", time: " << it->first);
938 if (it->second.dirState != 0)
939 it->second.dirState->add_usage_purged(it->second.nBytes);
941 TRACE(
Error, trc_pfx <<
"DirState not set for file '" << dataPath <<
"'.");
944 if (protected_cnt > 0)
946 TRACE(
Info, trc_pfx <<
"Encountered " << protected_cnt <<
" protected files, sum of their size: " << protected_sum);
955 m_purge_delay_set.clear();
959 int purge_duration = time(0) - purge_start;
961 TRACE(
Info, trc_pfx <<
"Finished, removed " << deleted_file_count <<
" data files, total size " <<
962 bytesToRemove_at_start - bytesToRemove <<
", bytes to remove at end " << bytesToRemove <<
", purge duration " << purge_duration);
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
int stat(const char *path, struct stat *buf)
int fstat(int fildes, struct stat *buf)
const char * XrdSysE2T(int errcode)
int OpenRO(XrdOssDF &atDir, const char *path, XrdOucEnv &env, XrdOssDF *&ossDF)
int Opendir(XrdOssDF &atDir, const char *path, XrdOucEnv &env, XrdOssDF *&ossDF)
int Unlink(XrdOssDF &atDir, const char *path)
virtual int StatRet(struct stat *buff)
virtual int Opendir(const char *path, XrdOucEnv &env)
virtual int Readdir(char *buff, int blen)
virtual int Close(long long *retsz=0)=0
virtual XrdOssDF * newDir(const char *tident)=0
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
struct XrdOucCacheStats::CacheStats X
XrdOucCacheStats Statistics
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
static const Configuration & Conf()
void Purge()
Thread function invoked to scan and purge files from disk when needed.
static Cache & GetInstance()
Singleton access.
void ResourceMonitorHeartBeat()
Thread function checking resource usage periodically.
bool IsFileActiveOrPurgeProtected(const std::string &)
int get_max_depth() const
void upward_propagate_usage_purged()
void upward_propagate_stats()
DirState * find_dirstate_for_lfn(const std::string &lfn)
void add_usage_purged(long long up)
void dump_recursively(const char *name)
long long upward_propagate_usage_purged()
DirState * find_path(const std::string &path, int max_depth, bool parse_as_lfn, bool create_subdirs)
DirState * find_dir(const std::string &dir, bool create_subdirs)
void upward_propagate_stats()
DirState(DirState *parent)
void set_usage(long long u)
void add_up_stats(const Stats &stats)
time_t getMinTime() const
const int m_max_dir_level_for_stat_collection
void MoveListEntriesToMap()
std::vector< long long > m_dir_usage_stack
std::multimap< time_t, FS > map_t
time_t tMinUVKeepTimeStamp
void CheckFile(const char *fname, Info &info, struct stat &fstat)
std::vector< std::string > m_dir_names_stack
void setUVKeepMinTime(time_t min_time)
const size_t m_info_ext_len
static const char * m_traceID
void cd_down(const std::string &dir_name)
void TraverseNamespace(XrdOssDF *iOssDF)
void setMinTime(time_t min_time)
FPurgeState(long long iNBytesReq, XrdOss &oss)
long long getNBytesTotal() const
void begin_traversal(DirState *root, const char *root_path="/")
std::string m_current_path
Status of cached file. Can be read from and written into a binary file.
static const char * s_infoExtension
time_t GetNoCkSumTimeForUVKeep() const
CkSumCheck_e GetCkSumState() const
bool GetLatestDetachTime(time_t &t) const
Get latest detach time.
long long GetNDownloadedBytes() const
Get number of downloaded bytes.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Statistics of cache utilisation by a File object.
long long m_BytesMissed
number of bytes served from remote and cached
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddUp(const Stats &s)
int m_Duration
total duration of all IOs attached
int m_NumIos
number of IO objects attached during this access
long long m_BytesHit
number of bytes served from disk
long long m_BytesWritten
number of bytes written to disk
long long m_RamAbsAvailable
available from configuration
long long m_diskTotalSpace
total disk space on configured partition or oss space
long long m_fileUsageBaseline
cache purge - files usage baseline
long long m_diskUsageHWM
cache purge - disk usage high water mark
bool is_uvkeep_purge_in_effect() const
bool are_file_usage_limits_set() const
long long m_fileUsageNominal
cache purge - files usage nominal
int m_purgeAgeBasedPeriod
perform cold file / uvkeep purge every this many purge cycles
int m_purgeColdFilesAge
purge files older than this age
std::string m_data_space
oss space for data files
void calculate_fractional_usages(long long du, long long fu, double &frac_du, double &frac_fu)
long long m_diskUsageLWM
cache purge - disk usage low water mark
bool is_age_based_purge_in_effect() const
std::string m_username
username passed to oss plugin
time_t m_cs_UVKeep
unverified checksum cache keep
int m_purgeInterval
sleep interval between cache purges
bool is_dir_stat_reporting_on() const
FS(const std::string &dname, const char *fname, long long n, time_t t, DirState *ds)
std::vector< const char * > m_dirs