XRootD
XrdTpcTPC.cc
Go to the documentation of this file.
2 #include "XrdNet/XrdNetAddr.hh"
3 #include "XrdNet/XrdNetUtils.hh"
4 #include "XrdOuc/XrdOucEnv.hh"
5 #include "XrdSec/XrdSecEntity.hh"
8 #include "XrdSys/XrdSysFD.hh"
9 #include "XrdVersion.hh"
10 
12 #include "XrdOuc/XrdOucTUtils.hh"
13 #include "XrdTpc/XrdTpcUtils.hh"
14 
15 #include <curl/curl.h>
16 
17 #include <dlfcn.h>
18 #include <fcntl.h>
19 
20 #include <algorithm>
21 #include <memory>
22 #include <sstream>
23 #include <stdexcept>
24 #include <thread>
25 #include <iostream> // Delete later!!!
26 
27 #include "XrdTpcState.hh"
28 #include "XrdTpcStream.hh"
29 #include "XrdTpcTPC.hh"
30 #include "XrdTpcCurlMulti.hh"
31 #include <fstream>
32 
33 using namespace TPC;
34 
35 XrdXrootdTpcMon* TPCHandler::TPCLogRecord::tpcMonitor = 0;
36 
37 uint64_t TPCHandler::m_monid{0};
38 int TPCHandler::m_marker_period = 5;
39 size_t TPCHandler::m_block_size = 16*1024*1024;
40 size_t TPCHandler::m_small_block_size = 1*1024*1024;
41 XrdSysMutex TPCHandler::m_monid_mutex;
42 
44 
45 /******************************************************************************/
46 /* T P C H a n d l e r : : T P C L o g R e c o r d D e s t r u c t o r */
47 /******************************************************************************/
48 
49 TPCHandler::TPCLogRecord::~TPCLogRecord()
50 {
51 // Record monitoring data is enabled
52 //
53  if (tpcMonitor)
54  {XrdXrootdTpcMon::TpcInfo monInfo;
55 
56  monInfo.clID = clID.c_str();
57  monInfo.begT = begT;
58  gettimeofday(&monInfo.endT, 0);
59 
60  if (log_prefix == "PullRequest")
61  {monInfo.dstURL = local.c_str();
62  monInfo.srcURL = remote.c_str();
63  } else {
64  monInfo.dstURL = remote.c_str();
65  monInfo.srcURL = local.c_str();
67  }
68 
69  if (!status) monInfo.endRC = 0;
70  else if (tpc_status > 0) monInfo.endRC = tpc_status;
71  else monInfo.endRC = 1;
72  monInfo.strm = static_cast<unsigned char>(streams);
73  monInfo.fSize = (bytes_transferred < 0 ? 0 : bytes_transferred);
74  if (!isIPv6) monInfo.opts |= XrdXrootdTpcMon::TpcInfo::isIPv4;
75 
76  tpcMonitor->Report(monInfo);
77  }
78 }
79 
80 /******************************************************************************/
81 /* C u r l D e l e t e r : : o p e r a t o r ( ) */
82 /******************************************************************************/
83 
85 {
86  if (curl) curl_easy_cleanup(curl);
87 }
88 
89 /******************************************************************************/
90 /* s o c k o p t _ s e t c l o e x e c _ c a l l b a c k */
91 /******************************************************************************/
92 
101 int TPCHandler::sockopt_callback(void *clientp, curl_socket_t curlfd, curlsocktype purpose) {
102  TPCLogRecord * rec = (TPCLogRecord *)clientp;
103  if (purpose == CURLSOCKTYPE_IPCXN && rec && rec->pmarkManager.isEnabled()) {
104  // We will not reach this callback if the corresponding socket could not have been connected
105  // the socket is already connected only if the packet marking is enabled
106  return CURL_SOCKOPT_ALREADY_CONNECTED;
107  }
108  return CURL_SOCKOPT_OK;
109 }
110 
111 /******************************************************************************/
112 /* o p e n s o c k e t _ c a l l b a c k */
113 /******************************************************************************/
114 
115 
120 int TPCHandler::opensocket_callback(void *clientp,
121  curlsocktype purpose,
122  struct curl_sockaddr *aInfo)
123 {
124  //Return a socket file descriptor (note the clo_exec flag will be set).
125  int fd = XrdSysFD_Socket(aInfo->family, aInfo->socktype, aInfo->protocol);
126  // See what kind of address will be used to connect
127  //
128  if(fd < 0) {
129  return CURL_SOCKET_BAD;
130  }
131  TPCLogRecord * rec = (TPCLogRecord *)clientp;
132  if (purpose == CURLSOCKTYPE_IPCXN && clientp)
133  {XrdNetAddr thePeer(&(aInfo->addr));
134  rec->isIPv6 = (thePeer.isIPType(XrdNetAddrInfo::IPv6)
135  && !thePeer.isMapped());
136  std::stringstream connectErrMsg;
137 
138  if(!rec->pmarkManager.connect(fd, &(aInfo->addr), aInfo->addrlen, CONNECT_TIMEOUT, connectErrMsg)) {
139  rec->m_log->Emsg(rec->log_prefix.c_str(),"Unable to connect socket:", connectErrMsg.str().c_str());
140  return CURL_SOCKET_BAD;
141  }
142  }
143 
144  return fd;
145 }
146 
147 int TPCHandler::closesocket_callback(void *clientp, curl_socket_t fd) {
148  TPCLogRecord * rec = (TPCLogRecord *)clientp;
149 
150  // Destroy the PMark handle associated to the file descriptor before closing it.
151  // Otherwise, we would lose the socket usage information if the socket is closed before
152  // the PMark handle is closed.
153  rec->pmarkManager.endPmark(fd);
154 
155  return close(fd);
156 }
157 
158 /******************************************************************************/
159 /* p r e p a r e U R L */
160 /******************************************************************************/
161 
162 // We need to utilize the full URL (including the query string), not just the
163 // resource name. The query portion is hidden in the `xrd-http-query` header;
164 // we take this out and combine it with the resource name.
165 // We also append the value of the headers configured in tpc.header2cgi to the resource full URL
166 //
167 // One special key is `authz`; this is always stripped out and copied to the Authorization
168 // header (which will later be used for XrdSecEntity). The latter copy is only done if
169 // the Authorization header is not already present.
170 //
171 // hasSetOpaque will be set to true if at least one opaque data has been set in the URL that is returned,
172 // false otherwise
173 std::string TPCHandler::prepareURL(XrdHttpExtReq &req, bool & hasSetOpaque) {
174  return XrdTpcUtils::prepareOpenURL(req.resource, req.headers,hdr2cgimap,hasSetOpaque);
175 }
176 
177 std::string TPCHandler::prepareURL(XrdHttpExtReq &req) {
178  bool foundHeader;
179  return prepareURL(req,foundHeader);
180 }
181 
182 /******************************************************************************/
183 /* e n c o d e _ x r o o t d _ o p a q u e _ t o _ u r i */
184 /******************************************************************************/
185 
186 // When processing a redirection from the filesystem layer, it is permitted to return
187 // some xrootd opaque data. The quoting rules for xrootd opaque data are significantly
188 // more permissive than a URI (basically, only '&' and '=' are disallowed while some
189 // URI parsers may dislike characters like '"'). This function takes an opaque string
190 // (e.g., foo=1&bar=2&baz=") and makes it safe for all URI parsers.
191 std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
192 {
193  std::stringstream parser(opaque);
194  std::string sequence;
195  std::stringstream output;
196  bool first = true;
197  while (getline(parser, sequence, '&')) {
198  if (sequence.empty()) {continue;}
199  size_t equal_pos = sequence.find('=');
200  char *val = NULL;
201  if (equal_pos != std::string::npos)
202  val = curl_easy_escape(curl, sequence.c_str() + equal_pos + 1, sequence.size() - equal_pos - 1);
203  // Do not emit parameter if value exists and escaping failed.
204  if (!val && equal_pos != std::string::npos) {continue;}
205 
206  if (!first) output << "&";
207  first = false;
208  output << sequence.substr(0, equal_pos);
209  if (val) {
210  output << "=" << val;
211  curl_free(val);
212  }
213  }
214  return output.str();
215 }
216 
217 /******************************************************************************/
218 /* T P C H a n d l e r : : C o n f i g u r e C u r l C A */
219 /******************************************************************************/
220 
221 void
222 TPCHandler::ConfigureCurlCA(CURL *curl)
223 {
224  auto ca_filename = m_ca_file ? m_ca_file->CAFilename() : "";
225  auto crl_filename = m_ca_file ? m_ca_file->CRLFilename() : "";
226  if (!ca_filename.empty() && !crl_filename.empty()) {
227  curl_easy_setopt(curl, CURLOPT_CAINFO, ca_filename.c_str());
228  //Check that the CRL file contains at least one entry before setting this option to curl
229  //Indeed, an empty CRL file will make curl unhappy and therefore will fail
230  //all HTTP TPC transfers (https://github.com/xrootd/xrootd/issues/1543)
231  std::ifstream in(crl_filename, std::ifstream::ate | std::ifstream::binary);
232  if(in.tellg() > 0 && m_ca_file->atLeastOneValidCRLFound()){
233  curl_easy_setopt(curl, CURLOPT_CRLFILE, crl_filename.c_str());
234  } else {
235  std::ostringstream oss;
236  oss << "No valid CRL file has been found in the file " << crl_filename << ". Disabling CRL checking.";
237  m_log.Log(Warning,"TpcHandler",oss.str().c_str());
238  }
239  }
240  else if (!m_cadir.empty()) {
241  curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str());
242  }
243  if (!m_cafile.empty()) {
244  curl_easy_setopt(curl, CURLOPT_CAINFO, m_cafile.c_str());
245  }
246 }
247 
248 
249 bool TPCHandler::MatchesPath(const char *verb, const char *path) {
250  return !strcmp(verb, "COPY") || !strcmp(verb, "OPTIONS");
251 }
252 
253 /******************************************************************************/
254 /* P r e p a r e U R L */
255 /******************************************************************************/
256 
257 static std::string PrepareURL(const std::string &input) {
258  if (!strncmp(input.c_str(), "davs://", 7)) {
259  return "https://" + input.substr(7);
260  }
261  return input;
262 }
263 
264 /******************************************************************************/
265 /* T P C H a n d l e r : : P r o c e s s R e q */
266 /******************************************************************************/
267 
269  if (req.verb == "OPTIONS") {
270  return ProcessOptionsReq(req);
271  }
272  auto header = XrdOucTUtils::caseInsensitiveFind(req.headers,"credential");
273  if (header != req.headers.end()) {
274  if (header->second != "none") {
275  m_log.Emsg("ProcessReq", "COPY requested an unsupported credential type: ", header->second.c_str());
276  return req.SendSimpleResp(400, NULL, NULL, "COPY requestd an unsupported Credential type", 0);
277  }
278  }
279  header = XrdOucTUtils::caseInsensitiveFind(req.headers,"source");
280  if (header != req.headers.end()) {
281  std::string src = PrepareURL(header->second);
282  return ProcessPullReq(src, req);
283  }
284  header = XrdOucTUtils::caseInsensitiveFind(req.headers,"destination");
285  if (header != req.headers.end()) {
286  return ProcessPushReq(header->second, req);
287  }
288  m_log.Emsg("ProcessReq", "COPY verb requested but no source or destination specified.");
289  return req.SendSimpleResp(400, NULL, NULL, "No Source or Destination specified", 0);
290 }
291 
292 /******************************************************************************/
293 /* T P C H a n d l e r D e s t r u c t o r */
294 /******************************************************************************/
295 
297  m_sfs = NULL;
298 }
299 
300 /******************************************************************************/
301 /* T P C H a n d l e r C o n s t r u c t o r */
302 /******************************************************************************/
303 
304 TPCHandler::TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv) :
305  m_desthttps(false),
306  m_fixed_route(false),
307  m_timeout(60),
308  m_first_timeout(120),
309  m_log(log->logger(), "TPC_"),
310  m_sfs(NULL)
311 {
312  if (!Configure(config, myEnv)) {
313  throw std::runtime_error("Failed to configure the HTTP third-party-copy handler.");
314  }
315 
316 // Extract out the TPC monitoring object (we share it with xrootd).
317 //
318  XrdXrootdGStream *gs = (XrdXrootdGStream*)myEnv->GetPtr("Tpc.gStream*");
319  if (gs)
320  TPCLogRecord::tpcMonitor = new XrdXrootdTpcMon("http",log->logger(),*gs);
321 }
322 
323 /******************************************************************************/
324 /* T P C H a n d l e r : : P r o c e s s O p t i o n s R e q */
325 /******************************************************************************/
326 
330 int TPCHandler::ProcessOptionsReq(XrdHttpExtReq &req) {
331  return req.SendSimpleResp(200, NULL, (char *) "DAV: 1\r\nDAV: <http://apache.org/dav/propset/fs/1>\r\nAllow: HEAD,GET,PUT,PROPFIND,DELETE,OPTIONS,COPY", NULL, 0);
332 }
333 
334 /******************************************************************************/
335 /* T P C H a n d l e r : : G e t A u t h z */
336 /******************************************************************************/
337 
338 std::string TPCHandler::GetAuthz(XrdHttpExtReq &req) {
339  std::string authz;
340  auto authz_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"authorization");
341  if (authz_header != req.headers.end()) {
342  char * quoted_url = quote(authz_header->second.c_str());
343  std::stringstream ss;
344  ss << "authz=" << quoted_url;
345  free(quoted_url);
346  authz = ss.str();
347  }
348  return authz;
349 }
350 
351 /******************************************************************************/
352 /* T P C H a n d l e r : : R e d i r e c t T r a n s f e r */
353 /******************************************************************************/
354 
355 int TPCHandler::RedirectTransfer(CURL *curl, const std::string &redirect_resource,
356  XrdHttpExtReq &req, XrdOucErrInfo &error, TPCLogRecord &rec)
357 {
358  int port;
359  const char *ptr = error.getErrText(port);
360  if ((ptr == NULL) || (*ptr == '\0') || (port == 0)) {
361  rec.status = 500;
362  std::stringstream ss;
363  ss << "Internal error: redirect without hostname";
364  logTransferEvent(LogMask::Error, rec, "REDIRECT_INTERNAL_ERROR", ss.str());
365  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
366  }
367 
368  // Construct redirection URL taking into consideration any opaque info
369  std::string rdr_info = ptr;
370  std::string host, opaque;
371  size_t pos = rdr_info.find('?');
372  host = rdr_info.substr(0, pos);
373 
374  if (pos != std::string::npos) {
375  opaque = rdr_info.substr(pos + 1);
376  }
377 
378  std::stringstream ss;
379  ss << "Location: http" << (m_desthttps ? "s" : "") << "://" << host << ":" << port << "/" << redirect_resource;
380 
381  if (!opaque.empty()) {
382  ss << "?" << encode_xrootd_opaque_to_uri(curl, opaque);
383  }
384 
385  rec.status = 307;
386  logTransferEvent(LogMask::Info, rec, "REDIRECT", ss.str());
387  return req.SendSimpleResp(rec.status, NULL, const_cast<char *>(ss.str().c_str()),
388  NULL, 0);
389 }
390 
391 /******************************************************************************/
392 /* T P C H a n d l e r : : O p e n W a i t S t a l l */
393 /******************************************************************************/
394 
395 int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource,
396  int mode, int openMode, const XrdSecEntity &sec,
397  const std::string &authz)
398 {
399  int open_result;
400  while (1) {
401  int orig_ucap = fh.error.getUCap();
402  fh.error.setUCap(orig_ucap | XrdOucEI::uIPv64);
403  std::string opaque;
404  size_t pos = resource.find('?');
405  // Extract the path and opaque info from the resource
406  std::string path = resource.substr(0, pos);
407 
408  if (pos != std::string::npos) {
409  opaque = resource.substr(pos + 1);
410  }
411 
412  // Append the authz information if there are some
413  if(!authz.empty()) {
414  opaque += (opaque.empty() ? "" : "&");
415  opaque += authz;
416  }
417  open_result = fh.open(path.c_str(), mode, openMode, &sec, opaque.c_str());
418 
419  if ((open_result == SFS_STALL) || (open_result == SFS_STARTED)) {
420  int secs_to_stall = fh.error.getErrInfo();
421  if (open_result == SFS_STARTED) {secs_to_stall = secs_to_stall/2 + 5;}
422  std::this_thread::sleep_for (std::chrono::seconds(secs_to_stall));
423  }
424  break;
425  }
426  return open_result;
427 }
428 
429 /******************************************************************************/
430 /* XRD_CHUNK_RESP: */
431 /* T P C H a n d l e r : : D e t e r m i n e X f e r S i z e */
432 /******************************************************************************/
433 
434 #ifdef XRD_CHUNK_RESP
435 
436 
437 
441 int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state,
442  bool &success, TPCLogRecord &rec, bool shouldReturnErrorToClient) {
443  success = false;
444  curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
445  CURLcode res;
446  res = curl_easy_perform(curl);
447  //Immediately set the CURLOPT_NOBODY flag to 0 as we anyway
448  //don't want the next curl call to do be a HEAD request
449  curl_easy_setopt(curl, CURLOPT_NOBODY, 0);
450  if (res == CURLE_HTTP_RETURNED_ERROR) {
451  std::stringstream ss;
452  ss << "Remote server failed request";
453  std::stringstream ss2;
454  ss2 << ss.str() << ": " << curl_easy_strerror(res);
455  rec.status = 500;
456  logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss2.str());
457  return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
458  } else if (state.GetStatusCode() >= 400) {
459  std::stringstream ss;
460  ss << "Remote side " << req.clienthost << " failed with status code " << state.GetStatusCode();
461  rec.status = 500;
462  logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str());
463  return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0) : -1;
464  } else if (res) {
465  std::stringstream ss;
466  ss << "Internal transfer failure";
467  std::stringstream ss2;
468  ss2 << ss.str() << " - HTTP library failed: " << curl_easy_strerror(res);
469  rec.status = 500;
470  logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss2.str());
471  return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
472  }
473  std::stringstream ss;
474  ss << "Successfully determined remote size for pull request: "
475  << state.GetContentLength();
476  logTransferEvent(LogMask::Debug, rec, "SIZE_SUCCESS", ss.str());
477  success = true;
478  return 0;
479 }
480 
481 int TPCHandler::GetContentLengthTPCPull(CURL *curl, XrdHttpExtReq &req, uint64_t &contentLength, bool & success, TPCLogRecord &rec) {
482  State state(curl,req.tpcForwardCreds);
483  //Don't forget to copy the headers of the client's request before doing the HEAD call. Otherwise, if there is a need for authentication,
484  //it will fail
485  state.CopyHeaders(req);
486  int result;
487  //In case we cannot get the content length, we don't return anything to the client
488  if ((result = DetermineXferSize(curl, req, state, success, rec, false)) || !success) {
489  return result;
490  }
491  contentLength = state.GetContentLength();
492  return result;
493 }
494 
495 /******************************************************************************/
496 /* XRD_CHUNK_RESP: */
497 /* T P C H a n d l e r : : S e n d P e r f M a r k e r */
498 /******************************************************************************/
499 
500 int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state) {
501  std::stringstream ss;
502  const std::string crlf = "\n";
503  ss << "Perf Marker" << crlf;
504  ss << "Timestamp: " << time(NULL) << crlf;
505  ss << "Stripe Index: 0" << crlf;
506  ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf;
507  ss << "Total Stripe Count: 1" << crlf;
508  // Include the TCP connection associated with this transfer; used by
509  // the TPC client for monitoring purposes.
510  std::string desc = state.GetConnectionDescription();
511  if (!desc.empty())
512  ss << "RemoteConnections: " << desc << crlf;
513  ss << "End" << crlf;
514  rec.bytes_transferred = state.BytesTransferred();
515  logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
516 
517  return req.ChunkResp(ss.str().c_str(), 0);
518 }
519 
520 /******************************************************************************/
521 /* XRD_CHUNK_RESP: */
522 /* T P C H a n d l e r : : S e n d P e r f M a r k e r */
523 /******************************************************************************/
524 
525 int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, std::vector<State*> &state,
526  off_t bytes_transferred)
527 {
528  // The 'performance marker' format is largely derived from how GridFTP works
529  // (e.g., the concept of `Stripe` is not quite so relevant here). See:
530  // https://twiki.cern.ch/twiki/bin/view/LCG/HttpTpcTechnical
531  // Example marker:
532  // Perf Marker\n
533  // Timestamp: 1537788010\n
534  // Stripe Index: 0\n
535  // Stripe Bytes Transferred: 238745\n
536  // Total Stripe Count: 1\n
537  // RemoteConnections: tcp:129.93.3.4:1234,tcp:[2600:900:6:1301:268a:7ff:fef6:a590]:2345\n
538  // End\n
539  //
540  std::stringstream ss;
541  const std::string crlf = "\n";
542  ss << "Perf Marker" << crlf;
543  ss << "Timestamp: " << time(NULL) << crlf;
544  ss << "Stripe Index: 0" << crlf;
545  ss << "Stripe Bytes Transferred: " << bytes_transferred << crlf;
546  ss << "Total Stripe Count: 1" << crlf;
547  // Build a list of TCP connections associated with this transfer; used by
548  // the TPC client for monitoring purposes.
549  bool first = true;
550  std::stringstream ss2;
551  for (std::vector<State*>::const_iterator iter = state.begin();
552  iter != state.end(); iter++)
553  {
554  std::string desc = (*iter)->GetConnectionDescription();
555  if (!desc.empty()) {
556  ss2 << (first ? "" : ",") << desc;
557  first = false;
558  }
559  }
560  if (!first)
561  ss << "RemoteConnections: " << ss2.str() << crlf;
562  ss << "End" << crlf;
563  rec.bytes_transferred = bytes_transferred;
564  logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
565 
566  return req.ChunkResp(ss.str().c_str(), 0);
567 }
568 
569 /******************************************************************************/
570 /* XRD_CHUNK_RESP: */
571 /* T P C H a n d l e r : : R u n C u r l W i t h U p d a t e s */
572 /******************************************************************************/
573 
574 int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
575  TPCLogRecord &rec)
576 {
577  // Create the multi-handle and add in the current transfer to it.
578  CURLM *multi_handle = curl_multi_init();
579  if (!multi_handle) {
580  rec.status = 500;
581  logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL",
582  "Failed to initialize a libcurl multi-handle");
583  std::stringstream ss;
584  ss << "Failed to initialize internal server memory";
585  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
586  }
587 
588  //curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 128*1024);
589 
590  CURLMcode mres;
591  mres = curl_multi_add_handle(multi_handle, curl);
592  if (mres) {
593  rec.status = 500;
594  std::stringstream ss;
595  ss << "Failed to add transfer to libcurl multi-handle: HTTP library failure=" << curl_multi_strerror(mres);
596  logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL", ss.str());
597  curl_multi_cleanup(multi_handle);
598  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
599  }
600 
601  // Start response to client prior to the first call to curl_multi_perform
602  int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
603  if (retval) {
604  curl_multi_cleanup(multi_handle);
605  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
606  "Failed to send the initial response to the TPC client");
607  return retval;
608  } else {
609  logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
610  "Initial transfer response sent to the TPC client");
611  }
612 
613  // Transfer loop: use curl to actually run the transfer, but periodically
614  // interrupt things to send back performance updates to the client.
615  int running_handles = 1;
616  time_t last_marker = 0;
617  // Track how long it's been since the last time we recorded more bytes being transferred.
618  off_t last_advance_bytes = 0;
619  time_t last_advance_time = time(NULL);
620  time_t transfer_start = last_advance_time;
621  CURLcode res = static_cast<CURLcode>(-1);
622  do {
623  time_t now = time(NULL);
624  time_t next_marker = last_marker + m_marker_period;
625  if (now >= next_marker) {
626  off_t bytes_xfer = state.BytesTransferred();
627  if (bytes_xfer > last_advance_bytes) {
628  last_advance_bytes = bytes_xfer;
629  last_advance_time = now;
630  }
631  if (SendPerfMarker(req, rec, state)) {
632  curl_multi_remove_handle(multi_handle, curl);
633  curl_multi_cleanup(multi_handle);
634  logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
635  "Failed to send a perf marker to the TPC client");
636  return -1;
637  }
638  int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
639  if (now > last_advance_time + timeout) {
640  const char *log_prefix = rec.log_prefix.c_str();
641  bool tpc_pull = strncmp("Pull", log_prefix, 4) == 0;
642 
643  state.SetErrorCode(10);
644  std::stringstream ss;
645  ss << "Transfer failed because no bytes have been "
646  << (tpc_pull ? "received from the source (pull mode) in "
647  : "transmitted to the destination (push mode) in ") << timeout << " seconds.";
648  state.SetErrorMessage(ss.str());
649  curl_multi_remove_handle(multi_handle, curl);
650  curl_multi_cleanup(multi_handle);
651  break;
652  }
653  last_marker = now;
654  }
655  // The transfer will start after this point, notify the packet marking manager
656  rec.pmarkManager.startTransfer();
657  mres = curl_multi_perform(multi_handle, &running_handles);
658  if (mres == CURLM_CALL_MULTI_PERFORM) {
659  // curl_multi_perform should be called again immediately. On newer
660  // versions of curl, this is no longer used.
661  continue;
662  } else if (mres != CURLM_OK) {
663  break;
664  } else if (running_handles == 0) {
665  break;
666  }
667 
668  rec.pmarkManager.beginPMarks();
669  //printf("There are %d running handles\n", running_handles);
670 
671  // Harvest any messages, looking for CURLMSG_DONE.
672  CURLMsg *msg;
673  do {
674  int msgq = 0;
675  msg = curl_multi_info_read(multi_handle, &msgq);
676  if (msg && (msg->msg == CURLMSG_DONE)) {
677  CURL *easy_handle = msg->easy_handle;
678  res = msg->data.result;
679  curl_multi_remove_handle(multi_handle, easy_handle);
680  }
681  } while (msg);
682 
683  int64_t max_sleep_time = next_marker - time(NULL);
684  if (max_sleep_time <= 0) {
685  continue;
686  }
687  int fd_count;
688 #ifdef HAVE_CURL_MULTI_WAIT
689  mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count);
690 #else
691  mres = curl_multi_wait_impl(multi_handle, max_sleep_time*1000, &fd_count);
692 #endif
693  if (mres != CURLM_OK) {
694  break;
695  }
696  } while (running_handles);
697 
698  if (mres != CURLM_OK) {
699  std::stringstream ss;
700  ss << "Internal libcurl multi-handle error: HTTP library failure=" << curl_multi_strerror(mres);
701  logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", ss.str());
702 
703  curl_multi_remove_handle(multi_handle, curl);
704  curl_multi_cleanup(multi_handle);
705 
706  if ((retval = req.ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
707  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
708  "Failed to send error message to the TPC client");
709  return retval;
710  }
711  return req.ChunkResp(NULL, 0);
712  }
713 
714  // Harvest any messages, looking for CURLMSG_DONE.
715  CURLMsg *msg;
716  do {
717  int msgq = 0;
718  msg = curl_multi_info_read(multi_handle, &msgq);
719  if (msg && (msg->msg == CURLMSG_DONE)) {
720  CURL *easy_handle = msg->easy_handle;
721  res = msg->data.result;
722  curl_multi_remove_handle(multi_handle, easy_handle);
723  }
724  } while (msg);
725 
726  if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
727  curl_multi_remove_handle(multi_handle, curl);
728  curl_multi_cleanup(multi_handle);
729  std::stringstream ss;
730  ss << "Internal state error in libcurl";
731  logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", ss.str());
732 
733  if ((retval = req.ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
734  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
735  "Failed to send error message to the TPC client");
736  return retval;
737  }
738  return req.ChunkResp(NULL, 0);
739  }
740  curl_multi_cleanup(multi_handle);
741 
742  state.Flush();
743 
744  rec.bytes_transferred = state.BytesTransferred();
745  rec.tpc_status = state.GetStatusCode();
746 
747  // Explicitly finalize the stream (which will close the underlying file
748  // handle) before the response is sent. In some cases, subsequent HTTP
749  // requests can occur before the filesystem is done closing the handle -
750  // and those requests may occur against partial data.
751  state.Finalize();
752 
753  // Generate the final response back to the client.
754  std::stringstream ss;
755  bool success = false;
756  if (state.GetStatusCode() >= 400) {
757  std::string err = state.GetErrorMessage();
758  std::stringstream ss2;
759  ss2 << "Remote side failed with status code " << state.GetStatusCode();
760  if (!err.empty()) {
761  std::replace(err.begin(), err.end(), '\n', ' ');
762  ss2 << "; error message: \"" << err << "\"";
763  }
764  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
765  ss << generateClientErr(ss2, rec);
766  } else if (state.GetErrorCode()) {
767  std::string err = state.GetErrorMessage();
768  if (err.empty()) {err = "(no error message provided)";}
769  else {std::replace(err.begin(), err.end(), '\n', ' ');}
770  std::stringstream ss2;
771  ss2 << "Error when interacting with local filesystem: " << err;
772  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
773  ss << generateClientErr(ss2, rec);
774  } else if (res != CURLE_OK) {
775  std::stringstream ss2;
776  ss2 << "Internal transfer failure";
777  std::stringstream ss3;
778  ss3 << ss2.str() << ": " << curl_easy_strerror(res);
779  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss3.str());
780  ss << generateClientErr(ss2, rec, res);
781  } else {
782  ss << "success: Created";
783  success = true;
784  }
785 
786  if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
787  logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
788  "Failed to send last update to remote client");
789  return retval;
790  } else if (success) {
791  logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
792  rec.status = 0;
793  }
794  return req.ChunkResp(NULL, 0);
795 }
796 
797 /******************************************************************************/
798 /* !XRD_CHUNK_RESP: */
799 /* T P C H a n d l e r : : R u n C u r l B a s i c */
800 /******************************************************************************/
801 
802 #else
803 int TPCHandler::RunCurlBasic(CURL *curl, XrdHttpExtReq &req, State &state,
804  TPCLogRecord &rec) {
805  const char *log_prefix = rec.log_prefix.c_str();
806  CURLcode res;
807  res = curl_easy_perform(curl);
808  state.Flush();
809  state.Finalize();
810  if (state.GetErrorCode()) {
811  std::string err = state.GetErrorMessage();
812  if (err.empty()) {err = "(no error message provided)";}
813  else {std::replace(err.begin(), err.end(), '\n', ' ');}
814  std::stringstream ss2;
815  ss2 << "Error when interacting with local filesystem: " << err;
816  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
817  ss << "failure: " << ss2.str();
818  } else if (res == CURLE_HTTP_RETURNED_ERROR) {
819  m_log.Emsg(log_prefix, "Remote server failed request", curl_easy_strerror(res));
820  return req.SendSimpleResp(500, NULL, NULL,
821  const_cast<char *>(curl_easy_strerror(res)), 0);
822  } else if (state.GetStatusCode() >= 400) {
823  std::stringstream ss;
824  ss << "Remote side failed with status code " << state.GetStatusCode();
825  m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str());
826  return req.SendSimpleResp(500, NULL, NULL,
827  const_cast<char *>(ss.str().c_str()), 0);
828  } else if (res) {
829  m_log.Emsg(log_prefix, "Curl failed", curl_easy_strerror(res));
830  char msg[] = "Unknown internal transfer failure";
831  return req.SendSimpleResp(500, NULL, NULL, msg, 0);
832  } else {
833  char msg[] = "Created";
834  rec.status = 0;
835  return req.SendSimpleResp(201, NULL, NULL, msg, 0);
836  }
837 }
838 #endif
839 
840 /******************************************************************************/
841 /* T P C H a n d l e r : : P r o c e s s P u s h R e q */
842 /******************************************************************************/
843 
844 int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) {
845  TPCLogRecord rec(req);
846  rec.log_prefix = "PushRequest";
847  rec.local = req.resource;
848  rec.remote = resource;
849  rec.m_log = &m_log;
850  char *name = req.GetSecEntity().name;
851  req.GetClientID(rec.clID);
852  if (name) rec.name = name;
853  logTransferEvent(LogMask::Info, rec, "PUSH_START", "Starting a push request");
854 
855  ManagedCurlHandle curlPtr(curl_easy_init());
856  auto curl = curlPtr.get();
857  if (!curl) {
858  std::stringstream ss;
859  ss << "Failed to initialize internal transfer resources";
860  rec.status = 500;
861  logTransferEvent(LogMask::Error, rec, "PUSH_FAIL", ss.str());
862  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
863  }
864  curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
865  curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
866 // curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_setcloexec_callback);
867 
868  curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
869  curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
870  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
871  curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
872  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
873  curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
874  auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
875  std::string redirect_resource = req.resource;
876  if (query_header != req.headers.end()) {
877  redirect_resource = query_header->second;
878  }
879 
880  AtomicBeg(m_monid_mutex);
881  uint64_t file_monid = AtomicInc(m_monid);
882  AtomicEnd(m_monid_mutex);
883  std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, file_monid));
884  if (!fh.get()) {
885  rec.status = 500;
886  std::stringstream ss;
887  ss << "Failed to initialize internal transfer file handle";
888  logTransferEvent(LogMask::Error, rec, "OPEN_FAIL",
889  ss.str());
890  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
891  }
892  std::string full_url = prepareURL(req);
893 
894  std::string authz = GetAuthz(req);
895 
896  int open_results = OpenWaitStall(*fh, full_url, SFS_O_RDONLY, 0644,
897  req.GetSecEntity(), authz);
898  if (SFS_REDIRECT == open_results) {
899  int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
900  return result;
901  } else if (SFS_OK != open_results) {
902  int code;
903  std::stringstream ss;
904  const char *msg = fh->error.getErrText(code);
905  if (msg == NULL) ss << "Failed to open local resource";
906  else ss << msg;
907  rec.status = 400;
908  if (code == EACCES) rec.status = 401;
909  else if (code == EEXIST) rec.status = 412;
910  logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", msg);
911  int resp_result = req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
912  fh->close();
913  return resp_result;
914  }
915  ConfigureCurlCA(curl);
916  curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
917 
918  Stream stream(std::move(fh), 0, 0, m_log);
919  State state(0, stream, curl, true, req.tpcForwardCreds);
920  state.CopyHeaders(req);
921 
922 #ifdef XRD_CHUNK_RESP
923  return RunCurlWithUpdates(curl, req, state, rec);
924 #else
925  return RunCurlBasic(curl, req, state, rec);
926 #endif
927 }
928 
929 /******************************************************************************/
930 /* T P C H a n d l e r : : P r o c e s s P u l l R e q */
931 /******************************************************************************/
932 
933 int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) {
934  TPCLogRecord rec(req);
935  rec.log_prefix = "PullRequest";
936  rec.local = req.resource;
937  rec.remote = resource;
938  rec.m_log = &m_log;
939  char *name = req.GetSecEntity().name;
940  req.GetClientID(rec.clID);
941  if (name) rec.name = name;
942  logTransferEvent(LogMask::Info, rec, "PULL_START", "Starting a pull request");
943 
944  ManagedCurlHandle curlPtr(curl_easy_init());
945  auto curl = curlPtr.get();
946  if (!curl) {
947  std::stringstream ss;
948  ss << "Failed to initialize internal transfer resources";
949  rec.status = 500;
950  logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
951  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
952  }
953  // ddavila 2023-01-05:
954  // The following change was required by the Rucio/SENSE project where
955  // multiple IP addresses, each from a different subnet, are assigned to a
956  // single server and routed differently by SENSE.
957  // The above requires the server to utilize the same IP, that was used to
958  // start the TPC, for the resolution of the given TPC instead of
959  // using any of the IPs available.
960  if (m_fixed_route){
961  XrdNetAddr *nP;
962  int numIP = 0;
963  char buff[1024];
964  char * ip;
965 
966  // Get the hostname used to contact the server from the http header
967  auto host_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"host");
968  std::string host_used;
969  if (host_header != req.headers.end()) {
970  host_used = host_header->second;
971  }
972 
973  // Get the IP addresses associated with the above hostname
974  XrdNetUtils::GetAddrs(host_used.c_str(), &nP, numIP, XrdNetUtils::prefAuto, 0);
975  int ip_size = nP[0].Format(buff, 1024, XrdNetAddrInfo::fmtAddr,XrdNetAddrInfo::noPort);
976  ip = (char *)malloc(ip_size-1);
977 
978  // Substring to get only the address, remove brackets and garbage
979  memcpy(ip, buff+1, ip_size-2);
980  ip[ip_size-2]='\0';
981  logTransferEvent(LogMask::Info, rec, "LOCAL IP", ip);
982 
983  curl_easy_setopt(curl, CURLOPT_INTERFACE, ip);
984  }
985  curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
986  curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
987 // curl_easy_setopt(curl,CURLOPT_SOCKOPTFUNCTION,sockopt_setcloexec_callback);
988  curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
989  curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
990  curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
991  curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA , &rec);
992  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
993  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
994  curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
995  std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, m_monid++));
996  if (!fh.get()) {
997  std::stringstream ss;
998  ss << "Failed to initialize internal transfer file handle";
999  rec.status = 500;
1000  logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
1001  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1002  }
1003  auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
1004  std::string redirect_resource = req.resource;
1005  if (query_header != req.headers.end()) {
1006  redirect_resource = query_header->second;
1007  }
1009  auto overwrite_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"overwrite");
1010  if ((overwrite_header == req.headers.end()) || (overwrite_header->second == "T")) {
1011  if (! usingEC) mode = SFS_O_TRUNC;
1012  }
1013  int streams = 1;
1014  {
1015  auto streams_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"x-number-of-streams");
1016  if (streams_header != req.headers.end()) {
1017  int stream_req = -1;
1018  try {
1019  stream_req = std::stol(streams_header->second);
1020  } catch (...) { // Handled below
1021  }
1022  if (stream_req < 0 || stream_req > 100) {
1023  std::stringstream ss;
1024  ss << "Invalid request for number of streams";
1025  rec.status = 400;
1026  logTransferEvent(LogMask::Info, rec, "INVALID_REQUEST", ss.str());
1027  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1028  }
1029  streams = stream_req == 0 ? 1 : stream_req;
1030  }
1031  }
1032  rec.streams = streams;
1033  bool hasSetOpaque = false;
1034  std::string full_url = prepareURL(req, hasSetOpaque);
1035  std::string authz = GetAuthz(req);
1036  curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
1037  ConfigureCurlCA(curl);
1038 #ifdef XRD_CHUNK_RESP
1039  {
1040  //Get the content-length of the source file and pass it to the OSS layer
1041  //during the open
1042  uint64_t sourceFileContentLength = 0;
1043  bool success;
1044  GetContentLengthTPCPull(curl, req, sourceFileContentLength, success, rec);
1045  if(success) {
1046  //In the case we cannot get the information from the source server (offline or other error)
1047  //we just don't add the size information to the opaque of the local file to open
1048  full_url += hasSetOpaque ? "&" : "?";
1049  full_url += "oss.asize=" + std::to_string(sourceFileContentLength);
1050  }
1051  }
1052 #endif
1053  int open_result = OpenWaitStall(*fh, full_url, mode|SFS_O_WRONLY,
1054  0644 | SFS_O_MKPTH,
1055  req.GetSecEntity(), authz);
1056  if (SFS_REDIRECT == open_result) {
1057  int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
1058  return result;
1059  } else if (SFS_OK != open_result) {
1060  int code;
1061  std::stringstream ss;
1062  const char *msg = fh->error.getErrText(code);
1063  if ((msg == NULL) || (*msg == '\0')) ss << "Failed to open local resource";
1064  else ss << msg;
1065  rec.status = 400;
1066  if (code == EACCES) rec.status = 401;
1067  else if (code == EEXIST) rec.status = 412;
1068  logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", ss.str());
1069  int resp_result = req.SendSimpleResp(rec.status, NULL, NULL,
1070  generateClientErr(ss, rec).c_str(), 0);
1071  fh->close();
1072  return resp_result;
1073  }
1074  Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log);
1075  State state(0, stream, curl, false, req.tpcForwardCreds);
1076  state.CopyHeaders(req);
1077 
1078 #ifdef XRD_CHUNK_RESP
1079  if (streams > 1) {
1080  return RunCurlWithStreams(req, state, streams, rec);
1081  } else {
1082  return RunCurlWithUpdates(curl, req, state, rec);
1083  }
1084 #else
1085  return RunCurlBasic(curl, req, state, rec);
1086 #endif
1087 }
1088 
1089 /******************************************************************************/
1090 /* T P C H a n d l e r : : l o g T r a n s f e r E v e n t */
1091 /******************************************************************************/
1092 
1093 void TPCHandler::logTransferEvent(LogMask mask, const TPCLogRecord &rec,
1094  const std::string &event, const std::string &message)
1095 {
1096  if (!(m_log.getMsgMask() & mask)) {return;}
1097 
1098  std::stringstream ss;
1099  ss << "event=" << event << ", local=" << rec.local << ", remote=" << rec.remote;
1100  if (rec.name.empty())
1101  ss << ", user=(anonymous)";
1102  else
1103  ss << ", user=" << rec.name;
1104  if (rec.streams != 1)
1105  ss << ", streams=" << rec.streams;
1106  if (rec.bytes_transferred >= 0)
1107  ss << ", bytes_transferred=" << rec.bytes_transferred;
1108  if (rec.status >= 0)
1109  ss << ", status=" << rec.status;
1110  if (rec.tpc_status >= 0)
1111  ss << ", tpc_status=" << rec.tpc_status;
1112  if (!message.empty())
1113  ss << "; " << message;
1114  m_log.Log(mask, rec.log_prefix.c_str(), ss.str().c_str());
1115 }
1116 
1117 std::string TPCHandler::generateClientErr(std::stringstream &err_ss, const TPCLogRecord &rec, CURLcode cCode) {
1118  std::stringstream ssret;
1119  ssret << "failure: " << err_ss.str() << ", local=" << rec.local <<", remote=" << rec.remote;
1120  if(cCode != CURLcode::CURLE_OK) {
1121  ssret << ", HTTP library failure=" << curl_easy_strerror(cCode);
1122  }
1123  return ssret.str();
1124 }
1125 /******************************************************************************/
1126 /* X r d H t t p G e t E x t H a n d l e r */
1127 /******************************************************************************/
1128 
1129 extern "C" {
1130 
1131 XrdHttpExtHandler *XrdHttpGetExtHandler(XrdSysError *log, const char * config, const char * /*parms*/, XrdOucEnv *myEnv) {
1132  if (curl_global_init(CURL_GLOBAL_DEFAULT)) {
1133  log->Emsg("TPCInitialize", "libcurl failed to initialize");
1134  return NULL;
1135  }
1136 
1137  TPCHandler *retval{NULL};
1138  if (!config) {
1139  log->Emsg("TPCInitialize", "TPC handler requires a config filename in order to load");
1140  return NULL;
1141  }
1142  try {
1143  log->Emsg("TPCInitialize", "Will load configuration for the TPC handler from", config);
1144  retval = new TPCHandler(log, config, myEnv);
1145  } catch (std::runtime_error &re) {
1146  log->Emsg("TPCInitialize", "Encountered a runtime failure when loading ", re.what());
1147  //printf("Provided env vars: %p, XrdInet*: %p\n", myEnv, myEnv->GetPtr("XrdInet*"));
1148  }
1149  return retval;
1150 }
1151 
1152 }
char * quote(const char *str)
#define close(a)
Definition: XrdPosix.hh:43
bool Debug
void getline(uchar *buff, int blen)
#define SFS_REDIRECT
#define SFS_O_MKPTH
#define SFS_STALL
#define SFS_O_RDONLY
#define SFS_STARTED
#define SFS_O_WRONLY
#define SFS_O_CREAT
int XrdSfsFileOpenMode
#define SFS_OK
#define SFS_O_TRUNC
#define AtomicInc(x)
#define AtomicBeg(Mtx)
#define AtomicEnd(Mtx)
@ Error
CURLMcode curl_multi_wait_impl(CURLM *multi_handle, int timeout_ms, int *numfds)
void CURL
Definition: XrdTpcState.hh:13
static std::string PrepareURL(const std::string &input)
Definition: XrdTpcTPC.cc:257
XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC)
XrdHttpExtHandler * XrdHttpGetExtHandler(XrdSysError *log, const char *config, const char *, XrdOucEnv *myEnv)
Definition: XrdTpcTPC.cc:1131
std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
Definition: XrdTpcTPC.cc:191
int GetStatusCode() const
Definition: XrdTpcState.hh:95
off_t BytesTransferred() const
Definition: XrdTpcState.hh:87
void CopyHeaders(XrdHttpExtReq &req)
Definition: XrdTpcState.cc:95
void SetErrorMessage(const std::string &error_msg)
Definition: XrdTpcState.hh:99
int GetErrorCode() const
Definition: XrdTpcState.hh:91
std::string GetErrorMessage() const
Definition: XrdTpcState.hh:97
std::string GetConnectionDescription()
Definition: XrdTpcState.cc:294
off_t GetContentLength() const
Definition: XrdTpcState.hh:89
void SetErrorCode(int error_code)
Definition: XrdTpcState.hh:93
bool Finalize()
Definition: XrdTpcState.cc:284
TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv)
Definition: XrdTpcTPC.cc:304
virtual int ProcessReq(XrdHttpExtReq &req)
Definition: XrdTpcTPC.cc:268
virtual ~TPCHandler()
Definition: XrdTpcTPC.cc:296
virtual bool MatchesPath(const char *verb, const char *path)
Tells if the incoming path is recognized as one of the paths that have to be processed.
Definition: XrdTpcTPC.cc:249
std::string clienthost
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with NULL body.
void GetClientID(std::string &clid)
std::map< std::string, std::string > & headers
std::string resource
std::string verb
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; body of request is sent over multiple parts using the SendChunkResp.
const XrdSecEntity & GetSecEntity() const
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.
static const int noPort
Do not add port number.
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
static const char * GetAddrs(const char *hSpec, XrdNetAddr *aListP[], int &aListN, AddrOpts opts=allIPMap, int pNum=PortInSpec)
Definition: XrdNetUtils.cc:238
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:263
const char * getErrText()
void setUCap(int ucval)
Set user capabilties.
static std::map< std::string, T >::const_iterator caseInsensitiveFind(const std::map< std::string, T > &m, const std::string &lowerCaseSearchKey)
Definition: XrdOucTUtils.hh:79
char * name
Entity's name.
Definition: XrdSecEntity.hh:69
virtual XrdSfsFile * newFile(char *user=0, int MonID=0)=0
XrdOucErrInfo & error
virtual int open(const char *fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity *client=0, const char *opaque=0)=0
virtual int close()=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSysLogger * logger(XrdSysLogger *lp=0)
Definition: XrdSysError.hh:141
int getMsgMask()
Definition: XrdSysError.hh:156
void Log(int mask, const char *esfx, const char *text1, const char *text2=0, const char *text3=0)
Definition: XrdSysError.hh:133
static std::string prepareOpenURL(const std::string &reqResource, std::map< std::string, std::string > &reqHeaders, const std::map< std::string, std::string > &hdr2cgimap, bool &hasSetOpaque)
Definition: XrdTpcUtils.cc:26
std::unique_ptr< CURL, CurlDeleter > ManagedCurlHandle
Definition: XrdTpcTPC.hh:39
LogMask
Definition: XrdTpcTPC.hh:27
@ Warning
Definition: XrdTpcTPC.hh:30
void operator()(CURL *curl)
Definition: XrdTpcTPC.cc:84
static const int uIPv64
ucap: Supports only IPv4 info
static const int isaPush