XRootD
XrdCmsRRQ.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d C m s R R Q . c c */
4 /* */
5 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstring>
32 #include <sys/types.h>
33 #include <netinet/in.h>
34 #include <cinttypes>
35 
36 #include "XrdCms/XrdCmsCluster.hh"
37 #include "XrdCms/XrdCmsNode.hh"
38 #include "XrdCms/XrdCmsRRQ.hh"
39 #include "XrdCms/XrdCmsRTable.hh"
40 #include "XrdCms/XrdCmsTrace.hh"
41 #include "XrdSys/XrdSysError.hh"
42 #include "XrdSys/XrdSysTimer.hh"
43 #include <cstdio>
44 
45 using namespace XrdCms;
46 
47 // Note: Debugging statements have been commented out. This is time critical
48 // code and debugging may only be enabled in standalone testing as the
49 // delays introduced by DEBUG() will usually cause timeout failures.
50 
51 /******************************************************************************/
52 /* G l o b a l O b j e c t s & S t a t i c M e m b e r s */
53 /******************************************************************************/
54 
56 
57 XrdSysMutex XrdCmsRRQSlot::myMutex;
58 XrdCmsRRQSlot *XrdCmsRRQSlot::freeSlot = 0;
59 short XrdCmsRRQSlot::initSlot = 0;
60 
61 /******************************************************************************/
62 /* E x t e r n a l F u n c t i o n s */
63 /******************************************************************************/
64 
65 void *XrdCmsRRQ_StartTimeOut(void *parg) {return RRQ.TimeOut();}
66 
67 void *XrdCmsRRQ_StartRespond(void *parg) {return RRQ.Respond();}
68 
69 /******************************************************************************/
70 /* X r d C m s R R Q C l a s s M e t h o d s */
71 /******************************************************************************/
72 /******************************************************************************/
73 /* A d d */
74 /******************************************************************************/
75 
76 short XrdCmsRRQ::Add(short Snum, XrdCmsRRQInfo *Info)
77 {
78 // EPNAME("RRQ Add");
79  XrdCmsRRQSlot *sp;
80 
81 // Obtain a slot and fill it in
82 //
83  if (!(sp = XrdCmsRRQSlot::Alloc(Info))) return 0;
84 // DEBUG("adding slot " <<sp->slotNum);
85 
86 // If a slot number given, check if it's the right slot and it is still queued.
87 // If so, piggy-back this request to existing one and make a fast exit
88 //
89  myMutex.Lock(); Stats.Add2Q++;
90  if (Snum && Slot[Snum].Info.Key == Info->Key && Slot[Snum].Expire)
91  {if (Info->isLU)
92  {sp->LkUp = Slot[Snum].LkUp;
93  Slot[Snum].LkUp = sp;
94  } else {
95  sp->Cont = Slot[Snum].Cont;
96  Slot[Snum].Cont = sp;
97  }
98  Stats.PBack++;
99  myMutex.UnLock();
100  return Snum;
101  }
102 
103 // Queue this slot to the pending response queue and tell the timeout scheduler
104 //
105  sp->Expire = myClock+1;
106  if (waitQ.Singleton()) isWaiting.Post();
107  waitQ.Prev()->Insert(&sp->Link);
108  myMutex.UnLock();
109  return sp->slotNum;
110 }
111 
112 /******************************************************************************/
113 /* D e l */
114 /******************************************************************************/
115 
116 void XrdCmsRRQ::Del(short Snum, const void *Key)
117 {
118  Ready(Snum, Key, 0, 0);
119 }
120 
121 /******************************************************************************/
122 /* I n i t */
123 /******************************************************************************/
124 
125 int XrdCmsRRQ::Init(int Tint, int Tdly)
126 {
127  int rc;
128  pthread_t tid;
129 
130 // Set values
131 //
132  if (Tint) Tslice = Tint;
133  if (Tdly) Tdelay = Tdly;
134  Stats.Reset();
135 
136 // Fill out the response structure
137 //
138  dataResp.Hdr.streamid = 0;
139  dataResp.Hdr.rrCode = kYR_data;
140  dataResp.Hdr.modifier = 0;
141  dataResp.Hdr.datalen = 0;
142  dataResp.Val = 0;
143 
144 // Fill out the data i/o vector
145 //
146  data_iov[0].iov_base = (char *)&dataResp;
147  data_iov[0].iov_len = sizeof(dataResp);
148  data_iov[1].iov_base = databuff;;
149 
150 // Fill out the response structure
151 //
152  redrResp.Hdr.streamid = 0;
153  redrResp.Hdr.rrCode = kYR_redirect;
154  redrResp.Hdr.modifier = 0;
155  redrResp.Hdr.datalen = 0;
156  redrResp.Val = 0;
157 
158 // Fill out the redirect i/o vector
159 //
160  redr_iov[0].iov_base = (char *)&redrResp;
161  redr_iov[0].iov_len = sizeof(redrResp);
162  redr_iov[1].iov_base = hostbuff;;
163 
164 // Fill out the wait info
165 //
166  waitResp.Hdr.streamid = 0;
167  waitResp.Hdr.rrCode = kYR_wait;
168  waitResp.Hdr.modifier = 0;
169  waitResp.Hdr.datalen = htons(static_cast<unsigned short>(sizeof(waitResp.Val)));
170  waitResp.Val = htonl(Tdelay);
171 
172 // Start the responder thread
173 //
174  if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartRespond, (void *)0,
175  0, "Request Responder")))
176  {Say.Emsg("Config", rc, "create request responder thread");
177  return 1;
178  }
179 
180 // Start the timeout thread
181 //
182  if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartTimeOut, (void *)0,
183  0, "Request Timeout")))
184  {Say.Emsg("Config", rc, "create request timeout thread");
185  return 1;
186  }
187 
188 // All done
189 //
190  return 0;
191 }
192 
193 /******************************************************************************/
194 /* R e a d y */
195 /******************************************************************************/
196 
197 int XrdCmsRRQ::Ready(int Snum, const void *Key, SMask_t mask1, SMask_t mask2)
198 {
199 // EPNAME("RRQ Ready");
200  XrdCmsRRQSlot *sp;
201 
202 // Check if it's the right slot and it is still queued.
203 //
204  myMutex.Lock();
205  sp = &Slot[Snum];
206  if (sp->Info.Key != Key || !sp->Expire)
207  {myMutex.UnLock();
208 // DEBUG("slot " <<Snum <<" no longer valid");
209  return 1;
210  }
211 
212 // Update the arguments. The first is the running node mask and the second is
213 // a fixed differentiation mask. Accumulate the 1st but replace the 2nd.
214 //
215  sp->Arg1 |= mask1; sp->Arg2 = mask2;
216  Stats.Resp++;
217 
218 // Check if we should still hold on to this slot because the number of actual
219 // responders is less than the number needed.
220 //
221  if (sp->Info.actR < sp->Info.minR)
222  {sp->Info.actR++; Stats.Multi++;
223  myMutex.UnLock();
224  return 0;
225  }
226 
227 // Move the element from the waiting queue to the ready queue
228 //
229  sp->Link.Remove();
230  if (readyQ.Singleton()) isReady.Post();
231  readyQ.Prev()->Insert(&sp->Link);
232  myMutex.UnLock();
233 // DEBUG("readied slot " <<Snum <<" mask " <<mask);
234  return 1;
235 }
236 
237 /******************************************************************************/
238 /* R e s p o n d */
239 /******************************************************************************/
240 
242 {
243 // EPNAME("RRQ Respond");
244  XrdCmsRRQSlot *sp;
245 
246 // In an endless loop, process all ready elements
247 //
248  do {isReady.Wait(); // DEBUG("responder awoken");
249  do {myMutex.Lock();
250  Stats.rdFast += rdFast; Stats.rdSlow += rdSlow;
251  Stats.luFast += luFast; Stats.luSlow += luSlow;
252  if (readyQ.Singleton()) {myMutex.UnLock(); break;}
253  sp = readyQ.Next()->Item(); sp->Link.Remove(); sp->Expire = 0;
254  myMutex.UnLock();
255 
256  // A locate request can be pggy-backed on a select request and vice-versa
257  // We separate the two queues here as each has a different response.
258  //
259  if (sp->Info.isLU)
260  {if (sp->Cont)
261  {sp->Cont->Arg1 = sp->Arg1;
262  sendRedResp(sp->Cont);
263  }
264  sendLocResp(sp);
265  } else {
266  if (sp->LkUp)
267  {sp->LkUp->Arg1 = sp->Arg1; sp->LkUp->Arg2 = sp->Arg2;
268  sendLocResp(sp->LkUp);
269  }
270  sendRedResp(sp);
271  }
272  sp->Recycle();
273  } while(1);
274  } while(1);
275 
276 // Keep the compiler happy
277 //
278  return (void *)0;
279 }
280 
281 /******************************************************************************/
282 /* s e n d L o c R e s p */
283 /******************************************************************************/
284 
285 void XrdCmsRRQ::sendLocResp(XrdCmsRRQSlot *lP)
286 {
287  static const int ovhd = sizeof(kXR_unt32);
288  XrdCmsSelected *sP;
289  XrdCmsNode *nP;
291  int bytes;
292  bool oksel;
293 
294 // Send a delay if we timed out
295 //
296  if (!(lP->Arg1))
297  {sendLwtResp(lP);
298  return;
299  }
300 
301 // Get the list of servers that have this file. If none found, then force the
302 // client to wait as this should never happen and the long path is called for.
303 // ASAP responses always respond in with IPv6 addresses or mapped IPv4 ones.
304 //
305  lsopts = static_cast<XrdCmsCluster::CmsLSOpts>(lP->Info.lsLU);
306  if (!(sP = Cluster.List(lP->Arg1, lsopts, oksel))
307  || (!(bytes = XrdCmsNode::do_LocFmt(databuff,sP,lP->Arg2,lP->Info.rwVec))))
308  {sendLwtResp(lP);
309  return;
310  }
311 
312 // Complete the I/O vector
313 //
314  bytes++;
315  data_iov[1].iov_len = bytes;
316  bytes += ovhd;
317  dataResp.Hdr.datalen = htons(static_cast<unsigned short>(bytes));
318  bytes += sizeof(dataResp.Hdr);
319 
320 // Send the reply to each waiting redirector
321 //
322  RTable.Lock();
323  do {if ((nP = RTable.Find(lP->Info.Rnum, lP->Info.Rinst)))
324  {dataResp.Hdr.streamid = lP->Info.ID;
325  nP->Send(data_iov, iov_cnt, bytes);
326  }
327  luFast++;
328  } while((lP = lP->LkUp));
329  RTable.UnLock();
330 }
331 
332 /******************************************************************************/
333 /* s e n d L w t R e s p */
334 /******************************************************************************/
335 
336 void XrdCmsRRQ::sendLwtResp(XrdCmsRRQSlot *rP)
337 {
338 // EPNAME("sendLwtResp");
339  XrdCmsNode *nP;
340 
341 // For each request, find the redirector and ask it to send a wait
342 //
343  RTable.Lock();
344 do{if ((nP = RTable.Find(rP->Info.Rnum, rP->Info.Rinst)))
345  {waitResp.Hdr.streamid = rP->Info.ID; luSlow++;
346  nP->Send((char *)&waitResp, sizeof(waitResp));
347 // DEBUG("Redirect delay " <<nP->Name() <<' ' <<Tdelay);
348  }
349 // else {DEBUG("redirector " <<Info->Rnum <<'.' <<Info->Rinst <<"not found");}
350  } while((rP = rP->LkUp));
351  RTable.UnLock();
352 }
353 
354 /******************************************************************************/
355 /* s e n d R e d R e s p */
356 /******************************************************************************/
357 
358 void XrdCmsRRQ::sendRedResp(XrdCmsRRQSlot *rP)
359 {
360 // EPNAME("sendRedResp");
361  static const int ovhd = sizeof(kXR_unt32);
362  XrdCmsNode *nP;
363  int doredir = 0, port = 0, hlen = 0;
364 
365 // Determine where the client should be redirected
366 //
367  if ((doredir = (rP->Arg1 && Cluster.Select(rP->Arg1, port, hostbuff, hlen,
368  rP->Info.isRW, rP->Info.actR,
369  rP->Info.ifOP))))
370  {redrResp.Val = htonl(port);
371  redrResp.Hdr.datalen = htons(static_cast<unsigned short>(hlen+ovhd));
372  redr_iov[1].iov_len = hlen;
373  hlen += ovhd + sizeof(redrResp.Hdr);
374  }
375 
376 // For each request, find the redirector and ask it to send the message
377 //
378  RTable.Lock();
379 do{if ((nP = RTable.Find(rP->Info.Rnum, rP->Info.Rinst)))
380  {if (doredir){redrResp.Hdr.streamid = rP->Info.ID; rdFast++;
381  nP->Send(redr_iov, iov_cnt, hlen);
382 // DEBUG("Fast redirect " <<nP->Name() <<" -> " <<hostbuff);
383  }
384  else {waitResp.Hdr.streamid = rP->Info.ID; rdSlow++;
385  nP->Send((char *)&waitResp, sizeof(waitResp));
386 // DEBUG("Redirect delay " <<nP->Name() <<' ' <<Tdelay);
387  }
388  }
389 // else {DEBUG("redirector " <<Info->Rnum <<'.' <<Info->Rinst <<"not found");}
390  } while((rP = rP->Cont));
391  RTable.UnLock();
392 }
393 
394 /******************************************************************************/
395 /* T i m e O u t */
396 /******************************************************************************/
397 
399 {
400 // EPNAME("RRQ TimeOut");
401  XrdCmsRRQSlot *sp;
402 
403 // We measure millisecond intervals to timeout waiting requests. We used to zero
404 // out arg1/2 to force expiration, but they would be zero anyway if no responses
405 // occurred. Now with qdn we need to leave them alone as we may have deferred
406 // a fast dispatch because we were waiting for more than one responder.
407 //
408  while(1)
409  {isWaiting.Wait();
410  myMutex.Lock();
411  while(1)
412  {myClock++;
413  myMutex.UnLock();
414  XrdSysTimer::Wait(Tslice);
415  myMutex.Lock();
416  while((sp=waitQ.Next()->Item()) && sp->Expire < myClock)
417  {sp->Link.Remove();
418  if (readyQ.Singleton()) isReady.Post();
419 // sp->Arg1 = 0; sp->Arg2 = 0;
420 // DEBUG("expired slot " <<sp->slotNum);
421  readyQ.Prev()->Insert(&sp->Link);
422  }
423  if (waitQ.Singleton()) break;
424  }
425  myMutex.UnLock();
426  }
427 
428 // Keep the compiler happy
429 //
430  return (void *)0;
431 }
432 
433 /******************************************************************************/
434 /* X r d C m s R R Q S l o t C l a s s M e t h o d s */
435 /******************************************************************************/
436 /******************************************************************************/
437 /* C o n s t r u c t o r */
438 /******************************************************************************/
439 
440 XrdCmsRRQSlot::XrdCmsRRQSlot() : Link(this)
441 {
442 
443  slotNum = initSlot++;
444  if (slotNum)
445  {Cont = freeSlot;
446  freeSlot = this;
447  } else Cont = 0;
448  Arg1 = Arg2 = 0;
449  Info.Key = 0;
450 }
451 
452 /******************************************************************************/
453 /* A l l o c */
454 /******************************************************************************/
455 
456 XrdCmsRRQSlot *XrdCmsRRQSlot::Alloc(XrdCmsRRQInfo *theInfo)
457 {
458  XrdCmsRRQSlot *sp;
459 
460  myMutex.Lock();
461  if ((sp = freeSlot))
462  {sp->Info = *theInfo;
463  freeSlot = sp->Cont;
464  sp->Cont = 0;
465  sp->LkUp = 0;
466  sp->Arg1 = 0;
467  sp->Arg2 = 0;
468  }
469  myMutex.UnLock();
470  return sp;
471 }
472 
473 /******************************************************************************/
474 /* R e c y c l e */
475 /******************************************************************************/
476 
477 void XrdCmsRRQSlot::Recycle()
478 {
479  XrdCmsRRQSlot *sp, *np;
480 
481  myMutex.Lock();
482  if (!Link.Singleton()) Link.Remove();
483 
484 // Remove items in the lookup chain first
485 //
486  np = LkUp;
487  while((sp = np))
488  {np = sp->LkUp;
489  sp->Cont = freeSlot;
490  freeSlot = sp;
491  sp->Info.Key = 0;
492  }
493 
494 // Now remove items in the select chain
495 //
496  np = Cont;
497  while((sp = np))
498  {np = sp->Cont;
499  sp->Cont = freeSlot;
500  freeSlot = sp;
501  sp->Info.Key = 0;
502  }
503 
504 // Now put this item in the free chain
505 //
506  Info.Key = 0;
507  Cont = freeSlot;
508  freeSlot = this;
509  myMutex.UnLock();
510 }
unsigned int kXR_unt32
Definition: XPtypes.hh:90
void * XrdCmsRRQ_StartRespond(void *parg)
Definition: XrdCmsRRQ.cc:67
void * XrdCmsRRQ_StartTimeOut(void *parg)
Definition: XrdCmsRRQ.cc:65
unsigned long long SMask_t
Definition: XrdCmsTypes.hh:33
@ Info
int Select(XrdCmsSelect &Sel)
XrdCmsSelected * List(SMask_t mask, CmsLSOpts opts, bool &oksel)
int Send(const char *buff, int blen=0)
Definition: XrdCmsNode.hh:184
static int do_LocFmt(char *buff, XrdCmsSelected *sP, SMask_t pf, SMask_t wf, bool lsall=false, bool lsuniq=false)
Definition: XrdCmsNode.cc:659
void * Key
Definition: XrdCmsRRQ.hh:49
SMask_t rwVec
Definition: XrdCmsRRQ.hh:59
kXR_unt32 ID
Definition: XrdCmsRRQ.hh:50
int Init(int Tint=0, int Tdly=0)
Definition: XrdCmsRRQ.cc:125
void * TimeOut()
Definition: XrdCmsRRQ.cc:398
void * Respond()
Definition: XrdCmsRRQ.cc:241
short Add(short Snum, XrdCmsRRQInfo *ip)
Definition: XrdCmsRRQ.cc:76
void Del(short Snum, const void *Key)
Definition: XrdCmsRRQ.cc:116
int Ready(int Snum, const void *Key, SMask_t mask1, SMask_t mask2)
Definition: XrdCmsRRQ.cc:197
XrdCmsNode * Find(short Num, int Inst)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227
XrdCmsRRQ RRQ
Definition: XrdCmsRRQ.cc:55
XrdCmsCluster Cluster
@ kYR_data
Definition: YProtocol.hh:141
@ kYR_redirect
Definition: YProtocol.hh:143
@ kYR_wait
Definition: YProtocol.hh:144
XrdSysError Say
XrdCmsRTable RTable
Definition: XrdCmsRTable.cc:40
XrdPosixStats Stats
Definition: XrdPosixFile.cc:64