37 #include <netinet/in.h>
38 #include <sys/types.h>
83 {nodeP->Delete(
Cluster.STMutex);
86 if (!
Cluster.Drop(nodeEnt, nodeInst,
this))
delete this;
91 nodeEnt(0), nodeInst(0)
95 nodeEnt(nid), nodeInst(inst)
111 memset((
void *)NodeTab, 0,
sizeof(NodeTab));
112 memset((
void *)AltMans, (
int)
' ',
sizeof(AltMans));
121 peerMask = ~peerHost;
129 const char *theNID,
const char *theIF)
133 const char *act =
"";
137 int tmp, Slot, Free = -1, Bump1 = -1, Bump2 = -1, Bump3 = -1, aSet = 0;
139 bool SpecAlt = (Special && !(Status &
CMS_isSuper));
149 for (Slot = 0; Slot <
STMax; Slot++)
151 {
if (NodeTab[Slot]->isNode(lp, theNID, port))
break;
152 if (NodeTab[Slot]->isConn)
153 {
if (!NodeTab[Slot]->isPerm && Special)
156 if ( NodeTab[Slot]->isPerm)
157 {
if (Bump3 < 0 && Special) Bump3 = Slot;}
160 }
else if (Free < 0) Free = Slot;
165 {
if (NodeTab[Slot] && NodeTab[Slot]->isBound)
166 {
Say.
Emsg(
"Cluster", lp->
ID,
"already logged in.");
184 {
if (!(nP = AddAlt(cidP, lp, port, Status, sport, theNID, theIF)))
186 aSet = 1; Slot = nP->NodeID;
187 if (nP != NodeTab[Slot]) {Hidden =
true; act =
"Alternate ";}
194 {
if (Free >= 0) Slot = Free;
195 else {
if (Bump1 >= 0) Slot = Bump1;
196 else Slot = (Bump2 >= 0 ? Bump2 : Bump3);
199 "failed; too many subscribers.");
201 DEBUG(lp->
ID <<
" redirected; too many subscribers.");
206 if (Status &
CMS_isMan) {setAltMan(Slot, lp, sport); aSet=1;}
208 sendAList(NodeTab[Slot]->Link);
210 DEBUG(lp->
ID <<
" bumps " << NodeTab[Slot]->Ident <<
" #" <<Slot);
211 NodeTab[Slot]->Lock();
212 Remove(
"redirected", NodeTab[Slot], -1);
215 NodeTab[Slot] = nP =
new XrdCmsNode(lp, theIF, theNID, port, 0, Slot);
217 if ((cidP->
AddNode(nP, SpecAlt))) nP->cidP = cidP;
218 else {
delete nP; NodeTab[Slot] = 0;
return 0;}
227 if (!aSet && (Status &
CMS_isSuper)) setAltMan(Slot, lp, sport);
228 if (Slot > STHi) STHi = Slot;
236 nP->subsPort = sport;
245 }
else nP->
isMan |= 0x02;
249 if (nP->
isPeer) peerHost |= nP->NodeMask;
250 else peerHost &= ~nP->NodeMask;
251 peerMask = ~peerHost;
256 {
DEBUG(act <<nP->
Ident <<
" to cluster " <<nP->myNID <<
" slot "
257 <<Slot <<
'.' <<nP->Instance <<
" (nodecnt=" <<NodeCnt
281 int port,
int Status,
int sport,
282 const char *theNID,
const char *theIF)
287 int slot = cidP->Slot();
292 {
Say.
Emsg(epname, lp->ID,
"already logged in.");
299 {nP =
new XrdCmsNode(lp, theIF, theNID, port, 0, slot);
300 if (!(cidP->
AddNode(nP,
true))) {
delete nP; nP = 0;}
306 {
Say.
Emsg(epname,
"Add alternate manager", lp->ID,
307 "failed; too many subscribers.");
313 if ((pP = NodeTab[slot]) && !(pP->
isBound))
314 {setAltMan(nP->NodeID, nP->Link, sport);
334 const char *etxt =
"blacklisted.";
344 for (i = 0; i <= STHi; i++)
345 {
if ((nP = NodeTab[i]))
354 etxt =
"blacklisted; redirect unsupported.";
355 else etxt =
"blacklisted with redirect.";
357 nP->
Send((
char *)&discRequest,
sizeof(discRequest));
362 Say.
Emsg(
"Manager", nP->
Name(),
"removed from blacklist.");
375 int iovcnt,
int iotot)
385 bmask = smask & peerMask;
391 for (i = 0; i <= STHi; i++)
392 {
if ((nP = NodeTab[i]) && nP->
isNode(bmask))
396 if (nP->
Send(iod, iovcnt, iotot) < 0)
397 {unQueried |= nP->
Mask();
412 char *Data,
int Dlen)
414 struct iovec ioV[3], *iovP = &ioV[1];
421 Blen =
XrdOucPup::Pack(&iovP, Data, Temp, (Dlen ? strlen(Data)+1 : Dlen));
422 Hdr.
datalen = htons(
static_cast<unsigned short>(Blen));
426 ioV[0].iov_base = (
char *)&Hdr; ioV[0].iov_len =
sizeof(Hdr);
427 return Broadcast(smask, ioV, 3, Blen+
sizeof(Hdr));
433 void *Data,
int Dlen)
435 struct iovec ioV[2] = {{(
char *)&Hdr,
sizeof(Hdr)},
436 {(
char *)Data, (
size_t)Dlen}};
440 Hdr.
datalen = htons(
static_cast<unsigned short>(Dlen));
441 return Broadcast(smask, ioV, 2, Dlen+
sizeof(Hdr));
451 void *Data,
int Dlen)
454 static int Start = 0;
456 struct iovec ioV[2] = {{(
char *)&Hdr,
sizeof(Hdr)},
457 {(
char *)Data, (
size_t)Dlen}};
458 int i, Beg, Fin, ioTot = Dlen+
sizeof(Hdr);
462 Hdr.
datalen = htons(
static_cast<unsigned short>(Dlen));
468 Beg = Start = (Start <= STHi ? Start+1 : 0);
475 do{
for (i = Beg; i <= Fin; i++)
476 {
if ((nP = NodeTab[i]) && nP->
isNode(Who))
480 if (nP->
Send(ioV, 2, ioTot) >= 0) {nP->
unRef();
return 1;}
487 Fin = Beg-1; Beg = 0;
512 for (i = 0; i <= STHi; i++)
513 if ((nP = NodeTab[i]) && nP->
isNode(addr))
514 {smask = nP->NodeMask;
break;}
541 bool retName = (
opts & LS_IDNT) != 0;
542 bool retAny = (
opts & LS_ANY ) != 0;
543 bool retDest = retName || (
opts & LS_IPO);
549 for (i = 0; i <= STHi; i++)
550 if ((nP=NodeTab[i]) && (nP->NodeMask & mask))
553 {
if (nP->netIF.
HasDest(ifType)) ifGet = ifType;
554 else if (!retAny)
continue;
556 if (!nP->netIF.
HasDest(ifGet))
continue;
560 if (retDest) destLen = nP->netIF.
GetPublicDest(sip->Ident, iSize);
562 else {strcpy(sip->Ident, nP->myName); destLen = nP->myNlen;}
563 if (!destLen) {
delete sip;
continue;}
565 sip->IdentLen = destLen;
566 sip->Mask = nP->NodeMask;
567 sip->Id = nP->NodeID;
568 sip->Port = nP->netIF.
Port();
569 sip->RefTotW = nP->RefTotW;
570 sip->RefTotR = nP->RefTotR;
571 sip->Shrin = nP->Shrin;
572 sip->Share = nP->Share;
606 else {
if (*(Sel.
Path.
Val+1) ==
'\0')
607 {Sel.
Vec.hf = ~0LL; Sel.
Vec.pf = Sel.
Vec.wf = 0;
616 {Sel.
Vec.hf = Sel.
Vec.pf = Sel.
Vec.wf = 0;
640 amask = pmask = pinfo.
rovec;
643 if (!(retc = SelDFS(Sel, amask, pmask, smask, 1)))
646 if (retc < 0)
return NotFound;
658 qfVec = pinfo.
rovec; Sel.
Vec.hf = 0;
659 }
else qfVec = Sel.
Vec.bf;
663 if ((!qfVec && retc >= 0) || (Sel.
Vec.hf && Sel.
InfoP)) retc = 0;
687 struct iovec ioV[] = {{(
char *)&
Usage,
sizeof(
Usage)}};
688 int ioVnum =
sizeof(ioV)/
sizeof(
struct iovec);
689 int ioVtot =
sizeof(
Usage);
697 Broadcast(allNodes, ioV, ioVnum, ioVtot);
709 int snooze_interval = 60, snooze_total = 0;
710 int rCnt = 0, wCnt = 0;
711 bool resetW, resetR, resetRW;
718 int totR = 0, totW = 0;
721 for (
int i = 0; i <= STHi; i++)
722 {
if ((nP = NodeTab[i]))
723 {totR += nP->RefTotR;
729 rCnt += (totR - SelRtot); SelRtot = totR;
730 wCnt += (totW - SelWtot); SelWtot = totW;
731 snooze_total += snooze_interval;
735 resetRW = (snooze_total >=
Config.
RefReset && (resetW || resetR));
738 if (resetR) rCnt = 0;
739 if (resetW) wCnt = 0;
778 : myMutex(mtx), myNode(node), hasLK(immed < 0),
781 myNID = node->
ID(myInst);
794 myNode->DropTime = 0;
799 if (!hasLK) myMutex->
UnLock();
801 } LockHandler(&STMutex, theNode, immed);
804 int Inst, NodeID = theNode->
ID(Inst);
811 if (LockHandler.myNID != NodeID || LockHandler.myInst != Inst)
812 {
Say.
Emsg(
"Manager", LockHandler.myIdent,
"removal aborted.");
813 DEBUG(LockHandler.myIdent <<
" node " <<NodeID <<
'.' <<Inst <<
" != "
814 << LockHandler.myNID <<
'.' <<LockHandler.myInst <<
" at entry.");
828 {theNode->
Disc(reason, 0);
835 if (!(NodeTab[NodeID] == theNode))
836 {
const char *why = (theNode->
isMan ?
"dropped as alternate."
837 :
"dropped and redirected.");
839 LockHandler.doDrop =
true;
859 if (theNode->
isMan && theNode->cidP && !(theNode->cidP->
IsSingle())
860 && (altNode = theNode->cidP->
RemNode(theNode)))
861 {
if (altNode->
isBound) NodeCnt++;
862 NodeTab[NodeID] = altNode;
867 setAltMan(altNode->NodeID, altNode->Link, altNode->subsPort);
869 LockHandler.doDrop =
true;
878 LockHandler.myNode = 0;
887 if (theNode->DropJob) theNode->DropJob->
nodeInst = Inst;
888 else theNode->DropJob =
new XrdCmsDrop(NodeID, Inst);
893 Say.
Emsg(
"Manager", theNode->
Ident,
"scheduled for removal;", reason);
894 else DEBUG(theNode->
Ident <<
" node " <<NodeID <<
'.' <<Inst);
904 bool doAll (nMask == 0);
908 if (!isLocked) STMutex.ReadLock();
913 for (
int i = 0; i <= STHi; i++)
914 {
if ((nP = NodeTab[i]) && (doAll || nP->
isNode(nMask)))
917 nP->Shrem = nP->Share;
923 if (!isLocked) STMutex.UnLock();
941 {isRW = 1; Amode =
"write";
947 else {isRW = 0; Amode =
"read"; fRD = 1;}
953 {Sel.
Resp.DLen = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data)-1,
954 "No servers %s %s access to the file",
970 {Sel.
Resp.DLen = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data)-1,
971 "Too many DFS %s attempts; operation terminated", Amode)+1;
979 if (!(retc = SelDFS(Sel, amask, pmask, smask, isRW)))
981 if (retc < 0)
return retc;
982 }
else if (noSel)
return 0;
983 return SelNode(Sel, pmask, smask);
999 {pmask = amask & ~(Sel.
Vec.hf | Sel.
Vec.bf); smask = 0;
1000 if (!pmask && !Sel.
Vec.bf)
return SelFail(Sel,eNoRep);
1002 else if (Sel.
Vec.bf) pmask = smask = 0;
1003 else if (Sel.
Vec.hf)
1007 && maxBits(Sel.
Vec.hf,2))
return SelFail(Sel,eDups);
1009 != (Sel.
Vec.hf & pinfo.
rovec))
return SelFail(Sel,eROfs);
1011 if (!(pmask = Sel.
Vec.hf & amask))
return SelFail(Sel,eNoSel);
1015 {pmask = amask; smask = 0;}
1016 else if ((smask = pinfo.
ssvec & amask)) pmask = 0;
1017 else pmask = smask = 0;
1019 pmask = Sel.
Vec.hf & amask;
1021 else smask = (retc < 0 ? 0 : pinfo.
ssvec & amask);
1027 Sel.
Vec.hf = Sel.
Vec.pf = pmask = smask = 0;
1033 dowt = (!pmask && !smask);
1047 if (dowt)
return retc;
1048 }
else if (dowt && retc < 0 && !noSel)
1060 if (noSel)
return 0;
1064 if (dowt)
return Unuseable(Sel);
1072 {Sel.
Resp.DLen = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data)-1,
1073 "Too many attempts to stage %s access to the file", Amode)+1;
1081 return SelNode(Sel, pmask, smask);
1087 int isrw,
int isMulti,
int ifWant)
1089 static const SMask_t smLow(255);
1098 if (!pmask)
return 0;
1118 {STMutex.ReadLock();
1121 : SelbyLoadR(pmask, selR));
1132 do {
if (!(tmask = pmask & smLow)) Snum += 8;
1133 else {
while((tmask = tmask>>1)) Snum++;
break;}
1134 }
while((pmask = pmask >> 8));
1139 if ((nP = NodeTab[Snum]))
1140 {
if (nP->
isBad) nP = 0;
1146 else {nP->RefTotW++; nP->RefW++;}
1147 else {nP->RefTotR++; nP->RefR++;}
1170 const char *etext, *Item =
"file";
1174 etext =
"Unable to create %s; it already exists.";
1177 case eROfs: etext =
"Unable to modify %s; r/o copy already exists.";
1180 case eDups: etext =
"Unable to modify %s; multiple copies exist.";
1183 case eNoRep: etext =
"Unable to replicate %s; no new sites available.";
1186 case eNoSel:
if (Sel.
Vec.hf & Sel.
nmask)
1187 {etext =
"Unable to access %s; eligible servers shunned.";
1191 {etext =
"Unable to write %s; r/w exports not found.";
1193 etext =
"Unable to access %s; it does not exist.";
1199 default: etext =
"Unable to access %s; it does not exist.";
1204 int n = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data), etext, Item);
1205 if (n < (
int)
sizeof(Sel.
Resp.Data)) Sel.
Resp.DLen = n+1;
1206 else Sel.
Resp.DLen =
sizeof(Sel.
Resp.Data);
1225 bmask = smask & peerMask;
1229 for (i = 0; i <= STHi; i++)
1231 {
if (doAll || !sData.
Total)
1257 static const char statfmt1[] =
"<stats id=\"cms\">"
1258 "<role>%s</role></stats>";
1263 if (!bfr)
return sizeof(statfmt1) + 8;
1269 if ((bln -= mlen) <= 0)
return 0;
1279 static const char statfmt0[] =
"</stats>";
1280 static const char statfmt1[] =
"<stats id=\"cmsm\">"
1281 "<role>%s</role><sel><t>%lld</t><r>%lld</r><w>%lld</w></sel>"
1283 static const char statfmt2[] =
"<stats id=\"%d\">"
1284 "<host>%s</host><role>%s</role>"
1285 "<run>%s</run><ref><r>%d</r><w>%d</w></ref>%s</stats>";
1286 static const char statfmt3[] =
"<shr>%d<use>%d</use></shr>";
1287 static const char statfmt4[] =
"</node>";
1288 static const char statfmt5[] =
1289 "<frq><add>%lld<d>%lld</d></add><rsp>%lld<m>%lld</m></rsp>"
1290 "<lf>%lld</lf><ls>%lld</ls><rf>%lld</rf><rs>%lld</rs></frq>";
1298 int mlen, tlen, n = 0;
1299 char shrBuff[80],
stat[6], *stp;
1307 while((xsp = sp)) {sp = sp->
next;
delete xsp;}
1314 {n =
sizeof(statfmt0) +
1315 sizeof(statfmt1) + 12*3 + 3 + 3 +
1316 (
sizeof(statfmt2) + 10*2 + 256 + 16) *
STMax +
sizeof(statfmt4);
1317 if (AddShr) n +=
sizeof(statfmt3) + 12;
1318 if (AddFrq) n +=
sizeof(statfmt4) + (10*8);
1329 while(sp) {n++; sp = sp->
next;}
1334 long long lclTcnt = SelTcnt, lclRtot = SelRtot, lclWtot = SelWtot;
1335 mlen = snprintf(bfr, bln, statfmt1,
1338 if ((bln -= mlen) <= 0)
return 0;
1339 tlen = mlen; bfr += mlen; n = 0; *shrBuff = 0;
1341 while(sp && bln > 0)
1350 if (AddShr) snprintf(shrBuff,
sizeof(shrBuff), statfmt3,
1352 mlen = snprintf(bfr, bln, statfmt2, n, sp->
Ident,
1355 bfr += mlen; bln -= mlen; tlen += mlen;
1359 if (bln <= (
int)
sizeof(statfmt4))
return 0;
1360 strcpy(bfr, statfmt4); mlen =
sizeof(statfmt4) - 1;
1361 bfr += mlen; bln -= mlen; tlen += mlen;
1363 if (AddFrq && bln > 0)
1364 {mlen = snprintf(bfr, bln, statfmt5, Frq.
Add2Q, Frq.
PBack, Frq.
Resp,
1366 bfr += mlen; bln -= mlen; tlen += mlen;
1371 if (sp || bln < (
int)
sizeof(statfmt0))
return 0;
1372 strcpy(bfr, statfmt0);
1373 return tlen +
sizeof(statfmt0) - 1;
1387 ?
"no eligible servers reachable for"
1388 :
"no eligible servers for");
1391 selR.
reason =
"no eligible servers have space for";
1394 selR.
reason =
"eligible servers overloaded for";
1397 selR.
reason =
"eligible servers suspended for";
1400 selR.
reason =
"eligible servers offline for";
1403 selR.
reason =
"server selection error for";
1418 int XrdCmsCluster::Drop(
int sent,
int sinst,
XrdCmsDrop *djp)
1426 if (djp) STMutex.WriteLock();
1430 if (!(nP = NodeTab[sent]) || nP->Inst() != sinst)
1431 {
if (nP && djp == nP->DropJob) {nP->DropJob = 0; nP->DropTime = 0;}
1432 if (djp) STMutex.UnLock();
1433 DEBUG(sent <<
'.' <<sinst <<
" cancelled.");
1439 if (djp && time(0) < nP->DropTime)
1441 if (djp) STMutex.UnLock();
1459 if (nP->
isPeer) {peerHost &= nP->NodeMask; peerMask = ~peerHost;}
1464 {memset((
void *)&AltMans[sent*AltSize], (
int)
' ', AltSize);
1465 if (sent == AltMent)
1467 while(AltMent >= 0 && NodeTab[AltMent]
1468 && !NodeTab[AltMent]->isMan) AltMent--;
1469 if (AltMent < 0) AltMend = AltMans;
1470 else AltMend = AltMans + ((AltMent+1)*AltSize);
1476 if (sent == STHi)
while(STHi >= 0 && !NodeTab[STHi]) STHi--;
1480 if (nP->NodeMask)
Cache.
Drop(nP->NodeMask, sent, STHi);
1486 if (djp) {STMutex.UnLock(); nP->
Delete(STMutex);}
1491 Say.
Emsg(
"Drop_Node", hname,
"dropped.");
1499 int XrdCmsCluster::Multiple(
SMask_t mVec)
1501 static const unsigned long long Left32 = 0xffffffff00000000LL;
1502 static const unsigned long long Right32 = 0x00000000ffffffffLL;
1503 static const unsigned long long Left16 = 0x00000000ffff0000LL;
1504 static const unsigned long long Right16 = 0x000000000000ffffLL;
1505 static const unsigned long long Left08 = 0x000000000000ff00LL;
1506 static const unsigned long long Right08 = 0x00000000000000ffLL;
1507 static const unsigned long long Left04 = 0x00000000000000f0LL;
1508 static const unsigned long long Right04 = 0x000000000000000fLL;
1510 static const int isMult[16] = {0,0,0,1,0,1,1,1,0,1,1,1,1,1,1,1};
1512 if (mVec & Left32) {
if (mVec & Right32)
return 1;
1513 else mVec = mVec >> 32LL;
1515 if (mVec & Left16) {
if (mVec & Right16)
return 1;
1516 else mVec = mVec >> 16LL;
1518 if (mVec & Left08) {
if (mVec & Right08)
return 1;
1519 else mVec = mVec >> 8LL;
1521 if (mVec & Left04) {
if (mVec & Right04)
return 1;
1522 else mVec = mVec >> 4LL;
1524 return isMult[mVec];
1531 bool XrdCmsCluster::maxBits(
SMask_t mVec,
int mbits)
1538 {mVec &= (mVec - 1);
1540 if (count >= mbits)
return true;
1552 void XrdCmsCluster::Record(
char *path,
const char *reason,
bool force)
1555 static
int msgcnt = 255;
1559 DEBUG(reason <<path);
1561 msgcnt++; skipmsg = msgcnt & (force ? 0x0f : 0xff);
1564 if (!skipmsg)
Say.Emsg(epname, "client deferred;", reason, path);
1575 int affsel = 1, count = 0, isalt = 0, pass = 2;
1583 selR.needNet =
XrdNetIF::Mask(nType);
1589 ? Sel.AltHash : Sel.Path.Hash);
1591 for (count = 0; sVec; count++) sVec &= (sVec - 1);
1592 if (count > 1) selR.
selPack = affsel = (theHash % count) + 1;
1608 mask = pmask & peerMask;
1612 ? SelbyRef(mask,selR)
1614 : SelbyLoadR(pmask, selR));
1625 {
TRACE(Redirect,
"affinity " <<affsel <<
'/' <<count <<
'/'
1627 <<nP->
Name() <<
' ' <<Sel.Path.Val);
1635 Sel.Resp.DLen = nP->netIF.
GetPublicName(Sel.Resp.Data, Sel.Resp.Port);
1636 if (!Sel.Resp.DLen) {nP->
UnLock();
return Unreachable(Sel,
false);}
1637 Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
1641 if (Sel.iovN && Sel.iovP) nP->
Send(Sel.iovP, Sel.iovN);
1657 {
if (isalt) act = (Sel.iovN ?
" staging " :
" assigned ");
1658 else act =
" serving ";
1660 TRACE(Stage, Sel.Resp.Data <<act <<Sel.Path.Val);
1669 Record(Sel.Path.Val,
"insufficient number of nodes",
true);
1677 Record(Sel.Path.Val, selR.
reason);
1686 {
const char *reason1 = selR.
reason;
1687 int delay1 = selR.
delay;
1688 bool noNet = selR.
xNoNet;
1689 if ((mask = (pmask | amask) & peerHost)) nP = SelbyCost(mask, selR);
1692 Sel.Resp.DLen = nP->netIF.
GetPublicName(Sel.Resp.Data,Sel.Resp.Port);
1693 if (!Sel.Resp.DLen) {nP->
UnLock();
return Unreachable(Sel,
false);}
1694 Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
1695 if (Sel.iovN && Sel.iovP) nP->
Send(Sel.iovP, Sel.iovN);
1697 TRACE(Stage,
"Peer " <<Sel.Resp.Data <<
" handling " <<Sel.Path.Val);
1711 {Record(Sel.Path.Val, selR.
reason);
1717 if (selR.
xNoNet)
return Unreachable(Sel,
true);
1718 return Unuseable(Sel);
1728 #define RefCount(sP, sPMulti, NeedSpace) \
1729 if (NeedSpace) {sP->RefTotW++; sP->RefW++;} \
1730 else {sP->RefTotR++; sP->RefR++;} \
1731 if (sPMulti && sP->Share && !sP->Shrem--) \
1732 {sP->RefW += sP->Shrip; sP->RefR += sP->Shrip; \
1733 sP->Shrem = sP->Share; sP->Shrin++; \
1752 selR.
Reset(); SelTcnt++;
1753 for (
int i = 0; i <= STHi; i++)
1754 if ((np = NodeTab[i]) && (np->NodeMask & mask))
1758 if (np->
isBad) {selR.
xSusp =
true;
continue;}
1761 else{
if (abs(sp->myCost - np->myCost) <=
Config.
P_fuzz)
1770 else if (sp->RefR > np->RefR) sp=np;
1772 else if (sp->myCost > np->myCost) sp=np;
1779 if (!sp)
return calcDelay(selR);
1797 selR.
Reset(); SelTcnt++;
1798 for (
int i = 0; i <= STHi; i++)
1799 if ((np = NodeTab[i]) && (np->NodeMask & mask))
1803 if (np->
isBad) {selR.
xSusp =
true;
continue;}
1807 {selR.
xFull =
true;
continue;}
1812 else if (sp->myMass > np->myMass) sp=np;
1819 else if (sp->RefR > np->RefR) sp=np;
1821 else if (sp->myLoad > np->myLoad) sp=np;
1829 if (!sp)
return calcDelay(selR);
1842 static std::random_device rand_dev;
1843 static std::default_random_engine generator(rand_dev());
1855 for (
int i = 0; i <= STHi; ++i) {
1858 if (!((np = NodeTab[i]) && (np->NodeMask & mask)))
1866 if (np->
isBad) { selR.
xSusp =
true;
continue; }
1878 NodeWeight[i] = totWeight;
1881 std::uniform_int_distribution<int> distr(1, totWeight);
1882 int selected = distr(generator);
1884 for (
int i = 0; i <= STHi; ++i) {
1885 if (NodeWeight[i] < selected)
1892 return sp ? sp : calcDelay(selR);
1908 selR.
Reset(); SelTcnt++;
1909 for (
int i = 0; i <= STHi; i++)
1910 if ((np = NodeTab[i]) && (np->NodeMask & mask))
1914 if (np->
isBad) {selR.
xSusp =
true;
continue;}
1917 {selR.
xFull =
true;
continue;}
1926 else if (sp->RefR > np->RefR) sp=np;
1932 if (!sp)
return calcDelay(selR);
1945 static const SMask_t allNodes(~0);
1965 Sel.
Vec.hf = amask; Sel.
Vec.wf = (isRW ? amask : 0);
1983 if (isRW && Sel.
Vec.hf)
1993 return SelFail(Sel, eNoEnt);
2003 void XrdCmsCluster::sendAList(
XrdLink *lp)
2006 static int HdrSize =
sizeof(Req.
Hdr) +
sizeof(Req.
sLen);
2007 static char *AltNext = AltMans;
2008 static struct iovec
iov[4] = {{(caddr_t)&Req, (
size_t)HdrSize},
2011 {(caddr_t)
"\0", 1}};
2016 AltNext = AltNext + AltSize;
2017 if (AltNext >= AltMend)
2020 iov[2].iov_len = dlen = AltMend - AltMans;
2022 iov[1].iov_base = (caddr_t)AltNext;
2023 iov[1].iov_len = AltMend - AltNext;
2024 iov[2].iov_len = AltNext - AltMans;
2025 dlen =
iov[1].iov_len +
iov[2].iov_len;
2031 Req.
Hdr.
datalen = htons(
static_cast<unsigned short>(dlen+
sizeof(Req.
sLen)));
2032 Req.
sLen = htons(
static_cast<unsigned short>(dlen));
2036 lp->
Send(
iov, 4, dlen+HdrSize);
2045 void XrdCmsCluster::setAltMan(
int snum,
XrdLink *lp,
int port)
2048 char *ap = &AltMans[snum*AltSize];
2054 memset(ap,
int(
' '), AltSize);
2068 if (ap >= AltMend) {AltMend = ap + AltSize; AltMent = snum;}
2082 {Sel.
Resp.DLen = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data)-1,
2083 "No servers are reachable via %s network to %s%s the file.",
2086 Sel.
Resp.DLen = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data)-1,
2087 "Eligible server is unreachable via %s network to %s%s the file.",
2104 int n = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data),
2105 "No servers are available to %s%s the %s.",
2106 Xmode, Amode, EType);
2107 if (n < (
int)
sizeof(Sel.
Resp.Data)) Sel.
Resp.DLen = n+1;
2108 else Sel.
Resp.DLen =
sizeof(Sel.
Resp.Data);
void Usage(const char *msg)
#define RefCount(sP, sPMulti, NeedSpace)
unsigned long long SMask_t
int stat(const char *path, struct stat *buf)
int Exists(XrdCmsRRData &Arg, XrdCmsPInfo &Who, int noLim=0)
static int Present(const char *hName, XrdOucTList *bList=0, char *rbuff=0, int rblen=0)
int GetFile(XrdCmsSelect &Sel, SMask_t mask)
int AddFile(XrdCmsSelect &Sel, SMask_t mask)
int UnkFile(XrdCmsSelect &Sel, SMask_t mask)
void Drop(SMask_t mask, int SNum, int xHi)
int WT4File(XrdCmsSelect &Sel, SMask_t mask)
static XrdCmsClustID * AddID(const char *cID)
static SMask_t Mask(const char *cID)
XrdCmsNode * RemNode(XrdCmsNode *nP)
static XrdCmsClustID * Find(const char *cID)
bool AddNode(XrdCmsNode *nP, bool isMan)
SMask_t getMask(const XrdNetAddr *addr)
void Space(XrdCms::SpaceData &sData, SMask_t smask)
int Broadsend(SMask_t smask, XrdCms::CmsRRHdr &Hdr, void *Data, int Dlen)
int Select(XrdCmsSelect &Sel)
int Locate(XrdCmsSelect &Sel)
void ResetRef(SMask_t smask, bool isLocked=false)
SMask_t Broadcast(SMask_t, const struct iovec *, int, int tot=0)
XrdCmsSelected * List(SMask_t mask, CmsLSOpts opts, bool &oksel)
XrdCmsNode * Add(XrdLink *lp, int dport, int Status, int sport, const char *theNID, const char *theIF)
void Remove(XrdCmsNode *theNode)
int Stats(char *bfr, int bln)
virtual void BlackList(XrdOucTList *blP)
int Statt(char *bfr, int bln)
static const int RepStat_shr
static const int RepStat_frq
XrdCmsDrop(int nid, int inst)
XrdCmsDrop(XrdCmsNode *nP)
static const char allowsRW
void Delete(XrdSysRWLock &gMutex)
void n2gLock(XrdSysRWLock &gMutex, bool rdlock=false)
int Send(const char *buff, int blen=0)
static const char allowsSS
static const char isDisabled
int isNode(SMask_t smask)
void g2nLock(XrdSysRWLock &gMutex)
static const char isSuspend
void Disc(const char *reason=0, int needLock=1)
void setName(XrdLink *lnkp, const char *theIF, int port)
static const char isDoomed
static const char isBlisted
int Find(const char *pname, XrdCmsPInfo &masks)
void Statistics(Info &Data)
static const char * Type(RoleID rid)
struct XrdCmsSelect::@93 Resp
struct XrdCmsSelect::@92 Vec
static const int IdentSize
void Update(StateType StateT, int ActivVal, int StageVal=0)
const XrdNetAddr * NetAddr() const
char * ID
Pointer to the client's link identity.
int Send(const char *buff, int blen)
static const int prefipv4
Use if mapped IPV4 actual format.
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
@ fmtName
Hostname if it is resolvable o/w use fmtAddr.
int GetPublicName(char *nbuff, int &nport)
bool HasDest(ifType ifT=PublicV6)
static const char * Name(ifType ifT)
int GetPublicDest(char *dest, size_t dlen)
ifType
The enum that is used to index into ifData to get appropriate interface.
static int Pack(struct iovec **, const char *, unsigned short &buff)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Snooze(int seconds)
static struct XrdCl::None none
ZipListImpl< false > List(Ctx< ZipArchive > zip)
Factory for creating ZipStatImpl objects.
static const unsigned char kYR_Version
static const int CMS_isSuper
static const int CMS_noStage
static const int CMS_isMan
static const int CMS_isPeer
static const int CMS_Suspend