XRootD
XrdTpcStream.cc
Go to the documentation of this file.
1 
2 #include <sstream>
3 
4 #include "XrdTpcStream.hh"
5 
7 #include "XrdSys/XrdSysError.hh"
8 
9 using namespace TPC;
10 
12 {
13  for (std::vector<Entry*>::iterator buffer_iter = m_buffers.begin();
14  buffer_iter != m_buffers.end();
15  buffer_iter++) {
16  delete *buffer_iter;
17  *buffer_iter = NULL;
18  }
19  m_fh->close();
20 }
21 
22 
23 bool
25 {
26  // Do not close twice
27  if (!m_open_for_write) {
28  return false;
29  }
30  m_open_for_write = false;
31 
32  for (std::vector<Entry*>::iterator buffer_iter = m_buffers.begin();
33  buffer_iter != m_buffers.end();
34  buffer_iter++) {
35  delete *buffer_iter;
36  *buffer_iter = NULL;
37  }
38 
39  if (m_fh->close() == SFS_ERROR) {
40  std::stringstream ss;
41  const char *msg = m_fh->error.getErrText();
42  if (!msg || (*msg == '\0')) {msg = "(no error message provided)";}
43  ss << "Failure when closing file handle: " << msg << " (code=" << m_fh->error.getErrInfo() << ")";
44  m_error_buf = ss.str();
45  return false;
46  }
47 
48  // If there are outstanding buffers to reorder, finalization failed
49  return m_avail_count == m_buffers.size();
50 }
51 
52 
53 int
54 Stream::Stat(struct stat* buf)
55 {
56  return m_fh->stat(buf);
57 }
58 
59 ssize_t
60 Stream::Write(off_t offset, const char *buf, size_t size, bool force)
61 {
62 /*
63  * NOTE: these lines are useful for debuggin the state of the buffer
64  * management code; too expensive to compile in and have a runtime switch.
65  std::stringstream ss;
66  ss << "Offset=" << offset << ", Size=" << size << ", force=" << force;
67  m_log.Emsg("Stream::Write", ss.str().c_str());
68  DumpBuffers();
69 */
70  if (!m_open_for_write) {
71  if (!m_error_buf.size()) {m_error_buf = "Logic error: writing to a buffer not opened for write";}
72  return SFS_ERROR;
73  }
74  size_t bytes_accepted = 0;
75  int retval = size;
76  if (offset < m_offset) {
77  if (!m_error_buf.size()) {m_error_buf = "Logic error: writing to a prior offset";}
78  return SFS_ERROR;
79  }
80  // If this is write is appending to the stream and
81  // MB-aligned, then we write it to disk; otherwise, the
82  // data will be buffered.
83  if (offset == m_offset && (force || (size && !(size % (1024*1024))))) {
84  retval = WriteImpl(offset, buf, size);
85  bytes_accepted = retval;
86  // On failure, we don't care about flushing buffers from memory --
87  // the stream is now invalid.
88  if (retval < 0) {
89  return retval;
90  }
91  // If there are no in-use buffers, then we don't need to
92  // do any accounting.
93  if (m_avail_count == m_buffers.size()) {
94  return retval;
95  }
96  }
97  // Even if we already accepted the current data, always
98  // iterate through available buffers and try to write as
99  // much out to disk as possible.
100  Entry *avail_entry;
101  bool buffer_was_written;
102  size_t avail_count = 0;
103  do {
104  avail_count = 0;
105  avail_entry = NULL;
106  buffer_was_written = false;
107  for (std::vector<Entry*>::iterator entry_iter = m_buffers.begin();
108  entry_iter != m_buffers.end();
109  entry_iter++) {
110  // Always try to dump from memory; when size == 0, then we are
111  // going to force a flush even if things are not MB-aligned.
112  int retval2 = (*entry_iter)->Write(*this, size == 0);
113  if (retval2 == SFS_ERROR) {
114  if (!m_error_buf.size()) {m_error_buf = "Unknown filesystem write failure.";}
115  return retval2;
116  }
117  buffer_was_written |= retval2 > 0;
118  if ((*entry_iter)->Available()) { // Empty buffer
119  if (!avail_entry) {avail_entry = *entry_iter;}
120  avail_count ++;
121  }
122  else if (bytes_accepted != size && size) {
123  size_t new_accept = (*entry_iter)->Accept(offset + bytes_accepted, buf + bytes_accepted, size - bytes_accepted);
124  // Partial accept; buffer should be writable which means we should free it up
125  // for next iteration
126  if (new_accept && new_accept != size - bytes_accepted) {
127  int retval3 = (*entry_iter)->Write(*this, false);
128  if (retval3 == SFS_ERROR) {
129  if (!m_error_buf.size()) {m_error_buf = "Unknown filesystem write failure.";}
130  return SFS_ERROR;
131  }
132  buffer_was_written = true;
133  }
134  bytes_accepted += new_accept;
135  }
136  }
137  } while ((avail_count != m_buffers.size()) && buffer_was_written);
138  m_avail_count = avail_count;
139 
140  if (bytes_accepted != size && size) { // No place for this data in allocated buffers
141  if (!avail_entry) { // No available buffers to allocate; logic error, should not happen.
142  DumpBuffers();
143  m_error_buf = "No empty buffers available to place unordered data.";
144  return SFS_ERROR;
145  }
146  if (avail_entry->Accept(offset + bytes_accepted, buf + bytes_accepted, size - bytes_accepted) != size - bytes_accepted) { // Empty buffer cannot accept?!?
147  m_error_buf = "Empty re-ordering buffer was unable to to accept data; internal logic error.";
148  return SFS_ERROR;
149  }
150  m_avail_count --;
151  }
152 
153  // If we have low buffer occupancy, then release memory.
154  if ((m_buffers.size() > 2) && (m_avail_count * 2 > m_buffers.size())) {
155  for (std::vector<Entry*>::iterator entry_iter = m_buffers.begin();
156  entry_iter != m_buffers.end();
157  entry_iter++) {
158  (*entry_iter)->ShrinkIfUnused();
159  }
160  }
161 
162  return retval;
163 }
164 
165 
166 ssize_t Stream::WriteImpl(off_t offset, const char *buf, size_t size)
167 {
168  ssize_t retval;
169  if (size == 0) {return 0;}
170  retval = m_fh->write(offset, buf, size);
171  if (retval != SFS_ERROR) {
172  m_offset += retval;
173  } else {
174  std::stringstream ss;
175  const char *msg = m_fh->error.getErrText();
176  if (!msg || (*msg == '\0')) {msg = "(no error message provided)";}
177  ss << msg << " (code=" << m_fh->error.getErrInfo() << ")";
178  m_error_buf = ss.str();
179  }
180  return retval;
181 }
182 
183 
184 void
186 {
187  m_log.Emsg("Stream::DumpBuffers", "Beginning dump of stream buffers.");
188  {
189  std::stringstream ss;
190  ss << "Stream offset: " << m_offset;
191  m_log.Emsg("Stream::DumpBuffers", ss.str().c_str());
192  }
193  size_t idx = 0;
194  for (std::vector<Entry*>::const_iterator entry_iter = m_buffers.begin();
195  entry_iter!= m_buffers.end();
196  entry_iter++) {
197  std::stringstream ss;
198  ss << "Buffer " << idx << ": Offset=" << (*entry_iter)->GetOffset() << ", Size="
199  << (*entry_iter)->GetSize() << ", Capacity=" << (*entry_iter)->GetCapacity();
200  m_log.Emsg("Stream::DumpBuffers", ss.str().c_str());
201  idx ++;
202  }
203  m_log.Emsg("Stream::DumpBuffers", "Finish dump of stream buffers.");
204 }
205 
206 
207 int
208 Stream::Read(off_t offset, char *buf, size_t size)
209 {
210  return m_fh->read(offset, buf, size);
211 }
int stat(const char *path, struct stat *buf)
#define SFS_ERROR
int Read(off_t offset, char *buffer, size_t size)
ssize_t Write(off_t offset, const char *buffer, size_t size, bool force)
Definition: XrdTpcStream.cc:60
bool Finalize()
Definition: XrdTpcStream.cc:24
void DumpBuffers() const
int Stat(struct stat *)
Definition: XrdTpcStream.cc:54
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95