XRootD
XrdNetPMarkFF.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d N e t P M a r k F F . c c */
4 /* */
5 /* (c) 2021 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdint>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <time.h>
36 #include <unistd.h>
37 #include <sys/socket.h>
38 #include <sys/time.h>
39 #include <sys/types.h>
40 
41 #include "Xrd/XrdScheduler.hh"
42 #include "XrdNet/XrdNetAddrInfo.hh"
43 #include "XrdNet/XrdNetMsg.hh"
44 #include "XrdNet/XrdNetPMarkFF.hh"
45 #include "XrdNet/XrdNetUtils.hh"
46 #include "XrdSys/XrdSysError.hh"
47 #include "XrdSys/XrdSysTrace.hh"
48 
49 /******************************************************************************/
50 /* L o c a l M a c r o s */
51 /******************************************************************************/
52 
// Conditional tracing helpers. Each expands to a single statement that writes
// txt through the configured trace object when the matching switch (doTrace
// or doDebug) is set. They expect `tident` and a local `epName` (declared via
// EPName) to be in scope at the point of use. The do/while(0) wrapper keeps
// the embedded `if` from capturing an `else` that follows the macro call.
//
#define TRACE(txt) do {if (doTrace) SYSTRACE(Trace->, tident, epName, 0, txt);} while(0)

#define DEBUG(txt) do {if (doDebug) SYSTRACE(Trace->, tident, epName, 0, txt);} while(0)

