XRootD
XrdOssThrottleFile.cc
Go to the documentation of this file.
1 /***************************************************************
2  *
3  * Copyright (C) 2025, Pelican Project, Morgridge Institute for Research
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License"); you
6  * may not use this file except in compliance with the License. You may
7  * obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  ***************************************************************/
18 
19 #include "XrdOuc/XrdOucEnv.hh"
21 #include "XrdOss/XrdOss.hh"
22 #include "XrdOss/XrdOssWrapper.hh"
23 #include "XrdSfs/XrdSfsAio.hh"
24 #include "XrdSys/XrdSysLogger.hh"
28 #include "XrdVersion.hh"
29 
30 #include <functional>
31 
32 namespace {
33 
34 class File final : public XrdOssWrapDF {
35 public:
36  File(std::unique_ptr<XrdOssDF> wrapDF, XrdThrottleManager &throttle, XrdSysError *lP, XrdOucTrace *tP)
37  : XrdOssWrapDF(*wrapDF), m_log(lP), m_throttle(throttle), m_trace(tP), m_wrapped(std::move(wrapDF)) {}
38 
39 virtual ~File() {}
40 
41 virtual int Open(const char *path, int Oflag, mode_t Mode,
42  XrdOucEnv &env) override {
43 
44  std::tie(m_user, m_uid) = m_throttle.GetUserInfo(env.secEnv());
45 
46  std::string open_error_message;
47  if (!m_throttle.OpenFile(m_user, open_error_message)) {
48  TRACE(DEBUG, open_error_message);
49  return -EMFILE;
50  }
51 
52  auto rval = wrapDF.Open(path, Oflag, Mode, env);
53 
54  if (rval < 0) {
55  m_throttle.CloseFile(m_user);
56  }
57 
58  return rval;
59 }
60 
61 virtual int Close(long long *retsz) override {
62  m_throttle.CloseFile(m_user);
63  return wrapDF.Close(retsz);
64 }
65 
66 virtual int getFD() override {return -1;}
67 
68 virtual off_t getMmap(void **addr) override {*addr = 0; return 0;}
69 
70 virtual ssize_t pgRead (void* buffer, off_t offset, size_t rdlen,
71  uint32_t* csvec, uint64_t opts) override {
72 
73  return DoThrottle(rdlen, 1,
74  static_cast<ssize_t (XrdOssDF::*)(void*, off_t, size_t, uint32_t*, uint64_t)>(&XrdOssDF::pgRead),
75  buffer, offset, rdlen, csvec, opts);
76 }
77 
78 virtual int pgRead(XrdSfsAio *aioparm, uint64_t opts) override
79 { // We disable all AIO-based reads.
80  aioparm->Result = pgRead((char *)aioparm->sfsAio.aio_buf,
81  aioparm->sfsAio.aio_offset,
82  aioparm->sfsAio.aio_nbytes,
83  aioparm->cksVec, opts);
84  aioparm->doneRead();
85  return 0;
86 }
87 
88 virtual ssize_t pgWrite(void* buffer, off_t offset, size_t wrlen,
89  uint32_t* csvec, uint64_t opts) override {
90 
91  return DoThrottle(wrlen, 1,
92  static_cast<ssize_t (XrdOssDF::*)(void*, off_t, size_t, uint32_t*, uint64_t)>(&XrdOssDF::pgWrite),
93  buffer, offset, wrlen, csvec, opts);
94 }
95 
96 virtual int pgWrite(XrdSfsAio *aioparm, uint64_t opts) override
97 { // We disable all AIO-based writes.
98  aioparm->Result = this->pgWrite((char *)aioparm->sfsAio.aio_buf,
99  aioparm->sfsAio.aio_offset,
100  aioparm->sfsAio.aio_nbytes,
101  aioparm->cksVec, opts);
102  aioparm->doneWrite();
103  return 0;
104 }
105 
106 virtual ssize_t Read(off_t offset, size_t size) override {
107  return DoThrottle(size, 1,
108  static_cast<ssize_t (XrdOssDF::*)(off_t, size_t)>(&XrdOssDF::Read),
109  offset, size);
110 }
111 virtual ssize_t Read(void* buffer, off_t offset, size_t size) override {
112  return DoThrottle(size, 1,
113  static_cast<ssize_t (XrdOssDF::*)(void*, off_t, size_t)>(&XrdOssDF::Read),
114  buffer, offset, size);
115 }
116 
117 virtual int Read(XrdSfsAio *aiop) override {
118  aiop->Result = this->Read((char *)aiop->sfsAio.aio_buf,
119  aiop->sfsAio.aio_offset,
120  aiop->sfsAio.aio_nbytes);
121  aiop->doneRead();
122  return 0;
123 }
124 
125 virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt) override {
126  off_t sum = 0;
127  for (int i = 0; i < rdvcnt; ++i) {
128  sum += readV[i].size;
129  }
130  return DoThrottle(sum, rdvcnt, &XrdOssDF::ReadV, readV, rdvcnt);
131 }
132 
133 
134 virtual ssize_t Write(const void* buffer, off_t offset, size_t size) override {
135  return DoThrottle(size, 1,
136  static_cast<ssize_t (XrdOssDF::*)(const void*, off_t, size_t)>(&XrdOssDF::Write),
137  buffer, offset, size);
138 }
139 
140 virtual int Write(XrdSfsAio *aiop) override {
141  aiop->Result = this->Write((char *)aiop->sfsAio.aio_buf,
142  aiop->sfsAio.aio_offset,
143  aiop->sfsAio.aio_nbytes);
144  aiop->doneWrite();
145  return 0;
146 }
147 
148 private:
149 
150  template <class Fn, class... Args>
151  int DoThrottle(size_t rdlen, size_t ops, Fn &&fn, Args &&... args) {
152  m_throttle.Apply(rdlen, ops, m_uid);
153  bool ok = true;
154  XrdThrottleTimer timer = m_throttle.StartIOTimer(m_uid, ok);
155  if (!ok) {
156  TRACE(DEBUG, "Throttling in progress");
157  return -EMFILE;
158  }
159  return std::invoke(fn, wrapDF, std::forward<Args>(args)...);
160  }
161 
162  XrdSysError *m_log{nullptr};
163  XrdThrottleManager &m_throttle;
164  XrdOucTrace *m_trace{nullptr};
165  std::unique_ptr<XrdOssDF> m_wrapped;
166  std::string m_user;
167  uint16_t m_uid;
168 
169  static constexpr char TraceID[] = "XrdThrottleFile";
170 };
171 
172 class FileSystem final : public XrdOssWrapper {
173 public:
174  FileSystem(XrdOss *oss, XrdSysLogger *log, XrdOucEnv *envP)
175  : XrdOssWrapper(*oss),
176  m_env(envP),
177  m_oss(oss),
178  m_log(new XrdSysError(log)),
179  m_trace(new XrdOucTrace(m_log.get())),
180  m_throttle(m_log.get(), m_trace.get())
181  {
182 
183  m_throttle.Init();
184  if (envP)
185  {
186  auto gstream = reinterpret_cast<XrdXrootdGStream*>(envP->GetPtr("Throttle.gStream*"));
187  m_log->Say("Config", "Throttle g-stream has", gstream ? "" : " NOT", " been configured via xrootd.mongstream directive");
188  m_throttle.SetMonitor(gstream);
189  }
190  }
191 
192  int Configure(const std::string &config_filename) {
193  XrdThrottle::Configuration config(*m_log, m_env);
194  if (config.Configure(config_filename)) {
195  m_log->Emsg("Config", "Unable to load configuration file", config_filename.c_str());
196  return 1;
197  }
198  m_throttle.FromConfig(config);
199  return 0;
200  }
201 
202  virtual ~FileSystem() {}
203 
204  virtual XrdOssDF *newFile(const char *user = 0) override {
205  std::unique_ptr<XrdOssDF> wrapped(wrapPI.newFile(user));
206  return new File(std::move(wrapped), m_throttle, m_log.get(), m_trace.get());
207  }
208 
209 private:
210  XrdOucEnv *m_env{nullptr};
211  std::unique_ptr<XrdOss> m_oss;
212  std::unique_ptr<XrdSysError> m_log{nullptr};
213  std::unique_ptr<XrdOucTrace> m_trace{nullptr};
214  XrdThrottleManager m_throttle;
215 };
216 
217 } // namespace
218 
219 extern "C" {
220 
222  const char *config_fn, const char *parms,
223  XrdOucEnv *envP) {
224  std::unique_ptr<FileSystem> fs(new FileSystem(curr_oss, logger, envP));
225  if (fs->Configure(config_fn)) {
226  XrdSysError(logger, "XrdThrottle").Say("Config", "Unable to load configuration file", config_fn);
227  return nullptr;
228  }
229  // Note the throttle is set up as an OSS.
230  // This will prevent the throttle from being layered on top of the OFS; to keep backward
231  // compatibility with old configurations, we do not cause the server to fail.
232  //
233  // Originally, XrdThrottle was used as an OFS because the loadshed code required the ability
234  // to redirect the client to a different server. This is rarely (never?) used in practice.
235  // By putting the throttle in the OSS, we benefit from the fact the OFS has first run the
236  // authorization code and has made a user name available for fairshare of the throttle.
237  envP->PutInt("XrdOssThrottle", 1);
238  return fs.release();
239 }
240 
242 
243 } // extern "C"
244 
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
XrdOss * XrdOssAddStorageSystem2(XrdOss *curr_oss, XrdSysLogger *logger, const char *config_fn, const char *parms, XrdOucEnv *envP)
XrdVERSIONINFO(XrdOssAddStorageSystem2, throttle)
int Mode
XrdOucString File
struct myOpts opts
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
void * aio_buf
Definition: XrdSfsAio.hh:47
#define TRACE(act, x)
Definition: XrdTrace.hh:63
virtual ssize_t Read(off_t offset, size_t size)
Definition: XrdOss.hh:281
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
Definition: XrdOss.cc:198
virtual ssize_t pgRead(void *buffer, off_t offset, size_t rdlen, uint32_t *csvec, uint64_t opts)
Definition: XrdOss.cc:160
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
Definition: XrdOss.cc:236
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition: XrdOss.hh:345
virtual int getFD()
virtual ssize_t Read(off_t offset, size_t size)
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual int Close(long long *retsz=0)
virtual ssize_t pgRead(void *buffer, off_t offset, size_t rdlen, uint32_t *csvec, uint64_t opts)
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
virtual off_t getMmap(void **addr)
virtual int Init(XrdSysLogger *lp, const char *cfn)
virtual XrdOssDF * newFile(const char *tident)
void PutInt(const char *varname, long value)
Definition: XrdOucEnv.cc:268
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:281
const XrdSecEntity * secEnv() const
Definition: XrdOucEnv.hh:107
uint32_t * cksVec
Definition: XrdSfsAio.hh:63
ssize_t Result
Definition: XrdSfsAio.hh:65
virtual void doneRead()=0
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62
virtual void doneWrite()=0
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
XrdOucEnv * envP
Definition: XrdPss.cc:109