43 const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
49 const char *File::m_traceID =
"File";
53 File::File(
const std::string& path,
long long iOffset,
long long iFileSize) :
57 m_cfi(
Cache::GetInstance().GetTrace(),
Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
60 m_file_size(iFileSize),
61 m_current_io(m_io_set.end()),
65 m_detach_time_logged(false),
71 m_prefetch_state(kOff),
73 m_prefetch_read_cnt(0),
74 m_prefetch_hit_cnt(0),
101 m_info_file->
Close();
103 m_info_file =
nullptr;
109 m_data_file->
Close();
111 m_data_file =
nullptr;
114 if (m_resmon_token >= 0)
122 if (sr == 0 && s.st_blocks != m_st_blocks) {
125 m_st_blocks = s.st_blocks;
133 TRACEF(
Debug,
"Close() finished, prefetch score = " << m_prefetch_score);
140 File *file =
new File(path, offset, fileSize);
166 m_in_shutdown =
true;
168 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
170 m_prefetch_state = kStopped;
171 cache()->DeRegisterPrefetchFile(
this);
174 report_and_merge_delta_stats();
181 void File::check_delta_stats()
186 report_and_merge_delta_stats();
189 void File::report_and_merge_delta_stats()
193 m_data_file->
Fstat(&s);
196 long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
198 long long st_blocks_to_report = std::min((
long long) s.st_blocks, max_st_blocks_to_report);
200 m_st_blocks = st_blocks_to_report;
202 m_stats.
AddUp(m_delta_stats);
203 m_delta_stats.
Reset();
210 TRACEF(Dump,
"BlockRemovedFromWriteQ() block = " << (
void*) b <<
" idx= " << b->
m_offset/m_block_size);
218 TRACEF(Dump,
"BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
222 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
234 insert_remote_location(loc);
250 IoSet_i mi = m_io_set.find(io);
252 if (mi != m_io_set.end())
257 ", active_reads " << n_active_reads <<
258 ", active_prefetches " << io->m_active_prefetches <<
259 ", allow_prefetching " << io->m_allow_prefetching <<
260 ", ios_in_detach " << m_ios_in_detach);
262 "\tio_map.size() " << m_io_set.size() <<
263 ", block_map.size() " << m_block_map.size() <<
", file");
265 insert_remote_location(loc);
267 io->m_allow_prefetching =
false;
268 io->m_in_detach =
true;
271 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
273 if ( ! select_current_io_or_disable_prefetching(
false) )
275 TRACEF(
Debug,
"ioActive stopping prefetching after io " << io <<
" retreat.");
282 bool io_active_result;
284 if (n_active_reads > 0)
286 io_active_result =
true;
288 else if (m_io_set.size() - m_ios_in_detach == 1)
290 io_active_result = ! m_block_map.empty();
294 io_active_result = io->m_active_prefetches > 0;
297 if ( ! io_active_result)
302 TRACEF(
Info,
"ioActive for io " << io <<
" returning " << io_active_result <<
", file");
304 return io_active_result;
308 TRACEF(
Error,
"ioActive io " << io <<
" not found in IoSet. This should not happen.");
319 m_detach_time_logged =
false;
328 if ( ! m_in_shutdown)
330 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
332 report_and_merge_delta_stats();
334 m_detach_time_logged =
true;
336 TRACEF(
Debug,
"FinalizeSyncBeforeExit requesting sync to write detach stats");
340 TRACEF(
Debug,
"FinalizeSyncBeforeExit sync not required");
352 time_t now = time(0);
357 IoSet_i mi = m_io_set.find(io);
359 if (mi == m_io_set.end())
362 io->m_attach_time = now;
365 insert_remote_location(loc);
367 if (m_prefetch_state == kStopped)
369 m_prefetch_state = kOn;
370 cache()->RegisterPrefetchFile(
this);
375 TRACEF(
Error,
"AddIO() io = " << (
void*)io <<
" already registered.");
389 time_t now = time(0);
393 IoSet_i mi = m_io_set.find(io);
395 if (mi != m_io_set.end())
397 if (mi == m_current_io)
402 m_delta_stats.
IoDetach(now - io->m_attach_time);
406 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
408 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" Prefetching is not stopped/complete -- it should be by now.");
409 m_prefetch_state = kStopped;
410 cache()->DeRegisterPrefetchFile(
this);
415 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" is NOT registered.");
427 static const char *tpfx =
"Open() ";
429 TRACEF(Dump, tpfx <<
"entered");
440 struct stat data_stat, info_stat;
444 bool data_existed = (myOss.
Stat(m_filename.c_str(), &data_stat) ==
XrdOssOK);
445 bool info_existed = (myOss.
Stat(ifn.c_str(), &info_stat) ==
XrdOssOK);
448 char size_str[32]; sprintf(size_str,
"%lld", m_file_size);
449 myEnv.
Put(
"oss.asize", size_str);
461 m_data_file = myOss.
newFile(myUser);
462 if ((res = m_data_file->
Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
466 delete m_data_file; m_data_file = 0;
470 myEnv.
Put(
"oss.asize",
"64k");
476 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
480 m_info_file = myOss.
newFile(myUser);
481 if ((res = m_info_file->
Open(ifn.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
485 delete m_info_file; m_info_file = 0;
486 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
490 bool initialize_info_file =
true;
492 if (info_existed && m_cfi.
Read(m_info_file, ifn.c_str()))
494 TRACEF(
Debug, tpfx <<
"Reading existing info file. (data_existed=" << data_existed <<
495 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
501 initialize_info_file =
false;
503 TRACEF(
Warning, tpfx <<
"Basic sanity checks on data file failed, resetting info file, truncating data file.");
515 TRACEF(
Info, tpfx <<
"Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
516 initialize_info_file =
true;
526 if (initialize_info_file)
531 m_cfi.
Write(m_info_file, ifn.c_str());
532 m_info_file->
Fsync();
533 cache()->WriteFileSizeXAttr(m_info_file->
getFD(), m_file_size);
534 TRACEF(
Debug, tpfx <<
"Creating new file info, data size = " << m_file_size <<
" num blocks = " << m_cfi.
GetNBlocks());
538 if (futimens(m_info_file->
getFD(), NULL)) {
547 m_prefetch_state = (m_cfi.
IsComplete()) ? kComplete : kStopped;
549 m_data_file->
Fstat(&data_stat);
550 m_st_blocks = data_stat.st_blocks;
553 constexpr
long long MB = 1024 * 1024;
554 m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
574 if ((res = m_data_file->
Fstat(&sbuff)))
return res;
576 sbuff.st_size = m_file_size;
578 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
589 bool File::overlap(
int blk,
598 const long long beg = blk * blk_size;
599 const long long end = beg + blk_size;
600 const long long req_end = req_off + req_size;
602 if (req_off < end && req_end > beg)
604 const long long ovlp_beg = std::max(beg, req_off);
605 const long long ovlp_end = std::min(end, req_end);
607 off = ovlp_beg - req_off;
608 blk_off = ovlp_beg - beg;
609 size = (int) (ovlp_end - ovlp_beg);
611 assert(size <= blk_size);
622 Block* File::PrepareBlockRequest(
int i,
IO *io,
void *req_id,
bool prefetch)
630 const long long off = i * m_block_size;
631 const int last_block = m_num_blocks - 1;
632 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
634 int blk_size, req_size;
635 if (i == last_block) {
636 blk_size = req_size = m_file_size - off;
637 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
639 blk_size = req_size = m_block_size;
643 char *buf = cache()->RequestRAM(req_size);
647 b =
new (std::nothrow)
Block(
this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
657 m_prefetch_state = kHold;
658 cache()->DeRegisterPrefetchFile(
this);
663 TRACEF(Dump,
"PrepareBlockRequest() " << i <<
" prefetch " << prefetch <<
", allocation failed.");
670 void File::ProcessBlockRequest(
Block *b)
678 snprintf(buf, 256,
"idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
680 TRACEF(Dump,
"ProcessBlockRequest() " << buf);
696 for (
BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
698 ProcessBlockRequest(*bi);
704 void File::RequestBlocksDirect(
IO *io,
ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec,
int expected_size)
706 int n_chunks = ioVec.size();
709 TRACEF(DumpXL,
"RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
710 ", total_size = " << expected_size <<
", n_vec_reads = " << n_vec_reads);
720 io->
GetInput()->
ReadV( *handler, ioVec.data() + pos, n_chunks);
725 int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec,
int expected_size)
727 TRACEF(DumpXL,
"ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (
int) ioVec.size() <<
", total_size = " << expected_size);
729 long long rs = m_data_file->
ReadV(ioVec.data(), (
int) ioVec.size());
733 TRACEF(
Error,
"ReadBlocksFromDisk neg retval = " << rs);
737 if (rs != expected_size)
739 TRACEF(
Error,
"ReadBlocksFromDisk incomplete size = " << rs);
758 if (m_in_shutdown || io->m_in_detach)
761 return m_in_shutdown ? -ENOENT : -EBADF;
769 int ret = m_data_file->
Read(iUserBuff, iUserOff, iUserSize);
778 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
780 return ReadOpusCoalescere(io, &readV, 1, rh,
"Read() ");
787 TRACEF(Dump,
"ReadV() for " << readVnum <<
" chunks.");
791 if (m_in_shutdown || io->m_in_detach)
794 return m_in_shutdown ? -ENOENT : -EBADF;
811 return ReadOpusCoalescere(io, readV, readVnum, rh,
"ReadV() ");
816 int File::ReadOpusCoalescere(
IO *io,
const XrdOucIOVec *readV,
int readVnum,
828 int prefetch_cnt = 0;
833 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
835 std::vector<XrdOucIOVec> iovec_disk;
836 std::vector<XrdOucIOVec> iovec_direct;
837 int iovec_disk_total = 0;
838 int iovec_direct_total = 0;
840 for (
int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
847 const int idx_first = iUserOff / m_block_size;
848 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
850 TRACEF(DumpXL, tpfx <<
"sid: " <<
Xrd::hex1 << rh->
m_seq_id <<
" idx_first: " << idx_first <<
" idx_last: " << idx_last);
852 enum LastBlock_e { LB_other, LB_disk, LB_direct };
854 LastBlock_e lbe = LB_other;
856 for (
int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
859 BlockMap_i bi = m_block_map.find(block_idx);
866 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
869 if (bi != m_block_map.end())
871 inc_ref_count(bi->second);
872 TRACEF(Dump, tpfx << (
void*) iUserBuff <<
" inc_ref_count for existing block " << bi->second <<
" idx = " << block_idx);
874 if (bi->second->is_finished())
878 assert(bi->second->is_ok());
880 blks_ready[bi->second].emplace_back(
ChunkRequest(
nullptr, iUserBuff + off, blk_off, size) );
882 if (bi->second->m_prefetch)
893 bi->second->m_chunk_reqs.emplace_back(
ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
902 TRACEF(DumpXL, tpfx <<
"read from disk " << (
void*)iUserBuff <<
" idx = " << block_idx);
905 iovec_disk.back().size += size;
907 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
908 iovec_disk_total += size;
922 Block *b = PrepareBlockRequest(block_idx, io, read_req,
false);
925 TRACEF(Dump, tpfx <<
"inc_ref_count new " << (
void*)iUserBuff <<
" idx = " << block_idx);
927 blks_to_request.push_back(b);
936 TRACEF(DumpXL, tpfx <<
"direct block " << block_idx <<
", blk_off " << blk_off <<
", size " << size);
938 iovec_direct_total += size;
945 iovec_direct.back().size += size;
947 long long in_offset = block_idx * m_block_size + blk_off;
948 char *out_pos = iUserBuff + off;
955 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
964 inc_prefetch_hit_cnt(prefetch_cnt);
969 if ( ! blks_to_request.empty())
971 ProcessBlockRequests(blks_to_request);
972 blks_to_request.clear();
976 if ( ! iovec_direct.empty())
978 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
980 TRACEF(Dump, tpfx <<
"direct read requests sent out, n_chunks = " << (
int) iovec_direct.size() <<
", total_size = " << iovec_direct_total);
985 long long bytes_read = 0;
989 if ( ! blks_ready.empty())
991 for (
auto &bvi : blks_ready)
993 for (
auto &cr : bvi.second)
995 TRACEF(DumpXL, tpfx <<
"ub=" << (
void*)cr.m_buf <<
" from pre-finished block " << bvi.first->m_offset/m_block_size <<
" size " << cr.m_size);
996 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
997 bytes_read += cr.m_size;
1003 if ( ! iovec_disk.empty())
1005 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
1006 TRACEF(DumpXL, tpfx <<
"from disk finished size = " << rc);
1021 m_state_cond.
Lock();
1023 for (
auto &bvi : blks_ready)
1024 dec_ref_count(bvi.first, (
int) bvi.second.size());
1038 check_delta_stats();
1048 return -EWOULDBLOCK;
1054 check_delta_stats();
1059 return error_cond ? error_cond : bytes_read;
1071 long long offset = b->
m_offset - m_offset;
1086 TRACEF(
Error,
"WriteToDisk() write error " << retval);
1088 TRACEF(
Error,
"WriteToDisk() incomplete block write ret=" << retval <<
" (should be " << size <<
")");
1098 const int blk_idx = (b->
m_offset - m_offset) / m_block_size;
1101 TRACEF(Dump,
"WriteToDisk() success set bit for block " << b->
m_offset <<
" size=" << size);
1103 bool schedule_sync =
false;
1124 m_writes_during_sync.push_back(blk_idx);
1129 ++m_non_flushed_cnt;
1133 schedule_sync =
true;
1135 m_non_flushed_cnt = 0;
1142 cache()->ScheduleFileSync(
this);
1152 int ret = m_data_file->
Fsync();
1153 bool errorp =
false;
1159 report_and_merge_delta_stats();
1160 loc_stats = m_stats;
1163 m_cfi.
Write(m_info_file, m_filename.c_str());
1164 int cret = m_info_file->
Fsync();
1167 TRACEF(
Error,
"Sync cinfo file sync error " << cret);
1173 TRACEF(
Error,
"Sync data file sync error " << ret <<
", cinfo file has not been updated");
1179 TRACEF(
Error,
"Sync failed, unlinking local files and initiating shutdown of File object");
1186 m_writes_during_sync.clear();
1192 int written_while_in_sync;
1193 bool resync =
false;
1196 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1200 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1201 m_writes_during_sync.clear();
1205 if (written_while_in_sync > 0 && m_cfi.
IsComplete() && ! m_in_shutdown)
1210 TRACEF(Dump,
"Sync "<< written_while_in_sync <<
" blocks written during sync." << (resync ?
" File is now complete - resyncing." :
""));
1221 void File::free_block(
Block* b)
1224 int i = b->
m_offset / m_block_size;
1225 TRACEF(Dump,
"free_block block " << b <<
" idx = " << i);
1226 size_t ret = m_block_map.erase(i);
1230 TRACEF(
Error,
"free_block did not erase " << i <<
" from map");
1240 m_prefetch_state = kOn;
1241 cache()->RegisterPrefetchFile(
this);
1247 bool File::select_current_io_or_disable_prefetching(
bool skip_current)
1251 int io_size = (int) m_io_set.size();
1256 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1259 m_current_io = m_io_set.begin();
1262 else if (io_size > 1)
1264 IoSet_i mi = m_current_io;
1265 if (skip_current && mi != m_io_set.end()) ++mi;
1267 for (
int i = 0; i < io_size; ++i)
1269 if (mi == m_io_set.end()) mi = m_io_set.begin();
1271 if ((*mi)->m_allow_prefetching)
1283 m_current_io = m_io_set.end();
1284 m_prefetch_state = kStopped;
1285 cache()->DeRegisterPrefetchFile(
this);
1293 void File::ProcessDirectReadFinished(
ReadRequest *rreq,
int bytes_read,
int error_cond)
1299 TRACEF(
Error,
"Read(), direct read finished with error " << -error_cond <<
" " <<
XrdSysE2T(-error_cond));
1301 m_state_cond.
Lock();
1317 FinalizeReadRequest(rreq);
1344 TRACEF(Dump,
"ProcessBlockSuccess() ub=" << (
void*)creq.
m_buf <<
" from finished block " << b->
m_offset/m_block_size <<
" size " << creq.
m_size);
1347 m_state_cond.
Lock();
1352 rreq->m_stats.m_BytesMissed += creq.
m_size;
1354 rreq->m_stats.m_BytesHit += creq.
m_size;
1356 --rreq->m_n_chunk_reqs;
1359 inc_prefetch_hit_cnt(1);
1363 bool rreq_complete = rreq->is_complete();
1368 FinalizeReadRequest(rreq);
1378 check_delta_stats();
1385 void File::ProcessBlockResponse(
Block *b,
int res)
1387 static const char* tpfx =
"ProcessBlockResponse ";
1389 TRACEF(Dump, tpfx <<
"block=" << b <<
", idx=" << b->
m_offset/m_block_size <<
", off=" << b->
m_offset <<
", res=" << res);
1391 if (res >= 0 && res != b->
get_size())
1395 TRACEF(
Error, tpfx <<
"Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1399 m_state_cond.
Lock();
1405 IoSet_i mi = m_io_set.find(io);
1406 if (mi != m_io_set.end())
1408 --io->m_active_prefetches;
1411 if (res < 0 && io->m_allow_prefetching)
1413 TRACEF(
Debug, tpfx <<
"after failed prefetch on io " << io <<
" disabling prefetching on this io.");
1414 io->m_allow_prefetching =
false;
1417 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1419 if ( ! select_current_io_or_disable_prefetching(
false) )
1421 TRACEF(
Debug, tpfx <<
"stopping prefetching after io " << b->
get_io() <<
" marked as bad.");
1427 if (b->
m_refcnt == 0 && (res < 0 || m_in_shutdown))
1444 TRACEF(Dump, tpfx <<
"inc_ref_count idx=" << b->
m_offset/m_block_size);
1445 if ( ! m_in_shutdown)
1451 cache()->AddWriteTask(b,
true);
1460 for (
auto &creq : creqs_to_notify)
1462 ProcessBlockSuccess(b, creq);
1471 <<
", io=" << b->
get_io() <<
", error=" << res);
1476 <<
", io=" << b->
get_io() <<
" incomplete, got " << res <<
" expected " << b->
get_size());
1477 #if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1488 std::list<ReadRequest*> rreqs_to_complete;
1497 ProcessBlockError(b, rreq);
1500 rreqs_to_complete.push_back(rreq);
1505 creqs_to_keep.push_back(creq);
1509 bool reissue =
false;
1510 if ( ! creqs_to_keep.empty())
1512 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1514 TRACEF(
Debug,
"ProcessBlockResponse() requested block " << (
void*)b <<
" failed with another io " <<
1515 b->
get_io() <<
" - reissuing request with my io " << rreq->
m_io);
1524 for (
auto rreq : rreqs_to_complete)
1525 FinalizeReadRequest(rreq);
1528 ProcessBlockRequest(b);
1536 return m_filename.c_str();
1541 int File::offsetIdx(
int iIdx)
const
1543 return iIdx - m_offset/m_block_size;
1557 TRACEF(DumpXL,
"Prefetch() entering.");
1561 if (m_prefetch_state != kOn)
1566 if ( ! select_current_io_or_disable_prefetching(
true) )
1568 TRACEF(
Error,
"Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1573 for (
int f = 0; f < m_num_blocks; ++f)
1577 int f_act = f + m_offset / m_block_size;
1579 BlockMap_i bi = m_block_map.find(f_act);
1580 if (bi == m_block_map.end())
1582 Block *b = PrepareBlockRequest(f_act, *m_current_io,
nullptr,
true);
1585 TRACEF(Dump,
"Prefetch take block " << f_act);
1589 inc_prefetch_read_cnt(1);
1594 TRACEF(
Warning,
"Prefetch allocation failed for block " << f_act);
1603 TRACEF(
Debug,
"Prefetch file is complete, stopping prefetch.");
1604 m_prefetch_state = kComplete;
1605 cache()->DeRegisterPrefetchFile(
this);
1609 (*m_current_io)->m_active_prefetches += (int) blks.size();
1613 if ( ! blks.empty())
1615 ProcessBlockRequests(blks);
1624 return m_prefetch_score;
1637 void File::insert_remote_location(
const std::string &loc)
1641 size_t p = loc.find_first_of(
'@');
1642 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1649 if ( ! m_remote_locations.empty())
1653 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1657 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1660 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1662 s +=
'"'; s += *i; s +=
'"';
1663 if (j < nl) s +=
',';
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
int stat(const char *path, struct stat *buf)
const char * XrdSysE2T(int errcode)
virtual int Ftruncate(unsigned long long flen)
virtual int Fstat(struct stat *buf)
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual ssize_t Read(off_t offset, size_t size)
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
void Done(int result) override
int * ptr_n_cksum_errors()
void * get_req_id() const
long long get_offset() const
vChunkRequest_t m_chunk_reqs
vCkSum_t & ref_cksum_vec()
bool req_cksum_net() const
void reset_error_and_set_io(IO *io, void *rid)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
static ResourceMonitor & ResMon()
static Cache & GetInstance()
Singleton access.
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void Sync()
Sync file cache info and output data with disk.
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
long long initiate_emergency_shutdown()
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
bool ioActive(IO *io)
Initiate close. Return true if IO is still active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
bool register_incomplete_read()
XrdOucCacheIO * GetInput()
bool register_block_error(int res)
RAtomic_int m_active_read_reqs
number of active read requests
const char * GetLocation()
Status of cached file. Can be read from and written into a binary file.
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
static const char * s_infoExtension
void SetBitSynced(int i)
Mark block as synced to disk.
time_t GetNoCkSumTimeForUVKeep() const
CkSumCheck_e GetCkSumState() const
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
void DowngradeCkSumState(CkSumCheck_e css_ref)
void ResetAllAccessStats()
Reset IO Stats.
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
bool IsComplete() const
Get complete status.
bool IsCkSumCache() const
void SetBitWritten(int i)
Mark block as written to disk.
long long GetBufferSize() const
Get prefetch buffer size.
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
long long GetExpectedDataFileSize() const
Get expected data file size.
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
void SetCkSumState(CkSumCheck_e css)
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
void AddReadStats(const Stats &s)
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddUp(const Stats &s)
void AddWriteStats(long long bytes_written, int n_cks_errs)
long long BytesReadAndWritten() const
void AddBytesHit(long long bh)
long long m_BytesHit
number of bytes served from disk
long long m_BytesWritten
number of bytes written to disk
void IoDetach(int duration)
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * > BlockList_t
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
static const int maxRvecsz
Contains parameters configurable from the xrootd config file.
long long m_flushCnt
number of unsynced blocks on disk before flush is called
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
CkSumCheck_e get_cs_Chk() const
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
bool should_uvkeep_purge(time_t delta) const
std::string m_data_space
oss space for data files
long long m_bufferSize
prefetch buffer size, default 1MB
std::string m_meta_space
oss space for metadata files (cinfo)
std::string m_username
username passed to oss plugin
void update_error_cond(int ec)