XRootD
XrdXrootdMonFile.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d X r o o t d M o n F i l e . c c */
4 /* */
5 /* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstring>
32 
33 #include "Xrd/XrdScheduler.hh"
34 
35 #include "XrdSys/XrdSysError.hh"
36 #include "XrdSys/XrdSysPlatform.hh"
37 
40 
41 /******************************************************************************/
42 /* G l o b a l s */
43 /******************************************************************************/
44 
45 namespace XrdXrootdMonInfo
46 {
49 extern long long mySID;
50 extern int32_t startTime;
51 }
52 
53 /******************************************************************************/
54 /* S t a t i c M e m b e r s */
55 /******************************************************************************/
56 
57 XrdSysMutex XrdXrootdMonFile::bfMutex;
58 XrdSysMutex XrdXrootdMonFile::fmMutex;
59 XrdXrootdMonFMap XrdXrootdMonFile::fmMap[XrdXrootdMonFMap::mapNum];
60 short XrdXrootdMonFile::fmUse[XrdXrootdMonFMap::mapNum] = {0};
61 char *XrdXrootdMonFile::repBuff = 0;
62 XrdXrootdMonHeader *XrdXrootdMonFile::repHdr = 0;
63 XrdXrootdMonFileTOD *XrdXrootdMonFile::repTOD = 0;
64 char *XrdXrootdMonFile::repNext = 0;
65 char *XrdXrootdMonFile::repFirst = 0;
66 char *XrdXrootdMonFile::repLast = 0;
67 int XrdXrootdMonFile::totRecs = 0;
68 int XrdXrootdMonFile::xfrRecs = 0;
69 int XrdXrootdMonFile::repSize = 0;
70 int XrdXrootdMonFile::repTime = 0;
71 int XrdXrootdMonFile::fmHWM =-1;
72 int XrdXrootdMonFile::crecSize = 0;
73 int XrdXrootdMonFile::xfrCnt = 0;
74 int XrdXrootdMonFile::fBsz = 65472;
75 int XrdXrootdMonFile::xfrRem = 0;
76 XrdXrootdMonFileXFR XrdXrootdMonFile::xfrRec;
77 short XrdXrootdMonFile::crecNLen = 0;
78 short XrdXrootdMonFile::trecNLen = 0;
79 char XrdXrootdMonFile::fsLFN = 0;
80 char XrdXrootdMonFile::fsLVL = 0;
81 char XrdXrootdMonFile::fsOPS = 0;
82 char XrdXrootdMonFile::fsSSQ = 0;
83 char XrdXrootdMonFile::fsXFR = 0;
84 char XrdXrootdMonFile::crecFlag = 0;
85 
86 /******************************************************************************/
87 /* C l o s e */
88 /******************************************************************************/
89 
91 {
93  char *cP;
94  int iEnt, iMap, iSlot;
95 
96 // If this object was registered for I/O reporting, deregister it.
97 //
98  if (fsP->MonEnt != -1)
99  {iEnt = fsP->MonEnt & 0xffff;
100  iMap = iEnt >> XrdXrootdMonFMap::fmShft;
101  iSlot = iEnt & XrdXrootdMonFMap::fmMask;
102  fsP->MonEnt = -1;
103  fmMutex.Lock();
104  if (fmMap[iMap].Free(iSlot)) fmUse[iMap]--;
105  if (iMap == fmHWM) while(fmHWM >= 0 && !fmUse[fmHWM]) fmHWM--;
106  fmMutex.UnLock();
107  }
108 
109 // Insert a close record header (mostly precomputed)
110 //
112  cRec.Hdr.recFlag = crecFlag;
113  if (isDisc) cRec.Hdr.recFlag |= XrdXrootdMonFileHdr::forced;
114  cRec.Hdr.recSize = crecNLen;
115  cRec.Hdr.fileID = fsP->FileID;
116 
117 // Insert the I/O bytes
118 //
119  cRec.Xfr.read = htonll(fsP->xfr.read);
120  cRec.Xfr.readv = htonll(fsP->xfr.readv);
121  cRec.Xfr.write = htonll(fsP->xfr.write);
122 
123 // Insert ops if so wanted
124 //
125  if (fsOPS)
126  {cRec.Ops.read = htonl (fsP->ops.read);
127  if (fsP->ops.read)
128  {cRec.Ops.rdMin = htonl (fsP->ops.rdMin);
129  cRec.Ops.rdMax = htonl (fsP->ops.rdMax);
130  } else {
131  cRec.Ops.rdMin = cRec.Ops.rdMax = 0;
132  }
133  cRec.Ops.readv = htonl (fsP->ops.readv);
134  cRec.Ops.rsegs = htonll(fsP->ops.rsegs);
135  if (fsP->ops.readv)
136  {cRec.Ops.rsMin = htons (fsP->ops.rsMin);
137  cRec.Ops.rsMax = htons (fsP->ops.rsMax);
138  cRec.Ops.rvMin = htonl (fsP->ops.rvMin);
139  cRec.Ops.rvMax = htonl (fsP->ops.rvMax);
140  } else {
141  cRec.Ops.rsMin = cRec.Ops.rsMax = 0;
142  cRec.Ops.rvMin = cRec.Ops.rvMax = 0;
143  }
144  cRec.Ops.write = htonl (fsP->ops.write);
145  if (fsP->ops.write)
146  {cRec.Ops.wrMin = htonl (fsP->ops.wrMin);
147  cRec.Ops.wrMax = htonl (fsP->ops.wrMax);
148  } else {
149  cRec.Ops.wrMin = cRec.Ops.wrMax = 0;
150  }
151  }
152 
153 // Record sum of squares if so needed
154 //
155  if (fsSSQ)
156  {XrdXrootdMonDouble xval;
157  xval.dreal = fsP->ssq.read;
158  cRec.Ssq.read.dlong = htonll(xval.dlong);
159  xval.dreal = fsP->ssq.readv;
160  cRec.Ssq.readv.dlong = htonll(xval.dlong);
161  xval.dreal = fsP->ssq.rsegs;
162  cRec.Ssq.rsegs.dlong = htonll(xval.dlong);
163  xval.dreal = fsP->ssq.write;
164  cRec.Ssq.write.dlong = htonll(xval.dlong);
165  }
166 
167 // Get a pointer to the next slot (the buffer gets locked)
168 //
169  cP = GetSlot(crecSize);
170  memcpy(cP, &cRec, crecSize);
171  bfMutex.UnLock();
172 }
173 
174 /******************************************************************************/
175 /* D e f a u l t s */
176 /******************************************************************************/
177 
178 void XrdXrootdMonFile::Defaults(int intv, int opts, int xfrcnt, int fbsz)
179 {
180 
181 // Set the reporting interval and I/O counter
182 //
183  repTime = intv;
184  xfrCnt = xfrcnt;
185  xfrRem = xfrcnt;
186  fBsz = (fbsz <= 0 ? 65472 : fbsz);
187 
188 // Expand out the options
189 //
190  fsXFR = (opts & XROOTD_MON_FSXFR) != 0;
191  fsLFN = (opts & XROOTD_MON_FSLFN) != 0;
192  fsOPS = (opts & (XROOTD_MON_FSOPS | XROOTD_MON_FSSSQ)) != 0;
193  fsSSQ = (opts & XROOTD_MON_FSSSQ) != 0;
194 
195 // Set monitoring level
196 //
197  if (fsSSQ) fsLVL = XrdXrootdFileStats::monSsq;
198  else if (fsOPS) fsLVL = XrdXrootdFileStats::monOps;
199  else if (intv) fsLVL = XrdXrootdFileStats::monOn;
200  else fsLVL = XrdXrootdFileStats::monOff;
201 }
202 
203 /******************************************************************************/
204 /* D i s c */
205 /******************************************************************************/
206 
207 void XrdXrootdMonFile::Disc(unsigned int usrID)
208 {
209  static short drecSize = htons(sizeof(XrdXrootdMonFileDSC));
211 
212 // Get a pointer to the next slot (the buffer gets locked)
213 //
214  dP = (XrdXrootdMonFileDSC *)GetSlot(sizeof(XrdXrootdMonFileDSC));
215 
216 // Fill out the record. It's pretty simple
217 //
219  dP->Hdr.recFlag = 0;
220  dP->Hdr.recSize = drecSize;
221  dP->Hdr.userID = usrID;
222  bfMutex.UnLock();
223 }
224 
225 /******************************************************************************/
226 /* D o I t */
227 /******************************************************************************/
228 
230 {
231 
232 // First check if we need to report all the I/O stats
233 //
234  xfrRem--;
235  if (!xfrRem) DoXFR();
236 
237 // Check if we should flush the buffer
238 //
239  bfMutex.Lock();
240  if (repNext) Flush();
241  bfMutex.UnLock();
242 
243 // Reschedule ourselves
244 //
245  XrdXrootdMonInfo::Sched->Schedule((XrdJob *)this, time(0)+repTime);
246 }
247 
248 /******************************************************************************/
249 /* Private: D o X F R */
250 /******************************************************************************/
251 
252 void XrdXrootdMonFile::DoXFR()
253 {
254  XrdXrootdFileStats *fsP;
255  int keep, i, n, hwm;
256 
257 // Reset interval counter
258 //
259  xfrRem = xfrCnt;
260 
261 // Grab the high watermark once
262 //
263  fmMutex.Lock();
264  hwm = fmHWM;
265  fmMutex.UnLock();
266 
267 // Report on all the files we have registered. This is a CPU burner as we
268 // periodically drop the lock to allow open/close requests to come through.
269 //
270  for (i = 0; i <= hwm; i++)
271  {fmMutex.Lock();
272  if (fmUse[i])
273  {n = 0;
275  while((fsP = fmMap[i].Next(n)))
276  {if (fsP->xfrXeq) DoXFR(fsP);
277  if (!keep--)
278  {fmMutex.UnLock();
280  fmMutex.Lock();
281  }
282  }
283  }
284  fmMutex.UnLock();
285  }
286 }
287 
288 /******************************************************************************/
289 
290 void XrdXrootdMonFile::DoXFR(XrdXrootdFileStats *fsP)
291 {
292  long long xfrRead, xfrReadv, xfrWrite;
293  char *cP;
294 
295 // Turn off the activity flag
296 //
297  fsP->xfrXeq = 0;
298 
299 // Grab the I/O bytes to get a somewhat consistent image here
300 //
301  xfrRead = fsP->xfr.read;
302  xfrReadv = fsP->xfr.readv;
303  xfrWrite = fsP->xfr.write;
304 
305 // Complete the record
306 //
307  xfrRec.Hdr.fileID = fsP->FileID;
308  xfrRec.Xfr.read = htonll(xfrRead);
309  xfrRec.Xfr.readv = htonll(xfrReadv);
310  xfrRec.Xfr.write = htonll(xfrWrite);
311 
312 // Get a pointer to the next slot (the buffer gets locked)
313 //
314  cP = GetSlot(sizeof(xfrRec));
315  memcpy(cP, &xfrRec, sizeof(xfrRec));
316  xfrRecs++;
317  bfMutex.UnLock();
318 }
319 
320 /******************************************************************************/
321 /* I n i t */
322 /******************************************************************************/
323 
325 {
326  XrdXrootdMonFile *mfP;
327  int alignment, pagsz = getpagesize();
328 
329 // Allocate a socket buffer
330 //
331  alignment = (fBsz < pagsz ? 1024 : pagsz);
332  if (posix_memalign((void **)&repBuff, alignment, fBsz))
333  {XrdXrootdMonInfo::eDest->Emsg("MonFile", "Unable to allocate monitor buffer.");
334  return false;
335  }
336 
337 // Set the header (always present)
338 //
339  repHdr = (XrdXrootdMonHeader *)repBuff;
340  repHdr->code = XROOTD_MON_MAPFSTA;
341  repHdr->pseq = 0;
343 
344 // Set the time record (always present)
345 //
346  repTOD = (XrdXrootdMonFileTOD *)(repBuff + sizeof(XrdXrootdMonHeader));
349  repTOD->Hdr.recSize = htons(sizeof(XrdXrootdMonFileTOD));
350  repTOD->sID = static_cast<kXR_int64>(XrdXrootdMonInfo::mySID);
351 
352 // Establish first real record in the buffer (always fixed)
353 //
354  repFirst = repBuff+sizeof(XrdXrootdMonHeader)+sizeof(XrdXrootdMonFileTOD);
355 
356 // Calculate the end but the next slot always starts with a null pointer
357 //
358  repLast = repBuff+fBsz-1;
359  repNext = 0;
360 
361 // Calculate the close record size and the initial flags
362 //
363  crecSize = sizeof(XrdXrootdMonFileHdr) + sizeof(XrdXrootdMonStatXFR);
364  if (fsSSQ || fsOPS)
365  {crecSize += sizeof(XrdXrootdMonStatOPS);
366  crecFlag = XrdXrootdMonFileHdr::hasOPS;
367  } else crecFlag = 0;
368  if (fsSSQ)
369  {crecSize += sizeof(XrdXrootdMonStatSSQ);
370  crecFlag |= XrdXrootdMonFileHdr::hasSSQ;
371  }
372  crecNLen = htons(static_cast<short>(crecSize));
373 
374 // Preformat the i/o record
375 //
377  xfrRec.Hdr.recFlag = 0;
378  xfrRec.Hdr.recSize = htons(static_cast<short>(sizeof(xfrRec)));
379 
380 // Calculate the tod record size
381 //
382  trecNLen = htons(static_cast<short>(sizeof(XrdXrootdMonFileTOD)));
383 
384 // Allocate an instance of ourselves so we can schedule ourselves
385 //
386  mfP = new XrdXrootdMonFile();
387 
388 // Schedule the flushes
389 //
390  XrdXrootdMonInfo::Sched->Schedule((XrdJob *)mfP, time(0)+repTime);
391  return true;
392 }
393 
394 /******************************************************************************/
395 /* Private: F l u s h */
396 /******************************************************************************/
397 
398 void XrdXrootdMonFile::Flush() // The bfMutex must be locked
399 {
400  static int seq = 0;
401  int bfSize;
402 
403 // Update the sequence number
404 //
405  repHdr->pseq = static_cast<char>(0x00ff & seq++);
406 
407 // Insert ending timestamp and record counts
408 //
409  repTOD->Hdr.nRecs[0] = htons(static_cast<short>(xfrRecs));
410  repTOD->Hdr.nRecs[1] = htons(static_cast<short>(totRecs));
411  repTOD->tEnd = htonl(static_cast<int>(time(0)));
412 
413 // Calculate buffer size and stick into the header
414 //
415  bfSize = (repNext - repBuff);
416  repHdr->plen = htons(static_cast<short>(bfSize));
417  repNext = 0;
418 
419 // Write this out
420 //
421  XrdXrootdMonitor::Send(XROOTD_MON_FSTA, repBuff, bfSize, false);
422  repTOD->tBeg = repTOD->tEnd;
423  xfrRecs = totRecs = 0;
424 }
425 
426 /******************************************************************************/
427 /* Private: G e t S l o t */
428 /******************************************************************************/
429 
430 char *XrdXrootdMonFile::GetSlot(int slotSZ)
431 {
432  char *myRec;
433 
434 // Lock this code to prevent interference (we should use double buffering)
435 // Note that the caller must do the unlock when finished with the slot.
436 //
437  bfMutex.Lock();
438 
439 // Check if we need to flush the buffer (sets repNext to zero). Otherwise,
440 // if this is the first record insert a timestamp.
441 //
442  if (repNext)
443  {if ((repNext + slotSZ) > repLast)
444  {Flush();
445  repNext = repFirst;
446  }
447  } else {
448  repTOD->tBeg = htonl(static_cast<int>(time(0)));
449  repNext = repFirst;
450  }
451 
452 // Return the slot
453 //
454  totRecs++;
455  myRec = repNext;
456  repNext += slotSZ;
457  return myRec;
458 }
459 
460 /******************************************************************************/
461 /* O p e n */
462 /******************************************************************************/
463 
465  unsigned int uDID, bool isRW)
466 {
467  static const int minRecSz = sizeof(XrdXrootdMonFileOPN)
468  - sizeof(XrdXrootdMonFileLFN);
470  int i = 0, sNum = -1, rLen, pLen = 0;
471 
472 // Assign the path a dictionary id if not assigned via file monitoring
473 //
474  if (fsP->FileID == 0) fsP->FileID = XrdXrootdMonitor::GetDictID();
475 
476 // Add this open to the map table if we are doing I/O stats.
477 //
478  if (fsXFR)
479  {fmMutex.Lock();
480  for (i = 0; i < XrdXrootdMonFMap::mapNum; i++)
481  if (fmUse[i] < XrdXrootdMonFMap::fmSize)
482  {if ((sNum = fmMap[i].Insert(fsP)) >= 0)
483  {fmUse[i]++;
484  if (i > fmHWM) fmHWM = i;
485  break;
486  }
487  }
488  fmMutex.UnLock();
489  }
490 
491 // Generate the cookie (real or virtual) to find the entry in the map table.
492 // Supply the monitoring options for efficiency.
493 //
494  fsP->MonEnt = (sNum | (i << XrdXrootdMonFMap::fmShft)) & 0xffff;
495  fsP->monLvl = fsLVL;
496  fsP->xfrXeq = 0;
497 
498 // Compute the size of this record
499 //
500  rLen = minRecSz;
501  if (fsLFN)
502  {pLen = strlen(Path);
503  rLen += sizeof(kXR_unt32) + pLen;
504  i = (rLen + 8) & ~0x00000003;
505  pLen = pLen + (i - rLen);
506  rLen = i;
507  }
508 
509 // Get a pointer to the next slot (the buffer gets locked)
510 //
511  oP = (XrdXrootdMonFileOPN *)GetSlot(rLen);
512 
513 // Fill out the record
514 //
516  oP->Hdr.recFlag = (isRW ? XrdXrootdMonFileHdr::hasRW : 0);
517  oP->Hdr.recSize = htons(static_cast<short>(rLen));
518  oP->Hdr.fileID = fsP->FileID;
519  oP->fsz = htonll(fsP->fSize);
520 
521 // Append user and path if so wanted (sizes have been verified)
522 //
523  if (fsLFN)
525  oP->ufn.user = uDID;
526  strncpy(oP->ufn.lfn, Path, pLen);
527  }
528  bfMutex.UnLock();
529 }
long long kXR_int64
Definition: XPtypes.hh:98
unsigned int kXR_unt32
Definition: XPtypes.hh:90
XrdOucString Path
struct myOpts opts
XrdXrootdMonDouble write
XrdXrootdMonFileHdr Hdr
XrdXrootdMonFileLFN ufn
XrdXrootdMonDouble read
XrdXrootdMonStatXFR Xfr
XrdXrootdMonDouble readv
XrdXrootdMonStatXFR Xfr
XrdXrootdMonFileHdr Hdr
XrdXrootdMonStatOPS Ops
const kXR_char XROOTD_MON_MAPFSTA
XrdXrootdMonFileHdr Hdr
XrdXrootdMonFileHdr Hdr
XrdXrootdMonDouble rsegs
XrdXrootdMonStatSSQ Ssq
XrdXrootdMonFileHdr Hdr
#define XROOTD_MON_FSSSQ
#define XROOTD_MON_FSLFN
#define XROOTD_MON_FSTA
#define XROOTD_MON_FSOPS
#define XROOTD_MON_FSXFR
Definition: XrdJob.hh:43
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
struct XrdXrootdFileStats::@167 ssq
XrdXrootdMonStatXFR xfr
XrdXrootdMonStatOPS ops
static const int fmShft
static const int fmSize
static const int mapNum
static const int fmHold
static const int fmMask
static void Disc(unsigned int usrID)
static void Defaults(int intv, int opts, int iocnt, int fbsz)
static void Open(XrdXrootdFileStats *fsP, const char *Path, unsigned int uDID, bool isRW)
static bool Init()
static void Close(XrdXrootdFileStats *fsP, bool isDisc=false)
static int Send(int mmode, void *buff, int size, bool setseq=true)
static kXR_unt32 GetDictID(bool hbo=false)
XrdScheduler * Sched
XrdSysError * eDest