XRootD
XrdOssAio.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d O s s A i o . c c */
4 /* */
5 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <signal.h>
32 #include <cstdio>
33 #include <unistd.h>
34 #include <fcntl.h>
35 #ifdef _POSIX_ASYNCHRONOUS_IO
36 #ifdef __FreeBSD__
37 #include <fcntl.h>
38 #endif
39 #ifdef __APPLE__
40 #include <sys/aio.h>
41 #else
42 #include <aio.h>
43 #endif
44 #endif
45 
46 #include "XrdOss/XrdOssApi.hh"
47 #include "XrdOss/XrdOssTrace.hh"
48 #include "XrdSys/XrdSysError.hh"
49 #include "XrdSys/XrdSysPlatform.hh"
50 #include "XrdSys/XrdSysPthread.hh"
51 #include "XrdSfs/XrdSfsAio.hh"
52 
53 // All AIO interfaces are defined here.
54 
55 
56 // Currently we disable aio support for MacOS because it is way too
57 // buggy and incomplete. The two major problems are:
58 // 1) No implementation of sigwaitinfo(). Though we can simulate it...
59 // 2) Event notification returns an incomplete siginfo structure.
60 //
61 #ifdef __APPLE__
62 #undef _POSIX_ASYNCHRONOUS_IO
63 #endif
64 
65 #ifdef __GNU__
66 // Compiler warning:
67 // warning: sigwaitinfo is not implemented and will always fail
68 #undef HAVE_SIGWTI
69 #endif
70 
71 /******************************************************************************/
72 /* G l o b a l s */
73 /******************************************************************************/
74 
// Trace and error-routing objects defined elsewhere; used by the TRACE/DEBUG
// macros and by the Emsg() calls in this file.
75 extern XrdSysTrace OssTrace;
76 //define tident aiop->TIdent
77 
78 extern XrdSysError OssEroute;
79 
// Count of asynchronous requests that could not be started and were executed
// synchronously instead. It is incremented without locking by the async
// fsync/read/write paths and is used only to throttle the error message.
80 int XrdOssFile::AioFailure = 0;
81 
// Signals used for AIO completion notification: one for reads and one for
// writes (fsync completions are posted on the write signal). When real-time
// signals exist the two highest ones are used; otherwise SIGUSR1/SIGUSR2.
// NOTE(review): AioInit passes &OSS_AIO_READ_DONE / &OSS_AIO_WRITE_DONE as the
// thread argument. In the #else branch these names are macros, not objects, so
// taking their address looks questionable -- confirm this branch still builds.
82 #ifdef _POSIX_ASYNCHRONOUS_IO
83 #ifdef SIGRTMAX
84 const int OSS_AIO_READ_DONE = SIGRTMAX-1;
85 const int OSS_AIO_WRITE_DONE = SIGRTMAX;
86 #else
87 #define OSS_AIO_READ_DONE SIGUSR1
88 #define OSS_AIO_WRITE_DONE SIGUSR2
89 #endif
90 #endif
91 
92 /******************************************************************************/
93 /* F s y n c */
94 /******************************************************************************/
95 
96 /*
97  Function: Async fsync() a file
98 
99  Input: aiop - An aio request object
100 */
101 
// Body of the asynchronous fsync entry point described by the banner above.
//
// NOTE(review): the signature line (listing line 102) is not present in this
// rendering. The body uses `aiop` (the aio request) plus the `fd` and `tident`
// members -- confirm the exact signature against the original file.
//
// Behavior visible below:
//   - With POSIX AIO: queue aio_fsync(); return 0 if it was queued, return
//     -errno for any failure other than EAGAIN/ENOSYS.
//   - On EAGAIN/ENOSYS, or without POSIX AIO: run the synchronous Fsync(),
//     store the outcome in aiop->Result, invoke aiop->doneWrite(), return 0.
103 {
104 
105 #ifdef _POSIX_ASYNCHRONOUS_IO
106  int rc;
107 
108 // Complete the aio request block and do the operation
109 //
// NOTE(review): listing line 110 is missing from this rendering; presumably it
// is the condition that guards the brace block below -- confirm.
111  {aiop->sfsAio.aio_fildes = fd;
// Fsync completions are reported on the write-completion signal.
112  aiop->sfsAio.aio_sigevent.sigev_signo = OSS_AIO_WRITE_DONE;
113  aiop->TIdent = tident;
114 
115  // Start the operation
116  //
117  if (!(rc = aio_fsync(O_SYNC, &aiop->sfsAio))) return 0;
// Only EAGAIN and ENOSYS fall through to the synchronous path; every other
// error is returned to the caller as a negative errno.
118  if (errno != EAGAIN && errno != ENOSYS) return -errno;
119 
120  // Aio failed keep track of the problem (msg every 1024 events). Note
121  // that the handling of the counter is sloppy because we do not lock it.
122  //
// The message is issued when the low 10 bits of the pre-increment count are 1.
123  {int fcnt = AioFailure++;
124  if ((fcnt & 0x3ff) == 1) OssEroute.Emsg("aio", errno, "fsync async");
125  }
126  }
127 #endif
128 
129 // Execute this request in a synchronous fashion
130 //
// Any non-zero return from Fsync() is replaced by -errno in aiop->Result.
131  if ((aiop->Result = Fsync())) aiop->Result = -errno;
132 
133 // Simply call the write completion routine and return as if all went well
134 //
135  aiop->doneWrite();
136  return 0;
137 }
138 
139 /******************************************************************************/
140 /* R e a d */
141 /******************************************************************************/
142 
143 /*
144  Function: Async read from the associated file using the buffer, offset, and length held in the aio request
145 
146  Input: aiop - An aio request object
147 
148  Output: <0 -> Operation failed, value is negative errno value.
149  =0 -> Operation queued
150  >0 -> Operation not queued, system resources unavailable or
151  asynchronous I/O is not supported.
152 */
153 
// Body of the asynchronous read entry point described by the banner above.
//
// NOTE(review): the signature line (listing line 154) is not present in this
// rendering. The body uses `aiop` (the aio request) plus the `fd` and `tident`
// members -- confirm the exact signature against the original file.
//
// Behavior visible below:
//   - With POSIX AIO: queue aio_read(); return 0 if it was queued, return
//     -errno for any failure other than EAGAIN/ENOSYS.
//   - On EAGAIN/ENOSYS, or without POSIX AIO: perform the read synchronously
//     using the buffer/offset/length stored in aiop->sfsAio, place the result
//     in aiop->Result, invoke aiop->doneRead(), return 0.
// The visible code never returns a positive value.
155 {
156 
157 #ifdef _POSIX_ASYNCHRONOUS_IO
158  EPNAME("AioRead");
159  int rc;
160 
161 // Complete the aio request block and do the operation
162 //
// NOTE(review): listing line 163 is missing from this rendering; presumably it
// is the condition that guards the brace block below -- confirm.
164  {aiop->sfsAio.aio_fildes = fd;
165  aiop->sfsAio.aio_sigevent.sigev_signo = OSS_AIO_READ_DONE;
166  aiop->TIdent = tident;
167  TRACE(Debug, "fd=" <<fd <<" read " <<aiop->sfsAio.aio_nbytes <<'@'
168  <<aiop->sfsAio.aio_offset <<" started; aiocb="
169  <<Xrd::hex1 <<aiop);
170 
171  // Start the operation
172  //
173  if (!(rc = aio_read(&aiop->sfsAio))) return 0;
// Only EAGAIN and ENOSYS fall through to the synchronous path; every other
// error is returned to the caller as a negative errno.
174  if (errno != EAGAIN && errno != ENOSYS) return -errno;
175 
176  // Aio failed keep track of the problem (msg every 1024 events). Note
177  // that the handling of the counter is sloppy because we do not lock it.
178  //
// The message is issued when the low 10 bits of the pre-increment count are 1.
179  {int fcnt = AioFailure++;
180  if ((fcnt & 0x3ff) == 1) OssEroute.Emsg("aio", errno, "read async");
181  }
182  }
183 #endif
184 
185 // Execute this request in a synchronous fashion
186 //
187  aiop->Result = this->Read((void *)aiop->sfsAio.aio_buf,
188  (off_t)aiop->sfsAio.aio_offset,
189  (size_t)aiop->sfsAio.aio_nbytes);
190 
191 // Simply call the read completion routine and return as if all went well
192 //
193  aiop->doneRead();
194  return 0;
195 }
196 
197 /******************************************************************************/
198 /* W r i t e */
199 /******************************************************************************/
200 
201 /*
202  Function: Async write to the associated file using the buffer, offset, and length held in the aio request
203 
204  Input: aiop - An aio request object.
205 
206  Output: <0 -> Operation failed, value is negative errno value.
207  =0 -> Operation queued
208  >0 -> Operation not queued, system resources unavailable or
209  asynchronous I/O is not supported.
210 */
211 
// Body of the asynchronous write entry point described by the banner above.
//
// NOTE(review): the signature line (listing line 212) is not present in this
// rendering. The body uses `aiop` (the aio request) plus the `fd` and `tident`
// members -- confirm the exact signature against the original file.
//
// Behavior visible below:
//   - With POSIX AIO: queue aio_write(); return 0 if it was queued, return
//     -errno for any failure other than EAGAIN/ENOSYS.
//   - On EAGAIN/ENOSYS, or without POSIX AIO: perform the write synchronously
//     using the buffer/offset/length stored in aiop->sfsAio, place the result
//     in aiop->Result, invoke aiop->doneWrite(), return 0.
// The visible code never returns a positive value.
213 {
214 #ifdef _POSIX_ASYNCHRONOUS_IO
215  EPNAME("AioWrite");
216  int rc;
217 
218 // Complete the aio request block and do the operation
219 //
// NOTE(review): listing line 220 is missing from this rendering; presumably it
// is the condition that guards the brace block below -- confirm.
221  {aiop->sfsAio.aio_fildes = fd;
222  aiop->sfsAio.aio_sigevent.sigev_signo = OSS_AIO_WRITE_DONE;
223  aiop->TIdent = tident;
224  TRACE(Debug, "fd=" <<fd <<" write " <<aiop->sfsAio.aio_nbytes <<'@'
225  <<aiop->sfsAio.aio_offset <<" started; aiocb="
226  <<Xrd::hex1 <<aiop);
227 
228  // Start the operation
229  //
230  if (!(rc = aio_write(&aiop->sfsAio))) return 0;
// Only EAGAIN and ENOSYS fall through to the synchronous path; every other
// error is returned to the caller as a negative errno.
231  if (errno != EAGAIN && errno != ENOSYS) return -errno;
232 
233  // Aio failed keep track of the problem (msg every 1024 events). Note
234  // that the handling of the counter is sloppy because we do not lock it.
235  //
// The message is issued when the low 10 bits of the pre-increment count are 1.
// NOTE(review): the message prefix here is "Write" whereas the fsync and read
// paths in this file use "aio" -- confirm whether the difference is intended.
236  {int fcnt = AioFailure++;
237  if ((fcnt & 0x3ff) == 1) OssEroute.Emsg("Write",errno,"write async");
238  }
239  }
240 #endif
241 
242 // Execute this request in a synchronous fashion
243 //
244  aiop->Result = this->Write((const void *)aiop->sfsAio.aio_buf,
245  (off_t)aiop->sfsAio.aio_offset,
246  (size_t)aiop->sfsAio.aio_nbytes);
247 
248 // Simply call the write completion routine and return as if all went well
249 //
250  aiop->doneWrite();
251  return 0;
252 }
253 
254 /******************************************************************************/
255 /* X r d O s s S y s A I O M e t h o d s */
256 /******************************************************************************/
257 /******************************************************************************/
258 /* G l o b a l s */
259 /******************************************************************************/
260 
// Set to 1 by the AIO initialization code once both completion threads have
// been started; it starts out as 0.
261 int XrdOssSys::AioAllOk = 0;
262 
263 #if defined(_POSIX_ASYNCHRONOUS_IO) && !defined(HAVE_SIGWTI)
264 // The following is for sigwaitinfo() emulation
265 //
// Each pointer is aimed by its waiting thread at that thread's own siginfo_t;
// the matching signal handler (read or write) copies the delivered signal
// information through it.
266 siginfo_t *XrdOssAioInfoR;
267 siginfo_t *XrdOssAioInfoW;
// Forward declarations of the read and write signal handlers defined later in
// this file.
268 extern "C" {extern void XrdOssAioRSH(int, siginfo_t *, void *);}
269 extern "C" {extern void XrdOssAioWSH(int, siginfo_t *, void *);}
270 #endif
271 
272 /******************************************************************************/
273 /* A i o I n i t */
274 /******************************************************************************/
275 /*
276  Function: Initialize for AIO processing.
277 
278  Return: True if successful, false otherwise.
279 */
280 
// Body of the AIO initialization routine described by the banner above.
//
// NOTE(review): the signature line (listing line 281) is not present in this
// rendering -- confirm it against the original file.
//
// Behavior visible below:
//   - Without POSIX AIO: returns 1 immediately.
//   - Without sigwaitinfo(): installs the read and write signal handlers and
//     returns 0 if either sigaction() call fails.
//   - Starts one XrdOssAioWait thread for the read signal and, only if that
//     succeeds, one for the write signal. AioAllOk is set to 1 only after
//     both threads have been started.
//   - Returns AioAllOk.
282 {
283 #if defined(_POSIX_ASYNCHRONOUS_IO)
284  EPNAME("AioInit");
285  extern void *XrdOssAioWait(void *carg);
286  pthread_t tid;
287  int retc;
288 
289 #ifndef HAVE_SIGWTI
290 // For those platforms that do not have sigwaitinfo(), we provide the
291 // appropriate emulation using a signal handler. We actually provide for
292 // two handlers since we separate reads from writes. To emulate synchronous
293 // signals, we prohibit one signal handler from interrupting another one.
294 //
295  struct sigaction sa;
296 
// Read handler: the write-completion signal is masked while it runs.
297  sa.sa_sigaction = XrdOssAioRSH;
298  sa.sa_flags = SA_SIGINFO;
299  sigemptyset(&sa.sa_mask);
300  sigaddset(&sa.sa_mask, OSS_AIO_WRITE_DONE);
301  if (sigaction(OSS_AIO_READ_DONE, &sa, NULL) < 0)
302  {OssEroute.Emsg("AioInit", errno, "creating AIO read signal handler; "
303  "AIO support terminated.");
304  return 0;
305  }
306 
// Write handler: the read-completion signal is masked while it runs.
307  sa.sa_sigaction = XrdOssAioWSH;
308  sa.sa_flags = SA_SIGINFO;
309  sigemptyset(&sa.sa_mask);
310  sigaddset(&sa.sa_mask, OSS_AIO_READ_DONE);
311  if (sigaction(OSS_AIO_WRITE_DONE, &sa, NULL) < 0)
312  {OssEroute.Emsg("AioInit", errno, "creating AIO write signal handler; "
313  "AIO support terminated.");
314  return 0;
315  }
316 #endif
317 
318 // The AIO signal handler consists of two threads (one for read and one for
319 // write) that synchronously wait for AIO events. We assume, blithely, that
320 // the first two real-time signals have been blocked for all threads.
321 //
// NOTE(review): when SIGRTMAX is defined, this file uses SIGRTMAX-1 and
// SIGRTMAX, i.e. the two highest real-time signals rather than the "first
// two" -- confirm the wording above.
// Each thread receives the address of the signal number it must wait for.
322  if ((retc = XrdSysThread::Run(&tid, XrdOssAioWait,
323  (void *)(&OSS_AIO_READ_DONE))) < 0)
324  OssEroute.Emsg("AioInit", retc, "creating AIO read signal thread; "
325  "AIO support terminated.");
// Both preprocessor branches open the same `else {` block; only the debug
// text differs (the FreeBSD variant does not print the thread id).
326 #ifdef __FreeBSD__
327  else {DEBUG("started AIO read signal thread.");
328 #else
329  else {DEBUG("started AIO read signal thread; tid=" <<(unsigned int)tid);
330 #endif
331  if ((retc = XrdSysThread::Run(&tid, XrdOssAioWait,
332  (void *)(&OSS_AIO_WRITE_DONE))) < 0)
333  OssEroute.Emsg("AioInit", retc, "creating AIO write signal thread; "
334  "AIO support terminated.");
335 #ifdef __FreeBSD__
336  else {DEBUG("started AIO write signal thread.");
337 #else
338  else {DEBUG("started AIO write signal thread; tid=" <<(unsigned int)tid);
339 #endif
340  AioAllOk = 1;
341  }
342  }
343 
344 // All done
345 //
346  return AioAllOk;
347 #else
348  return 1;
349 #endif
350 }
351 
352 /******************************************************************************/
353 /* A i o W a i t */
354 /******************************************************************************/
355 
// Thread body that waits for AIO completion signals of one kind (read or
// write) and finishes the corresponding requests.
//
// Input:   mySigarg - Pointer to an int holding the signal number to wait for.
//                     The thread services reads when that number equals
//                     OSS_AIO_READ_DONE and writes otherwise.
//
// Output:  Always returns a null pointer. Without POSIX AIO the function does
//          nothing else. With it, the loop ends only when sigwaitinfo() fails
//          with an error other than EINTR.
//
// For each accepted signal the XrdSfsAio pointer is taken from the signal
// value, the final status is collected via aio_error()/aio_return(),
// aiop->Result is set, and doneRead() or doneWrite() is called.
356 void *XrdOssAioWait(void *mySigarg)
357 {
358 #ifdef _POSIX_ASYNCHRONOUS_IO
359  EPNAME("AioWait");
360  int mySignum = *((int *)mySigarg);
361  const char *sigType = (mySignum == OSS_AIO_READ_DONE ? "read" : "write");
362  const int isRead = (mySignum == OSS_AIO_READ_DONE);
363  sigset_t mySigset;
364  siginfo_t myInfo;
365  XrdSfsAio *aiop;
366  int rc, numsig;
367  ssize_t retval;
368 #ifndef HAVE_SIGWTI
369  extern int sigwaitinfo(const sigset_t *set, siginfo_t *info);
370  extern siginfo_t *XrdOssAioInfoR;
371  extern siginfo_t *XrdOssAioInfoW;
372 
373 // We will catch one signal at a time. So, the address of siginfo_t can be
374 // placed in a global area where the signal handler will find it. We have
375 // two places where this can go (one for reads and one for writes).
376 //
377  if (isRead) XrdOssAioInfoR = &myInfo;
378  else XrdOssAioInfoW = &myInfo;
379 
380 // Initialize the signal we will be suspended for
381 //
// The emulated sigwaitinfo() passes this set to sigsuspend(), so it must block
// everything except the one signal this thread services.
382  sigfillset(&mySigset);
383  sigdelset(&mySigset, mySignum);
384 #else
385 
386 // Initialize the signal we will be waiting for
387 //
388  sigemptyset(&mySigset);
389  sigaddset(&mySigset, mySignum);
390 #endif
391 
392 // Simply wait for events and requeue the completed AIO operation
393 //
// The inner loop retries the wait whenever it is interrupted (EINTR).
394  do {do {numsig = sigwaitinfo((const sigset_t *)&mySigset, &myInfo);}
395  while (numsig < 0 && errno == EINTR);
396  if (numsig < 0)
397  {OssEroute.Emsg("AioWait",errno,sigType,"wait for AIO signal");
// NOTE(review): listing line 398 is missing from this rendering; a statement
// may have been dropped between the message above and the break -- confirm.
399  break;
400  }
// Ignore anything that is not our signal or was not generated by AIO.
401  if (numsig != mySignum || myInfo.si_code != SI_ASYNCIO)
402  {char buff[80];
// NOTE(review): unbounded sprintf; two %d values fit in 80 bytes for typical
// int widths, but snprintf would make the bound explicit.
403  sprintf(buff, "%d %d", myInfo.si_code, numsig);
404  OssEroute.Emsg("AioWait", "received unexpected signal", buff);
405  continue;
406  }
407 
// This file undefines _POSIX_ASYNCHRONOUS_IO on __APPLE__ earlier on, so the
// __APPLE__ branch below is compiled out together with the rest of this body.
408 #ifdef __APPLE__
409  aiop = (XrdSfsAio *)myInfo.si_value.sigval_ptr;
410 #else
411  aiop = (XrdSfsAio *)myInfo.si_value.sival_ptr;
412 #endif
413 
// Spin (empty loop body) until the request is no longer in progress, then
// collect its return value.
414  while ((rc = aio_error(&aiop->sfsAio)) == EINPROGRESS) {}
415  retval = (ssize_t)aio_return(&aiop->sfsAio);
416 
417  DEBUG(sigType <<" completed for " <<aiop->TIdent <<"; rc=" <<rc
418  <<" result=" <<retval <<" aiocb=" <<Xrd::hex1 <<aiop);
419 
// A negative aio_return() is reported as the negated aio_error() value.
420  if (retval < 0) aiop->Result = -rc;
421  else aiop->Result = retval;
422 
423  if (isRead) aiop->doneRead();
424  else aiop->doneWrite();
425  } while(1);
426 #endif
427  return (void *)0;
428 }
429 
430 #if defined( _POSIX_ASYNCHRONOUS_IO) && !defined(HAVE_SIGWTI)
431 /******************************************************************************/
432 /* s i g w a i t i n f o */
433 /******************************************************************************/
434 
435 // Some platforms do not have sigwaitinfo() (e.g., MacOS). We provide for
436 // emulation here. It's not as good as the kernel version and the
437 // implementation is very specific to the task at hand.
438 //
// Minimal stand-in for sigwaitinfo() on platforms that lack it.
//
// Input:   set  - Signal mask handed to sigsuspend() for the duration of the
//                 wait.
//          info - Structure whose si_signo field is read once sigsuspend()
//                 returns. In this file the AIO signal handlers store the
//                 delivered signal details into this structure.
//
// Output:  The si_signo value found in *info after the wait.
//
int sigwaitinfo(const sigset_t *set, siginfo_t *info)
{
// Sleep with the supplied mask installed until a handled signal arrives.
//
   (void)sigsuspend(set);

// Report whichever signal number is now recorded in the caller's structure.
//
   const int caughtSignal = info->si_signo;
   return caughtSignal;
}
447 
448 /******************************************************************************/
449 /* X r d O s s A i o R S H */
450 /******************************************************************************/
451 
452 // XrdOssAioRSH handles AIO read signals. This handler was setup at AIO
453 // initialization time but only when this platform does not have sigwaitinfo().
454 //
extern "C"
{
// Signal handler for AIO read completions (used only with the sigwaitinfo()
// emulation). It copies the signal number, errno, code, and pointer value of
// the delivered siginfo into the structure addressed by XrdOssAioInfoR.
//
// Input:   signum   - Unused.
//          info     - Signal information supplied on delivery.
//          ucontext - Unused.
//
void XrdOssAioRSH(int signum, siginfo_t *info, void *ucontext)
{
   extern siginfo_t *XrdOssAioInfoR;

// The read wait thread published the address of its siginfo_t in the global
// read pointer; fill that structure in from the delivered information.
//
   siginfo_t *const target = XrdOssAioInfoR;

   target->si_signo = info->si_signo;
   target->si_errno = info->si_errno;
   target->si_code  = info->si_code;
#ifdef __APPLE__
   target->si_value.sigval_ptr = info->si_addr;
#else
   target->si_value.sival_ptr = info->si_value.sival_ptr;
#endif
}
}
475 
476 /******************************************************************************/
477 /* X r d O s s A i o W S H */
478 /******************************************************************************/
479 
480 // XrdOssAioWSH handles AIO write signals. This handler was set up at AIO
481 // initialization time but only when this platform does not have sigwaitinfo().
482 //
extern "C"
{
// Signal handler for AIO write completions (used only with the sigwaitinfo()
// emulation). It copies the signal number, errno, code, and pointer value of
// the delivered siginfo into the structure addressed by XrdOssAioInfoW.
//
// Input:   signum   - Unused.
//          info     - Signal information supplied on delivery.
//          ucontext - Unused.
//
void XrdOssAioWSH(int signum, siginfo_t *info, void *ucontext)
{
   extern siginfo_t *XrdOssAioInfoW;

// The write wait thread published the address of its siginfo_t in the global
// write pointer; fill that structure in from the delivered information.
//
   siginfo_t &waiting = *XrdOssAioInfoW;

   waiting.si_signo = info->si_signo;
   waiting.si_errno = info->si_errno;
   waiting.si_code  = info->si_code;
#ifdef __APPLE__
   waiting.si_value.sigval_ptr = info->si_addr;
#else
   waiting.si_value.sival_ptr = info->si_value.sival_ptr;
#endif
}
}
503 #endif
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
XrdSysTrace OssTrace
void * XrdOssAioWait(void *mySigarg)
Definition: XrdOssAio.cc:356
XrdSysError OssEroute
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
struct sigevent aio_sigevent
Definition: XrdSfsAio.hh:51
int aio_fildes
Definition: XrdSfsAio.hh:46
void * aio_buf
Definition: XrdSfsAio.hh:47
#define TRACE(act, x)
Definition: XrdTrace.hh:63
const char * tident
Definition: XrdOss.hh:453
int fd
Definition: XrdOss.hh:455
int Fsync()
Definition: XrdOssApi.cc:1135
ssize_t Read(off_t, size_t)
Definition: XrdOssApi.cc:846
ssize_t Write(const void *, off_t, size_t)
Definition: XrdOssApi.cc:1022
static int AioInit()
Definition: XrdOssAio.cc:281
static int AioAllOk
Definition: XrdOssApi.hh:201
ssize_t Result
Definition: XrdSfsAio.hh:65
const char * TIdent
Definition: XrdSfsAio.hh:67
virtual void doneRead()=0
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62
virtual void doneWrite()=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
@ hex1
Definition: XrdSysTrace.hh:42