36 #include <sys/types.h>
38 #include "XrdVersion.hh"
// Class-wide (static) state: the identification record and the two collector
// destinations. Everything starts out null/zero.
//   idRec / idLen  - identification record buffer and its length; the buffer
//                    is malloc'd with idLen+1 bytes elsewhere in this file.
//   DestN          - destination string; Defaults() frees the previous value
//                    and stores the pointer it is handed.
//   monModeN       - mode bit mask; Send() only uses destination N when
//                    (requested mode & monModeN) is non-zero.
//   InetDestN      - network message object used by Send(); a null pointer
//                    means destination N is skipped.
//   monRlen        - byte size passed to posix_memalign for each redirect buffer.
55 char *XrdXrootdMonitor::idRec = 0;
56 int XrdXrootdMonitor::idLen = 0;
57 char *XrdXrootdMonitor::Dest1 = 0;
58 int XrdXrootdMonitor::monMode1 = 0;
59 XrdNetMsg *XrdXrootdMonitor::InetDest1 = 0;
60 char *XrdXrootdMonitor::Dest2 = 0;
61 int XrdXrootdMonitor::monMode2 = 0;
62 XrdNetMsg *XrdXrootdMonitor::InetDest2 = 0;
65 int XrdXrootdMonitor::monRlen = 0;
66 XrdXrootdMonitor::MonRdrBuff
68 XrdXrootdMonitor::MonRdrBuff
69 *XrdXrootdMonitor::rdrMP = 0;
// Class-wide (static) sizing, timing and feature-flag state.
// The non-zero initial values below (autoFlush 600, rdrNum 3, sizeWindow 60)
// are the same fallbacks Defaults() applies when it is given a value <= 0
// (for rdrNum: also when the value exceeds rdrMax).
71 int XrdXrootdMonitor::monBlen = 0;   // byte size of each trace buffer (posix_memalign size)
72 int XrdXrootdMonitor::lastEnt = 0;   // when nextEnt reaches this index the trace buffer is flushed
73 int XrdXrootdMonitor::lastRnt = 0;   // corresponding limit checked before adding redirect entries
74 int XrdXrootdMonitor::isEnabled = 0; // 0 = monitoring off; code also tests for < 0 and > 0
                                        // NOTE(review): meaning of the negative state is not visible here - confirm
