43 const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
49 const char *File::m_traceID =
"File";
53 File::File(
const std::string& path,
long long iOffset,
long long iFileSize) :
57 m_cfi(
Cache::GetInstance().GetTrace(),
Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
60 m_file_size(iFileSize),
61 m_current_io(m_io_set.end()),
65 m_detach_time_logged(false),
71 m_prefetch_state(kOff),
73 m_prefetch_read_cnt(0),
74 m_prefetch_hit_cnt(0),
101 m_info_file->
Close();
103 m_info_file =
nullptr;
109 m_data_file->
Close();
111 m_data_file =
nullptr;
114 if (m_resmon_token >= 0)
122 if (sr == 0 && s.st_blocks != m_st_blocks) {
125 m_st_blocks = s.st_blocks;
133 TRACEF(
Debug,
"Close() finished, prefetch score = " << m_prefetch_score);
140 File *file =
new File(path, offset, fileSize);
166 m_in_shutdown =
true;
168 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
170 m_prefetch_state = kStopped;
171 cache()->DeRegisterPrefetchFile(
this);
174 report_and_merge_delta_stats();
181 void File::check_delta_stats()
186 report_and_merge_delta_stats();
189 void File::report_and_merge_delta_stats()
193 m_data_file->
Fstat(&s);
196 long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
198 long long st_blocks_to_report = std::min((
long long) s.st_blocks, max_st_blocks_to_report);
200 m_st_blocks = st_blocks_to_report;
202 m_stats.
AddUp(m_delta_stats);
203 m_delta_stats.
Reset();
210 TRACEF(Dump,
"BlockRemovedFromWriteQ() block = " << (
void*) b <<
" idx= " << b->
m_offset/m_block_size);
218 TRACEF(Dump,
"BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
222 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
234 insert_remote_location(loc);
250 IoSet_i mi = m_io_set.find(io);
252 if (mi != m_io_set.end())
257 ", active_reads " << n_active_reads <<
258 ", active_prefetches " << io->m_active_prefetches <<
259 ", allow_prefetching " << io->m_allow_prefetching <<
260 ", ios_in_detach " << m_ios_in_detach);
262 "\tio_map.size() " << m_io_set.size() <<
263 ", block_map.size() " << m_block_map.size() <<
", file");
265 insert_remote_location(loc);
267 io->m_allow_prefetching =
false;
268 io->m_in_detach =
true;
271 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
273 if ( ! select_current_io_or_disable_prefetching(
false) )
275 TRACEF(
Debug,
"ioActive stopping prefetching after io " << io <<
" retreat.");
282 bool io_active_result;
284 if (n_active_reads > 0)
286 io_active_result =
true;
288 else if (m_io_set.size() - m_ios_in_detach == 1)
290 io_active_result = ! m_block_map.empty();
294 io_active_result = io->m_active_prefetches > 0;
297 if ( ! io_active_result)
302 TRACEF(
Info,
"ioActive for io " << io <<
" returning " << io_active_result <<
", file");
304 return io_active_result;
308 TRACEF(
Error,
"ioActive io " << io <<
" not found in IoSet. This should not happen.");
319 m_detach_time_logged =
false;
328 if ( ! m_in_shutdown)
330 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
332 report_and_merge_delta_stats();
334 m_detach_time_logged =
true;
336 TRACEF(
Debug,
"FinalizeSyncBeforeExit requesting sync to write detach stats");
340 TRACEF(
Debug,
"FinalizeSyncBeforeExit sync not required");
352 time_t now = time(0);
357 IoSet_i mi = m_io_set.find(io);
359 if (mi == m_io_set.end())
362 io->m_attach_time = now;
365 insert_remote_location(loc);
367 if (m_prefetch_state == kStopped)
369 m_prefetch_state = kOn;
370 cache()->RegisterPrefetchFile(
this);
375 TRACEF(
Error,
"AddIO() io = " << (
void*)io <<
" already registered.");
389 time_t now = time(0);
393 IoSet_i mi = m_io_set.find(io);
395 if (mi != m_io_set.end())
397 if (mi == m_current_io)
402 m_delta_stats.
IoDetach(now - io->m_attach_time);
406 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
408 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" Prefetching is not stopped/complete -- it should be by now.");
409 m_prefetch_state = kStopped;
410 cache()->DeRegisterPrefetchFile(
this);
415 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" is NOT registered.");
427 static const char *tpfx =
"Open() ";
429 TRACEF(Dump, tpfx <<
"entered");
440 struct stat data_stat, info_stat;
444 bool data_existed = (myOss.
Stat(m_filename.c_str(), &data_stat) ==
XrdOssOK);
445 bool info_existed = (myOss.
Stat(ifn.c_str(), &info_stat) ==
XrdOssOK);
448 char size_str[32]; sprintf(size_str,
"%lld", m_file_size);
449 myEnv.
Put(
"oss.asize", size_str);
461 m_data_file = myOss.
newFile(myUser);
462 if ((res = m_data_file->
Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
466 delete m_data_file; m_data_file = 0;
470 myEnv.
Put(
"oss.asize",
"64k");
476 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
480 m_info_file = myOss.
newFile(myUser);
481 if ((res = m_info_file->
Open(ifn.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
485 delete m_info_file; m_info_file = 0;
486 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
490 bool initialize_info_file =
true;
492 if (info_existed && m_cfi.
Read(m_info_file, ifn.c_str()))
494 TRACEF(
Debug, tpfx <<
"Reading existing info file. (data_existed=" << data_existed <<
495 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
501 initialize_info_file =
false;
503 TRACEF(
Warning, tpfx <<
"Basic sanity checks on data file failed, resetting info file, truncating data file.");
515 TRACEF(
Info, tpfx <<
"Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
516 initialize_info_file =
true;
526 if (initialize_info_file)
531 m_cfi.
Write(m_info_file, ifn.c_str());
532 m_info_file->
Fsync();
533 cache()->WriteFileSizeXAttr(m_info_file->
getFD(), m_file_size);
534 TRACEF(
Debug, tpfx <<
"Creating new file info, data size = " << m_file_size <<
" num blocks = " << m_cfi.
GetNBlocks());
538 if (futimens(m_info_file->
getFD(), NULL)) {
547 m_prefetch_state = (m_cfi.
IsComplete()) ? kComplete : kStopped;
549 m_data_file->
Fstat(&data_stat);
550 m_st_blocks = data_stat.st_blocks;
553 constexpr
long long MB = 1024 * 1024;
554 m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
574 if ((res = m_data_file->
Fstat(&sbuff)))
return res;
576 sbuff.st_size = m_file_size;
578 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
589 bool File::overlap(
int blk,
598 const long long beg = blk * blk_size;
599 const long long end = beg + blk_size;
600 const long long req_end = req_off + req_size;
602 if (req_off < end && req_end > beg)
604 const long long ovlp_beg = std::max(beg, req_off);
605 const long long ovlp_end = std::min(end, req_end);
607 off = ovlp_beg - req_off;
608 blk_off = ovlp_beg - beg;
609 size = (int) (ovlp_end - ovlp_beg);
611 assert(size <= blk_size);
622 Block* File::PrepareBlockRequest(
int i,
IO *io,
void *req_id,
bool prefetch)
630 const long long off = i * m_block_size;
631 const int last_block = m_num_blocks - 1;
632 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
634 int blk_size, req_size;
635 if (i == last_block) {
636 blk_size = req_size = m_file_size - off;
637 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
639 blk_size = req_size = m_block_size;
643 char *buf = cache()->RequestRAM(req_size);
647 b =
new (std::nothrow)
Block(
this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
657 m_prefetch_state = kHold;
658 cache()->DeRegisterPrefetchFile(
this);
663 TRACEF(Dump,
"PrepareBlockRequest() " << i <<
" prefetch " << prefetch <<
", allocation failed.");
670 void File::ProcessBlockRequest(
Block *b)
678 snprintf(buf, 256,
"idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
680 TRACEF(Dump,
"ProcessBlockRequest() " << buf);
696 for (
BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
698 ProcessBlockRequest(*bi);
704 void File::RequestBlocksDirect(
IO *io,
ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec,
int expected_size)
706 int n_chunks = ioVec.size();
709 TRACEF(DumpXL,
"RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
710 ", total_size = " << expected_size <<
", n_vec_reads = " << n_vec_reads);
720 io->
GetInput()->
ReadV( *handler, ioVec.data() + pos, n_chunks);
725 int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec,
int expected_size)
727 TRACEF(DumpXL,
"ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (
int) ioVec.size() <<
", total_size = " << expected_size);
729 long long rs = m_data_file->
ReadV(ioVec.data(), (
int) ioVec.size());
733 TRACEF(
Error,
"ReadBlocksFromDisk neg retval = " << rs);
737 if (rs != expected_size)
739 TRACEF(
Error,
"ReadBlocksFromDisk incomplete size = " << rs);
758 if (m_in_shutdown || io->m_in_detach)
761 return m_in_shutdown ? -ENOENT : -EBADF;
769 int ret = m_data_file->
Read(iUserBuff, iUserOff, iUserSize);
778 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
780 return ReadOpusCoalescere(io, &readV, 1, rh,
"Read() ");
787 TRACEF(Dump,
"ReadV() for " << readVnum <<
" chunks.");
791 if (m_in_shutdown || io->m_in_detach)
794 return m_in_shutdown ? -ENOENT : -EBADF;
811 return ReadOpusCoalescere(io, readV, readVnum, rh,
"ReadV() ");
816 int File::ReadOpusCoalescere(
IO *io,
const XrdOucIOVec *readV,
int readVnum,
828 int prefetch_cnt = 0;
833 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
835 std::vector<XrdOucIOVec> iovec_disk;
836 std::vector<XrdOucIOVec> iovec_direct;
837 int iovec_disk_total = 0;
838 int iovec_direct_total = 0;
840 for (
int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
847 const int idx_first = iUserOff / m_block_size;
848 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
850 TRACEF(DumpXL, tpfx <<
"sid: " <<
Xrd::hex1 << rh->
m_seq_id <<
" idx_first: " << idx_first <<
" idx_last: " << idx_last);
852 enum LastBlock_e { LB_other, LB_disk, LB_direct };
854 LastBlock_e lbe = LB_other;
856 for (
int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
859 BlockMap_i bi = m_block_map.find(block_idx);
866 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
869 if (bi != m_block_map.end())
871 inc_ref_count(bi->second);
872 TRACEF(Dump, tpfx << (
void*) iUserBuff <<
" inc_ref_count for existing block " << bi->second <<
" idx = " << block_idx);
874 if (bi->second->is_finished())
878 assert(bi->second->is_ok());
880 blks_ready[bi->second].emplace_back(
ChunkRequest(
nullptr, iUserBuff + off, blk_off, size) );
882 if (bi->second->m_prefetch)
893 bi->second->m_chunk_reqs.emplace_back(
ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
902 TRACEF(DumpXL, tpfx <<
"read from disk " << (
void*)iUserBuff <<
" idx = " << block_idx);
905 iovec_disk.back().size += size;
907 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
908 iovec_disk_total += size;
922 Block *b = PrepareBlockRequest(block_idx, io, read_req,
false);
925 TRACEF(Dump, tpfx <<
"inc_ref_count new " << (
void*)iUserBuff <<
" idx = " << block_idx);
927 blks_to_request.push_back(b);
936 TRACEF(DumpXL, tpfx <<
"direct block " << block_idx <<
", blk_off " << blk_off <<
", size " << size);
938 iovec_direct_total += size;
945 iovec_direct.back().size += size;
947 long long in_offset = block_idx * m_block_size + blk_off;
948 char *out_pos = iUserBuff + off;
955 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
964 inc_prefetch_hit_cnt(prefetch_cnt);
969 if ( ! blks_to_request.empty())
971 ProcessBlockRequests(blks_to_request);
972 blks_to_request.clear();
976 if ( ! iovec_direct.empty())
978 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
980 TRACEF(Dump, tpfx <<
"direct read requests sent out, n_chunks = " << (
int) iovec_direct.size() <<
", total_size = " << iovec_direct_total);
985 long long bytes_read = 0;
989 if ( ! blks_ready.empty())
991 for (
auto &bvi : blks_ready)
993 for (
auto &cr : bvi.second)
995 TRACEF(DumpXL, tpfx <<
"ub=" << (
void*)cr.m_buf <<
" from pre-finished block " << bvi.first->m_offset/m_block_size <<
" size " << cr.m_size);
996 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
997 bytes_read += cr.m_size;
1003 if ( ! iovec_disk.empty())
1005 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
1006 TRACEF(DumpXL, tpfx <<
"from disk finished size = " << rc);
1021 m_state_cond.
Lock();
1023 for (
auto &bvi : blks_ready)
1024 dec_ref_count(bvi.first, (
int) bvi.second.size());
1038 check_delta_stats();
1048 return -EWOULDBLOCK;
1054 check_delta_stats();
1059 return error_cond ? error_cond : bytes_read;
1071 long long offset = b->
m_offset - m_offset;
1086 TRACEF(
Error,
"WriteToDisk() write error " << retval);
1088 TRACEF(
Error,
"WriteToDisk() incomplete block write ret=" << retval <<
" (should be " << size <<
")");
1098 const int blk_idx = (b->
m_offset - m_offset) / m_block_size;
1101 TRACEF(Dump,
"WriteToDisk() success set bit for block " << b->
m_offset <<
" size=" << size);
1103 bool schedule_sync =
false;
1122 m_writes_during_sync.push_back(blk_idx);
1127 ++m_non_flushed_cnt;
1131 schedule_sync =
true;
1133 m_non_flushed_cnt = 0;
1139 if (!schedule_sync) {
1146 cache()->ScheduleFileSync(
this);
1158 int ret = m_data_file->
Fsync();
1159 bool errorp =
false;
1165 report_and_merge_delta_stats();
1166 loc_stats = m_stats;
1169 m_cfi.
Write(m_info_file, m_filename.c_str());
1170 int cret = m_info_file->
Fsync();
1173 TRACEF(
Error,
"Sync cinfo file sync error " << cret);
1179 TRACEF(
Error,
"Sync data file sync error " << ret <<
", cinfo file has not been updated");
1185 TRACEF(
Error,
"Sync failed, unlinking local files and initiating shutdown of File object");
1192 m_writes_during_sync.clear();
1198 int written_while_in_sync;
1199 bool resync =
false;
1202 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1206 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1207 m_writes_during_sync.clear();
1211 if (written_while_in_sync > 0 && m_cfi.
IsComplete() && ! m_in_shutdown)
1216 TRACEF(Dump,
"Sync "<< written_while_in_sync <<
" blocks written during sync." << (resync ?
" File is now complete - resyncing." :
""));
1227 void File::free_block(
Block* b)
1230 int i = b->
m_offset / m_block_size;
1231 TRACEF(Dump,
"free_block block " << b <<
" idx = " << i);
1232 size_t ret = m_block_map.erase(i);
1236 TRACEF(
Error,
"free_block did not erase " << i <<
" from map");
1246 m_prefetch_state = kOn;
1247 cache()->RegisterPrefetchFile(
this);
1253 bool File::select_current_io_or_disable_prefetching(
bool skip_current)
1257 int io_size = (int) m_io_set.size();
1262 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1265 m_current_io = m_io_set.begin();
1268 else if (io_size > 1)
1270 IoSet_i mi = m_current_io;
1271 if (skip_current && mi != m_io_set.end()) ++mi;
1273 for (
int i = 0; i < io_size; ++i)
1275 if (mi == m_io_set.end()) mi = m_io_set.begin();
1277 if ((*mi)->m_allow_prefetching)
1289 m_current_io = m_io_set.end();
1290 m_prefetch_state = kStopped;
1291 cache()->DeRegisterPrefetchFile(
this);
1299 void File::ProcessDirectReadFinished(
ReadRequest *rreq,
int bytes_read,
int error_cond)
1305 TRACEF(
Error,
"Read(), direct read finished with error " << -error_cond <<
" " <<
XrdSysE2T(-error_cond));
1307 m_state_cond.
Lock();
1323 FinalizeReadRequest(rreq);
1350 TRACEF(Dump,
"ProcessBlockSuccess() ub=" << (
void*)creq.
m_buf <<
" from finished block " << b->
m_offset/m_block_size <<
" size " << creq.
m_size);
1353 m_state_cond.
Lock();
1358 rreq->m_stats.m_BytesMissed += creq.
m_size;
1360 rreq->m_stats.m_BytesHit += creq.
m_size;
1362 --rreq->m_n_chunk_reqs;
1365 inc_prefetch_hit_cnt(1);
1369 bool rreq_complete = rreq->is_complete();
1374 FinalizeReadRequest(rreq);
1384 check_delta_stats();
1391 void File::ProcessBlockResponse(
Block *b,
int res)
1393 static const char* tpfx =
"ProcessBlockResponse ";
1395 TRACEF(Dump, tpfx <<
"block=" << b <<
", idx=" << b->
m_offset/m_block_size <<
", off=" << b->
m_offset <<
", res=" << res);
1397 if (res >= 0 && res != b->
get_size())
1401 TRACEF(
Error, tpfx <<
"Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1405 m_state_cond.
Lock();
1411 IoSet_i mi = m_io_set.find(io);
1412 if (mi != m_io_set.end())
1414 --io->m_active_prefetches;
1417 if (res < 0 && io->m_allow_prefetching)
1419 TRACEF(
Debug, tpfx <<
"after failed prefetch on io " << io <<
" disabling prefetching on this io.");
1420 io->m_allow_prefetching =
false;
1423 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1425 if ( ! select_current_io_or_disable_prefetching(
false) )
1427 TRACEF(
Debug, tpfx <<
"stopping prefetching after io " << b->
get_io() <<
" marked as bad.");
1433 if (b->
m_refcnt == 0 && (res < 0 || m_in_shutdown))
1450 TRACEF(Dump, tpfx <<
"inc_ref_count idx=" << b->
m_offset/m_block_size);
1451 if ( ! m_in_shutdown)
1457 cache()->AddWriteTask(b,
true);
1466 for (
auto &creq : creqs_to_notify)
1468 ProcessBlockSuccess(b, creq);
1477 <<
", io=" << b->
get_io() <<
", error=" << res);
1482 <<
", io=" << b->
get_io() <<
" incomplete, got " << res <<
" expected " << b->
get_size());
1483 #if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1494 std::list<ReadRequest*> rreqs_to_complete;
1503 ProcessBlockError(b, rreq);
1506 rreqs_to_complete.push_back(rreq);
1511 creqs_to_keep.push_back(creq);
1515 bool reissue =
false;
1516 if ( ! creqs_to_keep.empty())
1518 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1520 TRACEF(
Debug,
"ProcessBlockResponse() requested block " << (
void*)b <<
" failed with another io " <<
1521 b->
get_io() <<
" - reissuing request with my io " << rreq->
m_io);
1530 for (
auto rreq : rreqs_to_complete)
1531 FinalizeReadRequest(rreq);
1534 ProcessBlockRequest(b);
1542 return m_filename.c_str();
1547 int File::offsetIdx(
int iIdx)
const
1549 return iIdx - m_offset/m_block_size;
1563 TRACEF(DumpXL,
"Prefetch() entering.");
1567 if (m_prefetch_state != kOn)
1572 if ( ! select_current_io_or_disable_prefetching(
true) )
1574 TRACEF(
Error,
"Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1579 for (
int f = 0; f < m_num_blocks; ++f)
1583 int f_act = f + m_offset / m_block_size;
1585 BlockMap_i bi = m_block_map.find(f_act);
1586 if (bi == m_block_map.end())
1588 Block *b = PrepareBlockRequest(f_act, *m_current_io,
nullptr,
true);
1591 TRACEF(Dump,
"Prefetch take block " << f_act);
1595 inc_prefetch_read_cnt(1);
1600 TRACEF(
Warning,
"Prefetch allocation failed for block " << f_act);
1609 TRACEF(
Debug,
"Prefetch file is complete, stopping prefetch.");
1610 m_prefetch_state = kComplete;
1611 cache()->DeRegisterPrefetchFile(
this);
1615 (*m_current_io)->m_active_prefetches += (int) blks.size();
1619 if ( ! blks.empty())
1621 ProcessBlockRequests(blks);
1630 return m_prefetch_score;
1643 void File::insert_remote_location(
const std::string &loc)
1647 size_t p = loc.find_first_of(
'@');
1648 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1655 if ( ! m_remote_locations.empty())
1659 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1663 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1666 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1668 s +=
'"'; s += *i; s +=
'"';
1669 if (j < nl) s +=
',';
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
int stat(const char *path, struct stat *buf)
const char * XrdSysE2T(int errcode)
virtual int Ftruncate(unsigned long long flen)
virtual int Fstat(struct stat *buf)
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual ssize_t Read(off_t offset, size_t size)
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
void Done(int result) override
int * ptr_n_cksum_errors()
void * get_req_id() const
long long get_offset() const
vChunkRequest_t m_chunk_reqs
vCkSum_t & ref_cksum_vec()
bool req_cksum_net() const
void reset_error_and_set_io(IO *io, void *rid)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
static ResourceMonitor & ResMon()
static Cache & GetInstance()
Singleton access.
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void Sync()
Sync file cache inf o and output data with disk.
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
long long initiate_emergency_shutdown()
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
bool register_incomplete_read()
XrdOucCacheIO * GetInput()
bool register_block_error(int res)
RAtomic_int m_active_read_reqs
number of active read requests
const char * GetLocation()
Status of cached file. Can be read from and written into a binary file.
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
static const char * s_infoExtension
void SetBitSynced(int i)
Mark block as synced to disk.
time_t GetNoCkSumTimeForUVKeep() const
CkSumCheck_e GetCkSumState() const
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
void DowngradeCkSumState(CkSumCheck_e css_ref)
void ResetAllAccessStats()
Reset IO Stats.
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
bool IsComplete() const
Get complete status.
bool IsCkSumCache() const
void SetBitWritten(int i)
Mark block as written to disk.
long long GetBufferSize() const
Get prefetch buffer size.
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
long long GetExpectedDataFileSize() const
Get expected data file size.
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
void SetCkSumState(CkSumCheck_e css)
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
void AddReadStats(const Stats &s)
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddUp(const Stats &s)
void AddWriteStats(long long bytes_written, int n_cks_errs)
long long BytesReadAndWritten() const
void AddBytesHit(long long bh)
long long m_BytesHit
number of bytes served from disk
long long m_BytesWritten
number of bytes written to disk
void IoDetach(int duration)
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * > BlockList_t
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
static const int maxRvecsz
Contains parameters configurable from the xrootd config file.
long long m_flushCnt
nuber of unsynced blcoks on disk before flush is called
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
CkSumCheck_e get_cs_Chk() const
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
bool should_uvkeep_purge(time_t delta) const
std::string m_data_space
oss space for data files
long long m_bufferSize
prefetch buffer size, default 1MB
std::string m_meta_space
oss space for metadata files (cinfo)
std::string m_username
username passed to oss plugin
void update_error_cond(int ec)