XRootD
XrdTpcState.cc
Go to the documentation of this file.
1 
2 #include <algorithm>
3 #include <sstream>
4 #include <stdexcept>
5 
6 #include "XrdVersion.hh"
9 
10 #include <curl/curl.h>
11 
12 #include "XrdTpcState.hh"
13 #include "XrdTpcStream.hh"
14 
15 using namespace TPC;
16 
17 
19  if (m_headers) {
20  curl_slist_free_all(m_headers);
21  m_headers = NULL;
22  if (m_curl) {curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, m_headers);}
23  }
24 }
25 
26 
27 void State::Move(State &other)
28 {
29  m_push = other.m_push;
30  m_recv_status_line = other.m_recv_status_line;
31  m_recv_all_headers = other.m_recv_all_headers;
32  m_offset = other.m_offset;
33  m_start_offset = other.m_start_offset;
34  m_status_code = other.m_status_code;
35  m_content_length = other.m_content_length;
36  m_stream = other.m_stream;
37  m_curl = other.m_curl;
38  m_headers = other.m_headers;
39  m_headers_copy = other.m_headers_copy;
40  m_resp_protocol = other.m_resp_protocol;
41  m_is_transfer_state = other.m_is_transfer_state;
42  curl_easy_setopt(m_curl, CURLOPT_HEADERDATA, this);
43  if (m_is_transfer_state) {
44  if (m_push) {
45  curl_easy_setopt(m_curl, CURLOPT_READDATA, this);
46  } else {
47  curl_easy_setopt(m_curl, CURLOPT_WRITEDATA, this);
48  }
49  }
50  tpcForwardCreds = other.tpcForwardCreds;
51  other.m_headers_copy.clear();
52  other.m_curl = NULL;
53  other.m_headers = NULL;
54  other.m_stream = NULL;
55 }
56 
57 
58 bool State::InstallHandlers(CURL *curl) {
59  curl_easy_setopt(curl, CURLOPT_USERAGENT, "xrootd-tpc/" XrdVERSION);
60  curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, &State::HeaderCB);
61  curl_easy_setopt(curl, CURLOPT_HEADERDATA, this);
62  if(m_is_transfer_state) {
63  if (m_push) {
64  curl_easy_setopt(curl, CURLOPT_UPLOAD, 1);
65  curl_easy_setopt(curl, CURLOPT_READFUNCTION, &State::ReadCB);
66  curl_easy_setopt(curl, CURLOPT_READDATA, this);
67  struct stat buf;
68  if (SFS_OK == m_stream->Stat(&buf)) {
69  curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, buf.st_size);
70  }
71  } else {
72  curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &State::WriteCB);
73  curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
74  }
75  }
76  curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
77  if(tpcForwardCreds) {
78  curl_easy_setopt(curl,CURLOPT_UNRESTRICTED_AUTH,1L);
79  }
80 
81  // Only use low-speed limits with libcurl v7.38 or later.
82  // Older versions have poor transfer performance, corrected in curl commit cacdc27f.
83  curl_version_info_data *curl_ver = curl_version_info(CURLVERSION_NOW);
84  if (curl_ver->age > 0 && curl_ver->version_num >= 0x072600) {
85  // Require a minimum speed from the transfer: 2 minute average must at least 10KB/s
86  curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 2*60);
87  curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 10*1024);
88  }
89  return true;
90 }
91 
96  struct curl_slist *list = NULL;
97  for (std::map<std::string, std::string>::const_iterator hdr_iter = req.headers.begin();
98  hdr_iter != req.headers.end();
99  hdr_iter++) {
100  if (!strcasecmp(hdr_iter->first.c_str(),"copy-header")) {
101  list = curl_slist_append(list, hdr_iter->second.c_str());
102  m_headers_copy.emplace_back(hdr_iter->second);
103  }
104  // Note: len("TransferHeader") == 14
105  if (!strncasecmp(hdr_iter->first.c_str(),"transferheader",14)) {
106  std::stringstream ss;
107  ss << hdr_iter->first.substr(14) << ": " << hdr_iter->second;
108  list = curl_slist_append(list, ss.str().c_str());
109  m_headers_copy.emplace_back(ss.str());
110  }
111  }
112  if (list != NULL) {
113  curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, list);
114  m_headers = list;
115  }
116 }
117 
119  m_offset = 0;
120  m_status_code = -1;
121  m_content_length = -1;
122  m_recv_all_headers = false;
123  m_recv_status_line = false;
124 }
125 
126 size_t State::HeaderCB(char *buffer, size_t size, size_t nitems, void *userdata)
127 {
128  State *obj = static_cast<State*>(userdata);
129  std::string header(buffer, size*nitems);
130  return obj->Header(header);
131 }
132 
133 int State::Header(const std::string &header) {
134  //printf("Recieved remote header (%d, %d): %s", m_recv_all_headers, m_recv_status_line, header.c_str());
135  if (m_recv_all_headers) { // This is the second request -- maybe processed a redirect?
136  m_recv_all_headers = false;
137  m_recv_status_line = false;
138  }
139  if (!m_recv_status_line) {
140  std::stringstream ss(header);
141  std::string item;
142  if (!std::getline(ss, item, ' ')) return 0;
143  m_resp_protocol = item;
144  //printf("\n\nResponse protocol: %s\n", m_resp_protocol.c_str());
145  if (!std::getline(ss, item, ' ')) return 0;
146  try {
147  m_status_code = std::stol(item);
148  } catch (...) {
149  return 0;
150  }
151  m_recv_status_line = true;
152  } else if (header.size() == 0 || header == "\n" || header == "\r\n") {
153  m_recv_all_headers = true;
154  }
155  else if (header != "\r\n") {
156  // Parse the header
157  std::size_t found = header.find(":");
158  if (found != std::string::npos) {
159  std::string header_name = header.substr(0, found);
160  std::transform(header_name.begin(), header_name.end(), header_name.begin(), ::tolower);
161  std::string header_value = header.substr(found+1);
162  if (header_name == "content-length")
163  {
164  try {
165  m_content_length = std::stoll(header_value);
166  } catch (...) {
167  // Header unparseable -- not a great sign, fail request.
168  //printf("Content-length header unparseable\n");
169  return 0;
170  }
171  }
172  } else {
173  // Non-empty header that isn't the status line, but no ':' present --
174  // malformed request?
175  //printf("Malformed header: %s\n", header.c_str());
176  return 0;
177  }
178  }
179  return header.size();
180 }
181 
182 size_t State::WriteCB(void *buffer, size_t size, size_t nitems, void *userdata) {
183  State *obj = static_cast<State*>(userdata);
184  if (obj->GetStatusCode() < 0) {
185  return 0;
186  } // malformed request - got body before headers.
187  if (obj->GetStatusCode() >= 400) {
188  obj->m_error_buf += std::string(static_cast<char*>(buffer),
189  std::min(static_cast<size_t>(1024), size*nitems));
190  // Record error messages until we hit a KB; at that point, fail out.
191  if (obj->m_error_buf.size() >= 1024)
192  return 0;
193  else
194  return size*nitems;
195  } // Status indicates failure.
196  return obj->Write(static_cast<char*>(buffer), size*nitems);
197 }
198 
199 ssize_t State::Write(char *buffer, size_t size) {
200  ssize_t retval = m_stream->Write(m_start_offset + m_offset, buffer, size, false);
201  if (retval == SFS_ERROR) {
202  m_error_buf = m_stream->GetErrorMessage();
203  m_error_code = 1;
204  return -1;
205  }
206  m_offset += retval;
207  return retval;
208 }
209 
211  if (m_push) {
212  return 0;
213  }
214 
215  ssize_t retval = m_stream->Write(m_start_offset + m_offset, 0, 0, true);
216  if (retval == SFS_ERROR) {
217  m_error_buf = m_stream->GetErrorMessage();
218  m_error_code = 2;
219  return -1;
220  }
221  m_offset += retval;
222  return retval;
223 }
224 
225 size_t State::ReadCB(void *buffer, size_t size, size_t nitems, void *userdata) {
226  State *obj = static_cast<State*>(userdata);
227  if (obj->GetStatusCode() < 0) {return 0;} // malformed request - got body before headers.
228  if (obj->GetStatusCode() >= 400) {return 0;} // Status indicates failure.
229  return obj->Read(static_cast<char*>(buffer), size*nitems);
230 }
231 
232 int State::Read(char *buffer, size_t size) {
233  int retval = m_stream->Read(m_start_offset + m_offset, buffer, size);
234  if (retval == SFS_ERROR) {
235  return -1;
236  }
237  m_offset += retval;
238  //printf("Read a total of %ld bytes.\n", m_offset);
239  return retval;
240 }
241 
243  CURL *curl = curl_easy_duphandle(m_curl);
244  if (!curl) {
245  throw std::runtime_error("Failed to duplicate existing curl handle.");
246  }
247 
248  State *state = new State(0, *m_stream, curl, m_push, tpcForwardCreds);
249 
250  if (m_headers) {
251  state->m_headers_copy.reserve(m_headers_copy.size());
252  for (std::vector<std::string>::const_iterator header_iter = m_headers_copy.begin();
253  header_iter != m_headers_copy.end();
254  header_iter++) {
255  state->m_headers = curl_slist_append(state->m_headers, header_iter->c_str());
256  state->m_headers_copy.push_back(*header_iter);
257  }
258  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, NULL);
259  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, state->m_headers);
260  }
261 
262  return state;
263 }
264 
265 void State::SetTransferParameters(off_t offset, size_t size) {
266  m_start_offset = offset;
267  m_offset = 0;
268  m_content_length = size;
269  std::stringstream ss;
270  ss << offset << "-" << (offset+size-1);
271  curl_easy_setopt(m_curl, CURLOPT_RANGE, ss.str().c_str());
272 }
273 
275 {
276  return m_stream->AvailableBuffers();
277 }
278 
279 void State::DumpBuffers() const
280 {
281  m_stream->DumpBuffers();
282 }
283 
285 {
286  if (!m_stream->Finalize()) {
287  m_error_buf = m_stream->GetErrorMessage();
288  m_error_code = 3;
289  return false;
290  }
291  return true;
292 }
293 
295 {
296  // CURLINFO_PRIMARY_PORT is only defined for 7.21.0 or later; on older
297  // library versions, simply omit this information.
298 #if LIBCURL_VERSION_NUM >= 0x071500
299  char *curl_ip = NULL;
300  CURLcode rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_IP, &curl_ip);
301  if ((rc != CURLE_OK) || !curl_ip) {
302  return "";
303  }
304  long curl_port = 0;
305  rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_PORT, &curl_port);
306  if ((rc != CURLE_OK) || !curl_port) {
307  return "";
308  }
309  std::stringstream ss;
310  // libcurl returns IPv6 addresses of the form:
311  // 2600:900:6:1301:5054:ff:fe0b:9cba:8000
312  // However the HTTP-TPC spec says to use the form
313  // [2600:900:6:1301:5054:ff:fe0b:9cba]:8000
314  // Hence, we add '[' and ']' whenever a ':' is seen.
315  if (NULL == strchr(curl_ip, ':'))
316  ss << "tcp:" << curl_ip << ":" << curl_port;
317  else
318  ss << "tcp:[" << curl_ip << "]:" << curl_port;
319  return ss.str();
320 #else
321  return "";
322 #endif
323 }
int stat(const char *path, struct stat *buf)
void getline(uchar *buff, int blen)
#define SFS_ERROR
#define SFS_OK
void CURL
Definition: XrdTpcState.hh:14
State * Duplicate()
Definition: XrdTpcState.cc:242
void Move(State &other)
Definition: XrdTpcState.cc:27
int GetStatusCode() const
Definition: XrdTpcState.hh:98
void DumpBuffers() const
Definition: XrdTpcState.cc:279
void CopyHeaders(XrdHttpExtReq &req)
Definition: XrdTpcState.cc:95
void ResetAfterRequest()
Definition: XrdTpcState.cc:118
void SetTransferParameters(off_t offset, size_t size)
Definition: XrdTpcState.cc:265
std::string GetConnectionDescription()
Definition: XrdTpcState.cc:294
bool Finalize()
Definition: XrdTpcState.cc:284
int AvailableBuffers() const
Definition: XrdTpcState.cc:274
int Read(off_t offset, char *buffer, size_t size)
ssize_t Write(off_t offset, const char *buffer, size_t size, bool force)
Definition: XrdTpcStream.cc:60
bool Finalize()
Definition: XrdTpcStream.cc:24
void DumpBuffers() const
std::string GetErrorMessage() const
Definition: XrdTpcStream.hh:69
size_t AvailableBuffers() const
Definition: XrdTpcStream.hh:56
int Stat(struct stat *)
Definition: XrdTpcStream.cc:54
std::map< std::string, std::string > & headers