XRootD
XrdCmsRRQ Class Reference

#include <XrdCmsRRQ.hh>

+ Collaboration diagram for XrdCmsRRQ:

Classes

struct  Info
 

Public Member Functions

 XrdCmsRRQ ()
 
 ~XrdCmsRRQ ()
 
short Add (short Snum, XrdCmsRRQInfo *ip)
 
void Del (short Snum, const void *Key)
 
int Init (int Tint=0, int Tdly=0)
 
int Ready (int Snum, const void *Key, SMask_t mask1, SMask_t mask2)
 
void * Respond ()
 
void Statistics (Info &Data)
 
void * TimeOut ()
 

Detailed Description

Definition at line 104 of file XrdCmsRRQ.hh.

Constructor & Destructor Documentation

◆ XrdCmsRRQ()

XrdCmsRRQ::XrdCmsRRQ ( )
inline

Definition at line 148 of file XrdCmsRRQ.hh.

148  : isWaiting(0), isReady(0),
149  luFast(0), luSlow(0), rdFast(0), rdSlow(0),
150  Tslice(178), Tdelay(5), myClock(0) {}

◆ ~XrdCmsRRQ()

XrdCmsRRQ::~XrdCmsRRQ ( )
inline

Definition at line 151 of file XrdCmsRRQ.hh.

151 {}

Member Function Documentation

◆ Add()

short XrdCmsRRQ::Add ( short  Snum,
XrdCmsRRQInfo *  ip 
)

Definition at line 76 of file XrdCmsRRQ.cc.

77 {
78 // EPNAME("RRQ Add");
79  XrdCmsRRQSlot *sp;
80 
81 // Obtain a slot and fill it in
82 //
83  if (!(sp = XrdCmsRRQSlot::Alloc(Info))) return 0;
84 // DEBUG("adding slot " <<sp->slotNum);
85 
86 // If a slot number given, check if it's the right slot and it is still queued.
87 // If so, piggy-back this request to existing one and make a fast exit
88 //
89  myMutex.Lock(); Stats.Add2Q++;
90  if (Snum && Slot[Snum].Info.Key == Info->Key && Slot[Snum].Expire)
91  {if (Info->isLU)
92  {sp->LkUp = Slot[Snum].LkUp;
93  Slot[Snum].LkUp = sp;
94  } else {
95  sp->Cont = Slot[Snum].Cont;
96  Slot[Snum].Cont = sp;
97  }
98  Stats.PBack++;
99  myMutex.UnLock();
100  return Snum;
101  }
102 
103 // Queue this slot to the pending response queue and tell the timeout scheduler
104 //
105  sp->Expire = myClock+1;
106  if (waitQ.Singleton()) isWaiting.Post();
107  waitQ.Prev()->Insert(&sp->Link);
108  myMutex.UnLock();
109  return sp->slotNum;
110 }
@ Info
XrdOucDLlist * Prev()
Definition: XrdOucDLlist.hh:98
void Insert(XrdOucDLlist *Node, T *Item=0)
Definition: XrdOucDLlist.hh:71
long long Add2Q
Definition: XrdCmsRRQ.hh:134
long long PBack
Definition: XrdCmsRRQ.hh:135

References XrdPosixGlobals::Stats, and XrdPosixStats::UnLock().

+ Here is the call graph for this function:

◆ Del()

void XrdCmsRRQ::Del ( short  Snum,
const void *  Key 
)

Definition at line 116 of file XrdCmsRRQ.cc.

117 {
118  Ready(Snum, Key, 0, 0);
119 }
int Ready(int Snum, const void *Key, SMask_t mask1, SMask_t mask2)
Definition: XrdCmsRRQ.cc:197

◆ Init()

int XrdCmsRRQ::Init ( int  Tint = 0,
int  Tdly = 0 
)

Definition at line 125 of file XrdCmsRRQ.cc.

