XRootD
XrdMpxStats.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d M p x S t a t s . c c */
4 /* */
5 /* (c) 2009 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdio>
32 #include <unistd.h>
33 #include <sys/types.h>
34 #include <sys/uio.h>
35 
36 #include "XrdApps/XrdMpxXml.hh"
37 #include "XrdNet/XrdNetAddr.hh"
38 #include "XrdNet/XrdNetOpts.hh"
39 #include "XrdNet/XrdNetSocket.hh"
40 #include "XrdSys/XrdSysError.hh"
41 #include "XrdSys/XrdSysLogger.hh"
42 #include "XrdSys/XrdSysHeaders.hh"
43 #include "XrdSys/XrdSysPlatform.hh"
44 #include "XrdSys/XrdSysPthread.hh"
45 
46 /******************************************************************************/
47 /* G l o b a l V a r i a b l e s */
48 /******************************************************************************/
49 
50 namespace XrdMpx
51 {
53 
54  XrdSysError Say(&Logger, "mpxstats");
55 
56 static const int addSender = 0x0001;
57 
58  int Opts;
59 };
60 
61 using namespace XrdMpx;
62 
63 /******************************************************************************/
64 /* X r d M p x O u t */
65 /******************************************************************************/
66 
67 class XrdMpxOut
68 {
69 public:
70 
71 struct statsBuff
74  int Dlen;
75  char Data[8190];
76  char Pad[2];
77  };
78 
79 void Add(statsBuff *sbP);
80 
81 statsBuff *getBuff();
82 
83 void *Run(XrdMpxXml *xP);
84 
85  XrdMpxOut() : Ready(0), inQ(0), Free(0) {}
87 
88 private:
89 
90 XrdSysMutex myMutex;
91 XrdSysSemaphore Ready;
92 
93 statsBuff *inQ;
94 statsBuff *Free;
95 };
96 
97 /******************************************************************************/
98 /* X r d M p x O u t : : A d d */
99 /******************************************************************************/
100 
102 {
103 
104 // Add this to the queue and signal the processing thread
105 //
106  myMutex.Lock();
107  sbP->Next = inQ;
108  inQ = sbP;
109  Ready.Post();
110  myMutex.UnLock();
111 }
112 
113 /******************************************************************************/
114 /* X r d M p x O u t : : g e t B u f f */
115 /******************************************************************************/
116 
118 {
119  statsBuff *sbP;
120 
121 // Use an available buffer or allocate one
122 //
123  myMutex.Lock();
124  if ((sbP = Free)) Free = sbP->Next;
125  else sbP = new statsBuff;
126  myMutex.UnLock();
127  return sbP;
128 }
129 
130 /******************************************************************************/
131 /* X r d M p x O u t : : R u n */
132 /******************************************************************************/
133 
135 {
136  XrdNetAddr theAddr;
137  const char *Host = 0;
138  char *bP, obuff[sizeof(statsBuff)*2];
139  statsBuff *sbP;
140  int wLen, rc;
141 
142 // Simply loop formating and outputing the buffers
143 //
144  while(1)
145  {Ready.Wait();
146  myMutex.Lock();
147  if ((sbP = inQ)) inQ = sbP->Next;
148  myMutex.UnLock();
149  if (!sbP) continue;
150  if (xP)
151  {if (!(Opts & addSender)) Host = 0;
152  else if (theAddr.Set(&(sbP->From.Addr))) Host = 0;
153  else Host = theAddr.Name();
154  wLen = xP->Format(Host, sbP->Data, obuff);
155  bP = obuff;
156  } else {
157  bP = sbP->Data;
158  *(bP + sbP->Dlen) = '\n';
159  wLen = sbP->Dlen+1;
160  }
161 
162  while(wLen > 0)
163  {do {rc = write(STDOUT_FILENO, bP, wLen);}
164  while(rc < 0 && errno == EINTR);
165  wLen -= rc; bP += rc;
166  }
167 
168  myMutex.Lock(); sbP->Next = Free; Free = sbP; myMutex.UnLock();
169  }
170 
171 // Should never get here
172 //
173  return (void *)0;
174 }
175 
176 /******************************************************************************/
177 /* G l o b a l O b j e c t s */
178 /******************************************************************************/
179 
180 namespace XrdMpx
181 {
183 };
184 
185 /******************************************************************************/
186 /* T h r e a d I n t e r f a c e s */
187 /******************************************************************************/
188 
189 void *mainOutput(void *parg)
190 {
191  XrdMpxXml *xP = static_cast<XrdMpxXml *>(parg);
192  return statsQ.Run(xP);
193 }
194 
195 /******************************************************************************/
196 /* U s a g e */
197 /******************************************************************************/
198 
199 void Usage(int rc)
200 {
201  std::cerr <<"\nUsage: mpxstats [-f {cgi|flat|xml}] -p <port> [-s]" <<std::endl;
202  exit(rc);
203 }
204 
205 /******************************************************************************/
206 /* m a i n */
207 /******************************************************************************/
208 
209 int main(int argc, char *argv[])
210 {
211  extern char *optarg;
212  extern int opterr, optopt;
213  sigset_t myset;
214  pthread_t tid;
216  XrdMpxOut::statsBuff *sbP = 0;
217  XrdNetSocket mySocket(&Say);
218  XrdMpxXml *xP = 0;
219  SOCKLEN_t fromLen;
220  int Port = 0, retc, udpFD;
221  char buff[64], c;
222  bool Debug;
223 
224 // Process the options
225 //
226  opterr = 0; Debug = false; Opts = 0;
227  if (argc > 1 && '-' == *argv[1])
228  while ((c = getopt(argc,argv,"df:p:s")) && ((unsigned char)c != 0xff))
229  { switch(c)
230  {
231  case 'd': Debug = true;
232  break;
233  case 'f': if (!strcmp(optarg, "cgi" )) fType = XrdMpxXml::fmtCGI;
234  else if (!strcmp(optarg, "flat")) fType = XrdMpxXml::fmtFlat;
235  else if (!strcmp(optarg, "xml" )) fType = XrdMpxXml::fmtXML;
236  else {Say.Emsg(":", "Invalid format - ", optarg); Usage(1);}
237  break;
238  case 'h': Usage(0);
239  break;
240  case 'p': if (!(Port = atoi(optarg)))
241  {Say.Emsg(":", "Invalid port number - ", optarg); Usage(1);}
242  break;
243  case 's': Opts |= addSender;
244  break;
245  default: sprintf(buff,"'%c'", optopt);
246  if (c == ':') Say.Emsg(":", buff, "value not specified.");
247  else Say.Emsg(0, buff, "option is invalid");
248  Usage(1);
249  break;
250  }
251  }
252 
253 // Make sure port has been specified
254 //
255  if (!Port) {Say.Emsg(":", "Port has not been specified."); Usage(1);}
256 
257 // Turn off sigpipe and host a variety of others before we start any threads
258 //
259  signal(SIGPIPE, SIG_IGN); // Solaris optimization
260  sigemptyset(&myset);
261  sigaddset(&myset, SIGPIPE);
262  sigaddset(&myset, SIGCHLD);
263  pthread_sigmask(SIG_BLOCK, &myset, NULL);
264 
265 // Set the default stack size here
266 //
267  if (sizeof(long) > 4) XrdSysThread::setStackSize((size_t)1048576);
268  else XrdSysThread::setStackSize((size_t)786432);
269 
270 // Create a UDP socket and bind it to a port
271 //
272  if (mySocket.Open(0, Port, XRDNET_SERVER|XRDNET_UDPSOCKET, 0) < 0)
273  {Say.Emsg(":", -mySocket.LastError(), "create udp socket"); exit(4);}
274  udpFD = mySocket.Detach();
275 
276 // Establish format
277 //
278  if (fType != XrdMpxXml::fmtXML) xP = new XrdMpxXml(fType, Debug);
279 
280 // Now run a thread to output whatever we get
281 //
282  if ((retc = XrdSysThread::Run(&tid, mainOutput, (void *)xP,
283  XRDSYSTHREAD_BIND, "Output")))
284  {Say.Emsg(":", retc, "create output thread"); exit(4);}
285 
286 // Now simply wait for the messages
287 //
288  fromLen = sizeof(sbP->From);
289  while(1)
290  {sbP = statsQ.getBuff();
291  retc = recvfrom(udpFD, sbP->Data, sizeof(sbP->Data), 0,
292  &sbP->From.Addr, &fromLen);
293  if (retc < 0) {Say.Emsg(":", retc, "recv udp message"); exit(8);}
294  sbP->Dlen = retc;
295  statsQ.Add(sbP);
296  }
297 
298 // Should never get here
299 //
300  return 0;
301 }
int optopt
int main(int argc, char *argv[])
Definition: XrdMpxStats.cc:209
void * mainOutput(void *parg)
Definition: XrdMpxStats.cc:189
void Usage(int rc)
Definition: XrdMpxStats.cc:199
#define XRDNET_SERVER
Definition: XrdNetOpts.hh:99
#define XRDNET_UDPSOCKET
Definition: XrdNetOpts.hh:79
struct sockaddr Addr
ssize_t write(int fildes, const void *buf, size_t nbyte)
#define SOCKLEN_t
#define XRDSYSTHREAD_BIND
XrdNetSockAddr From
Definition: XrdMpxStats.cc:73
void * Run(XrdMpxXml *xP)
Definition: XrdMpxStats.cc:134
void Add(statsBuff *sbP)
Definition: XrdMpxStats.cc:101
statsBuff * getBuff()
Definition: XrdMpxStats.cc:117
int Format(const char *Host, char *ibuff, char *obuff)
Definition: XrdMpxXml.cc:242
const char * Name(const char *eName=0, const char **eText=0)
const char * Set(const char *hSpec, int pNum=PortInSpec)
Definition: XrdNetAddr.cc:216
int Open(const char *path, int port=-1, int flags=0, int sockbuffsz=0)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void setStackSize(size_t stsz, bool force=false)
XrdSysError Say
XrdMpxOut statsQ
Definition: XrdMpxStats.cc:182
static const int addSender
Definition: XrdMpxStats.cc:56
XrdSysLogger Logger
Definition: XrdMpxStats.cc:52
int Opts
Definition: XrdMpxStats.cc:58