XRootD
XrdCmsResp.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d C m s R e s p . c c */
4 /* */
5 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdio>
32 #include <cstdlib>
33 #include <cstring>
34 
36 #include "XrdCms/XrdCmsParser.hh"
37 #include "XrdCms/XrdCmsResp.hh"
38 #include "XrdCms/XrdCmsTrace.hh"
39 
40 #include "XrdOuc/XrdOucBuffer.hh"
41 #include "XrdOuc/XrdOucErrInfo.hh"
43 #include "XrdSys/XrdSysError.hh"
44 
45 using namespace XrdCms;
46 
47 /******************************************************************************/
48 /* G l o b a l s */
49 /******************************************************************************/
50 
// Ready queue: Reply(manp, ...) appends a response under rdyMutex and posts
// isReady; the static Reply() loop waits on isReady and removes from First.
//
51 XrdSysSemaphore XrdCmsResp::isReady(0);
52 XrdSysMutex XrdCmsResp::rdyMutex;
53 XrdCmsResp *XrdCmsResp::First = 0;
54 XrdCmsResp *XrdCmsResp::Last = 0;
// Free list of recycled response objects: Alloc() pops from nextFree and
// Recycle() pushes onto it, both adjusting numFree under myMutex.
//
55 XrdSysMutex XrdCmsResp::myMutex;
56 XrdCmsResp *XrdCmsResp::nextFree = 0;
57 int XrdCmsResp::numFree = 0;
// RepDelay is not referenced by the code visible in this file.
//
58 int XrdCmsResp::RepDelay = 5;
59 
60 /******************************************************************************/
61 /* A l l o c */
62 /******************************************************************************/
63 
65 {
66  XrdCmsResp *rp;
67 
68 // Allocate a response object. We must be assured that the semaphore count
69 // is zero. This will be true for freshly allocated objects. For reused
70 // objects we will need to run down the count to zero as multiple calls
71 // to sem_init may produced undefined behaviour.
72 //
73  myMutex.Lock();
74  if (nextFree)
75  {rp = nextFree;
76  nextFree = rp->next;
77  numFree--;
78  rp->SyncCB.Init();
79  }
80  else if (!(rp = new XrdCmsResp()))
81  {myMutex.UnLock();
82  return (XrdCmsResp *)0;
83  }
84  myMutex.UnLock();
85 
86 // Initialize it. We also replace the callback object pointer with a pointer
87 // to the synchronization semaphore as we have taken over the object and must
88 // provide a callback synchronization path for the caller. The OucEI object
89 // must be setup with pointers to stable areas and we will relocate any data
90 // to allow for a message to be sent back without overwriting the data (usually
91 // the path related to this request). We do this manually as the assignment
92 // operator does a bit more and a bit less than what we really need to do here.
93 //
94  strlcpy(rp->UserID, erp->getErrUser(), sizeof(rp->UserID));
95  rp->setErrUser(rp->UserID);
97  rp->setErrInfo(0, "");
98  rp->setErrMid(erp->getErrMid());
99  rp->ErrCB = erp->getErrCB(rp->ErrCBarg);
100  erp->setErrCB((XrdOucEICB *)&rp->SyncCB);
101  rp->myID = msgid;
102  rp->next = 0;
103 
104 // Return the response object
105 //
106  return rp;
107 }
108 
109 /******************************************************************************/
110 /* R e c y c l e */
111 /******************************************************************************/
112 
113 void XrdCmsResp::Recycle()
114 {
115 
116 // Recycle appendages
117 //
118  if (myBuff) {myBuff->Recycle(); myBuff = 0;}
119 
120 // We keep a stash of allocated response objects. If there are too many we
121 // simply delete this object.
122 //
123  if (XrdCmsResp::numFree >= XrdCmsResp::maxFree) delete this;
124  else {myMutex.Lock();
125  next = nextFree;
126  nextFree = this;
127  numFree++;
128  myMutex.UnLock();
129  }
130 }
131 
132 /******************************************************************************/
133 /* R e p l y */
134 /******************************************************************************/
135 
136 // This version of reply simply queues the object for reply
137 
138 void XrdCmsResp::Reply(const char *manp, CmsRRHdr &rrhdr, XrdOucBuffer *netbuff)
139 {
140 
141 // Copy the data we need to have
142 //
143  myRRHdr = rrhdr;
144  myBuff = netbuff;
145  next = 0;
146  strlcpy(theMan, manp, sizeof(theMan));
147 
148 // Now queue this object
149 //
150  rdyMutex.Lock();
151  if (Last) {Last->next = this; Last = this;}
152  else Last=First = this;
153  rdyMutex.UnLock();
154 
155 // Now indicate we have something to process
156 //
157  isReady.Post();
158 }
159 
160 /******************************************************************************/
161 
162 // This version of Reply() dequeues queued replies for processing
163 
165 {
166  XrdCmsResp *rp;
167 
168 // Endless look looking for something to reply to
169 //
170  while(1)
171  {isReady.Wait();
172  rdyMutex.Lock();
173  if ((rp = First))
174  {if (!(First = rp->next)) Last = 0;
175  rdyMutex.UnLock();
176  rp->ReplyXeq();
177  } else rdyMutex.UnLock();
178  }
179 }
180 
181 /******************************************************************************/
182 /* R e p l y X e q */
183 /******************************************************************************/
184 
185 void XrdCmsResp::ReplyXeq()
186 {
187  EPNAME("Reply")
188  XrdOucEICB *theCB;
189  int Result;
190 
191 // If there is no callback object, ignore this call. Eventually, we may wish
192 // to simulate a callback but this is rather complicated.
193 //
194  if (!ErrCB)
195  {DEBUG("No callback object for user " <<UserID <<" msgid="
196  <<myRRHdr.streamid <<' ' <<theMan);
197  Recycle();
198  return;
199  }
200 
201 // Get the values for the callback.
202 //
203  Result = XrdCmsParser::Decode(theMan,myRRHdr,myBuff,(XrdOucErrInfo *)this);
204 
205 // Translate the return code to what the caller's caller wanst to see. We
206 // should only receive the indicated codes at this point.
207 //
208  if (Result != SFS_REDIRECT && Result != SFS_STALL
209  && Result != SFS_DATA && Result != SFS_ERROR)
210  {char buff[16];
211  sprintf(buff, "%d", Result);
212  Say.Emsg("Reply", "Invalid call back result code", buff);
213  setErrInfo(EINVAL,"Invalid call back response from redirector.");
214  Result = SFS_ERROR;
215  }
216 
217 // Before invoking the callback we must be assured that the waitresp response
218 // has been sent to the client. We do this by waiting on a semaphore which is
219 // posted *after* the waitresp response is sent.
220 //
221  SyncCB.Wait();
222 
223 // We now must request a callback to recycle this object once the callback
224 // response is actually sent by setting the callback object pointer to us.
225 //
226  theCB = ErrCB;
227  ErrCB = (XrdOucEICB *)this;
228 
229 // Invoke the callback
230 //
231  theCB->Done(Result, (XrdOucErrInfo *)this, getErrData());
232 }
233 
234 /******************************************************************************/
235 /* X r d O d c R e s p Q */
236 /******************************************************************************/
237 /******************************************************************************/
238 /* C o n s t r u c t o r */
239 /******************************************************************************/
240 
242 {
243  memset(mqTab, 0, sizeof(mqTab));
244 }
245 
246 /******************************************************************************/
247 /* A d d */
248 /******************************************************************************/
249 
251 {
252  int i;
253 
254 // Compute index and either add or chain the entry
255 //
256  i = rp->myID % mqSize;
257  myMutex.Lock();
258  rp->next = (mqTab[i] ? mqTab[i] : 0);
259  mqTab[i] = rp;
260  myMutex.UnLock();
261 }
262 
263 /******************************************************************************/
264 /* P u r g e */
265 /******************************************************************************/
266 
268 {
269  XrdCmsResp *rp;
270  int i;
271 
272  myMutex.Lock();
273  for (i = 0; i < mqSize; i++)
274  {while ((rp = mqTab[i])) {mqTab[i] = rp->next; delete rp;}}
275  myMutex.UnLock();
276 }
277 
278 /******************************************************************************/
279 /* R e m */
280 /******************************************************************************/
281 
283 {
284  int i;
285  XrdCmsResp *rp, *pp = 0;
286 
287 // Compute the index and find the entry
288 //
289  i = msgid % mqSize;
290  myMutex.Lock();
291  rp = mqTab[i];
292  while(rp && rp->myID != msgid) {pp = rp; rp = rp->next;}
293 
294 // Remove the entry if we found it
295 //
296  if (rp) {if (pp) pp->next = rp->next;
297  else mqTab[i] = rp->next;
298  }
299 
300 // Return what we found
301 //
302  myMutex.UnLock();
303  return rp;
304 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
#define SFS_DATA
#define SFS_ERROR
#define SFS_REDIRECT
#define SFS_STALL
if(Avsz)
size_t strlcpy(char *dst, const char *src, size_t sz)
static int Decode(const char *Man, XrdCms::CmsRRHdr &hdr, XrdOucBuffer *dBuff, XrdOucErrInfo *eInfo)
void Init()
Definition: XrdCmsResp.hh:49
XrdCmsResp * Rem(int msgid)
Definition: XrdCmsResp.cc:282
void Add(XrdCmsResp *rp)
Definition: XrdCmsResp.cc:250
void Purge()
Definition: XrdCmsResp.cc:267
static void Reply()
Definition: XrdCmsResp.cc:164
static XrdCmsResp * Alloc(XrdOucErrInfo *erp, int msgid)
Definition: XrdCmsResp.cc:64
const char * getErrData()
XrdOucEICB * getErrCB()
void setErrCB(XrdOucEICB *cb, unsigned long long cbarg=0)
void setErrUser(const char *user)
void setErrMid(int mid)
Set the monitoring identifier.
int setErrInfo(int code, const char *emsg)
const char * getErrUser()
XrdOucEICB * ErrCB
void setErrData(const char *Data, int Offs=0)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSysError Say
static const int Path_Offset