41 #include <arpa/inet.h>
63 memset( ptr, 0,
sizeof(
aiocb ) );
67 env->
GetInt(
"AioSignal", useSignals );
71 static SignalHandlerRegistrator registrator;
88 void SetWrite(
int fd,
size_t offset,
size_t size,
const void *buffer )
91 cb->aio_offset = offset;
92 cb->aio_buf =
const_cast<void*
>( buffer );
93 cb->aio_nbytes = size;
94 opcode = Opcode::Write;
97 void SetRead(
int fd,
size_t offset,
size_t size,
void *buffer )
100 cb->aio_offset = offset;
101 cb->aio_buf = buffer;
102 cb->aio_nbytes = size;
103 opcode = Opcode::Read;
106 void SetFsync(
int fd )
112 static void ThreadHandler( sigval arg )
114 std::unique_ptr<AioCtx> me(
reinterpret_cast<AioCtx*
>( arg.sival_ptr ) );
115 Handler( std::move( me ) );
118 static void SignalHandler(
int sig, siginfo_t *info,
void *ucontext )
120 std::unique_ptr<AioCtx> me(
reinterpret_cast<AioCtx*
>( info->si_value.sival_ptr ) );
121 Handler( std::move( me ) );
131 struct SignalHandlerRegistrator
133 SignalHandlerRegistrator()
135 struct sigaction newact, oldact;
136 newact.sa_sigaction = SignalHandler;
137 sigemptyset( &newact.sa_mask );
138 newact.sa_flags = SA_SIGINFO;
139 int rc = sigaction( SIGUSR1, &newact, &oldact );
141 throw std::runtime_error(
XrdSysE2T( errno ) );
145 static void Handler( std::unique_ptr<AioCtx> me )
147 if( me->opcode == Opcode::None )
150 using namespace XrdCl;
152 int rc = aio_return( me->cb.get() );
155 int errcode = aio_error( me->cb.get() );
156 Log *log = DefaultEnv::GetLog();
159 QueueTask( error, 0, me->hosts, me->handler );
165 if( me->opcode == Opcode::Read )
168 const_cast<void*
>( me->cb->aio_buf ) );
173 QueueTask(
new XRootDStatus(), resp, me->hosts, me->handler );
177 static const char* GetErrMsg( Opcode opcode )
179 static const char readmsg[] =
"Read: failed %s";
180 static const char writemsg[] =
"Write: failed %s";
181 static const char syncmsg[] =
"Sync: failed %s";
185 case Opcode::Read:
return readmsg;
187 case Opcode::Write:
return writemsg;
198 using namespace XrdCl;
204 if( syncHandler || DefaultEnv::GetPostMaster() == nullptr )
210 JobManager *jmngr = DefaultEnv::GetPostMaster()->GetJobManager();
216 std::unique_ptr<aiocb> cb;
230 LocalFileHandler::LocalFileHandler() :
261 uint16_t flags = ntohs( request->
options );
262 uint16_t mode = ntohs( request->
mode );
272 if(
close( fd ) == -1 )
292 if(
fstat( fd, &ssp ) == -1 )
298 std::ostringstream data;
299 data << ssp.st_dev <<
" " << ssp.st_size <<
" " << ssp.st_mode <<
" "
306 log->
Error(
FileMsg,
"Stat: ParseServerResponse failed." );
313 resp->
Set( statInfo );
323 #if defined(__APPLE__)
326 if( (
read =
pread( fd, buffer, size, offset ) ) == -1 )
337 AioCtx *ctx =
new AioCtx( pHostList, handler );
338 ctx->SetRead( fd, offset, size, buffer );
340 int rc = aio_read( *ctx );
364 #if defined(__APPLE__)
365 ssize_t ret =
lseek( fd, offset, SEEK_SET );
369 ssize_t ret = preadv( fd,
iov, iovcnt, offset );
379 uint64_t choff = offset;
381 for(
int i = 0; i < iovcnt; ++i )
383 uint32_t chlen =
iov[i].iov_len;
384 if( chlen > left ) chlen = left;
385 info->
GetChunks().emplace_back( choff, chlen,
iov[i].iov_base);
400 #if defined(__APPLE__)
401 const char *buff =
reinterpret_cast<const char*
>( buffer );
402 size_t bytesWritten = 0;
403 while( bytesWritten < size )
405 ssize_t ret =
pwrite( fd, buff, size, offset );
419 AioCtx *ctx =
new AioCtx( pHostList, handler );
420 ctx->SetWrite( fd, offset, size, buffer );
422 int rc = aio_write( *ctx );
441 #if defined(__APPLE__)
453 AioCtx *ctx =
new AioCtx( pHostList, handler );
455 int rc = aio_fsync( O_SYNC, *ctx );
475 log->
Error(
FileMsg,
"Truncate: failed, file descriptor: %i, %s", fd,
491 size_t totalSize = 0;
492 bool useBuffer( buffer );
494 for(
auto itr = chunks.begin(); itr != chunks.end(); ++itr )
498 buffer = chunk.buffer;
499 ssize_t bytesRead =
pread( fd, buffer, chunk.length,
504 log->
Error(
FileMsg,
"VectorRead: failed, file descriptor: %i, %s",
509 totalSize += bytesRead;
510 info->GetChunks().push_back(
ChunkInfo( chunk.offset, bytesRead, buffer ) );
512 buffer =
reinterpret_cast<char*
>( buffer ) + bytesRead;
515 info->SetSize( totalSize );
517 resp->
Set( info.release() );
528 for(
auto itr = chunks.begin(); itr != chunks.end(); ++itr )
531 ssize_t bytesWritten =
pwrite( fd, chunk.buffer, chunk.length,
533 if( bytesWritten < 0 )
536 log->
Error(
FileMsg,
"VectorWrite: failed, file descriptor: %i, %s",
554 size_t iovcnt = chunks->size();
557 for(
size_t i = 0; i < iovcnt; ++i )
559 iovcp[i].iov_base = (*chunks)[i].buffer;
560 iovcp[i].iov_len = (*chunks)[i].length;
561 size += (*chunks)[i].length;
563 iovec *iovptr = iovcp;
565 ssize_t bytesWritten = 0;
566 while( bytesWritten < size )
569 ssize_t ret =
lseek( fd, offset, SEEK_SET );
571 ret =
writev( fd, iovptr, iovcnt );
573 ssize_t ret = pwritev( fd, iovptr, iovcnt, offset );
586 if(
size_t( ret ) > iovptr[0].iov_len )
588 ret -= iovptr[0].iov_len;
594 iovptr[0].iov_len -= ret;
595 iovptr[0].iov_base =
reinterpret_cast<char*
>( iovptr[0].iov_base ) + ret;
630 std::vector<XAttrStatus> response;
632 auto itr = attrs.begin();
633 for( ; itr != attrs.end(); ++itr )
635 std::string name = std::get<xattr_name>( *itr );
636 std::string value = std::get<xattr_value>( *itr );
637 int err = xattr->
Set( name.c_str(), value.c_str(), value.size(), 0, fd );
645 resp->
Set(
new std::vector<XAttrStatus>( std::move( response ) ) );
658 std::vector<XAttr> response;
660 auto itr = attrs.begin();
661 for( ; itr != attrs.end(); ++itr )
663 std::string name = *itr;
664 std::unique_ptr<char[]> buffer;
666 int size = xattr->
Get( name.c_str(), 0, 0, 0, fd );
670 response.push_back(
XAttr( *itr,
"", status ) );
673 buffer.reset(
new char[size] );
674 int ret = xattr->
Get( name.c_str(), buffer.get(), size, 0, fd );
680 value.append( buffer.get(), ret );
684 response.push_back(
XAttr( *itr, value, status ) );
688 resp->
Set(
new std::vector<XAttr>( std::move( response ) ) );
701 std::vector<XAttrStatus> response;
703 auto itr = attrs.begin();
704 for( ; itr != attrs.end(); ++itr )
706 std::string name = *itr;
707 int err = xattr->
Del( name.c_str(), 0, fd );
715 resp->
Set(
new std::vector<XAttrStatus>( std::move( response ) ) );
727 std::vector<XAttr> response;
730 int err = xattr->
List( &alist, 0, fd, 1 );
741 std::string name( ptr->
Name, ptr->
Nlen );
742 int vlen = ptr->
Vlen;
745 std::unique_ptr<char[]> buffer(
new char[vlen] );
746 int ret = xattr->
Get( name.c_str(),
747 buffer.get(), vlen, 0, fd );
749 std::string value = ret >= 0 ? std::string( buffer.get(), ret ) :
753 response.push_back(
XAttr( name, value, status ) );
755 xattr->
Free( alist );
758 resp->
Set(
new std::vector<XAttr>( std::move( response ) ) );
794 size_t pos = path.rfind(
'/' );
795 while( pos != std::string::npos && pos != 0 )
797 std::string tmp = path.substr( 0, pos );
799 int rc =
lstat( tmp.c_str(), &st );
801 if( errno != ENOENT )
803 pos = path.rfind(
'/', pos - 1 );
806 pos = path.find(
'/', pos + 1 );
807 while( pos != std::string::npos && pos != 0 )
809 std::string tmp = path.substr( 0, pos );
810 if(
mkdir( tmp.c_str(), 0755 ) )
812 if( errno != EEXIST )
815 pos = path.find(
'/', pos + 1 );
820 XRootDStatus LocalFileHandler::OpenImpl(
const std::string &url, uint16_t flags,
829 if( !fileUrl.IsValid() )
832 if( fileUrl.GetHostName() !=
"localhost" )
835 std::string path = fileUrl.GetPath();
840 uint16_t openflags = 0;
842 openflags |= O_CREAT | O_EXCL;
844 openflags |= O_WRONLY;
848 openflags |= O_RDONLY;
850 openflags |= O_CREAT | O_TRUNC;
857 log->
Error(
FileMsg,
"Open MkdirPath failed %s: %s", path.c_str(),
866 if( mode == Access::Mode::None)
868 fd = XrdSysFD_Open( path.c_str(), openflags, mode );
871 log->
Error(
FileMsg,
"Open: open failed: %s: %s", path.c_str(),
881 if(
fstat( fd, &ssp ) == -1 )
888 std::ostringstream data;
889 data << ssp.st_dev <<
" " << ssp.st_size <<
" " << ssp.st_mode <<
" "
895 log->
Error(
FileMsg,
"Open: ParseServerResponse failed." );
901 pHostList.push_back(
HostInfo( pUrl,
false ) );
907 resp->
Set( openInfo );
932 std::vector<std::string> attrs;
934 for(
kXR_char i = 0; i < numattr; ++i )
941 size_t len = strlen( body );
943 attrs.push_back( std::string( body, len ) );
956 std::vector<xattr_t> attrs;
958 for(
kXR_char i = 0; i < numattr; ++i )
967 attrs.push_back( std::make_tuple( std::string( name ), std::string() ) );
968 bodylen -= strlen( name ) + 1;
972 for(
kXR_char i = 0; i < numattr; ++i )
984 std::get<xattr_value>( attrs[i] ) = value;
1034 auto &chunkList = *sendParams.
chunkList;
1035 struct iovec
iov[chunkList.size()];
1036 for(
size_t i = 0; i < chunkList.size() ; ++i )
1038 iov[i].iov_base = chunkList[i].buffer;
1039 iov[i].iov_len = chunkList[i].length;
1041 return ReadV( chunkList.front().offset,
iov, chunkList.
size(),
1042 handler, sendParams.
timeout );
1047 handler, sendParams.
timeout );
1053 if( chunks->size() == 1 )
1057 chunks->front().buffer, handler,
1062 handler, sendParams.
timeout );
1084 handler, sendParams.
timeout );
struct ClientTruncateRequest truncate
struct ClientFattrRequest fattr
struct ClientOpenRequest open
struct ClientRequestHdr header
struct ClientReadRequest read
struct ClientWriteRequest write
ssize_t pwrite(int fildes, const void *buf, size_t nbyte, off_t offset)
off_t lseek(int fildes, off_t offset, int whence)
int stat(const char *path, struct stat *buf)
int ftruncate(int fildes, off_t offset)
ssize_t pread(int fildes, void *buf, size_t nbyte, off_t offset)
int fstat(int fildes, struct stat *buf)
int lstat(const char *path, struct stat *buf)
int mkdir(const char *path, mode_t mode)
ssize_t readv(int fildes, const struct iovec *iov, int iovcnt)
ssize_t writev(int fildes, const struct iovec *iov, int iovcnt)
ssize_t read(int fildes, void *buf, size_t nbyte)
struct sigevent aio_sigevent
const char * XrdSysE2T(int errcode)
static int mapError(int rc)
void Set(Type object, bool own=true)
Binary blob representation.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
XRootDStatus Truncate(uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus VectorRead(const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Stat(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Sync(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ReadV(uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus QueueTask(XRootDStatus *st, AnyObject *obj, ResponseHandler *handler)
XRootDStatus SetXAttr(const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus MkdirPath(const std::string &path)
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus WriteV(uint64_t offset, ChunkList *chunks, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus DelXAttr(const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ListXAttr(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Fcntl(const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Visa(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus VectorWrite(const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ExecRequest(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams)
Translate an XRootD request into LocalFileHandler call.
XRootDStatus Open(const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus GetXAttr(const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
void Error(uint64_t topic, const char *format,...)
Report an error.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
The message representation used throughout the system.
uint16_t GetVirtReqID() const
Get virtual request ID for the message.
Information returned by file open operation.
JobManager * GetJobManager()
Get the job manager object user by the post master.
Handle an async response.
bool ParseServerResponse(const char *data)
Parse server response and fill up the object.
Synchronize the response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Handle the response.
std::string GetURL() const
Get the URL.
void SetSize(uint32_t size)
Set size.
ChunkList & GetChunks()
Get chunks.
char Name[1]
Start of the name (size of struct is dynamic)
int Vlen
The length of the attribute value;.
virtual int List(AList **aPL, const char *Path, int fd=-1, int getSz=0)=0
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0
int Nlen
The length of the attribute name that follows.
virtual void Free(AList *aPL)=0
virtual int Del(const char *Aname, const char *Path, int fd=-1)=0
AList * Next
-> next element.
const uint16_t stError
An error occurred that could potentially be retried.
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t stOK
Everything went OK.
const uint16_t errOSError
SyncImpl< false > Sync(Ctx< File > file, uint16_t timeout=0)
Factory for creating SyncImpl objects.
const uint16_t errInvalidArgs
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errNotSupported
const uint16_t errLocalError
const int DefaultAioSignal
static char * NVecRead(char *buffer, kXR_unt16 &rc)
static char * VVecRead(char *buffer, kXR_int32 &len)
Describe a data chunk for vector read.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
uint32_t errNo
Errno, if any.
Extended attribute operation status.
Extended attributes with status.