54 XrdInet *XrdCmsClientMan::Network = 0;
58 const char *XrdCmsClientMan::ConfigFN = 0;
67 int cw,
int nr,
int rw,
int rd)
71 static int Instance = 0;
75 if ((dot = index(Host,
'.')))
76 {*dot =
'\0'; HPfx = strdup(Host); *dot =
'.';}
77 else HPfx = strdup(Host);
91 lastUpdt= lastTOut = time(0);
98 if (dally < 3) dally = 3;
99 else if (dally > 10) dally = 10;
104 manMask = 1<<Instance++;
114 if (Link) Link->
Close();
115 if (Host) free(Host);
116 if (HPfx) free(HPfx);
117 if (NetBuff) NetBuff->
Recycle();
132 {
Say.
Emsg(
"Manager", Host,
"supplied invalid waitr msgid");
133 Resp.
setErrInfo(EILSEQ,
"redirector protocol error");
150 if (msgid < maxMsgID) RespQ.
Purge();
173 if (!mlen) mlen = strlen(msg);
181 {
if (!(allok = Link->
Send(msg, mlen) > 0))
207 {
if (!(allok = Link->
Send(
iov, iovcnt, iotot) > 0))
240 if (Response.
modifier & CmsResponse::kYR_async) relayResp();
248 if (Link) {Link->
Close(); Link = 0;}
249 Active = 0; Suspend = 1;
254 Say.
Emsg(
"ClientMan",
"Disconnected from", Host);
280 {
if (Active == RecvCnt)
281 {
if ((time(0)-lastTOut) >= repWait)
284 {Active = 0; Silent = 0; Suspend = 1;
285 if (Link && iMan == manInst)
287 manInst++; lClose =
true;
289 }
else if (Silent & 0x02 && repWait < repWMax) repWait++;
291 }
else {Active = RecvCnt; Silent = 0; lastTOut = time(0);}
298 theDelay = inQ * qTime;
301 theDelay = theDelay/1000 + (theDelay % 1000 ? 1 : 0);
302 if (theDelay < minDelay) theDelay = minDelay;
303 if (theDelay > maxDelay) theDelay = maxDelay;
307 TRACE(Redirect, user <<
" no resp from inst " <<iMan <<
" of "<<HPfx
309 <<
" inst " <<xMan <<(lClose ?
" closed" :
" steady")
310 <<
"; inQ " <<inQ <<
" delay " <<theDelay <<
" path=" <<path);
321 int XrdCmsClientMan::Hookup()
326 char buff[256], hnBuff[264*2+1];
328 int rc, oldWait, tries = 12,
opts = 0;
338 const char *hn = getenv(
"XRDHOST");
339 const char *override_hn = getenv(
"OVERRIDEXRDHOST");
340 if (hn && override_hn)
341 {snprintf(hnBuff,
sizeof(hnBuff),
"myHN=%s&ovHN=%s", hn, override_hn);
345 {snprintf(hnBuff,
sizeof(hnBuff),
"myHN=%s", hn);
352 do {
while(!(lp = Network->
Connect(Host, Port,
opts)))
355 else {
opts = 0; tries = 12;}
359 memset(&Data, 0,
sizeof(Data));
361 Data.
Mode = CmsLoginData::kYR_director;
362 Data.
HoldTime =
static_cast<int>(getpid());
371 doDebug |= (Data.
Mode & CmsLoginData::kYR_debug ? manMask : 0);
382 Suspend = (Data.
Mode & CmsLoginData::kYR_suspend);
387 if ((oldWait = (repWait*20/100)) < 2) oldWait = 2;
388 if (Data.
HoldTime > repWMax*1000) repWait = repWMax;
389 else if (Data.
HoldTime <= 0) repWait = repWMax;
391 repWait = (repWait/1000) + (repWait % 1000 ? 1 : 0);
392 if (repWait > repWMax) repWait = repWMax;
393 else if (repWait < oldWait) repWait = oldWait;
401 sprintf(buff,
"v %d", Data.
Version);
402 Say.
Emsg(
"ClientMan", (Suspend ?
"Connected to suspended" :
"Connected to"),
404 DEBUG(Host <<
" qt=" <<qTime <<
"ms rw=" <<repWait);
412 int XrdCmsClientMan::Receive()
420 if (Link->RecvAll((
char *)&Response, sizeof(Response)) > 0)
421 {
int dlen =
static_cast<int>(ntohs(Response.datalen));
423 DEBUG(Link->
Name() <<
' ' <<dlen <<
" bytes on " <<Response.streamid);
427 Say.
Emsg(
"ClientMan",
"Excessive msg length from", Host);
428 else {NetBuff->
SetLen(dlen);
439 void XrdCmsClientMan::relayResp()
446 if (!(rp = RespQ.
Rem(Response.streamid)))
447 {
DEBUG(Host <<
" replied to non-existent request; id=" <<Response.streamid);
453 rp->
Reply(HPfx, Response, NetBuff);
464 int XrdCmsClientMan::chkStatus()
475 if ((nowTime - lastUpdt) >= 30)
477 if (Active) Link->
Send((
char *)&Updt,
sizeof(Updt));
487 void XrdCmsClientMan::setStatus()
490 const char *State = 0, *Event =
"?";
494 if (Response.modifier & CmsStatusRequest::kYR_Suspend)
496 if (!Suspend) {Suspend = 1; State =
"suspended";}
498 else if (Response.modifier & CmsStatusRequest::kYR_Resume)
500 if (Suspend) {Suspend = 0; State =
"resumed";}
504 DEBUG(Host <<
" sent " <<Event <<
" event");
505 if (State)
Say.
Emsg(
"setStatus",
"Manager", Host, State);
int Send(unsigned int &iMan, char *msg, int mlen=0)
int delayResp(XrdOucErrInfo &Resp)
int whatsUp(const char *user, const char *path, unsigned int iMan)
XrdCmsClientMan(char *host, int port, int cw, int nr, int rw, int rd)
static int Reply(const char *Man, XrdCms::CmsRRHdr &hdr, XrdOucBuffer *buff)
static int Login(XrdLink *Link, XrdCms::CmsLoginData &Data, int timeout=-1)
XrdCmsResp * Rem(int msgid)
void Reply(const char *Man, XrdCms::CmsRRHdr &rrhdr, XrdOucBuffer *netbuff)
static XrdCmsResp * Alloc(XrdOucErrInfo *erp, int msgid)
XrdLink * Connect(const char *host, int port, int opts=0, int timeout=-1)
int Close(bool defer=false)
int RecvAll(char *buff, int blen, int timeout=-1)
int Send(const char *buff, int blen)
const char * Name() const
XrdOucBuffer * Alloc(int sz)
void Recycle()
Recycle the buffer. The buffer may be reused in the future.
void SetLen(int dataL, int dataO=0)
int setErrInfo(int code, const char *emsg)
const char * getErrUser()
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Snooze(int seconds)
static const size_t Max_Error_Len