126 {
127  int rc;
128  pthread_t tid;
129 
130 // Set values
131 //
132  if (Tint) Tslice = Tint;
133  if (Tdly) Tdelay = Tdly;
134  Stats.Reset();
135 
136 // Fill out the response structure
137 //
138  dataResp.Hdr.streamid = 0;
139  dataResp.Hdr.rrCode = kYR_data;
140  dataResp.Hdr.modifier = 0;
141  dataResp.Hdr.datalen = 0;
142  dataResp.Val = 0;
143 
144 // Fill out the data i/o vector
145 //
146  data_iov[0].iov_base = (char *)&dataResp;
147  data_iov[0].iov_len = sizeof(dataResp);
148  data_iov[1].iov_base = databuff;;
149 
150 // Fill out the response structure
151 //
152  redrResp.Hdr.streamid = 0;
153  redrResp.Hdr.rrCode = kYR_redirect;
154  redrResp.Hdr.modifier = 0;
155  redrResp.Hdr.datalen = 0;
156  redrResp.Val = 0;
157 
158 // Fill out the redirect i/o vector
159 //
160  redr_iov[0].iov_base = (char *)&redrResp;
161  redr_iov[0].iov_len = sizeof(redrResp);
162  redr_iov[1].iov_base = hostbuff;;
163 
164 // Fill out the wait info
165 //
166  waitResp.Hdr.streamid = 0;
167  waitResp.Hdr.rrCode = kYR_wait;
168  waitResp.Hdr.modifier = 0;
169  waitResp.Hdr.datalen = htons(static_cast<unsigned short>(sizeof(waitResp.Val)));
170  waitResp.Val = htonl(Tdelay);
171 
172 // Start the responder thread
173 //
174  if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartRespond, (void *)0,
175  0, "Request Responder")))
176  {Say.Emsg("Config", rc, "create request responder thread");
177  return 1;
178  }
179 
180 // Start the timeout thread
181 //
182  if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartTimeOut, (void *)0,
183  0, "Request Timeout")))
184  {Say.Emsg("Config", rc, "create request timeout thread");
185  return 1;
186  }
187 
188 // All done
189 //
190  return 0;
191 }
void * XrdCmsRRQ_StartRespond(void *parg)
Definition: XrdCmsRRQ.cc:67
void * XrdCmsRRQ_StartTimeOut(void *parg)
Definition: XrdCmsRRQ.cc:65
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
kXR_unt16 datalen
Definition: YProtocol.hh:86
kXR_char modifier
Definition: YProtocol.hh:85
@ kYR_data
Definition: YProtocol.hh:141
@ kYR_redirect
Definition: YProtocol.hh:143
@ kYR_wait
Definition: YProtocol.hh:144
XrdSysError Say
kXR_char rrCode
Definition: YProtocol.hh:84
kXR_unt32 streamid
Definition: YProtocol.hh:83

References XrdSysError::Emsg(), XrdCms::kYR_data, XrdCms::kYR_redirect, XrdCms::kYR_wait, XrdSysThread::Run(), XrdCms::Say, XrdPosixGlobals::Stats, XrdCmsRRQ_StartRespond(), and XrdCmsRRQ_StartTimeOut().

+ Here is the call graph for this function:

◆ Ready()

int XrdCmsRRQ::Ready ( int  Snum,
const void *  Key,
SMask_t  mask1,
SMask_t  mask2 
)

Definition at line 197 of file XrdCmsRRQ.cc.

198 {
199 // EPNAME("RRQ Ready");
200  XrdCmsRRQSlot *sp;
201 
202 // Check if it's the right slot and it is still queued.
203 //
204  myMutex.Lock();
205  sp = &Slot[Snum];
206  if (sp->Info.Key != Key || !sp->Expire)
207  {myMutex.UnLock();
208 // DEBUG("slot " <<Snum <<" no longer valid");
209  return 1;
210  }
211 
212 // Update the arguments. The first is the running node mask and the second is
213 // a fixed differentiation mask. Accumulate the 1st but replace the 2nd.
214 //
215  sp->Arg1 |= mask1; sp->Arg2 = mask2;
216  Stats.Resp++;
217 
218 // Check if we should still hold on to this slot because the number of actual
219 // responders is less than the number needed.
220 //
221  if (sp->Info.actR < sp->Info.minR)
222  {sp->Info.actR++; Stats.Multi++;
223  myMutex.UnLock();
224  return 0;
225  }
226 
227 // Move the element from the waiting queue to the ready queue
228 //
229  sp->Link.Remove();
230  if (readyQ.Singleton()) isReady.Post();
231  readyQ.Prev()->Insert(&sp->Link);
232  myMutex.UnLock();
233 // DEBUG("readied slot " <<Snum <<" mask " <<mask);
234  return 1;
235 }
void * Key
Definition: XrdCmsRRQ.hh:49
long long Resp
Definition: XrdCmsRRQ.hh:136
long long Multi
Definition: XrdCmsRRQ.hh:137

