40 #include <sys/times.h>
41 #include <sys/types.h>
44 #include <netinet/in.h>
47 #include "XrdVersion.hh"
129 while((mp = nmp)) {nmp = mp->
nextManager();
delete mp;}
131 while((tp = tpp)) {tpp = tp->
next;
delete tp;}
158 {
Say.
Emsg(
"Finder",
"Network not defined; unable to connect to cmsd.");
167 if (config.
Configure(cfn, What, How))
return 0;
183 else {SMode = config.
SMode;
191 if (
QTRACE(Redirect) || getenv(
"XRDMONRDR")) savePath = 1;
196 if (!isMeta && !isTarget && config.
haveMeta)
211 const char *arg1,
const char *arg2,
215 static struct timeval fwdClk = {time(0),0};
216 static const int xNum = 12;
221 int iovcnt, is2way, doAll = 0, opQ1Len = 0, opQ2Len = 0;
223 struct iovec xmsg[xNum];
227 if ((is2way = (*cmd ==
'+'))) cmd++;
236 else {
Say.
Emsg(
"Finder",
"Unable to forward '", cmd,
"'.");
237 Resp.
setErrInfo(EINVAL,
"Internal error processing file.");
244 Data.
Path = (
char *)arg1;
245 Data.
Mode = (
char *)arg2;
246 Data.
Path2 = (
char *)arg2;
253 (
char *)&Data, Work)))
254 {Resp.
setErrInfo(EINVAL,
"Internal error processing file.");
262 xmsg[0].iov_base = (
char *)&Data.
Request;
263 xmsg[0].iov_len =
sizeof(Data.
Request);
267 if (is2way)
return send2Man(Resp, (arg1 ? arg1 :
"/"), xmsg, iovcnt+1);
272 if (doAll && FwdWait)
273 {
struct timeval nowClk;
276 gettimeofday(&nowClk, 0);
277 fwdClk.tv_sec = nowClk.tv_sec - fwdClk.tv_sec;
278 fwdClk.tv_usec = nowClk.tv_usec - fwdClk.tv_usec;
279 if (fwdClk.tv_usec < 0) {fwdClk.tv_sec--; fwdClk.tv_usec += 1000000;}
280 Window = fwdClk.tv_sec*1000 + fwdClk.tv_usec/1000;
288 if (!(Manp = SelectManager(Resp, (arg1 ? arg1 :
"/"))))
return ConWait;
292 if (Manp->
Send(iMan, xmsg, iovcnt+1))
295 Inform(Manp, xmsg, iovcnt+1);
311 struct iovec xmsg[],
int xnum)
319 {
Say.
Emsg(
"Finder",
"SelectManager() called prior to Configure().");
325 Womp = Manp = myManagers;
326 do {
if (Manp != xman && Manp->
isActive()) Manp->
Send(iMan, xmsg, xnum);
337 static const int xNum = 12;
342 struct iovec xmsg[xNum];
343 char *triedRC, *affmode;
348 Data.
Path = (
char *)path;
350 Data.
Avoid = (Env ? Env->
Get(
"tried") : 0);
356 if (flags &
SFS_O_LOCAL)
return LocLocal(Resp, Env);
359 | (flags &
SFS_O_RESET ? CmsLocateRequest::kYR_refresh : 0);
361 Data.
Opts |= CmsLocateRequest::kYR_prvtnet;
364 ? CmsLocateRequest::kYR_retipv46 : 0);
367 ? CmsLocateRequest::kYR_retipv64 :
368 CmsLocateRequest::kYR_retipv6);
372 if (doAll) Data.
Opts |= CmsLocateRequest::kYR_listall;
377 { Data.
Opts = CmsSelectRequest::kYR_create;
379 Data.
Opts|= CmsSelectRequest::kYR_replica;
381 else if (flags &
SFS_O_STAT) Data.
Opts = CmsSelectRequest::kYR_stat;
385 ? CmsSelectRequest::kYR_write : CmsSelectRequest::kYR_read);
387 if (flags &
SFS_O_META) Data.
Opts |= CmsSelectRequest::kYR_metaop;
395 if (Env && (affmode = Env->
Get(
"cms.aff")))
396 {
if (*affmode ==
'n') Data.
Opts |= CmsSelectRequest::kYR_aNone;
397 else if (*affmode ==
'S') Data.
Opts |= CmsSelectRequest::kYR_aStrict;
398 else if (*affmode ==
's') Data.
Opts |= CmsSelectRequest::kYR_aStrong;
399 else if (*affmode ==
'w') Data.
Opts |= CmsSelectRequest::kYR_aWeak;
403 Data.
Opts |= CmsSelectRequest::kYR_prvtnet;
406 ? CmsSelectRequest::kYR_retipv46 : 0);
409 ? CmsSelectRequest::kYR_retipv64 :
410 CmsSelectRequest::kYR_retipv6);
413 if (Data.
Avoid && Env && (triedRC = Env->
Get(
"triedrc")))
414 {
char *comma = rindex(triedRC,
',');
415 if (comma) triedRC = comma+1;
416 if (!strcmp(triedRC,
"enoent"))
417 Data.
Opts |= CmsSelectRequest::kYR_tryMISS;
418 else if (!strcmp(triedRC,
"ioerr"))
419 Data.
Opts |= CmsSelectRequest::kYR_tryIOER;
420 else if (!strcmp(triedRC,
"fserr"))
421 Data.
Opts |= CmsSelectRequest::kYR_tryFSER;
422 else if (!strcmp(triedRC,
"srverr"))
423 Data.
Opts |= CmsSelectRequest::kYR_trySVER;
424 else if (!strcmp(triedRC,
"resel"))
425 Data.
Opts |= CmsSelectRequest::kYR_tryRSEL;
426 else if (!strcmp(triedRC,
"reseg"))
427 Data.
Opts |= CmsSelectRequest::kYR_tryRSEG;
434 (
char *)&Data, Work)))
435 {Resp.
setErrInfo(EINVAL,
"Internal error processing file.");
443 xmsg[0].iov_base = (
char *)&Data.
Request;
444 xmsg[0].iov_len =
sizeof(Data.
Request);
448 return send2Man(Resp, path, xmsg, iovcnt+1);
459 char *mBeg, *mBuff, mStat;
471 Womp = Manp = myManagers;
476 n = 8 + (myManCount * (256+6+2));
478 {mBeg = mBuff = (
char *)malloc(n);
480 {Resp.
setErrInfo(ENOMEM,
"Insufficient memory.");
498 n = snprintf(mBuff, mBlen,
"%s:%d/%c ",
500 mBuff += n; mBlen -= n;
501 }
while((Manp = Manp->
nextManager()) != Womp && mBlen > 0);
506 {Resp.
setErrInfo(EINVAL,
"Internal processing error.");
532 static const int xNum = 16;
540 int iovcnt = 0, NoteLen, n;
541 char Prty[1032], *NoteNum = 0, *colocp = 0;
543 struct iovec xmsg[xNum];
551 xmsg[0].iov_base = (
char *)&Data.
Request;
552 xmsg[0].iov_len =
sizeof(Data.
Request);
556 if (!(tp = pargs.
paths))
559 (
char *)&Data, Work)))
560 {Resp.
setErrInfo(EINVAL,
"Internal error processing file.");
563 if (!(Manp = SelectManager(Resp, 0)))
return ConWait;
564 if (Manp->
Send(iMan, (
const struct iovec *)&xmsg, iovcnt+1))
return 0;
565 DEBUG(
"Finder: Failed to send prepare cancel to "
566 <<Manp->
Name() <<
" reqid=" <<pargs.
reqid);
585 Data.
Notify = (
char *)
"*";
588 NoteLen = strlen(pargs.
notify);
589 Data.
Notify = (
char *)malloc(NoteLen+16);
591 NoteNum = Data.
Notify+NoteLen; *NoteNum++ =
'-';
612 {
if (NoteNum) sprintf(NoteNum,
"%d", tp->val);
617 (
char *)&Data, Work)))
break;
618 if (!(Manp = SelectManager(Resp, tp->
text)))
break;
619 DEBUG(
"Finder: Sending " <<Manp->
Name() <<
' ' <<Data.Reqid
621 if (!Manp->
Send(iMan, (
const struct iovec *)&xmsg, iovcnt+1))
break;
624 if (colocp) {Data.Request.modifier |= CmsPrepAddRequest::kYR_coloc;
625 *colocp =
' '; colocp = 0;
631 if (NoteNum) free(Data.
Notify);
636 if (!Manp)
return ConWait;
639 {
Say.
Emsg(
"Finder",
"Unable to send prepadd; too much data.");
640 Resp.
setErrInfo(EINVAL,
"Internal error processing file.");
645 DEBUG(
"Finder: Failed to send prepare to " <<(Manp ? Manp->
Name() :
"?")
646 <<
" reqid=" <<pargs.
reqid);
662 {
Say.
Emsg(
"Finder",
"SelectManager() called prior to Configure().");
690 static time_t nextMsg = 0;
700 Say.
Emsg(
"Finder",
"All managers are dysfunctional.");
703 TRACE(Redirect,
"user=" <<Resp.
getErrUser() <<
" No managers available; wait " <<ConWait);
710 int XrdCmsFinderRMT::send2Man(
XrdOucErrInfo &Resp,
const char *path,
711 struct iovec *xmsg,
int xnum)
721 if (!(Manp = SelectManager(Resp, path)) || Manp->Suspended())
728 {Resp.setErrInfo(RepDelay,
"");
729 TRACE(Redirect, Resp.getErrUser() <<
" no more msg objects; path=" <<path);
735 ((
CmsRRHdr *)(xmsg[0].iov_base))->streamid = mp->
ID();
736 if (savePath) Resp.setErrData(path);
737 else Resp.setErrData(0);
743 retc = Manp->
whatsUp(Resp.getErrUser(), path, iMan);
744 Resp.setErrInfo(retc,
"");
752 else if (retc ==
SFS_STALL) retc = Resp.getErrInfo();
774 int XrdCmsFinderRMT::StartManagers(
XrdOucTList *theManList)
784 myManList = theManList;
788 memset((
void *)myManTable, 0,
sizeof(myManTable));
796 if (myManagers) mp->
setNext(myManagers);
800 Say.
Emsg(
"Finder", errno,
"start manager");
808 {
Say.
Emsg(
"Config warning: too many managers;",tp->
text,
"ignored.");
814 if (firstone) firstone->
setNext(myManagers);
818 sprintf(buff,
"%d manager(s) started.", i);
826 Say.
Emsg(
"Finder", errno,
"start callback manager");
839 static const int xNum = 4;
844 struct iovec xmsg[xNum];
849 Data.
Path = (
char *)path;
854 (
char *)&Data, Work)))
855 {Resp.
setErrInfo(EINVAL,
"Internal error processing file.");
864 ? CmsStatfsRequest::kYR_qvfs : 0);
865 xmsg[0].iov_base = (
char *)&Data.
Request;
866 xmsg[0].iov_len =
sizeof(Data.
Request);
870 return send2Man(Resp, path, xmsg, iovcnt+1);
915 if (CMSp)
delete CMSp;
916 if (Login) free(Login);
918 while((tp = tpp)) {tpp = tp->
next;
delete tp;}
932 data[0] = (
char *)
"newfn "; dlen[0] = 6;
933 data[1] = (
char *)path; dlen[1] = strlen(path);
935 {data[2] = (
char *)
" p\n"; dlen[2] = 3;}
937 {data[2] = (
char *)
"\n"; dlen[2] = 1;}
938 data[3] = 0; dlen[3] = 0;
943 if (Active && CMSp->
Put((
const char **)data, (
const int *)dlen))
944 {CMSp->
Close(); Active = 0;}
954 void *StartPM(
void *carg)
959 void *StartRsp(
void *carg)
995 {
Say.
Emsg(
"Config", errno,
"start performance monitor.");
return 0;}
1021 {Resp.
setErrInfo(EINVAL,
"Invalid locate option for target config.");
1031 n = snprintf(mBuff, mBlen,
"localhost:0/%c", (Active ?
'a' :
'd'));
1044 char *data[2] = {buff, 0};
1046 uint32_t cpu_load, mem_load, net_load, pag_load, xeq_load;
1054 dlen[0] = snprintf(buff,
sizeof(buff),
"%s %u %u %u %u %u\n",
1055 (alert ?
"PERF" :
"perf"),
1056 xeq_load, cpu_load, mem_load, pag_load, net_load);
1062 if (Active && CMSp->
Put((
const char **)data, (
const int *)dlen))
1063 {CMSp->
Close(); Active = 0;}
1082 if (resMax < 0 || rNum <= 0) {rrMutex.
UnLock();
return resOld;}
1087 if (resCur > resMax) resCur = resMax;
1088 if (resOld < 1 && resCur > 0)
Resume(0);
1108 data[0] = (
char *)
"rmdid "; dlen[0] = 6;
1109 data[1] = (
char *)path; dlen[1] = strlen(path);
1110 data[2] = (
char *)
"\n"; dlen[2] = 1;
1111 data[3] = 0; dlen[3] = 0;
1116 if (Active && CMSp->
Put((
const char **)data, (
const int *)dlen))
1117 {CMSp->
Close(); Active = 0;}
1136 if (resMax < 0 || rNum <= 0) {rrMutex.
UnLock();
return resOld;}
1141 if (resOld > 0 && resCur < 1)
Suspend(0);
1161 resOld = (resMax < 0 ? 0 : resMax);
1165 if (rNum <= 0) {rrMutex.
UnLock();
return resOld;}
1170 if (resCur > resMax) resCur = resMax;
1184 static const char *rPerm[2] = {
"resume\n", 0};
1185 static const char *rTemp[2] = {
"resume t\n", 0};
1186 static int lPerm[2] = { 7, 0};
1187 static int lTemp[2] = { 9, 0};
1192 if (Active && CMSp->
Put((
const char **)(Perm ? rPerm : rTemp),
1193 (
const int *) (Perm ? lPerm : lTemp)))
1194 {CMSp->
Close(); Active = 0;}
1204 static const char *sPerm[2] = {
"suspend\n", 0};
1205 static const char *sTemp[2] = {
"suspend t\n", 0};
1206 static int lPerm[2] = { 8, 0};
1207 static int lTemp[2] = {10, 0};
1211 if (Active && CMSp->
Put((
const char **)(Perm ? sPerm : sTemp),
1212 (
const int *) (Perm ? lPerm : lTemp)))
1213 {CMSp->
Close(); Active = 0;}
1229 if (!(CMSPath =
Path))
1230 {
Say.
Emsg(
"Config",
"Unable to determine cms admin path");
return 0;}
1234 lFmt = (vnid ?
"login %c %d port %d vnid %s\n" :
"login %c %d port %d\n");
1235 snprintf(buff,
sizeof(buff), lFmt, (isProxy ?
'P' :
'p'),
1236 static_cast<int>(getpid()), myPort, vnid);
1237 Login = strdup(buff);
1242 {
Say.
Emsg(
"Config", errno,
"start cmsd interface");
return 0;}
1295 MSG_WAITALL) > 0 && Process(Data)) {}
1305 Say.
Emsg(
"Finder",
"Lost contact with cmsd via", CMSPath);
1324 if (util > 100) util = 100;
1352 void XrdCmsFinderTRG::Hookup()
1356 int opts = 0, tries = 6;
1360 while(
stat(CMSPath, &buf))
1362 {
Say.
Emsg(
"Finder",
"Waiting for cms path", CMSPath); tries=6;}
1369 while(Sock.Open(CMSPath, -1,
opts) < 0)
1373 }
else if (!tries)
opts = 0;
1381 CMSp->
Attach(Sock.Detach());
1386 Say.
Emsg(
"Finder",
"Connected to cmsd via", CMSPath);
1396 static const
int maxReqSize = 16384;
1397 static
int Wmsg = 255;
1398 const
char *myArgs, *myArgt, *Act;
1404 Data.Dlen = static_cast<
int>(ntohs(Data.Request.datalen));
1405 if (!(Data.Dlen)) {myArgs = myArgt = 0;}
1406 else {
if (Data.
Dlen > maxReqSize)
1407 {
Say.
Emsg(
"Finder",
"Request args too long from local cmsd");
1412 {
Say.
Emsg(
"Finder",
"No buffers to serve local cmsd");
1425 {
case kYR_mv: Act =
"mv";
break;
1426 case kYR_rm: Act =
"rm"; Data.
Path2 = (
char *)
"";
break;
1429 Say.
Emsg(
"Finder",
"Local cmsd sent an invalid request -",buff);
1436 {
Say.
Emsg(
"Finder",
"Local cmsd sent a badly formed",Act,
"request");
1439 DEBUG(
"cmsd requested " <<Act <<
" " <<Data.
Path <<
' ' <<Data.
Path2);
1446 if (!(Wmsg & 255))
Say.
Emsg(
"Finder",
"Local cmsd request",Act,
1447 "ignored; no storage system provided.");
1457 default: rc = 0;
break;
void * XrdCmsStartResp(void *carg)
void * XrdCmsStartManager(void *carg)
int stat(const char *path, struct stat *buf)
char * notify
Notification path or 0.
XrdOucTList * paths
List of paths.
XrdOucTList * oinfo
1-to-1 correspondence of opaque info
int Configure(const char *cfn, configWhat What, configHow How)
void setNext(XrdCmsClientMan *np)
XrdCmsClientMan * nextManager()
int Send(unsigned int &iMan, char *msg, int mlen=0)
int delayResp(XrdOucErrInfo &Resp)
static void setConfig(const char *cfn)
static void setNetwork(XrdInet *nP)
int whatsUp(const char *user, const char *path, unsigned int iMan)
int Wait4Reply(int wtime)
int Forward(XrdOucErrInfo &Resp, const char *cmd, const char *arg1=0, const char *arg2=0, XrdOucEnv *Env1=0, XrdOucEnv *Env2=0)
int Locate(XrdOucErrInfo &Resp, const char *path, int flags, XrdOucEnv *Info=0)
int Prepare(XrdOucErrInfo &Resp, XrdSfsPrep &pargs, XrdOucEnv *Info=0)
int Space(XrdOucErrInfo &Resp, const char *path, XrdOucEnv *Info=0)
int Configure(const char *cfn, char *Args, XrdOucEnv *EnvInfo)
XrdCmsFinderRMT(XrdSysLogger *lp, int whoami=0, int Port=0)
static bool VCheck(XrdVersionInfo &urVersion)
XrdCmsFinderTRG(XrdSysLogger *, int, int, XrdOss *theSS=0)
int Locate(XrdOucErrInfo &Resp, const char *path, int flags, XrdOucEnv *Info=0)
void PutInfo(XrdCmsPerfMon::PerfInfo &perfInfo, bool alert=false)
void Utilization(unsigned int util, bool alert=false)
void Removed(const char *path)
int RunAdmin(char *Path, const char *vnid)
void Added(const char *path, int Pend=0)
static bool VCheck(XrdVersionInfo &urVersion)
int Configure(const char *cfn, char *Args, XrdOucEnv *EnvInfo)
static int Pack(int rnum, struct iovec *iovP, struct iovec *iovE, char *Base, char *Work)
int Parse(XrdCms::CmsLoginData *Data, const char *Aps, const char *Apt)
virtual void GetInfo(PerfInfo &info)
static void setSecFunc(void *secfP)
virtual int Remdir(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
virtual int Rename(const char *oPath, const char *nPath, XrdOucEnv *oEnvP=0, XrdOucEnv *nEnvP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void Recycle()
Recycle the buffer. The buffer may be reused in the future.
void SetLen(int dataL, int dataO=0)
char * EnvTidy(int &envlen)
void * GetPtr(const char *varname)
char * Get(const char *varname)
void PutPtr(const char *varname, void *value)
int setErrInfo(int code, const char *emsg)
char * getMsgBuff(int &mblen)
const char * getErrUser()
static int Index(int KeyMax, const char *KeyVal, int KeyLen=0)
int Attach(int FileDescriptor, int bsz=2047)
int Put(const char *data, const int dlen)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
XrdSysLogger * logger(XrdSysLogger *lp=0)
static bool VerCmp(XrdVersionInfo &vInf1, XrdVersionInfo &vInf2, bool noMsg=false)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Snooze(int seconds)
static void Wait(int milliseconds)
void SetLogger(XrdSysLogger *logp)
XrdVERSIONINFODEF(myVersion, cmsclient, XrdVNUMBER, XrdVERSION)
@ IsTarget
The role is server and will be a redirection target.
@ IsProxy
The role is proxy {plus one or more of the below}.
@ IsRedir
The role is manager and will redirect users.
@ IsMeta
The role is meta {plus one or more of the above}.
XrdSysError Say(0, "cms_")
Structure used for reporting performance metrics.
unsigned char pag_load
Paging 0 to 100 utilization.
unsigned char xeq_load
Other 0 to 100 utilization (arbitrary)
unsigned char cpu_load
CPU 0 to 100 utilization.
unsigned char mem_load
Memory 0 to 100 utilization.
unsigned char net_load
Network 0 to 100 utilization.
static const int uIPv64
ucap: Supports only IPv4 info
static const int uIPv4
ucap: Supports read redirects