XRootD
XrdFrcProxy.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d F r c P r o x y . c c */
4 /* */
5 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include "errno.h"
32 #include <fcntl.h>
33 #include "stdio.h"
34 #include "unistd.h"
35 #include <sys/stat.h>
36 #include <sys/types.h>
37 
38 #include "XrdFrc/XrdFrcReqAgent.hh"
39 #include "XrdFrc/XrdFrcProxy.hh"
40 #include "XrdFrc/XrdFrcTrace.hh"
41 #include "XrdFrc/XrdFrcUtils.hh"
42 #include "XrdOuc/XrdOucEnv.hh"
43 #include "XrdOuc/XrdOucStream.hh"
44 #include "XrdOuc/XrdOucUtils.hh"
45 #include "XrdSys/XrdSysError.hh"
46 #include "XrdSys/XrdSysLogger.hh"
47 #include "XrdSys/XrdSysPlatform.hh"
48 
49 using namespace XrdFrc;
50 
51 /******************************************************************************/
52 /* S t a t i c V a r i a b l e s */
53 /******************************************************************************/
54 
55 XrdFrcProxy::o2qMap XrdFrcProxy::oqMap[] =
56  {{"getf", XrdFrcRequest::getQ, opGet},
57  {"migr", XrdFrcRequest::migQ, opMig},
58  {"pstg", XrdFrcRequest::stgQ, opStg},
59  {"putf", XrdFrcRequest::putQ, opPut}};
60 
61 int XrdFrcProxy::oqNum = sizeof(oqMap)/sizeof(oqMap[0]);
62 
63 /******************************************************************************/
64 /* C o n s t r u c t o r */
65 /******************************************************************************/
66 
67 XrdFrcProxy::XrdFrcProxy(XrdSysLogger *lP, const char *iName, int Debug)
68 {
69  char buff[256];
70 
71 // Clear agent vector
72 //
73  memset(Agent, 0, sizeof(Agent));
74 
75 // Link the logger to our message facility
76 //
77  Say.logger(lP);
78 
79 // Set the debug flag
80 //
81  if (Debug) Trace.What |= TRACE_ALL;
82 
83 // Develop our internal name
84 //
85  QPath = 0;
86  insName = XrdOucUtils::InstName(iName,0);
87  sprintf(buff,"%s.%d",XrdOucUtils::InstName(iName),static_cast<int>(getpid()));
88  intName = strdup(buff);
89 }
90 
91 /******************************************************************************/
92 /* A d d */
93 /******************************************************************************/
94 
95 int XrdFrcProxy::Add(char Opc, const char *Lfn, const char *Opq,
96  const char *Usr, const char *Rid,
97  const char *Nop, const char *Pop, int Prty)
98 {
99  XrdFrcRequest myReq;
100  int n, Options = 0;
101  int qType = XrdFrcUtils::MapR2Q(Opc, &Options);
102 
103 // Verify that we can support this operation
104 //
105  if (!Agent[qType]) return -ENOTSUP;
106 
107 // Initialize the request element
108 //
109  memset(&myReq, 0, sizeof(myReq));
110  myReq.OPc = Opc;
111 
112 // Insert the Lfn and Opaque information
113 //
114  n = strlen(Lfn);
115  if (Opq && *Opq)
116  {if (n + strlen(Opq) + 2 > sizeof(myReq.LFN)) return -ENAMETOOLONG;
117  strcpy(myReq.LFN, Lfn); strcpy(myReq.LFN+n+1, Opq), myReq.Opaque = n+1;
118  } else if (n < int(sizeof(myReq.LFN))) strcpy(myReq.LFN, Lfn);
119  else return -ENAMETOOLONG;
120 
121 // Get the LFN offset in case this is a url
122 //
123  if (myReq.LFN[0] != '/' && !(myReq.LFO = XrdFrcUtils::chkURL(myReq.LFN)))
124  return -EILSEQ;
125 
126 // Set the user, request id, notification path, and priority
127 //
128  if (Usr && *Usr) strlcpy(myReq.User, Usr, sizeof(myReq.User));
129  else strcpy(myReq.User, intName);
130  if (Rid) strlcpy(myReq.ID, Rid, sizeof(myReq.ID));
131  else *(myReq.ID) = '?';
132  if (Nop && *Nop) strlcpy(myReq.Notify, Nop, sizeof(myReq.Notify));
133  else *(myReq.Notify) = '-';
134  myReq.Prty = Prty;
135 
136 // Establish processing options
137 //
138  myReq.Options = Options | XrdFrcUtils::MapM2O(myReq.Notify, Pop);
139 
140 // Add this request to the queue of requests via the agent
141 //
142  Agent[qType]->Add(myReq);
143  return 0;
144 }
145 
146 /******************************************************************************/
147 /* D e l */
148 /******************************************************************************/
149 
150 int XrdFrcProxy::Del(char Opc, const char *Rid)
151 {
152  XrdFrcRequest myReq;
153  int qType = XrdFrcUtils::MapR2Q(Opc);
154 
155 // Verify that we can support this operation
156 //
157  if (!Agent[qType]) return -ENOTSUP;
158 
159 // Initialize the request element
160 //
161  memset(&myReq, 0, sizeof(myReq));
162  strlcpy(myReq.ID, Rid, sizeof(myReq.ID));
163 
164 // Delete the request from the queue
165 //
166  Agent[qType]->Del(myReq);
167  return 0;
168 }
169 
170 /******************************************************************************/
171 /* L i s t */
172 /******************************************************************************/
173 
174 int XrdFrcProxy::List(XrdFrcProxy::Queues &State, char *Buff, int Bsz)
175 {
176  int i;
177 
178 // Get a queue type
179 //
180 do{if (!State.Active)
181  while(State.QList & opAll)
182  {for (i = 0; i < oqNum; i++) if (oqMap[i].oType & State.QList) break;
183  if (i >= oqNum) return 0;
184  State.QNow = oqMap[i].qType;
185  State.QList &= ~oqMap[i].oType;
186  if (!Agent[int(State.QNow)]) continue;
187  State.Active = 1;
188  break;
189  }
190 
191  for (i = State.Prty; i <= XrdFrcRequest::maxPrty; i++)
192  if (Agent[int(State.QNow)]->NextLFN(Buff,Bsz,i,State.Offset)) return 1;
193  else State.Prty = i+1;
194 
195  State.Active = 0; State.Offset = 0; State.Prty = 0;
196  } while(State.QList & opAll);
197 
198 // We've completed returning all info
199 //
200  return 0;
201 }
202 
203 /******************************************************************************/
204 
205 int XrdFrcProxy::List(int qType, int qPrty, XrdFrcRequest::Item *Items, int Num)
206 {
207  int i, n, Cnt = 0;
208 
209 // List each queue
210 //
211  while(qType & opAll)
212  {for (i = 0; i < oqNum; i++) if (oqMap[i].oType & qType) break;
213  if (i >= oqNum) return Cnt;
214  qType &= ~oqMap[i].oType; n = oqMap[i].qType;
215  if (!Agent[n]) continue;
216  if (qPrty < 0) Cnt += Agent[n]->List(Items, Num);
217  else Cnt += Agent[n]->List(Items, Num, qPrty);
218  }
219 
220 // All done
221 //
222  return Cnt;
223 }
224 
225 /******************************************************************************/
226 /* I n i t */
227 /******************************************************************************/
228 
229 int XrdFrcProxy::Init(int opX, const char *aPath, int aMode, const char *qPath)
230 {
231  const char *configFN = getenv("XRDCONFIGFN"), *iName = 0;
232  int i;
233 
234 // If a qPath was specified, and the "Queues" component will be added later.
235 // Otherwise, we check the config file to see if there is a qpath there.
236 // If not we use the aPath which must be unqualified with a component name
237 // which we will add here). All paths must have the instance name if so needed.
238 //
239  if (qPath) QPath = strdup(qPath);
240  else if (!configFN) iName = insName;
241  else if (Init2(configFN)) return 0;
242 
243 // Create the queue path directory if it does not exists
244 //
245  if (!QPath && !(QPath = XrdFrcUtils::makePath(iName, aPath, aMode)))
246  return 0;
247 
248 // Now create and start an agent for each wanted service
249 //
250  for (i = 0; i < oqNum; i++)
251  if (opX & oqMap[i].oType)
252  {Agent[oqMap[i].qType]
253  = new XrdFrcReqAgent(oqMap[i].qName, oqMap[i].qType);
254  if (!Agent[oqMap[i].qType]->Start(QPath, aMode)) return 0;
255  }
256 
257 // All done
258 //
259  return 1;
260 }
261 
262 /******************************************************************************/
263 /* Private: I n i t 2 */
264 /******************************************************************************/
265 
266 int XrdFrcProxy::Init2(const char *ConfigFN)
267 {
268  char *var;
269  int cfgFD, retc, NoGo = 0;
270  XrdOucEnv myEnv;
271  XrdOucStream cfgFile(&Say, getenv("XRDINSTANCE"), &myEnv, "=====> ");
272 
273 // Try to open the configuration file.
274 //
275  if ( (cfgFD = open(ConfigFN, O_RDONLY, 0)) < 0)
276  {Say.Emsg("Config", errno, "open config file", ConfigFN);
277  return 1;
278  }
279  cfgFile.Attach(cfgFD);
280  static const char *cvec[] = { "*** frm client plugin config:", 0 };
281  cfgFile.Capture(cvec);
282 
283 // Now start reading records until eof looking for our directive
284 //
285  while((var = cfgFile.GetMyFirstWord()))
286  {if (!strcmp(var, "frm.xfr.qcheck") && qChk(cfgFile))
287  {cfgFile.Echo(); NoGo = 1;}
288  }
289 
290 // Now check if any errors occurred during file i/o
291 //
292  if ((retc = cfgFile.LastError()))
293  NoGo = Say.Emsg("Config", retc, "read config file", ConfigFN);
294  cfgFile.Close();
295 
296 // All done
297 //
298  return NoGo;
299 }
300 
301 /******************************************************************************/
302 /* Private: q C h k */
303 /******************************************************************************/
304 
305 int XrdFrcProxy::qChk(XrdOucStream &cfgFile)
306 {
307  char *val;
308 
309 // Get the next token, we must have one here
310 //
311  if (!(val = cfgFile.GetWord()))
312  {Say.Emsg("Config", "qcheck time not specified"); return 1;}
313 
314 // If not a path, then it must be a time
315 //
316  if (*val != '/' && !(val = cfgFile.GetWord())) return 0;
317 
318 // The next token has to be an absolute path if it is present at all
319 //
320  if (*val != '/')
321  {Say.Emsg("Config", "qcheck path not absolute"); return 1;}
322  if (QPath) free(QPath);
323  QPath = strdup(val);
324  return 0;
325 }
int open(const char *path, int oflag,...)
size_t strlcpy(char *dst, const char *src, size_t sz)
#define TRACE_ALL
Definition: XrdTrace.hh:35
int Add(char Opc, const char *Lfn, const char *Opq, const char *Usr, const char *Rid, const char *Nop, const char *Pop, int Prty=1)
Definition: XrdFrcProxy.cc:95
int Init(int opX, const char *aPath, int aMode, const char *qPath=0)
Definition: XrdFrcProxy.cc:229
int List(Queues &State, char *Buff, int Bsz)
Definition: XrdFrcProxy.cc:174
XrdFrcProxy(XrdSysLogger *lP, const char *iName, int Debug=0)
Definition: XrdFrcProxy.cc:67
int Del(char Opc, const char *Rid)
Definition: XrdFrcProxy.cc:150
static const int stgQ
static const int getQ
char LFN[3072]
static const int migQ
static const int putQ
char User[256]
signed char Prty
char Notify[512]
static const int maxPrty
static int chkURL(const char *Url)
Definition: XrdFrcUtils.cc:81
static int MapR2Q(char Opc, int *Flags=0)
Definition: XrdFrcUtils.cc:187
static char * makePath(const char *iName, const char *Path, int Mode)
Definition: XrdFrcUtils.cc:102
static int MapM2O(const char *Nop, const char *Pop)
Definition: XrdFrcUtils.cc:164
char * GetWord(int lowcase=0)
static const char * InstName(int TranOpt=0)
Definition: XrdOucUtils.cc:732
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSysLogger * logger(XrdSysLogger *lp=0)
Definition: XrdSysError.hh:141
XrdOucTrace Trace
XrdSysError Say