XRootD
XrdSsiClient.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d S s i G e t C l i e n t S e r v i c e . c c */
4 /* */
5 /* (c) 2013 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* Produced by Andrew Hanushevsky for Stanford University under contract */
7 /* DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 #include <cerrno>
31 #include <fcntl.h>
32 #include <cstdio>
33 #include <string>
34 #include <cstring>
35 #include <ctime>
36 #include <unistd.h>
37 #include <sys/types.h>
38 
39 #include "Xrd/XrdScheduler.hh"
40 #include "Xrd/XrdTrace.hh"
41 
42 #include "XrdCl/XrdClDefaultEnv.hh"
43 
44 #include "XrdNet/XrdNetAddr.hh"
45 #include "XrdNet/XrdNetRegistry.hh"
46 
47 #include "XrdSsi/XrdSsiAtomics.hh"
48 #include "XrdSsi/XrdSsiLogger.hh"
49 #include "XrdSsi/XrdSsiProvider.hh"
50 #include "XrdSsi/XrdSsiServReal.hh"
51 #include "XrdSsi/XrdSsiScale.hh"
52 #include "XrdSsi/XrdSsiTrace.hh"
53 
54 #include "XrdSys/XrdSysLogger.hh"
55 #include "XrdSys/XrdSysLogging.hh"
56 #include "XrdSys/XrdSysError.hh"
57 #include "XrdSys/XrdSysPthread.hh"
58 #include "XrdSys/XrdSysTrace.hh"
59 
60 /******************************************************************************/
61 /* N a m e S p a c e G l o b a l s */
62 /******************************************************************************/
63 
64 namespace XrdSsi
65 {
66 extern XrdSysError Log;
67 extern XrdSysLogger *Logger;
68 extern XrdSsiScale sidScale;
69 extern XrdSysTrace Trace;
72 
76  Atomic(int) contactN(1);
77  short maxTCB = 300;
78  short maxCLW = 30;
79  short maxPEL = 10;
80  Atomic(bool) initDone(false);
81  bool dsTTLSet = false;
82  bool reqTOSet = false;
83  bool strTOSet = false;
84  bool hiResTime= false;
85 
86 static const int rDispNone = 0;
87 static const int rDispRand = -1;
88 static const int rDispRR = 1;
89 
90  char rDisp = rDispRR;
91 }
92 
93 using namespace XrdSsi;
94 
95 /******************************************************************************/
96 /* L o c a l C l a s s e s */
97 /******************************************************************************/
98 
100 {
101 public:
102 
103 XrdSsiService *GetService(XrdSsiErrInfo &eInfo,
104  const std::string &contact,
105  int oHold=256
106  );
107 
108 virtual bool Init(XrdSsiLogger *logP,
109  XrdSsiCluster *clsP,
110  std::string cfgFn,
111  std::string parms,
112  int argc,
113  char **argv
114  ) {return true;}
115 
116 virtual rStat QueryResource(const char *rName,
117  const char *contact=0
118  ) {return notPresent;}
119 
120 virtual void SetCBThreads(int cbNum, int ntNum);
121 
122 virtual bool SetConfig(XrdSsiErrInfo &eInfo,
123  std::string &optname, int optvalue);
124 
125 virtual void SetSpread(short ssz);
126 
127 virtual void SetTimeout(tmoType what, int tmoval);
128 
131 
132 private:
133 void SetLogger();
134 void SetScheduler();
135 };
136 
137 /******************************************************************************/
138 /* X r d S s i C l i e n t P r o v i d e r : : G e t S e r v i c e */
139 /******************************************************************************/
140 
142  const std::string &contact,
143  int oHold)
144 {
145  static const int maxTMO = 0x7fffffff;
146  XrdNetAddr netAddr;
147  std::string eMsg;
148  const char *eText = 0;
149  char buff[512];
150 
151 // Allocate a scheduler if we do not have one and set default env (1st call)
152 //
153  if (!Atomic_GET(initDone))
154  {clMutex.Lock();
155  if (!Logger) SetLogger();
156  if (!schedP) SetScheduler();
158  if (!dsTTLSet) clEnvP->PutInt("DataServerTTL", maxTMO);
159  if (!reqTOSet) clEnvP->PutInt("RequestTimeout", maxTMO);
160  if (!strTOSet) clEnvP->PutInt("StreamTimeout", maxTMO);
161  clEnvP->PutInt("ParallelEvtLoop",maxPEL);
162  if (rDisp == rDispNone || rDisp == rDispRR)
163  clEnvP->PutInt("IPNoShuffle", 1);
164  initDone = true;
165  clMutex.UnLock();
166  }
167 
168 // If no contact is given then declare an error
169 //
170  if (contact.empty())
171  {eInfo.Set("Contact not specified.", EINVAL); return 0;}
172 
173 // If this is a single contact which is really a singleton, don't create
174 // a registry entry for it as it's just not needed (we need one to rotate).
175 //
176  if (contact.find(',') != std::string::npos
177  || !XrdNetUtils::Singleton(contact.c_str()))
178  {int cNum = contactN++;
179  bool rotate = rDisp == rDispRR;
180  snprintf(buff,sizeof(buff),"%ccontact-%d:4901",XrdNetRegistry::pfx,cNum);
181  if (!XrdNetRegistry::Register(buff, contact.c_str(), &eMsg, rotate))
182  eText = (eMsg.size() ? eMsg.c_str() : "reason unknown");
183 
184  } else {
185 
186  if (!(eText = netAddr.Set(contact.c_str()))
187  && !netAddr.Format(buff, sizeof(buff), XrdNetAddrInfo::fmtName))
188  eText = "formatting failed";
189  }
190 
191 // Check if validation or registration failed
192 //
193  if (eText)
194  {snprintf(buff, sizeof(buff), "Unable to validate contact; %s", eText);
195  eInfo.Set(buff, EINVAL);
196  return 0;
197  }
198 
199 // Allocate a service object and return it
200 //
201  return new XrdSsiServReal(buff, oHold);
202 }
203 
204 /******************************************************************************/
205 /* X r d S s i C l i e n t P r o v i d e r : : S e t C B T h r e a d s */
206 /******************************************************************************/
207 
208 void XrdSsiClientProvider::SetCBThreads(int cbNum, int ntNum)
209 {
210 // Validate thread number
211 //
212  if (cbNum > 1)
213  {if (cbNum > 32767) cbNum = 32767; // Max short value
214  if (ntNum < 1) ntNum = cbNum*10/100;
215  if (ntNum < 3) ntNum = 0;
216  else if (ntNum > 100) ntNum = 100;
217  clMutex.Lock();
218  maxTCB = static_cast<short>(cbNum);
219  maxCLW = static_cast<short>(ntNum);
220  clMutex.UnLock();
221  }
222 }
223 
224 /******************************************************************************/
225 /* X r d S s i C l i e n t P r o v i d e r : : S e t C o n f i g */
226 /******************************************************************************/
227 
229  std::string &optname, int optvalue)
230 {
231 // Look for an option we recognize
232 //
233  if (optname == "cbThreads")
234  {if (optvalue < 1)
235  {eInfo.Set("invalid cbThreads value.", EINVAL); return false;}
236  if (optvalue > 32767) optvalue = 32767;
237  clMutex.Lock();
238  maxTCB = static_cast<short>(optvalue);
239  clMutex.UnLock();
240  }
241  else if (optname == "hiResTime") hiResTime = true;
242  else if (optname == "netThreads")
243  {if (optvalue < 1)
244  {eInfo.Set("invalid netThreads value.", EINVAL); return false;}
245  if (optvalue > 32767) optvalue = 32767;
246  clMutex.Lock();
247  maxCLW = static_cast<short>(optvalue);
248  clMutex.UnLock();
249  }
250  else if (optname == "pollers")
251  {if (optvalue < 1)
252  {eInfo.Set("invalid pollers value.", EINVAL); return false;}
253  if (optvalue > 32767) optvalue = 32767;
254  clMutex.Lock();
255  maxPEL = static_cast<short>(optvalue);
256  clMutex.UnLock();
257  }
258  else if (optname == "reqDispatch")
259  {clMutex.Lock();
260  if (optvalue < 0) rDisp = rDispRand;
261  else if (optvalue > 0) rDisp = rDispRR;
262  else rDisp = rDispNone;
263  clMutex.UnLock();
264  }
265  else {eInfo.Set("invalid option name.", EINVAL); return false;}
266 
267  return true;
268 }
269 
270 /******************************************************************************/
271 /* X r d S s i C l i e n t P r o v i d e r : : S e t L o g g e r */
272 /******************************************************************************/
273 
274 void XrdSsiClientProvider::SetLogger()
275 {
276  int eFD;
277 
278 // Get a file descriptor mirroring standard error
279 //
280 #if ( defined(__linux__) || defined(__GNU__) ) && defined(F_DUPFD_CLOEXEC)
281  eFD = fcntl(STDERR_FILENO, F_DUPFD_CLOEXEC, 0);
282 #else
283  eFD = dup(STDERR_FILENO);
284  fcntl(eFD, F_SETFD, FD_CLOEXEC);
285 #endif
286 
287 // Now we need to get a logger object. We make this a real dumb one.
288 //
289  Logger = new XrdSysLogger(eFD, 0);
290  if (hiResTime || getenv("XRDSSI_HIRESLOG")) Logger->setHiRes();
291  Log.logger(Logger);
293  if (getenv("XRDSSIDEBUG") != 0) Trace.What = TRACESSI_Debug;
294 
295 // Check for a message callback object. This must be set at global init time.
296 //
297  if (msgCBCl)
298  {XrdSysLogging::Parms logParms;
299  msgCB = msgCBCl;
300  logParms.logpi = msgCBCl;
301  logParms.bufsz = 0;
302  XrdSysLogging::Configure(*Logger, logParms);
303  }
304 }
305 
306 /******************************************************************************/
307 /* X r d S s i C l i e n t P r o v i d e r : : S e t S c h e d u l e r */
308 /******************************************************************************/
309 
310 // This may not be called before the logger object is created!
311 
312 void XrdSsiClientProvider::SetScheduler()
313 {
314  static XrdSysTrace myTrc("XrdSsi", Log.logger());
315 
316 // Now construct the proper trace object (note that we do not set tracing if
317 // message forwarding is on because these messages will not be forwarded).
318 //
319  if (!msgCBCl && Trace.What & TRACESSI_Debug) myTrc.What = TRACE_SCHED;
320 
321 // We can now allocate a scheduler
322 //
323  XrdSsi::schedP = new XrdScheduler(&Log, &myTrc);
324 
325 // Set thread count for callbacks
326 //
327  XrdSsi::schedP->setParms(-1, maxTCB, -1, -1, 0);
328 
329 // Set number of framework worker threads if need be
330 //
331  if (maxCLW)
333  clEnvP->PutInt("WorkerThreads", maxCLW);
334  }
335 
336 // Start the scheduler
337 //
339 }
340 
341 /******************************************************************************/
342 /* X r d S s i C l i e n t P r o v i d e r : : S e t S p r e a d */
343 /******************************************************************************/
344 
346 {
347  sidScale.setSpread(ssz);
348 }
349 
350 /******************************************************************************/
351 /* X r d S s i C l i e n t P r o v i d e r : : S e t T i m e o u t */
352 /******************************************************************************/
353 
355 {
356 
357 // Ignore invalid timeouts
358 //
359  if (tmoval <= 0) return;
360 
361 // Get global environment
362 //
363  clMutex.Lock();
365 
366 // Set requested timeout
367 //
368  switch(what)
369  {case connect_N: clEnvP->PutInt("ConnectionRetry", tmoval);
370  break;
371  case connect_T: clEnvP->PutInt("ConnectionWindow", tmoval);
372  break;
373  case idleClose: clEnvP->PutInt("DataServerTTL", tmoval);
374  dsTTLSet = true;
375  break;
376  case request_T: clEnvP->PutInt("RequestTimeout", tmoval);
377  reqTOSet = true;
378  break;
379  case stream_T: clEnvP->PutInt("StreamTimeout", tmoval);
380  strTOSet = true;
381  break;
382  default: break;
383  }
384 
385 // All done
386 //
387  clMutex.UnLock();
388 }
389 
390 /******************************************************************************/
391 /* G l o b a l S t a t i c s */
392 /******************************************************************************/
393 
394 namespace
395 {
396 XrdSsiClientProvider ClientProvider;
397 }
398 
int fcntl(int fd, int cmd,...)
#define eMsg(x)
#define Atomic_GET(x)
XrdSsiProvider * XrdSsiProviderClient
#define TRACESSI_Debug
Definition: XrdSsiTrace.hh:34
#define TRACE_SCHED
Definition: XrdTrace.hh:42
static Env * GetEnv()
Get default client environment.
bool PutInt(const std::string &key, int value)
Definition: XrdClEnv.cc:110
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtName
Hostname if it is resolvable o/w use fmtAddr.
const char * Set(const char *hSpec, int pNum=PortInSpec)
Definition: XrdNetAddr.cc:216
static bool Register(const char *hName, const char *hList[], int hLNum, std::string *eText=0, bool rotate=false)
static const char pfx
Registry names must start with this character.
static bool Singleton(const char *hSpec, const char **eText=0)
Definition: XrdNetUtils.cc:921
void setParms(int minw, int maxw, int avlt, int maxi, int once=0)
virtual void SetSpread(short ssz)
virtual bool SetConfig(XrdSsiErrInfo &eInfo, std::string &optname, int optvalue)
virtual ~XrdSsiClientProvider()
virtual rStat QueryResource(const char *rName, const char *contact=0)
virtual void SetCBThreads(int cbNum, int ntNum)
XrdSsiService * GetService(XrdSsiErrInfo &eInfo, const std::string &contact, int oHold=256)
virtual void SetTimeout(tmoType what, int tmoval)
virtual bool Init(XrdSsiLogger *logP, XrdSsiCluster *clsP, std::string cfgFn, std::string parms, int argc, char **argv)
void Set(const char *eMsg=0, int eNum=0, int eArg=0)
void() MCB_t(struct timeval const &mtime, unsigned long tID, const char *msg, int mlen)
Length of message text.
void setSpread(short sval)
Definition: XrdSsiScale.cc:175
XrdSysLogger * logger(XrdSysLogger *lp=0)
Definition: XrdSysError.hh:141
void setHiRes()
Set log file timestamp to high resolution (hh:mm:ss.uuuu).
static bool Configure(XrdSysLogger &logr, Parms &parms)
void SetLogger(XrdSysLogger *logp)
Definition: XrdSysTrace.cc:65
XrdScheduler * schedP
Definition: XrdSsiClient.cc:74
short maxPEL
Definition: XrdSsiClient.cc:79
XrdSsiScale sidScale
XrdCl::Env * clEnvP
Definition: XrdSsiClient.cc:75
XrdSysMutex clMutex
Definition: XrdSsiClient.cc:73
XrdSsiLogger::MCB_t * msgCB
Definition: XrdSsiLogger.cc:59
XrdSsiLogger::MCB_t * msgCBCl
Definition: XrdSsiLogger.cc:60
short maxCLW
Definition: XrdSsiClient.cc:78
static const int rDispRR
Definition: XrdSsiClient.cc:88
short maxTCB
Definition: XrdSsiClient.cc:77
XrdSysTrace Trace
Definition: XrdSsiSfs.cc:107
static const int rDispRand
Definition: XrdSsiClient.cc:87
XrdSysLogger * Logger
Definition: XrdSsiLogger.cc:57
static const int rDispNone
Definition: XrdSsiClient.cc:86
bool reqTOSet
Definition: XrdSsiClient.cc:82
char rDisp
Definition: XrdSsiClient.cc:90
bool strTOSet
Definition: XrdSsiClient.cc:83
XrdSysError Log
Atomic(int) contactN(1)
bool dsTTLSet
Definition: XrdSsiClient.cc:81
bool hiResTime
Definition: XrdSsiClient.cc:84
Parameters to be passed to configure.
XrdSysLogPI_t logpi
-> log plugin object or nil if none
int bufsz
size of message buffer, -1 default, or 0