XRootD
XrdXrootdGSReal.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d X r o o t d G S R e a l . h h */
4 /* */
5 /* (c) 2019 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdio>
32 #include <cstdlib>
33 #include <cstring>
34 #include <sys/uio.h>
35 
36 #include "Xrd/XrdScheduler.hh"
37 #include "XrdNet/XrdNetMsg.hh"
38 #include "XrdSys/XrdSysPlatform.hh"
40 
41 /******************************************************************************/
42 /* G l o b a l s */
43 /******************************************************************************/
44 
46 {
47 extern XrdScheduler *Sched;
48 extern XrdSysError *eDest;
49 extern char *monHost;
50 extern char *kySID;
51 extern long long mySID;
52 extern int startTime;
53 
54 extern char *SidCGI[4];
55 extern char *SidJSON[4];
56 }
57 
58 using namespace XrdXrootdMonInfo;
59 
60 /******************************************************************************/
61 /* C o n s t r u c t o r */
62 /******************************************************************************/
63 
65  bool &aOK)
66  : XrdJob("GStream"), XrdXrootdGStream(*this),
67  Hello(gsParms.Opt & XrdXrootdGSReal::optNoID
68  || gsParms.Hdr == XrdXrootdGSReal::hdrNone
69  ? 0 : gsParms.dest, gsParms.Fmt),
70  pSeq(0), pSeqID(0), pSeqDID(0), binHdr(0),
71  isCGI(false)
72 {
73  static const int minSZ = 1024;
74  static const int dflSZ = 1024*32;
75  static const int maxSZ = 1024*64;
76  int flsT, maxL, hdrLen;
77 
78 // Do common initialization
79 //
80  memset(&hInfo, 0, sizeof(hInfo));
81  aOK = true;
82 
83 // Compute the correct size of the UDP buffer
84 //
85  if (gsParms.maxL <= 0) maxL = dflSZ;
86  else if (gsParms.maxL < minSZ) maxL = minSZ;
87  else if (gsParms.maxL > maxSZ) maxL = maxSZ;
88  else maxL = gsParms.maxL;
89  maxL &= ~7; // Doubleword lengths
90 
91 // Allocate the UDP buffer. Try to keep the data within a single page.
92 //
93  int align;
94  if (maxL >= getpagesize()) align = getpagesize();
95  else if (maxL >= 2048) align = 2048;
96  else if (maxL >= 1024) align = 1024;
97  else align = sizeof(void*);
98 
99  if (posix_memalign((void **)&udpBuffer, align, maxL)) {aOK = false; return;}
100 
101 // Setup the header as needed
102 //
103  if (gsParms.Hdr == hdrNone)
104  {hdrLen = 0;
105  binHdr = 0;
106  dictHdr = idntHdr0 = idntHdr1 = 0;
107  } else {
108  switch(gsParms.Fmt)
109  {case fmtBin: hdrLen = hdrBIN(gsParms);
110  break;
111  case fmtCgi: hdrLen = hdrCGI(gsParms, udpBuffer, maxL);
112  break;
113  case fmtJson: hdrLen = hdrJSN(gsParms, udpBuffer, maxL);
114  break;
115  default: hdrLen = 0;
116  }
117  if (gsParms.Opt & optNoID)
118  {if (idntHdr0) {free(idntHdr0); idntHdr0 = 0;}
119  if (idntHdr1) {free(idntHdr1); idntHdr1 = 0;}
120  }
121  }
122 
123 // Setup buffer pointers
124 //
125  udpBFirst = udpBNext = udpBuffer + hdrLen;
126  udpBEnd = udpBuffer + maxL - 1;
127 
128  tBeg = tEnd = afTime = 0;
129 
130 // Initialize remaining variables
131 //
132  monType = gsParms.Mode;
133  rsvbytes = 0;
134 
135 // If we have a specific end-point, then create a network relay to it
136 //
137  if (gsParms.dest) udpDest = new XrdNetMsg(eDest, gsParms.dest, &aOK);
138  else udpDest = 0;
139 
140 // Setup autoflush (a negative value uses the default)
141 //
142  if (gsParms.flsT < 0) flsT = XrdXrootdMonitor::Flushing();
143  else flsT = gsParms.flsT;
144  afRunning = false;
145  SetAutoFlush(flsT);
146 
147 // Construct our user name as in <gNamePI>.0:0@<myhost>
148 //
149  char idBuff[1024];
150  snprintf(idBuff, sizeof(idBuff), "%s.0:0@%s", gsParms.pin, monHost);
151 
152 // Register ourselves
153 //
154  gMon.Register(idBuff, monHost, "xroot");
155 }
156 
157 /******************************************************************************/
158 /* A u t o F l u s h */
159 /******************************************************************************/
160 
161 void XrdXrootdGSReal::AutoFlush() // gMutex is locked outside constructor
162 {
163  if (afTime && !afRunning)
164  {Sched->Schedule((XrdJob *)this, time(0)+afTime);
165  afRunning = true;
166  }
167 }
168 
169 /******************************************************************************/
170 /* D o I t */
171 /******************************************************************************/
172 
174 {
175  XrdSysMutexHelper gHelp(gMutex);
176 
177 // Check if we need to do anything here
178 //
179  afRunning = false;
180  if (afTime)
181  {if (tBeg && time(0)-tBeg >= afTime) Expel(0);
182  AutoFlush();
183  }
184 }
185 
186 /******************************************************************************/
187 /* Private: E x p e l */
188 /******************************************************************************/
189 
190 void XrdXrootdGSReal::Expel(int dlen) // gMutex is held
191 {
192 
193 // Check if we need to flush this buffer.
194 //
195  if (udpBFirst == udpBNext || (dlen && (udpBNext + dlen) < udpBEnd)) return;
196  int size = udpBNext-udpBuffer;
197 
198 // Complete the buffer header if may be binary of text
199 //
200  if (binHdr)
201  {binHdr->hdr.pseq++;
202  binHdr->hdr.plen = htons(static_cast<uint16_t>(size));
203  binHdr->tBeg = htonl(tBeg);
204  binHdr->tEnd = htonl(tEnd);
205  } else {
206  if (hInfo.pseq)
207  {char tBuff[32];
208  if (pSeq >= 999) pSeq = 0;
209  else pSeq++;
210  snprintf(tBuff, sizeof(tBuff), "%3d%10u%10u", pSeq,
211  (unsigned int)tBeg, (unsigned int)tEnd);
212  if (isCGI)
213  {char *plus, *bP = tBuff;
214  while((plus = index(bP, ' '))) {*plus = '+'; bP = plus+1;}
215  }
216  memcpy(hInfo.pseq, tBuff, 3);
217  memcpy(hInfo.tbeg, tBuff+ 3, 10);
218  memcpy(hInfo.tend, tBuff+13, 10);
219  }
220  }
221 
222 // Make sure the whole thing ends with a null byte
223 //
224  *(udpBNext-1) = 0;
225 
226 // Send off the packet
227 //
228  if (udpDest) udpDest->Send(udpBuffer, size);
229  else XrdXrootdMonitor::Send(monType, udpBuffer, size, false);
230 
231 // Reset the buffer
232 //
233  udpBNext = udpBFirst;
234  tBeg = tEnd = 0;
235 }
236 
237 /******************************************************************************/
238 /* F l u s h */
239 /******************************************************************************/
240 
242 {
243  XrdSysMutexHelper gHelp(gMutex);
244  Expel(0);
245 }
246 
247 /******************************************************************************/
248 /* G e t D i c t I D */
249 /******************************************************************************/
250 
251 uint32_t XrdXrootdGSReal::GetDictID(const char *text, bool isPath)
252 {
253 // If this is binary encoded, the record the mapping and return it
254 //
255  if (binHdr) return (isPath ? gMon.MapPath(text) : gMon.MapInfo(text));
256 
257 // If there are no headers then we can't produce this record
258 //
259  uint32_t psq, did = XrdXrootdMonitor::GetDictID(true);
260  if (!dictHdr) return htonl(did);
261 
262 // We need to do some additional work to generate non-binary headers here
263 //
264  struct iovec iov[3];
265  char dit = (isPath ? XROOTD_MON_MAPPATH : XROOTD_MON_MAPINFO);
266  char buff[1024];
267 
268 // Generate a new packet sequence number
269 //
270  gMutex.Lock();
271  if (pSeqDID >= 999) pSeqDID = 0;
272  else pSeqDID++;
273  psq = pSeqDID;
274  gMutex.UnLock();
275 
276 // Generate the packet
277 //
278  iov[0].iov_base = buff;
279  iov[0].iov_len = snprintf(buff, sizeof(buff), dictHdr, dit, psq, did);
280  iov[1].iov_base = (void *)text;
281  iov[1].iov_len = strlen(text);
282  iov[2].iov_base = (void *)"\"}";
283  iov[2].iov_len = 3;
284 
285 // Now send it off
286 //
287  udpDest->Send(iov, (*dictHdr == '{' ? 3 : 2));
288  return htonl(did);
289 }
290 
291 /******************************************************************************/
292 /* H a s H d r */
293 /******************************************************************************/
294 
296 {
297  return binHdr != 0 || dictHdr != 0;
298 }
299 
300 /******************************************************************************/
301 /* Private: h d r B I N */
302 /******************************************************************************/
303 
304 int XrdXrootdGSReal::hdrBIN(const XrdXrootdGSReal::GSParms &gs)
305 {
306 
307 // Initialze the udp heaader in the buffer
308 //
309  binHdr = (XrdXrootdMonGS*)udpBuffer;
310  memset(binHdr, 0, sizeof(XrdXrootdMonGS));
311  binHdr->hdr.code = XROOTD_MON_MAPGSTA;
312  binHdr->hdr.stod = startTime;
313 
314  long long theSID = ntohll(mySID) & 0x00ffffffffffffff;
315  theSID = theSID | (static_cast<long long>(gs.Type) << XROOTD_MON_PIDSHFT);
316  binHdr->sID = htonll(theSID);
317 
318  return (int)sizeof(XrdXrootdMonGS);
319 }
320 
321 /******************************************************************************/
322 /* Private: h d r C G I */
323 /******************************************************************************/
324 
325 int XrdXrootdGSReal::hdrCGI(const XrdXrootdGSReal::GSParms &gs,
326  char *buff, int blen)
327 {
328  const char *hdr, *plug = "\n";
329  char hBuff[2048];
330  int n;
331 
332 // Pick any needed extensions to this header
333 //
334  switch(gs.Hdr)
335  {case hdrSite: plug = SidCGI[0]; break;
336  case hdrHost: plug = SidCGI[1]; break;
337  case hdrInst: plug = SidCGI[2]; break;
338  case hdrFull: plug = SidCGI[3]; break;
339  default: break;
340  }
341 
342 // Generate the header to use for 'd' or 'i' packets
343 //
344  hdr = "code=%%c&pseq=%%u&stod=%u&sid=%s%s&gs.type=%c&did=%%u&data=";
345 
346  snprintf(hBuff, sizeof(hBuff), hdr, ntohl(startTime), kySID, plug, gs.Type);
347  dictHdr = strdup(hBuff);
348 
349 // Generate the headers to use for '=' packets. These have a changeable part
350 // and a non-changeable part.
351 //
352  hdr = "code=%c&pseq=%%u";
353 
354  snprintf(hBuff, sizeof(hBuff), hdr, XROOTD_MON_MAPIDNT);
355  idntHdr0 = strdup(hBuff);
356 
357  hdr = "&stod=%u&sid=%s%s";
358 
359  n = snprintf(hBuff, sizeof(hBuff), hdr, ntohl(startTime), kySID, SidCGI[3]);
360  idntHdr1 = strdup(hBuff);
361  idntHsz1 = n+1;
362 
363 // Format the header
364 //
365  hdr = "code=%c&pseq=$12&stod=%u&sid=%s%s&gs.type=%c"
366  "&gs.tbeg=$123456789&gs.tend=$123456789%s\n";
367 
368  n = snprintf(buff, blen, hdr, XROOTD_MON_MAPGSTA, ntohl(startTime),
369  kySID, plug, gs.Type);
370 
371 // Return all of the substitution addresses
372 //
373  hInfo.pseq = index(buff, '$');
374  hInfo.tbeg = index(hInfo.pseq+1, '$');
375  hInfo.tend = index(hInfo.tbeg+1, '$');
376 
377 // Return the length
378 //
379  isCGI = true;
380  return n;
381 }
382 
383 /******************************************************************************/
384 /* Private: h d r J S N */
385 /******************************************************************************/
386 
387 int XrdXrootdGSReal::hdrJSN(const XrdXrootdGSReal::GSParms &gs,
388  char *buff, int blen)
389 {
390  const char *hdr, *plug1 = "", *plug2 = "";
391  char hBuff[2048];
392  int n;
393 
394 // Add any needed extensions to this header
395 //
396  if (gs.Hdr != hdrNorm)
397  {plug1 = ",";
398  switch(gs.Hdr)
399  {case hdrSite: plug2 = SidJSON[0]; break;
400  case hdrHost: plug2 = SidJSON[1]; break;
401  case hdrInst: plug2 = SidJSON[2]; break;
402  case hdrFull: plug2 = SidJSON[3]; break;
403  default: plug1 = ""; break;
404  }
405  }
406 
407 // Generate the header to use for 'd' or 'i' packets
408 //
409  hdr = "{\"code\":\"%%c\",\"pseq\":%%u,\"stod\":%u,\"sid\":%s%s%s,"
410  "\"gs\":{\"type\":\"%c\"},\"did\":%%u,\"data\":\"";
411 
412  snprintf(hBuff, sizeof(hBuff), hdr, ntohl(startTime), kySID,
413  plug1, plug2, gs.Type);
414  dictHdr = strdup(hBuff);
415 
416 // Generate the headers to use for '=' packets. These have a changeable part
417 // and a non-changeable part.
418 //
419  hdr = "{\"code\":\"%c\",\"pseq\":%%u,";
420 
421  snprintf(hBuff, sizeof(hBuff), hdr, XROOTD_MON_MAPIDNT);
422  idntHdr0 = strdup(hBuff);
423 
424  hdr = "\"stod\":%u,\"sid\":%s,%s}";
425 
426  n = snprintf(hBuff, sizeof(hBuff), hdr, ntohl(startTime), kySID, SidJSON[3]);
427  idntHdr1 = strdup(hBuff);
428  idntHsz1 = n+1;
429 
430 // Generate the header of plugin output
431 //
432  hdr = "{\"code\":\"%c\",\"pseq\":$12,\"stod\":%u,\"sid\":%s%s%s,"
433  "\"gs\":{\"type\":\"%c\",\"tbeg\":$123456789,\"tend\":$123456789}}\n";
434 
435 // Format the header (we are gauranteed to have at least 1024 bytes here)
436 //
437  n = snprintf(buff, blen, hdr, XROOTD_MON_MAPGSTA, ntohl(startTime),
438  kySID, plug1, plug2, gs.Type);
439 
440 // Return all of the substitution addresses
441 //
442  hInfo.pseq = index(buff, '$');
443  hInfo.tbeg = index(hInfo.pseq+1, '$');
444  hInfo.tend = index(hInfo.tbeg+1, '$');
445 
446 // Return the length
447 //
448  return n;
449 }
450 
451 /******************************************************************************/
452 /* I d e n t */
453 /******************************************************************************/
454 
456 {
457  struct iovec iov[2];
458  char buff[40];
459  uint32_t psq;
460 
461 // If identification suppressed, then just return
462 //
463  if (!idntHdr0 || !udpDest) return;
464 
465 // Generate a new packet sequence number
466 //
467  gMutex.Lock();
468  if (pSeqID >= 999) pSeqID = 0;
469  else pSeqID++;
470  psq = pSeqID;
471  gMutex.UnLock();
472 
473 // Create header and iovec to send the header
474 //
475  iov[0].iov_base = buff;
476  iov[0].iov_len = snprintf(buff, sizeof(buff), idntHdr0, psq);
477  iov[1].iov_base = idntHdr1;
478  iov[1].iov_len = idntHsz1;
479  udpDest->Send(iov, 2);
480 }
481 
482 /******************************************************************************/
483 /* I n s e r t */
484 /******************************************************************************/
485 
486 bool XrdXrootdGSReal::Insert(const char *data, int dlen)
487 {
488 
489 // Validate the length and message
490 //
491  if (dlen < 8 || dlen > XrdXrootdGStream::MaxDataLen
492  || !data || data[dlen-1]) return false;
493 
494 // Reserve the storage and copy the message. It always will end with a newline
495 //
496  gMutex.Lock();
497  Expel(dlen);
498  memcpy(udpBNext, data, dlen-1);
499  udpBNext[dlen-1] = '\n';
500 
501 // Timestamp the record and aAdjust buffer pointers
502 //
503  tEnd = time(0);
504  if (udpBNext == udpBFirst) tBeg = tEnd;
505  udpBNext += dlen;
506 
507 // All done
508 //
509  gMutex.UnLock();
510  return true;
511 }
512 
513 /******************************************************************************/
514 
516 {
517  XrdSysMutexHelper gHelp(gMutex);
518 
519 // Make sure space is reserved
520 //
521  if (!rsvbytes) return false;
522 
523 // We are now sure that the recursive lock is held twice by this thread. So,
524 // make it a unitary lock so it gets fully unlocked upon rturn.
525 //
526  gMutex.UnLock();
527 
528 // Check for cancellation
529 //
530  if (!dlen)
531  {rsvbytes = 0;
532  return true;
533  }
534 
535 // Length, it must >= 8 and <= reserved amount and the data must end with a 0.
536 //
537  if (dlen > rsvbytes || dlen < 8 || *(udpBNext+dlen-1))
538  {rsvbytes = 0;
539  return false;
540  }
541 
542 // Adjust the buffer space and time stamp the record
543 //
544  tEnd = time(0);
545  if (udpBNext == udpBFirst) tBeg = tEnd;
546  udpBNext += dlen;
547  *(udpBNext-1) = '\n';
548  rsvbytes = 0;
549 
550 // All done
551 
552  return true;
553 }
554 
555 /******************************************************************************/
556 /* R e s e r v e */
557 /******************************************************************************/
558 
560 {
561 // Validate the length
562 //
563  if (dlen < 8 || dlen > XrdXrootdGStream::MaxDataLen) return 0;
564 
565 // Make sure there is no reserve outstanding
566 //
567  gMutex.Lock();
568  if (rsvbytes)
569  {gMutex.UnLock();
570  return 0;
571  }
572 
573 // Return the allocated the space but keep the lock until Insert() is called.
574 //
575  rsvbytes = dlen;
576  Expel(dlen);
577  return udpBNext;
578 }
579 
580 /******************************************************************************/
581 /* S e t A u t o F l u s h */
582 /******************************************************************************/
583 
585 {
586  XrdSysMutexHelper gHelp(gMutex);
587 
588 // Save the current settting and establish the new one and relaunch
589 //
590  int afNow = afTime;
591  afTime = (afsec > 0 ? afsec : 0);
592  AutoFlush();
593 
594 // All done
595 //
596  return afNow;
597 }
598 
599 /******************************************************************************/
600 /* S p a c e */
601 /******************************************************************************/
602 
604 {
605  XrdSysMutexHelper gHelp(gMutex);
606 
607 // Return amount of space left
608 //
609  return udpBEnd - udpBNext;
610 }
const long long XROOTD_MON_PIDSHFT
const kXR_char XROOTD_MON_MAPGSTA
const kXR_char XROOTD_MON_MAPINFO
const kXR_char XROOTD_MON_MAPIDNT
const kXR_char XROOTD_MON_MAPPATH
XrdXrootdMonHeader hdr
Definition: XrdJob.hh:43
int Send(const char *buff, int blen=0, const char *dest=0, int tmo=-1)
Definition: XrdNetMsg.cc:70
void Schedule(XrdJob *jp)
uint32_t GetDictID(const char *text, bool isPath=false)
char * Reserve(int dlen)
static const int hdrNone
Format as JSON info.
kXR_char Type
the specific G-Stream identifier
const char * dest
Destination for records.
static const int hdrInst
Include site, host, port, inst.
int SetAutoFlush(int afsec)
static const int fmtBin
Do not include info.
XrdXrootdGSReal(const GSParms &gsParms, bool &aOK)
static const int fmtJson
Format as CGI info.
int maxL
Maximum packet length (default 32K)
static const int hdrSite
Include site.
int flsT
Flush time (default from monitor)
const char * pin
the plugin name.
static const int hdrHost
Include site, host.
char Fmt
How to handle the records.
static const int optNoID
Don't send ident records.
bool Insert(const char *data, int dlen)
int Mode
the monitor type for send routing.
static const int hdrNorm
Include standard header.
static const int fmtCgi
Format as binary info.
static const int hdrFull
Include site, host, port, inst, pgm.
static const int MaxDataLen
The larest amount of data that can be inserted in a single call to GStream.
kXR_unt32 MapInfo(const char *Info)
kXR_unt32 MapPath(const char *Path)
void Register(const char *Uname, const char *Hname, const char *Pname, unsigned int xSID=0)
static int Flushing()
static int Send(int mmode, void *buff, int size, bool setseq=true)
static kXR_unt32 GetDictID(bool hbo=false)
XrdScheduler * Sched
XrdSysError * eDest