XRootD
XrdPfcCommand.cc
Go to the documentation of this file.
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 #include "XrdPfcInfo.hh"
20 #include "XrdPfc.hh"
21 #include "XrdPfcTrace.hh"
22 
23 #include "XrdOfs/XrdOfsConfigPI.hh"
24 #include "XrdOss/XrdOss.hh"
25 #include "XrdOuc/XrdOuca2x.hh"
26 #include "XrdOuc/XrdOucArgs.hh"
27 #include "XrdOuc/XrdOucEnv.hh"
28 #include "XrdOuc/XrdOucStream.hh"
29 #include "XrdSys/XrdSysLogger.hh"
31 
32 #include <algorithm>
33 #include <cstring>
34 #include <iostream>
35 #include <fcntl.h>
36 #include <vector>
37 #include <sys/time.h>
38 
39 using namespace XrdPfc;
40 
41 //______________________________________________________________________________
42 
43 const int MAX_ACCESSES = 20;
44 
45 const long long ONE_MB = 1024ll * 1024;
46 const long long ONE_GB = 1024ll * 1024 * 1024;
47 
48 void Cache::ExecuteCommandUrl(const std::string& command_url)
49 {
50  static const char *top_epfx = "ExecuteCommandUrl ";
51 
52  SplitParser cp(command_url, "/");
53 
54  std::string token = cp.get_token();
55 
56  if (token != "xrdpfc_command")
57  {
58  TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
59  return;
60  }
61 
62  // Get the command
63  token = cp.get_token();
64 
65 
66  //================================================================
67  // create_file
68  //================================================================
69 
70  if (token == "create_file")
71  {
72  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
73  static const char* usage =
74  "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
75  " Creates a cache file with given parameters. Data in file is random.\n"
76  " Useful for cache purge testing.\n"
77  "Notes:\n"
78  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
79  " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
80  " . -t and -d can be given multiple times to record several accesses.\n"
81  " . Negative arguments given to -t are interpreted as relative to now.\n";
82 
83  const Configuration &conf = m_configuration;
84 
85  token = cp.get_token();
86 
87  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
88 
89  std::vector<char*> argv;
90  SplitParser ap(token, " ");
91  int argc = ap.fill_argv(argv);
92 
93  long long file_size = ONE_GB;
94  long long block_size = conf.m_bufferSize;
95  int access_time [MAX_ACCESSES];
96  int access_duration[MAX_ACCESSES];
97  int at_count = 0, ad_count = 0;
98  XrdOucArgs Spec(&m_log, err_prefix, "hvs:b:t:d:",
99  "help", 1, "h",
100  "verbose", 1, "v",
101  "size", 1, "s",
102  "blocksize", 1, "b",
103  "time", 1, "t",
104  "duration", 1, "d",
105  (const char *) 0);
106 
107  time_t time_now = time(0);
108 
109  Spec.Set(argc, &argv[0]);
110  char theOpt;
111 
112  while ((theOpt = Spec.getopt()) != (char) -1)
113  {
114  switch (theOpt)
115  {
116  case 'h': {
117  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
118  return;
119  }
120  case 's': {
121  if (XrdOuca2x::a2sz(m_log, "Error getting filesize", Spec.getarg(),
122  &file_size, 0ll, 32 * ONE_GB))
123  return;
124  break;
125  }
126  case 'b': {
127  if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", Spec.getarg(),
128  &block_size, 0ll, 64 * ONE_MB))
129  return;
130  break;
131  }
132  case 't': {
133  if (XrdOuca2x::a2i(m_log, "Error getting access time", Spec.getarg(),
134  &access_time[at_count++], INT_MIN, INT_MAX))
135  return;
136  break;
137  }
138  case 'd': {
139  if (XrdOuca2x::a2i(m_log, "Error getting access duration", Spec.getarg(),
140  &access_duration[ad_count++], 0, 24 * 3600))
141  return;
142  break;
143  }
144  default: {
145  TRACE(Error, err_prefix << "Unhandled command argument.");
146  return;
147  }
148  }
149  }
150  if (Spec.getarg())
151  {
152  TRACE(Error, err_prefix << "Options must take up all the arguments.");
153  return;
154  }
155 
156  if (at_count < 1) access_time [at_count++] = time_now - 10;
157  if (ad_count < 1) access_duration[ad_count++] = 10;
158 
159  if (at_count != ad_count)
160  {
161  TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
162  return;
163  }
164 
165  std::string file_path (cp.get_reminder_with_delim());
166  std::string cinfo_path(file_path + Info::s_infoExtension);
167 
168  TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
169 
170  // Check if cinfo exists ... bail out if it does.
171  {
172  struct stat infoStat;
173  if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
174  {
175  TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
176  return;
177  }
178  }
179 
180  TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
181 
182  {
183  const char *myUser = conf.m_username.c_str();
184  XrdOucEnv myEnv;
185 
186  // Create the data file.
187 
188  char size_str[32]; sprintf(size_str, "%lld", file_size);
189  myEnv.Put("oss.asize", size_str);
190  myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
191  int cret;
192  if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
193  {
194  TRACE(Error, err_prefix << "Create failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
195  return;
196  }
197 
198  XrdOssDF *myFile = GetOss()->newFile(myUser);
199  if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
200  {
201  TRACE(Error, err_prefix << "Open failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
202  delete myFile;
203  return;
204  }
205 
206  // Create the info file.
207 
208  myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
209  myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
210  if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
211  {
212  TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
213  myFile->Close(); delete myFile;
214  return;
215  }
216 
217  XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
218  if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
219  {
220  TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
221  delete myInfoFile;
222  myFile->Close(); delete myFile;
223  return;
224  }
225 
226  // Allocate space for the data file.
227 
228  if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
229  {
230  TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(cret));
231  }
232 
233  // Fill up cinfo.
234 
235  Info myInfo(m_trace, false);
236  myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
237  myInfo.SetAllBitsSynced();
238 
239  for (int i = 0; i < at_count; ++i)
240  {
241  time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
242 
243  myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
244  }
245 
246  myInfo.Write(myInfoFile, cinfo_path.c_str());
247 
248  myInfoFile->Close(); delete myInfoFile;
249  myFile->Close(); delete myFile;
250 
251  TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB.");
252 
253  {
254  XrdSysCondVarHelper lock(&m_writeQ.condVar);
255 
256  m_writeQ.writes_between_purges += file_size;
257  }
258  }
259  }
260 
261  //================================================================
262  // remove_file
263  //================================================================
264 
265  else if (token == "remove_file")
266  {
267  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
268  static const char* usage =
269  "Usage: remove_file/ [-h] /<path>\n"
270  " Removes given file from the cache unless it is currently open.\n"
271  " Useful for removal of stale files or duplicate files in a caching cluster.\n"
272  "Notes:\n"
273  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
274 
275  token = cp.get_token();
276 
277  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
278 
279  std::vector<char*> argv;
280  SplitParser ap(token, " ");
281  int argc = ap.fill_argv(argv);
282 
283  XrdOucArgs Spec(&m_log, err_prefix, "hvs:b:t:d:",
284  "help", 1, "h",
285  (const char *) 0);
286 
287  Spec.Set(argc, &argv[0]);
288  char theOpt;
289 
290  while ((theOpt = Spec.getopt()) != (char) -1)
291  {
292  switch (theOpt)
293  {
294  case 'h': {
295  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
296  return;
297  }
298  default: {
299  TRACE(Error, err_prefix << "Unhandled command argument.");
300  return;
301  }
302  }
303  }
304  if (Spec.getarg())
305  {
306  TRACE(Error, err_prefix << "Options must take up all the arguments.");
307  return;
308  }
309 
310  std::string f_name(cp.get_reminder());
311 
312  TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
313 
314  int ret = UnlinkFile(f_name, true);
315 
316  TRACE(Info, err_prefix << "returned with status " << ret);
317  }
318 
319  //================================================================
320  // unknown command
321  //================================================================
322 
323  else
324  {
325  TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
326  }
327 }
328 
329 
330 //==============================================================================
331 // Example python script to use /xrdpfc_command/
332 //==============================================================================
333 /*
334 from XRootD import client
335 from XRootD.client.flags import OpenFlags
336 
337 import sys
338 import time
339 
340 #-------------------------------------------------------------------------------
341 
342 port = int( sys.argv[1] );
343 
344 g_srv = "root://localhost:%d/" % port
345 g_com = "/xrdpfc_command/create_file/"
346 g_dir = "/store/user/matevz/"
347 
348 #-------------------------------------------------------------------------------
349 
350 def xxsend(args, file) :
351 
352  url = g_srv + g_com + args + g_dir + file
353  print "Opening ", url
354 
355  with client.File() as f:
356  status, response = f.open(url, OpenFlags.READ)
357 
358  print '%r' % status
359  print '%r' % response
360 
361 #-------------------------------------------------------------------------------
362 
363 pfx1 = "AAAA"
364 pfx2 = "BBBB"
365 
366 for i in range(1, 1024 + 1):
367 
368  atime = -10000 + i
369 
370  xxsend("-s 4g -t %d -d 10" % atime,
371  "%s-%04d" % (pfx1, i))
372 
373  time.sleep(0.01)
374 
375 
376 for i in range(1, 512 + 1):
377 
378  atime = -5000 + i
379 
380  xxsend("-s 4g -t %d -d 10" % atime,
381  "%s-%04d" % (pfx2, i))
382 
383  time.sleep(0.01)
384  */
void usage()
#define XrdOssOK
Definition: XrdOss.hh:50
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
const int MAX_ACCESSES
const long long ONE_GB
const long long ONE_MB
#define ERRNO_AND_ERRSTR(err_code)
Definition: XrdPfcTrace.hh:46
int stat(const char *path, struct stat *buf)
bool Create
#define TRACE(act, x)
Definition: XrdTrace.hh:63
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition: XrdOss.hh:426
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual XrdOssDF * newFile(const char *tident)=0
char getopt()
Definition: XrdOucArgs.cc:151
char * getarg()
Definition: XrdOucArgs.cc:136
void Set(char *arglist)
Definition: XrdOucArgs.cc:236
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition: XrdOuca2x.cc:45
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:257
XrdOss * GetOss() const
Definition: XrdPfc.hh:389
virtual int Stat(const char *url, struct stat &sbuff)
Definition: XrdPfc.cc:1119
void ExecuteCommandUrl(const std::string &command_url)
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1191
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:45
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:313
void WriteIOStatSingle(long long bytes_disk)
Write single open/close time for given bytes read from disk.
Definition: XrdPfcInfo.cc:446
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
Definition: XrdPfcInfo.cc:268
void SetAllBitsSynced()
Mark all blocks as synced to disk.
Definition: XrdPfcInfo.cc:146
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
Definition: XrdPfcInfo.cc:163
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
Definition: XrdPfc.hh:41
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:56
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:82
long long m_bufferSize
prefetch buffer size, default 1MB
Definition: XrdPfc.hh:101
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:83
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:81
char * get_reminder()
Definition: XrdPfc.hh:162
char * get_token()
Definition: XrdPfc.hh:150
int fill_argv(std::vector< char * > &argv)
Definition: XrdPfc.hh:167
char * get_reminder_with_delim()
Definition: XrdPfc.hh:156