XRootD
XrdFrcReqAgent.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d F r c R e q A g e n t . c c */
4 /* */
5 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdio>
32 #include <cstdlib>
33 #include <cstring>
34 #include <strings.h>
35 #include <unistd.h>
36 #include <fcntl.h>
37 #include <sys/types.h>
38 #include <sys/stat.h>
39 
40 #include "XrdFrc/XrdFrcReqAgent.hh"
41 #include "XrdFrc/XrdFrcTrace.hh"
42 #include "XrdFrc/XrdFrcUtils.hh"
43 #include "XrdNet/XrdNetMsg.hh"
44 #include "XrdOuc/XrdOucUtils.hh"
45 #include "XrdSys/XrdSysHeaders.hh"
46 #include "XrdSys/XrdSysPlatform.hh"
47 
48 using namespace XrdFrc;
49 
50 /******************************************************************************/
51 /* S t a t i c V a r i a b l e s */
52 /******************************************************************************/
53 
54 char *XrdFrcReqAgent::c2sFN = 0;
55 
56 /******************************************************************************/
57 /* C o n s t r u c t o r */
58 /******************************************************************************/
59 
60 XrdFrcReqAgent::XrdFrcReqAgent(const char *Me, int qVal)
61  : Persona(Me),myName(""),theQ(qVal)
62 {
63 // Set default ping message
64 //
65  switch(qVal)
66  {case XrdFrcRequest::getQ: pingMsg = "!<\n"; break;
67  case XrdFrcRequest::migQ: pingMsg = "!&\n"; break;
68  case XrdFrcRequest::stgQ: pingMsg = "!+\n"; break;
69  case XrdFrcRequest::putQ: pingMsg = "!>\n"; break;
70  default: pingMsg = "!\n" ; break;
71  }
72 }
73 
74 /******************************************************************************/
75 /* Public: A d d */
76 /******************************************************************************/
77 
79 {
80 
81 // Complete the request including verifying the priority
82 //
83  if (Request.Prty > XrdFrcRequest::maxPrty)
84  Request.Prty = XrdFrcRequest::maxPrty;
85  else if (Request.Prty < 0)Request.Prty = 0;
86 
87 // Add time and instance name
88 //
89  Request.addTOD = time(0);
90  if (myName) strlcpy(Request.iName, myName, sizeof(Request.iName));
91 
92 // Now add it to the queue
93 //
94  rQueue[static_cast<int>(Request.Prty)]->Add(&Request);
95 
96 // Now wake the boss
97 //
98  Ping();
99 }
100 
101 /******************************************************************************/
102 /* Public: D e l */
103 /******************************************************************************/
104 
106 {
107  int i;
108 
109 // Remove all pending requests for this id
110 //
111  for (i = 0; i <= XrdFrcRequest::maxPrty; i++) rQueue[i]->Can(&Request);
112 }
113 
114 /******************************************************************************/
115 /* Public: L i s t */
116 /******************************************************************************/
117 
119 {
120  char myLfn[8192];
121  int i, Offs, n = 0;
122 
123 // List entries in each priority queue
124 //
125  for (i = 0; i <= XrdFrcRequest::maxPrty; i++)
126  {Offs = 0;
127  while(rQueue[i]->List(myLfn, sizeof(myLfn), Offs, Items, Num))
128  {std::cout <<myLfn <<std::endl; n++;}
129  }
130 // All done
131 //
132  return n;
133 }
134 
135 /******************************************************************************/
136 
137 int XrdFrcReqAgent::List(XrdFrcRequest::Item *Items, int Num, int Prty)
138 {
139  char myLfn[8192];
140  int Offs, n = 0;
141 
142 // List entries in each priority queue
143 //
144  if (Prty <= XrdFrcRequest::maxPrty)
145  {Offs = 0;
146  while(rQueue[Prty]->List(myLfn, sizeof(myLfn), Offs, Items, Num))
147  {std::cout <<myLfn <<std::endl; n++;}
148  }
149 
150 // All done
151 //
152  return n;
153 }
154 
155 /******************************************************************************/
156 /* Public: N e x t L F N */
157 /******************************************************************************/
158 
159 int XrdFrcReqAgent::NextLFN(char *Buff, int Bsz, int Prty, int &Offs)
160 {
161  static XrdFrcRequest::Item Items[1] = {XrdFrcRequest::getLFN};
162 
163 // Return entry, if it exists
164 //
165  return rQueue[Prty]->List(Buff, Bsz, Offs, Items, 1) != 0;
166 }
167 
168 /******************************************************************************/
169 /* P i n g */
170 /******************************************************************************/
171 
172 void XrdFrcReqAgent::Ping(const char *Msg)
173 {
174  static XrdNetMsg udpMsg(&Say, c2sFN);
175  static int udpOK = 0;
176  struct stat buf;
177 
178 // Send given message or default message based on our persona
179 //
180  if (udpOK || !stat(c2sFN, &buf))
181  {udpMsg.Send(Msg ? Msg : pingMsg); udpOK = 1;}
182 }
183 
184 /******************************************************************************/
185 /* S t a r t */
186 /******************************************************************************/
187 
188 int XrdFrcReqAgent::Start(char *aPath, int aMode)
189 {
190  XrdFrcRequest Request;
191  const char *myClid;
192  char buff[2048], *qPath;
193  int i;
194 
195 // Initialize the udp path for pings, if we have not done so
196 //
197  if (!c2sFN)
198  {sprintf(buff, "%sxfrd.udp", aPath);
199  c2sFN = strdup(buff);
200  }
201 
202 // Get the instance name
203 //
204  myName = XrdOucUtils::InstName(1);
205 
206 // Generate the queue directory path
207 //
208  if (!(qPath = XrdFrcUtils::makeQDir(aPath, aMode))) return 0;
209 
210 // Initialize the registration entry and register ourselves
211 //
212  if ((myClid = getenv("XRDCMSCLUSTERID")))
213  {int Uid = static_cast<int>(geteuid());
214  int Gid = static_cast<int>(getegid());
215  memset(&Request, 0, sizeof(Request));
216  strlcpy(Request.LFN, myClid, sizeof(Request.LFN));
217  sprintf(Request.User,"%d %d", Uid, Gid);
218  sprintf(Request.ID, "%d", static_cast<int>(getpid()));
219  strlcpy(Request.iName, myName, sizeof(Request.iName));
220  Request.addTOD = time(0);
222  Request.OPc = '@';
223  }
224 
225 // Initialize the request queues if all went well
226 //
227  for (i = 0; i <= XrdFrcRequest::maxPrty; i++)
228  {sprintf(buff, "%s%sQ.%d", qPath, Persona, i);
229  rQueue[i] = new XrdFrcReqFile(buff, 1);
230  if (!rQueue[i]->Init()) return 0;
231  if (myClid) rQueue[i]->Add(&Request);
232  }
233 
234 // All done
235 //
236  if (myClid) Ping();
237  free(qPath);
238  return 1;
239 }
int stat(const char *path, struct stat *buf)
size_t strlcpy(char *dst, const char *src, size_t sz)
void Ping(const char *Msg=0)
void Add(XrdFrcRequest &Request)
XrdFrcReqAgent(const char *Me, int qVal)
int Start(char *aPath, int aMode)
void Del(XrdFrcRequest &Request)
int List(XrdFrcRequest::Item *Items, int Num)
int NextLFN(char *Buff, int Bsz, int Prty, int &Offs)
char * List(char *Buff, int bsz, int &Offs, XrdFrcRequest::Item *ITList=0, int ITNum=0)
void Add(XrdFrcRequest *rP)
static const int stgQ
static const int getQ
char LFN[3072]
static const int migQ
char iName[32]
static const int putQ
char User[256]
static const int Register
signed char Prty
static const int maxPrty
long long addTOD
static char * makeQDir(const char *Path, int Mode)
Definition: XrdFrcUtils.cc:127
int Send(const char *buff, int blen=0, const char *dest=0, int tmo=-1)
Definition: XrdNetMsg.cc:70
static const char * InstName(int TranOpt=0)
Definition: XrdOucUtils.cc:732
XrdSysError Say