XRootD
XrdFrmXfrAgent.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d F r m X f r A g e n t . c c */
4 /* */
5 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdlib>
32 #include <cstring>
33 #include <strings.h>
34 #include <unistd.h>
35 #include <fcntl.h>
36 #include <sys/types.h>
37 #include <sys/stat.h>
38 
39 #include "XrdFrc/XrdFrcRequest.hh"
40 #include "XrdFrc/XrdFrcTrace.hh"
41 #include "XrdFrc/XrdFrcUtils.hh"
42 #include "XrdFrm/XrdFrmConfig.hh"
43 #include "XrdFrm/XrdFrmXfrAgent.hh"
44 #include "XrdOuc/XrdOucStream.hh"
45 #include "XrdSys/XrdSysPlatform.hh"
46 
47 using namespace XrdFrc;
48 using namespace XrdFrm;
49 
50 /******************************************************************************/
51 /* S t a t i c V a r i a b l e s */
52 /******************************************************************************/
53 
54 XrdFrcReqAgent XrdFrmXfrAgent::GetAgent("getf", XrdFrcRequest::getQ);
55 
56 XrdFrcReqAgent XrdFrmXfrAgent::MigAgent("migr", XrdFrcRequest::migQ);
57 
58 XrdFrcReqAgent XrdFrmXfrAgent::StgAgent("pstg", XrdFrcRequest::stgQ);
59 
60 XrdFrcReqAgent XrdFrmXfrAgent::PutAgent("putf", XrdFrcRequest::putQ);
61 
62 /******************************************************************************/
63 /* Private: A d d */
64 /******************************************************************************/
65 
66 void XrdFrmXfrAgent::Add(XrdOucStream &Request, char *Tok,
67  XrdFrcReqAgent &Server)
68 {
69  XrdFrcRequest myReq;
70  const char *Miss = 0;
71  char *tp, *op;
72 
73 // Handle: op[<traceid>] <requestid> <npath> <prty> <mode> <path> [. . .]
74 //
75 // op: + | & | ^ | < | = | >
76 //
77  memset(&myReq, 0, sizeof(myReq));
78  myReq.OPc = *Tok;
79  if (*Tok == '=' || *Tok == '^') myReq.Options |= XrdFrcRequest::Purge;
80  Tok++;
81 
82  if (*Tok) strlcpy(myReq.User, Tok, sizeof(myReq.User));
83  else strlcpy(myReq.User, Config.myProg, sizeof(myReq.User));
84 
85  if (!(tp = Request.GetToken())) Miss = "request id";
86  else strlcpy(myReq.ID, tp, sizeof(myReq.ID));
87 
88  if (!Miss)
89  {if (!(tp = Request.GetToken())) Miss = "notify path";
90  else strlcpy(myReq.Notify, tp, sizeof(myReq.Notify));
91  }
92 
93  if (!Miss)
94  {if (!(tp = Request.GetToken())) Miss = "priority";
95  else {myReq.Prty = atoi(tp);
96  if (myReq.Prty < 0) myReq.Prty = 0;
97  else if (myReq.Prty > XrdFrcRequest::maxPrty)
99  }
100  }
101 
102  if (!Miss)
103  {if (!(tp = Request.GetToken())) Miss = "mode";
104  else myReq.Options |= XrdFrcUtils::MapM2O(myReq.Notify, tp);
105  }
106 
107  if (!Miss && !(tp = Request.GetToken())) Miss = "path";
108 
109 // Check for any errors
110 //
111  if (Miss) {Say.Emsg("Agent_Add", Miss, "missing in '+' request.");
112  return;
113  }
114 
115 // Add all paths in the request
116 //
117  do {strlcpy(myReq.LFN, tp, sizeof(myReq.LFN));
118  if ((op = index(tp, '?'))) {myReq.Opaque = op-tp+1; *op = '\0';}
119  else myReq.Opaque = 0;
120  myReq.LFO = 0;
121  if (myReq.LFN[0] != '/' && !(myReq.LFO = XrdFrcUtils::chkURL(myReq.LFN)))
122  Say.Emsg("Agent_Add", "Invalid url -", myReq.LFN);
123  else Server.Add(myReq);
124  if ((tp = Request.GetToken())) memset(myReq.LFN, 0, sizeof(myReq.LFN));
125  } while(tp);
126 }
127 
128 /******************************************************************************/
129 /* Private: A g e n t */
130 /******************************************************************************/
131 
132 XrdFrcReqAgent *XrdFrmXfrAgent::Agent(char bType)
133 {
134 
135 // Return the agent corresponding to the type
136 //
137  switch(bType)
138  {case 0 : return &StgAgent;
139  case '+': return &StgAgent;
140  case '^':
141  case '&': return &MigAgent;
142  case '<': return &GetAgent;
143  case '=':
144  case '>': return &PutAgent;
145  default: break;
146  }
147  return 0;
148 }
149 
150 /******************************************************************************/
151 /* Private: D e l */
152 /******************************************************************************/
153 
154 void XrdFrmXfrAgent::Del(XrdOucStream &Request, char *Tok,
155  XrdFrcReqAgent &Server)
156 {
157  XrdFrcRequest myReq;
158 
159 // If the requestid is adjacent to the operation, use it o/w get it
160 //
161  if (!(*Tok) && (!(Tok = Request.GetToken()) || !(*Tok)))
162  {Say.Emsg("Del", "request id missing in cancel request.");
163  return;
164  }
165 
166 // Copy the request ID into the request and remove it from peer server
167 //
168  memset(&myReq, 0, sizeof(myReq));
169  strlcpy(myReq.ID, Tok, sizeof(myReq.ID));
170  Server.Del(myReq);
171 }
172 
173 /******************************************************************************/
174 /* Private: L i s t */
175 /******************************************************************************/
176 
177 void XrdFrmXfrAgent::List(XrdOucStream &Request, char *Tok)
178 {
180  XrdFrcReqAgent *agentP;
181  int n = 0;
182  char *tp;
183 
184  while((tp = Request.GetToken()) && n < XrdFrcRequest::getLast)
185  {if (XrdFrcUtils::MapV2I(tp, Items[n])) n++;}
186 
187 // List entries queued for specific servers
188 //
189  if (!(*Tok)) {StgAgent.List(Items, n); GetAgent.List(Items, n);}
190  else do {if ((agentP = Agent(*Tok))) agentP->List(Items, n);
191  } while(*(++Tok));
192  std::cout <<std::endl;
193 }
194 
195 /******************************************************************************/
196 /* Public: P r o c e s s */
197 /******************************************************************************/
198 
200 {
201  char *tp;
202 
203 // Each frm request comes in as:
204 //
205 // Copy in: <[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
206 // Copy purge: =[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
207 // Copy out: >[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
208 // Migrate: &[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
209 // Migr+Purge: ^[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
210 // Stage: +[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
211 // Cancel in: - <requestid>
212 // Cancel out: ~ <requestid>
213 // List: ?[<][+][&][>]
214 // Wakeup: ![<][+][&][>]
215 //
216  if ((tp = Request.GetToken()))
217  switch(*tp)
218  {case '+': Add(Request, tp, StgAgent); break;
219  case '<': Add(Request, tp, GetAgent); break;
220  case '=':
221  case '>': Add(Request, tp, PutAgent); break;
222  case '&':
223  case '^': Add(Request, tp, MigAgent); break;
224  case '-': Del(Request, tp+1, StgAgent);
225  Del(Request, tp+1, GetAgent);
226  break;
227  case '~': Del(Request, tp+1, MigAgent);
228  Del(Request, tp+1, PutAgent);
229  break;
230  case '?': List(Request, tp+1); break;
231  case '!': GetAgent.Ping(tp); break;
232  default: Say.Emsg("Agent", "Invalid request, '", tp, "'.");
233  }
234 }
235 
236 /******************************************************************************/
237 /* Public: S t a r t */
238 /******************************************************************************/
239 
241 {
242  EPNAME("Agent");
243  XrdOucStream Request;
244  char *tp;
245 
246 // Prepare our agents
247 //
248  if (!StgAgent.Start(Config.QPath, Config.AdminMode)
249  || !MigAgent.Start(Config.QPath, Config.AdminMode)
250  || !GetAgent.Start(Config.QPath, Config.AdminMode)
251  || !PutAgent.Start(Config.QPath, Config.AdminMode)) return 2;
252 
253 // Attach stdin to the Request stream
254 //
255  Request.Attach(STDIN_FILENO, 8*1024);
256 
257 // Process all input
258 //
259  while((tp = Request.GetLine()))
260  {DEBUG ("Request: '" <<tp <<"'");
261  Process(Request);
262  }
263 
264 // If we exit then we lost the connection
265 //
266  Say.Emsg("Agent", "Exiting; lost request connection!");
267  return 8;
268 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
size_t strlcpy(char *dst, const char *src, size_t sz)
const char * myProg
void Add(XrdFrcRequest &Request)
void Del(XrdFrcRequest &Request)
int List(XrdFrcRequest::Item *Items, int Num)
static const int stgQ
static const int getQ
char LFN[3072]
static const int migQ
static const int putQ
static const int Purge
char User[256]
signed char Prty
char Notify[512]
static const int maxPrty
static int chkURL(const char *Url)
Definition: XrdFrcUtils.cc:81
static int MapV2I(const char *Opc, XrdFrcRequest::Item &ICode)
Definition: XrdFrcUtils.cc:211
static int MapM2O(const char *Nop, const char *Pop)
Definition: XrdFrcUtils.cc:164
static int Start()
static void Process(XrdOucStream &Request)
char * GetLine()
int Attach(int FileDescriptor, int bsz=2047)
char * GetToken(int lowcase=0)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
ZipListImpl< false > List(Ctx< ZipArchive > zip)
Factory for creating ZipStatImpl objects.
XrdCmsConfig Config
XrdSysError Say