XRootD
XrdBwmHandle.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d B w m H a n d l e . c c */
4 /* */
5 /* (c) 2008 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdio>
32 #include <cstring>
33 
34 #include "XrdBwm/XrdBwmHandle.hh"
35 #include "XrdBwm/XrdBwmLogger.hh"
36 #include "XrdBwm/XrdBwmTrace.hh"
38 #include "XrdSys/XrdSysError.hh"
39 #include "XrdSys/XrdSysPlatform.hh"
40 
41 #include "XProtocol/XProtocol.hh"
42 
43 /******************************************************************************/
44 /* S t a t i c O b j e c t s */
45 /******************************************************************************/
46 
47 XrdBwmLogger *XrdBwmHandle::Logger = 0;
48 XrdBwmPolicy *XrdBwmHandle::Policy = 0;
49 XrdBwmHandle *XrdBwmHandle::Free = 0;
50 unsigned int XrdBwmHandle::numQueued = 0;
51 
52 extern XrdSysError BwmEroute;
53 
54 /******************************************************************************/
55 /* L o c a l C l a s s e s */
56 /******************************************************************************/
57 
58 class XrdBwmHandleCB : public XrdOucEICB, public XrdOucErrInfo
59 {
60 public:
61 
62 static
64  {XrdBwmHandleCB *mP;
65  xMutex.Lock();
66  if (!(mP = Free)) mP = new XrdBwmHandleCB;
67  else Free = mP->Next;
68  xMutex.UnLock();
69  return mP;
70  }
71 
72 void Done(int &Results, XrdOucErrInfo *eInfo, const char *Path=0)
73  {xMutex.Lock();
74  Next = Free;
75  Free = this;
76  xMutex.UnLock();
77  }
78 
79 int Same(unsigned long long arg1, unsigned long long arg2) {return 0;}
80 
81  XrdBwmHandleCB() : Next(0) {}
83 
84 private:
85  XrdBwmHandleCB *Next;
86 static XrdSysMutex xMutex;
87 static XrdBwmHandleCB *Free;
88 };
89 
90 XrdSysMutex XrdBwmHandleCB::xMutex;
91 XrdBwmHandleCB *XrdBwmHandleCB::Free = 0;
92 
93 /******************************************************************************/
94 /* E x t e r n a l L i n k a g e s */
95 /******************************************************************************/
96 
97 void *XrdBwmHanXeq(void *pp)
98 {
99  return XrdBwmHandle::Dispatch();
100 }
101 
102 /******************************************************************************/
103 /* c l a s s X r d B w m H a n d l e */
104 /******************************************************************************/
105 /******************************************************************************/
106 /* A c t i v a t e */
107 /******************************************************************************/
108 
109 #define tident Parms.Tident
110 
112 {
113  EPNAME("Activate");
114  XrdSysMutexHelper myHelper(hMutex);
115  char *rBuff;
116  int rSize, rc;
117 
118 // Check the status of this request.
119 //
120  if (Status != Idle)
121  {if (Status == Scheduled)
122  einfo.setErrInfo(kXR_inProgress, "Request already scheduled.");
123  else einfo.setErrInfo(kXR_InvalidRequest, "Visa already issued.");
124  return SFS_ERROR;
125  }
126 
127 // Try to schedule this request.
128 //
129  qTime = time(0);
130  rBuff = einfo.getMsgBuff(rSize);
131  if (!(rc = Policy->Schedule(rBuff, rSize, Parms))) return SFS_ERROR;
132 
133 // If resource immediately available, let client run
134 //
135  if (rc > 0)
136  {rHandle = rc;
137  Status = Dispatched;
138  rTime = time(0);
139  ZTRACE(sched,"Run " <<Parms.Lfn <<' ' <<Parms.LclNode
140  <<(Parms.Direction==XrdBwmPolicy::Incoming?" <- ":" -> ")
141  <<Parms.RmtNode);
142  einfo.setErrCode(strlen(rBuff));
143  return (*rBuff ? SFS_DATA : SFS_OK);
144  }
145 
146 // Request was queued. We need to hold on to this so we can issue an async
147 // response later when the resource becomes available.
148 //
149  rHandle = -rc;
150  ErrCB = einfo.getErrCB(ErrCBarg);
151  einfo.setErrCB((XrdOucEICB *)&myEICB);
152  Status = Scheduled;
153  refHandle(rHandle, this);
154  ZTRACE(sched, "inQ " <<Parms.Lfn <<' ' <<Parms.LclNode
155  <<(Parms.Direction==XrdBwmPolicy::Incoming?" <- ":" -> ")
156  <<Parms.RmtNode);
157 
158 // Indicate that client needs to wait
159 //
160  return SFS_STARTED;
161 }
162 #undef tident
163 
164 /******************************************************************************/
165 /* static public A l l o c # 1 */
166 /******************************************************************************/
167 
168 XrdBwmHandle *XrdBwmHandle::Alloc(const char *theUsr, const char *thePath,
169  const char *LclNode, const char *RmtNode,
170  int Incoming)
171 {
172  XrdBwmHandle *hP = Alloc();
173 
174 // Initialize the hanlde
175 //
176  if (hP)
177  {hP->Parms.Tident = theUsr; // Always available
178  hP->Parms.Lfn = strdup(thePath);
179  hP->Parms.LclNode = strdup(LclNode);
180  hP->Parms.RmtNode = strdup(RmtNode);
181  hP->Parms.Direction = (Incoming ? XrdBwmPolicy::Incoming
183  hP->Status = Idle;
184  hP->qTime = 0;
185  hP->rTime = 0;
186  hP->xSize = 0;
187  hP->xTime = 0;
188  }
189 
190 // All done
191 //
192  return hP;
193 }
194 
195 /******************************************************************************/
196 /* private A l l o c # 2 */
197 /******************************************************************************/
198 
200 {
201  static const int minAlloc = 4096/sizeof(XrdBwmHandle);
202  static XrdSysMutex aMutex;
203  XrdBwmHandle *hP;
204 
205 // No handle currently in the table. Get a new one off the free list or
206 // return one to the free list.
207 //
208  aMutex.Lock();
209  if (old_hP) {old_hP->Next = Free; Free = old_hP; hP = 0;}
210  else {if (!Free && (hP = new XrdBwmHandle[minAlloc]))
211  {int i = minAlloc; while(i--) {hP->Next = Free; Free = hP; hP++;}}
212  if ((hP = Free)) Free = hP->Next;
213  }
214  aMutex.UnLock();
215 
216  return hP;
217 }
218 
219 /******************************************************************************/
220 /* D i s p a t c h */
221 /******************************************************************************/
222 
223 #define tident hP->Parms.Tident
224 
226 {
227  EPNAME("Dispatch");
229  XrdBwmHandle *hP;
230  char *RespBuff;
231  int RespSize, readyH, Result, Err;
232 
233 // Dispatch ready requests in an endless loop
234 //
235  do {
236 
237 // Setup buffer
238 //
239  RespBuff = erP->getMsgBuff(RespSize);
240  *RespBuff = '\0';
241  erP->setErrCode(0);
242 
243 // Get next ready request and test if it ended with an error
244 //
245  if ((Err = (readyH = Policy->Dispatch(RespBuff, RespSize)) < 0))
246  readyH = -readyH;
247 
248 // Find the matching handle
249 //
250  if (!(hP = refHandle(readyH)))
251  {sprintf(RespBuff, "%d", readyH);
252  BwmEroute.Emsg("Dispatch", "Lost handle from", RespBuff);
253  if (!Err) Policy->Done(readyH);
254  continue;
255  }
256 
257 // Lock the handle and make sure it can be dispatched
258 //
259  hP->hMutex.Lock();
260  if (hP->Status != Scheduled)
261  {BwmEroute.Emsg("Dispatch", "ref to unscheduled handle",
262  hP->Parms.Tident, hP->Parms.Lfn);
263  if (!Err) Policy->Done(readyH);
264  } else {
265  hP->myEICB.Wait(); hP->rTime = time(0);
266  erP->setErrCB((XrdOucEICB *)erP, hP->ErrCBarg);
267  if (Err) {hP->Status = Idle; Result = SFS_ERROR;}
268  else {hP->Status = Dispatched;
269  erP->setErrCode(strlen(RespBuff));
270  Result = (*RespBuff ? SFS_DATA : SFS_OK);
271  }
272  ZTRACE(sched,(Err?"Err ":"Run ") <<hP->Parms.Lfn <<' ' <<hP->Parms.LclNode
273  <<(hP->Parms.Direction == XrdBwmPolicy::Incoming ? " <- ":" -> ")
274  <<hP->Parms.RmtNode);
275  hP->ErrCB->Done(Result, (XrdOucErrInfo *)erP);
276  erP = XrdBwmHandleCB::Alloc();
277  }
278  hP->hMutex.UnLock();
279  } while(1);
280 
281 // Keep the compiler happy
282 //
283  return (void *)0;
284 }
285 
286 #undef tident
287 
288 /******************************************************************************/
289 /* private r e f H a n d l e */
290 /******************************************************************************/
291 
292 XrdBwmHandle *XrdBwmHandle::refHandle(int refID, XrdBwmHandle *hP)
293 {
294  static XrdSysMutex tMutex;
295  static struct {XrdBwmHandle *First;
296  XrdBwmHandle *Last;
297  } hTab[256] = {{0,0}};
298  XrdBwmHandle *pP = 0;
299  int i = refID % 256;
300 
301 // If we have a handle passed, add the handle to the table
302 //
303  tMutex.Lock();
304  if (hP)
305  {hP->Next = 0;
306  if (hTab[i].Last) {hTab[i].Last->Next = hP; hTab[i].Last = hP;}
307  else {hTab[i].First = hTab[i].Last = hP; hP->Next = 0;}
308  numQueued++;
309  } else {
310  hP = hTab[i].First;
311  while(hP && hP->rHandle != refID) {pP = hP; hP = hP->Next;}
312  if (hP)
313  {if (pP) pP->Next = hP->Next;
314  else hTab[i].First = hP->Next;
315  if (hTab[i].Last == hP) hTab[i].Last = pP;
316  numQueued--;
317  }
318  }
319  tMutex.UnLock();
320 
321 // All done.
322 //
323  return hP;
324 }
325 
326 /******************************************************************************/
327 /* public R e t i r e */
328 /******************************************************************************/
329 
330 // The handle must be locked upon entry! It is unlocked upon exit.
331 
333 {
334  XrdSysMutexHelper myHelper(hMutex);
335 
336 // Get the global lock as the links field can only be manipulated with it.
337 // If not idle, cancel the resource. If scheduled, remove it from the table.
338 //
339  if (Status != Idle)
340  {Policy->Done(rHandle);
341  if (Status == Scheduled && !refHandle(rHandle, this))
342  BwmEroute.Emsg("Retire", "Lost handle to", Parms.Tident, Parms.Lfn);
343  Status = Idle; rHandle = 0;
344  }
345 
346 // If we have a logger, then log this event
347 //
348  if (Logger && qTime)
349  {XrdBwmLogger::Info myInfo;
350  myInfo.Tident = Parms.Tident;
351  myInfo.Lfn = Parms.Lfn;
352  myInfo.lclNode = Parms.LclNode;
353  myInfo.rmtNode = Parms.RmtNode;
354  myInfo.ATime = qTime;
355  myInfo.BTime = rTime;
356  myInfo.CTime = time(0);
357  myInfo.Size = xSize;
358  myInfo.ESec = xTime;
359  myInfo.Flow = (Parms.Direction == XrdBwmPolicy::Incoming ? 'I':'O');
360  Policy->Status(myInfo.numqIn, myInfo.numqOut, myInfo.numqXeq);
361  Logger->Event(myInfo);
362  }
363 
364 // Free storage appendages and recycle handle
365 //
366  if (Parms.Lfn) {free(Parms.Lfn); Parms.Lfn = 0;}
367  if (Parms.LclNode) {free(Parms.LclNode); Parms.LclNode = 0;}
368  if (Parms.RmtNode) {free(Parms.RmtNode); Parms.RmtNode = 0;}
369  Alloc(this);
370 }
371 
372 /******************************************************************************/
373 /* s e t P o l i c y */
374 /******************************************************************************/
375 
377 {
378  pthread_t tid;
379  int rc, startThread = (Policy == 0);
380 
381 // Set the policy and then start a thread to do dispatching if we have none
382 //
383  Policy = pP;
384  if (startThread)
385  if ((rc = XrdSysThread::Run(&tid, XrdBwmHanXeq, (void *)0,
386  0, "Handle Dispatcher")))
387  {BwmEroute.Emsg("setPolicy", rc, "create handle dispatch thread");
388  return 1;
389  }
390 
391 // All done
392 //
393  Logger = lP;
394  return 0;
395 }
@ kXR_InvalidRequest
Definition: XProtocol.hh:996
@ kXR_inProgress
Definition: XProtocol.hh:1010
XrdSysError BwmEroute
void * XrdBwmHanXeq(void *pp)
Definition: XrdBwmHandle.cc:97
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
#define ZTRACE(act, x)
Definition: XrdBwmTrace.hh:52
#define Err(p, a, b, c)
Definition: XrdNetSocket.cc:81
XrdOucString Path
#define SFS_DATA
#define SFS_ERROR
#define SFS_STARTED
#define SFS_OK
void Done(int &Results, XrdOucErrInfo *eInfo, const char *Path=0)
Definition: XrdBwmHandle.cc:72
static XrdBwmHandleCB * Alloc()
Definition: XrdBwmHandle.cc:63
int Same(unsigned long long arg1, unsigned long long arg2)
Definition: XrdBwmHandle.cc:79
static void * Dispatch()
int Activate(XrdOucErrInfo &einfo)
HandleState Status
Definition: XrdBwmHandle.hh:47
static XrdBwmHandle * Alloc(const char *theUsr, const char *thePath, const char *lclNode, const char *rmtNode, int Incoming)
static int setPolicy(XrdBwmPolicy *pP, XrdBwmLogger *lP)
const char * rmtNode
Definition: XrdBwmLogger.hh:48
const char * lclNode
Definition: XrdBwmLogger.hh:47
const char * Tident
Definition: XrdBwmLogger.hh:45
void Event(Info &eInfo)
const char * Lfn
Definition: XrdBwmLogger.hh:46
virtual int Done(int rHandle)=0
virtual int Schedule(char *RespBuff, int RespSize, SchedParms &Parms)=0
virtual void Status(int &numqIn, int &numqOut, int &numXeq)=0
virtual int Dispatch(char *RespBuff, int RespSize)=0
virtual void Done(int &Result, XrdOucErrInfo *eInfo, const char *Path=0)=0
XrdOucEICB * getErrCB()
void setErrCB(XrdOucEICB *cb, unsigned long long cbarg=0)
int setErrInfo(int code, const char *emsg)
char * getMsgBuff(int &mblen)
int setErrCode(int code)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)