XRootD
XrdFrmReqBoss.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d F r m R e q B o s s . c c */
4 /* */
5 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
#include <cerrno>
#include <climits>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <strings.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
39 
40 #include "XrdFrc/XrdFrcCID.hh"
41 #include "XrdFrc/XrdFrcTrace.hh"
42 #include "XrdFrc/XrdFrcUtils.hh"
43 #include "XrdFrm/XrdFrmReqBoss.hh"
44 #include "XrdFrm/XrdFrmXfrQueue.hh"
45 #include "XrdNet/XrdNetMsg.hh"
46 #include "XrdOuc/XrdOucUtils.hh"
47 #include "XrdSys/XrdSysHeaders.hh"
48 
49 using namespace XrdFrc;
50 
51 /******************************************************************************/
52 /* T h r e a d I n t e r f a c e s */
53 /******************************************************************************/
54 
55 void *mainServerXeq(void *parg)
56 {
57  XrdFrmReqBoss *theBoss = (XrdFrmReqBoss *)parg;
58  theBoss->Process();
59  return (void *)0;
60 }
61 
62 /******************************************************************************/
63 /* Public: A d d */
64 /******************************************************************************/
65 
67 {
68 
69 // Complete the request including verifying the priority
70 //
71  if (Request.Prty > XrdFrcRequest::maxPrty)
72  Request.Prty = XrdFrcRequest::maxPrty;
73  else if (Request.Prty < 0)Request.Prty = 0;
74  Request.addTOD = time(0);
75 
76 // Now add it to the queue
77 //
78  rQueue[static_cast<int>(Request.Prty)]->Add(&Request);
79 
80 // Now wake ourselves up
81 //
82  Wakeup(1);
83 }
84 
85 /******************************************************************************/
86 /* Public: D e l */
87 /******************************************************************************/
88 
90 {
91  int i;
92 
93 // Remove all pending requests for this id
94 //
95  for (i = 0; i <= XrdFrcRequest::maxPrty; i++) rQueue[i]->Can(&Request);
96 }
97 
98 /******************************************************************************/
99 /* Public: P r o c e s s */
100 /******************************************************************************/
101 
103 {
104  EPNAME("Process");
105  XrdFrcRequest myReq;
106  int i, rc, numXfr, numPull;;
107 
108 // Perform staging in an endless loop
109 //
110 do{Wakeup(0);
111  do{numXfr = 0;
112  for (i = XrdFrcRequest::maxPrty; i >= 0; i--)
113  {numPull = i+1;
114  while(numPull && (rc = rQueue[i]->Get(&myReq)))
115  {if (myReq.Options & XrdFrcRequest::Register) Register(myReq,i);
116  else {numPull -= XrdFrmXfrQueue::Add(&myReq,rQueue[i],theQ);
117  numXfr++;
118  DEBUG(Persona <<" from Q " << i <<' ' <<numPull <<" left");
119  if (rc < 0) break;
120  }
121  }
122  }
123  } while(numXfr);
124  } while(1);
125 }
126 
127 /******************************************************************************/
128 /* Private: R e g i s t e r */
129 /******************************************************************************/
130 
131 void XrdFrmReqBoss::Register(XrdFrcRequest &Req, int qNum)
132 {
133  EPNAME("Register");
134  char *eP;
135  int Pid;
136 
137 // Ignore this request if there is no cluster id or the process if is invalid
138 //
139  if (!(*Req.LFN)) return;
140  Pid = strtol(Req.ID, &eP, 10);
141  if (*eP || Pid == 0) return;
142 
143 // Register this cluster
144 //
145  if (CID.Add(Req.iName, Req.LFN, static_cast<time_t>(Req.addTOD), Pid))
146  {DEBUG("Instance=" <<Req.iName <<" cluster=" <<Req.LFN <<" pid=" <<Pid);}
147  else rQueue[qNum]->Del(&Req);
148 }
149 
150 /******************************************************************************/
151 /* S t a r t */
152 /******************************************************************************/
153 
154 int XrdFrmReqBoss::Start(char *aPath, int aMode)
155 {
156  pthread_t tid;
157  char buff[2048], *qPath;
158  int retc, i;
159 
160 // Generate the queue directory path
161 //
162  if (!(qPath = XrdFrcUtils::makeQDir(aPath, aMode))) return 0;
163 
164 // Initialize the request queues if all went well
165 //
166  for (i = 0; i <= XrdFrcRequest::maxPrty; i++)
167  {sprintf(buff, "%s%sQ.%d", qPath, Persona, i);
168  rQueue[i] = new XrdFrcReqFile(buff, 0);
169  if (!rQueue[i]->Init()) {free(qPath); return 0;}
170  }
171  free(qPath);
172 
173 // Start the request processing thread
174 //
175  if ((retc = XrdSysThread::Run(&tid, mainServerXeq, (void *)this,
176  XRDSYSTHREAD_BIND, Persona)))
177  {sprintf(buff, "create %s request thread", Persona);
178  Say.Emsg("Start", retc, buff);
179  return 0;
180  }
181 
182 // All done
183 //
184  return 1;
185 }
186 
187 /******************************************************************************/
188 /* Public: W a k e u p */
189 /******************************************************************************/
190 
191 void XrdFrmReqBoss::Wakeup(int PushIt)
192 {
193  static XrdSysMutex rqMutex;
194 
195 // If this is a PushIt then see if we need to push the binary semaphore
196 //
197  if (PushIt) {rqMutex.Lock();
198  if (!isPosted) {rqReady.Post(); isPosted = 1;}
199  rqMutex.UnLock();
200  }
201  else {rqReady.Wait();
202  rqMutex.Lock(); isPosted = 0; rqMutex.UnLock();
203  }
204 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
void * mainServerXeq(void *parg)
#define XRDSYSTHREAD_BIND
int Add(const char *iName, const char *cName, time_t addT, pid_t Pid)
Definition: XrdFrcCID.cc:64
char LFN[3072]
char iName[32]
static const int Register
signed char Prty
static const int maxPrty
long long addTOD
static char * makeQDir(const char *Path, int Mode)
Definition: XrdFrcUtils.cc:127
void Del(XrdFrcRequest &Request)
void Add(XrdFrcRequest &Request)
int Start(char *aPath, int aMode)
void Wakeup(int PushIt=1)
static int Add(XrdFrcRequest *rP, XrdFrcReqFile *reqF, int theQ)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
XrdSysError Say
XrdFrcCID CID
Definition: XrdFrcCID.cc:56