75 int XrdXrootdMonitor::numMonitor = 0; // incremented in Alloc(); clock is started on the 0 -> 1 transition when !monREDR
76 int XrdXrootdMonitor::autoFlash = 0; // 0 disables; otherwise compared against (localWindow - bufStartWindow) in Mark()
77 int XrdXrootdMonitor::autoFlush = 600; // added to the current window/time to compute the next flush deadline
78 int XrdXrootdMonitor::FlushTime = 0; // deadline: set to window time + autoFlush
79 int XrdXrootdMonitor::monIdent = 3600;
80 kXR_int32 XrdXrootdMonitor::currWindow = 0; // set from time(0) by the clock code (host byte order)
81 int XrdXrootdMonitor::rdrTOD = 0;    // htonl(currWindow)
82 int XrdXrootdMonitor::rdrWin = 0;    // htonl(sizeWindow clamped to 16777215), computed in Defaults()
83 int XrdXrootdMonitor::rdrNum = 3;    // number of redirect buffers set up in Init()
84 kXR_int32 XrdXrootdMonitor::sizeWindow = 60;
// Feature flags (char used as boolean). monIO and monFILE are copied into the
// per-user Iops/Fops fields; monACTIVE is 1 when any mode bit is set;
// monFSTAT is (fsint != 0); monCLOCK is set to 1 when a clock is required.
85 char XrdXrootdMonitor::monINFO = 0;
86 char XrdXrootdMonitor::monIO = 0;
87 char XrdXrootdMonitor::monFILE = 0;
88 char XrdXrootdMonitor::monREDR = 0;
89 char XrdXrootdMonitor::monUSER = 0;
90 char XrdXrootdMonitor::monAUTH = 0;
91 char XrdXrootdMonitor::monACTIVE = 0;
92 char XrdXrootdMonitor::monFSTAT = 0;
93 char XrdXrootdMonitor::monCLOCK = 0;
106 return htonl( time( 0 ) );
// setTMark(TM_mb, TM_en, TM_tm): fill slot TM_en of TM_mb->info as a window
// mark entry.
//   - arg0.val is set to mySID, then arg0.id[0] is set to XROOTD_MON_WINDOW.
//     arg0 is a union, so the order of these two stores is significant
//     (presumably id[0] overlays the first byte of val - confirm in the
//     XrdXrootdMonTrace declaration).
//   - arg1.Window and arg2.Window both receive static_cast<kXR_int32>(ntohl(TM_tm)).
// Caveats: the macro expands to several statements without an enclosing
// block, so it must not be used as the body of an unbraced if/else/loop;
// it picks up `mySID` from the scope where it is expanded; TM_en and TM_tm
// are each evaluated more than once and are not parenthesized.
130 #define setTMark(TM_mb, TM_en, TM_tm) \
131 TM_mb->info[TM_en].arg0.val = mySID; \
132 TM_mb->info[TM_en].arg0.id[0] = XROOTD_MON_WINDOW; \
133 TM_mb->info[TM_en].arg1.Window = \
134 TM_mb->info[TM_en].arg2.Window = static_cast<kXR_int32>(ntohl(TM_tm));
// setTMurk(TM_mb, TM_en, TM_tm): fill slot TM_en of TM_mb->info as a timing
// entry for the redirect buffers.
//   - arg0.Window is set to the static rdrWin (stored via htonl in Defaults()).
//   - arg1.Window is set to static_cast<kXR_int32>(TM_tm); no byte swap is
//     applied here. The visible call sites pass rdrTOD / mP->lastTOD, which
//     hold htonl(currWindow).
// Caveats: expands to two statements without an enclosing block, so it must
// only be used inside braces; TM_en is evaluated twice and the arguments
// are not parenthesized.
136 #define setTMurk(TM_mb, TM_en, TM_tm) \
137 TM_mb->info[TM_en].arg0.Window = rdrWin; \
138 TM_mb->info[TM_en].arg1.Window = static_cast<kXR_int32>(TM_tm);
156 if ((doIdnt || doHail) && idInt > 0)
161 idInt(idt), doIdnt(ison), doHail(true) {}
180 const char *TraceID =
"MonTick";
185 else {
TRACE(
DEBUG,
"Monitor clock stopping.");}
191 Sched(0), Window(0) {}
207 static void Lock() {monLock.Lock();}
213 else {unLock = 1; monLock.Lock();}
230 : Next(0), theDest(0), theMode(0)
234 while(nP) {
if (!strcmp(dest, nP->theDest) && mode == theMode)
return;
239 theDest = strdup(dest);
256 while(nP) {nP->
Ident(); nP = nP->Next;}
270 {XrdXrootdMonitor::unAlloc(Agent); Agent = 0;}
280 if (Agent || (Agent = XrdXrootdMonitor::Alloc(1)))
281 {Iops = XrdXrootdMonitor::monIO;
282 Fops = XrdXrootdMonitor::monFILE;
283 }
else Iops = Fops = 0;
292 const char *Pname,
unsigned int xSID)
295 const char *TraceID =
"Monitor";
297 char *dotP, *colonP, *atP;
298 char uBuff[1024], tBuff[1024], sBuff[64];
303 snprintf(tBuff,
sizeof(tBuff),
"%s", Uname);
304 if ((dotP = index(tBuff,
'.')) && (colonP = index(dotP+1,
':')) &&
305 (atP = index(colonP+1,
'@')))
306 {*dotP = 0; *colonP = 0; *atP = 0;
308 {snprintf(sBuff,
sizeof(sBuff),
" %u", xSID);
312 int n = snprintf(uBuff,
sizeof(uBuff),
"%s/%s.%s:%s@%s", Pname, tBuff,
313 dotP+1,
kySID, atP+1);
315 if (n < 0 || n >= (
int)
sizeof(uBuff))
316 TRACE(LOGIN,
"Login ID was truncated: " << uBuff);
318 if (xSID) {
TRACE(LOGIN,
"Register remap "<<Uname<<
" -> "<<uBuff);}
319 }
else snprintf(uBuff,
sizeof(uBuff),
"%s/%s", Pname, Uname);
324 Agent = XrdXrootdMonitor::Alloc();
327 Name = strdup(uBuff);
328 Iops = XrdXrootdMonitor::monIO;
329 Fops = XrdXrootdMonitor::monFILE;
340 snprintf(buff,
sizeof(buff),
"&Uc=%d&Ec=%d&Ac=%d", ntohl(Did), eCode, aCode);
353 if (infoT != TokenInfo)
return false;
355 snprintf(buff,
sizeof(buff),
"&Uc=%d%s%s", ntohl(Did),
356 (*info ==
'&' ?
"" :
"&"), info);
373 localWindow = currWindow;
377 if (posix_memalign((
void **)&monBuff, getpagesize(), monBlen))
378 eDest->
Emsg(
"Monitor",
"Unable to allocate monitor buffer.");
388 XrdXrootdMonitor::~XrdXrootdMonitor()
391 if (monBuff) {Flush(); free(monBuff);}
404 if (
this ==
altMon || !*
id)
return;
408 if (lastWindow != currWindow) Mark();
409 else if (nextEnt == lastEnt) Flush();
411 strncpy((
char *)(&(monBuff->
info[nextEnt])+4),
id, apInfoSize);
427 if (!isEnabled) mp = 0;
428 else if (!monIO) mp =
altMon;
430 if (!(mp->monBuff)) {
delete mp; mp = 0;}
434 if (mp && isEnabled < 0)
436 lastVal = numMonitor; numMonitor++;
437 if (!lastVal && !monREDR) startClock();
453 unsigned int rVal, wVal;
457 if (lastWindow != currWindow) Mark();
458 else if (nextEnt == lastEnt) Flush();
460 monBuff->
info[nextEnt].
arg0.id[1] = do_Shift(rTot, rVal);
461 monBuff->
info[nextEnt].
arg0.rTot[1] = htonl(rVal);
462 monBuff->
info[nextEnt].
arg0.id[2] = do_Shift(wTot, wVal);
463 monBuff->
info[nextEnt].
arg0.id[3] = 0;
464 monBuff->
info[nextEnt].
arg1.wTot = htonl(wVal);
465 monBuff->
info[nextEnt++].
arg2.dictid = dictid;
485 if (!dest1 && !dest2) {isEnabled = 0;
return;}
486 if (!dest1) {dest1 = dest2; dest2 = 0; mode1 |= mode2; mode2 = 0;}
490 if (Dest1) free(Dest1);
491 Dest1 = dest1; monMode1 = mode1;
492 if (Dest2) free(Dest2);
493 Dest2 = dest2; monMode2 = mode2;
497 mmode = mode1 | mode2;
498 monACTIVE = (mmode ? 1 : 0);
511 if (monREDR || (isEnabled > 0 && (monIO || monFILE))) monCLOCK = 1;
531 int flush,
int flash,
int idt,
int rnm,
532 int fbsz,
int fsint,
int fsopt,
int fsion)
537 sizeWindow = (wsz <= 0 ? 60 : wsz);
538 autoFlush = (flush <= 0 ? 600 : flush);
539 autoFlash = (flash <= 0 ? 0 : flash);
541 rdrNum = (rnm <= 0 || rnm >
rdrMax ? 3 : rnm);
542 rdrWin = (sizeWindow > 16777215 ? 16777215 : sizeWindow);
543 rdrWin = htonl(rdrWin);
548 monFSTAT = fsint != 0;
552 if (msz <= 0) msz = 16384;
553 else if (msz < 1024) msz = 1024;
561 if (rsz <= 0) rsz = 32768;
562 else if (rsz < 2048) rsz = 2048;
583 if (lastWindow != currWindow) Mark();
584 else if (nextEnt == lastEnt) Flush();
585 monBuff->
info[nextEnt].
arg0.rTot[0] = 0;
587 monBuff->
info[nextEnt].
arg0.id[1] = Flags;
588 monBuff->
info[nextEnt].
arg1.wTot = htonl(csec);
589 monBuff->
info[nextEnt++].
arg2.dictid = dictid;
607 if (lastWindow != currWindow) Mark();
608 else if (nextEnt == lastEnt) Flush();
617 XrdXrootdMonitor::MonRdrBuff *XrdXrootdMonitor::Fetch()
624 if ((bP = rdrMP)) rdrMP = rdrMP->Next;
634 const char *iHost,
const char *iProg,
635 const char *iName,
int Port)
637 const char *cgID0 =
"&site=%s";
638 const char *cgID1 =
"&host=%s";
639 const char *cgID2 =
"&port=%d&inst=%s";
640 const char *cgID3 =
"&pgm=%s&ver=%s";
642 const char *jsID0 =
"\"src\":{\"site\":\"%s\"}";
643 const char *jsID1 =
"%s\"host\":\"%s\"}";
644 const char *jsID2 =
"%s\"port\":%d,\"inst\":\"%s\"}";
645 const char *jsID3 =
"%s\"pgm\":\"%s\",\"ver\":\"%s\"}";
648 char iBuff[1024], iMuff[2048], iPuff[1024];
660 iHost, iProg, iName, Port);
662 snprintf(iBuff+n,
sizeof(iBuff)-n,
"&ver=%s", XrdVERSION);
669 #if defined(__GNUC__) && __GNUC__ >= 12
670 #pragma GCC diagnostic push
671 #pragma GCC diagnostic ignored "-Warray-bounds"
676 idRec = (
char *)malloc(idLen+1);
681 strcpy(mP->
info, iBuff);
682 #if defined(__GNUC__) && __GNUC__ >= 12
683 #pragma GCC diagnostic pop
688 const char *Site (getenv(
"XRDSITE") ? getenv(
"XRDSITE") :
"");
689 i = snprintf(iPuff,
sizeof(iPuff), cgID0, Site);
690 SidCGI[0] = strdup(iPuff);
691 LidCGI[0] = strlen(iPuff);
693 n =
sizeof(iPuff)-i; j = i;
694 i = snprintf(iPuff+j, n, cgID1, iHost);
695 SidCGI[1] = strdup(iPuff);
696 LidCGI[1] = strlen(iPuff);
699 i = snprintf(iPuff+j, n, cgID2, Port, iName);
700 SidCGI[2] = strdup(iPuff);
701 LidCGI[2] = strlen(iPuff);
704 snprintf(iPuff+j, n, cgID3, iProg, XrdVERSION);
705 SidCGI[3] = strdup(iPuff);
706 LidCGI[3] = strlen(iPuff);
710 n = snprintf(iPuff,
sizeof(iPuff), jsID0, Site);
714 strcpy(iPuff+n-1,
",");
715 n = snprintf(iMuff,
sizeof(iMuff), jsID1, iPuff, iHost);
719 strcpy(iMuff+n-1,
",");
720 n = snprintf(iPuff,
sizeof(iPuff), jsID2, iMuff, Port, iName);
724 strcpy(iPuff+n-1,
",");
725 snprintf(iMuff,
sizeof(iMuff), jsID3, iPuff, iProg, XrdVERSION);
735 int i, Now = time(0);
743 {
eDest->
Emsg(
"Monitor",
"Unable to setup primary monitor collector.");
753 {
eDest->
Emsg(
"Monitor",
"Unable to setup secondary monitor collector.");
764 if (!isEnabled)
return 1;
773 eDest->
Emsg(
"Monitor",
"allocate monitor; insufficient storage.");
779 if (monCLOCK) startClock();
783 if (!
Sched || !monFSTAT) monFSTAT = 0;
788 if (!monREDR)
return 1;
792 for (i = 0; i < rdrNum; i++)
793 {
if (posix_memalign((
void **)&rdrMon[i].Buff, getpagesize(),monRlen))
794 {
eDest->
Emsg(
"Monitor",
"Unable to allocate monitor rdr buffer.");
797 rdrMon[i].Buff->sID =
mySID;
799 rdrMon[i].Next = (i ? &rdrMon[i-1] : &rdrMon[0]);
800 rdrMon[i].nextEnt = 0;
801 rdrMon[i].flushIt = Now + autoFlush;
802 rdrMon[i].lastTOD = 0;
804 rdrMon[0].Next = &rdrMon[i-1];
819 static unsigned int monSeqID = 1;
820 unsigned int mySeqID;
825 mySeqID = monSeqID++;
830 if (hbo)
return mySeqID;
831 return htonl(mySeqID);
850 {*(map.
info+size) =
'\n';
852 size = size + strlen(path) + 1;
858 fillHeader(&map.
hdr, code, size);
867 Send(montype, (
void *)&map, size);
882 if (lastWindow != currWindow) Mark();
883 else if (nextEnt == lastEnt) Flush();
884 h2nll(fsize, monBuff->
info[nextEnt].
arg0.val);
886 monBuff->
info[nextEnt].
arg1.buflen = 0;
887 monBuff->
info[nextEnt++].
arg2.dictid = dictid;
899 char opC,
const char *
Path)
902 MonRdrBuff *mP = Fetch();
903 int n, slots, hLen, pLen;
908 if (*hName ==
'/') {
Path = hName; hName =
""; hLen = 0;}
909 else {
const char *quest = index(hName,
'?');
910 hLen = (quest ? quest - hName : strlen(hName));
911 if (hLen > 256) hLen = 256;
917 if (pLen > 1024) pLen = 1024;
921 n = (hLen + 1 + pLen + 1);
934 if (mP->nextEnt + slots + 2 >= lastRnt) Flush(mP);
938 if (mP->lastTOD != rdrTOD)
939 {mP->lastTOD = rdrTOD;
940 setTMurk(mP->Buff, mP->nextEnt, mP->lastTOD);
946 mtP = &(mP->Buff->info[mP->nextEnt]);
948 mtP->
arg0.rdr.Dent =
static_cast<char>(slots);
949 mtP->
arg0.rdr.Port = htons(
static_cast<short>(Port));
950 mtP->
arg1.dictid = mID;
951 dest = (
char *)(mtP+1);
952 strncpy(dest, hName,hLen); dest += hLen; *dest++ =
':';
953 strncpy(dest,
Path, pLen);
957 mP->nextEnt = mP->nextEnt + (slots+1);
969 time_t Now = time(0);
975 currWindow =
static_cast<kXR_int32>(Now);
976 rdrTOD = htonl(currWindow);
977 nextFlush = currWindow + autoFlush;
981 if (
altMon && currWindow >= FlushTime)
983 if (currWindow >= FlushTime)
985 else FlushTime = nextFlush;
995 {rdrMon[n].Mutex.Lock();
996 if (rdrMon[n].nextEnt == 0) rdrMon[n].flushIt = nextFlush;
997 else if (rdrMon[n].flushIt <= currWindow) Flush(&rdrMon[n]);
998 rdrMon[n].Mutex.UnLock();
1006 if (!monREDR && isEnabled < 0)
1007 {windowMutex.
Lock();
1008 if (!numMonitor) Now = 0;
1023 if (monp !=
altMon)
delete monp;
1028 {windowMutex.
Lock();
1041 unsigned char XrdXrootdMonitor::do_Shift(
long long xTot,
unsigned int &xVal)
1043 const long long smask = 0x7fffffff00000000LL;
1044 const long long xmask = 0x7fffffffffffffffLL;
1045 unsigned char xshift = 0;
1048 while(xTot & smask) {xTot = xTot >> 1LL; xshift++;}
1049 xVal =
static_cast<unsigned int>(xTot);
1059 const char id,
int size)
1066 hdr->
plen = htons(
static_cast<uint16_t
>(size));
1074 void XrdXrootdMonitor::Flush()
1081 if (nextEnt <= 1)
return;
1086 localWindow = currWindow;
1098 now = lastWindow + sizeWindow;
1105 FlushTime = localWindow + autoFlush;
1113 void XrdXrootdMonitor::Flush(XrdXrootdMonitor::MonRdrBuff *mP)
1120 mP->flushIt =
static_cast<int>(time(0)) + autoFlush;
1121 if (mP->nextEnt <= 1)
return;
1125 setTMurk(mP->Buff, mP->nextEnt, rdrTOD);
1143 void XrdXrootdMonitor::Mark()
1150 localWindow = currWindow;
1158 if (
this !=
altMon && autoFlash && nextEnt > 1)
1161 if (localWindow - bufStartWindow >= autoFlash)
1163 lastWindow = localWindow;
1174 monBuff->
info[nextEnt-1].
arg2.Window =
1175 static_cast<kXR_int32>(htonl(localWindow));
1177 else if (nextEnt+8 > lastEnt)
1185 monBuff->
info[nextEnt].
arg1.Window =
1186 static_cast<kXR_int32>(htonl(lastWindow + sizeWindow));
1187 monBuff->
info[nextEnt].
arg2.Window =
1188 static_cast<kXR_int32>(htonl(localWindow));
1191 lastWindow = localWindow;
1201 const char *TraceID =
"Monitor";
1204 static int seq1=0, seq2=0;
1214 if (monMode & monMode1 && InetDest1)
1215 {
if (mHdr) mHdr->
pseq = (seq1++) & 0xff;
1216 rc1 = InetDest1->
Send((
char *)buff, blen);
1217 TRACE(
DEBUG,blen <<
" bytes sent to " <<Dest1 <<
" rc=" <<rc1);
1220 if (monMode & monMode2 && InetDest2)
1221 {
if (mHdr) mHdr->
pseq = (seq2++) & 0xff;
1222 rc2 = InetDest2->
Send((
char *)buff, blen);
1223 TRACE(
DEBUG,blen <<
" bytes sent to " <<Dest2 <<
" rc=" <<rc2);
1228 return (rc1 ? rc1 : rc2);
1235 void XrdXrootdMonitor::startClock()
1243 currWindow =
static_cast<kXR_int32>(Now);
1244 rdrTOD = htonl(currWindow);
1246 FlushTime = autoFlush + currWindow;
static XrdSysError eDest(0,"crypto_")
union XrdXrootdMonRedir::@173 arg1
const kXR_char XROOTD_MON_DISC
const kXR_char XROOTD_MON_MAPUEAC
const kXR_char XROOTD_MON_WINDOW
union XrdXrootdMonTrace::@170 arg1
XrdXrootdMonTrace info[sizeof(XrdXrootdMonTrace)]
const kXR_char XROOTD_MON_MAPUSER
const kXR_char XROOTD_MON_APPID
union XrdXrootdMonTrace::@169 arg0
const kXR_char XROOTD_MON_REDSID
const kXR_char XROOTD_MON_MAPIDNT
const kXR_char XROOTD_MON_MAPTRCE
const kXR_char XROOTD_MON_CLOSE
union XrdXrootdMonTrace::@171 arg2
const kXR_char XROOTD_MON_MAPPATH
const kXR_char XROOTD_MON_OPEN
const kXR_char XROOTD_MON_REDIRECT
const kXR_char XROOTD_MON_MAPTOKN
union XrdXrootdMonRedir::@172 arg0
const kXR_char XROOTD_MON_MAPREDR
XrdSysTrace XrdXrootdTrace
#define setTMurk(TM_mb, TM_en, TM_tm)
#define setTMark(TM_mb, TM_en, TM_tm)
int Send(const char *buff, int blen=0, const char *dest=0, int tmo=-1)
static int Export(const char *Var, const char *Val)
static char * Ident(long long &mySID, char *iBuff, int iBlen, const char *iHost, const char *iProg, const char *iName, int Port)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Defaults(int intv, int opts, int iocnt, int fbsz)
XrdXrootdMonitorLock(XrdXrootdMonitor *theMonitor)
Hello(const char *dest, char mode)
void Register(const char *Uname, const char *Hname, const char *Pname, unsigned int xSID=0)
void Report(const char *Info)
XrdXrootdMonitor_Ident(int idt, bool ison)
~XrdXrootdMonitor_Ident()
void Set(XrdScheduler *sp, int intvl)
static XrdXrootdMonitor * altMon
static void Defaults(char *dest1, int m1, char *dest2, int m2)
void Disc(kXR_unt32 dictid, int csec, char Flags=0)
void Close(kXR_unt32 dictid, long long rTot, long long wTot)
static int Send(int mmode, void *buff, int size, bool setseq=true)
void Open(kXR_unt32 dictid, off_t fsize)
static kXR_unt32 GetDictID(bool hbo=false)
static int32_t InitStartTime()