51 extern long long mySID;
69 ? 0 : gsParms.dest, gsParms.
Fmt),
70 pSeq(0), pSeqID(0), pSeqDID(0), binHdr(0),
73 static const int minSZ = 1024;
74 static const int dflSZ = 1024*32;
75 static const int maxSZ = 1024*64;
76 int flsT, maxL, hdrLen;
80 memset(&hInfo, 0,
sizeof(hInfo));
85 if (gsParms.
maxL <= 0) maxL = dflSZ;
86 else if (gsParms.
maxL < minSZ) maxL = minSZ;
87 else if (gsParms.
maxL > maxSZ) maxL = maxSZ;
88 else maxL = gsParms.
maxL;
94 if (maxL >= getpagesize()) align = getpagesize();
95 else if (maxL >= 2048) align = 2048;
96 else if (maxL >= 1024) align = 1024;
97 else align =
sizeof(
void*);
99 if (posix_memalign((
void **)&udpBuffer, align, maxL)) {aOK =
false;
return;}
106 dictHdr = idntHdr0 = idntHdr1 = 0;
109 {
case fmtBin: hdrLen = hdrBIN(gsParms);
111 case fmtCgi: hdrLen = hdrCGI(gsParms, udpBuffer, maxL);
113 case fmtJson: hdrLen = hdrJSN(gsParms, udpBuffer, maxL);
118 {
if (idntHdr0) {free(idntHdr0); idntHdr0 = 0;}
119 if (idntHdr1) {free(idntHdr1); idntHdr1 = 0;}
125 udpBFirst = udpBNext = udpBuffer + hdrLen;
126 udpBEnd = udpBuffer + maxL - 1;
128 tBeg = tEnd = afTime = 0;
132 monType = gsParms.
Mode;
143 else flsT = gsParms.
flsT;
150 snprintf(idBuff,
sizeof(idBuff),
"%s.0:0@%s", gsParms.
pin,
monHost);
161 void XrdXrootdGSReal::AutoFlush()
163 if (afTime && !afRunning)
181 {
if (tBeg && time(0)-tBeg >= afTime) Expel(0);
190 void XrdXrootdGSReal::Expel(
int dlen)
195 if (udpBFirst == udpBNext || (dlen && (udpBNext + dlen) < udpBEnd))
return;
196 int size = udpBNext-udpBuffer;
202 binHdr->
hdr.
plen = htons(
static_cast<uint16_t
>(size));
203 binHdr->
tBeg = htonl(tBeg);
204 binHdr->
tEnd = htonl(tEnd);
208 if (pSeq >= 999) pSeq = 0;
210 snprintf(tBuff,
sizeof(tBuff),
"%3d%10u%10u", pSeq,
211 (
unsigned int)tBeg, (
unsigned int)tEnd);
213 {
char *plus, *bP = tBuff;
214 while((plus = index(bP,
' '))) {*plus =
'+'; bP = plus+1;}
216 memcpy(hInfo.pseq, tBuff, 3);
217 memcpy(hInfo.tbeg, tBuff+ 3, 10);
218 memcpy(hInfo.tend, tBuff+13, 10);
228 if (udpDest) udpDest->
Send(udpBuffer, size);
233 udpBNext = udpBFirst;
255 if (binHdr)
return (isPath ? gMon.
MapPath(text) : gMon.
MapInfo(text));
260 if (!dictHdr)
return htonl(did);
271 if (pSeqDID >= 999) pSeqDID = 0;
278 iov[0].iov_base = buff;
279 iov[0].iov_len = snprintf(buff,
sizeof(buff), dictHdr, dit, psq, did);
280 iov[1].iov_base = (
void *)text;
281 iov[1].iov_len = strlen(text);
282 iov[2].iov_base = (
void *)
"\"}";
287 udpDest->
Send(
iov, (*dictHdr ==
'{' ? 3 : 2));
297 return binHdr != 0 || dictHdr != 0;
314 long long theSID = ntohll(
mySID) & 0x00ffffffffffffff;
316 binHdr->
sID = htonll(theSID);
326 char *buff,
int blen)
328 const char *hdr, *plug =
"\n";
344 hdr =
"code=%%c&pseq=%%u&stod=%u&sid=%s%s&gs.type=%c&did=%%u&data=";
347 dictHdr = strdup(hBuff);
352 hdr =
"code=%c&pseq=%%u";
355 idntHdr0 = strdup(hBuff);
357 hdr =
"&stod=%u&sid=%s%s";
360 idntHdr1 = strdup(hBuff);
365 hdr =
"code=%c&pseq=$12&stod=%u&sid=%s%s&gs.type=%c"
366 "&gs.tbeg=$123456789&gs.tend=$123456789%s\n";
373 hInfo.pseq = index(buff,
'$');
374 hInfo.tbeg = index(hInfo.pseq+1,
'$');
375 hInfo.tend = index(hInfo.tbeg+1,
'$');
388 char *buff,
int blen)
390 const char *hdr, *plug1 =
"", *plug2 =
"";
403 default: plug1 =
"";
break;
409 hdr =
"{\"code\":\"%%c\",\"pseq\":%%u,\"stod\":%u,\"sid\":%s%s%s,"
410 "\"gs\":{\"type\":\"%c\"},\"did\":%%u,\"data\":\"";
413 plug1, plug2, gs.
Type);
414 dictHdr = strdup(hBuff);
419 hdr =
"{\"code\":\"%c\",\"pseq\":%%u,";
422 idntHdr0 = strdup(hBuff);
424 hdr =
"\"stod\":%u,\"sid\":%s,%s}";
427 idntHdr1 = strdup(hBuff);
432 hdr =
"{\"code\":\"%c\",\"pseq\":$12,\"stod\":%u,\"sid\":%s%s%s,"
433 "\"gs\":{\"type\":\"%c\",\"tbeg\":$123456789,\"tend\":$123456789}}\n";
442 hInfo.pseq = index(buff,
'$');
443 hInfo.tbeg = index(hInfo.pseq+1,
'$');
444 hInfo.tend = index(hInfo.tbeg+1,
'$');
463 if (!idntHdr0 || !udpDest)
return;
468 if (pSeqID >= 999) pSeqID = 0;
475 iov[0].iov_base = buff;
476 iov[0].iov_len = snprintf(buff,
sizeof(buff), idntHdr0, psq);
477 iov[1].iov_base = idntHdr1;
478 iov[1].iov_len = idntHsz1;
492 || !data || data[dlen-1])
return false;
498 memcpy(udpBNext, data, dlen-1);
499 udpBNext[dlen-1] =
'\n';
504 if (udpBNext == udpBFirst) tBeg = tEnd;
521 if (!rsvbytes)
return false;
537 if (dlen > rsvbytes || dlen < 8 || *(udpBNext+dlen-1))
545 if (udpBNext == udpBFirst) tBeg = tEnd;
547 *(udpBNext-1) =
'\n';
591 afTime = (afsec > 0 ? afsec : 0);
609 return udpBEnd - udpBNext;
const long long XROOTD_MON_PIDSHFT
const kXR_char XROOTD_MON_MAPGSTA
const kXR_char XROOTD_MON_MAPINFO
const kXR_char XROOTD_MON_MAPIDNT
const kXR_char XROOTD_MON_MAPPATH
int Send(const char *buff, int blen=0, const char *dest=0, int tmo=-1)
void Schedule(XrdJob *jp)
uint32_t GetDictID(const char *text, bool isPath=false)
static const int hdrNone
Do not include info.
kXR_char Type
the specific G-Stream identifier
const char * dest
Destination for records.
static const int hdrInst
Include site, host, port, inst.
int SetAutoFlush(int afsec)
static const int fmtBin
Format as binary info.
XrdXrootdGSReal(const GSParms &gsParms, bool &aOK)
static const int fmtJson
Format as JSON info.
int maxL
Maximum packet length (default 32K)
static const int hdrSite
Include site.
int flsT
Flush time (default from monitor)
const char * pin
the plugin name.
static const int hdrHost
Include site, host.
char Fmt
How to handle the records.
static const int optNoID
Don't send ident records.
bool Insert(const char *data, int dlen)
int Mode
the monitor type for send routing.
static const int hdrNorm
Include standard header.
static const int fmtCgi
Format as CGI info.
static const int hdrFull
Include site, host, port, inst, pgm.
static const int MaxDataLen
The largest amount of data that can be inserted in a single call to GStream.
kXR_unt32 MapInfo(const char *Info)
kXR_unt32 MapPath(const char *Path)
void Register(const char *Uname, const char *Hname, const char *Pname, unsigned int xSID=0)
static int Send(int mmode, void *buff, int size, bool setseq=true)
static kXR_unt32 GetDictID(bool hbo=false)