XRootD
XrdOfsPoscq.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d O f s P o s c q . c c */
4 /* */
5 /* (c) 2009 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstring>
32 #include <strings.h>
33 #include <stddef.h>
34 #include <cstdio>
35 #include <fcntl.h>
36 #include <unistd.h>
37 #include <cerrno>
38 #include <sys/param.h>
39 #include <sys/types.h>
40 #include <sys/stat.h>
41 
42 #include "XrdOfs/XrdOfsPoscq.hh"
43 #include "XrdOss/XrdOss.hh"
44 #include "XrdSfs/XrdSfsFlags.hh"
45 #include "XrdSys/XrdSysError.hh"
46 #include "XrdSys/XrdSysFD.hh"
47 #include "XrdSys/XrdSysPlatform.hh"
48 
49 /******************************************************************************/
50 /* C o n s t r u c t o r */
51 /******************************************************************************/
52 
53 XrdOfsPoscq::XrdOfsPoscq(XrdSysError *erp, XrdOss *oss, const char *fn, int sv)
54 {
55  eDest = erp;
56  ossFS = oss;
57  pocFN = strdup(fn);
58  pocFD = -1;
59  pocSZ = 0;
60  pocIQ = 0;
61  SlotList = SlotLust = 0;
62 
63  if (sv > 32767) sv = 32767;
64  else if (sv < 0) sv = 0;
65  pocWS = pocSV = sv-1;
66 }
67 
68 /******************************************************************************/
69 /* A d d */
70 /******************************************************************************/
71 
72 int XrdOfsPoscq::Add(const char *Tident, const char *Lfn, bool isNew)
73 {
74  XrdSysMutexHelper myHelp(myMutex);
75  std::map<std::string,int>::iterator it = pqMap.end();
76  XrdOfsPoscq::Request tmpReq;
77  struct stat Stat;
78  FileSlot *freeSlot;
79  int fP;
80 
81 // Add is only called when file is to be created. Therefore, it must not exist
82 // unless it is being replaced typically due to a retry. If not being replaced
83 // then We need to check this to avoid deleting already created files.
84 // Otherwise, we need to see if the file is already in the queue to avoid it
85 // being deleted after the fact because it would be in the queue twice.
86 //
87  if (!ossFS->Stat(Lfn, &Stat))
88  {if (isNew) return -EEXIST;
89  it = pqMap.find(std::string(Lfn));
90  if (it != pqMap.end() && VerOffset(Lfn, it->second)) return it->second;
91  }
92 
93 // Construct the request
94 //
95  tmpReq.addT = 0;
96  strlcpy(tmpReq.LFN, Lfn, sizeof(tmpReq.LFN));
97  strlcpy(tmpReq.User, Tident, sizeof(tmpReq.User));
98  memset(tmpReq.Reserved, 0, sizeof(tmpReq.Reserved));
99 
100 // Obtain a free slot
101 //
102  if ((freeSlot = SlotList))
103  {fP = freeSlot->Offset;
104  SlotList = freeSlot->Next;
105  freeSlot->Next = SlotLust;
106  SlotLust = freeSlot;
107  } else {fP = pocSZ; pocSZ += ReqSize;}
108  pocIQ++;
109 
110 // Write out the record
111 //
112  if (!reqWrite((void *)&tmpReq, sizeof(tmpReq), fP))
113  {eDest->Emsg("Add", Lfn, "not added to the persist queue.");
114  myMutex.Lock(); pocIQ--; myMutex.UnLock();
115  return -EIO;
116  }
117 
118 // Check if we update the map or simply add it to the map
119 //
120  if (it != pqMap.end()) it->second = fP;
121  else pqMap[std::string(Lfn)] = fP;
122 
123 // Return the record offset
124 //
125  return fP;
126 }
127 
128 /******************************************************************************/
129 /* C o m m i t */
130 /******************************************************************************/
131 
132 int XrdOfsPoscq::Commit(const char *Lfn, int Offset)
133 {
134  long long addT = static_cast<long long>(time(0));
135 
136 // Verify the offset it must be correct
137 //
138  if (!VerOffset(Lfn, Offset)) return -EINVAL;
139 
140 // Indicate the record is free
141 //
142  if (!reqWrite((void *)&addT, sizeof(addT), Offset))
143  {eDest->Emsg("Commit", Lfn, "not committed to the persist queue.");
144  return -EIO;
145  }
146 
147 // Remove entry from the map and return
148 //
149  myMutex.Lock();
150  pqMap.erase(std::string(Lfn));
151  myMutex.UnLock();
152  return 0;
153 }
154 
155 /******************************************************************************/
156 /* D e l */
157 /******************************************************************************/
158 
159 int XrdOfsPoscq::Del(const char *Lfn, int Offset, int Unlink)
160 {
161  static int Zero = 0;
162  FileSlot *freeSlot;
163  int retc;
164 
165 // Verify the offset it must be correct
166 //
167  if (!VerOffset(Lfn, Offset)) return -EINVAL;
168 
169 // Unlink the file if need be
170 //
171  if (Unlink && (retc = ossFS->Unlink(Lfn)) && retc != -ENOENT)
172  {eDest->Emsg("Del", retc, "remove", Lfn);
173  return (retc < 0 ? retc : -retc);
174  }
175 
176 // Indicate the record is free
177 //
178  if (!reqWrite((void *)&Zero, sizeof(Zero), Offset+offsetof(Request,LFN)))
179  {eDest->Emsg("Del", Lfn, "not removed from the persist queue.");
180  return -EIO;
181  }
182 
183 // Serialize and place this on the free queue
184 //
185  myMutex.Lock();
186  if ((freeSlot = SlotLust)) SlotLust = freeSlot->Next;
187  else freeSlot = new FileSlot;
188  freeSlot->Offset = Offset;
189  freeSlot->Next = SlotList;
190  SlotList = freeSlot;
191  if (pocIQ > 0) pocIQ--;
192 
193 // Remove item from the map
194 //
195  pqMap.erase(std::string(Lfn));
196  myMutex.UnLock();
197 
198 // All done
199 //
200  return 0;
201 }
202 
203 /******************************************************************************/
204 /* I n i t */
205 /******************************************************************************/
206 
208 {
209  static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
210  Request tmpReq;
211  struct stat buf, Stat;
212  recEnt *First = 0;
213  char Buff[80];
214  int rc, Offs, numreq = 0;
215 
216 // Assume we will fail
217 //
218  Ok = 0;
219 
220 // Open the file first in r/w mode
221 //
222  if ((pocFD = XrdSysFD_Open(pocFN, O_RDWR|O_CREAT, Mode)) < 0)
223  {eDest->Emsg("Init",errno,"open",pocFN);
224  return 0;
225  }
226 
227 // Get file status
228 //
229  if (fstat(pocFD, &buf)) {FailIni("stat"); return 0;}
230 
231 // Check for a new file here
232 //
233  if (buf.st_size < ReqSize)
234  {pocSZ = ReqOffs;
235  if (ftruncate(pocFD, ReqOffs)) FailIni("trunc");
236  else Ok = 1;
237  return 0;
238  }
239 
240 // Read the full file
241 //
242  for (Offs = ReqOffs; Offs < buf.st_size; Offs += ReqSize)
243  {do {rc = pread(pocFD, (void *)&tmpReq, ReqSize, Offs);}
244  while(rc < 0 && errno == EINTR);
245  if (rc < 0) {eDest->Emsg("Init",errno,"read",pocFN); return First;}
246  if (*tmpReq.LFN == '\0'
247  || ossFS->Stat(tmpReq.LFN, &Stat)
248  || !(S_ISREG(Stat.st_mode) || !(Stat.st_mode & XRDSFS_POSCPEND))) continue;
249  First = new recEnt(tmpReq, Stat.st_mode & S_IAMB, First); numreq++;
250  }
251 
252 // Now write out the file and return
253 //
254  sprintf(Buff, " %d pending create%s", numreq, (numreq != 1 ? "s" : ""));
255  eDest->Say("Init", Buff, " recovered from ", pocFN);
256  if (ReWrite(First)) Ok = 1;
257  return First;
258 }
259 
260 /******************************************************************************/
261 /* L i s t */
262 /******************************************************************************/
263 
265 {
266  XrdOfsPoscq::Request tmpReq;
267  struct stat buf;
268  recEnt *First = 0;
269  int rc, theFD, Offs;
270 
271 // Open the file first in r/o mode
272 //
273  if ((theFD = XrdSysFD_Open(theFN, O_RDONLY)) < 0)
274  {Say->Emsg("Init",errno,"open",theFN);
275  return 0;
276  }
277 
278 // Get file status
279 //
280  if (fstat(theFD, &buf))
281  {Say->Emsg("Init",errno,"stat",theFN);
282  close(theFD);
283  return 0;
284  }
285  if (buf.st_size < ReqSize) buf.st_size = 0;
286 
287 // Read the full file
288 //
289  for (Offs = ReqOffs; Offs < buf.st_size; Offs += ReqSize)
290  {do {rc = pread(theFD, (void *)&tmpReq, ReqSize, Offs);}
291  while(rc < 0 && errno == EINTR);
292  if (rc < 0) {Say->Emsg("List",errno,"read",theFN);
293  close(theFD); return First;
294  }
295  if (*tmpReq.LFN != '\0') First = new recEnt(tmpReq, 0, First);
296  }
297 
298 // All done
299 //
300  close(theFD);
301  return First;
302 }
303 
304 /******************************************************************************/
305 /* F a i l I n i */
306 /******************************************************************************/
307 
308 void XrdOfsPoscq::FailIni(const char *txt)
309 {
310  eDest->Emsg("Init", errno, txt, pocFN);
311 }
312 
313 /******************************************************************************/
314 /* r e q W r i t e */
315 /******************************************************************************/
316 
317 bool XrdOfsPoscq::reqWrite(void *Buff, int Bsz, int Offs)
318 {
319  int rc = 0;
320 
321  do {rc = pwrite(pocFD, Buff, Bsz, Offs);} while(rc < 0 && errno == EINTR);
322 
323  if (rc >= 0 && Bsz > 8)
324  {if (!pocWS) {pocWS = pocSV; rc = fsync(pocFD);}
325  else pocWS--;
326  }
327 
328  if (rc < 0) {eDest->Emsg("reqWrite",errno,"write", pocFN); return false;}
329  return true;
330 }
331 
332 /******************************************************************************/
333 /* R e W r i t e */
334 /******************************************************************************/
335 
336 bool XrdOfsPoscq::ReWrite(XrdOfsPoscq::recEnt *rP)
337 {
338  static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
339  char newFN[MAXPATHLEN], *oldFN;
340  int newFD, oldFD, Offs = ReqOffs;
341  bool aOK = true;
342 
343 // Construct new file and open it
344 //
345  strcpy(newFN, pocFN); strcat(newFN, ".new");
346  if ((newFD = XrdSysFD_Open(newFN, O_RDWR|O_CREAT|O_TRUNC, Mode)) < 0)
347  {eDest->Emsg("ReWrite",errno,"open",newFN); return false;}
348 
349 // Setup to write/swap the file
350 //
351  oldFD = pocFD; pocFD = newFD;
352  oldFN = pocFN; pocFN = newFN;
353 
354 // Rewrite all records if we have any
355 //
356  while(rP)
357  {rP->Offset = Offs;
358  if (!reqWrite((void *)&rP->reqData, ReqSize, Offs))
359  {aOK = false; break;}
360  pqMap[std::string(rP->reqData.LFN)] = Offs;
361  Offs += ReqSize;
362  rP = rP->Next;
363  }
364 
365 // If all went well, rename the file
366 //
367  if (aOK && rename(newFN, oldFN) < 0)
368  {eDest->Emsg("ReWrite",errno,"rename",newFN); aOK = false;}
369 
370 // Perform post processing
371 //
372  if (aOK) close(oldFD);
373  else {close(newFD); pocFD = oldFD;}
374  pocFN = oldFN;
375  pocSZ = Offs;
376  return aOK;
377 }
378 
379 /******************************************************************************/
380 /* V e r O f f s e t */
381 /******************************************************************************/
382 
383 bool XrdOfsPoscq::VerOffset(const char *Lfn, int Offset)
384 {
385 
386 // Verify the offset
387 //
388  if (Offset < ReqOffs || (Offset-ReqOffs)%ReqSize)
389  {char buff[128];
390  sprintf(buff, "Invalid slot %d for", Offset);
391  eDest->Emsg("VerOffset", buff, Lfn);
392  return false;
393  }
394  return true;
395 }
struct stat Stat
Definition: XrdCks.cc:49
#define S_IAMB
Definition: XrdConfig.cc:159
static XrdSysError eDest(0,"crypto_")
ssize_t pwrite(int fildes, const void *buf, size_t nbyte, off_t offset)
int stat(const char *path, struct stat *buf)
int ftruncate(int fildes, off_t offset)
ssize_t pread(int fildes, void *buf, size_t nbyte, off_t offset)
int fstat(int fildes, struct stat *buf)
int rename(const char *oldpath, const char *newpath)
int fsync(int fildes)
#define close(a)
Definition: XrdPosix.hh:43
int Mode
#define XRDSFS_POSCPEND
Definition: XrdSfsFlags.hh:89
size_t strlcpy(char *dst, const char *src, size_t sz)
static const int ReqOffs
Definition: XrdOfsPoscq.hh:53
int Del(const char *Lfn, int Offset, int Unlink=0)
Definition: XrdOfsPoscq.cc:159
static recEnt * List(XrdSysError *Say, const char *theFN)
Definition: XrdOfsPoscq.cc:264
XrdOfsPoscq(XrdSysError *erp, XrdOss *oss, const char *fn, int sv=1)
Definition: XrdOfsPoscq.cc:53
recEnt * Init(int &Ok)
Definition: XrdOfsPoscq.cc:207
static const int ReqSize
Definition: XrdOfsPoscq.hh:54
int Commit(const char *Lfn, int Offset)
Definition: XrdOfsPoscq.cc:132
int Add(const char *Tident, const char *Lfn, bool isNew)
Definition: XrdOfsPoscq.cc:72
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
XRootDStatus Unlink(Davix::DavPosix &davix_client, const std::string &url, uint16_t timeout)
XrdSysError Say
struct Request reqData
Definition: XrdOfsPoscq.hh:61