XRootD
XrdFrmMigrate.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d F r m M i g r a t e . c c */
4 /* */
5 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdio>
32 #include <cstring>
33 #include <strings.h>
34 #include <utime.h>
35 #include <sys/param.h>
36 #include <sys/types.h>
37 
38 #include "XrdOss/XrdOss.hh"
39 #include "XrdOss/XrdOssPath.hh"
40 #include "XrdOuc/XrdOucNSWalk.hh"
41 #include "XrdOuc/XrdOucTList.hh"
42 #include "XrdFrc/XrdFrcRequest.hh"
43 #include "XrdFrc/XrdFrcTrace.hh"
44 #include "XrdFrm/XrdFrmFiles.hh"
45 #include "XrdFrm/XrdFrmConfig.hh"
46 #include "XrdFrm/XrdFrmMigrate.hh"
47 #include "XrdFrm/XrdFrmTransfer.hh"
48 #include "XrdFrm/XrdFrmXfrQueue.hh"
49 #include "XrdSys/XrdSysPthread.hh"
50 #include "XrdSys/XrdSysPlatform.hh"
51 #include "XrdSys/XrdSysTimer.hh"
52 
53 using namespace XrdFrc;
54 using namespace XrdFrm;
55 
56 /******************************************************************************/
57 /* S t a t i c M e m b e r s */
58 /******************************************************************************/
59 
60 XrdFrmFileset *XrdFrmMigrate::fsDefer = 0;
61 
62 int XrdFrmMigrate::numMig = 0;
63 
64 /******************************************************************************/
65 /* Private: A d d */
66 /******************************************************************************/
67 
68 void XrdFrmMigrate::Add(XrdFrmFileset *sP)
69 {
70  EPNAME("Add");
71  const char *Why;
72  time_t xTime;
73 
74 // Check to see if the file is really eligible for purging
75 //
76  if ((Why = Eligible(sP, xTime)))
77  {DEBUG(sP->basePath() <<"cannot be migrated; " <<Why);
78  delete sP;
79  return;
80  }
81 
82 // Add the file to the migr queue or the defer queue based on mod time
83 //
84  if (xTime < Config.IdleHold) Defer(sP);
85  else Queue(sP);
86 }
87 
88 /******************************************************************************/
89 /* Private: A d v a n c e */
90 /******************************************************************************/
91 
92 int XrdFrmMigrate::Advance()
93 {
94  XrdFrmFileset *fP;
95  int xTime = 0;
96 
97 // Try to re-add everything in this queue
98 //
99  while(fsDefer)
100  {xTime = static_cast<int>(time(0) - fsDefer->baseFile()->Stat.st_mtime);
101  if (xTime < Config.IdleHold) break;
102  fP = fsDefer; fsDefer = fsDefer->Next;
103  if (fP->Refresh(1,0)) Add(fP);
104  else delete fP;
105  }
106 
107 // Return number of seconds to next advance event
108 //
109  return fsDefer ? Config.IdleHold - xTime : 0;
110 }
111 
112 /******************************************************************************/
113 /* Private: D e f e r */
114 /******************************************************************************/
115 
116 void XrdFrmMigrate::Defer(XrdFrmFileset *sP)
117 {
118  XrdFrmFileset *fP = fsDefer, *pfP = 0;
119  time_t mTime = sP->baseFile()->Stat.st_mtime;
120 
121 // Insert this entry into the defer queue in ascending mtime order
122 //
123  while(fP && fP->baseFile()->Stat.st_mtime < mTime)
124  {pfP = fP; fP = fP->Next;}
125 
126 // Chain in the fileset
127 //
128  sP->Next = fP;
129  if (pfP) pfP->Next = sP;
130  else fsDefer = sP;
131 }
132 
133 /******************************************************************************/
134 /* D i s p l a y */
135 /******************************************************************************/
136 
138 {
139  XrdFrmConfig::VPInfo *vP = Config.pathList;
140  XrdOucTList *tP;
141 
142 // Type header
143 //
144  Say.Say("=====> ", "Migrate configuration:");
145 
146 // Display what we will scan
147 //
148  while(vP)
149  {Say.Say("=====> ", "Scanning ", (vP->Val?"r/w: ":"r/o: "), vP->Name);
150  tP = vP->Dir;
151  while(tP) {Say.Say("=====> ", "Excluded ", tP->text); tP = tP->next;}
152  vP = vP->Next;
153  }
154 }
155 
156 /******************************************************************************/
157 /* Private: E l i g i b l e */
158 /******************************************************************************/
159 
160 const char *XrdFrmMigrate::Eligible(XrdFrmFileset *sP, time_t &xTime)
161 {
162  XrdOucNSWalk::NSEnt *baseFile = sP->baseFile();
163  XrdOucNSWalk::NSEnt *failFile = sP->failFile();
164  time_t mTimeBF, mTimeLK, nowTime = time(0);
165  const char *eTxt;
166 
167 // File is inelegible if lockfile mtime is zero (i.e., an mstore placeholder)
168 //
169  mTimeLK = static_cast<time_t>(sP->cpyInfo.Attr.cpyTime);
170  if (!mTimeLK) return "migration deferred";
171 
172 // File is ineligible if it has not changed since last migration
173 //
174  mTimeBF = baseFile->Stat.st_mtime;
175  if (mTimeLK >= mTimeBF) return "file unchanged";
176 
177 // File is ineligible if it has a fail file that is still recent
178 //
179  if (failFile && (eTxt=XrdFrmTransfer::checkFF(sP->failPath()))) return eTxt;
180 
181 // Migration may need to be deferred if the file has been modified too recently
182 // (caller will check)
183 //
184  xTime = static_cast<int>(nowTime - mTimeBF);
185 
186 // File can be migrated
187 //
188  return 0;
189 }
190 
191 /******************************************************************************/
192 /* M i g r a t e */
193 /******************************************************************************/
194 
195 void *XrdMigrateStart(void *parg)
196 {
197  (void)parg;
199  return (void *)0;
200 }
201 
202 void XrdFrmMigrate::Migrate(int doinit)
203 {
204  XrdFrmFileset *fP;
205  char buff[80];
206  int migWait, wTime;
207 
208 // If we have not initialized yet, start a thread to handle this
209 //
210  if (doinit)
211  {pthread_t tid;
212  int retc;
213  if ((retc = XrdSysThread::Run(&tid, XrdMigrateStart, (void *)0,
214  XRDSYSTHREAD_BIND, "migration scan")))
215  Say.Emsg("Migrate", retc, "create migrtion thread");
216  return;
217  }
218 
219 // Start the migration sequence, first do a name space scan which will trigger
220 // all eligible migrations and defer any that need to wait. We then drain the
221 // defer queue and wait for the next period to start.
222 //
223 do{migWait = Config.WaitMigr; numMig = 0;
224  Scan();
225  while((wTime = Advance()))
226  {if ((migWait -= wTime) <= 0) break;
227  else XrdSysTimer::Snooze(wTime);
228  }
229  while(fsDefer) {fP = fsDefer; fsDefer = fsDefer->Next; delete fP;}
230  sprintf(buff, "%d file%s selected for transfer.",numMig,(numMig==1?"":"s"));
231  Say.Emsg("Migrate", buff);
232  if (migWait > 0) XrdSysTimer::Snooze(migWait);
233  } while(1);
234 }
235 
236 /******************************************************************************/
237 /* Private: Q u e u e */
238 /******************************************************************************/
239 
241 {
242  static int reqID = 0;
243  XrdFrcRequest myReq;
244 
245 // Convert the fileset to a request element
246 //
247  memset(&myReq, 0, sizeof(myReq));
248  strlcpy(myReq.User, Config.myProg, sizeof(myReq.User));
249  sprintf(myReq.ID, "Internal%d", reqID++);
251  myReq.addTOD = static_cast<long long>(time(0));
252  if (Config.LogicalPath(sP->basePath(), myReq.LFN, sizeof(myReq.LFN)))
253  {XrdFrmXfrQueue::Add(&myReq, 0, XrdFrcRequest::migQ); numMig++;}
254 
255 // All done
256 //
257  delete sP;
258 }
259 
260 /******************************************************************************/
261 /* Private: S c a n */
262 /******************************************************************************/
263 
264 void XrdFrmMigrate::Scan()
265 {
268  static time_t lastHP = time(0), nowT = time(0);
269 
270  XrdFrmConfig::VPInfo *vP = Config.pathList;
271  XrdFrmFileset *sP;
272  XrdFrmFiles *fP;
273  char buff[128];
274  int ec = 0, Bad = 0, aFiles = 0, bFiles = 0;
275 
276 // Purge that bad file table evey 24 hours to keep complaints down
277 //
278  if (nowT - lastHP >= 86400) {XrdFrmFileset::Purge(); lastHP = nowT;}
279 
280 // Indicate scan started
281 //
282  VMSG("Scan", "Name space scan started. . .");
283 
284 // Process each directory
285 //
286  do {fP = new XrdFrmFiles(vP->Name, Opts, vP->Dir);
287  while((sP = fP->Get(ec,1)))
288  {aFiles++;
289  if (sP->Screen()) Add(sP);
290  else {delete sP; bFiles++;}
291  }
292  if (ec) Bad = 1;
293  delete fP;
294  } while((vP = vP->Next));
295 
296 // Indicate scan ended
297 //
298  sprintf(buff, "%d file%s with %d error%s", aFiles, (aFiles != 1 ? "s":""),
299  bFiles, (bFiles != 1 ? "s":""));
300  VMSG("Scan", "Name space scan ended;", buff);
301 
302 // Issue warning if we encountered errors
303 //
304  if (Bad) Say.Emsg("Scan", "Errors encountered while scanning for "
305  "migratable files.");
306 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
#define VMSG(a,...)
Definition: XrdFrcTrace.hh:66
void * XrdMigrateStart(void *parg)
size_t strlcpy(char *dst, const char *src, size_t sz)
#define XRDSYSTHREAD_BIND
const char * myProg
char LFN[3072]
static const int migQ
char User[256]
static const int Migrate
long long addTOD
long long cpyTime
Definition: XrdFrcXAttr.hh:55
static const int CompressD
Definition: XrdFrmFiles.hh:122
static const int NoAutoDel
Definition: XrdFrmFiles.hh:123
XrdFrmFileset * Get(int &rc, int noBase=0)
Definition: XrdFrmFiles.cc:340
static const int Recursive
Definition: XrdFrmFiles.hh:121
int Refresh(int isMig=0, int doLock=1)
Definition: XrdFrmFiles.cc:118
const char * failPath()
Definition: XrdFrmFiles.hh:63
XrdOucXAttr< XrdFrcXAttrCpy > cpyInfo
Definition: XrdFrmFiles.hh:55
int Screen(int needLF=1)
Definition: XrdFrmFiles.cc:170
XrdFrmFileset * Next
Definition: XrdFrmFiles.hh:90
XrdOucNSWalk::NSEnt * baseFile()
Definition: XrdFrmFiles.hh:60
const char * basePath()
Definition: XrdFrmFiles.hh:61
static void Purge()
Definition: XrdFrmFiles.hh:77
XrdOucNSWalk::NSEnt * failFile()
Definition: XrdFrmFiles.hh:62
static void Queue(XrdFrmFileset *sP)
static void Migrate(int doinit=1)
static void Display()
static const char * checkFF(const char *Path)
static int Add(XrdFrcRequest *rP, XrdFrcReqFile *reqF, int theQ)
XrdOucTList * next
Definition: XrdOucTList.hh:45
char * text
Definition: XrdOucTList.hh:46
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Snooze(int seconds)
Definition: XrdSysTimer.cc:168
XrdCmsConfig Config
XrdSysError Say
int Opts
Definition: XrdMpxStats.cc:58