13 for (std::vector<Entry*>::iterator buffer_iter = m_buffers.begin();
14 buffer_iter != m_buffers.end();
27 if (!m_open_for_write) {
30 m_open_for_write =
false;
32 for (std::vector<Entry*>::iterator buffer_iter = m_buffers.begin();
33 buffer_iter != m_buffers.end();
41 const char *msg = m_fh->error.getErrText();
42 if (!msg || (*msg ==
'\0')) {msg =
"(no error message provided)";}
43 ss <<
"Failure when closing file handle: " << msg <<
" (code=" << m_fh->error.getErrInfo() <<
")";
44 m_error_buf = ss.str();
49 return m_avail_count == m_buffers.size();
56 return m_fh->stat(buf);
70 if (!m_open_for_write) {
71 if (!m_error_buf.size()) {m_error_buf =
"Logic error: writing to a buffer not opened for write";}
74 size_t bytes_accepted = 0;
76 if (offset < m_offset) {
77 if (!m_error_buf.size()) {m_error_buf =
"Logic error: writing to a prior offset";}
83 if (offset == m_offset && (force || (size && !(size % (1024*1024))))) {
84 retval = WriteImpl(offset, buf, size);
85 bytes_accepted = retval;
93 if (m_avail_count == m_buffers.size()) {
101 bool buffer_was_written;
102 size_t avail_count = 0;
106 buffer_was_written =
false;
107 for (std::vector<Entry*>::iterator entry_iter = m_buffers.begin();
108 entry_iter != m_buffers.end();
112 int retval2 = (*entry_iter)->Write(*
this, size == 0);
114 if (!m_error_buf.size()) {m_error_buf =
"Unknown filesystem write failure.";}
117 buffer_was_written |= retval2 > 0;
118 if ((*entry_iter)->Available()) {
119 if (!avail_entry) {avail_entry = *entry_iter;}
122 else if (bytes_accepted != size && size) {
123 size_t new_accept = (*entry_iter)->Accept(offset + bytes_accepted, buf + bytes_accepted, size - bytes_accepted);
126 if (new_accept && new_accept != size - bytes_accepted) {
127 int retval3 = (*entry_iter)->Write(*
this,
false);
129 if (!m_error_buf.size()) {m_error_buf =
"Unknown filesystem write failure.";}
132 buffer_was_written =
true;
134 bytes_accepted += new_accept;
137 }
while ((avail_count != m_buffers.size()) && buffer_was_written);
138 m_avail_count = avail_count;
140 if (bytes_accepted != size && size) {
143 m_error_buf =
"No empty buffers available to place unordered data.";
146 if (avail_entry->Accept(offset + bytes_accepted, buf + bytes_accepted, size - bytes_accepted) != size - bytes_accepted) {
147 m_error_buf =
"Empty re-ordering buffer was unable to to accept data; internal logic error.";
154 if ((m_buffers.size() > 2) && (m_avail_count * 2 > m_buffers.size())) {
155 for (std::vector<Entry*>::iterator entry_iter = m_buffers.begin();
156 entry_iter != m_buffers.end();
158 (*entry_iter)->ShrinkIfUnused();
166 ssize_t Stream::WriteImpl(off_t offset,
const char *buf,
size_t size)
169 if (size == 0) {
return 0;}
170 retval = m_fh->write(offset, buf, size);
174 std::stringstream ss;
175 const char *msg = m_fh->error.getErrText();
176 if (!msg || (*msg ==
'\0')) {msg =
"(no error message provided)";}
177 ss << msg <<
" (code=" << m_fh->error.getErrInfo() <<
")";
178 m_error_buf = ss.str();
187 m_log.
Emsg(
"Stream::DumpBuffers",
"Beginning dump of stream buffers.");
189 std::stringstream ss;
190 ss <<
"Stream offset: " << m_offset;
191 m_log.
Emsg(
"Stream::DumpBuffers", ss.str().c_str());
194 for (std::vector<Entry*>::const_iterator entry_iter = m_buffers.begin();
195 entry_iter!= m_buffers.end();
197 std::stringstream ss;
198 ss <<
"Buffer " << idx <<
": Offset=" << (*entry_iter)->GetOffset() <<
", Size="
199 << (*entry_iter)->GetSize() <<
", Capacity=" << (*entry_iter)->GetCapacity();
200 m_log.
Emsg(
"Stream::DumpBuffers", ss.str().c_str());
203 m_log.
Emsg(
"Stream::DumpBuffers",
"Finish dump of stream buffers.");
210 return m_fh->read(offset, buf, size);
int stat(const char *path, struct stat *buf)
int Read(off_t offset, char *buffer, size_t size)
ssize_t Write(off_t offset, const char *buffer, size_t size, bool force)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)