XRootD
XrdFfsWcache.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* XrdFfsWcache.cc simple write cache that captures consecutive small writes */
3 /* */
4 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
5 /* All Rights Reserved */
6 /* Author: Wei Yang (SLAC National Accelerator Laboratory, 2009) */
7 /* Contract DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 /*
31  When direct_io is not used, the kernel breaks large writes into 4 KByte
32  writes. This significantly reduces the writing performance. This
33  simple cache mechanism improves the performance of small writes.
34 
35  Note that fuse 2.8.0 pre2 or above and kernel 2.6.27 or above provide
36  a big_writes option to allow > 4 KByte writes. It makes this
37  simple write cache obsolete.
38 */
39 
40 #if defined(__linux__)
41 /* For pread()/pwrite() */
42 #ifndef _XOPEN_SOURCE
43 #define _XOPEN_SOURCE 500
44 #endif
45 #endif
46 
47 #include <cstring>
48 #include <cstdlib>
49 #include <sys/types.h>
50 #include <sys/resource.h>
51 #include <unistd.h>
52 #include <cerrno>
53 
54 #include <pthread.h>
55 
56 #include "XrdFfs/XrdFfsWcache.hh"
57 #ifndef NOXRD
58  #include "XrdFfs/XrdFfsPosix.hh"
59 #endif
60 
61 #ifndef O_DIRECT
62 #define O_DIRECT 0
63 #endif
64 
65 #ifdef __cplusplus
66  extern "C" {
67 #endif
68 
/* Size in bytes of the buffer given to descriptors that use the write cache. */
ssize_t XrdFfsWcacheBufsize = 131072;

/* Size in bytes of the buffer given to descriptors that use the read cache;
 * assigned by XrdFfsWcache_init(). */
ssize_t XrdFfsRcacheBufsize;

/* Cache state kept for one file descriptor. */
struct XrdFfsWcacheFilebuf {
    off_t offset;            /* file offset of the first byte held in buf */
    size_t len;              /* number of valid bytes currently held in buf */
    char *buf;               /* heap buffer; NULL while the slot has no cache */
    size_t bufsize;          /* block size used by the read path for this slot */
    pthread_mutex_t *mlock;  /* locked around buffer access; NULL while unused */
};

/* Table of cache slots, indexed by (fd - XrdFfsPosix_baseFD). */
struct XrdFfsWcacheFilebuf *XrdFfsWcacheFbufs;

/* Lowest descriptor value handled, and the number of slots in the table. */
int XrdFfsPosix_baseFD, XrdFfsWcacheNFILES;
79 
81 
82 /* #include "xrdposix.h" */
83 
85 void XrdFfsWcache_init(int basefd, int maxfd)
86 {
87  int fd;
88 /* We are now using virtual file descriptors (from Xrootd Posix interface) in XrdFfsXrootdfs.cc so we need to set
89  * base (lowest) file descriptor, and max number of file descriptors..
90  *
91  struct rlimit rlp;
92 
93  getrlimit(RLIMIT_NOFILE, &rlp);
94  XrdFfsWcacheNFILES = rlp.rlim_cur;
95  XrdFfsWcacheNFILES = (XrdFfsWcacheNFILES == (int)RLIM_INFINITY? 4096 : XrdFfsWcacheNFILES);
96  */
97 
98  XrdFfsPosix_baseFD = basefd;
99  XrdFfsWcacheNFILES = maxfd;
100 
101 /* printf("%d %d\n", XrdFfsWcacheNFILES, sizeof(struct XrdFfsWcacheFilebuf)); */
103  for (fd = 0; fd < XrdFfsWcacheNFILES; fd++)
104  {
105  XrdFfsWcacheFbufs[fd].offset = 0;
106  XrdFfsWcacheFbufs[fd].len = 0;
107  XrdFfsWcacheFbufs[fd].buf = NULL;
108  XrdFfsWcacheFbufs[fd].mlock = NULL;
109  }
110  if (!getenv("XRDCL_EC"))
111  {
112  XrdFfsRcacheBufsize = 1024 * 128;
113  }
114  else
115  {
116  char *savptr;
117  int nbdat = atoi(strtok_r(getenv("XRDCL_EC"), ",", &savptr));
118  strtok_r(NULL, ",", &savptr);
119  int chsz = atoi(strtok_r(NULL, ",", &savptr));
120  XrdFfsRcacheBufsize = nbdat * chsz;
121  }
122  if (getenv("XROOTDFS_WCACHESZ"))
123  XrdFfsRcacheBufsize = atoi(getenv("XROOTDFS_WCACHESZ"));
124 }
125 
126 int XrdFfsWcache_create(int fd, int flags)
127 /* Create a write cache buffer for a given file descriptor
128  *
129  * fd: file descriptor
130  *
131  * returns: 1 - ok
132  * 0 - error, error code in errno
133  */
134 {
136  fd -= XrdFfsPosix_baseFD;
137 
138  XrdFfsWcacheFbufs[fd].offset = 0;
139  XrdFfsWcacheFbufs[fd].len = 0;
140  // "flag & O_RDONLY" is not equivalant to ! (flags & O_RDWR) && ! (flags & O_WRONLY)
141  if ( ! (flags & O_RDWR) &&
142  ! (flags & O_WRONLY) &&
143  (flags & O_DIRECT) ) // Limit the usage scenario of the read cache
144  {
145  XrdFfsWcacheFbufs[fd].buf = (char*)malloc(XrdFfsRcacheBufsize);
147  }
148  else
149  {
150  XrdFfsWcacheFbufs[fd].buf = (char*)malloc(XrdFfsWcacheBufsize);
152  }
153  if (XrdFfsWcacheFbufs[fd].buf == NULL)
154  {
155  errno = ENOMEM;
156  return 0;
157  }
158  XrdFfsWcacheFbufs[fd].mlock = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
159  if (XrdFfsWcacheFbufs[fd].mlock == NULL)
160  {
161  errno = ENOMEM;
162  return 0;
163  }
164  errno = pthread_mutex_init(XrdFfsWcacheFbufs[fd].mlock, NULL);
165  if (errno)
166  return 0;
167  return 1;
168 }
169 
171 {
172 /* XrdFfsWcache_flush(fd); */
173  fd -= XrdFfsPosix_baseFD;
174 
175  XrdFfsWcacheFbufs[fd].offset = 0;
176  XrdFfsWcacheFbufs[fd].len = 0;
177  if (XrdFfsWcacheFbufs[fd].buf != NULL)
178  free(XrdFfsWcacheFbufs[fd].buf);
179  XrdFfsWcacheFbufs[fd].buf = NULL;
180  if (XrdFfsWcacheFbufs[fd].mlock != NULL)
181  {
182  pthread_mutex_destroy(XrdFfsWcacheFbufs[fd].mlock);
183  free(XrdFfsWcacheFbufs[fd].mlock);
184  }
185  XrdFfsWcacheFbufs[fd].mlock = NULL;
186 }
187 
188 ssize_t XrdFfsWcache_flush(int fd)
189 {
190  ssize_t rc;
191  fd -= XrdFfsPosix_baseFD;
192 
193  if (XrdFfsWcacheFbufs[fd].len == 0 || XrdFfsWcacheFbufs[fd].buf == NULL )
194  return 0;
195 
198  if (rc > 0)
199  {
200  XrdFfsWcacheFbufs[fd].offset = 0;
201  XrdFfsWcacheFbufs[fd].len = 0;
202  }
203  return rc;
204 }
205 
206 /*
207 struct fd_n_offset {
208  int fd;
209  off_t offset;
210  fd_n_offset(int myfd, off_t myoffset) : fd(myfd), offset(myoffset) {}
211 };
212 
213 void *XrdFfsWcache_updateReadCache(void *x)
214 {
215  struct fd_n_offset *a = (struct fd_n_offset*) x;
216  size_t bufsize = XrdFfsWcacheFbufs[a->fd].bufsize;
217 
218  pthread_mutex_lock(XrdFfsWcacheFbufs[a->fd].mlock);
219  XrdFfsWcacheFbufs[a->fd].offset = (a->offset / bufsize) * bufsize;
220  XrdFfsWcacheFbufs[a->fd].len = XrdFfsPosix_pread(a->fd + XrdFfsPosix_baseFD,
221  XrdFfsWcacheFbufs[a->fd].buf,
222  bufsize,
223  XrdFfsWcacheFbufs[a->fd].offset);
224  pthread_mutex_unlock(XrdFfsWcacheFbufs[a->fd].mlock);
225  return NULL;
226 }
227 */
228 
229 // this is a read cache
230 ssize_t XrdFfsWcache_pread(int fd, char *buf, size_t len, off_t offset)
231 {
232  ssize_t rc;
233  fd -= XrdFfsPosix_baseFD;
234  if (fd < 0)
235  {
236  errno = EBADF;
237  return -1;
238  }
239 
240  char *bufptr;
241  size_t bufsize = XrdFfsWcacheFbufs[fd].bufsize;
242 
243  pthread_mutex_lock(XrdFfsWcacheFbufs[fd].mlock);
244 
245  // identity which block to cache
246  if (XrdFfsWcacheFbufs[fd].len == 0 ||
248  {
252  bufsize,
254  } // when XrdFfsWcacheFbufs[fd].len < bufsize, the block is partially cached.
255 
256 
257  // fetch data from the cache, up to the block's upper boundary.
258  if (XrdFfsWcacheFbufs[fd].offset <= offset &&
260  { // read from cache,
261 //----------------------------------------------------------
262 // FUSE doesn't like this block of the code, unless direct_io is enabled, or
263 // O_DIRECT flags is used. Otherwise, FUSES will stop reading prematurely
264 // when two processes read the same file at the same time.
265  bufptr = &XrdFfsWcacheFbufs[fd].buf[offset - XrdFfsWcacheFbufs[fd].offset];
266  rc = (len < XrdFfsWcacheFbufs[fd].len - (offset - XrdFfsWcacheFbufs[fd].offset))?
268  memcpy(buf, bufptr, rc);
269 //----------------------------------------------------------
270  }
271  else
272  { // offset fall into the uncached part of the partically cached block
274  }
275  pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
276 /*
277  // prefetch the next block
278  if ( (offset + rc) ==
279  (XrdFfsWcacheFbufs[fd].offset + bufsize) )
280  {
281  pthread_t thread;
282  pthread_attr_t attr;
283  //size_t stacksize = 4*1024*1024;
284 
285  pthread_attr_init(&attr);
286  //pthread_attr_setstacksize(&attr, stacksize);
287  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
288 
289  struct fd_n_offset nextblock(fd, (offset + bufsize));
290  if (! pthread_create(&thread, &attr, XrdFfsWcache_updateReadCache, &nextblock))
291  pthread_detach(thread);
292  pthread_attr_destroy(&attr);
293  }
294 */
295  return rc;
296 }
297 
298 ssize_t XrdFfsWcache_pwrite(int fd, char *buf, size_t len, off_t offset)
299 {
300  ssize_t rc;
301  char *bufptr;
302  fd -= XrdFfsPosix_baseFD;
303  if (fd < 0)
304  {
305  errno = EBADF;
306  return -1;
307  }
308 
309 /* do not use caching under these cases */
310  if (len > (size_t)(XrdFfsWcacheBufsize/2) || fd >= XrdFfsWcacheNFILES)
311  {
313  return rc;
314  }
315 
316  pthread_mutex_lock(XrdFfsWcacheFbufs[fd].mlock);
317  rc = XrdFfsWcacheFbufs[fd].len;
318 /*
319  in the following two cases, a XrdFfsWcache_flush is required:
320  1. current offset isnn't pointing to the tail of data in buffer
321  2. adding new data will exceed the current buffer
322 */
323  if (offset != (off_t)(XrdFfsWcacheFbufs[fd].offset + XrdFfsWcacheFbufs[fd].len) ||
326 
327  errno = 0;
328  if (rc < 0)
329  {
330  errno = ENOSPC;
331  pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
332  return -1;
333  }
334 
335  bufptr = &XrdFfsWcacheFbufs[fd].buf[XrdFfsWcacheFbufs[fd].len];
336  memcpy(bufptr, buf, len);
337  if (XrdFfsWcacheFbufs[fd].len == 0)
339  XrdFfsWcacheFbufs[fd].len += len;
340 
341  pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
342  return (ssize_t)len;
343 }
344 
345 #ifdef __cplusplus
346  }
347 #endif
ssize_t XrdFfsPosix_pwrite(int fildes, const void *buf, size_t nbyte, off_t offset)
Definition: XrdFfsPosix.cc:152
ssize_t XrdFfsPosix_pread(int fildes, void *buf, size_t nbyte, off_t offset)
Definition: XrdFfsPosix.cc:142
void XrdFfsWcache_init(int basefd, int maxfd)
Definition: XrdFfsWcache.cc:85
void XrdFfsWcache_destroy(int fd)
int XrdFfsWcacheNFILES
Definition: XrdFfsWcache.cc:84
ssize_t XrdFfsWcache_pwrite(int fd, char *buf, size_t len, off_t offset)
ssize_t XrdFfsWcacheBufsize
Definition: XrdFfsWcache.cc:70
#define O_DIRECT
Definition: XrdFfsWcache.cc:62
pthread_mutex_t * mlock
Definition: XrdFfsWcache.cc:77
ssize_t XrdFfsWcache_pread(int fd, char *buf, size_t len, off_t offset)
ssize_t XrdFfsRcacheBufsize
Definition: XrdFfsWcache.cc:69
int XrdFfsPosix_baseFD
Definition: XrdFfsWcache.cc:84
ssize_t XrdFfsWcache_flush(int fd)
struct XrdFfsWcacheFilebuf * XrdFfsWcacheFbufs
Definition: XrdFfsWcache.cc:80
int XrdFfsWcache_create(int fd, int flags)