XRootD
XrdCmsClientMan.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d C m s C l i e n t M a n . c c */
4 /* */
5 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <ctime>
32 
35 #include "XrdCms/XrdCmsLogin.hh"
36 #include "XrdCms/XrdCmsTrace.hh"
37 
39 
40 #include "XrdSys/XrdSysError.hh"
41 #include "XrdSys/XrdSysTimer.hh"
42 
43 #include "Xrd/XrdInet.hh"
44 #include "Xrd/XrdLink.hh"
45 
46 using namespace XrdCms;
47 
48 /******************************************************************************/
49 /* G l o b a l s */
50 /******************************************************************************/
51 
52 XrdOucBuffPool XrdCmsClientMan::BuffPool(XrdOucEI::Max_Error_Len, 65536, 1, 16);
53 
54 XrdInet *XrdCmsClientMan::Network = 0;
55 
57 
58 const char *XrdCmsClientMan::ConfigFN = 0;
59 
60 XrdSysMutex XrdCmsClientMan::manMutex;
61 
62 /******************************************************************************/
63 /* C o n s t r u c t o r */
64 /******************************************************************************/
65 
66 XrdCmsClientMan::XrdCmsClientMan(char *host, int port,
67  int cw, int nr, int rw, int rd)
68  : syncResp(0)
69 {
70  static XrdSysMutex initMutex;
71  static int Instance = 0;
72  char *dot;
73 
74  Host = strdup(host);
75  if ((dot = index(Host, '.')))
76  {*dot = '\0'; HPfx = strdup(Host); *dot = '.';}
77  else HPfx = strdup(Host);
78  Port = port;
79  Link = 0;
80  Active = 0;
81  Silent = 0;
82  Suspend = 1;
83  RecvCnt = 0;
84  nrMax = nr;
85  NetBuff = BuffPool.Alloc(XrdOucEI::Max_Error_Len);
86  repWMax = rw;
87  repWait = 0;
88  minDelay= rd;
89  maxDelay= rd*3;
90  chkCount= chkVal;
91  lastUpdt= lastTOut = time(0);
92  Next = 0;
93  manInst = 1;
94 
95 // Compute dally value
96 //
97  dally = cw / 2 - 1;
98  if (dally < 3) dally = 3;
99  else if (dally > 10) dally = 10;
100 
101 // Provide a unique mask number for this manager
102 //
103  initMutex.Lock();
104  manMask = 1<<Instance++;
105  initMutex.UnLock();
106 }
107 
108 /******************************************************************************/
109 /* D e s t r u c t o r */
110 /******************************************************************************/
111 
113 {
114  if (Link) Link->Close();
115  if (Host) free(Host);
116  if (HPfx) free(HPfx);
117  if (NetBuff) NetBuff->Recycle();
118 }
119 
120 /******************************************************************************/
121 /* d e l a y R e s p */
122 /******************************************************************************/
123 
125 {
126  XrdCmsResp *rp;
127  int msgid;
128 
129 // Obtain the message ID
130 //
131  if (!(msgid = Resp.getErrInfo()))
132  {Say.Emsg("Manager", Host, "supplied invalid waitr msgid");
133  Resp.setErrInfo(EILSEQ, "redirector protocol error");
134  syncResp.Post();
135  return SFS_ERROR;
136  }
137 
138 // Allocate a delayed response object
139 //
140  if (!(rp = XrdCmsResp::Alloc(&Resp, msgid)))
141  {Say.Emsg("Manager",ENOMEM,"allocate resp object for",Resp.getErrUser());
142  Resp.setErrInfo(0, "0");
143  syncResp.Post();
144  return SFS_STALL;
145  }
146 
147 // Add this object to our delayed response queue. If the manager bounced then
148 // purge all of the pending repsonses to avoid sending wrong ones.
149 //
150  if (msgid < maxMsgID) RespQ.Purge();
151  maxMsgID = msgid;
152  RespQ.Add(rp);
153 
154 // Tell client to wait for response. The semaphore post allows the manager
155 // to get the next message from the cmsd. This prevents us from getting the
156 // delayed response before the response object is added to the queue.
157 //
158  Resp.setErrInfo(0, "");
159  syncResp.Post();
160  return SFS_STARTED;
161 }
162 
163 /******************************************************************************/
164 /* S e n d */
165 /******************************************************************************/
166 
167 int XrdCmsClientMan::Send(unsigned int &iMan, char *msg, int mlen)
168 {
169  int allok = 0;
170 
171 // Determine message length
172 //
173  if (!mlen) mlen = strlen(msg);
174 
175 // Send the request
176 //
177  myData.Lock();
178  iMan = manInst;
179  if (Active)
180  {if (Link)
181  {if (!(allok = Link->Send(msg, mlen) > 0))
182  {Active = 0;
183  Link->Close(1);
184  manInst++;
185  } else SendCnt++;
186  }
187  }
188  myData.UnLock();
189 
190 // All done
191 //
192  return allok;
193 }
194 
195 /******************************************************************************/
196 
197 int XrdCmsClientMan::Send(unsigned int &iMan, const struct iovec *iov, int iovcnt, int iotot)
198 {
199  int allok = 0;
200 
201 // Send the request
202 //
203  myData.Lock();
204  iMan = manInst;
205  if (Active)
206  {if (Link)
207  {if (!(allok = Link->Send(iov, iovcnt, iotot) > 0))
208  {Active = 0;
209  Link->Close(1);
210  manInst++;
211  } else SendCnt++;
212  }
213  }
214  myData.UnLock();
215 
216 // All done
217 //
218  return allok;
219 }
220 
221 /******************************************************************************/
222 /* S t a r t */
223 /******************************************************************************/
224 
226 {
227 
228 // First step is to connect to the manager
229 //
230  do {Hookup();
231  // Now simply start receiving messages on the stream. When we get a
232  // respwait reply then we must be assured that the object representing
233  // the request is added to the queue before the actual reply arrives.
234  // We do this by waiting on syncResp which is posted once the request
235  // object is fully processed. The actual response associated with the
236  // respwait is synchronized during the callback phase since the client
237  // must receive the respwait before the subsequent response.
238  //
239  while(Receive())
240  if (Response.modifier & CmsResponse::kYR_async) relayResp();
241  else if (Response.rrCode == kYR_status) setStatus();
242  else if (XrdCmsClientMsg::Reply(HPfx, Response, NetBuff))
243  {if (Response.rrCode == kYR_waitresp) syncResp.Wait();}
244 
245  // Tear down the connection
246  //
247  myData.Lock();
248  if (Link) {Link->Close(); Link = 0;}
249  Active = 0; Suspend = 1;
250  myData.UnLock();
251 
252  // Indicate the problem
253  //
254  Say.Emsg("ClientMan", "Disconnected from", Host);
255  XrdSysTimer::Snooze(dally);
256  } while(1);
257 
258 // We should never get here
259 //
260  return (void *)0;
261 }
262 
263 /******************************************************************************/
264 /* w h a t s U p */
265 /******************************************************************************/
266 
267 int XrdCmsClientMan::whatsUp(const char *user, const char *path,
268  unsigned int iMan)
269 {
270  EPNAME("whatsUp");
271  unsigned int xMan;
272  int theDelay, inQ;
273  bool lClose = false;
274 
275 // The cmsd did not respond. Increase silent count and see if restart is needed
276 // Otherwise, increase the wait interval just in case things are just slow.
277 //
278  myData.Lock();
279  if (Active)
280  {if (Active == RecvCnt)
281  {if ((time(0)-lastTOut) >= repWait)
282  {Silent++;
283  if (Silent > nrMax)
284  {Active = 0; Silent = 0; Suspend = 1;
285  if (Link && iMan == manInst)
286  {Link->Close(1);
287  manInst++; lClose = true;
288  }
289  } else if (Silent & 0x02 && repWait < repWMax) repWait++;
290  }
291  } else {Active = RecvCnt; Silent = 0; lastTOut = time(0);}
292  }
293 
294 // Calclulate how long to delay the client. This will be based on the number
295 // of outstanding requests bounded by the config delay value.
296 //
297  inQ = XrdCmsClientMsg::inQ();
298  theDelay = inQ * qTime;
299  xMan = manInst;
300  myData.UnLock();
301  theDelay = theDelay/1000 + (theDelay % 1000 ? 1 : 0);
302  if (theDelay < minDelay) theDelay = minDelay;
303  if (theDelay > maxDelay) theDelay = maxDelay;
304 
305 // Do Some tracing here
306 //
307  TRACE(Redirect, user <<" no resp from inst " <<iMan <<" of "<<HPfx
308  <<" in " <<repWait
309  <<" inst " <<xMan <<(lClose ? " closed" : " steady")
310  <<"; inQ " <<inQ <<" delay " <<theDelay <<" path=" <<path);
311  return theDelay;
312 }
313 
314 /******************************************************************************/
315 /* P r i v a t e M e t h o d s */
316 /******************************************************************************/
317 /******************************************************************************/
318 /* H o o k u p */
319 /******************************************************************************/
320 
321 int XrdCmsClientMan::Hookup()
322 {
323  EPNAME("Hookup");
324  CmsLoginData Data;
325  XrdLink *lp;
326  char buff[256], hnBuff[264*2+1];
327  kXR_char *envData = 0;
328  int rc, oldWait, tries = 12, opts = 0;
329 
330 // Turn off our debugging and version flags
331 //
332  manMutex.Lock();
333  doDebug &= ~manMask;
334  manMutex.UnLock();
335 
336 // Report our hostname (there are better ways of doing this)
337 //
338  const char *hn = getenv("XRDHOST");
339  const char *override_hn = getenv("OVERRIDEXRDHOST");
340  if (hn && override_hn)
341  {snprintf(hnBuff, sizeof(hnBuff), "myHN=%s&ovHN=%s", hn, override_hn);
342  envData = (kXR_char *)hnBuff;
343  }
344  else if (hn)
345  {snprintf(hnBuff, sizeof(hnBuff), "myHN=%s", hn);
346  envData = (kXR_char *)hnBuff;
347  }
348 
349 // Keep trying to connect to the manager. Note that we bind the link to this
350 // thread to make sure we get notified should another thread close the socket.
351 //
352  do {while(!(lp = Network->Connect(Host, Port, opts)))
353  {XrdSysTimer::Snooze(dally);
354  if (tries--) opts = XRDNET_NOEMSG;
355  else {opts = 0; tries = 12;}
356  continue;
357  }
358 // lp->Bind(XrdSysThread::ID());
359  memset(&Data, 0, sizeof(Data));
360  Data.envCGI = envData;
361  Data.Mode = CmsLoginData::kYR_director;
362  Data.HoldTime = static_cast<int>(getpid());
363  if (!(rc = XrdCmsLogin::Login(lp, Data))) break;
364  lp->Close();
365  XrdSysTimer::Snooze(dally);
366  } while(1);
367 
368 // Establish global state
369 //
370  manMutex.Lock();
371  doDebug |= (Data.Mode & CmsLoginData::kYR_debug ? manMask : 0);
372  manMutex.UnLock();
373 
374 // All went well, finally
375 //
376  myData.Lock();
377  Link = lp;
378  Active = 1;
379  Silent = 0;
380  RecvCnt = 1;
381  SendCnt = 1;
382  Suspend = (Data.Mode & CmsLoginData::kYR_suspend);
383 
384 // Calculate how long we will wait for replies before delaying the client.
385 // This is computed dynamically based on the expected response window.
386 //
387  if ((oldWait = (repWait*20/100)) < 2) oldWait = 2;
388  if (Data.HoldTime > repWMax*1000) repWait = repWMax;
389  else if (Data.HoldTime <= 0) repWait = repWMax;
390  else {repWait = Data.HoldTime*3;
391  repWait = (repWait/1000) + (repWait % 1000 ? 1 : 0);
392  if (repWait > repWMax) repWait = repWMax;
393  else if (repWait < oldWait) repWait = oldWait;
394  }
395  qTime = (Data.HoldTime < 100 ? 100 : Data.HoldTime);
396  lastTOut = time(0);
397  myData.UnLock();
398 
399 // Tell the world
400 //
401  sprintf(buff, "v %d", Data.Version);
402  Say.Emsg("ClientMan", (Suspend ? "Connected to suspended" : "Connected to"),
403  Host, buff);
404  DEBUG(Host <<" qt=" <<qTime <<"ms rw=" <<repWait);
405  return 1;
406 }
407 
408 /******************************************************************************/
409 /* R e c e i v e */
410 /******************************************************************************/
411 
412 int XrdCmsClientMan::Receive()
413 {
414 // This method is always run out of the object's main thread. Other threads
415 // may call methods that initiate a link reset via a deferred close. We will
416 // notice that here because the file descriptor will be closed. This will
417 // cause us to return an error and precipitate a connection teardown.
418 //
419  EPNAME("Receive")
420  if (Link->RecvAll((char *)&Response, sizeof(Response)) > 0)
421  {int dlen = static_cast<int>(ntohs(Response.datalen));
422  RecvCnt++;
423  DEBUG(Link->Name() <<' ' <<dlen <<" bytes on " <<Response.streamid);
424  if (!dlen) return 1;
425  if ((dlen > NetBuff->BuffSize())
426  && (Response.rrCode != kYR_data || !NetBuff->Resize(dlen)))
427  Say.Emsg("ClientMan", "Excessive msg length from", Host);
428  else {NetBuff->SetLen(dlen);
429  return Link->RecvAll(NetBuff->Buffer(), dlen);
430  }
431  }
432  return 0;
433 }
434 
435 /******************************************************************************/
436 /* r e l a y R e s p */
437 /******************************************************************************/
438 
439 void XrdCmsClientMan::relayResp()
440 {
441  EPNAME("relayResp");
442  XrdCmsResp *rp;
443 
444 // Remove the response object from our queue.
445 //
446  if (!(rp = RespQ.Rem(Response.streamid)))
447  {DEBUG(Host <<" replied to non-existent request; id=" <<Response.streamid);
448  return;
449  }
450 
451 // Queue the request for reply (this transfers the network buffer)
452 //
453  rp->Reply(HPfx, Response, NetBuff);
454 
455 // Obtain a new network buffer
456 //
457  NetBuff = BuffPool.Alloc(XrdOucEI::Max_Error_Len);
458 }
459 
460 /******************************************************************************/
461 /* Private: c h k S t a t u s */
462 /******************************************************************************/
463 
464 int XrdCmsClientMan::chkStatus()
465 {
466  static CmsUpdateRequest Updt = {{0, kYR_update, 0, 0}};
467  XrdSysMutexHelper mdMon(myData);
468  time_t nowTime;
469 
470 // Count down the query count and ask again every 30 seconds
471 //
472  if (!chkCount--)
473  {chkCount = chkVal;
474  nowTime = time(0);
475  if ((nowTime - lastUpdt) >= 30)
476  {lastUpdt = nowTime;
477  if (Active) Link->Send((char *)&Updt, sizeof(Updt));
478  }
479  }
480  return Suspend;
481 }
482 
483 /******************************************************************************/
484 /* s e t S t a t u s */
485 /******************************************************************************/
486 
487 void XrdCmsClientMan::setStatus()
488 {
489  EPNAME("setStatus");
490  const char *State = 0, *Event = "?";
491 
492 
493  myData.Lock();
494  if (Response.modifier & CmsStatusRequest::kYR_Suspend)
495  {Event = "suspend";
496  if (!Suspend) {Suspend = 1; State = "suspended";}
497  }
498  else if (Response.modifier & CmsStatusRequest::kYR_Resume)
499  {Event = "resume";
500  if (Suspend) {Suspend = 0; State = "resumed";}
501  }
502  myData.UnLock();
503 
504  DEBUG(Host <<" sent " <<Event <<" event");
505  if (State) Say.Emsg("setStatus", "Manager", Host, State);
506 }
unsigned char kXR_char
Definition: XPtypes.hh:65
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
#define XRDNET_NOEMSG
Definition: XrdNetOpts.hh:71
struct myOpts opts
#define SFS_ERROR
#define SFS_STALL
#define SFS_STARTED
if(Avsz)
#define TRACE(act, x)
Definition: XrdTrace.hh:63
int Send(unsigned int &iMan, char *msg, int mlen=0)
int delayResp(XrdOucErrInfo &Resp)
int whatsUp(const char *user, const char *path, unsigned int iMan)
static char doDebug
XrdCmsClientMan(char *host, int port, int cw, int nr, int rw, int rd)
static int inQ()
static int Reply(const char *Man, XrdCms::CmsRRHdr &hdr, XrdOucBuffer *buff)
static int Login(XrdLink *Link, XrdCms::CmsLoginData &Data, int timeout=-1)
Definition: XrdCmsLogin.cc:125
XrdCmsResp * Rem(int msgid)
Definition: XrdCmsResp.cc:282
void Add(XrdCmsResp *rp)
Definition: XrdCmsResp.cc:250
void Purge()
Definition: XrdCmsResp.cc:267
void Reply(const char *Man, XrdCms::CmsRRHdr &rrhdr, XrdOucBuffer *netbuff)
Definition: XrdCmsResp.cc:138
static XrdCmsResp * Alloc(XrdOucErrInfo *erp, int msgid)
Definition: XrdCmsResp.cc:64
XrdLink * Connect(const char *host, int port, int opts=0, int timeout=-1)
Definition: XrdInet.cc:185
XrdOucBuffer * Alloc(int sz)
bool Resize(int newsz)
int BuffSize() const
char * Buffer() const
void Recycle()
Recycle the buffer. The buffer may be reused in the future.
void SetLen(int dataL, int dataO=0)
int setErrInfo(int code, const char *emsg)
const char * getErrUser()
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static void Snooze(int seconds)
Definition: XrdSysTimer.cc:168
kXR_char modifier
Definition: YProtocol.hh:85
@ kYR_data
Definition: YProtocol.hh:141
@ kYR_waitresp
Definition: YProtocol.hh:145
XrdSysError Say
kXR_char rrCode
Definition: YProtocol.hh:84
@ kYR_update
Definition: YProtocol.hh:115
@ kYR_status
Definition: YProtocol.hh:112
static const size_t Max_Error_Len