32 #include <sys/types.h>
33 #include <netinet/in.h>
59 short XrdCmsRRQSlot::initSlot = 0;
83 if (!(sp = XrdCmsRRQSlot::Alloc(
Info)))
return 0;
89 myMutex.Lock();
Stats.Add2Q++;
90 if (Snum && Slot[Snum].
Info.Key ==
Info->Key && Slot[Snum].Expire)
92 {sp->LkUp = Slot[Snum].LkUp;
95 sp->Cont = Slot[Snum].Cont;
105 sp->Expire = myClock+1;
106 if (waitQ.Singleton()) isWaiting.Post();
107 waitQ.Prev()->Insert(&sp->Link);
118 Ready(Snum, Key, 0, 0);
132 if (Tint) Tslice = Tint;
133 if (Tdly) Tdelay = Tdly;
138 dataResp.Hdr.streamid = 0;
140 dataResp.Hdr.modifier = 0;
141 dataResp.Hdr.datalen = 0;
146 data_iov[0].iov_base = (
char *)&dataResp;
147 data_iov[0].iov_len =
sizeof(dataResp);
148 data_iov[1].iov_base = databuff;;
152 redrResp.Hdr.streamid = 0;
154 redrResp.Hdr.modifier = 0;
155 redrResp.Hdr.datalen = 0;
160 redr_iov[0].iov_base = (
char *)&redrResp;
161 redr_iov[0].iov_len =
sizeof(redrResp);
162 redr_iov[1].iov_base = hostbuff;;
166 waitResp.Hdr.streamid = 0;
168 waitResp.Hdr.modifier = 0;
169 waitResp.Hdr.datalen = htons(
static_cast<unsigned short>(
sizeof(waitResp.Val)));
170 waitResp.Val = htonl(Tdelay);
175 0,
"Request Responder")))
176 {
Say.
Emsg(
"Config", rc,
"create request responder thread");
183 0,
"Request Timeout")))
184 {
Say.
Emsg(
"Config", rc,
"create request timeout thread");
206 if (sp->Info.
Key != Key || !sp->Expire)
215 sp->Arg1 |= mask1; sp->Arg2 = mask2;
230 if (readyQ.Singleton()) isReady.Post();
231 readyQ.Prev()->Insert(&sp->Link);
250 Stats.rdFast += rdFast;
Stats.rdSlow += rdSlow;
251 Stats.luFast += luFast;
Stats.luSlow += luSlow;
252 if (readyQ.Singleton()) {myMutex.
UnLock();
break;}
253 sp = readyQ.Next()->Item(); sp->Link.
Remove(); sp->Expire = 0;
261 {sp->Cont->Arg1 = sp->Arg1;
262 sendRedResp(sp->Cont);
267 {sp->LkUp->Arg1 = sp->Arg1; sp->LkUp->Arg2 = sp->Arg2;
268 sendLocResp(sp->LkUp);
287 static const int ovhd =
sizeof(
kXR_unt32);
315 data_iov[1].iov_len = bytes;
317 dataResp.Hdr.datalen = htons(
static_cast<unsigned short>(bytes));
318 bytes +=
sizeof(dataResp.Hdr);
324 {dataResp.Hdr.streamid = lP->Info.
ID;
325 nP->
Send(data_iov, iov_cnt, bytes);
328 }
while((lP = lP->LkUp));
345 {waitResp.Hdr.streamid = rP->Info.
ID; luSlow++;
346 nP->
Send((
char *)&waitResp,
sizeof(waitResp));
350 }
while((rP = rP->LkUp));
361 static const int ovhd =
sizeof(
kXR_unt32);
363 int doredir = 0, port = 0, hlen = 0;
367 if ((doredir = (rP->Arg1 &&
Cluster.
Select(rP->Arg1, port, hostbuff, hlen,
370 {redrResp.Val = htonl(port);
371 redrResp.Hdr.datalen = htons(
static_cast<unsigned short>(hlen+ovhd));
372 redr_iov[1].iov_len = hlen;
373 hlen += ovhd +
sizeof(redrResp.Hdr);
380 {
if (doredir){redrResp.Hdr.streamid = rP->Info.
ID; rdFast++;
381 nP->
Send(redr_iov, iov_cnt, hlen);
384 else {waitResp.Hdr.streamid = rP->Info.
ID; rdSlow++;
385 nP->
Send((
char *)&waitResp,
sizeof(waitResp));
390 }
while((rP = rP->Cont));
416 while((sp=waitQ.Next()->Item()) && sp->Expire < myClock)
418 if (readyQ.Singleton()) isReady.Post();
421 readyQ.Prev()->Insert(&sp->Link);
423 if (waitQ.Singleton())
break;
440 XrdCmsRRQSlot::XrdCmsRRQSlot() : Link(this)
443 slotNum = initSlot++;
462 {sp->Info = *theInfo;
477 void XrdCmsRRQSlot::Recycle()
void * XrdCmsRRQ_StartRespond(void *parg)
void * XrdCmsRRQ_StartTimeOut(void *parg)
unsigned long long SMask_t
int Select(XrdCmsSelect &Sel)
XrdCmsSelected * List(SMask_t mask, CmsLSOpts opts, bool &oksel)
int Send(const char *buff, int blen=0)
static int do_LocFmt(char *buff, XrdCmsSelected *sP, SMask_t pf, SMask_t wf, bool lsall=false, bool lsuniq=false)
int Init(int Tint=0, int Tdly=0)
short Add(short Snum, XrdCmsRRQInfo *ip)
void Del(short Snum, const void *Key)
int Ready(int Snum, const void *Key, SMask_t mask1, SMask_t mask2)
XrdCmsNode * Find(short Num, int Inst)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)