XRootD
XrdClHttpFilePlugIn.cc
Go to the documentation of this file.
1 
6 
7 #include <unistd.h>
8 
9 #include <cassert>
10 
13 #include "XrdCl/XrdClDefaultEnv.hh"
14 #include "XrdCl/XrdClLog.hh"
15 #include "XrdCl/XrdClStatus.hh"
16 
17 #include "XrdOuc/XrdOucCRC.hh"
18 
19 namespace {
20 
21 int MakePosixOpenFlags(XrdCl::OpenFlags::Flags flags) {
22  int posix_flags = 0;
23  if (flags & XrdCl::OpenFlags::New) {
24  posix_flags |= O_CREAT | O_EXCL;
25  }
26  if (flags & XrdCl::OpenFlags::Delete) {
27  posix_flags |= O_CREAT | O_TRUNC;
28  }
29  if (flags & XrdCl::OpenFlags::Read) {
30  posix_flags |= O_RDONLY;
31  }
32  if (flags & XrdCl::OpenFlags::Write) {
33  posix_flags |= O_WRONLY;
34  }
35  if (flags & XrdCl::OpenFlags::Update) {
36  posix_flags |= O_RDWR;
37  }
38  return posix_flags;
39 }
40 
41 } // namespace
42 
43 namespace XrdCl {
44 
45 Davix::Context *root_davix_context_ = NULL;
46 Davix::DavPosix *root_davix_client_file_ = NULL;
47 
49  : davix_fd_(nullptr),
50  curr_offset(0),
51  is_open_(false),
52  filesize(0),
53  url_(),
54  properties_(),
55  logger_(DefaultEnv::GetLog()) {
56  SetUpLogging(logger_);
57  logger_->Debug(kLogXrdClHttp, "HttpFilePlugin constructed.");
58 
59  std::string origin = getenv("XRDXROOTD_PROXY")? getenv("XRDXROOTD_PROXY") : "";
60  if ( origin.empty() || origin.find("=") == 0) {
61  davix_context_ = new Davix::Context();
62  davix_client_ = new Davix::DavPosix(davix_context_);
63  }
64  else {
65  if (root_davix_context_ == NULL) {
66  root_davix_context_ = new Davix::Context();
67  if (getenv("DAVIX_LOAD_GRID_MODULE_IN_XRD"))
68  root_davix_context_->loadModule("grid");
69  root_davix_client_file_ = new Davix::DavPosix(root_davix_context_);
70  }
71  davix_context_ = root_davix_context_;
72  davix_client_ = root_davix_client_file_;
73  }
74 
75 }
76 
78  if (root_davix_context_ == NULL) {
79  delete davix_client_;
80  delete davix_context_;
81  }
82 }
83 
84 XRootDStatus HttpFilePlugIn::Open(const std::string &url,
85  OpenFlags::Flags flags, Access::Mode /*mode*/,
86  ResponseHandler *handler, uint16_t timeout) {
87  if (is_open_) {
88  logger_->Error(kLogXrdClHttp, "URL %s already open", url.c_str());
90  }
91 
92  if (XrdCl::URL(url).GetProtocol().find("https") == 0)
93  isChannelEncrypted = true;
94  else
95  isChannelEncrypted = false;
96 
97  avoid_pread_ = false;
98  if (getenv(HTTP_FILE_PLUG_IN_AVOIDRANGE_ENV) != NULL)
99  avoid_pread_ = true;
100  else {
102  auto search = CGIs.find(HTTP_FILE_PLUG_IN_AVOIDRANGE_CGI);
103  if (search != CGIs.end())
104  avoid_pread_ = true;
105  }
106 
107  Davix::RequestParams params;
108  if (timeout != 0) {
109  struct timespec ts = {timeout, 0};
110  params.setOperationTimeout(&ts);
111  }
112 
114  auto full_path = XrdCl::URL(url).GetLocation();
115  auto pos = full_path.find_last_of('/');
116  auto base_dir =
117  pos != std::string::npos ? full_path.substr(0, pos) : full_path;
118  auto mkdir_status =
119  Posix::MkDir(*davix_client_, base_dir, XrdCl::MkDirFlags::MakePath,
120  XrdCl::Access::None, timeout);
121  if (mkdir_status.IsError()) {
122  logger_->Error(kLogXrdClHttp,
123  "Could not create parent directories when opening: %s",
124  url.c_str());
125  return mkdir_status;
126  }
127  }
128 
129  if (((flags & OpenFlags::Write) || (flags & OpenFlags::Update)) &&
130  (flags & OpenFlags::Delete)) {
131  auto stat_info = new StatInfo();
132  auto status = Posix::Stat(*davix_client_, url, timeout, stat_info);
133  if (status.IsOK()) {
134  auto unlink_status = Posix::Unlink(*davix_client_, url, timeout);
135  if (unlink_status.IsError()) {
136  logger_->Error(
138  "Could not delete existing destination file: %s. Error: %s",
139  url.c_str(), unlink_status.GetErrorMessage().c_str());
140  return unlink_status;
141  }
142  }
143  delete stat_info;
144  }
145  else if (flags & OpenFlags::Read) {
146  auto stat_info = new StatInfo();
147  auto status = Posix::Stat(*davix_client_, url, timeout, stat_info);
148  if (status.IsOK()) {
149  filesize = stat_info->GetSize();
150  }
151  delete stat_info;
152  }
153 
154  auto posix_open_flags = MakePosixOpenFlags(flags);
155 
156  logger_->Debug(kLogXrdClHttp,
157  "Open: URL: %s, XRootD flags: %d, POSIX flags: %d",
158  url.c_str(), flags, posix_open_flags);
159 
160  // res == std::pair<fd, XRootDStatus>
161  auto res = Posix::Open(*davix_client_, url, posix_open_flags, timeout);
162  if (!res.first) {
163  logger_->Error(kLogXrdClHttp, "Could not open: %s, error: %s", url.c_str(),
164  res.second.ToStr().c_str());
165  return res.second;
166  }
167 
168  davix_fd_ = res.first;
169 
170  logger_->Debug(kLogXrdClHttp, "Opened: %s", url.c_str());
171 
172  is_open_ = true;
173  url_ = url;
174 
175  auto status = new XRootDStatus();
176  handler->HandleResponse(status, nullptr);
177 
178  return XRootDStatus();
179 }
180 
182  uint16_t /*timeout*/) {
183  if (!is_open_) {
184  logger_->Error(kLogXrdClHttp,
185  "Cannot close. URL hasn't been previously opened");
187  }
188 
189  logger_->Debug(kLogXrdClHttp, "Closing davix fd: %p", davix_fd_);
190 
191  auto status = Posix::Close(*davix_client_, davix_fd_);
192  if (status.IsError()) {
193  logger_->Error(kLogXrdClHttp, "Could not close davix fd: %p, error: %s",
194  davix_fd_, status.ToStr().c_str());
195  return status;
196  }
197 
198  is_open_ = false;
199  url_.clear();
200 
201  handler->HandleResponse(new XRootDStatus(), nullptr);
202 
203  return XRootDStatus();
204 }
205 
207  uint16_t timeout) {
208  if (!is_open_) {
209  logger_->Error(kLogXrdClHttp,
210  "Cannot stat. URL hasn't been previously opened");
212  }
213 
214  auto stat_info = new StatInfo();
215  auto status = Posix::Stat(*davix_client_, url_, timeout, stat_info);
216  // A file with is_open_ == true should not return 400/3011. The only time this
217  // happens is for a newly created file: Davix doesn't issue an HTTP PUT, so the
218  // file won't show up for Stat(). Here we fake a response.
219  if (status.IsError() && status.code == 400 && status.errNo == 3011) {
220  std::ostringstream data;
221  data << 140737018595560 << " " << filesize << " " << 33261 << " " << time(NULL);
222  stat_info->ParseServerResponse(data.str().c_str());
223  }
224  else if (status.IsError()) {
225  logger_->Error(kLogXrdClHttp, "Stat failed: %s", status.ToStr().c_str());
226  return status;
227  }
228 
229  logger_->Debug(kLogXrdClHttp, "Stat-ed URL: %s", url_.c_str());
230 
231  auto obj = new AnyObject();
232  obj->Set(stat_info);
233 
234  handler->HandleResponse(new XRootDStatus(), obj);
235 
236  return XRootDStatus();
237 }
238 
239 XRootDStatus HttpFilePlugIn::Read(uint64_t offset, uint32_t size, void *buffer,
240  ResponseHandler *handler,
241  uint16_t /*timeout*/) {
242  if (!is_open_) {
243  logger_->Error(kLogXrdClHttp,
244  "Cannot read. URL hasn't previously been opened");
246  }
247 
248  // DavPosix::pread will return -1 if the pread goes beyond the file size
249  size = (offset + size > filesize)? filesize - offset : size;
250  std::pair<int, XRootDStatus> res;
251  if (! avoid_pread_) {
252  res = Posix::PRead(*davix_client_, davix_fd_, buffer, size, offset);
253  }
254  else {
255  offset_locker.lock();
256  if (offset == curr_offset) {
257  res = Posix::Read(*davix_client_, davix_fd_, buffer, size);
258  }
259  else {
260  res = Posix::PRead(*davix_client_, davix_fd_, buffer, size, offset);
261  }
262  }
263 
264  if (res.second.IsError()) {
265  logger_->Error(kLogXrdClHttp, "Could not read URL: %s, error: %s",
266  url_.c_str(), res.second.ToStr().c_str());
267  if (avoid_pread_) offset_locker.unlock();
268  return res.second;
269  }
270 
271  int num_bytes_read = res.first;
272  curr_offset = offset + num_bytes_read;
273  if (avoid_pread_) offset_locker.unlock();
274 
275  logger_->Debug(kLogXrdClHttp, "Read %d bytes, at offset %llu, from URL: %s",
276  num_bytes_read, (unsigned long long) offset, url_.c_str());
277 
278  auto status = new XRootDStatus();
279  auto chunk_info = new ChunkInfo(offset, num_bytes_read, buffer);
280  auto obj = new AnyObject();
281  obj->Set(chunk_info);
282  handler->HandleResponse(status, obj);
283 
284  return XRootDStatus();
285 }
286 
288  private:
289  XrdCl::ResponseHandler *realHandler;
290  bool isChannelEncrypted;
291  public:
292  // constructor
294  bool isHttps) : realHandler(a), isChannelEncrypted(isHttps) {}
295 
296  // Response Handler
298  XrdCl::AnyObject *rdresp) {
299 
300  if( !status->IsOK() )
301  {
302  realHandler->HandleResponse( status, rdresp );
303  delete this;
304  return;
305  }
306 
307  //using namespace XrdCl;
308 
309  ChunkInfo *chunk = 0;
310  rdresp->Get(chunk);
311 
312  std::vector<uint32_t> cksums;
313  if( isChannelEncrypted )
314  {
315  size_t nbpages = chunk->length / XrdSys::PageSize;
316  if( chunk->length % XrdSys::PageSize )
317  ++nbpages;
318  cksums.reserve( nbpages );
319 
320  size_t size = chunk->length;
321  char *buffer = reinterpret_cast<char*>( chunk->buffer );
322 
323  for( size_t pg = 0; pg < nbpages; ++pg )
324  {
325  size_t pgsize = XrdSys::PageSize;
326  if( pgsize > size ) pgsize = size;
327  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
328  cksums.push_back( crcval );
329  buffer += pgsize;
330  size -= pgsize;
331  }
332  }
333 
334  PageInfo *pages = new PageInfo(chunk->offset, chunk->length, chunk->buffer, std::move(cksums));
335  delete rdresp;
336  AnyObject *response = new AnyObject();
337  response->Set( pages );
338  realHandler->HandleResponse( status, response );
339 
340  delete this;
341  }
342 };
343 
344 XRootDStatus HttpFilePlugIn::PgRead(uint64_t offset, uint32_t size, void *buffer,
345  ResponseHandler *handler,
346  uint16_t timeout) {
347  ResponseHandler *substitHandler = new PgReadSubstitutionHandler( handler, isChannelEncrypted );
348  XRootDStatus st = Read(offset, size, buffer, substitHandler, timeout);
349  return st;
350 }
351 
352 XRootDStatus HttpFilePlugIn::Write(uint64_t offset, uint32_t size,
353  const void *buffer, ResponseHandler *handler,
354  uint16_t timeout) {
355  if (!is_open_) {
356  logger_->Error(kLogXrdClHttp,
357  "Cannot write. URL hasn't previously been opened");
359  }
360 
361  // res == std::pair<int, XRootDStatus>
362  auto res =
363  Posix::PWrite(*davix_client_, davix_fd_, offset, size, buffer, timeout);
364  if (res.second.IsError()) {
365  logger_->Error(kLogXrdClHttp, "Could not write URL: %s, error: %s",
366  url_.c_str(), res.second.ToStr().c_str());
367  return res.second;
368  }
369  else
370  filesize += res.first;
371 
372  logger_->Debug(kLogXrdClHttp, "Wrote %d bytes, at offset %llu, to URL: %s",
373  res.first, (unsigned long long) offset, url_.c_str());
374 
375  handler->HandleResponse(new XRootDStatus(), nullptr);
376 
377  return XRootDStatus();
378 }
379 
380 //------------------------------------------------------------------------
382 //------------------------------------------------------------------------
384  uint32_t size,
385  const void *buffer,
386  std::vector<uint32_t> &cksums,
387  ResponseHandler *handler,
388  uint16_t timeout )
389 { (void)cksums;
390  return Write(offset, size, buffer, handler, timeout);
391 }
392 
393 XRootDStatus HttpFilePlugIn::Sync(ResponseHandler *handler, uint16_t timeout) {
394  (void)handler;
395  (void)timeout;
396 
397  logger_->Debug(kLogXrdClHttp, "Sync is a no-op for HTTP.");
398 
399  return XRootDStatus();
400 }
401 
402 
404  ResponseHandler *handler,
405  uint16_t /*timeout*/) {
406  if (!is_open_) {
407  logger_->Error(kLogXrdClHttp,
408  "Cannot read. URL hasn't previously been opened");
410  }
411 
412  const auto num_chunks = chunks.size();
413  std::vector<Davix::DavIOVecInput> input_vector(num_chunks);
414  std::vector<Davix::DavIOVecOuput> output_vector(num_chunks);
415 
416  for (size_t i = 0; i < num_chunks; ++i) {
417  input_vector[i].diov_offset = chunks[i].offset;
418  input_vector[i].diov_size = chunks[i].length;
419  input_vector[i].diov_buffer = chunks[i].buffer;
420  }
421 
422  // res == std::pair<int, XRootDStatus>
423  auto res = Posix::PReadVec(*davix_client_, davix_fd_, chunks, buffer);
424  if (res.second.IsError()) {
425  logger_->Error(kLogXrdClHttp, "Could not vectorRead URL: %s, error: %s",
426  url_.c_str(), res.second.ToStr().c_str());
427  return res.second;
428  }
429 
430  int num_bytes_read = res.first;
431 
432  logger_->Debug(kLogXrdClHttp, "VecRead %d bytes, from URL: %s",
433  num_bytes_read, url_.c_str());
434 
435  char *output = static_cast<char *>(buffer);
436  for (size_t i = 0; i < num_chunks; ++i) {
437  std::memcpy(output + input_vector[i].diov_offset,
438  output_vector[i].diov_buffer, output_vector[i].diov_size);
439  }
440 
441  auto status = new XRootDStatus();
442  auto read_info = new VectorReadInfo();
443  read_info->SetSize(num_bytes_read);
444  read_info->GetChunks() = chunks;
445  auto obj = new AnyObject();
446  obj->Set(read_info);
447  handler->HandleResponse(status, obj);
448 
449  return XRootDStatus();
450 }
451 
452 bool HttpFilePlugIn::IsOpen() const { return is_open_; }
453 
454 bool HttpFilePlugIn::SetProperty(const std::string &name,
455  const std::string &value) {
456  properties_[name] = value;
457  return true;
458 }
459 
460 bool HttpFilePlugIn::GetProperty(const std::string &name,
461  std::string &value) const {
462  const auto p = properties_.find(name);
463  if (p == std::end(properties_)) {
464  return false;
465  }
466 
467  value = p->second;
468  return true;
469 }
470 
471 } // namespace XrdCl
#define HTTP_FILE_PLUG_IN_AVOIDRANGE_ENV
#define HTTP_FILE_PLUG_IN_AVOIDRANGE_CGI
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
virtual XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout) override
virtual XRootDStatus PgWrite(uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout) override
virtual XRootDStatus Sync(ResponseHandler *handler, uint16_t timeout) override
virtual XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout) override
virtual XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout) override
virtual XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout) override
virtual XRootDStatus VectorRead(const ChunkList &chunks, void *buffer, XrdCl::ResponseHandler *handler, uint16_t timeout) override
virtual ~HttpFilePlugIn() noexcept
virtual XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout) override
virtual bool GetProperty(const std::string &name, std::string &value) const override
virtual XRootDStatus Close(ResponseHandler *handler, uint16_t timeout) override
virtual bool SetProperty(const std::string &name, const std::string &value) override
virtual bool IsOpen() const override
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *rdresp)
PgReadSubstitutionHandler(XrdCl::ResponseHandler *a, bool isHttps)
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Object stat info.
URL representation.
Definition: XrdClURL.hh:31
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition: XrdClURL.cc:337
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
std::pair< int, XrdCl::XRootDStatus > PReadVec(Davix::DavPosix &davix_client, DAVIX_FD *fd, const XrdCl::ChunkList &chunks, void *buffer)
std::pair< int, XRootDStatus > Read(Davix::DavPosix &davix_client, DAVIX_FD *fd, void *buffer, uint32_t size)
std::pair< int, XrdCl::XRootDStatus > PWrite(Davix::DavPosix &davix_client, DAVIX_FD *fd, uint64_t offset, uint32_t size, const void *buffer, uint16_t timeout)
std::pair< DAVIX_FD *, XRootDStatus > Open(Davix::DavPosix &davix_client, const std::string &url, int flags, uint16_t timeout)
std::pair< int, XRootDStatus > PRead(Davix::DavPosix &davix_client, DAVIX_FD *fd, void *buffer, uint32_t size, uint64_t offset)
XRootDStatus Close(Davix::DavPosix &davix_client, DAVIX_FD *fd)
XRootDStatus Unlink(Davix::DavPosix &davix_client, const std::string &url, uint16_t timeout)
XRootDStatus MkDir(Davix::DavPosix &davix_client, const std::string &path, XrdCl::MkDirFlags::Flags flags, XrdCl::Access::Mode, uint16_t timeout)
XRootDStatus Stat(Davix::DavPosix &davix_client, const std::string &url, uint16_t timeout, StatInfo *stat_info)
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
Davix::Context * root_davix_context_
std::vector< ChunkInfo > ChunkList
List of chunks.
Davix::DavPosix * root_davix_client_file_
void SetUpLogging(Log *logger)
static const uint64_t kLogXrdClHttp
static const int PageSize
Mode
Access mode.
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
@ MakePath
create the entire directory tree if it doesn't exist
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Write
Open only for writing.
@ Update
Open for reading and writing.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124