XRootD
XrdXrootdCallBack.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d X r o o t d C a l l B a c k . c c */
4 /* */
5 /* (c) 2006 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cerrno>
32 #include <cstdio>
33 #include <cstring>
34 #include <netinet/in.h>
35 #include <sys/uio.h>
36 
37 #include "Xrd/XrdScheduler.hh"
38 #include "XProtocol/XProtocol.hh"
39 #include "XProtocol/XPtypes.hh"
40 #include "XrdSys/XrdSysError.hh"
49 
50 /******************************************************************************/
51 /* L o c a l C l a s s e s */
52 /******************************************************************************/
53 
55 {
56 public:
57 
59  const char *Path, int rval);
60 
// Job entry point: carries out the deferred callback response (the body is
// under the "DoIt" banner further down).
61  void DoIt();
62 
// Push this object onto the head of the shared FreeJob list while holding
// myMutex, so that a later Alloc() can hand it out again instead of using new.
63 inline void Recycle(){myMutex.Lock();
64  Next = FreeJob;
65  FreeJob = this;
66  myMutex.UnLock();
67  }
68 
// The constructor names the job "async response" and stores its four
// arguments; Next is not set here, only by Recycle().
70  XrdOucErrInfo *erp,
71  const char *path,
72  int rval)
73  : XrdJob("async response"),
74  cbFunc(cbp), eInfo(erp), Path(path),
75  Result(rval) {}
76 
78 
79 private:
// Helpers used by DoIt() for close and statx callbacks respectively.
80 XrdXrootdFile *DoClose(XrdOucErrInfo *eInfo);
81 void DoStatx(XrdOucErrInfo *eInfo);
// myMutex guards FreeJob, the head of the list of recycled job objects.
82 static XrdSysMutex myMutex;
83 static XrdXrootdCBJob *FreeJob;
84 
// Next    - link to the following entry while this object sits on FreeJob
// cbFunc  - callback object whose Func(), sendResp() and sendError() are used
// eInfo   - error-info object describing the request being answered
// Path    - path handed to Done(); passed on to sendError() and log messages
// Result  - result code handed to Done(); DoClose() may overwrite it
85 XrdXrootdCBJob *Next;
86 XrdXrootdCallBack *cbFunc;
87 XrdOucErrInfo *eInfo;
88 const char *Path;
89 int Result;
90 };
91 
92 /******************************************************************************/
93 /* G l o b a l s */
94 /******************************************************************************/
95 
97 
98 namespace
99 {
// Assigned by setVals(). sendError() substitutes it as the redirect port when
// the error-info object reports a port value of zero.
103  int Port;
104 }
105 
// Storage for the class-wide free list: Recycle() pushes finished jobs onto
// FreeJob and Alloc() pops them, both while holding myMutex.
106  XrdSysMutex XrdXrootdCBJob::myMutex;
107  XrdXrootdCBJob *XrdXrootdCBJob::FreeJob;
108 
109 /******************************************************************************/
110 /* X r d X r o o t d C B J o b */
111 /******************************************************************************/
112 /******************************************************************************/
113 /* A l l o c */
114 /******************************************************************************/
115 
117  XrdOucErrInfo *erp,
118  const char *Path,
119  int rval)
120 {
// Returns a job object loaded with the given callback object, error-info
// object, path and result code. A job parked on the FreeJob list is reused
// when one is available; only when the list is empty is one created with new.
//
121  XrdXrootdCBJob *cbj;
122 
123 // Obtain a call back object by trying to avoid new()
124 //
// The whole take-or-create step runs under myMutex because Recycle() updates
// the same list. A reused object has its four fields overwritten and is then
// unlinked by advancing FreeJob to its Next entry.
//
125  myMutex.Lock();
126  if (!(cbj = FreeJob)) cbj = new XrdXrootdCBJob(cbF, erp, Path, rval);
127  else {cbj->cbFunc = cbF, cbj->eInfo = erp;
128  cbj->Result = rval;cbj->Path = Path;
129  FreeJob = cbj->Next;
130  }
131  myMutex.UnLock();
132 
133 // Return the new object
134 //
135  return cbj;
136 }
137 
138 /******************************************************************************/
139 /* D o I t */
140 /******************************************************************************/
141 
143 {
// Runs one scheduled callback: sends the response that matches the operation
// named by cbFunc->Func(), notifies or frees the error-info object, deletes
// the file object returned by DoClose() (if any) and finally recycles this job.
//
144  static const char *TraceID = "DoIt";
145  XrdXrootdFile *fP = 0;
146 
147 // Do some tracing here
148 //
149  TRACE(RSP, eInfo->getErrUser() <<' ' <<cbFunc->Func() <<" async callback");
150 
151 // Some operations differ in the way we handle them. For instance, for open()
152 // if it succeeds then we must force the client to retry the open request
153 // because we can't attach the file to the client here. We do this by asking
154 // the client to wait zero seconds. Protocol demands a client retry. Close
155 // operations are always final and we need to do some cleanup.
156 //
// Dispatch is on the first character of Func(): 'c' always goes to DoClose()
// regardless of Result; with SFS_OK, 'o' gets a kXR_wait carrying zero and 'x'
// has its text rewritten by DoStatx() before the ordinary kXR_ok reply. Every
// other non-SFS_OK result is handed to sendError().
//
157  if (*(cbFunc->Func()) == 'c') fP = DoClose(eInfo);
158  else if (SFS_OK == Result)
159  {if (*(cbFunc->Func()) == 'o')
160  {int rc = 0; cbFunc->sendResp(eInfo, kXR_wait, &rc);}
161  else {if (*(cbFunc->Func()) == 'x') DoStatx(eInfo);
162  cbFunc->sendResp(eInfo, kXR_ok, 0, eInfo->getErrText(),
163  eInfo->getErrTextLen());
164  }
165  }
166  else cbFunc->sendError(Result, eInfo, Path);
167 
168 // Tell the requestor that the callback has completed
169 //
// If the error-info object carries its own callback, that callback's Done() is
// invoked with the object; otherwise the object is deleted here. Either way
// this job drops its pointer before going back on the free list.
//
170  if (eInfo->getErrCB()) eInfo->getErrCB()->Done(Result, eInfo);
171  else delete eInfo;
172  eInfo = 0;
173  if (fP) delete fP;
174  Recycle();
175 }
176 
177 /******************************************************************************/
178 /* D o C l o s e */
179 /******************************************************************************/
180 
181 XrdXrootdFile *XrdXrootdCBJob::DoClose(XrdOucErrInfo *eInfo)
182 {
183  XrdXrootdFile *fP = (XrdXrootdFile *)eInfo->getErrArg();
184 
185 // For close the main argument is the file pointer. Set the main arg to
186 // be the request identifier which is saved in the file object.
187 //
188  eInfo->setErrArg(fP->cbArg);
189 
190 // Responses to close() must be final; otherwise it's a system error.
191 //
192  if (Result != SFS_OK && Result != SFS_ERROR)
193  {char buff[64];
194  SI->errorCnt++;
195  sprintf(buff, "Invalid close() callback result of %d for", Result);
196  eDest->Emsg("DoClose", buff, Path);
197  Result = SFS_ERROR;
198  eInfo->setErrInfo(kXR_FSError, "Internal error; file close forced");
199  }
200 
201 // Send appropriate response (OK or error)
202 //
203  if (Result == SFS_OK) cbFunc->sendResp(eInfo, kXR_ok);
204  else cbFunc->sendError(Result, eInfo, Path);
205 
206 // Return the file object for disposal
207 //
208  return fP;
209 }
210 
211 /******************************************************************************/
212 /* D o S t a t x */
213 /******************************************************************************/
214 
215 void XrdXrootdCBJob::DoStatx(XrdOucErrInfo *einfo)
216 {
217  const char *tp = einfo->getErrText();
218  char cflags[2];
219  int flags;
220 
221 // Skip to the third token
222 //
223  while(*tp && *tp == ' ') tp++;
224  while(*tp && *tp != ' ') tp++; // 1st
225  while(*tp && *tp == ' ') tp++;
226  while(*tp && *tp != ' ') tp++; // 2nd
227 
228 // Convert to flags
229 //
230  flags = atoi(tp);
231 
232 // Convert to proper indicator
233 //
234  if (flags & kXR_offline) cflags[0] = (char)kXR_offline;
235  else if (flags & kXR_isDir) cflags[0] = (char)kXR_isDir;
236  else cflags[0] = (char)kXR_file;
237 
238 // Set the new response
239 //
240  cflags[1] = '\0';
241  einfo->setErrInfo(0, cflags);
242 }
243 
244 /******************************************************************************/
245 /* X r d X r o o t d C a l l B a c k */
246 /******************************************************************************/
247 /******************************************************************************/
248 /* D o n e */
249 /******************************************************************************/
250 
251 void XrdXrootdCallBack::Done(int &Result, //I/O: Function result
252  XrdOucErrInfo *eInfo, // In: Error information
253  const char *Path) // In: Path related
254 {
255  XrdXrootdCBJob *cbj;
256 
257 // Sending an async response may take a long time. So, we schedule the task
258 // to run asynchronously from the forces that got us here.
259 //
260  if (!(cbj = XrdXrootdCBJob::Alloc(this, eInfo, Path, Result)))
261  {eDest->Emsg("Done",ENOMEM,"get call back job; user",eInfo->getErrUser());
262  if (eInfo->getErrCB()) eInfo->getErrCB()->Done(Result, eInfo);
263  else delete eInfo;
264  } else Sched->Schedule((XrdJob *)cbj);
265 }
266 
267 /******************************************************************************/
268 /* S a m e */
269 /******************************************************************************/
270 
271 int XrdXrootdCallBack::Same(unsigned long long arg1, unsigned long long arg2)
272 {
273  XrdXrootdReqID ReqID1(arg1), ReqID2(arg2);
274  unsigned char sid1[2], sid2[2];
275  unsigned int inst1, inst2;
276  int lid1, lid2;
277 
278  ReqID1.getID(sid1, lid1, inst1);
279  ReqID2.getID(sid2, lid2, inst2);
280  return lid1 == lid2;
281 }
282 
283 /******************************************************************************/
284 /* s e n d E r r o r */
285 /******************************************************************************/
286 
288  XrdOucErrInfo *eInfo,
289  const char *Path)
290 {
// Translates a result code (rc) plus the code/text held by eInfo into the
// matching async response. The checks run in this order: SFS_DATAVEC,
// SFS_ERROR, SFS_REDIRECT, anything >= SFS_STALL, SFS_DATA; any other value is
// logged and answered with a kXR_ServerError.
//
291  static const char *TraceID = "fsError";
292  static int Xserr = kXR_ServerError;
293  int ecode;
294  const char *eMsg = eInfo->getErrText(ecode);
295  const char *User = eInfo->getErrUser();
296 
297 // Process the data response vector (we need to do this here)
298 //
// For SFS_DATAVEC the text pointer is treated as an iovec array and ecode as
// its element count; a count of one or less produces an empty kXR_ok.
//
299  if (rc == SFS_DATAVEC)
300  {if (ecode > 1) sendVesp(eInfo, kXR_ok, (struct iovec *)eMsg, ecode);
301  else sendResp(eInfo, kXR_ok, 0);
302  return;
303  }
304 
305 // Optimize error message handling here
306 //
// An empty message is turned into a null pointer.
//
307  if (eMsg && !*eMsg) eMsg = 0;
308 
309 // Process standard errors
310 //
// The code is mapped with XProtocol::mapError() and sent along with the text;
// the length passed is the text length plus one (presumably to include the
// terminating null byte - confirm).
//
311  if (rc == SFS_ERROR)
312  {SI->errorCnt++;
313  rc = XProtocol::mapError(ecode);
314  sendResp(eInfo, kXR_error, &rc, eMsg, eInfo->getErrTextLen()+1);
315  return;
316  }
317 
318 // Process the redirection (error msg is host:port)
319 //
// A port of zero is replaced by the configured Port and a negative port is
// negated before being sent.
//
320  if (rc == SFS_REDIRECT)
321  {SI->redirCnt++;
322  if (ecode <= 0) ecode = (ecode ? -ecode : Port);
323  TRACE(REDIR, User <<" async redir to " << eMsg <<':' <<ecode <<' '
324  <<(Path ? Path : ""));
325  sendResp(eInfo, kXR_redirect, &ecode, eMsg, eInfo->getErrTextLen());
327  XrdXrootdMonitor::Redirect(eInfo->getErrMid(),eMsg,ecode,Opcode,Path);
328  return;
329  }
330 
331 // Process the deferral
332 //
// Here rc itself is sent as the number of seconds to wait.
//
333  if (rc >= SFS_STALL)
334  {SI->stallCnt++;
335  TRACE(STALL, "Stalling " <<User <<" for " <<rc <<" sec");
336  sendResp(eInfo, kXR_wait, &rc, eMsg, eInfo->getErrTextLen()+1);
337  return;
338  }
339 
340 // Process the data response
341 //
// For SFS_DATA ecode is used as the length of the data to send.
//
342  if (rc == SFS_DATA)
343  {if (ecode) sendResp(eInfo, kXR_ok, 0, eMsg, ecode);
344  else sendResp(eInfo, kXR_ok, 0);
345  return;
346  }
347 
348 // Unknown conditions, report it
349 //
// sprintf() returns the text length, which is reused (plus one) as the message
// length handed to sendResp().
//
350  {char buff[64];
351  SI->errorCnt++;
352  ecode = sprintf(buff, "Unknown sfs response code %d", rc);
353  eDest->Emsg("sendError", buff);
354  sendResp(eInfo, kXR_error, &Xserr, buff, ecode+1);
355  return;
356  }
357 }
358 
359 /******************************************************************************/
360 /* s e n d R e s p */
361 /******************************************************************************/
362 
364  XResponseType Status,
365  int *Data,
366  const char *Msg,
367  int Mlen)
368 {
// Sends one async response with the given status to the request identified by
// the callback argument of eInfo. Up to two payload pieces are attached: the
// integer pointed to by Data (converted with htonl) and then Mlen bytes of
// Msg. A null Data or a null/empty Msg is simply left out.
//
369  static const char *TraceID = "sendResp";
370  struct iovec rspVec[4];
371  XrdXrootdReqID ReqID;
372  int dlen = 0, n = 1;
373  kXR_int32 xbuf;
374 
// Filling starts at rspVec[1]; element 0 is never touched here (presumably it
// is reserved for XrdXrootdResponse::Send() - confirm). dlen accumulates the
// payload byte count and n ends up as the element count passed to Send().
//
375  if (Data)
376  {xbuf = static_cast<kXR_int32>(htonl(*Data));
377  rspVec[n].iov_base = (caddr_t)(&xbuf);
378  dlen = rspVec[n].iov_len = sizeof(xbuf); n++; // 1
379  }
380  if (Msg && *Msg)
381  { rspVec[n].iov_base = (caddr_t)Msg;
382  dlen += rspVec[n].iov_len = Mlen; n++; // 2
383  }
384 
385 // Set the destination
386 //
387  ReqID.setID(eInfo->getErrArg());
388 
389 // Send the async response
390 //
// A negative return from Send() is logged as the user having gone away;
// otherwise the response is traced when RSP tracing is enabled.
//
391  if (XrdXrootdResponse::Send(ReqID, Status, rspVec, n, dlen) < 0)
392  eDest->Emsg("sendResp", eInfo->getErrUser(), Opname,
393  "async resp aborted; user gone.");
394  else if (TRACING(TRACE_RSP))
395  {XrdXrootdResponse theResp;
396  theResp.Set(ReqID.Stream());
397  TRACE(RSP, eInfo->getErrUser() <<" async " <<theResp.ID()
398  <<' ' <<Opname <<" status " <<Status);
399  }
400 
401 // Release any external buffer from the errinfo object
402 //
403  if (eInfo->extData()) eInfo->Reset();
404 }
405 
406 /******************************************************************************/
407 /* s e n d V e s p */
408 /******************************************************************************/
409 
411  XResponseType Status,
412  struct iovec *ioV,
413  int ioN)
414 {
// Vector flavour of sendResp(): sends a caller supplied iovec array of ioN
// elements as one async response to the request identified by the callback
// argument of eInfo.
//
415  static const char *TraceID = "sendVesp";
416  XrdXrootdReqID ReqID;
417  int dlen = 0;
418 
419 // Calculate the amount of data being sent
420 //
// The sum starts at element 1, so ioV[0] does not count towards the data
// length (presumably that slot is reserved for XrdXrootdResponse::Send() -
// confirm).
//
421  for (int i = 1; i < ioN; i++) dlen += ioV[i].iov_len;
422 
423 // Set the destination
424 //
425  ReqID.setID(eInfo->getErrArg());
426 
427 // Send the async response
428 //
// NOTE(review): the Emsg() label below reads "sendResp" although this routine
// is sendVesp - confirm whether that is intentional.
//
429  if (XrdXrootdResponse::Send(ReqID, Status, ioV, ioN, dlen) < 0)
430  eDest->Emsg("sendResp", eInfo->getErrUser(), Opname,
431  "async resp aborted; user gone.");
432  else if (TRACING(TRACE_RSP))
433  {XrdXrootdResponse theResp;
434  theResp.Set(ReqID.Stream());
435  TRACE(RSP, eInfo->getErrUser() <<" async " <<theResp.ID()
436  <<' ' <<Opname <<" status " <<Status);
437  }
438 
439 // Release any external buffer from the errinfo object
440 //
441  if (eInfo->extData()) eInfo->Reset();
442 }
443 
444 /******************************************************************************/
445 /* S e t V a l s */
446 /******************************************************************************/
447 
449  XrdXrootdStats *SIp,
450  XrdScheduler *schp,
451  int port)
452 {
// Records the error logger, statistics object, scheduler and port that the
// other routines in this file use through eDest, SI, Sched and Port. Nothing
// else is done here.
//
453 // Set values into our unnamed static space
454 //
455  eDest = erp;
456  SI = SIp;
457  Sched = schp;
458  Port = port;
459 }
@ kXR_ServerError
Definition: XProtocol.hh:1002
@ kXR_FSError
Definition: XProtocol.hh:995
XResponseType
Definition: XProtocol.hh:898
@ kXR_redirect
Definition: XProtocol.hh:904
@ kXR_ok
Definition: XProtocol.hh:899
@ kXR_wait
Definition: XProtocol.hh:905
@ kXR_error
Definition: XProtocol.hh:903
@ kXR_file
Definition: XProtocol.hh:1219
@ kXR_isDir
Definition: XProtocol.hh:1221
@ kXR_offline
Definition: XProtocol.hh:1223
int kXR_int32
Definition: XPtypes.hh:89
static XrdSysError eDest(0,"crypto_")
#define TRACE_RSP
Definition: XrdHttpTrace.hh:53
XrdOucString Path
#define eMsg(x)
#define SFS_DATAVEC
#define SFS_DATA
#define SFS_ERROR
#define SFS_REDIRECT
#define SFS_STALL
#define SFS_OK
#define TRACE(act, x)
Definition: XrdTrace.hh:63
#define TRACING(x)
Definition: XrdTrace.hh:70
XrdSysTrace XrdXrootdTrace
static int mapError(int rc)
Definition: XProtocol.hh:1361
Definition: XrdJob.hh:43
virtual void Done(int &Result, XrdOucErrInfo *eInfo, const char *Path=0)=0
XrdOucEICB * getErrCB()
void setErrArg(unsigned long long cbarg=0)
unsigned long long getErrArg()
const char * getErrText()
int setErrInfo(int code, const char *emsg)
const char * getErrUser()
void Reset()
Reset object to no message state. Call this method to release appendages.
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdXrootdCBJob(XrdXrootdCallBack *cbp, XrdOucErrInfo *erp, const char *path, int rval)
static XrdXrootdCBJob * Alloc(XrdXrootdCallBack *cbF, XrdOucErrInfo *erp, const char *Path, int rval)
void Done(int &Result, XrdOucErrInfo *eInfo, const char *Path=0)
static void setVals(XrdSysError *erp, XrdXrootdStats *SIp, XrdScheduler *schp, int port)
int Same(unsigned long long arg1, unsigned long long arg2)
void sendError(int rc, XrdOucErrInfo *eInfo, const char *Path)
void sendResp(XrdOucErrInfo *eInfo, XResponseType xrt, int *Data=0, const char *Msg=0, int Mlen=0)
void sendVesp(XrdOucErrInfo *eInfo, XResponseType xrt, struct iovec *ioV, int ioN)
const char * Func()
static int Redirect()
void setID(unsigned long long id)
unsigned char * Stream()
unsigned long long getID()
void Set(XrdLink *lp)
long long redirCnt
XrdScheduler Sched
Definition: XrdLinkCtl.cc:54
XrdXrootdStats * SI