35 #ifdef _POSIX_ASYNCHRONOUS_IO
62 #undef _POSIX_ASYNCHRONOUS_IO
80 int XrdOssFile::AioFailure = 0;
82 #ifdef _POSIX_ASYNCHRONOUS_IO
84 const int OSS_AIO_READ_DONE = SIGRTMAX-1;
85 const int OSS_AIO_WRITE_DONE = SIGRTMAX;
87 #define OSS_AIO_READ_DONE SIGUSR1
88 #define OSS_AIO_WRITE_DONE SIGUSR2
105 #ifdef _POSIX_ASYNCHRONOUS_IO
117 if (!(rc = aio_fsync(O_SYNC, &aiop->
sfsAio)))
return 0;
118 if (errno != EAGAIN && errno != ENOSYS)
return -errno;
123 {
int fcnt = AioFailure++;
124 if ((fcnt & 0x3ff) == 1)
OssEroute.
Emsg(
"aio", errno,
"fsync async");
157 #ifdef _POSIX_ASYNCHRONOUS_IO
173 if (!(rc = aio_read(&aiop->
sfsAio)))
return 0;
174 if (errno != EAGAIN && errno != ENOSYS)
return -errno;
179 {
int fcnt = AioFailure++;
180 if ((fcnt & 0x3ff) == 1)
OssEroute.
Emsg(
"aio", errno,
"read async");
214 #ifdef _POSIX_ASYNCHRONOUS_IO
230 if (!(rc = aio_write(&aiop->
sfsAio)))
return 0;
231 if (errno != EAGAIN && errno != ENOSYS)
return -errno;
236 {
int fcnt = AioFailure++;
237 if ((fcnt & 0x3ff) == 1)
OssEroute.
Emsg(
"Write",errno,
"write async");
263 #if defined(_POSIX_ASYNCHRONOUS_IO) && !defined(HAVE_SIGWTI)
266 siginfo_t *XrdOssAioInfoR;
267 siginfo_t *XrdOssAioInfoW;
268 extern "C" {
extern void XrdOssAioRSH(
int, siginfo_t *,
void *);}
269 extern "C" {
extern void XrdOssAioWSH(
int, siginfo_t *,
void *);}
283 #if defined(_POSIX_ASYNCHRONOUS_IO)
297 sa.sa_sigaction = XrdOssAioRSH;
298 sa.sa_flags = SA_SIGINFO;
299 sigemptyset(&sa.sa_mask);
300 sigaddset(&sa.sa_mask, OSS_AIO_WRITE_DONE);
301 if (sigaction(OSS_AIO_READ_DONE, &sa, NULL) < 0)
302 {
OssEroute.
Emsg(
"AioInit", errno,
"creating AIO read signal handler; "
303 "AIO support terminated.");
307 sa.sa_sigaction = XrdOssAioWSH;
308 sa.sa_flags = SA_SIGINFO;
309 sigemptyset(&sa.sa_mask);
310 sigaddset(&sa.sa_mask, OSS_AIO_READ_DONE);
311 if (sigaction(OSS_AIO_WRITE_DONE, &sa, NULL) < 0)
312 {
OssEroute.
Emsg(
"AioInit", errno,
"creating AIO write signal handler; "
313 "AIO support terminated.");
323 (
void *)(&OSS_AIO_READ_DONE))) < 0)
324 OssEroute.
Emsg(
"AioInit", retc,
"creating AIO read signal thread; "
325 "AIO support terminated.");
327 else {
DEBUG(
"started AIO read signal thread.");
329 else {
DEBUG(
"started AIO read signal thread; tid=" <<(
unsigned int)tid);
332 (
void *)(&OSS_AIO_WRITE_DONE))) < 0)
333 OssEroute.
Emsg(
"AioInit", retc,
"creating AIO write signal thread; "
334 "AIO support terminated.");
336 else {
DEBUG(
"started AIO write signal thread.");
338 else {
DEBUG(
"started AIO write signal thread; tid=" <<(
unsigned int)tid);
358 #ifdef _POSIX_ASYNCHRONOUS_IO
360 int mySignum = *((
int *)mySigarg);
361 const char *sigType = (mySignum == OSS_AIO_READ_DONE ?
"read" :
"write");
362 const int isRead = (mySignum == OSS_AIO_READ_DONE);
369 extern int sigwaitinfo(
const sigset_t *set, siginfo_t *info);
370 extern siginfo_t *XrdOssAioInfoR;
371 extern siginfo_t *XrdOssAioInfoW;
377 if (isRead) XrdOssAioInfoR = &myInfo;
378 else XrdOssAioInfoW = &myInfo;
382 sigfillset(&mySigset);
383 sigdelset(&mySigset, mySignum);
388 sigemptyset(&mySigset);
389 sigaddset(&mySigset, mySignum);
394 do {
do {numsig = sigwaitinfo((
const sigset_t *)&mySigset, &myInfo);}
395 while (numsig < 0 && errno == EINTR);
397 {
OssEroute.
Emsg(
"AioWait",errno,sigType,
"wait for AIO signal");
401 if (numsig != mySignum || myInfo.si_code != SI_ASYNCIO)
403 sprintf(buff,
"%d %d", myInfo.si_code, numsig);
404 OssEroute.
Emsg(
"AioWait",
"received unexpected signal", buff);
409 aiop = (
XrdSfsAio *)myInfo.si_value.sigval_ptr;
411 aiop = (
XrdSfsAio *)myInfo.si_value.sival_ptr;
414 while ((rc = aio_error(&aiop->
sfsAio)) == EINPROGRESS) {}
415 retval = (ssize_t)aio_return(&aiop->
sfsAio);
417 DEBUG(sigType <<
" completed for " <<aiop->
TIdent <<
"; rc=" <<rc
418 <<
" result=" <<retval <<
" aiocb=" <<
Xrd::hex1 <<aiop);
420 if (retval < 0) aiop->
Result = -rc;
421 else aiop->
Result = retval;
430 #if defined( _POSIX_ASYNCHRONOUS_IO) && !defined(HAVE_SIGWTI)
439 int sigwaitinfo(
const sigset_t *set, siginfo_t *info)
445 return info->si_signo;
457 void XrdOssAioRSH(
int signum, siginfo_t *info,
void *ucontext)
459 extern siginfo_t *XrdOssAioInfoR;
465 XrdOssAioInfoR->si_signo = info->si_signo;
466 XrdOssAioInfoR->si_errno = info->si_errno;
467 XrdOssAioInfoR->si_code = info->si_code;
469 XrdOssAioInfoR->si_value.sigval_ptr = info->si_addr;
471 XrdOssAioInfoR->si_value.sival_ptr = info->si_value.sival_ptr;
485 void XrdOssAioWSH(
int signum, siginfo_t *info,
void *ucontext)
487 extern siginfo_t *XrdOssAioInfoW;
493 XrdOssAioInfoW->si_signo = info->si_signo;
494 XrdOssAioInfoW->si_errno = info->si_errno;
495 XrdOssAioInfoW->si_code = info->si_code;
497 XrdOssAioInfoW->si_value.sigval_ptr = info->si_addr;
499 XrdOssAioInfoW->si_value.sival_ptr = info->si_value.sival_ptr;
void * XrdOssAioWait(void *mySigarg)
struct sigevent aio_sigevent
ssize_t Read(off_t, size_t)
ssize_t Write(const void *, off_t, size_t)
virtual void doneRead()=0
virtual void doneWrite()=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)