45 const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
51 const char *File::m_traceID =
"File";
55 File::File(
const std::string& path,
long long iOffset,
long long iFileSize) :
59 m_cfi(
Cache::GetInstance().GetTrace(),
Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
62 m_file_size(iFileSize),
63 m_current_io(m_io_set.end()),
67 m_detach_time_logged(false),
73 m_prefetch_state(kOff),
75 m_prefetch_read_cnt(0),
76 m_prefetch_hit_cnt(0),
87 m_info_file =
nullptr;
95 m_data_file =
nullptr;
98 if (m_resmon_token >= 0)
104 TRACEF(
Debug,
"~File() ended, prefetch score = " << m_prefetch_score);
111 File *file =
new File(path, offset, fileSize);
139 m_in_shutdown =
true;
141 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
143 m_prefetch_state = kStopped;
144 cache()->DeRegisterPrefetchFile(
this);
151 void File::check_delta_stats()
155 if (m_delta_stats.
BytesRead() >= m_resmon_report_threshold)
156 report_and_merge_delta_stats();
159 void File::report_and_merge_delta_stats()
163 m_data_file->
Fstat(&s);
165 m_st_blocks = s.st_blocks;
167 m_stats.
AddUp(m_delta_stats);
168 m_delta_stats.
Reset();
175 TRACEF(Dump,
"BlockRemovedFromWriteQ() block = " << (
void*) b <<
" idx= " << b->
m_offset/m_block_size);
183 TRACEF(Dump,
"BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
187 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
199 insert_remote_location(loc);
215 IoSet_i mi = m_io_set.find(io);
217 if (mi != m_io_set.end())
222 ", active_reads " << n_active_reads <<
223 ", active_prefetches " << io->m_active_prefetches <<
224 ", allow_prefetching " << io->m_allow_prefetching <<
225 ", ios_in_detach " << m_ios_in_detach);
227 "\tio_map.size() " << m_io_set.size() <<
228 ", block_map.size() " << m_block_map.size() <<
", file");
230 insert_remote_location(loc);
232 io->m_allow_prefetching =
false;
233 io->m_in_detach =
true;
236 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
238 if ( ! select_current_io_or_disable_prefetching(
false) )
240 TRACEF(
Debug,
"ioActive stopping prefetching after io " << io <<
" retreat.");
247 bool io_active_result;
249 if (n_active_reads > 0)
251 io_active_result =
true;
253 else if (m_io_set.size() - m_ios_in_detach == 1)
255 io_active_result = ! m_block_map.empty();
259 io_active_result = io->m_active_prefetches > 0;
262 if ( ! io_active_result)
267 TRACEF(
Info,
"ioActive for io " << io <<
" returning " << io_active_result <<
", file");
269 return io_active_result;
273 TRACEF(
Error,
"ioActive io " << io <<
" not found in IoSet. This should not happen.");
284 m_detach_time_logged =
false;
293 if ( ! m_in_shutdown)
295 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
297 report_and_merge_delta_stats();
299 m_detach_time_logged =
true;
301 TRACEF(
Debug,
"FinalizeSyncBeforeExit requesting sync to write detach stats");
305 TRACEF(
Debug,
"FinalizeSyncBeforeExit sync not required");
317 time_t now = time(0);
322 IoSet_i mi = m_io_set.find(io);
324 if (mi == m_io_set.end())
327 io->m_attach_time = now;
330 insert_remote_location(loc);
332 if (m_prefetch_state == kStopped)
334 m_prefetch_state = kOn;
335 cache()->RegisterPrefetchFile(
this);
340 TRACEF(
Error,
"AddIO() io = " << (
void*)io <<
" already registered.");
354 time_t now = time(0);
358 IoSet_i mi = m_io_set.find(io);
360 if (mi != m_io_set.end())
362 if (mi == m_current_io)
367 m_delta_stats.
IoDetach(now - io->m_attach_time);
371 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
373 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" Prefetching is not stopped/complete -- it should be by now.");
374 m_prefetch_state = kStopped;
375 cache()->DeRegisterPrefetchFile(
this);
380 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" is NOT registered.");
392 static const char *tpfx =
"Open() ";
394 TRACEF(Dump, tpfx <<
"entered");
405 struct stat data_stat, info_stat;
409 bool data_existed = (myOss.
Stat(m_filename.c_str(), &data_stat) ==
XrdOssOK);
410 bool info_existed = (myOss.
Stat(ifn.c_str(), &info_stat) ==
XrdOssOK);
413 char size_str[32]; sprintf(size_str,
"%lld", m_file_size);
414 myEnv.
Put(
"oss.asize", size_str);
426 m_data_file = myOss.
newFile(myUser);
427 if ((res = m_data_file->
Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
431 delete m_data_file; m_data_file = 0;
435 myEnv.
Put(
"oss.asize",
"64k");
441 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
445 m_info_file = myOss.
newFile(myUser);
446 if ((res = m_info_file->
Open(ifn.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
450 delete m_info_file; m_info_file = 0;
451 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
455 bool initialize_info_file =
true;
457 if (info_existed && m_cfi.
Read(m_info_file, ifn.c_str()))
459 TRACEF(
Debug, tpfx <<
"Reading existing info file. (data_existed=" << data_existed <<
460 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
466 initialize_info_file =
false;
468 TRACEF(
Warning, tpfx <<
"Basic sanity checks on data file failed, resetting info file, truncating data file.");
480 TRACEF(
Info, tpfx <<
"Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
481 initialize_info_file =
true;
491 if (initialize_info_file)
496 m_cfi.
Write(m_info_file, ifn.c_str());
497 m_info_file->
Fsync();
498 cache()->WriteFileSizeXAttr(m_info_file->
getFD(), m_file_size);
499 TRACEF(
Debug, tpfx <<
"Creating new file info, data size = " << m_file_size <<
" num blocks = " << m_cfi.
GetNBlocks());
503 if (futimens(m_info_file->
getFD(), NULL)) {
512 m_prefetch_state = (m_cfi.
IsComplete()) ? kComplete : kStopped;
514 m_data_file->
Fstat(&data_stat);
515 m_st_blocks = data_stat.st_blocks;
518 m_resmon_report_threshold = std::min(std::max(200ll * 1024, m_file_size / 50), 500ll * 1024 * 1024);
538 if ((res = m_data_file->
Fstat(&sbuff)))
return res;
540 sbuff.st_size = m_file_size;
542 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
553 bool File::overlap(
int blk,
562 const long long beg = blk * blk_size;
563 const long long end = beg + blk_size;
564 const long long req_end = req_off + req_size;
566 if (req_off < end && req_end > beg)
568 const long long ovlp_beg = std::max(beg, req_off);
569 const long long ovlp_end = std::min(end, req_end);
571 off = ovlp_beg - req_off;
572 blk_off = ovlp_beg - beg;
573 size = (int) (ovlp_end - ovlp_beg);
575 assert(size <= blk_size);
586 Block* File::PrepareBlockRequest(
int i,
IO *io,
void *req_id,
bool prefetch)
594 const long long off = i * m_block_size;
595 const int last_block = m_num_blocks - 1;
596 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
598 int blk_size, req_size;
599 if (i == last_block) {
600 blk_size = req_size = m_file_size - off;
601 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
603 blk_size = req_size = m_block_size;
607 char *buf = cache()->RequestRAM(req_size);
611 b =
new (std::nothrow)
Block(
this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
621 m_prefetch_state = kHold;
622 cache()->DeRegisterPrefetchFile(
this);
627 TRACEF(Dump,
"PrepareBlockRequest() " << i <<
" prefetch " << prefetch <<
", allocation failed.");
634 void File::ProcessBlockRequest(
Block *b)
642 snprintf(buf, 256,
"idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
644 TRACEF(Dump,
"ProcessBlockRequest() " << buf);
660 for (
BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
662 ProcessBlockRequest(*bi);
668 void File::RequestBlocksDirect(
IO *io,
ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec,
int expected_size)
670 int n_chunks = ioVec.size();
673 TRACEF(DumpXL,
"RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
674 ", total_size = " << expected_size <<
", n_vec_reads = " << n_vec_reads);
684 io->
GetInput()->
ReadV( *handler, ioVec.data() + pos, n_chunks);
689 int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec,
int expected_size)
691 TRACEF(DumpXL,
"ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (
int) ioVec.size() <<
", total_size = " << expected_size);
693 long long rs = m_data_file->
ReadV(ioVec.data(), (
int) ioVec.size());
697 TRACEF(
Error,
"ReadBlocksFromDisk neg retval = " << rs);
701 if (rs != expected_size)
703 TRACEF(
Error,
"ReadBlocksFromDisk incomplete size = " << rs);
722 if (m_in_shutdown || io->m_in_detach)
725 return m_in_shutdown ? -ENOENT : -EBADF;
733 int ret = m_data_file->
Read(iUserBuff, iUserOff, iUserSize);
742 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
744 return ReadOpusCoalescere(io, &readV, 1, rh,
"Read() ");
751 TRACEF(Dump,
"ReadV() for " << readVnum <<
" chunks.");
755 if (m_in_shutdown || io->m_in_detach)
758 return m_in_shutdown ? -ENOENT : -EBADF;
775 return ReadOpusCoalescere(io, readV, readVnum, rh,
"ReadV() ");
780 int File::ReadOpusCoalescere(
IO *io,
const XrdOucIOVec *readV,
int readVnum,
792 int prefetch_cnt = 0;
797 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
799 std::vector<XrdOucIOVec> iovec_disk;
800 std::vector<XrdOucIOVec> iovec_direct;
801 int iovec_disk_total = 0;
802 int iovec_direct_total = 0;
804 for (
int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
811 const int idx_first = iUserOff / m_block_size;
812 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
814 TRACEF(DumpXL, tpfx <<
"sid: " <<
Xrd::hex1 << rh->
m_seq_id <<
" idx_first: " << idx_first <<
" idx_last: " << idx_last);
816 enum LastBlock_e { LB_other, LB_disk, LB_direct };
818 LastBlock_e lbe = LB_other;
820 for (
int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
823 BlockMap_i bi = m_block_map.find(block_idx);
830 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
833 if (bi != m_block_map.end())
835 inc_ref_count(bi->second);
836 TRACEF(Dump, tpfx << (
void*) iUserBuff <<
" inc_ref_count for existing block " << bi->second <<
" idx = " << block_idx);
838 if (bi->second->is_finished())
842 assert(bi->second->is_ok());
844 blks_ready[bi->second].emplace_back(
ChunkRequest(
nullptr, iUserBuff + off, blk_off, size) );
846 if (bi->second->m_prefetch)
857 bi->second->m_chunk_reqs.emplace_back(
ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
866 TRACEF(DumpXL, tpfx <<
"read from disk " << (
void*)iUserBuff <<
" idx = " << block_idx);
869 iovec_disk.back().size += size;
871 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
872 iovec_disk_total += size;
886 Block *b = PrepareBlockRequest(block_idx, io, read_req,
false);
889 TRACEF(Dump, tpfx <<
"inc_ref_count new " << (
void*)iUserBuff <<
" idx = " << block_idx);
891 blks_to_request.push_back(b);
900 TRACEF(DumpXL, tpfx <<
"direct block " << block_idx <<
", blk_off " << blk_off <<
", size " << size);
902 iovec_direct_total += size;
909 iovec_direct.back().size += size;
911 long long in_offset = block_idx * m_block_size + blk_off;
912 char *out_pos = iUserBuff + off;
919 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
928 inc_prefetch_hit_cnt(prefetch_cnt);
933 if ( ! blks_to_request.empty())
935 ProcessBlockRequests(blks_to_request);
936 blks_to_request.clear();
940 if ( ! iovec_direct.empty())
942 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
944 TRACEF(Dump, tpfx <<
"direct read requests sent out, n_chunks = " << (
int) iovec_direct.size() <<
", total_size = " << iovec_direct_total);
949 long long bytes_read = 0;
953 if ( ! blks_ready.empty())
955 for (
auto &bvi : blks_ready)
957 for (
auto &cr : bvi.second)
959 TRACEF(DumpXL, tpfx <<
"ub=" << (
void*)cr.m_buf <<
" from pre-finished block " << bvi.first->m_offset/m_block_size <<
" size " << cr.m_size);
960 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
961 bytes_read += cr.m_size;
967 if ( ! iovec_disk.empty())
969 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
970 TRACEF(DumpXL, tpfx <<
"from disk finished size = " << rc);
987 for (
auto &bvi : blks_ready)
988 dec_ref_count(bvi.first, (
int) bvi.second.size());
1002 check_delta_stats();
1012 return -EWOULDBLOCK;
1018 check_delta_stats();
1023 return error_cond ? error_cond : bytes_read;
1035 long long offset = b->
m_offset - m_offset;
1055 TRACEF(
Error,
"WriteToDisk() incomplete block write ret=" << retval <<
" (should be " << size <<
")");
1065 const int blk_idx = (b->
m_offset - m_offset) / m_block_size;
1068 TRACEF(Dump,
"WriteToDisk() success set bit for block " << b->
m_offset <<
" size=" << size);
1070 bool schedule_sync =
false;
1091 m_writes_during_sync.push_back(blk_idx);
1096 ++m_non_flushed_cnt;
1100 schedule_sync =
true;
1102 m_non_flushed_cnt = 0;
1109 cache()->ScheduleFileSync(
this);
1119 int ret = m_data_file->
Fsync();
1120 bool errorp =
false;
1126 report_and_merge_delta_stats();
1127 loc_stats = m_stats;
1130 m_cfi.
Write(m_info_file, m_filename.c_str());
1131 int cret = m_info_file->
Fsync();
1134 TRACEF(
Error,
"Sync cinfo file sync error " << cret);
1140 TRACEF(
Error,
"Sync data file sync error " << ret <<
", cinfo file has not been updated");
1146 TRACEF(
Error,
"Sync failed, unlinking local files and initiating shutdown of File object");
1153 m_writes_during_sync.clear();
1159 int written_while_in_sync;
1160 bool resync =
false;
1163 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1167 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1168 m_writes_during_sync.clear();
1172 if (written_while_in_sync > 0 && m_cfi.
IsComplete() && ! m_in_shutdown)
1177 TRACEF(Dump,
"Sync "<< written_while_in_sync <<
" blocks written during sync." << (resync ?
" File is now complete - resyncing." :
""));
1188 void File::free_block(
Block* b)
1191 int i = b->
m_offset / m_block_size;
1192 TRACEF(Dump,
"free_block block " << b <<
" idx = " << i);
1193 size_t ret = m_block_map.erase(i);
1197 TRACEF(
Error,
"free_block did not erase " << i <<
" from map");
1207 m_prefetch_state = kOn;
1208 cache()->RegisterPrefetchFile(
this);
1214 bool File::select_current_io_or_disable_prefetching(
bool skip_current)
1218 int io_size = (int) m_io_set.size();
1223 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1226 m_current_io = m_io_set.begin();
1229 else if (io_size > 1)
1231 IoSet_i mi = m_current_io;
1232 if (skip_current && mi != m_io_set.end()) ++mi;
1234 for (
int i = 0; i < io_size; ++i)
1236 if (mi == m_io_set.end()) mi = m_io_set.begin();
1238 if ((*mi)->m_allow_prefetching)
1250 m_current_io = m_io_set.end();
1251 m_prefetch_state = kStopped;
1252 cache()->DeRegisterPrefetchFile(
this);
1260 void File::ProcessDirectReadFinished(
ReadRequest *rreq,
int bytes_read,
int error_cond)
1266 TRACEF(
Error,
"Read(), direct read finished with error " << -error_cond <<
" " <<
XrdSysE2T(-error_cond));
1268 m_state_cond.
Lock();
1284 FinalizeReadRequest(rreq);
1311 TRACEF(Dump,
"ProcessBlockSuccess() ub=" << (
void*)creq.
m_buf <<
" from finished block " << b->
m_offset/m_block_size <<
" size " << creq.
m_size);
1314 m_state_cond.
Lock();
1319 rreq->m_stats.m_BytesMissed += creq.
m_size;
1321 rreq->m_stats.m_BytesHit += creq.
m_size;
1323 --rreq->m_n_chunk_reqs;
1326 inc_prefetch_hit_cnt(1);
1330 bool rreq_complete = rreq->is_complete();
1335 FinalizeReadRequest(rreq);
1345 check_delta_stats();
1352 void File::ProcessBlockResponse(
Block *b,
int res)
1354 static const char* tpfx =
"ProcessBlockResponse ";
1356 TRACEF(Dump, tpfx <<
"block=" << b <<
", idx=" << b->
m_offset/m_block_size <<
", off=" << b->
m_offset <<
", res=" << res);
1358 if (res >= 0 && res != b->
get_size())
1362 TRACEF(
Error, tpfx <<
"Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1366 m_state_cond.
Lock();
1372 IoSet_i mi = m_io_set.find(io);
1373 if (mi != m_io_set.end())
1375 --io->m_active_prefetches;
1378 if (res < 0 && io->m_allow_prefetching)
1380 TRACEF(
Debug, tpfx <<
"after failed prefetch on io " << io <<
" disabling prefetching on this io.");
1381 io->m_allow_prefetching =
false;
1384 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1386 if ( ! select_current_io_or_disable_prefetching(
false) )
1388 TRACEF(
Debug, tpfx <<
"stopping prefetching after io " << b->
get_io() <<
" marked as bad.");
1394 if (b->
m_refcnt == 0 && (res < 0 || m_in_shutdown))
1411 TRACEF(Dump, tpfx <<
"inc_ref_count idx=" << b->
m_offset/m_block_size);
1412 if ( ! m_in_shutdown)
1418 cache()->AddWriteTask(b,
true);
1427 for (
auto &creq : creqs_to_notify)
1429 ProcessBlockSuccess(b, creq);
1438 <<
", io=" << b->
get_io() <<
", error=" << res);
1443 <<
", io=" << b->
get_io() <<
" incomplete, got " << res <<
" expected " << b->
get_size());
1444 #if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1455 std::list<ReadRequest*> rreqs_to_complete;
1464 ProcessBlockError(b, rreq);
1467 rreqs_to_complete.push_back(rreq);
1472 creqs_to_keep.push_back(creq);
1476 bool reissue =
false;
1477 if ( ! creqs_to_keep.empty())
1479 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1481 TRACEF(
Debug,
"ProcessBlockResponse() requested block " << (
void*)b <<
" failed with another io " <<
1482 b->
get_io() <<
" - reissuing request with my io " << rreq->
m_io);
1491 for (
auto rreq : rreqs_to_complete)
1492 FinalizeReadRequest(rreq);
1495 ProcessBlockRequest(b);
1503 return m_filename.c_str();
1508 int File::offsetIdx(
int iIdx)
const
1510 return iIdx - m_offset/m_block_size;
1524 TRACEF(DumpXL,
"Prefetch() entering.");
1528 if (m_prefetch_state != kOn)
1533 if ( ! select_current_io_or_disable_prefetching(
true) )
1535 TRACEF(
Error,
"Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1540 for (
int f = 0; f < m_num_blocks; ++f)
1544 int f_act = f + m_offset / m_block_size;
1546 BlockMap_i bi = m_block_map.find(f_act);
1547 if (bi == m_block_map.end())
1549 Block *b = PrepareBlockRequest(f_act, *m_current_io,
nullptr,
true);
1552 TRACEF(Dump,
"Prefetch take block " << f_act);
1556 inc_prefetch_read_cnt(1);
1561 TRACEF(
Warning,
"Prefetch allocation failed for block " << f_act);
1570 TRACEF(
Debug,
"Prefetch file is complete, stopping prefetch.");
1571 m_prefetch_state = kComplete;
1572 cache()->DeRegisterPrefetchFile(
this);
1576 (*m_current_io)->m_active_prefetches += (int) blks.size();
1580 if ( ! blks.empty())
1582 ProcessBlockRequests(blks);
1591 return m_prefetch_score;
1604 void File::insert_remote_location(
const std::string &loc)
1608 size_t p = loc.find_first_of(
'@');
1609 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1616 if ( ! m_remote_locations.empty())
1620 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1624 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1627 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1629 s +=
'"'; s += *i; s +=
'"';
1630 if (j < nl) s +=
',';
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
int stat(const char *path, struct stat *buf)
const char * XrdSysE2T(int errcode)
virtual int Ftruncate(unsigned long long flen)
virtual int Fstat(struct stat *buf)
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual ssize_t Read(off_t offset, size_t size)
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
void Done(int result) override
int * ptr_n_cksum_errors()
void * get_req_id() const
long long get_offset() const
vChunkRequest_t m_chunk_reqs
vCkSum_t & ref_cksum_vec()
bool req_cksum_net() const
void reset_error_and_set_io(IO *io, void *rid)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
static ResourceMonitor & ResMon()
static Cache & GetInstance()
Singleton access.
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void initiate_emergency_shutdown()
void Sync()
Sync file cache inf o and output data with disk.
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
const std::string & GetLocalPath() const
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
bool register_incomplete_read()
XrdOucCacheIO * GetInput()
bool register_block_error(int res)
RAtomic_int m_active_read_reqs
number of active read requests
const char * GetLocation()
Status of cached file. Can be read from and written into a binary file.
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
static const char * s_infoExtension
void SetBitSynced(int i)
Mark block as synced to disk.
time_t GetNoCkSumTimeForUVKeep() const
CkSumCheck_e GetCkSumState() const
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
void DowngradeCkSumState(CkSumCheck_e css_ref)
void ResetAllAccessStats()
Reset IO Stats.
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
bool IsComplete() const
Get complete status.
bool IsCkSumCache() const
void SetBitWritten(int i)
Mark block as written to disk.
long long GetBufferSize() const
Get prefetch buffer size.
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
long long GetExpectedDataFileSize() const
Get expected data file size.
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
void SetCkSumState(CkSumCheck_e css)
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
void AddReadStats(const Stats &s)
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddUp(const Stats &s)
void AddWriteStats(long long bytes_written, int n_cks_errs)
void AddBytesHit(long long bh)
long long BytesRead() const
long long m_BytesHit
number of bytes served from disk
void IoDetach(int duration)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * > BlockList_t
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
static const int maxRvecsz
Contains parameters configurable from the xrootd config file.
long long m_flushCnt
nuber of unsynced blcoks on disk before flush is called
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
CkSumCheck_e get_cs_Chk() const
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
bool should_uvkeep_purge(time_t delta) const
std::string m_data_space
oss space for data files
long long m_bufferSize
prefetch buffer size, default 1MB
std::string m_meta_space
oss space for metadata files (cinfo)
std::string m_username
username passed to oss plugin
void update_error_cond(int ec)