XRootD
XrdCmsState.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d C m s S t a t e . c c */
4 /* */
5 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <fcntl.h>
32 #include <limits.h>
33 #include <unistd.h>
34 #include <netinet/in.h>
35 #include <sys/types.h>
36 #include <sys/stat.h>
37 
38 #include "XProtocol/YProtocol.hh"
39 
40 #include "Xrd/XrdLink.hh"
41 
42 #include "XrdCms/XrdCmsManager.hh"
43 #include "XrdCms/XrdCmsRTable.hh"
44 #include "XrdCms/XrdCmsState.hh"
45 #include "XrdCms/XrdCmsTrace.hh"
46 
47 #include "XrdSys/XrdSysError.hh"
48 
49 using namespace XrdCms;
50 
51 /******************************************************************************/
52 /* G l o b a l s */
53 /******************************************************************************/
54 
56 
57 /******************************************************************************/
58 /* C o n s t r u c t o r */
59 /******************************************************************************/
60 
61 XrdCmsState::XrdCmsState() : mySemaphore(0)
62 {
63  minNodeCnt = 1;
64  numActive = 0;
65  numStaging = 0;
66  currState = All_NoStage | All_Suspend;
67  prevState = 0;
70  feOK = 0;
71  noSpace = 0;
72  adminNoStage = 0;
73  adminSuspend = 0;
74  NoStageFile = "";
75  SuspendFile = "";
76  isMan = 0;
77  dataPort = 0;
78  Enabled = 0;
79 }
80 
81 /******************************************************************************/
82 /* Punlic E n a b l e */
83 /******************************************************************************/
84 
86 {
87  struct stat buff;
88 
89 // Set correct admin staging state
90 //
91  Update(Stage, stat(NoStageFile, &buff));
92 
93 // Set correct admin suspend state
94 //
95  Update(Active, stat(SuspendFile, &buff));
96 
97 // We will force the information to be sent to interested parties by making
98 // the previous state different from the current state and enabling ourselves.
99 //
100  myMutex.Lock();
101  Enabled = 1;
102  prevState = ~currState;
103  mySemaphore.Post();
104  myMutex.UnLock();
105 }
106 
107 /******************************************************************************/
108 /* Public M o n i t o r */
109 /******************************************************************************/
110 
112 {
113  CmsStatusRequest myStatus = {{0, kYR_status, 0, 0}};
114  int RTsend, theState, Changes, myPort;
115 
116 // Do this forever (we are only posted when finally enabled)
117 //
118  do {mySemaphore.Wait();
119  myMutex.Lock();
120  Changes = currState ^ prevState;
121  theState = currState;
122  prevState = currState;
123  myPort = dataPort;
124  myMutex.UnLock();
125 
126  if (Changes && (myStatus.Hdr.modifier = Status(Changes, theState)))
127  {if (myStatus.Hdr.modifier & CmsStatusRequest::kYR_Resume)
128  {myStatus.Hdr.streamid = htonl(myPort); RTsend = 1;}
129  else {myStatus.Hdr.streamid = 0;
130  RTsend = (isMan > 0 ? (theState & SRV_Suspend) : 0);
131  }
132  if (isMan && RTsend)
133  RTable.Send("status", (char *)&myStatus, sizeof(myStatus));
134  XrdCmsManager::Inform(myStatus.Hdr);
135  }
136  } while(1);
137 
138 // All done
139 //
140  return (void *)0;
141 }
142 
143 /******************************************************************************/
144 /* Public P o r t */
145 /******************************************************************************/
146 
148 {
149  int xPort;
150 
151  myMutex.Lock();
152  xPort = dataPort;
153  myMutex.UnLock();
154  return xPort;
155 }
156 
157 /******************************************************************************/
158 /* Public s e n d S t a t e */
159 /******************************************************************************/
160 
162 {
163  CmsStatusRequest myStatus = {{0, kYR_status, 0, 0}};
164 
165  myMutex.Lock();
166  myStatus.Hdr.modifier = Suspended
167  ? CmsStatusRequest::kYR_Suspend
168  : CmsStatusRequest::kYR_Resume;
169 
170  myStatus.Hdr.modifier |= NoStaging
171  ? CmsStatusRequest::kYR_noStage
172  : CmsStatusRequest::kYR_Stage;
173 
174  lp->Send((char *)&myStatus.Hdr, sizeof(myStatus.Hdr));
175  myMutex.UnLock();
176 }
177 
178 /******************************************************************************/
179 /* Public S e t */
180 /******************************************************************************/
181 
182 void XrdCmsState::Set(int ncount)
183 {
184 
185 // Set the node count (this requires a lock)
186 //
187  myMutex.Lock();
188  minNodeCnt = ncount;
189  myMutex.UnLock();
190 }
191 
192 /******************************************************************************/
193 
194 void XrdCmsState::Set(int ncount, int isman, const char *AdminPath)
195 {
196  char fnbuff[1048];
197  int i;
198 
199 // This is a configuration call no locks are required.
200 //
201  minNodeCnt = ncount;
202  isMan = isman;
203  i = strlen(AdminPath);
204  strcpy(fnbuff, AdminPath);
205  if (AdminPath[i-1] != '/') fnbuff[i++] = '/';
206  strcpy(fnbuff+i, "NOSTAGE");
207  NoStageFile = strdup(fnbuff);
208  strcpy(fnbuff+i, "SUSPEND");
209  SuspendFile = strdup(fnbuff);
210 }
211 
212 /******************************************************************************/
213 /* Private S t a t u s */
214 /******************************************************************************/
215 
216 unsigned char XrdCmsState::Status(int Changes, int theState)
217 {
218  const char *SRstate = 0, *SNstate = 0;
219  unsigned char rrModifier;
220 
221 // Check for suspend changes
222 //
223  if (Changes & All_Suspend)
224  if (theState & All_Suspend)
225  {rrModifier = CmsStatusRequest::kYR_Suspend;
226  SRstate = "suspended";
227  } else {
228  rrModifier = CmsStatusRequest::kYR_Resume;
229  SRstate = "active";
230  }
231  else rrModifier = 0;
232 
233 // Check for staging changes
234 //
235  if (Changes & All_NoStage)
236  {if (theState & All_NoStage)
237  {rrModifier |= CmsStatusRequest::kYR_noStage;
238  SNstate = "+ nostaging";
239  } else {
240  rrModifier |= CmsStatusRequest::kYR_Stage;
241  SNstate = "+ staging";
242  }
243  }
244 
245 // Report and return status
246 //
247  if (rrModifier)
248  {if (!SRstate && SNstate) SNstate += 2;
249  Say.Emsg("State", "Status changed to", SRstate, SNstate);
250  }
251  return rrModifier;
252 }
253 
254 /******************************************************************************/
255 /* Public U p d a t e */
256 /******************************************************************************/
257 
258 void XrdCmsState::Update(StateType StateT, int ActivCnt, int StageCnt)
259 {
260  EPNAME("Update");
261  const char *What;
262  char newVal;
263 
264 // Create new state
265 //
266  myMutex.Lock();
267  switch(StateT)
268  {case Active: if ((newVal = ActivCnt ? 0 : 1) != adminSuspend)
269  { if ( newVal && !StageCnt) unlink(SuspendFile);
270  else if (!newVal || !StageCnt) unlink(SuspendFile);
271  else close(open(SuspendFile, O_WRONLY|O_CREAT,
272  S_IRUSR|S_IWUSR));
273  adminSuspend = newVal;
274  }
275  What = "Active";
276  break;
277  case Counts: numStaging += StageCnt;
278  numActive += ActivCnt;
279  What = "Counts";
280  break;
281  case FrontEnd: if ((feOK = (ActivCnt ? 1 : 0)) && StageCnt >= 0)
282  dataPort = StageCnt;
283  What = "FrontEnd";
284  break;
285  case Space: noSpace = (ActivCnt ? 0 : 1);
286  What = "Space";
287  break;
288  case Stage: if ((newVal = ActivCnt ? 0 : 1) != adminNoStage)
289  {if (newVal) unlink(NoStageFile);
290  else close(open(NoStageFile, O_WRONLY|O_CREAT,
291  S_IRUSR|S_IWUSR));
292  adminNoStage = newVal;
293  }
294  What = "Stage";
295  break;
296  default: Say.Emsg("State", "Invalid state update");
297  What = "Unknown";
298  break;
299  }
300 
301  DEBUG(What <<" Parm1=" <<ActivCnt <<" Parm2=" <<StageCnt);
302  currState=(numActive < minNodeCnt || adminSuspend ? SRV_Suspend:0)
303  |(numStaging < 1 || noSpace || adminNoStage ? All_NoStage:0)
304  | ( !feOK ? FES_Suspend:0);
305 
306  Suspended = currState & All_Suspend;
307  NoStaging = currState & All_NoStage;
308 
309 // If any changes are noted then we must send out notifications
310 //
311  if (currState != prevState && Enabled) mySemaphore.Post();
312 
313 // All done
314 //
315  myMutex.UnLock();
316 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
int stat(const char *path, struct stat *buf)
int open(const char *path, int oflag,...)
int unlink(const char *path)
#define close(a)
Definition: XrdPosix.hh:43
static void Inform(const char *What, const char *Data, int Dlen)
void Send(const char *What, const char *data, int dlen)
static const char FES_Suspend
Definition: XrdCmsState.hh:64
static const char All_NoStage
Definition: XrdCmsState.hh:66
void * Monitor()
Definition: XrdCmsState.cc:111
void Update(StateType StateT, int ActivVal, int StageVal=0)
Definition: XrdCmsState.cc:258
static const char All_Suspend
Definition: XrdCmsState.hh:65
static const char SRV_Suspend
Definition: XrdCmsState.hh:63
void Set(int ncount)
Definition: XrdCmsState.cc:182
void Enable()
Definition: XrdCmsState.cc:85
void sendState(XrdLink *Link)
Definition: XrdCmsState.cc:161
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
kXR_char modifier
Definition: YProtocol.hh:85
XrdSysError Say
XrdCmsRTable RTable
Definition: XrdCmsRTable.cc:40
XrdCmsState CmsState
Definition: XrdCmsState.cc:55
kXR_unt32 streamid
Definition: YProtocol.hh:83
@ kYR_status
Definition: YProtocol.hh:112