XRootD
XrdFrmXfrDaemon.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d F r m X f r D a e m o n . c c */
4 /* */
5 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdio>
32 #include <cstdlib>
33 #include <cstring>
34 #include <strings.h>
35 #include <unistd.h>
36 #include <fcntl.h>
37 #include <sys/types.h>
38 #include <sys/stat.h>
39 
40 #include "XrdFrc/XrdFrcRequest.hh"
41 #include "XrdFrc/XrdFrcTrace.hh"
42 #include "XrdFrc/XrdFrcUtils.hh"
43 #include "XrdFrm/XrdFrmConfig.hh"
44 #include "XrdFrm/XrdFrmMigrate.hh"
45 #include "XrdFrm/XrdFrmTransfer.hh"
46 #include "XrdFrm/XrdFrmXfrAgent.hh"
48 #include "XrdNet/XrdNetOpts.hh"
49 #include "XrdNet/XrdNetSocket.hh"
50 #include "XrdOuc/XrdOucStream.hh"
51 #include "XrdSys/XrdSysPthread.hh"
52 #include "XrdSys/XrdSysTimer.hh"
53 
54 using namespace XrdFrc;
55 using namespace XrdFrm;
56 
57 /******************************************************************************/
58 /* S t a t i c V a r i a b l e s */
59 /******************************************************************************/
60 
// One request boss per transfer queue. Each object is constructed with a
// four-character string tag and the matching XrdFrcRequest queue constant.
// Boss() maps a request-type character onto one of these four objects, and
// Init() starts each of them.
//
61 XrdFrmReqBoss XrdFrmXfrDaemon::GetBoss("getf", XrdFrcRequest::getQ);
62 
63 XrdFrmReqBoss XrdFrmXfrDaemon::MigBoss("migr", XrdFrcRequest::migQ);
64 
65 XrdFrmReqBoss XrdFrmXfrDaemon::StgBoss("pstg", XrdFrcRequest::stgQ);
66 
67 XrdFrmReqBoss XrdFrmXfrDaemon::PutBoss("putf", XrdFrcRequest::putQ);
68 
69 /******************************************************************************/
70 /* Private: B o s s */
71 /******************************************************************************/
72 
73 XrdFrmReqBoss *XrdFrmXfrDaemon::Boss(char bType)
74 {
75 
76 // Return the boss corresponding to the type
77 //
78  switch(bType)
79  {case 0 :
80  case '+': return &StgBoss;
81  case '^':
82  case '&': return &MigBoss;
83  case '<': return &GetBoss;
84  case '=':
85  case '>': return &PutBoss;
86  default: break;
87  }
88  return 0;
89 }
90 
91 /******************************************************************************/
92 /* Public: I n i t */
93 /******************************************************************************/
94 
96 {
97  char buff[80];
98 
99 // Make sure we are the only daemon running
100 //
101  sprintf(buff, "%s/frm_xfrd.lock", Config.QPath);
102  if (!XrdFrcUtils::Unique(buff, Config.myProg)) return 0;
103 
104 // Initiliaze the transfer processor (it need to be active now)
105 //
106  if (!XrdFrmTransfer::Init()) return 0;
107 
108 // Fix up some values that might not make sense
109 //
110  if (Config.WaitMigr < Config.IdleHold) Config.WaitMigr = Config.IdleHold;
111 
112 // Check if it makes any sense to migrate and, if so, initialize migration
113 //
114  if (Config.pathList)
115  {if (!Config.xfrOUT)
116  Say.Emsg("Config","Output copy command not specified; "
117  "auto-migration disabled!");
118  else XrdFrmMigrate::Migrate();
119  } else Say.Emsg("Config","No migratable paths; "
120  "auto-migration disabled!");
121 
122 // Start the external interfaces
123 //
124  if (!StgBoss.Start(Config.QPath, Config.AdminMode)
125  || !MigBoss.Start(Config.QPath, Config.AdminMode)
126  || !GetBoss.Start(Config.QPath, Config.AdminMode)
127  || !PutBoss.Start(Config.QPath, Config.AdminMode)) return 0;
128 
129 // All done
130 //
131  return 1;
132 }
133 
134 /******************************************************************************/
135 /* Public: P o n g */
136 /******************************************************************************/
137 
138 void *XrdFrmXfrDaemonPong(void *parg)
139 {
140  (void)parg;
142  return (void *)0;
143 }
144 
146 {
147  EPNAME("Pong");
148  static int udpFD = -1;
149  XrdOucStream Request(&Say);
150  XrdFrmReqBoss *bossP;
151  char *tp;
152 
153 // Get a UDP socket for the server if we haven't already done so and start
154 // a thread to re-enter this code and wait for messages from an agent.
155 //
156  if (udpFD < 0)
157  {XrdNetSocket *udpSock;
158  pthread_t tid;
159  int retc;
160  if ((udpSock = XrdNetSocket::Create(&Say, Config.QPath,
161  "xfrd.udp", Config.AdminMode, XRDNET_UDPSOCKET)))
162  {udpFD = udpSock->Detach(); delete udpSock;
163  if ((retc = XrdSysThread::Run(&tid, XrdFrmXfrDaemonPong, (void *)0,
164  XRDSYSTHREAD_BIND, "Pong")))
165  Say.Emsg("main", retc, "create udp listner");
166  }
167  return;
168  }
169 
170 // Hookup to the udp socket as a stream
171 //
172  Request.Attach(udpFD, 64*1024);
173 
174 // Now simply get requests (see XrdFrmXfrDaemon for details). Here we screen
175 // out ping and list requests.
176 //
177  while((tp = Request.GetLine()))
178  {DEBUG(": '" <<tp <<"'");
179  switch(*tp)
180  {case '?': break;
181  case '!': if ((tp = Request.GetToken()))
182  while(*tp++)
183  {if ((bossP = Boss(*tp))) bossP->Wakeup(1);}
184  break;
185  default: XrdFrmXfrAgent::Process(Request);
186  }
187  }
188 
189 // We should never get here (but....)
190 //
191  Say.Emsg("Server", "Lost udp connection!");
192 }
193 
194 /******************************************************************************/
195 /* Public: S t a r t */
196 /******************************************************************************/
197 
199 {
200 
201 // Start the ponger
202 //
203  Pong();
204 
205 // Now start nudging
206 //
207  while(1)
208  {StgBoss.Wakeup(); GetBoss.Wakeup();
209  MigBoss.Wakeup(); PutBoss.Wakeup();
210  XrdSysTimer::Snooze(Config.WaitQChk);
211  }
212 
213 // We should never get here
214 //
215  return 0;
216 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
void * XrdFrmXfrDaemonPong(void *parg)
#define XRDNET_UDPSOCKET
Definition: XrdNetOpts.hh:79
#define XRDSYSTHREAD_BIND
const char * myProg
static const int stgQ
static const int getQ
static const int migQ
static const int putQ
static int Unique(const char *lkfn, const char *myProg)
Definition: XrdFrcUtils.cc:241
static void Migrate(int doinit=1)
void Wakeup(int PushIt=1)
static int Init()
static void Process(XrdOucStream &Request)
static void Pong()
static int Start()
static int Init()
static XrdNetSocket * Create(XrdSysError *Say, const char *path, const char *fn, mode_t mode, int isudp=0)
char * GetLine()
int Attach(int FileDescriptor, int bsz=2047)
char * GetToken(int lowcase=0)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Snooze(int seconds)
Definition: XrdSysTimer.cc:168
XrdCmsConfig Config
XrdSysError Say