34 #include <netinet/in.h>
35 #include <sys/types.h>
// Storage for the static XrdCmsNode::LastFree member, starting at zero.
// Elsewhere in this file it is compared against DiskFree when isRW is set,
// and on one path it is overwritten with DiskFree when DiskFree is larger.
81 int XrdCmsNode::LastFree = 0;
// Canned error texts plus their lengths. Each length is strlen()+1, so it
// counts the terminating NUL. Later in this file the text is copied into
// Sel.Resp.Data and the matching length is stored in Sel.Resp.DLen.

// Text and length for the "multi-source access not supported" response.
88 const char *msrcmsg =
"Cluster does not support multi-source access.";
89 int msrclen = strlen(msrcmsg)+1;
// Text and length for the "retry limit exceeded" response.
90 const char *mtrymsg =
"Cluster retry limit exceeded.";
91 int mtrylen = strlen(mtrymsg)+1;
99 int port,
int lvl,
int id)
102 static const SMask_t smask_1(1);
106 NodeMask = (
id < 0 ? 0 : smask_1 << id);
108 isOffline= (lnkp == 0);
110 myNID = strdup(nid ? nid :
"?");
111 if ((myCID = index(myNID,
' '))) myCID++;
118 setName(lnkp, theIF, (nid ? port : 0));
135 if (cidP) {cidP->RemNode(
this); cidP = 0;}
136 if (Ident) free(Ident);
137 if (myNID) free(myNID);
138 if (myName)free(myName);
148 const char *hname = lnkp->
Host();
153 {
if (!strcmp(myName,hname) && port == netIF.Port()
154 && netID.Same(lnkp->
NetAddr()))
return;
165 if (theIF && !netIF.InDomain(&netID)) theIF = 0;
166 netIF.SetIF(&netID, theIF, port);
167 hasNet = netIF.Mask();
169 netIF.SetPublicName(hname);
173 myName = strdup(hname);
174 myNlen = strlen(hname);
176 if (!port) strcpy(buff, lnkp->
ID);
177 else sprintf(buff,
"%s:%d", lnkp->
ID, port);
178 if (Ident) free(Ident);
179 Ident = strdup(buff);
189 static const int warnIntvl = 60;
190 int totWait = 0, tmoWarn = 60;
206 DEBUG(Ident <<
" refs=" <<refCnt);
215 if (totWait >= tmoWarn)
216 {
unsigned int theCnt = refCnt;
218 tmoWarn += warnIntvl;
228 if (doDel)
delete this;
229 else {
char eBuff[256];
230 snprintf(eBuff,
sizeof(eBuff),
231 " (%p) delete timeout; node object lost!",
this);
232 Say.
Emsg(
"Delete", Ident, eBuff);
240 void XrdCmsNode::DeleteWarn(
unsigned int lkVal)
247 snprintf(eBuff,
sizeof(eBuff),
"delete sync stall; refs = %u", lkVal);
248 Say.
Emsg(
"Delete", Ident, eBuff);
262 if (needLock) nodeMutex.Lock();
269 {Link->setEtext(reason);
276 if (needLock) nodeMutex.UnLock();
293 DiskUtil =
static_cast<int>(Arg.dskUtil);
297 DEBUGR(DiskFree <<
"MB free; " <<DiskUtil <<
"% util");
321 if (!mode && !getMode(Arg.
Mode, mode))
return "invalid mode";
330 return (rc ? fsFail(Arg.
Ident,
"chmod", Arg.
Path, rc) : 0);
346 Say.
Emsg(
"Node", Link->Name(),
"requested a disconnect");
369 static const SMask_t allNodes(~0);
411 static const SMask_t allNodes(~0);
462 uint32_t pcpu, pnet, pxeq, pmem, ppag, pdsk;
486 DEBUGR(
"cpu=" <<pcpu <<
" net=" <<pnet <<
" xeq=" <<pxeq
487 <<
" mem=" <<pmem <<
" pag=" <<ppag <<
" dsk=" <<pdsk
488 <<
"% " <<DiskFree <<
"MB load=" <<myLoad <<
" mass=" <<myMass);
496 if (isRW && DiskFree != LastFree)
509 long long nRefs =
static_cast<long long>(RefTotW + RefTotR)*100;
510 long long sRefs =
static_cast<long long>(Share) * Shrin * 100;
511 int myShr = (Share ? Share : 100);
512 if (tRefs) {nRefs /= tRefs; sRefs /= tRefs;}
513 else nRefs = sRefs = 0;
514 snprintf(buff,
sizeof(buff)-1,
515 "load=%d; cpu=%d net=%d inq=%d mem=%d pag=%d dsk=%d utl=%d "
516 "shr=[%d %lld %lld] ref=[%d %d]",
517 myLoad, pcpu, pnet, pxeq, pmem, ppag, Arg.
dskFree, pdsk,
518 myShr, nRefs, sRefs, RefTotR+RefR, RefTotW+RefW);
519 Say.
Emsg(
"Node", Name(), buff);
541 struct iovec ioV[2] = {{(
char *)&Arg.
Request,
sizeof(Arg.
Request)},
544 char eBuff[128], theopts[8], *toP = theopts;
548 bool lsuniq =
false, oksel =
false, lsall = (*Arg.
Path ==
'*');
563 {lsuniq =
true; *toP++=
'u';}
591 reqInfo.
lsLU =
static_cast<char>(lsopts);
605 bytes =
sizeof(Resp.Val); Why =
"delay ";
610 bytes =
strlcpy(Resp.outbuff,
"No servers have access to the file",
611 sizeof(Resp.outbuff)) +
sizeof(Resp.Val) + 1;
613 }
else {Why =
"?"; bytes = 0;}
623 sprintf(eBuff,
"No servers are reachable via %s network",
628 eTxt =
"No servers have the file";
630 bytes =
strlcpy(Resp.outbuff, eTxt,
631 sizeof(Resp.outbuff)) +
sizeof(Resp.Val) + 1;
638 {Resp.Val = htonl(rc);
641 bytes = do_LocFmt(Resp.outbuff, sP, Sel.
Vec.pf, Sel.
Vec.wf, lsall,lsuniq)
642 +
sizeof(Resp.Val) + 1;
650 ioV[1].iov_len = bytes;
651 Link->Send(ioV, 2, bytes+
sizeof(Arg.
Request));
671 if (!lsall && lsuniq)
677 {
if (haverw) pP->
Status |= Skip;
678 else {
if (xP) xP->
Status |= Skip;
680 haverw = (pP->
Mask & wfVec) != 0;
694 if (sP->
Status & Hung) *oP = tolower(*oP);
695 *(oP+1) = (sP->
Mask & wfVec ?
'w' :
'r');
697 if (sP->
next) *oP++ =
' ';
698 pP = sP; sP = sP->
next;
delete pP;
702 {
if (!(sP->
Status & Skip))
704 if (sP->
Mask & pfVec) *oP = tolower(*oP);
705 *(oP+1) = (sP->
Mask & wfVec ?
'w' :
'r');
707 if (sP->
next) *oP++ =
' ';
709 pP = sP; sP = sP->
next;
delete pP;
738 if (!mode && !getMode(Arg.
Mode, mode))
return "invalid mode";
747 return (rc ? fsFail(Arg.
Ident,
"mkdir", Arg.
Path, rc) : 0);
770 if (!mode && !getMode(Arg.
Mode, mode))
return "invalid mode";
779 return (rc ? fsFail(Arg.
Ident,
"mkpath", Arg.
Path, rc) : 0);
791 static const SMask_t allNodes(~0);
816 {
if (rc > 0) {Arg.waitVal = rc;
return "!mv";}
817 else if (Sel2.
Vec.hf)
819 return "target file exists";
835 return (rc ? fsFail(Arg.
Ident,
"mv", Arg.
Path, rc) : 0);
851 if (isBad & isDoomed)
return ".redirected";
852 Link->Send((
char *)&pongIt,
sizeof(pongIt));
921 static const SMask_t allNodes(~0);
944 return (rc ? fsFail(Arg.
Ident,
"rm", Arg.
Path, rc) : 0);
956 static const SMask_t allNodes(~0);
979 return (rc ? fsFail(Arg.
Ident,
"rmdir", Arg.
Path, rc) : 0);
987 char *Avoid,
bool &doRedir)
996 do {
if ((Comma = index(Avoid,
','))) *Comma =
'\0';
998 else if (!avoidAddr.
Set(Avoid,0))
1000 Avoid = Comma+1; avNum++;
1001 }
while(Comma && *Avoid);
1023 strncpy(Sel.
Resp.Data, msrcmsg,
sizeof(Sel.
Resp.Data));
1024 Sel.
Resp.DLen = msrclen;
1039 strncpy(Sel.
Resp.Data, mtrymsg,
sizeof(Sel.
Resp.Data));
1040 Sel.
Resp.DLen = mtrylen;
1064 struct iovec ioV[2];
1065 char theopts[16], *toP = theopts;
1135 bool doRedir =
false;
1137 if (Arg.
Avoid) rc = do_SelAvoid(Arg, Sel, Arg.
Avoid, doRedir);
1154 Sel.
Resp.Port = rtEC[rtRC];
1157 Sel.
Resp.DLen = sprintf(Sel.
Resp.Data,
"%s",
"Item not found.")+1;
1162 }
else if (!Sel.
Resp.DLen)
return 0;
1165 <<Sel.
Resp.Port <<
" for " <<Arg.
Path);
1170 bytes = Sel.
Resp.DLen+
sizeof(Sel.
Resp.Port);
1172 Sel.
Resp.Port = htonl(Sel.
Resp.Port);
1176 ioV[0].iov_base = (
char *)&Arg.
Request;
1177 ioV[0].iov_len =
sizeof(Arg.
Request);
1178 ioV[1].iov_base = (
char *)&Sel.
Resp;
1179 ioV[1].iov_len = bytes;
1183 Link->Send(ioV, 2, bytes+
sizeof(Arg.
Request));
1227 DEBUGR(
"coloc to " <<Arg.clPath <<
" delayed " <<rc <<
" seconds");
1240 DEBUGR(
"prep delayed " <<rc <<
" seconds");
1261 struct iovec xmsg[2];
1263 char buff[
sizeof(int)*2+2], *bp = buff;
1264 int blen, maxfr, tutil;
1273 DEBUGR(maxfr <<
"MB free; " <<tutil <<
"% util");
1279 mySpace.Hdr.datalen = htons(
static_cast<unsigned short>(blen));
1285 else {xmsg[0].iov_base = (
char *)&mySpace;
1286 xmsg[0].iov_len =
sizeof(mySpace);
1287 xmsg[1].iov_base = buff;
1288 xmsg[1].iov_len = blen;
1289 mySpace.Hdr.datalen = htons(
static_cast<unsigned short>(blen));
1290 Link->Send(xmsg, 2);
1304 struct iovec xmsg[2];
1323 pinfo.
rovec = NodeMask;
1334 {
TRACER(Files,Arg.
Path <<
" responding have!");
1335 xmsg[0].iov_base = (
char *)&Arg.
Request;
1336 xmsg[0].iov_len =
sizeof(Arg.
Request);
1337 xmsg[1].iov_base = Arg.
Buff;
1338 xmsg[1].iov_len = Arg.
Dlen;
1341 Link->Send(xmsg, 2);
1353 static const SMask_t allNodes(~0);
1361 <<
int(rP->
Mod) <<
" rc=" <<rc <<
" path=" <<rP->
Path);
1362 Sel.Path.Hash = rP->
Sid;
1406 static const SMask_t allNodes(~0);
1414 {
DEBUGR(
"Path find failed for state " <<Arg.
Path);
1420 Sel.
Vec.hf = Sel.
Vec.pf = Sel.
Vec.bf = 0;
1438 {
if (retc < 0)
return 0;
1462 if (!retc || Sel.
Vec.bf != 0)
1485 struct iovec ioV[3] = {{(
char *)&Arg.
Request,
sizeof(Arg.
Request)},
1486 {(
char *)&Zero,
sizeof(Zero)},
1487 {(
char *)&buff, 0}};
1497 {bytes = sprintf(buff,
"A %lld %lld %d",
1500 : theSpace.
wFree)) + 1;
1502 bytes = sprintf(buff,
"%d %d %d %d %d %d",
1506 }
else bytes =
strlcpy(buff,
"-1 -1 -1 -1 -1 -1",
sizeof(buff)) + 1;
1510 ioV[2].iov_len = bytes;
1511 bytes +=
sizeof(Zero);
1514 Link->Send(ioV, 3, bytes+
sizeof(Arg.
Request));
1526 static const unsigned short szLen =
sizeof(
kXR_unt32);
1528 static int statsz = 0;
1529 static int statln = 0;
1530 static char *statbuff = 0;
1531 static time_t statlast = 0;
1534 struct iovec ioV[3] = {{(
char *)&Arg.
Request,
sizeof(Arg.
Request)},
1535 {(
char *)&theSize,
sizeof(theSize)},
1543 if (!statsz || !statbuff)
1545 statbuff = (
char *)malloc(statsz);
1546 theSize = htonl(statsz);
1552 {ioV[1].iov_len =
sizeof(theSize);
1563 if (statlast+9 >= tNow)
1564 {statln =
Cluster.
Stats(statbuff, statsz); statlast = tNow;}
1568 ioV[2].iov_base = statbuff;
1569 ioV[2].iov_len = statln;
1570 Arg.
Request.
datalen = htons(
static_cast<unsigned short>(szLen+statln));
1592 const char *srvMsg, *stgMsg;
1598 int add2Activ, add2Stage, port;
1602 DEBUGR( (Reset ?
"reset " :
"")
1603 <<(Resume ?
"resume " : (Suspend ?
"suspend " :
""))
1604 <<(Stage ?
"stage " : (noStage ?
"nostage " :
"")));
1615 if ((Stage && isNoStage) || (noStage && !isNoStage))
1616 if (noStage) {add2Stage = -1; isNoStage = 1; stgMsg=
"staging suspended";}
1617 else {add2Stage = 1; isNoStage = 0; stgMsg=
"staging resumed";}
1618 else {add2Stage = 0; stgMsg = 0;}
1622 if ((Resume && (isBad & isSuspend)) || (Suspend && !(isBad & isSuspend)))
1623 if (Suspend) {add2Activ = -1;
1627 srvMsg=
"service suspended";
1630 else {add2Activ = 1;
1632 isBad &= ~isSuspend;
1634 srvMsg=
"service resumed";
1635 stgMsg = (isNoStage ?
"(no staging)" :
"(staging)");
1637 if (port && port != netIF.Port())
1638 {Lock(); netIF.Port(port); UnLock();
1639 DEBUGR(
"set data port to " <<port);
1642 else {add2Activ = 0; srvMsg = 0;}
1646 if (isOffline) {srvMsg =
"service offline"; stgMsg = 0;}
1647 else if (isBad & isDisabled) {srvMsg =
"service disabled"; stgMsg = 0;}
1648 else if (isBad & isBlisted ) {srvMsg =
"service blacklisted"; stgMsg = 0;}
1652 if (add2Activ || add2Stage)
1654 Say.
Emsg(
"Node", Name(), srvMsg, stgMsg);
1669 long long Size = -1;
1680 if (Size < 0 && !getSize(Arg.
Mode, Size))
return "invalid size";
1689 return (rc ? fsFail(Arg.
Ident,
"trunc", Arg.
Path, rc) : 0);
1715 return ".redirected";
1756 struct iovec xmsg[2];
1758 char respbuff[
sizeof(loadbuff)+2+
sizeof(
int)+2], *bp = respbuff;
1759 int blen, maxfr, pcpu, pnet, pxeq, pmem, ppag, pdsk;
1763 maxfr =
Meter.
Report(pcpu, pnet, pxeq, pmem, ppag, pdsk);
1774 myLoad.
Hdr.
datalen = htons(
static_cast<unsigned short>(blen));
1776 xmsg[0].iov_base = (
char *)&myLoad;
1777 xmsg[0].iov_len =
sizeof(myLoad);
1778 xmsg[1].iov_base = respbuff;
1779 xmsg[1].iov_len = blen;
1780 if (lp) lp->
Send(xmsg, 2);
1785 DEBUG(
"cpu=" <<pcpu <<
" net=" <<pnet <<
" xeq=" <<pxeq
1786 <<
" mem=" <<pmem <<
" pag=" <<ppag <<
" dsk=" <<pdsk <<
' ' <<maxfr);
1801 if (isRW && DiskFree > LastFree)
1802 {old_free = LastFree; LastFree = DiskFree;}
1811 Arg.dskUtil = DiskUtil;
1823 int XrdCmsNode::fsExec(
XrdOucProg *Prog,
char *Arg1,
char *Arg2)
1826 char Pfn1[PfnSZ], Pfn2[PfnSZ];
1844 return Prog->
Run(Arg1, Arg2);
1851 const char *XrdCmsNode::fsFail(
const char *Who,
const char *What,
1852 const char *
Path,
int rc)
1859 if (rc == fsL2PFail1) return "lfn2pfn path1 failed";
1860 if (rc == fsL2PFail2) return "lfn2pfn path2 failed";
1861 if (rc != ENOENT)
Say.Emsg("Node", rc, What,
Path);
1862 else {
struct {
const char *Ident;} Arg = {Who};
1863 DEBUGR(
"rc=" <<rc <<
' ' <<What <<
' ' <<
Path);
1872 int XrdCmsNode::getMode(
const char *theMode, mode_t &
Mode)
1878 if (!(
Mode = strtol(theMode, &eP, 8)) || *eP || (
Mode >> 9))
return 0;
1886 int XrdCmsNode::getSize(
const char *theSize,
long long &Size)
1892 if (!(Size = strtoll(theSize, &eP, 10)) || *eP)
return 0;
1900 void XrdCmsNode::setHash(
XrdCmsSelect &Sel,
int acount)
1909 {
if (!(slash = index(spos,
'/')))
return;
1910 acount--; spos = slash+1;
1923 if (Sel.
Path.
Val[i] ==
'/') i--;
1926 {
if (Sel.
Path.
Val[i] ==
'/' && !(++acount))
break;}
unsigned long long SMask_t
#define XrdCmsMAX_PATH_LEN
const char * XrdSysE2T(int errcode)
int Exists(XrdCmsRRData &Arg, XrdCmsPInfo &Who, int noLim=0)
void Bounce(SMask_t smask, int SNum)
int GetFile(XrdCmsSelect &Sel, SMask_t mask)
int DelFile(XrdCmsSelect &Sel, SMask_t mask)
int AddFile(XrdCmsSelect &Sel, SMask_t mask)
SMask_t getMask(const XrdNetAddr *addr)
void SLock(bool dolock, bool wrmode=true)
void Space(XrdCms::SpaceData &sData, SMask_t smask)
int Broadsend(SMask_t smask, XrdCms::CmsRRHdr &Hdr, void *Data, int Dlen)
int Select(XrdCmsSelect &Sel)
int Locate(XrdCmsSelect &Sel)
static const int EReplete
SMask_t Broadcast(SMask_t, const struct iovec *, int, int tot=0)
XrdCmsSelected * List(SMask_t mask, CmsLSOpts opts, bool &oksel)
static const int RetryErr
static const int Wait4CBk
int Stats(char *bfr, int bln)
XrdOucName2Name * lcl_N2N
static void Inform(const char *What, const char *Data, int Dlen)
void Record(int pcpu, int pnet, int pxeq, int pmem, int ppag, int pdsk)
int Report(int &pcpu, int &pnet, int &pxeq, int &pmem, int &ppag, int &pdsk)
int FreeSpace(int &tutil)
int calcLoad(uint32_t pcpu, uint32_t pio, uint32_t pload, uint32_t pmem, uint32_t ppag)
const char * do_PrepDel(XrdCmsRRData &Arg)
int do_StateFWD(XrdCmsRRData &Arg)
const char * do_Gone(XrdCmsRRData &Arg)
const char * do_Locate(XrdCmsRRData &Arg)
const char * do_Update(XrdCmsRRData &Arg)
const char * do_Try(XrdCmsRRData &Arg)
const char * do_State(XrdCmsRRData &Arg)
void Delete(XrdSysRWLock &gMutex)
static void do_StateDFS(XrdCmsBaseFR *rP, int rc)
const char * do_Space(XrdCmsRRData &Arg)
int do_SelAvoid(XrdCmsRRData &Arg, XrdCmsSelect &Sel, char *Avoid, bool &doRedir)
const char * do_Select(XrdCmsRRData &Arg)
const char * do_Mv(XrdCmsRRData &Arg)
const char * do_Trunc(XrdCmsRRData &Arg)
static void Report_Usage(XrdLink *lp)
const char * do_Usage(XrdCmsRRData &Arg)
const char * do_Chmod(XrdCmsRRData &Arg)
const char * do_Load(XrdCmsRRData &Arg)
static int do_SelPrep(XrdCmsPrepArgs &Arg)
const char * do_Rm(XrdCmsRRData &Arg)
const char * do_PrepAdd(XrdCmsRRData &Arg)
const char * do_Ping(XrdCmsRRData &Arg)
const char * do_Have(XrdCmsRRData &Arg)
const char * do_Stats(XrdCmsRRData &Arg)
const char * do_Disc(XrdCmsRRData &Arg)
const char * do_Avail(XrdCmsRRData &Arg)
static int do_LocFmt(char *buff, XrdCmsSelected *sP, SMask_t pf, SMask_t wf, bool lsall=false, bool lsuniq=false)
void Disc(const char *reason=0, int needLock=1)
const char * do_Mkpath(XrdCmsRRData &Arg)
XrdCmsNode(XrdLink *lnkp, const char *theIF=0, const char *sid=0, int port=0, int lvl=0, int id=-1)
const char * do_Pong(XrdCmsRRData &Arg)
void setName(XrdLink *lnkp, const char *theIF, int port)
const char * do_Mkdir(XrdCmsRRData &Arg)
const char * do_StatFS(XrdCmsRRData &Arg)
const char * do_Rmdir(XrdCmsRRData &Arg)
const char * do_Status(XrdCmsRRData &Arg)
int Find(const char *pname, XrdCmsPInfo &masks)
void Inform(const char *cmd, XrdCmsPrepArgs *pargs)
struct XrdCmsSelect::@93 Resp
struct XrdCmsSelect::@92 Vec
void Update(StateType StateT, int ActivVal, int StageVal=0)
void sendState(XrdLink *Link)
const char * Host() const
const XrdNetAddr * NetAddr() const
char * ID
Pointer to the client's link identity.
int Send(const char *buff, int blen)
const char * Set(const char *hSpec, int pNum=PortInSpec)
static const char * Name(ifType ifT)
ifType
The enum that is used to index into ifData to get the appropriate interface.
static void Privatize(ifType &x)
virtual int Mkdir(const char *path, mode_t mode, int mkpath=0, XrdOucEnv *envP=0)=0
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0
virtual int Remdir(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
virtual int Rename(const char *oPath, const char *nPath, XrdOucEnv *oEnvP=0, XrdOucEnv *nEnvP=0)=0
virtual int Truncate(const char *path, unsigned long long fsize, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
virtual int lfn2pfn(const char *lfn, char *buff, int blen)=0
int Run(XrdOucStream *Sp, const char *argV[], int argc=0, const char *envV[]=0) const
static int Pack(struct iovec **, const char *, unsigned short &buff)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Snooze(int seconds)
static const unsigned char kYR_Version