// Declares the entry point name that TRACE and DEBUG attach to each record.
//
#define EPName(ep) const char *epName = ep
58 
59 /******************************************************************************/
60 /* F i r e f l y P a c k e t T e m p l a t e */
61 /******************************************************************************/
62 
namespace
{
// The firefly packet is assembled from three printf-style templates.
//
// ffFmt0 is formatted twice. The first pass (in Start) fills in the host name
// and the start time; every "%%" collapses to "%" so that the result is itself
// a format string. The second pass (in Emit) supplies, in order: state,
// current time, optional end-time element, bytes received, bytes sent, and the
// two parts of the round trip time.
//
const char *const ffFmt0 =
"<134>1 - %s xrootd - firefly-json - "    //RFC5424 syslog header (abbreviated)
"{"
  "\"version\":1,"
  "\"flow-lifecycle\":{"
    "\"state\":\"%%s\","                    //-> start | ongoing | end
    "\"current-time\":\"%%s\","             //-> yyyy-mm-ddThh:mm:ss.uuuuuu+00:00
    "\"start-time\":\"%s\""
    "%%s"                                   //-> ,"end-time":"<date-time>"
  "},"
  "\"usage\":{\"received\":%%llu,\"sent\":%%llu},"
  "\"netlink\":{\"rtt\":%%u.%%.03u},";

// ffFmt1 takes the experiment code, the activity code and an already
// formatted application element (or an empty string).
//
const char *const ffFmt1 =
  "\"context\":{"
    "\"experiment-id\":%d,"
    "\"activity-id\":%d"
    "%s"                                    //-> ,application:<appname>
  "},";

// ffFmt2 takes the address family digit, the two addresses and the two ports;
// it also closes the json object opened by ffFmt0.
//
const char *const ffFmt2 =
  "\"flow-id\":{"
    "\"afi\":\"ipv%c\","                    //-> ipv4 | ipv6
    "\"src-ip\":\"%s\","                    // source which is always server (us)
    "\"dst-ip\":\"%s\","                    // dest which is always client
    "\"protocol\":\"tcp\","
    "\"src-port\":%d,"
    "\"dst-port\":%d"
  "}"
"}";

// Optional elements: application name (precision limited via %.*s) and the
// end time that is only present in the closing packet.
//
const char *const ffApp = ",\"application\":\"%.*s\"";

const char *const ffEnd = ",\"end-time\":\"%s\"";
}
100 
101 /******************************************************************************/
102 /* s t a t i c O b j e c t s */
103 /******************************************************************************/
104 
// Configuration state shared with the rest of the packet marking code. These
// are declarations only; the objects are defined elsewhere.
//
namespace XrdNetPMarkConfig
{

// Other configuration values
//
extern XrdSysError *eDest;    // error message router used for Emsg() calls
extern XrdNetMsg *netMsg;     // sender for the collector; may be nil
extern XrdNetMsg *netOrg;     // sender for messages to the origin; may be nil
extern XrdScheduler *Sched;   // not referenced by the active code in this file
extern XrdSysTrace *Trace;    // trace object used by the TRACE/DEBUG macros

extern char *ffDest;          // referenced only by commented-out code here
extern int ffPortO;           // port used when sending to the flow's origin
extern int ffEcho;            // not referenced by the active code in this file
extern bool doDebug;          // enables DEBUG() output
extern bool doTrace;          // enables TRACE() output

extern const char *myHostName; // host name placed in the syslog header
}
using namespace XrdNetPMarkConfig;
125 
126 /******************************************************************************/
127 /* T h r e a d I n t e r f a c e s */
128 /******************************************************************************/
129 /*
130 namespace
131 {
132 void *Refresh(void *carg)
133  {int intvl = *(int *)carg;
134  while(true) {XrdSysTimer::Snooze(intvl); XrdNetPMarkCfg::Ping();}
135  }
136 XrdSysMutex ffMutex;
137 }
138 */
139 
140 /******************************************************************************/
141 /* Private: E m i t */
142 /******************************************************************************/
143 
144 bool XrdNetPMarkFF::Emit(const char *state, const char *cT, const char *eT)
145 {
146  EPName("Emit");
147  struct sockStats ss;
148  char msgBuff[1024];
149 
150  SockStats(ss);
151 
152  int n = snprintf(msgBuff, sizeof(msgBuff), ffHdr, state, cT, eT,
153  ss.bRecv, ss.bSent, ss.msRTT, ss.usRTT);
154 
155  if (n + ffTailsz >= (int)sizeof(msgBuff))
156  {eDest->Emsg("PMarkFF", "invalid json; msgBuff truncated.");
157  fdOK = odOK = false;
158  return false;
159  }
160 
161  memcpy(msgBuff+n, ffTail, ffTailsz+1);
162 
163  if (fdOK)
164  {DEBUG("Sending pmark s-msg: " <<msgBuff);
165  if (netMsg->Send(msgBuff, n+ffTailsz) < 0)
166  {fdOK = false;
167  return false;
168  }
169  }
170 
171  if (odOK)
172  {DEBUG("Sending pmark o-msg: " <<(netMsg ? "=s-msg" : msgBuff));
173  if (netOrg->Send(oDest, *mySad, msgBuff, n+ffTailsz) < 0)
174  {odOK = false;
175  return false;
176  }
177  }
178 
179  return true;
180 }
181 
182 /******************************************************************************/
183 /* Private: g e t U T C */
184 /******************************************************************************/
185 
186 const char *XrdNetPMarkFF::getUTC(char *utcBuff, int utcBLen)
187 {
188  struct timeval tod;
189  struct tm utcDT;
190  char *bP;
191 
192 // Get the current time in UTC
193 //
194  gettimeofday(&tod, 0);
195  gmtime_r(&tod.tv_sec, &utcDT);
196 
197 // Format this ISO-style
198 //
199  size_t n = strftime(utcBuff, utcBLen, "%FT%T", &utcDT);
200  bP = utcBuff + n; utcBLen -= n;
201  snprintf(bP, utcBLen, ".%06u+00:00", static_cast<unsigned int>(tod.tv_usec));
202 
203 // Return result
204 //
205  return utcBuff;
206 }
207 
208 /******************************************************************************/
209 /* P i n g */
210 /******************************************************************************/
211 /*
212 void XrdNetPMarkCfg::Ping()
213 {
214 // Tell every registered task to send out a continuation
215 //
216  ffMutex.Lock();
217  for (std::set<XdNetPMarkFF*> it = ffTasks.begin(); it!= ffTasks.end(); it++)
218 ???
219  ffMutex.UnLock();
220 }
221 */
222 /******************************************************************************/
223 /* R e g i s t r y */
224 /******************************************************************************/
225 /*
226 XrdNetMsg *XrdNetPMarkCfg::netMsg = 0;
227 std::set<XrdNetPMarkFF*> XrdNetPMarkCfg::ffTasks;
228 
229 void XrdNetPMarkCfg::Registry(XrdNetPMarkFF *ffobj, bool doadd)
230 {
231 // Add or delete item from task list
232 //
233  ffMutex.Lock();
234  if (doadd) ffTasks.insert(ffObj);
235  else ffTasks.erase(ffObj);
236  ffMutex.UnLock();
237 }
238 
239 // This is firefly so we must get a netmsg object
240 //
241  bool aOK;
242  netMsg = new XrdNetMsg(eLog, ffDest, aOK);
243  if (!aOK)
244  {eLog->Emsg("Config", "Unable to create UDP tunnel to", ffDest);
245  return 0;
246  }
247 
248 // If there is an interval, start a thread to handle continuations
249 //
250  if (ffIntvl && XrdSysThread::Run(&tid,Refresh,(void *)&ffIntvl,0,"pmark")
251  {eDest->Emsg(epname, errno, "start pmark refresh timer");
252  return 0;
253  }
254 */
255 
256 /******************************************************************************/
257 /* D e s t r u c t o r */
258 /******************************************************************************/
259 
261 {
262 // If all is well, emit the closing message
263 //
264  if (fdOK || odOK)
265  {char utcBuff[40], endBuff[80];
266  snprintf(endBuff, sizeof(endBuff), ffEnd,
267  getUTC(utcBuff, sizeof(utcBuff)));
268  Emit("end", utcBuff, endBuff);
269  }
270 
271 // Cleanup
272 //
273  if (mySad) delete(mySad);
274  if (oDest) free(oDest);
275  if (ffHdr) free(ffHdr);
276  if (ffTail) free(ffTail);
277  if (xtraFH) delete xtraFH;
278 };
279 
280 /******************************************************************************/
281 /* S o c k S t a t s */
282 /******************************************************************************/
283 
284 #ifdef __linux__
285 #include <linux/tcp.h>
286 #endif
287 
288 void XrdNetPMarkFF::SockStats(struct sockStats &ss)
289 {
290 #ifndef __linux__
291  memset(&ss, 0, sizeof(struct sockStats));
292 #else
293  EPName("SockStats");
294  struct tcp_info tcpInfo;
295  socklen_t tiLen = sizeof(tcpInfo);
296 
297  if (getsockopt(sockFD, IPPROTO_TCP, TCP_INFO, (void *)&tcpInfo, &tiLen) == 0)
298  {ss.bRecv = static_cast<uint64_t>(tcpInfo.tcpi_bytes_received);
299  ss.bSent = static_cast<uint64_t>(tcpInfo.tcpi_bytes_acked);
300  ss.msRTT = static_cast<uint32_t>(tcpInfo.tcpi_rtt/1000);
301  ss.usRTT = static_cast<uint32_t>(tcpInfo.tcpi_rtt%1000);
302  } else {
303  memset(&ss, 0, sizeof(struct sockStats));
304  DEBUG("Unable to get TCP information errno=" << strerror(errno));
305  }
306 #endif
307 }
308 
309 /******************************************************************************/
310 /* S t a r t */
311 /******************************************************************************/
312 
314 {
315  char appInfo[128], clIP[INET6_ADDRSTRLEN+2], svIP[INET6_ADDRSTRLEN+2];
316  int clPort, svPort;
317  char clType, svType;
318  bool fdok = false, odok = false;
319 
320 // Preform app if we need to
321 //
322  if (!appName) *appInfo = 0;
323  else snprintf(appInfo,sizeof(appInfo),ffApp,sizeof(appInfo)-20,appName);
324 
325 // Get the file descriptor for the socket
326 //
327  sockFD = addr.SockFD();
328 
329 // Obtain connectivity information about the peer and ourselves. We really
330 // should obtain our external address and use that but the issue is that
331 // we may have multiple external addresses and the client determines which
332 // one actually gets used. So, it's complicated. A TODO.
333 //
334  clPort = XrdNetUtils::GetSokInfo( sockFD, clIP, sizeof(clIP), clType);
335  if (clPort < 0)
336  {eDest->Emsg("PMarkFF", clPort, "get peer information.");
337  return false;
338  }
339 
340  svPort = XrdNetUtils::GetSokInfo(-sockFD, svIP, sizeof(svIP), svType);
341  if (svPort < 0)
342  {eDest->Emsg("PMarkFF", clPort, "get self information.");
343  return false;
344  }
345 
346 // If there is no special collector, indicate so
347 //
348  if (netMsg) fdok = true;
349 
350 // If the messages need to flow to the origin, get the destination information
351 //
352  if (netOrg)
353  {const XrdNetSockAddr *urSad = addr.NetAddr();
354  if (!urSad) eDest->Emsg("PMarkFF", "unable to get origin address.");
355  else {char buff[1024];
356  mySad = new XrdNetSockAddr;
357  memcpy(mySad, urSad, sizeof(XrdNetSockAddr));
358  mySad->v4.sin_port = htons(static_cast<uint16_t>(ffPortO));
359  snprintf(buff, sizeof(buff), "%s:%d", clIP, ffPortO);
360  oDest = strdup(buff);
361  odok = true;
362  }
363  }
364 
365 // If we cannot report anywhere then indicate we failed
366 //
367  if (!fdok && !odok) return false;
368 
369 // Format the base firefly template. Note that the client determines the
370 // address family that is being used.
371 //
372  char utcBuff[40], bseg0[512];
373  int len0 = snprintf(bseg0, sizeof(bseg0), ffFmt0, myHostName,
374  getUTC(utcBuff, sizeof(utcBuff)));
375  if (len0 >= (int)sizeof(bseg0))
376  {eDest->Emsg("PMarkFF", "invalid json; bseg0 truncated.");
377  return false;
378  }
379 
380  ffHdr = strdup(bseg0);
381 
382  char bseg1[256];
383  int len1 = snprintf(bseg1, sizeof(bseg1), ffFmt1, eCode, aCode, appInfo);
384  if (len1 >= (int)sizeof(bseg1))
385  {eDest->Emsg("PMarkFF", "invalid json; bseg1 truncated.");
386  return false;
387  }
388 
389  char bseg2[256];
390  int len2 = snprintf(bseg2, sizeof(bseg2), ffFmt2,
391  clType, svIP, clIP, svPort, clPort);
392  if (len2 >= (int)sizeof(bseg2))
393  {eDest->Emsg("PMarkFF", "invalid json; cl bseg2 truncated.");
394  return false;
395  }
396 
397  ffTailsz = len1 + len2;
398  ffTail = (char *)malloc(ffTailsz + 1);
399  strcpy(ffTail, bseg1);
400  strcpy(ffTail+len1, bseg2);
401 
402 // OK, we now can emit the starting packet
403 //
404  fdOK = fdok;
405  odOK = odok;
406  return Emit("start", utcBuff, "");
407 }
#define DEBUG(txt)
#define EPName(ep)
#define IPPROTO_TCP
Definition: XrdNetUtils.cc:800
const XrdNetSockAddr * NetAddr()
int Send(const char *buff, int blen=0, const char *dest=0, int tmo=-1)
Definition: XrdNetMsg.cc:70
bool Start(XrdNetAddrInfo &addr)
virtual ~XrdNetPMarkFF()
static int GetSokInfo(int fd, char *theAddr, int theALen, char &theType)
Definition: XrdNetUtils.cc:498
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSysTrace * Trace
XrdScheduler * Sched
XrdSysError * eDest
const char * myHostName