XRootD
XrdSsiFileReq.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d S s i F i l e R e q . c c */
4 /* */
5 /* (c) 2013 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* Produced by Andrew Hanushevsky for Stanford University under contract */
7 /* DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 #include <cstdio>
31 #include <cstring>
32 #include <arpa/inet.h>
33 #include <sys/types.h>
34 
35 #include "XrdOuc/XrdOucBuffer.hh"
36 #include "XrdOuc/XrdOucERoute.hh"
37 #include "XrdOuc/XrdOucErrInfo.hh"
38 #include "XrdSfs/XrdSfsDio.hh"
39 #include "XrdSsi/XrdSsiAlert.hh"
40 #include "XrdSsi/XrdSsiFileReq.hh"
42 #include "XrdSsi/XrdSsiFileSess.hh"
43 #include "XrdSsi/XrdSsiRRAgent.hh"
44 #include "XrdSsi/XrdSsiService.hh"
45 #include "XrdSsi/XrdSsiSfs.hh"
46 #include "XrdSsi/XrdSsiStream.hh"
47 #include "XrdSsi/XrdSsiStats.hh"
48 #include "XrdSsi/XrdSsiTrace.hh"
49 #include "XrdSsi/XrdSsiUtils.hh"
50 #include "XrdSys/XrdSysError.hh"
51 
52 /******************************************************************************/
53 /* L o c a l M a c r o s */
54 /******************************************************************************/
55 
56 #define DEBUGXQ(x) DEBUG(rID<<sessN<<rspstID[urState]<<reqstID[myState]<<x)
57 
58 #define DUMPIT(x,y) XrdSsiUtils::b2x(x,y,hexBuff,sizeof(hexBuff),dotBuff)<<dotBuff
59 
60 /******************************************************************************/
61 /* G l o b a l s */
62 /******************************************************************************/
63 
64 namespace XrdSsi
65 {
67 extern XrdScheduler *Sched;
68 extern XrdSsiService *Service;
70 };
71 
72 using namespace XrdSsi;
73 
74 /******************************************************************************/
75 /* S t a t i c L o c a l s */
76 /******************************************************************************/
77 
78 namespace
79 {
80 const char *rspstID[XrdSsiFileReq::isMax] =
81  {" [new", " [begun", " [bound",
82  " [abort", " [done"
83  };
84 
85 const char *reqstID[XrdSsiFileReq::rsEnd] =
86  {" wtReq] ", " xqReq] ", " wtRsp] ",
87  " doRsp] ", " odRsp] ", " erRsp] "
88  };
89 };
90 
91 /******************************************************************************/
92 /* S t a t i c M e m b e r s */
93 /******************************************************************************/
94 
95 XrdSysMutex XrdSsiFileReq::aqMutex;
96 XrdSsiFileReq *XrdSsiFileReq::freeReq = 0;
97 int XrdSsiFileReq::freeCnt = 0;
98 int XrdSsiFileReq::freeMax = 256;
99 
100 /******************************************************************************/
101 /* A c t i v a t e */
102 /******************************************************************************/
103 
105 {
106  EPNAME("Activate");
107 
108 // Do some debugging
109 //
110  DEBUGXQ((oP ? "oucbuff" : "sfsbuff") <<" rqsz=" <<rSz);
111 
112 // Do statistics
113 //
115  Stats.ReqCount++;
116  Stats.ReqBytes += rSz;
117  if (rSz > Stats.ReqMaxsz) Stats.ReqMaxsz = rSz;
119 
120 // Set request buffer pointers
121 //
122  oucBuff = oP;
123  sfsBref = bR;
124  reqSize = rSz;
125 
126 // Now schedule ourselves to process this request. The state is new.
127 //
128  Sched->Schedule((XrdJob *)this);
129 }
130 
131 /******************************************************************************/
132 /* A l e r t */
133 /******************************************************************************/
134 
136 {
137  EPNAME("Alert");
138  XrdSsiAlert *aP;
139  int msgLen;
140 
141 // Do some debugging
142 //
143  aMsg.GetMsg(msgLen);
144  DEBUGXQ(msgLen <<" byte alert presented wtr=" <<respWait);
145 
146 // Add up statistics
147 //
149 
150 // Lock this object
151 //
152  frqMutex.Lock();
153 
154 // Validate the length and whether this call is allowed
155 //
156  if (msgLen <= 0 || haveResp || isEnding)
157  {frqMutex.UnLock();
158  aMsg.RecycleMsg();
159  return;
160  }
161 
162 // Allocate an alert object and chain it into the pending queue
163 //
164  aP = XrdSsiAlert::Alloc(aMsg);
165 
166 // Alerts must be sent in the order they are presented. So, check if we need
167 // to chain this and try to send the first in the chain. This only really
168 // matters if we can send the alert now because the client is waiting.
169 //
170  if (respWait)
171  {if (alrtPend)
172  {alrtLast->next = aP;
173  alrtLast = aP;
174  aP = alrtPend;
175  alrtPend = alrtPend->next;
176  }
177  WakeUp(aP);
178  } else {
179  if (alrtLast) alrtLast->next = aP;
180  else alrtPend = aP;
181  alrtLast = aP;
182  }
183 
184 // All done
185 //
186  frqMutex.UnLock();
187 }
188 
189 /******************************************************************************/
190 /* A l l o c */
191 /******************************************************************************/
192 
194  XrdSsiFileResource *rP,
195  XrdSsiFileSess *fP,
196  const char *sID,
197  const char *cID,
198  unsigned int rnum)
199 {
200  XrdSsiFileReq *nP;
201 
202 // Check if we can grab this from our queue
203 //
204  aqMutex.Lock();
205  if ((nP = freeReq))
206  {freeCnt--;
207  freeReq = nP->nextReq;
208  aqMutex.UnLock();
209  nP->Init(cID);
210  } else {
211  aqMutex.UnLock();
212  nP = new XrdSsiFileReq(cID);
213  }
214 
215 // Initialize for processing
216 //
217  if (nP)
218  {nP->sessN = sID;
219  nP->fileR = rP;
220  nP->fileP = fP;
221  nP->cbInfo = eiP;
222  nP->reqID = rnum;
223  snprintf(nP->rID, sizeof(nP->rID), "%u:", rnum);
224  }
225 
226 // Return the pointer
227 //
228  return nP;
229 }
230 
231 /******************************************************************************/
232 /* Private: B i n d D o n e */
233 /******************************************************************************/
234 
235 // This is called with frqMutex locked!
236 
// Reacts to the request having been bound, based on the responder state:
//   isBegun -> state becomes isBound and the method returns;
//   isBound -> nothing to do;
//   isDone  -> schedules this object (once, guarded by schedDone);
//   other   -> logs an invalid-state message.
// Per the note preceding this definition, it is called with frqMutex locked.
237 void XrdSsiFileReq::BindDone()
238 {
239  EPNAME("BindDone");
240 
241 // Trace the bind event together with the request id
242 //
243  DEBUGXQ("Bind called; for request " <<reqID);
244 
245 // Collect statistics
246 // NOTE(review): no statement follows this comment in this listing -- confirm
248 
249 // Processing depends on the current state. Only listed states are valid.
250 // When the state is done, a finished event occurred between the time the
251 // request was handed off to the service but before the service bound it.
252 //
253  switch(urState)
254  {case isBegun: urState = isBound;
// No break above: after marking the request bound, control falls into the
// isBound case, which simply returns.
255  case isBound: return;
256  break;
// schedDone ensures the object is handed to the scheduler at most once here.
257  case isDone: if (!schedDone)
258  {schedDone = true;
259  Sched->Schedule((XrdJob *)this);
260  }
261  return;
262  break;
263  default: break;
264  }
265 
266 // If we get here then we have an invalid state. Report it but otherwise we
267 // can't really do anything else. This means some memory may be lost.
268 //
269  Log.Emsg(epname, tident, "Invalid req/rsp state; giving up on object!");
270 }
271 
272 /******************************************************************************/
273 /* D i s p o s e */
274 /******************************************************************************/
275 
276 void XrdSsiFileReq::Dispose()
277 {
278  EPNAME("Dispose");
279 
280 // Do some debugging
281 //
282  DEBUGXQ("Recycling request...");
283 
284 // Collect statistics
285 //
286  Stats.Bump(Stats.ReqBound, -1);
287 
288 // Simply recycle the object
289 //
290  Recycle();
291 }
292 
293 /******************************************************************************/
294 /* D o I t */
295 /******************************************************************************/
296 
298 {
299  EPNAME("DoIt");
300  bool cancel;
301 
302 // Processing is determined by the responder's state. Only listed states are
303 // valid. Others should never occur in this context.
304 //
305  frqMutex.Lock();
306  switch(urState)
307  {case isNew: myState = xqReq; urState = isBegun;
308  DEBUGXQ("Calling service processor");
309  frqMutex.UnLock();
312  (XrdSsiFileResource &)*fileR);
313  return;
314  break;
315  case isAbort: DEBUGXQ("Skipped calling service processor");
316  frqMutex.UnLock();
318  Recycle();
319  return;
320  break;
321  case isDone: cancel = (myState != odRsp);
322  DEBUGXQ("Calling Finished(" <<cancel <<')');
323  if (respWait) WakeUp();
324  if (finWait) finWait->Post();
325  frqMutex.UnLock();
327  if (cancel) Stats.Bump(Stats.ReqCancels);
328  Finished(cancel); // This object may be deleted!
329  return;
330  break;
331  default: break;
332  }
333 
334 // If we get here then we have an invalid state. Report it but otherwise we
335 // can't really do anything else. This means some memory may be lost.
336 //
337  frqMutex.UnLock();
338  Log.Emsg(epname, tident, "Invalid req/rsp state; giving up on object!");
339 }
340 
341 /******************************************************************************/
342 /* D o n e */
343 /******************************************************************************/
344 
345 // Gets invoked only after query() waitresp signal was sent
346 
347 void XrdSsiFileReq::Done(int &retc, XrdOucErrInfo *eiP, const char *name)
348 {
349  EPNAME("Done");
350  XrdSsiMutexMon mHelper(frqMutex);
351 
352 // We may need to delete the errinfo object if this callback was async. Note
353 // that the following test is valid even if the file object has been deleted.
354 //
355  if (eiP != fileP->errInfo()) delete eiP;
356 
357 // Check if we should finalize this request. This will be the case if the
358 // complete response was sent.
359 //
360  if (myState == odRsp)
361  {DEBUGXQ("resp sent; no additional data remains");
362  Finalize();
363  return;
364  }
365 
366 // Do some debugging
367 //
368  DEBUGXQ("wtrsp sent; resp " <<(haveResp ? "here" : "pend"));
369 
370 // We are invoked when sync() waitresp has been sent, check if a response was
371 // posted while this was going on. If so, make sure to send a wakeup. Note
372 // that the respWait flag is at this moment false as this is called in the
373 // sync response path for fctl() and the response may have been posted.
374 //
375  if (!haveResp) respWait = true;
376  else WakeUp();
377 }
378 
379 /******************************************************************************/
380 /* Private: E m s g */
381 /******************************************************************************/
382 
// Formats an error message for the given error code and operation, writes it
// to the log, and, when cbInfo is set, stores the code and text there.
// A negative ecode is converted to its positive value first.
// Always returns SFS_ERROR.
383 int XrdSsiFileReq::Emsg(const char *pfx, // Message prefix value
384  int ecode, // The error code
385  const char *op) // Operation being performed
386 {
387  char buffer[2048];
388 
389 // Count errors
390 // NOTE(review): no statement follows this comment in this listing -- confirm
392 
393 // Get correct error code (callers may pass a negated errno value)
394 //
395  if (ecode < 0) ecode = -ecode;
396 
397 // Format the error message; sessN is passed as the second text argument
398 //
399  XrdOucERoute::Format(buffer, sizeof(buffer), ecode, op, sessN);
400 
401 // Put the message in the log
402 //
403  Log.Emsg(pfx, tident, buffer);
404 
405 // Place the error message in the error object (if there is one) and return
406 //
407  if (cbInfo) cbInfo->setErrInfo(ecode, buffer);
408  return SFS_ERROR;
409 }
410 
411 /******************************************************************************/
412 
// Overload taking an XrdSsiErrInfo: builds "Unable to <op> <sessN>; <reason>",
// writes it to the log, and, when cbInfo is set, stores the error number and
// text there. Always returns SFS_ERROR.
413 int XrdSsiFileReq::Emsg(const char *pfx, // Message prefix value
414  XrdSsiErrInfo &eObj, // The error description
415  const char *op) // Operation being performed
416 {
417  const char *eMsg;
418  char buffer[2048];
419  int eNum;
420 
421 // Count errors
422 // NOTE(review): no statement follows this comment in this listing -- confirm
424 
425 // Get correct error code and message. A non-positive error number is
426 // replaced by EFAULT and an empty message by "reason unknown".
427  eMsg = eObj.Get(eNum).c_str();
428  if (eNum <= 0) eNum = EFAULT;
429  if (!eMsg || !(*eMsg)) eMsg = "reason unknown";
430 
431 // Format the error message (snprintf bounds it to the local buffer)
432 //
433  snprintf(buffer, sizeof(buffer), "Unable to %s %s; %s", op, sessN, eMsg);
434 
435 // Put the message in the log
436 //
437  Log.Emsg(pfx, tident, buffer);
438 
439 // Place the error message in the error object (if there is one) and return
440 //
441  if (cbInfo) cbInfo->setErrInfo(eNum, buffer);
442  return SFS_ERROR;
443 }
444 
445 /******************************************************************************/
446 /* F i n a l i z e */
447 /******************************************************************************/
448 
450 {
451  EPNAME("Finalize");
452  XrdSsiMutexMon mHelper(frqMutex);
453  bool cancel = (myState != odRsp);
454 
455 // Release any unsent alerts (prevent any new alerts from being accepted)
456 //
457  isEnding = true;
458  if (alrtSent || alrtPend)
459  {XrdSsiAlert *dP, *aP = alrtSent;
460  if (aP) aP->next = alrtPend;
461  else aP = alrtPend;
462  mHelper.UnLock();
463  while((dP = aP)) {aP = aP->next; dP->Recycle();}
464  mHelper.Lock(frqMutex);
465  }
466 
467 // Processing is determined by the responder's state
468 //
469  switch(urState)
470  // Request is being scheduled, so we can simply abort it.
471  //
472  {case isNew: DEBUGXQ("Aborting request processing");
473  urState = isAbort;
474  cbInfo = 0;
475  sessN = "???";
477  return;
478  break;
479 
480  // Request already handed off but not yet bound. Defer until bound.
481  // We need to wait until this occurs to sequence Unprovision().
482  //
483  case isBegun: urState = isDone;
484  {XrdSysSemaphore wt4fin(0);
485  finWait = &wt4fin;
486  mHelper.UnLock();
487  wt4fin.Wait();
488  }
489  sessN = "n/a";
490  return;
491 
492  // Request is bound so we can finish right off.
493  //
494  case isBound: urState = isDone;
495  if (strBuff) {strBuff->Recycle(); strBuff = 0;}
496  DEBUGXQ("Calling Finished(" <<cancel <<')');
497  if (respWait) WakeUp();
498  mHelper.UnLock();
500  if (cancel) Stats.Bump(Stats.ReqCancels);
501  Finished(cancel); // This object may be deleted!
502  sessN = "n/a";
503  return;
504  break;
505 
506  // The following two cases may happen but it's safe to ignore them.
507  //
508  case isAbort:
509  case isDone: sessN = "bad";
510  return;
511  break;
512  default: break;
513  }
514 
515 // If we get here then we have an invalid state. Report it but otherwise we
516 // can't really do anything else. This means some memory may be lost.
517 //
518  Log.Emsg(epname, tident, "Invalid req/rsp state; giving up on object!");
519 }
520 
521 /******************************************************************************/
522 /* G e t R e q u e s t */
523 /******************************************************************************/
524 
526 {
527  EPNAME("GetRequest");
528 
529 // Do some debugging
530 //
531  DEBUGXQ("sz=" <<reqSize);
533 
534 // The request may come from a ouc buffer or an sfs buffer
535 //
536  rLen = reqSize;
537  if (oucBuff) return oucBuff->Data();
538  return XrdSfsXio::Buffer(sfsBref);
539 }
540 
541 /******************************************************************************/
542 /* Private: I n i t */
543 /******************************************************************************/
544 
// Resets every per-request field to its initial value so the object starts
// in the wtReq/isNew state. Called from Alloc() when an object is taken off
// the free queue.
// cID is the client identifier; a private copy is made with strdup() ("???"
// is used when cID is null). Recycle() releases that copy with free().
545 void XrdSsiFileReq::Init(const char *cID)
546 {
547  tident = (cID ? strdup(cID) : strdup("???"));
548  finWait = 0;
549  nextReq = 0;
550  cbInfo = 0;
551  respCB = 0;
552  respCBarg = 0;
553  alrtSent = 0;
554  alrtPend = 0;
555  alrtLast = 0;
556  sessN = "???";
557  oucBuff = 0;
558  sfsBref = 0;
559  strBuff = 0;
560  reqSize = 0;
561  respBuf = 0;
562  respOff = 0;
563  fileSz = 0; // Also does respLen = 0;
564  myState = wtReq;
565  urState = isNew;
566  *rID = 0;
567  schedDone = false;
568  haveResp = false;
569  respWait = false;
570  strmEOF = false;
571  isEnding = false;
// NOTE(review): listing line 572 is absent here -- confirm against the
// upstream file.
// Register this object's mutex (frqMutex) with the request via the RR agent.
573  XrdSsiRRAgent::SetMutex(this, &frqMutex);
574 }
575 
576 /******************************************************************************/
577 /* Protected: P r o c e s s R e s p o n s e */
578 /******************************************************************************/
579 
580 // This is called via the responder with the responder and request locks held.
581 
583  const XrdSsiRespInfo &Resp)
584 {
585  EPNAME("ProcessResponse");
586 
587 // Do some debugging
588 //
589  DEBUGXQ("Response presented wtr=" <<respWait);
590 
591 // Make sure we are still in execute state
592 //
593  if (urState != isBegun && urState != isBound) return false;
594  myState = doRsp;
595  respOff = 0;
596 
597 // Handle the response
598 //
599  switch(Resp.rType)
601  DEBUGXQ("Resp data sz="<<Resp.blen);
602  respLen = Resp.blen;
604  break;
606  DEBUGXQ("Resp err rc="<<Resp.eNum<<" msg="<<Resp.eMsg);
607  respLen = 0;
609  break;
611  DEBUGXQ("Resp file fd="<<Resp.fdnum<<" sz="<<Resp.fsize);
612  fileSz = Resp.fsize;
613  respOff = 0;
615  break;
617  DEBUGXQ("Resp strm");
618  respLen = 0;
620  break;
621  default:
622  DEBUGXQ("Resp invalid!!!!");
623  return false;
625  break;
626  }
627 
628 // If the client is waiting for the response, wake up the client to get it.
629 //
630  haveResp = true;
631  if (respWait) WakeUp();
632  return true;
633 }
634 
635 /******************************************************************************/
636 /* R e a d */
637 /******************************************************************************/
638 
640  char *buff, // Out
641  XrdSfsXferSize blen) // In
642 /*
643  Function: Read `blen' bytes at `offset' into 'buff' and return the actual
644  number of bytes read.
645 
646  Input: buff - Address of the buffer in which to place the data.
647  blen - The size of the buffer. This is the maximum number
648  of bytes that will be returned.
649 
650  Output: Returns the number of bytes read upon success and SFS_ERROR o/w.
651 */
652 {
653  static const char *epname = "read";
654  XrdSfsXferSize nbytes;
655  XrdSsiRespInfo const *Resp = XrdSsiRRAgent::RespP(this);
656 
657 // A read should never be issued unless a response has been set
658 //
659  if (myState != doRsp)
660  {done = true;
661  return (myState == odRsp ? 0 : Emsg(epname, ENOMSG, "read"));
662  }
663 
664 // Fan out based on the kind of response we have
665 //
666  switch(Resp->rType)
668  if (respLen <= 0) {done = true; myState = odRsp; return 0;}
669  if (blen >= respLen)
670  {memcpy(buff, Resp->buff+respOff, respLen);
671  blen = respLen; myState = odRsp; done = true;
672  } else {
673  memcpy(buff, Resp->buff+respOff, blen);
674  respLen -= blen; respOff += blen;
675  }
676  return blen;
677  break;
679  cbInfo->setErrInfo(Resp->eNum, Resp->eMsg);
680  myState = odRsp; done = true;
681  return SFS_ERROR;
682  break;
684  if (fileSz <= 0) {done = true; myState = odRsp; return 0;}
685  nbytes = pread(Resp->fdnum, buff, blen, respOff);
686  if (nbytes <= 0)
687  {done = true;
688  if (!nbytes) {myState = odRsp; return 0;}
689  myState = erRsp;
690  return Emsg(epname, errno, "read");
691  }
692  respOff += nbytes; fileSz -= nbytes;
693  return nbytes;
694  break;
696  nbytes = (Resp->strmP->Type() == XrdSsiStream::isActive ?
697  readStrmA(Resp->strmP, buff, blen)
698  : readStrmP(Resp->strmP, buff, blen));
699  done = strmEOF && strBuff == 0;
700  return nbytes;
701  break;
702  default: break;
703  };
704 
705 // We should never get here
706 //
707  myState = erRsp;
708  done = true;
709  return Emsg(epname, EFAULT, "read");
710 }
711 
712 /******************************************************************************/
713 /* Private: r e a d S t r m A */
714 /******************************************************************************/
715 
716 XrdSfsXferSize XrdSsiFileReq::readStrmA(XrdSsiStream *strmP,
717  char *buff, XrdSfsXferSize blen)
718 {
719  static const char *epname = "readStrmA";
720  XrdSsiErrInfo eObj;
721  XrdSfsXferSize xlen = 0;
722 
723 
724 // Copy out data from the stream to fill the buffer
725 //
726 do{if (strBuff)
727  {if (respLen > blen)
728  {memcpy(buff, strBuff->data+respOff, blen);
729  respLen -= blen; respOff += blen;
730  return xlen+blen;
731  }
732  memcpy(buff, strBuff->data+respOff, respLen);
733  xlen += respLen;
734  strBuff->Recycle(); strBuff = 0;
735  blen -= respLen; buff += respLen;
736  }
737 
738  if (!strmEOF && blen)
739  {respLen = blen; respOff = 0;
740  strBuff = strmP->GetBuff(eObj, respLen, strmEOF);
741  }
742  } while(strBuff);
743 
744 // Check if we have data to return
745 //
746  if (strmEOF) {myState = odRsp; return xlen;}
747  else if (!blen) return xlen;
748 
749 // Report the error
750 //
751  myState = erRsp; strmEOF = true;
752  return Emsg(epname, eObj, "read stream");
753 }
754 
755 /******************************************************************************/
756 /* Private: r e a d S t r m P */
757 /******************************************************************************/
758 
759 XrdSfsXferSize XrdSsiFileReq::readStrmP(XrdSsiStream *strmP,
760  char *buff, XrdSfsXferSize blen)
761 {
762  static const char *epname = "readStrmP";
763  XrdSsiErrInfo eObj;
764  XrdSfsXferSize xlen = 0;
765  int dlen = 0;
766 
767 // Copy out data from the stream to fill the buffer
768 //
769  while(!strmEOF && (dlen = strmP->SetBuff(eObj, buff, blen, strmEOF)) > 0)
770  {xlen += dlen;
771  if (dlen == blen) return xlen;
772  if (dlen > blen) {eObj.Set(0,EOVERFLOW); break;}
773  buff += dlen; blen -= dlen;
774  }
775 
776 // Check if we ended with an zero length read
777 //
778  if (strmEOF || !dlen) {myState = odRsp; strmEOF = true; return xlen;}
779 
780 // Return an error
781 //
782  myState = erRsp; strmEOF = true;
783  return Emsg(epname, eObj, "read stream");
784 }
785 
786 /******************************************************************************/
787 /* Private: R e c y c l e */
788 /******************************************************************************/
789 
790 void XrdSsiFileReq::Recycle()
791 {
792 
793 // If we have an oucbuffer then we need to recycle it, otherwise if we have
794 // and sfs buffer, put it on the deferred release queue.
795 //
796  if (oucBuff) {oucBuff->Recycle(); oucBuff = 0;}
797  else if (sfsBref) {XrdSfsXio::Reclaim(sfsBref); sfsBref = 0;}
798  reqSize = 0;
799 
800 // Add to queue unless we have too many of these. If we add it back to the
801 // queue; make sure it's a cleaned up object!
802 //
803  aqMutex.Lock();
804  if (tident) {free(tident); tident = 0;}
805  if (freeCnt >= freeMax) {aqMutex.UnLock(); delete this;}
806  else {XrdSsiRRAgent::CleanUp(*this);
807  nextReq = freeReq;
808  freeReq = this;
809  freeCnt++;
810  aqMutex.UnLock();
811  }
812 }
813 
814 /******************************************************************************/
815 /* R e l R e q u e s t B u f f e r */
816 /******************************************************************************/
817 
819 {
820  EPNAME("RelReqBuff");
821  XrdSsiMutexMon mHelper(frqMutex);
822 
823 // Do some debugging
824 //
825  DEBUGXQ("called");
827 
828 // Release buffers
829 //
830  if (oucBuff) {oucBuff->Recycle(); oucBuff = 0;}
831  else if (sfsBref) {XrdSfsXio::Reclaim(sfsBref); sfsBref = 0;}
832  reqSize = 0;
833 }
834 
835 /******************************************************************************/
836 /* S e n d */
837 /******************************************************************************/
838 
840 {
841  static const char *epname = "send";
842  XrdSsiRespInfo const *Resp = XrdSsiRRAgent::RespP(this);
843  XrdOucSFVec sfVec[2];
844  int rc;
845 
846 // A send should never be issued unless a response has been set. Return a
847 // continuation which will cause Read() to be called to return the error.
848 //
849  if (myState != doRsp) return 1;
850 
851 // Fan out based on the kind of response we have
852 //
853  switch(Resp->rType)
855  if (blen > 0)
856  {sfVec[1].buffer = (char *)Resp->buff+respOff;
857  sfVec[1].fdnum = -1;
858  if (blen > respLen)
859  {blen = respLen; myState = odRsp;
860  } else {
861  respLen -= blen; respOff += blen;
862  }
863  } else blen = 0;
864  break;
866  return 1; // Causes error to be returned via Read()
867  break;
869  if (fileSz > 0)
870  {sfVec[1].offset = respOff; sfVec[1].fdnum = Resp->fdnum;
871  if (blen > fileSz)
872  {blen = fileSz; myState = odRsp;}
873  respOff += blen; fileSz -= blen;
874  } else blen = 0;
875  break;
877  if (Resp->strmP->Type() == XrdSsiStream::isPassive) return 1;
878  return sendStrmA(Resp->strmP, sfDio, blen);
879  break;
880  default: myState = erRsp;
881  return Emsg(epname, EFAULT, "send");
882  break;
883  };
884 
885 // Send off the data
886 //
887  if (!blen) {sfVec[1].buffer = rID; myState = odRsp;}
888  sfVec[1].sendsz = blen;
889  rc = sfDio->SendFile(sfVec, 2);
890 
891 // If send succeeded, indicate the action to be taken
892 //
893  if (!rc) return myState != odRsp;
894 
895 // The send failed, diagnose the problem
896 //
897  rc = (rc < 0 ? EIO : EFAULT);
898  myState = erRsp;
899  return Emsg(epname, rc, "send");
900 }
901 
902 /******************************************************************************/
903 /* Private: s e n d S t r m A */
904 /******************************************************************************/
905 
906 int XrdSsiFileReq::sendStrmA(XrdSsiStream *strmP,
907  XrdSfsDio *sfDio, XrdSfsXferSize blen)
908 {
909  static const char *epname = "sendStrmA";
910  XrdSsiErrInfo eObj;
911  XrdOucSFVec sfVec[2];
912  int rc;
913 
914 // Check if we need a buffer
915 //
916  if (!strBuff)
917  {respLen = blen;
918  if (strmEOF || !(strBuff = strmP->GetBuff(eObj, respLen, strmEOF)))
919  {myState = odRsp; strmEOF = true;
920  if (!strmEOF) Emsg(epname, eObj, "read stream");
921  return 1;
922  }
923  respOff = 0;
924  }
925 
926 // Complete the sendfile vector
927 //
928  sfVec[1].buffer = strBuff->data+respOff;
929  sfVec[1].fdnum = -1;
930  if (respLen > blen)
931  {sfVec[1].sendsz = blen;
932  respLen -= blen; respOff += blen;
933  } else {
934  sfVec[1].sendsz = respLen;
935  respLen = 0;
936  }
937 
938 // Send off the data
939 //
940  rc = sfDio->SendFile(sfVec, 2);
941 
942 // Release any completed buffer
943 //
944  if (strBuff && !respLen) {strBuff->Recycle(); strBuff = 0;}
945 
946 // If send succeeded, indicate the action to be taken
947 //
948  if (!rc) return myState != odRsp;
949 
950 // The send failed, diagnose the problem
951 //
952  rc = (rc < 0 ? EIO : EFAULT);
953  myState = erRsp; strmEOF = true;
954  return Emsg(epname, rc, "send");
955 }
956 
957 /******************************************************************************/
958 /* W a n t R e s p o n s e */
959 /******************************************************************************/
960 
962 {
963  EPNAME("WantResp");
964  XrdSsiMutexMon frqMon;
965  const XrdSsiRespInfo *rspP;
966 
967 // Check if we have a previous alert that was sent (we need to recycle it). We
968 // don't need a lock for this as it's fully serialized via serial fsctl calls.
969 //
970  if (alrtSent) {alrtSent->Recycle(); alrtSent = 0;}
971 
972 // Serialize the remainder of this code
973 //
974  frqMon.Lock(frqMutex);
975  rspP = XrdSsiRRAgent::RespP(this);
976 
977 // If we have a pending alert then we need to send it now. Suppress the callback
978 // as we will recycle the alert on the next call (there should be one).
979 //
980  if (alrtPend)
981  {char hexBuff[16], binBuff[8], dotBuff[4];
982  alrtSent = alrtPend;
983  if (!(alrtPend = alrtPend->next)) alrtLast = 0;
984  int n = alrtSent->SetInfo(eInfo, binBuff, sizeof(binBuff));
985  eInfo.setErrCB((XrdOucEICB *)0);
986  DEBUGXQ(n <<" byte alert (0x" <<DUMPIT(binBuff, n) <<") sent; "
987  <<(alrtPend ? "" : "no ") <<"more pending");
988  return true;
989  }
990 
991 // Check if a response is here (well, ProcessResponse was called)
992 //
993 // if (rspP->rType)
994  if (haveResp)
995  {respCBarg = 0;
996  if (fileP->AttnInfo(eInfo, rspP, reqID))
997  { eInfo.setErrCB((XrdOucEICB *)this); myState = odRsp;}
998  else eInfo.setErrCB((XrdOucEICB *)0);
999  return true;
1000  }
1001 
1002 // Defer this and record the callback arguments. We defer setting respWait
1003 // to true until we know the deferral request has been sent (i.e. when Done()
1004 // is called). This forces ProcessResponse() to not prematurely wakeup the
1005 // client. This is necessitated by the fact that we must release the request
1006 // lock upon return; allowing a response to come in while the deferral request
1007 // is still in transit.
1008 //
1009  respCB = eInfo.getErrCB(respCBarg);
1010  respWait = false;
1011  return false;
1012 }
1013 
1014 /******************************************************************************/
1015 /* Private: W a k e U p */
1016 /******************************************************************************/
1017 
// Notifies the waiting client through the recorded callback (respCB) that
// either an alert (aP non-null) or the response is available. A fresh
// XrdOucErrInfo is allocated to carry the notification, respWait is cleared,
// and respCB->Done() is invoked with SFS_DATAVEC.
// NOTE(review): respCB is dereferenced without a null check; presumably it
// was recorded by WantResponse() before respWait could become true -- confirm.
1018 void XrdSsiFileReq::WakeUp(XrdSsiAlert *aP) // Called with frqMutex locked!
1019 {
1020  EPNAME("WakeUp");
1021  XrdOucErrInfo *wuInfo =
1022  new XrdOucErrInfo(tident,(XrdOucEICB *)0,respCBarg);
1023  const XrdSsiRespInfo *rspP = XrdSsiRRAgent::RespP(this);
1024  int respCode = SFS_DATAVEC;
1025 
1026 // Trace the callback argument in hex
1027 //
1028  DEBUGXQ("respCBarg=" <<Xrd::hex <<respCBarg <<Xrd::dec);
1029 
1030 // Setup the wakeup data. This may be for an alert or for an actual response.
1031 // If this is an alert or the complete response, then make sure we get a
1032 // callback to do the finalization. Otherwise, we don't need a callback
1033 // and the callback handler will simply delete the error object for us.
1034 //
1035  if (aP)
1036  {char hexBuff[16], binBuff[8], dotBuff[4];
1037  int n = aP->SetInfo(*wuInfo, binBuff, sizeof(binBuff));
// For an alert, the alert object itself is installed as the callback target.
1038  wuInfo->setErrCB((XrdOucEICB *)aP, respCBarg);
1039  DEBUGXQ(n <<" byte alert (0x" <<DUMPIT(binBuff, n) <<") sent; "
1040  <<(alrtPend ? "" : "no ") <<"more pending");
1041  } else {
// When AttnInfo() returns true, this object is installed as the callback
// target and the request state moves to odRsp.
1042  if (fileP->AttnInfo(*wuInfo, rspP, reqID))
1043  {wuInfo->setErrCB((XrdOucEICB *)this, respCBarg); myState = odRsp;}
1044  }
1045 
1046 // Tell the client to issue a read now or handle the alert or full response.
1047 // respWait is cleared first since the client is no longer waiting.
1048  respWait = false;
1049  respCB->Done(respCode, wuInfo, sessN);
// NOTE(review): listing line 1050 is absent here -- confirm against the
// upstream file.
1051 }
#define tident
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
ssize_t pread(int fildes, void *buf, size_t nbyte, off_t offset)
#define eMsg(x)
#define SFS_DATAVEC
#define SFS_ERROR
int XrdSfsXferSize
class XrdBuffer * XrdSfsXioHandle
Definition: XrdSfsXio.hh:46
#define DEBUGXQ(x)
#define DUMPIT(x, y)
if(Avsz)
Definition: XrdJob.hh:43
static int Format(char *buff, int blen, int ecode, const char *etxt1, const char *etxt2=0)
Definition: XrdOucERoute.cc:44
XrdOucEICB * getErrCB()
void setErrCB(XrdOucEICB *cb, unsigned long long cbarg=0)
XrdSysMutex statsMutex
Definition: XrdOucStats.hh:55
void Bump(int &val)
Definition: XrdOucStats.hh:47
void Schedule(XrdJob *jp)
virtual int SendFile(int fildes)=0
static void Reclaim(XrdSfsXioHandle theHand)
Definition: XrdSfsXio.cc:70
static char * Buffer(XrdSfsXioHandle theHand, int *buffsz=0)
Definition: XrdSfsXio.cc:61
void Recycle()
Definition: XrdSsiAlert.cc:98
static XrdSsiAlert * Alloc(XrdSsiRespInfoMsg &aMsg)
Definition: XrdSsiAlert.cc:52
int SetInfo(XrdOucErrInfo &eInfo, char *aMsg, int aLen)
Definition: XrdSsiAlert.cc:117
XrdSsiAlert * next
Definition: XrdSsiAlert.hh:41
void Set(const char *eMsg=0, int eNum=0, int eArg=0)
const std::string & Get(int &eNum) const
void Alert(XrdSsiRespInfoMsg &aMsg)
Send or receive a server generated alert.
bool WantResponse(XrdOucErrInfo &eInfo)
XrdSfsXferSize Read(bool &done, char *buffer, XrdSfsXferSize blen)
char * GetRequest(int &rLen)
void RelRequestBuffer()
bool ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &resp)
int Send(XrdSfsDio *sfDio, XrdSfsXferSize size)
static XrdSsiFileReq * Alloc(XrdOucErrInfo *eP, XrdSsiFileResource *rP, XrdSsiFileSess *fP, const char *sn, const char *id, unsigned int rnum)
void Activate(XrdOucBuffer *oP, XrdSfsXioHandle bR, int rSz)
void Done(int &Result, XrdOucErrInfo *cbInfo, const char *path=0)
void Lock(XrdSsiMutex *mutex)
static void SetMutex(XrdSsiRequest *rP, XrdSsiMutex *mP)
static XrdSsiRespInfo * RespP(XrdSsiRequest *rP)
static void onServer(XrdSsiRequest *rP)
static void CleanUp(XrdSsiRequest &reqR)
char * GetMsg(int &mlen)
virtual void RecycleMsg(bool sent=true)=0
virtual void ProcessRequest(XrdSsiRequest &reqRef, XrdSsiResource &resRef)=0
Process a request; client-side or server-side.
long long ReqMaxsz
Definition: XrdSsiStats.hh:41
long long ReqBytes
Definition: XrdSsiStats.hh:40
virtual bool SetBuff(XrdSsiErrInfo &eRef, char *buff, int blen)
virtual Buffer * GetBuff(XrdSsiErrInfo &eRef, int &dlen, bool &last)
Definition: XrdSsiStream.hh:93
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSsiStats Stats
XrdSsiService * Service
XrdScheduler * Sched
XrdSysError Log
@ dec
Definition: XrdSysTrace.hh:42
@ hex
Definition: XrdSysTrace.hh:42
int fdnum
File descriptor for data.
Definition: XrdOucSFVec.hh:47
int sendsz
Length of data at offset.
Definition: XrdOucSFVec.hh:46