37 #include <sys/socket.h>
39 #include <sys/types.h>
53 #define TRACE(txt) if (doTrace) SYSTRACE(Trace->, tident, epName, 0, txt)
55 #define DEBUG(txt) if (doDebug) SYSTRACE(Trace->, tident, epName, 0, txt)
57 #define EPName(ep) const char *epName = ep
66 "<134>1 - %s xrootd - firefly-json - "
69 "\"flow-lifecycle\":{"
71 "\"current-time\":\"%%s\","
72 "\"start-time\":\"%s\""
75 "\"usage\":{\"received\":%%llu,\"sent\":%%llu},"
76 "\"netlink\":{\"rtt\":%%u.%%.03u},";
80 "\"experiment-id\":%d,"
90 "\"protocol\":\"tcp\","
96 const char *ffApp =
",\"application\":\"%.*s\"";
98 const char *ffEnd =
",\"end-time\":\"%s\"";
144 bool XrdNetPMarkFF::Emit(
const char *state,
const char *cT,
const char *eT)
152 int n = snprintf(msgBuff,
sizeof(msgBuff), ffHdr, state, cT, eT,
153 ss.bRecv, ss.bSent, ss.msRTT, ss.usRTT);
155 if (n + ffTailsz >= (
int)
sizeof(msgBuff))
156 {
eDest->
Emsg(
"PMarkFF",
"invalid json; msgBuff truncated.");
161 memcpy(msgBuff+n, ffTail, ffTailsz+1);
164 {
DEBUG(
"Sending pmark s-msg: " <<msgBuff);
172 {
DEBUG(
"Sending pmark o-msg: " <<(
netMsg ?
"=s-msg" : msgBuff));
173 if (
netOrg->
Send(oDest, *mySad, msgBuff, n+ffTailsz) < 0)
186 const char *XrdNetPMarkFF::getUTC(
char *utcBuff,
int utcBLen)
194 gettimeofday(&tod, 0);
195 gmtime_r(&tod.tv_sec, &utcDT);
199 size_t n = strftime(utcBuff, utcBLen,
"%FT%T", &utcDT);
200 bP = utcBuff + n; utcBLen -= n;
201 snprintf(bP, utcBLen,
".%06u+00:00",
static_cast<unsigned int>(tod.tv_usec));
265 {
char utcBuff[40], endBuff[80];
266 snprintf(endBuff,
sizeof(endBuff), ffEnd,
267 getUTC(utcBuff,
sizeof(utcBuff)));
268 Emit(
"end", utcBuff, endBuff);
273 if (mySad)
delete(mySad);
274 if (oDest) free(oDest);
275 if (ffHdr) free(ffHdr);
276 if (ffTail) free(ffTail);
277 if (xtraFH)
delete xtraFH;
285 #include <linux/tcp.h>
288 void XrdNetPMarkFF::SockStats(
struct sockStats &ss)
291 memset(&ss, 0,
sizeof(
struct sockStats));
294 struct tcp_info tcpInfo;
295 socklen_t tiLen =
sizeof(tcpInfo);
297 if (getsockopt(sockFD,
IPPROTO_TCP, TCP_INFO, (
void *)&tcpInfo, &tiLen) == 0)
298 {ss.bRecv =
static_cast<uint64_t
>(tcpInfo.tcpi_bytes_received);
299 ss.bSent =
static_cast<uint64_t
>(tcpInfo.tcpi_bytes_acked);
300 ss.msRTT =
static_cast<uint32_t
>(tcpInfo.tcpi_rtt/1000);
301 ss.usRTT =
static_cast<uint32_t
>(tcpInfo.tcpi_rtt%1000);
303 memset(&ss, 0,
sizeof(
struct sockStats));
304 DEBUG(
"Unable to get TCP information errno=" << strerror(errno));
315 char appInfo[128], clIP[INET6_ADDRSTRLEN+2], svIP[INET6_ADDRSTRLEN+2];
318 bool fdok =
false, odok =
false;
322 if (!appName) *appInfo = 0;
323 else snprintf(appInfo,
sizeof(appInfo),ffApp,
sizeof(appInfo)-20,appName);
336 {
eDest->
Emsg(
"PMarkFF", clPort,
"get peer information.");
342 {
eDest->
Emsg(
"PMarkFF", clPort,
"get self information.");
354 if (!urSad)
eDest->
Emsg(
"PMarkFF",
"unable to get origin address.");
355 else {
char buff[1024];
358 mySad->v4.sin_port = htons(
static_cast<uint16_t
>(
ffPortO));
359 snprintf(buff,
sizeof(buff),
"%s:%d", clIP,
ffPortO);
360 oDest = strdup(buff);
367 if (!fdok && !odok)
return false;
372 char utcBuff[40], bseg0[512];
373 int len0 = snprintf(bseg0,
sizeof(bseg0), ffFmt0,
myHostName,
374 getUTC(utcBuff,
sizeof(utcBuff)));
375 if (len0 >= (
int)
sizeof(bseg0))
376 {
eDest->
Emsg(
"PMarkFF",
"invalid json; bseg0 truncated.");
380 ffHdr = strdup(bseg0);
383 int len1 = snprintf(bseg1,
sizeof(bseg1), ffFmt1, eCode, aCode, appInfo);
384 if (len1 >= (
int)
sizeof(bseg1))
385 {
eDest->
Emsg(
"PMarkFF",
"invalid json; bseg1 truncated.");
390 int len2 = snprintf(bseg2,
sizeof(bseg2), ffFmt2,
391 clType, svIP, clIP, svPort, clPort);
392 if (len2 >= (
int)
sizeof(bseg2))
393 {
eDest->
Emsg(
"PMarkFF",
"invalid json; cl bseg2 truncated.");
397 ffTailsz = len1 + len2;
398 ffTail = (
char *)malloc(ffTailsz + 1);
399 strcpy(ffTail, bseg1);
400 strcpy(ffTail+len1, bseg2);
406 return Emit(
"start", utcBuff,
"");
const XrdNetSockAddr * NetAddr()
int Send(const char *buff, int blen=0, const char *dest=0, int tmo=-1)
bool Start(XrdNetAddrInfo &addr)
static int GetSokInfo(int fd, char *theAddr, int theALen, char &theType)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)