39 #include <netinet/in.h>
40 #include <sys/param.h>
44 #include "XrdVersion.hh"
83 int XrdCmsProtocol::readWait = 1000;
152 static int thePort = -1;
153 char *cfn = pi->
ConfigFN, buff[128];
158 {
if (pi->
Port && pi->
Port != thePort)
159 {sprintf(buff,
"%d disallowed; only using port %d",pi->
Port,thePort);
160 Say.
Emsg(
"Config",
"Alternate port", buff);
173 {
while(*parms ==
' ') parms++;
176 while(*parms !=
' ' && *parms) parms++;
183 Say.
Say(
"Copr. 2003-2020 Stanford University/SLAC cmsd.");
187 if (cfn) cfn = strdup(cfn);
212 {Reply_Delay(Arg, theDelay);
return 0;}
218 Say.
Emsg(
"Protocol",
"invalid request code from", myNode->Ident);
219 else if ((etxt = (myNode->*Method)(Arg)))
221 {
DEBUGR(etxt+1 <<
" delayed " <<Arg.waitVal <<
" seconds");
223 }
else if (*etxt ==
'.')
return -ECONNABORTED;
241 if ((dlen = lp->
Peek((
char *)&Hdr,
sizeof(Hdr),readWait)) !=
sizeof(Hdr))
242 {
if (dlen <= 0) lp->
setEtext(
"login not received");
249 {
if (!strncmp((
char *)&Hdr,
"login ", 6))
250 lp->
setEtext(
"protocol version 1 unsupported");
265 void XrdCmsProtocol::Pander(
const char *manager,
int mport)
270 time_t ddmsg = time(0);
271 unsigned int Mode, Role = 0;
274 int Lvl=0, Netopts=0, waits=6, tries=6, fails=0, xport=mport;
275 int rc, fsUtil, KickedOut, blRedir, myNID = Manager->ManTree->Register();
278 const char *Reason = 0, *manp = manager;
279 const int manblen =
sizeof(manbuff);
284 DEBUG(myRole <<
" services to " <<manager <<
':' <<mport);
288 memset(&loginData, 0,
sizeof(loginData));
296 loginData.
HoldTime=
static_cast<int>(getpid());
330 {
Say.
Emsg(
"Pander",
"Suspend state still active."); waits=6;}
334 if (!(rc = Manager->ManTree->Trying(myNID, Lvl)) && Lvl)
335 {
DEBUG(
"restarting at root node " <<manager <<
':' <<mport);
336 manp = manager; xport = mport; Lvl = 0;
337 }
else if (rc < 0)
break;
339 DEBUG(
"trying to connect to lvl " <<Lvl <<
' ' <<manp <<
':' <<xport);
343 {
Say.
Emsg(
"Pander",
"Is hostname", manp,
"spelled correctly "
344 "or just not running?");
348 else {tries = 6; Netopts = 0;}
349 if ((Lvl = Manager->myMans->Next(xport,manbuff,manblen)))
351 else {
if (manp != manager) fails++;
356 Netopts = 0; tries = waits = 6;
360 if (!(Link->AddrInfo()->isRegistered())
362 {
char *oldName = strdup(Link->Host());
363 Say.
Emsg(
"Protocol", oldName,
"is missing an IPv6 ptr record; "
364 "attempting local registration as", manp);
365 if (!(Link->Register(manp)))
367 "registration failed; address mismatch.");
370 "is now locally registered as", manp);
377 if (!(myNode = Manager->Add(Link, Lvl+1, terminate)))
379 if (terminate)
break;
380 Say.
Emsg(
"Pander",
"Unable to obtain node object.");
389 if (fails >= 6 && manp == manager)
398 Data = loginData; Data.
Mode =
Mode | myShare | myTimeZ;
400 {
if (!Manager->ManTree->Connect(myNID, myNode)) KickedOut = 1;
402 const char *sname = cgiEnv.Get(
"site");
403 Say.
Emsg(
"Protocol",
"Logged into", sname, Link->Name());
405 Manager->Verify(Link, (
const char *)Data.
SID, sname);
406 Reason = Dispatch(isUp, TimeOut, 2);
414 if (Data.
SID) {free(Data.
SID); Data.
SID = 0;}
419 Manager->Remove(myNode, (rc ==
kYR_redirect ?
"redirected"
420 : (Reason ? Reason :
"lost connection")));
421 Manager->ManTree->Disc(myNID);
428 Sync(); Manager->Delete(myNode); myNode = 0; Reason = 0;
434 Manager->myMans->Add(Link->NetAddr(), (
char *)Data.
Paths,
436 else Manager->Rerun((
char *)Data.
Paths);
443 if (!KickedOut && (Lvl = Manager->myMans->Next(xport,manbuff,manblen)))
444 {manp = manbuff;
continue;}
446 if (manp != manager) fails++;
447 manp = manager; xport = mport;
452 Manager->Finished(manager, mport);
477 if ((Routing=Admit()))
479 if (RSlot) {myWay = isLateral; tOut = -1;}
482 if ((Reason = Dispatch(myWay, tOut, 2))) lp->
setEtext(Reason);
493 if (!myNode)
return -1;
503 myNode->UnLock();
delete myNode; myNode = 0;
515 else myNode->UnLock();
529 bool isLoggedIn = loggedIn != 0;
532 ProtLink = ProtStack;
539 if (reason)
Say.
Emsg(
"Protocol", lp->
ID,
"logged out;", reason);
540 else Say.
Emsg(
"Protocol", lp->
ID,
"logged out.");
542 if (reason)
Say.
Emsg(
"Protocol", lp->
ID,
"login failed;", reason);
570 char *
envP = 0, envBuff[256], myBuff[4096];
576 int addedp = 0, Status = 0, isPeer = 0, isProxy = 0;
577 int isMan, isServ, isSubm, wasSuspended = 0, Share = 100, tZone = 0;
582 {snprintf(envBuff,
sizeof(envBuff),
"site=%s",
Config.
mySite);
602 const char *altName = cgiEnv.Get(
"ovHN");
604 std::string oldName(Link->Host());
606 snprintf(buff,
sizeof(buff),
"%s -> %s", oldName.c_str(), altName);
607 Say.
Emsg(
"Protocol",
"Attempting to use stated mapping", buff);
608 if (!(Link->Register(altName))) {
609 Say.
Emsg(
"Protocol", buff,
"stated mapping failed; address mismatch.");
611 Say.
Emsg(
"Protocol", oldName.c_str(),
"is now locally registered as", altName);
618 {Link->setID(
"redirector", Data.
HoldTime);
619 return Admit_Redirector(wasSuspended);
625 return Login_Failed(
"configuration disallows subscribers");
654 else return Login_Failed(
"invalid login role");
665 Reason =
"configuration only allows proxies";
667 else if (isProxy) Reason =
"configuration disallows proxies";
669 Reason =
"configuration disallows peers";
670 if (Reason)
return Login_Failed(Reason);
680 Say.
Emsg(
"Protocol",Link->Name(),
"has not yet found a cluster slot!");
686 (
const char *)Data.
SID, (
const char *)Data.
ifList)))
688 myNode->RoleID =
static_cast<char>(roleID);
689 myNode->setVersion(Data.
Version);
696 if (Share > 0) myNode->setShare(Share);
702 tZone = myNode->setTZone(tZone);
708 <<
" MB Util=" <<Data.
fsUtil <<
" Share=" <<Share
709 <<
" TZone=" <<tZone);
710 myNode->DiskTotal = Data.
tSpace;
711 myNode->DiskMinF = Data.
mSpace;
712 myNode->DiskFree = Data.
fSpace;
713 myNode->DiskNums = Data.
fsNum;
714 myNode->DiskUtil = Data.
fsUtil;
722 ConfigCheck(Data.
Paths);
723 while((tp = thePaths.GetLine()))
724 {
DEBUG(Link->Name() <<
" adding path: " <<tp);
725 if (!(tp = thePaths.GetToken())
726 || !(pp = thePaths.GetToken()))
break;
727 if (!(newmask = AddPath(myNode, tp, pp)))
728 return Login_Failed(
"invalid exported path");
739 pinfo.
rovec = myNode->Mask();
740 if (myNode->isPeer) pinfo.
ssvec = myNode->Mask();
742 Say.
Emsg(
"Protocol", myNode->Ident,
"defaulted r /");
757 isNBSQ = Link->setNB();
761 const char *sname = cgiEnv.Get(
"site");
762 const char *lfmt = (myNode->isMan > 1 ?
"Standby%s%s" :
"Primary%s%s");
763 snprintf(envBuff,
sizeof(envBuff),lfmt,(sname ?
" ":
""),(sname ? sname :
""));
764 Say.
Emsg(
"Protocol", envBuff, myNode->Ident,
768 Say.
Emsg(
"Protocol", myNode->Ident,
"system ID:", (
const char *)Data.
SID);
780 XrdCmsRouting *XrdCmsProtocol::Admit_Redirector(
int wasSuspended)
782 EPNAME(
"Admit_Redirector");
788 myRole =
"redirector";
793 myNode =
new XrdCmsNode(Link); myNode->Lock();
798 Say.
Emsg(
"Protocol",myNode->Ident,
"login failed; too many redirectors.");
800 }
else myNode->setSlot(RSlot);
806 myNode->Send((
char *)&newState,
sizeof(newState));
810 Say.
Emsg(
"Protocol", myNode->Ident,
"logged in.");
811 DEBUG(myNode->Ident <<
" assigned slot " <<RSlot);
820 const char *pType,
const char *
Path)
858 if ((xp = ProtStack)) ProtStack = xp->ProtLink;
864 if (!xp)
Say.
Emsg(
"Protocol",
"No more protocol objects.");
865 else xp->Init(theRole, uMan, theMan, thePort);
876 void XrdCmsProtocol::ConfigCheck(
unsigned char *theConfig)
878 unsigned int ConfigID;
883 if (!theConfig) ConfigID = 1;
888 if (ConfigID != myNode->ConfigID)
889 {
if (myNode->ConfigID)
Say.
Emsg(
"Protocol",Link->Name(),
"reconfigured.");
892 myNode->ConfigID = ConfigID;
906 const char *XrdCmsProtocol::Dispatch(Bearing cDir,
int maxWait,
int maxTries)
909 static const int ReqSize =
sizeof(
CmsRRHdr);
912 const char *toRC = (cDir == isUp ?
"manager not active"
913 :
"server not responding");
914 const char *myArgs, *myArgt;
924 do{
if ((rc = Link->RecvAll((
char *)&Data->
Request, ReqSize, maxWait)) < 0)
926 "blacklisted" :
"request read failed");
927 if (!toLeft--)
return toRC;
930 return "server blacklisted w/ redirect";
931 if (!SendPing())
return "server unreachable";
941 return "server blacklisted w/ redirect";
942 if (!SendPing())
return "server unreachable";
953 <<
" dlen=" <<Data->
Dlen);
954 if (!(Data->
Dlen)) {myArgs = myArgt = 0;}
955 else {
if (Data->
Dlen > maxReqSize)
956 {
Say.
Emsg(
"Protocol",
"Request args too long from",Link->Name());
957 return "protocol error";
961 {
Say.
Emsg(
"Protocol",
"No buffers to serve", Link->Name());
962 return "insufficient buffers";
964 if ((rc = Link->RecvAll(Data->
Buff, Data->
Dlen, maxWait)) < 0)
965 return (rc == -ETIMEDOUT ?
"read timed out" :
"read failed");
973 Say.
Emsg(
"Protocol",Link->Name(),
"sent an invalid request -", buff);
983 || !ProtArgs.Parse(
int(Data->
Request.
rrCode),myArgs,myArgt,Data))
984 {Reply_Error(*Data,
kYR_EINVAL,
"badly formed request");
997 {
if ((rc = Execute(*Data)) && rc == -ECONNABORTED)
return "disconnected";}
1003 else Say.
Emsg(
"Protocol",
"No jobs to serve", Link->Name());
1008 return "logic error";
1022 if (myRole) Pander(myMan, myManPort);
1029 void XrdCmsProtocol::Init(
const char *iRole,
XrdCmsManager *uMan,
1030 const char *iMan,
int iPort)
1049 XrdCmsRouting *XrdCmsProtocol::Login_Failed(
const char *reason)
1051 Link->setEtext(reason);
1068 if (refWait && refCount <= 0) {refWait->Post(); refWait = 0;}
1084 struct iovec ioB[2] = {{(
char *)&Data.
Request,
sizeof(Data.
Request)},
1092 "msg TTL exceeded for", Data.
Path);
1105 "aborted; no servers handling", Data.
Path);
1112 {
if (!(amask = pinfo.
rwvec))
1114 "aborted; no r/w servers handling", Data.
Path);
1146 Link->Send((
char *)&Resp,
sizeof(Resp));
1147 }
else act =
" skip";
1149 DEBUG(myNode->Ident <<act <<
" delay " <<ntohl(theDelay));
1156 void XrdCmsProtocol::Reply_Error(
XrdCmsRRData &Data,
int ecode,
const char *etext)
1160 int n = strlen(etext)+1;
1164 htons((
unsigned short int)(
sizeof(
kXR_unt32)+n))},
1165 htonl(
static_cast<unsigned int>(ecode))};
1166 struct iovec ioV[2] = {{(
char *)&Resp,
sizeof(Resp)},
1167 {(
char *)etext, (
size_t)n}};
1170 }
else act =
" skip";
1172 DEBUG(myNode->Ident <<act <<
" err " <<ecode <<
' ' <<etext);
1179 bool XrdCmsProtocol::SendPing()
1191 if (Link->Send((
char *)&
Ping,
sizeof(
Ping)) < 0)
return false;
1199 void XrdCmsProtocol::Sync()
1207 if (refCount <= 0) refMutex.UnLock();
1208 else {refWait = &mySem;
1209 DEBUG(
"Waiting for " <<refCount <<
' ' <<myNode->Ident
1210 <<
" thread(s) to end.");
XrdProtocol * XrdgetProtocol(const char *pname, char *parms, XrdProtocol_Config *pi)
XrdVERSIONINFO(XrdgetProtocol, cmsd)
int XrdgetProtocolPort(const char *pname, char *parms, XrdProtocol_Config *pi)
unsigned long long SMask_t
void Bounce(SMask_t smask, int SNum)
void SLock(bool dolock, bool wrmode=true)
int Broadsend(SMask_t smask, XrdCms::CmsRRHdr &Hdr, void *Data, int Dlen)
void ResetRef(SMask_t smask, bool isLocked=false)
SMask_t Broadcast(SMask_t, const struct iovec *, int, int tot=0)
XrdCmsNode * Add(XrdLink *lp, int dport, int Status, int sport, const char *theNID, const char *theIF)
void Remove(XrdCmsNode *theNode)
int Stats(char *bfr, int bln)
int Statt(char *bfr, int bln)
int Configure1(int argc, char **argv, char *cfn)
int Configure0(XrdProtocol_Config *pi)
static XrdCmsJob * Alloc(XrdCmsProtocol *, XrdCmsRRData *)
static int Login(XrdLink *Link, XrdCms::CmsLoginData &Data, int timeout=-1)
unsigned int TotalSpace(unsigned int &minfree)
int FreeSpace(int &tutil)
static const char allowsRW
static const char allowsSS
static const char isSuspend
static const char isDoomed
static const char isBlisted
void Remove(SMask_t mask)
int Find(const char *pname, XrdCmsPInfo &masks)
SMask_t Insert(const char *pname, XrdCmsPInfo *pinfo)
void Recycle(XrdLink *lp, int consec, const char *reason)
int Execute(XrdCmsRRData &Data)
int Stats(char *buff, int blen, int do_sync=0)
static XrdCmsProtocol * Alloc(const char *theRole="", XrdCmsManager *mP=0, const char *theMan=0, int thePort=0)
XrdProtocol * Match(XrdLink *lp)
static XrdCmsRRData * Objectify(XrdCmsRRData *op=0)
short Add(XrdCmsNode *nP)
static const char * Name(RoleID rid)
NodeMethod_t getMethod(int Code)
const char * getName(int Code)
const char *(XrdCmsNode::* NodeMethod_t)(XrdCmsRRData &)
static const char FES_Suspend
static const char All_Suspend
XrdLink * Connect(const char *host, int port, int opts=0, int timeout=-1)
void Secure(XrdNetSecurity *secp)
void Serialize()
Wait for all outstanding requests to be completed on the link.
int setEtext(const char *text)
int Peek(char *buff, int blen, int timeout=-1)
char * ID
Pointer to the client's link identity.
static uint32_t CRC32(const unsigned char *data, int count)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
static void Snooze(int seconds)
SyncImpl< false > Sync(Ctx< File > file, uint16_t timeout=0)
Factory for creating SyncImpl objects.
static const unsigned char kYR_Version
static const int CMS_isSuper
static const int CMS_noStage
static const int CMS_isMan
static const int CMS_isPeer
static const int CMS_isProxy
static const int CMS_Suspend