References XrdCmsRRQInfo::actR, XrdCmsRRQInfo::Key, XrdCmsRRQInfo::minR, XrdOucDLlist< T >::Remove(), XrdPosixGlobals::Stats, and XrdPosixStats::UnLock().

+ Here is the call graph for this function:

◆ Respond()

void * XrdCmsRRQ::Respond ( )

Definition at line 241 of file XrdCmsRRQ.cc.

242 {
243 // EPNAME("RRQ Respond");
244  XrdCmsRRQSlot *sp;
245 
246 // In an endless loop, process all ready elements
247 //
248  do {isReady.Wait(); // DEBUG("responder awoken");
249  do {myMutex.Lock();
250  Stats.rdFast += rdFast; Stats.rdSlow += rdSlow;
251  Stats.luFast += luFast; Stats.luSlow += luSlow;
252  if (readyQ.Singleton()) {myMutex.UnLock(); break;}
253  sp = readyQ.Next()->Item(); sp->Link.Remove(); sp->Expire = 0;
254  myMutex.UnLock();
255 
256  // A locate request can be piggy-backed on a select request and vice-versa
257  // We separate the two queues here as each has a different response.
258  //
259  if (sp->Info.isLU)
260  {if (sp->Cont)
261  {sp->Cont->Arg1 = sp->Arg1;
262  sendRedResp(sp->Cont);
263  }
264  sendLocResp(sp);
265  } else {
266  if (sp->LkUp)
267  {sp->LkUp->Arg1 = sp->Arg1; sp->LkUp->Arg2 = sp->Arg2;
268  sendLocResp(sp->LkUp);
269  }
270  sendRedResp(sp);
271  }
272  sp->Recycle();
273  } while(1);
274  } while(1);
275 
276 // Keep the compiler happy
277 //
278  return (void *)0;
279 }
XrdOucDLlist * Next()
Definition: XrdOucDLlist.hh:94
long long luSlow
Definition: XrdCmsRRQ.hh:139
long long rdSlow
Definition: XrdCmsRRQ.hh:141
long long luFast
Definition: XrdCmsRRQ.hh:138
long long rdFast
Definition: XrdCmsRRQ.hh:140

References XrdCmsRRQInfo::isLU, XrdOucDLlist< T >::Remove(), XrdPosixGlobals::Stats, and XrdPosixStats::UnLock().

Referenced by XrdCmsRRQ_StartRespond().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Statistics()

void XrdCmsRRQ::Statistics ( Info Data)
inline

Definition at line 144 of file XrdCmsRRQ.hh.

144 {myMutex.Lock(); Data = Stats; myMutex.UnLock();}

References XrdSysMutex::Lock(), and XrdSysMutex::UnLock().

Referenced by XrdCmsCluster::Statt().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ TimeOut()

void * XrdCmsRRQ::TimeOut ( )

Definition at line 398 of file XrdCmsRRQ.cc.

399 {
400 // EPNAME("RRQ TimeOut");
401  XrdCmsRRQSlot *sp;
402 
403 // We measure millisecond intervals to timeout waiting requests. We used to zero
404 // out arg1/2 to force expiration, but they would be zero anyway if no responses
405 // occurred. Now with qdn we need to leave them alone as we may have deferred
406 // a fast dispatch because we were waiting for more than one responder.
407 //
408  while(1)
409  {isWaiting.Wait();
410  myMutex.Lock();
411  while(1)
412  {myClock++;
413  myMutex.UnLock();
414  XrdSysTimer::Wait(Tslice);
415  myMutex.Lock();
416  while((sp=waitQ.Next()->Item()) && sp->Expire < myClock)
417  {sp->Link.Remove();
418  if (readyQ.Singleton()) isReady.Post();
419 // sp->Arg1 = 0; sp->Arg2 = 0;
420 // DEBUG("expired slot " <<sp->slotNum);
421  readyQ.Prev()->Insert(&sp->Link);
422  }
423  if (waitQ.Singleton()) break;
424  }
425  myMutex.UnLock();
426  }
427 
428 // Keep the compiler happy
429 //
430  return (void *)0;
431 }
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227

References XrdOucDLlist< T >::Remove(), and XrdSysTimer::Wait().

Referenced by XrdCmsRRQ_StartTimeOut().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: