XRootD
XrdThrottleFile.cc
Go to the documentation of this file.
1 
2 #include "XrdSfs/XrdSfsAio.hh"
3 #include "XrdSec/XrdSecEntity.hh"
5 
6 #include "XrdThrottle.hh"
7 
8 using namespace XrdThrottle;
9 
// Load-shedding prologue for File I/O methods.
//
// When m_throttle.CheckLoadShed(m_loadshed) is true, the expansion asks the
// throttle manager for a redirect target, logs the event against the
// connection id, stores the target (port as the error code, host as the
// message) in `error`, and returns SFS_REDIRECT from the ENCLOSING function.
// It therefore only works inside File member functions that return an int-like
// SFS status.
//
// `port` is zero-initialized so that a defined value reaches setErrInfo() even
// if PerformLoadShed() were to leave it untouched.
#define DO_LOADSHED if (m_throttle.CheckLoadShed(m_loadshed)) \
{ \
   unsigned port = 0; \
   std::string host; \
   m_throttle.PerformLoadShed(m_loadshed, host, port); \
   m_eroute.Emsg("File", "Performing load-shed for client", m_connection_id.c_str()); \
   error.setErrInfo(port, host.c_str()); \
   return SFS_REDIRECT; \
}
19 
// Throttling prologue for File I/O methods. Expands, in order, to:
//   1. DO_LOADSHED, which may return SFS_REDIRECT from the enclosing function;
//   2. m_throttle.Apply(amount, 1, m_uid) for this file's uid
//      (NOTE(review): the literal 1 is presumably an operation count - confirm
//      against XrdThrottleManager::Apply);
//   3. a local XrdThrottleTimer named `xtimer`, obtained from StartIOTimer(),
//      which stays in scope until the enclosing function returns.
// The expansion already ends in ';', so call sites work with or without a
// trailing semicolon (both forms are used in this file).
#define DO_THROTTLE(amount) \
   DO_LOADSHED \
   m_throttle.Apply((amount), 1, m_uid); \
   XrdThrottleTimer xtimer = m_throttle.StartIOTimer();
24 
25 File::File(const char *user,
26  unique_sfs_ptr sfs,
27  XrdThrottleManager &throttle,
28  XrdSysError &eroute)
29  : XrdSfsFile(sfs->error), // Use underlying error object as ours
30 #if __cplusplus >= 201103L
31  m_sfs(std::move(sfs)), // Guaranteed to be non-null by FileSystem::newFile
32 #else
33  m_sfs(sfs),
34 #endif
35  m_uid(0),
36  m_connection_id(user ? user : ""),
37  m_throttle(throttle),
38  m_eroute(eroute)
39 {}
40 
41 File::~File()
42 {
43  if (m_is_open) {
44  m_throttle.CloseFile(m_user);
45  }
46 }
47 
48 int
49 File::open(const char *fileName,
50  XrdSfsFileOpenMode openMode,
51  mode_t createMode,
52  const XrdSecEntity *client,
53  const char *opaque)
54 {
55  // Try various potential "names" associated with the request, from the most
56  // specific to most generic.
57  if (client->eaAPI && client->eaAPI->Get("token.subject", m_user)) {
58  if (client->vorg) m_user = std::string(client->vorg) + ":" + m_user;
59  } else if (client->eaAPI) {
60  std::string user;
61  if (client->eaAPI->Get("request.name", user) && !user.empty()) m_user = user;
62  }
63  if (m_user.empty()) {m_user = client->name ? client->name : "nobody";}
64  m_uid = XrdThrottleManager::GetUid(m_user.c_str());
65  m_throttle.PrepLoadShed(opaque, m_loadshed);
66  std::string open_error_message;
67  if (!m_throttle.OpenFile(m_user, open_error_message)) {
68  error.setErrInfo(EMFILE, open_error_message.c_str());
69  return SFS_ERROR;
70  }
71  auto retval = m_sfs->open(fileName, openMode, createMode, client, opaque);
72  if (retval != SFS_ERROR) {
73  m_is_open = true;
74  } else {
75  m_throttle.CloseFile(m_user);
76  }
77  return retval;
78 }
79 
// Close the file: clears m_is_open, tells the throttle manager the file is
// closed for m_user, then returns the result of the wrapped object's close().
// NOTE(review): listing line 81 (the declarator, presumably `File::close()`)
// is missing from this extract.
// NOTE(review): CloseFile() is called unconditionally here, whereas the
// destructor only calls it when m_is_open is set - confirm close() cannot run
// after a failed open() or more than once.
80 int
82 {
83  m_is_open = false;
84  m_throttle.CloseFile(m_user);
85  return m_sfs->close();
86 }
87 
88 int File::checkpoint(cpAct act, struct iov *range, int n)
89 { return m_sfs->checkpoint(act, range, n);}
90 
91 int
92 File::fctl(const int cmd,
93  const char *args,
94  XrdOucErrInfo &out_error)
95 {
96  // Disable sendfile
97  if (cmd == SFS_FCTL_GETFD)
98  {
99  error.setErrInfo(ENOTSUP, "Sendfile not supported by throttle plugin.");
100  return SFS_ERROR;
101  }
102  else return m_sfs->fctl(cmd, args, out_error);
103 }
104 
// Returns the file name as reported by the wrapped file object's FName().
// NOTE(review): listing line 106 (the declarator, presumably `File::FName()`)
// is missing from this extract.
105 const char *
107 {
108  return m_sfs->FName();
109 }
110 
111 int
112 File::getMmap(void **Addr, off_t &Size)
113 { // We cannot monitor mmap-based reads, so we disable them.
114  error.setErrInfo(ENOTSUP, "Mmap not supported by throttle plugin.");
115  return SFS_ERROR;
116 }
117 
// Buffer-based page read. DO_THROTTLE(rdlen) may return SFS_REDIRECT early for
// load shedding; otherwise it applies the throttle for rdlen and starts an I/O
// timer that stays in scope until return. The request is then forwarded
// unchanged to the wrapped object's pgRead().
// NOTE(review): listing lines 118-119 (the return type and the
// `File::pgRead(` declarator carrying the `offset` parameter) are missing from
// this extract.
120  char *buffer,
121  XrdSfsXferSize rdlen,
122  uint32_t *csvec,
123  uint64_t opts)
124 {
125  DO_THROTTLE(rdlen)
126  return m_sfs->pgRead(offset, buffer, rdlen, csvec, opts);
127 }
128 
// AIO form of pgRead. The request is served synchronously through the
// buffer-based pgRead() overload using the offset, buffer and length from
// aioparm->sfsAio and the checksum vector aioparm->cksVec. That call's return
// value is stored in aioparm->Result, doneRead() is invoked, and SFS_OK is
// returned regardless of the stored result.
// NOTE(review): listing line 129 (the return-type line) is missing from this
// extract.
130 File::pgRead(XrdSfsAio *aioparm, uint64_t opts)
131 { // We disable all AIO-based reads.
132  aioparm->Result = this->pgRead((XrdSfsFileOffset)aioparm->sfsAio.aio_offset,
133  (char *)aioparm->sfsAio.aio_buf,
134  (XrdSfsXferSize)aioparm->sfsAio.aio_nbytes,
135  aioparm->cksVec, opts);
136  aioparm->doneRead();
137  return SFS_OK;
138 }
139 
// Buffer-based page write. DO_THROTTLE(rdlen) may return SFS_REDIRECT early
// for load shedding; otherwise it applies the throttle for rdlen and starts an
// I/O timer that stays in scope until return. The request is then forwarded
// unchanged to the wrapped object's pgWrite().
// NOTE(review): listing lines 140-141 (the return type and the
// `File::pgWrite(` declarator carrying the `offset` parameter) are missing
// from this extract.
142  char *buffer,
143  XrdSfsXferSize rdlen,
144  uint32_t *csvec,
145  uint64_t opts)
146 {
147  DO_THROTTLE(rdlen)
148  return m_sfs->pgWrite(offset, buffer, rdlen, csvec, opts);
149 }
150 
// AIO form of pgWrite. The request is served synchronously through the
// buffer-based pgWrite() overload using the offset, buffer and length from
// aioparm->sfsAio and the checksum vector aioparm->cksVec. That call's return
// value is stored in aioparm->Result, doneWrite() is invoked, and SFS_OK is
// returned regardless of the stored result.
// NOTE(review): listing line 151 (the return-type line) is missing from this
// extract.
152 File::pgWrite(XrdSfsAio *aioparm, uint64_t opts)
153 { // We disable all AIO-based writes.
154  aioparm->Result = this->pgWrite((XrdSfsFileOffset)aioparm->sfsAio.aio_offset,
155  (char *)aioparm->sfsAio.aio_buf,
156  (XrdSfsXferSize)aioparm->sfsAio.aio_nbytes,
157  aioparm->cksVec, opts);
158  aioparm->doneWrite();
159  return SFS_OK;
160 }
161 
// Two-argument read (offset and amount only, no buffer). DO_THROTTLE(amount)
// may return SFS_REDIRECT early for load shedding; otherwise it applies the
// throttle for `amount` and starts an I/O timer that stays in scope until
// return. The request is then forwarded to the wrapped object's read().
// NOTE(review): listing line 163 (the `File::read(` declarator carrying the
// `fileOffset` parameter) is missing from this extract.
162 int
164  XrdSfsXferSize amount)
165 {
166  DO_THROTTLE(amount)
167  return m_sfs->read(fileOffset, amount);
168 }
169 
// Buffer-based read. DO_THROTTLE(buffer_size) may return SFS_REDIRECT early
// for load shedding; otherwise it applies the throttle for buffer_size and
// starts an I/O timer that stays in scope until return. The request is then
// forwarded unchanged to the wrapped object's read().
// NOTE(review): listing lines 170-171 (the return type and the `File::read(`
// declarator carrying the `fileOffset` parameter) are missing from this
// extract.
172  char *buffer,
173  XrdSfsXferSize buffer_size)
174 {
175  DO_THROTTLE(buffer_size);
176  return m_sfs->read(fileOffset, buffer, buffer_size);
177 }
178 
// AIO form of read. The request is served synchronously through the
// buffer-based read() overload using the offset, buffer and length from
// aioparm->sfsAio. That call's return value is stored in aioparm->Result,
// doneRead() is invoked, and SFS_OK is returned regardless of the stored
// result.
// NOTE(review): listing line 180 (the declarator, presumably taking
// `XrdSfsAio *aioparm`) is missing from this extract.
179 int
181 { // We disable all AIO-based reads.
182  aioparm->Result = this->read((XrdSfsFileOffset)aioparm->sfsAio.aio_offset,
183  (char *)aioparm->sfsAio.aio_buf,
184  (XrdSfsXferSize)aioparm->sfsAio.aio_nbytes);
185  aioparm->doneRead();
186  return SFS_OK;
187 }
188 
// Buffer-based write. DO_THROTTLE(buffer_size) may return SFS_REDIRECT early
// for load shedding; otherwise it applies the throttle for buffer_size and
// starts an I/O timer that stays in scope until return. The request is then
// forwarded unchanged to the wrapped object's write().
// NOTE(review): listing lines 189-190 (the return type and the `File::write(`
// declarator carrying the `fileOffset` parameter) are missing from this
// extract.
191  const char *buffer,
192  XrdSfsXferSize buffer_size)
193 {
194  DO_THROTTLE(buffer_size);
195  return m_sfs->write(fileOffset, buffer, buffer_size);
196 }
197 
// AIO form of write. The request is served synchronously through the
// buffer-based write() overload using the offset, buffer and length from
// aioparm->sfsAio. That call's return value is stored in aioparm->Result,
// doneWrite() is invoked, and SFS_OK is returned regardless of the stored
// result.
// NOTE(review): listing line 199 (the declarator, presumably taking
// `XrdSfsAio *aioparm`) is missing from this extract.
198 int
200 {
201  aioparm->Result = this->write((XrdSfsFileOffset)aioparm->sfsAio.aio_offset,
202  (char *)aioparm->sfsAio.aio_buf,
203  (XrdSfsXferSize)aioparm->sfsAio.aio_nbytes);
204  aioparm->doneWrite();
205  return SFS_OK;
206 }
207 
// Synchronous sync: forwarded to the wrapped file object without throttling.
// NOTE(review): listing line 209 (the declarator, presumably `File::sync()`)
// is missing from this extract.
208 int
210 {
211  return m_sfs->sync();
212 }
213 
// AIO form of sync: unlike the AIO read/write wrappers in this file, the
// request object `aiop` is passed straight through to the wrapped file object.
// NOTE(review): listing line 215 (the declarator, presumably taking
// `XrdSfsAio *aiop`) is missing from this extract.
214 int
216 {
217  return m_sfs->sync(aiop);
218 }
219 
220 int
221 File::stat(struct stat *buf)
222 {
223  return m_sfs->stat(buf);
224 }
225 
// Truncate: forwarded to the wrapped file object without throttling.
// NOTE(review): listing line 227 (the declarator carrying the `fileOffset`
// parameter) is missing from this extract.
226 int
228 {
229  return m_sfs->truncate(fileOffset);
230 }
231 
232 int
233 File::getCXinfo(char cxtype[4], int &cxrsz)
234 {
235  return m_sfs->getCXinfo(cxtype, cxrsz);
236 }
237 
// SendData: DO_THROTTLE(size) may return SFS_REDIRECT early for load shedding;
// otherwise it applies the throttle for `size` and starts an I/O timer that
// stays in scope until return. The request is then forwarded unchanged to the
// wrapped object's SendData().
// NOTE(review): listing line 239 (the `File::SendData(` declarator carrying
// the `sfDio` parameter) is missing from this extract.
238 int
240  XrdSfsFileOffset offset,
241  XrdSfsXferSize size)
242 {
243  DO_THROTTLE(size);
244  return m_sfs->SendData(sfDio, offset, size);
245 }
246 
int stat(const char *path, struct stat *buf)
struct myOpts opts
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
void * aio_buf
Definition: XrdSfsAio.hh:47
#define SFS_ERROR
#define SFS_FCTL_GETFD
int XrdSfsFileOpenMode
#define SFS_OK
long long XrdSfsFileOffset
int XrdSfsXferSize
if(Avsz)
#define DO_THROTTLE(amount)
int setErrInfo(int code, const char *emsg)
XrdSecAttr * Get(const void *sigkey)
char * vorg
Entity's virtual organization(s)
Definition: XrdSecEntity.hh:71
XrdSecEntityAttr * eaAPI
non-const API to attributes
Definition: XrdSecEntity.hh:92
char * name
Entity's name.
Definition: XrdSecEntity.hh:69
uint32_t * cksVec
Definition: XrdSfsAio.hh:63
ssize_t Result
Definition: XrdSfsAio.hh:65
virtual void doneRead()=0
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62
virtual void doneWrite()=0
XrdOucErrInfo & error
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
static int GetUid(const char *username)
virtual int truncate(XrdSfsFileOffset fileOffset)
virtual int sync()
virtual XrdSfsXferSize write(XrdSfsFileOffset fileOffset, const char *buffer, XrdSfsXferSize buffer_size)
virtual XrdSfsXferSize pgRead(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize rdlen, uint32_t *csvec, uint64_t opts=0)
virtual const char * FName()
virtual int open(const char *fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity *client, const char *opaque=0)
virtual int stat(struct stat *buf)
virtual int close()
virtual int SendData(XrdSfsDio *sfDio, XrdSfsFileOffset offset, XrdSfsXferSize size)
virtual int getCXinfo(char cxtype[4], int &cxrsz)
virtual int checkpoint(cpAct act, struct iov *range=0, int n=0)
virtual int getMmap(void **Addr, off_t &Size)
virtual int read(XrdSfsFileOffset fileOffset, XrdSfsXferSize amount)
virtual XrdSfsXferSize pgWrite(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize rdlen, uint32_t *csvec, uint64_t opts=0)
virtual int fctl(const int cmd, const char *args, XrdOucErrInfo &eInfo)=0
std::auto_ptr< XrdSfsFile > unique_sfs_ptr
Definition: XrdThrottle.hh:23