XRootD
XrdTpcMultistream.cc
Go to the documentation of this file.
1 
5 #include "XrdTpcTPC.hh"
6 #include "XrdTpcState.hh"
7 #include "XrdTpcCurlMulti.hh"
8 
9 #include "XrdSys/XrdSysError.hh"
10 
11 #include <curl/curl.h>
12 
13 #include <algorithm>
14 #include <sstream>
15 #include <stdexcept>
16 
17 
18 using namespace TPC;
19 
/// Error raised when the libcurl machinery for a multistream transfer cannot be
/// set up (e.g. curl_multi_init() failing). It derives from std::runtime_error,
/// so generic runtime_error handlers still catch it, but it can be caught first
/// to distinguish setup failures from errors raised mid-transfer.
///
/// The destructor is intentionally not declared (Rule of Zero): the implicit
/// one is virtual and noexcept via std::runtime_error, which replaces the old
/// `throw ()` dynamic exception specification (removed in C++20).
class CurlHandlerSetupError : public std::runtime_error {
public:
    /// @param msg  Human-readable description returned by what().
    explicit CurlHandlerSetupError(const std::string &msg) :
        std::runtime_error(msg)
    {}
};
28 
29 namespace {
30 class MultiCurlHandler {
31 public:
32  MultiCurlHandler(std::vector<State*> &states, XrdSysError &log) :
33  m_handle(curl_multi_init()),
34  m_states(states),
35  m_log(log),
36  m_bytes_transferred(0),
37  m_error_code(0),
38  m_status_code(0)
39  {
40  if (m_handle == NULL) {
41  throw CurlHandlerSetupError("Failed to initialize a libcurl multi-handle");
42  }
43  m_avail_handles.reserve(states.size());
44  m_active_handles.reserve(states.size());
45  for (std::vector<State*>::const_iterator state_iter = states.begin();
46  state_iter != states.end();
47  state_iter++) {
48  m_avail_handles.push_back((*state_iter)->GetHandle());
49  }
50  }
51 
52  ~MultiCurlHandler()
53  {
54  if (!m_handle) {return;}
55  for (std::vector<CURL *>::const_iterator it = m_active_handles.begin();
56  it != m_active_handles.end();
57  it++) {
58  curl_multi_remove_handle(m_handle, *it);
59  }
60  curl_multi_cleanup(m_handle);
61  }
62 
63  MultiCurlHandler(const MultiCurlHandler &) = delete;
64 
65  CURLM *Get() const {return m_handle;}
66 
67  void FinishCurlXfer(CURL *curl) {
68  CURLMcode mres = curl_multi_remove_handle(m_handle, curl);
69  if (mres) {
70  std::stringstream ss;
71  ss << "Failed to remove transfer from set: "
72  << curl_multi_strerror(mres);
73  throw std::runtime_error(ss.str());
74  }
75  for (std::vector<State*>::iterator state_iter = m_states.begin();
76  state_iter != m_states.end();
77  state_iter++) {
78  if (curl == (*state_iter)->GetHandle()) {
79  m_bytes_transferred += (*state_iter)->BytesTransferred();
80  int error_code = (*state_iter)->GetErrorCode();
81  if (error_code && !m_error_code) {
82  m_error_code = error_code;
83  m_error_message = (*state_iter)->GetErrorMessage();
84  }
85  int status_code = (*state_iter)->GetStatusCode();
86  if (status_code >= 400 && !m_status_code) {
87  m_status_code = status_code;
88  m_error_message = (*state_iter)->GetErrorMessage();
89  }
90  (*state_iter)->ResetAfterRequest();
91  break;
92  }
93  }
94  for (std::vector<CURL *>::iterator iter = m_active_handles.begin();
95  iter != m_active_handles.end();
96  ++iter)
97  {
98  if (*iter == curl) {
99  m_active_handles.erase(iter);
100  break;
101  }
102  }
103  m_avail_handles.push_back(curl);
104  }
105 
106  off_t StartTransfers(off_t current_offset, off_t content_length, size_t block_size,
107  int &running_handles) {
108  bool started_new_xfer = false;
109  do {
110  size_t xfer_size = std::min(content_length - current_offset, static_cast<off_t>(block_size));
111  if (xfer_size == 0) {return current_offset;}
112  if (!(started_new_xfer = StartTransfer(current_offset, xfer_size))) {
113  // In this case, we need to start new transfers but weren't able to.
114  if (running_handles == 0) {
115  if (!CanStartTransfer(true)) {
116  m_log.Emsg("StartTransfers", "Unable to start transfers.");
117  }
118  }
119  break;
120  } else {
121  running_handles += 1;
122  }
123  current_offset += xfer_size;
124  } while (true);
125  return current_offset;
126  }
127 
128  int Flush() {
129  int last_error = 0;
130  for (std::vector<State*>::iterator state_it = m_states.begin();
131  state_it != m_states.end();
132  state_it++)
133  {
134  int error = (*state_it)->Flush();
135  if (error) {last_error = error;}
136  }
137  return last_error;
138  }
139 
140  off_t BytesTransferred() const {
141  return m_bytes_transferred;
142  }
143 
144  int GetStatusCode() const {
145  return m_status_code;
146  }
147 
148  int GetErrorCode() const {
149  return m_error_code;
150  }
151 
152  void SetErrorCode(int error_code) {
153  m_error_code = error_code;
154  }
155 
156  std::string GetErrorMessage() const {
157  return m_error_message;
158  }
159 
160  void SetErrorMessage(const std::string &error_msg) {
161  m_error_message = error_msg;
162  }
163 
164 private:
165 
166  bool StartTransfer(off_t offset, size_t size) {
167  if (!CanStartTransfer(false)) {return false;}
168  for (std::vector<CURL*>::const_iterator handle_it = m_avail_handles.begin();
169  handle_it != m_avail_handles.end();
170  handle_it++) {
171  for (std::vector<State*>::iterator state_it = m_states.begin();
172  state_it != m_states.end();
173  state_it++) {
174  if ((*state_it)->GetHandle() == *handle_it) { // This state object represents an idle handle.
175  (*state_it)->SetTransferParameters(offset, size);
176  ActivateHandle(**state_it);
177  return true;
178  }
179  }
180  }
181  return false;
182  }
183 
184  void ActivateHandle(State &state) {
185  CURL *curl = state.GetHandle();
186  m_active_handles.push_back(curl);
187  CURLMcode mres;
188  mres = curl_multi_add_handle(m_handle, curl);
189  if (mres) {
190  std::stringstream ss;
191  ss << "Failed to add transfer to libcurl multi-handle"
192  << curl_multi_strerror(mres);
193  throw std::runtime_error(ss.str());
194  }
195  for (auto iter = m_avail_handles.begin();
196  iter != m_avail_handles.end();
197  ++iter)
198  {
199  if (*iter == curl) {
200  m_avail_handles.erase(iter);
201  break;
202  }
203  }
204  }
205 
206  bool CanStartTransfer(bool log_reason) const {
207  size_t idle_handles = m_avail_handles.size();
208  size_t transfer_in_progress = 0;
209  for (std::vector<State*>::const_iterator state_iter = m_states.begin();
210  state_iter != m_states.end();
211  state_iter++) {
212  for (std::vector<CURL*>::const_iterator handle_iter = m_active_handles.begin();
213  handle_iter != m_active_handles.end();
214  handle_iter++) {
215  if (*handle_iter == (*state_iter)->GetHandle()) {
216  transfer_in_progress += (*state_iter)->BodyTransferInProgress();
217  break;
218  }
219  }
220  }
221  if (!idle_handles) {
222  if (log_reason) {
223  m_log.Emsg("CanStartTransfer", "Unable to start transfers as no idle CURL handles are available.");
224  }
225  return false;
226  }
227  ssize_t available_buffers = m_states[0]->AvailableBuffers();
228  // To be conservative, set aside buffers for any transfers that have been activated
229  // but don't have their first responses back yet.
230  available_buffers -= (m_active_handles.size() - transfer_in_progress);
231  if (log_reason && (available_buffers == 0)) {
232  std::stringstream ss;
233  ss << "Unable to start transfers as no buffers are available. Available buffers: " <<
234  m_states[0]->AvailableBuffers() << ", Active curl handles: " << m_active_handles.size()
235  << ", Transfers in progress: " << transfer_in_progress;
236  m_log.Emsg("CanStartTransfer", ss.str().c_str());
237  if (m_states[0]->AvailableBuffers() == 0) {
238  m_states[0]->DumpBuffers();
239  }
240  }
241  return available_buffers > 0;
242  }
243 
244  CURLM *m_handle;
245  std::vector<CURL *> m_avail_handles;
246  std::vector<CURL *> m_active_handles;
247  std::vector<State*> &m_states;
248  XrdSysError &m_log;
249  off_t m_bytes_transferred;
250  int m_error_code;
251  int m_status_code;
252  std::string m_error_message;
253 };
254 }
255 
256 
257 int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
258  size_t streams, std::vector<State*> &handles,
259  std::vector<ManagedCurlHandle> &curl_handles, TPCLogRecord &rec)
260 {
261  bool success;
262  // The content-length was set thanks to the call to GetContentLengthTPCPull() before calling this function
263  off_t content_size = state.GetContentLength();
264  off_t current_offset = 0;
265 
266  size_t concurrency = streams * m_pipelining_multiplier;
267 
268  handles.reserve(concurrency);
269  handles.push_back(new State());
270  handles[0]->Move(state);
271  for (size_t idx = 1; idx < concurrency; idx++) {
272  handles.push_back(handles[0]->Duplicate());
273  curl_handles.emplace_back(handles.back()->GetHandle());
274  }
275 
276  // Notify the packet marking manager that the transfer will start after this point
277  rec.pmarkManager.startTransfer();
278 
279  // Create the multi-handle and add in the current transfer to it.
280  MultiCurlHandler mch(handles, m_log);
281  CURLM *multi_handle = mch.Get();
282 
283 #ifdef USE_PIPELINING
284  curl_multi_setopt(multi_handle, CURLMOPT_PIPELINING, 1);
285  curl_multi_setopt(multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, streams);
286 #endif
287 
288  // Start response to client prior to the first call to curl_multi_perform
289  int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
290  if (retval) {
291  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
292  "Failed to send the initial response to the TPC client");
293  return retval;
294  } else {
295  logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
296  "Initial transfer response sent to the TPC client");
297  }
298 
299  // Start assigning transfers
300  int running_handles = 0;
301  current_offset = mch.StartTransfers(current_offset, content_size, m_block_size, running_handles);
302 
303  // Transfer loop: use curl to actually run the transfer, but periodically
304  // interrupt things to send back performance updates to the client.
305  time_t last_marker = 0;
306  // Track the time since the transfer last made progress
307  off_t last_advance_bytes = 0;
308  time_t last_advance_time = time(NULL);
309  time_t transfer_start = last_advance_time;
310  CURLcode res = static_cast<CURLcode>(-1);
311  CURLMcode mres = CURLM_OK;
312  do {
313  time_t now = time(NULL);
314  time_t next_marker = last_marker + m_marker_period;
315  if (now >= next_marker) {
316  if (current_offset > last_advance_bytes) {
317  last_advance_bytes = current_offset;
318  last_advance_time = now;
319  }
320  if (SendPerfMarker(req, rec, handles, current_offset)) {
321  logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
322  "Failed to send a perf marker to the TPC client");
323  return -1;
324  }
325  int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
326  if (now > last_advance_time + timeout) {
327  const char *log_prefix = rec.log_prefix.c_str();
328  bool tpc_pull = strncmp("Pull", log_prefix, 4) == 0;
329 
330  mch.SetErrorCode(10);
331  std::stringstream ss;
332  ss << "Transfer failed because no bytes have been "
333  << (tpc_pull ? "received from the source (pull mode) in "
334  : "transmitted to the destination (push mode) in ") << timeout << " seconds.";
335  mch.SetErrorMessage(ss.str());
336  break;
337  }
338  last_marker = now;
339  }
340 
341  mres = curl_multi_perform(multi_handle, &running_handles);
342  if (mres == CURLM_CALL_MULTI_PERFORM) {
343  // curl_multi_perform should be called again immediately. On newer
344  // versions of curl, this is no longer used.
345  continue;
346  } else if (mres != CURLM_OK) {
347  break;
348  }
349 
350  rec.pmarkManager.beginPMarks();
351 
352 
353  // Harvest any messages, looking for CURLMSG_DONE.
354  CURLMsg *msg;
355  do {
356  int msgq = 0;
357  msg = curl_multi_info_read(multi_handle, &msgq);
358  if (msg && (msg->msg == CURLMSG_DONE)) {
359  CURL *easy_handle = msg->easy_handle;
360  res = msg->data.result;
361  mch.FinishCurlXfer(easy_handle);
362  // If any requests fail, cut off the entire transfer.
363  if (res != CURLE_OK) {
364  break;
365  }
366  }
367  } while (msg);
368  if (res != static_cast<CURLcode>(-1) && res != CURLE_OK) {
369  std::stringstream ss;
370  ss << "Breaking loop due to failed curl transfer: " << curl_easy_strerror(res);
371  logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_CURL_FAILURE",
372  ss.str());
373  break;
374  }
375 
376  if (running_handles < static_cast<int>(concurrency)) {
377  // Issue new transfers if there is still pending work to do.
378  // Otherwise, continue running until there are no handles left.
379  if (current_offset != content_size) {
380  current_offset = mch.StartTransfers(current_offset, content_size,
381  m_block_size, running_handles);
382  if (!running_handles) {
383  std::stringstream ss;
384  ss << "No handles are able to run. Streams=" << streams << ", concurrency="
385  << concurrency;
386 
387  logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE", ss.str());
388  }
389  } else if (running_handles == 0) {
390  logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE",
391  "Unable to start new transfers; breaking loop.");
392  break;
393  }
394  }
395 
396  int64_t max_sleep_time = next_marker - time(NULL);
397  if (max_sleep_time <= 0) {
398  continue;
399  }
400  int fd_count;
401 #ifdef HAVE_CURL_MULTI_WAIT
402  mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000,
403  &fd_count);
404 #else
405  mres = curl_multi_wait_impl(multi_handle, max_sleep_time*1000,
406  &fd_count);
407 #endif
408  if (mres != CURLM_OK) {
409  break;
410  }
411  } while (running_handles);
412 
413  if (mres != CURLM_OK) {
414  std::stringstream ss;
415  ss << "Internal libcurl multi-handle error: "
416  << curl_multi_strerror(mres);
417  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", ss.str());
418  throw std::runtime_error(ss.str());
419  }
420 
421  // Harvest any messages, looking for CURLMSG_DONE.
422  CURLMsg *msg;
423  do {
424  int msgq = 0;
425  msg = curl_multi_info_read(multi_handle, &msgq);
426  if (msg && (msg->msg == CURLMSG_DONE)) {
427  CURL *easy_handle = msg->easy_handle;
428  mch.FinishCurlXfer(easy_handle);
429  if (res == CURLE_OK || res == static_cast<CURLcode>(-1))
430  res = msg->data.result; // Transfer result will be examined below.
431  }
432  } while (msg);
433 
434  if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
435  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
436  "Internal state error in libcurl");
437  throw std::runtime_error("Internal state error in libcurl");
438  }
439 
440  mch.Flush();
441 
442  rec.bytes_transferred = mch.BytesTransferred();
443  rec.tpc_status = mch.GetStatusCode();
444 
445  // Generate the final response back to the client.
446  std::stringstream ss;
447  success = false;
448  if (mch.GetStatusCode() >= 400) {
449  std::string err = mch.GetErrorMessage();
450  std::stringstream ss2;
451  ss2 << "Remote side failed with status code " << mch.GetStatusCode();
452  if (!err.empty()) {
453  std::replace(err.begin(), err.end(), '\n', ' ');
454  ss2 << "; error message: \"" << err << "\"";
455  }
456  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss.str());
457  ss << generateClientErr(ss2, rec);
458  } else if (mch.GetErrorCode()) {
459  std::string err = mch.GetErrorMessage();
460  if (err.empty()) {err = "(no error message provided)";}
461  else {std::replace(err.begin(), err.end(), '\n', ' ');}
462  std::stringstream ss2;
463  ss2 << "Error when interacting with local filesystem: " << err;
464  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
465  ss << generateClientErr(ss2, rec);
466  } else if (res != CURLE_OK) {
467  std::stringstream ss2;
468  ss2 << "Request failed when processing";
469  std::stringstream ss3;
470  ss3 << ss2.str() << ":" << curl_easy_strerror(res);
471  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss3.str());
472  ss << generateClientErr(ss2, rec, res);
473  } else if (current_offset != content_size) {
474  std::stringstream ss2;
475  ss2 << "Internal logic error led to early abort; current offset is " <<
476  current_offset << " while full size is " << content_size;
477  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
478  ss << generateClientErr(ss2, rec);
479  } else {
480  if (!handles[0]->Finalize()) {
481  std::stringstream ss2;
482  ss2 << "Failed to finalize and close file handle.";
483  ss << generateClientErr(ss2, rec);
484  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
485  ss2.str());
486  } else {
487  ss << "success: Created";
488  success = true;
489  }
490  }
491 
492  if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
493  logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
494  "Failed to send last update to remote client");
495  return retval;
496  } else if (success) {
497  logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
498  rec.status = 0;
499  }
500  return req.ChunkResp(NULL, 0);
501 }
502 
503 
504 int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state,
505  size_t streams, TPCLogRecord &rec)
506 {
507  std::vector<ManagedCurlHandle> curl_handles;
508  std::vector<State*> handles;
509  std::stringstream err_ss;
510  try {
511  int retval = RunCurlWithStreamsImpl(req, state, streams, handles, curl_handles, rec);
512  for (std::vector<State*>::iterator state_iter = handles.begin();
513  state_iter != handles.end();
514  state_iter++) {
515  delete *state_iter;
516  }
517  return retval;
518  } catch (CurlHandlerSetupError &e) {
519  for (std::vector<State*>::iterator state_iter = handles.begin();
520  state_iter != handles.end();
521  state_iter++) {
522  delete *state_iter;
523  }
524 
525  rec.status = 500;
526  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
527  std::stringstream ss;
528  ss << e.what();
529  err_ss << generateClientErr(ss, rec);
530  return req.SendSimpleResp(rec.status, NULL, NULL, e.what(), 0);
531  } catch (std::runtime_error &e) {
532  for (std::vector<State*>::iterator state_iter = handles.begin();
533  state_iter != handles.end();
534  state_iter++) {
535  delete *state_iter;
536  }
537 
538  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
539  std::stringstream ss;
540  ss << e.what();
541  err_ss << generateClientErr(ss, rec);
542  int retval;
543  if ((retval = req.ChunkResp(err_ss.str().c_str(), 0))) {
544  return retval;
545  }
546  return req.ChunkResp(NULL, 0);
547  }
548 }
#define Duplicate(x, y)
@ Info
CURLMcode curl_multi_wait_impl(CURLM *multi_handle, int timeout_ms, int *numfds)
void CURL
Definition: XrdTpcState.hh:14
CurlHandlerSetupError(const std::string &msg)
CURL * GetHandle() const
Definition: XrdTpcState.hh:106
int GetErrorCode() const
Definition: XrdTpcState.hh:94
off_t GetContentLength() const
Definition: XrdTpcState.hh:92
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with a NULL body indicates that this is the last chunk of the response.
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; the body of the response is then sent over multiple parts using ChunkResp.
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.