XRootD
XrdBwmLogger.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d B w m L o g g e r . c c */
4 /* */
5 /* (c) 2008 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cctype>
32 #include <stddef.h>
33 #include <cstdlib>
34 #include <cstdio>
35 #include <cstring>
36 
37 #include "XrdBwm/XrdBwmLogger.hh"
38 #include "XrdSys/XrdSysError.hh"
39 #include "XrdOuc/XrdOucProg.hh"
40 #include "XrdOuc/XrdOucStream.hh"
41 #include "XrdNet/XrdNetOpts.hh"
42 #include "XrdNet/XrdNetSocket.hh"
43 #include "XrdSys/XrdSysPlatform.hh"
44 
45 /******************************************************************************/
46 /* L o c a l C l a s s e s */
47 /******************************************************************************/
48 
// One queued log record. Instances are chained through 'next' to form both
// the pending-event queue and the free list kept by XrdBwmLogger.
//
class XrdBwmLoggerMsg
{
public:

// Capacity, in bytes, of the formatted text buffer
//
static const int msgSize = 2048;

XrdBwmLoggerMsg *next;          // Next message in whichever list holds us
char             Text[msgSize]; // Formatted event text
int              Tlen;          // Number of bytes of Text to be sent

// A new message is unlinked and holds an empty string
//
                 XrdBwmLoggerMsg() : next(0), Tlen(0) {Text[0] = '\0';}

};
63 
64 /******************************************************************************/
65 /* E x t e r n a l L i n k a g e s */
66 /******************************************************************************/
67 
68 void *XrdBwmLoggerSend(void *pp)
69 {
70  XrdBwmLogger *lP = (XrdBwmLogger *)pp;
71  lP->sendEvents();
72  return (void *)0;
73 }
74 
75 /******************************************************************************/
76 /* C o n s t r u c t o r */
77 /******************************************************************************/
78 
79 XrdBwmLogger::XrdBwmLogger(const char *Target)
80 {
81 
82 // Set common variables
83 //
84  theTarget = strdup(Target);
85  eDest = 0;
86  theProg = 0;
87  msgFirst = msgLast = msgFree = 0;
88  tid = 0;
89  msgFD = 0;
90  endIT = 0;
91  theEOL= '\n';
92  msgsInQ = 0;
93 }
94 
95 /******************************************************************************/
96 /* D e s t r u c t o r */
97 /******************************************************************************/
98 
100 {
101  XrdBwmLoggerMsg *tp;
102 
103 // Kill the notification thread. This may cause a msg block to be orphaned
104 // but, in practice, this object does not really get deleted after being
105 // started. So, the problem is moot.
106 //
107  endIT = 1;
108  if (tid) XrdSysThread::Kill(tid);
109 
110 // Release all queued message bocks
111 //
112  qMut.Lock();
113  while ((tp = msgFirst)) {msgFirst = tp->next; delete tp;}
114  if (theTarget) free(theTarget);
115  if (msgFD >= 0) close(msgFD);
116  if (theProg) delete theProg;
117  qMut.UnLock();
118 
119 // Release all free message blocks
120 //
121  fMut.Lock();
122  while ((tp = msgFree)) {msgFree = tp->next; delete tp;}
123  fMut.UnLock();
124 }
125 
126 /******************************************************************************/
127 /* E v e n t */
128 /******************************************************************************/
129 
131 {
132  static int warnings = 0;
133  XrdBwmLoggerMsg *tp;
134 
135 // Get a message block
136 //
137  if (!(tp = getMsg()))
138  {if ((++warnings & 0xff) == 1)
139  eDest->Emsg("Notify", "Ran out of logger message objects;",
140  eInfo.Tident, "event not logged.");
141  return;
142  }
143 
144 // Format the message
145 //
146  tp->Tlen = snprintf(tp->Text, XrdBwmLoggerMsg::msgSize,
147  "<stats id=\"bwm\"><tid>%s</tid><lfn>%s</lfn>"
148  "<lcl>%s</lcl><rmt>%s</rmt><flow>%c</flow>"
149  "<at>%lld</at><bt>%lld</bt><ct>%lld</ct>"
150  "<iq>%d</iq><oq>%d</oq><xq>%d</xq>"
151  "<sz>%lld<sz><esec>%d</esec></stats>%c",
152  eInfo.Tident, eInfo.Lfn, eInfo.lclNode, eInfo.rmtNode,
153  eInfo.Flow, (long long) eInfo.ATime,
154  (long long) eInfo.BTime, (long long) eInfo.CTime,
155  eInfo.numqIn, eInfo.numqOut, eInfo.numqXeq, eInfo.Size,
156  eInfo.ESec, theEOL);
157 
158 // Either log this or put the message on the queue and return
159 //
160  tp->next = 0;
161  qMut.Lock();
162  if (msgLast) {msgLast->next = tp; msgLast = tp;}
163  else msgFirst = msgLast = tp;
164  qMut.UnLock();
165  qSem.Post();
166 }
167 
168 /******************************************************************************/
169 /* s e n d E v e n t s */
170 /******************************************************************************/
171 
173 {
174  XrdBwmLoggerMsg *tp;
175  const char *theData[2] = {0,0};
176  int theDlen[2] = {0,0};
177 
178 // This is an endless loop that just gets things off the event queue and
179 // send them out. This allows us to only hang a simgle thread should the
180 // receiver get blocked, instead of the whole process.
181 //
182  while(1)
183  {qSem.Wait();
184  qMut.Lock();
185  if (endIT) break;
186  if ((tp = msgFirst) && !(msgFirst = tp->next)) msgLast = 0;
187  qMut.UnLock();
188  if (tp)
189  {if (!theProg) Feed(tp->Text, tp->Tlen);
190  else {theData[0] = tp->Text; theDlen[0] = tp->Tlen;
191  theProg->Feed(theData, theDlen);
192 
193  }
194  retMsg(tp);
195  }
196  }
197  qMut.UnLock();
198 }
199 
200 /******************************************************************************/
201 /* S t a r t */
202 /******************************************************************************/
203 
205 {
206  int rc;
207 
208 // Set the error object pointer
209 //
210  eDest = eobj;
211 
212 // Check if we need to create a socket to a path
213 //
214  if (!strcmp("*", theTarget)) {msgFD = -1; theEOL = '\0';}
215  else if (*theTarget == '>')
216  {XrdNetSocket *msgSock;
217  if (!(msgSock = XrdNetSocket::Create(eobj, theTarget+1, 0, 0660,
218  XRDNET_FIFO))) return -1;
219  msgFD = msgSock->Detach();
220  delete msgSock;
221  }
222  else {// Allocate a new program object if we don't have one
223  //
224  if (theProg) return 0;
225  theProg = new XrdOucProg(eobj);
226 
227  // Setup the program
228  //
229  if (theProg->Setup(theTarget, eobj)) return -1;
230  if ((rc = theProg->Start()))
231  {eobj->Emsg("Logger", rc, "start event collector"); return -1;}
232  }
233 
234 // Now start a thread to get messages and send them to the collector
235 //
236  if ((rc = XrdSysThread::Run(&tid, XrdBwmLoggerSend, static_cast<void *>(this),
237  0, "Log message sender")))
238  {eobj->Emsg("Logger", rc, "create log message sender thread");
239  return -1;
240  }
241 
242 // All done
243 //
244  return 0;
245 }
246 
247 /******************************************************************************/
248 /* P r i v a t e M e t h o d s */
249 /******************************************************************************/
250 /******************************************************************************/
251 /* F e e d */
252 /******************************************************************************/
253 
254 int XrdBwmLogger::Feed(const char *data, int dlen)
255 {
256  int retc;
257 
258 // Send message to the log if need be
259 //
260  if (msgFD < 0) {eDest->Say("", data); return 0;}
261 
262 // Write the data. since this is a udp socket all the data goes or none does
263 //
264  do { retc = write(msgFD, (const void *)data, (size_t)dlen);}
265  while (retc < 0 && errno == EINTR);
266  if (retc < 0)
267  {eDest->Emsg("Feed", errno, "write to logger socket", theTarget);
268  return -1;
269  }
270 
271 // All done
272 //
273  return 0;
274 }
275 
276 /******************************************************************************/
277 /* g e t M s g */
278 /******************************************************************************/
279 
280 XrdBwmLoggerMsg *XrdBwmLogger::getMsg()
281 {
282  XrdBwmLoggerMsg *tp;
283 
284 // Lock the free queue
285 //
286  fMut.Lock();
287 
288 // Get message object but don't give out too many
289 //
290  if (msgsInQ >= maxmInQ) tp = 0;
291  else {if ((tp = msgFree)) msgFree = tp->next;
292  else tp = new XrdBwmLoggerMsg();
293  msgsInQ++;
294  }
295 
296 // Unlock and return result
297 //
298  fMut.UnLock();
299  return tp;
300 }
301 
302 /******************************************************************************/
303 /* r e t M s g */
304 /******************************************************************************/
305 
306 void XrdBwmLogger::retMsg(XrdBwmLoggerMsg *tp)
307 {
308 
309 // Lock the free queue, return message, unlock the queue
310 //
311  fMut.Lock();
312  tp->next = msgFree;
313  msgFree = tp;
314  msgsInQ--;
315  fMut.UnLock();
316 }
void * XrdBwmLoggerSend(void *pp)
Definition: XrdBwmLogger.cc:68
#define XRDNET_FIFO
Definition: XrdNetOpts.hh:83
ssize_t write(int fildes, const void *buf, size_t nbyte)
#define close(a)
Definition: XrdPosix.hh:43
char Text[msgSize]
Definition: XrdBwmLogger.cc:56
XrdBwmLoggerMsg * next
Definition: XrdBwmLogger.cc:55
static const int msgSize
Definition: XrdBwmLogger.cc:53
const char * rmtNode
Definition: XrdBwmLogger.hh:48
const char * lclNode
Definition: XrdBwmLogger.hh:47
XrdBwmLogger(const char *Target)
Definition: XrdBwmLogger.cc:79
int Start(XrdSysError *eobj)
const char * Tident
Definition: XrdBwmLogger.hh:45
void Event(Info &eInfo)
const char * Lfn
Definition: XrdBwmLogger.hh:46
void sendEvents(void)
static XrdNetSocket * Create(XrdSysError *Say, const char *path, const char *fn, mode_t mode, int isudp=0)
int Start(void)
Definition: XrdOucProg.cc:349
int Feed(const char *data[], const int dlen[])
Definition: XrdOucProg.cc:63
int Setup(const char *prog, XrdSysError *errP=0, int(*Proc)(XrdOucStream *, char **, int)=0)
Definition: XrdOucProg.cc:296
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static int Kill(pthread_t tid)