XRootD
TPC::Stream Class Reference

#include <XrdTpcStream.hh>

+ Collaboration diagram for TPC::Stream:

Public Member Functions

 Stream (std::unique_ptr< XrdSfsFile > fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
 
 ~Stream ()
 
size_t AvailableBuffers () const
 
void DumpBuffers () const
 
bool Finalize ()
 
std::string GetErrorMessage () const
 
int Read (off_t offset, char *buffer, size_t size)
 
int Stat (struct stat *)
 
ssize_t Write (off_t offset, const char *buffer, size_t size, bool force)
 

Detailed Description

Definition at line 22 of file XrdTpcStream.hh.

Constructor & Destructor Documentation

◆ Stream()

TPC::Stream::Stream ( std::unique_ptr< XrdSfsFile > fh,
size_t  max_blocks,
size_t  buffer_size,
XrdSysError & log 
)
inline

Definition at line 24 of file XrdTpcStream.hh.

25  : m_open_for_write(false),
26  m_avail_count(max_blocks),
27  m_fh(std::move(fh)),
28  m_offset(0),
29  m_log(log)
30  {
31  m_buffers.reserve(max_blocks);
32  for (size_t idx=0; idx < max_blocks; idx++) {
33  m_buffers.push_back(new Entry(buffer_size));
34  }
35  m_open_for_write = true;
36  }

◆ ~Stream()

Stream::~Stream ( )

Definition at line 11 of file XrdTpcStream.cc.

12 {
13  for (std::vector<Entry*>::iterator buffer_iter = m_buffers.begin();
14  buffer_iter != m_buffers.end();
15  buffer_iter++) {
16  delete *buffer_iter;
17  *buffer_iter = NULL;
18  }
19  m_fh->close();
20 }

Member Function Documentation

◆ AvailableBuffers()

size_t TPC::Stream::AvailableBuffers ( ) const
inline

Definition at line 56 of file XrdTpcStream.hh.

56 {return m_avail_count;}

Referenced by TPC::State::AvailableBuffers().

+ Here is the caller graph for this function:

◆ DumpBuffers()

void Stream::DumpBuffers ( ) const

Definition at line 185 of file XrdTpcStream.cc.

186 {
187  m_log.Emsg("Stream::DumpBuffers", "Beginning dump of stream buffers.");
188  {
189  std::stringstream ss;
190  ss << "Stream offset: " << m_offset;
191  m_log.Emsg("Stream::DumpBuffers", ss.str().c_str());
192  }
193  size_t idx = 0;
194  for (std::vector<Entry*>::const_iterator entry_iter = m_buffers.begin();
195  entry_iter!= m_buffers.end();
196  entry_iter++) {
197  std::stringstream ss;
198  ss << "Buffer " << idx << ": Offset=" << (*entry_iter)->GetOffset() << ", Size="
199  << (*entry_iter)->GetSize() << ", Capacity=" << (*entry_iter)->GetCapacity();
200  m_log.Emsg("Stream::DumpBuffers", ss.str().c_str());
201  idx ++;
202  }
203  m_log.Emsg("Stream::DumpBuffers", "Finish dump of stream buffers.");
204 }
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95

References XrdSysError::Emsg().

Referenced by TPC::State::DumpBuffers(), and Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Finalize()

bool Stream::Finalize ( )

Definition at line 24 of file XrdTpcStream.cc.

25 {
26  // Do not close twice
27  if (!m_open_for_write) {
28  return false;
29  }
30  m_open_for_write = false;
31 
32  for (std::vector<Entry*>::iterator buffer_iter = m_buffers.begin();
33  buffer_iter != m_buffers.end();
34  buffer_iter++) {
35  delete *buffer_iter;
36  *buffer_iter = NULL;
37  }
38 
39  if (m_fh->close() == SFS_ERROR) {
40  std::stringstream ss;
41  const char *msg = m_fh->error.getErrText();
42  if (!msg || (*msg == '\0')) {msg = "(no error message provided)";}
43  ss << "Failure when closing file handle: " << msg << " (code=" << m_fh->error.getErrInfo() << ")";
44  m_error_buf = ss.str();
45  return false;
46  }
47 
48  // If there are outstanding buffers to reorder, finalization failed
49  return m_avail_count == m_buffers.size();
50 }
#define SFS_ERROR

References SFS_ERROR.

Referenced by TPC::State::Finalize().

+ Here is the caller graph for this function:

◆ GetErrorMessage()

std::string TPC::Stream::GetErrorMessage ( ) const
inline

Definition at line 69 of file XrdTpcStream.hh.

69 {return m_error_buf;}

Referenced by TPC::State::Finalize(), and TPC::State::Flush().

+ Here is the caller graph for this function:

◆ Read()

int Stream::Read ( off_t  offset,
char *  buffer,
size_t  size 
)

Definition at line 208 of file XrdTpcStream.cc.

209 {
210  return m_fh->read(offset, buf, size);
211 }

◆ Stat()

int Stream::Stat ( struct stat * buf)

Definition at line 54 of file XrdTpcStream.cc.

55 {
56  return m_fh->stat(buf);
57 }

◆ Write()

ssize_t Stream::Write ( off_t  offset,
const char *  buffer,
size_t  size,
bool  force 
)

Definition at line 60 of file XrdTpcStream.cc.

61 {
62 /*
63  * NOTE: these lines are useful for debugging the state of the buffer
64  * management code; too expensive to compile in and have a runtime switch.
65  std::stringstream ss;
66  ss << "Offset=" << offset << ", Size=" << size << ", force=" << force;
67  m_log.Emsg("Stream::Write", ss.str().c_str());
68  DumpBuffers();
69 */
70  if (!m_open_for_write) {
71  if (!m_error_buf.size()) {m_error_buf = "Logic error: writing to a buffer not opened for write";}
72  return SFS_ERROR;
73  }
74  size_t bytes_accepted = 0;
75  int retval = size;
76  if (offset < m_offset) {
77  if (!m_error_buf.size()) {m_error_buf = "Logic error: writing to a prior offset";}
78  return SFS_ERROR;
79  }
80  // If this write is appending to the stream and
81  // MB-aligned, then we write it to disk; otherwise, the
82  // data will be buffered.
83  if (offset == m_offset && (force || (size && !(size % (1024*1024))))) {
84  retval = WriteImpl(offset, buf, size);
85  bytes_accepted = retval;
86  // On failure, we don't care about flushing buffers from memory --
87  // the stream is now invalid.
88  if (retval < 0) {
89  return retval;
90  }
91  // If there are no in-use buffers, then we don't need to
92  // do any accounting.
93  if (m_avail_count == m_buffers.size()) {
94  return retval;
95  }
96  }
97  // Even if we already accepted the current data, always
98  // iterate through available buffers and try to write as
99  // much out to disk as possible.
100  Entry *avail_entry;
101  bool buffer_was_written;
102  size_t avail_count = 0;
103  do {
104  avail_count = 0;
105  avail_entry = NULL;
106  buffer_was_written = false;
107  for (std::vector<Entry*>::iterator entry_iter = m_buffers.begin();
108  entry_iter != m_buffers.end();
109  entry_iter++) {
110  // Always try to dump from memory; when size == 0, then we are
111  // going to force a flush even if things are not MB-aligned.
112  int retval2 = (*entry_iter)->Write(*this, size == 0);
113  if (retval2 == SFS_ERROR) {
114  if (!m_error_buf.size()) {m_error_buf = "Unknown filesystem write failure.";}
115  return retval2;
116  }
117  buffer_was_written |= retval2 > 0;
118  if ((*entry_iter)->Available()) { // Empty buffer
119  if (!avail_entry) {avail_entry = *entry_iter;}
120  avail_count ++;
121  }
122  else if (bytes_accepted != size && size) {
123  size_t new_accept = (*entry_iter)->Accept(offset + bytes_accepted, buf + bytes_accepted, size - bytes_accepted);
124  // Partial accept; buffer should be writable which means we should free it up
125  // for next iteration
126  if (new_accept && new_accept != size - bytes_accepted) {
127  int retval3 = (*entry_iter)->Write(*this, false);
128  if (retval3 == SFS_ERROR) {
129  if (!m_error_buf.size()) {m_error_buf = "Unknown filesystem write failure.";}
130  return SFS_ERROR;
131  }
132  buffer_was_written = true;
133  }
134  bytes_accepted += new_accept;
135  }
136  }
137  } while ((avail_count != m_buffers.size()) && buffer_was_written);
138  m_avail_count = avail_count;
139 
140  if (bytes_accepted != size && size) { // No place for this data in allocated buffers
141  if (!avail_entry) { // No available buffers to allocate; logic error, should not happen.
142  DumpBuffers();
143  m_error_buf = "No empty buffers available to place unordered data.";
144  return SFS_ERROR;
145  }
146  if (avail_entry->Accept(offset + bytes_accepted, buf + bytes_accepted, size - bytes_accepted) != size - bytes_accepted) { // Empty buffer cannot accept?!?
147  m_error_buf = "Empty re-ordering buffer was unable to to accept data; internal logic error.";
148  return SFS_ERROR;
149  }
150  m_avail_count --;
151  }
152 
153  // If we have low buffer occupancy, then release memory.
154  if ((m_buffers.size() > 2) && (m_avail_count * 2 > m_buffers.size())) {
155  for (std::vector<Entry*>::iterator entry_iter = m_buffers.begin();
156  entry_iter != m_buffers.end();
157  entry_iter++) {
158  (*entry_iter)->ShrinkIfUnused();
159  }
160  }
161 
162  return retval;
163 }
void DumpBuffers() const

References DumpBuffers(), and SFS_ERROR.

Referenced by TPC::State::Flush().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: