XRootD
XrdClLocalFileHandler.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
3 // Author: Paul-Niklas Kramp <p.n.kramp@gsi.de>
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // XRootD is free software: you can redistribute it and/or modify
7 // it under the terms of the GNU Lesser General Public License as published by
8 // the Free Software Foundation, either version 3 of the License, or
9 // (at your option) any later version.
10 //
11 // XRootD is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 // GNU General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public License
17 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
18 //------------------------------------------------------------------------------
20 #include "XrdCl/XrdClConstants.hh"
21 #include "XrdCl/XrdClPostMaster.hh"
22 #include "XrdCl/XrdClURL.hh"
24 #include "XrdCl/XrdClFileSystem.hh"
25 #include "XProtocol/XProtocol.hh"
26 
27 #include "XrdSys/XrdSysE2T.hh"
28 #include "XrdSys/XrdSysXAttr.hh"
29 #include "XrdSys/XrdSysFAttr.hh"
30 #include "XrdSys/XrdSysFD.hh"
31 
32 #include <string>
33 #include <memory>
34 #include <stdexcept>
35 
36 #include <fcntl.h>
37 #include <cstdio>
38 #include <cstdlib>
39 #include <unistd.h>
40 #include <sys/stat.h>
41 #include <arpa/inet.h>
42 #include <aio.h>
43 
44 namespace
45 {
46 
47  class AioCtx
48  {
49  public:
50 
51  enum Opcode
52  {
53  None,
54  Read,
55  Write,
56  Sync
57  };
58 
59  AioCtx( const XrdCl::HostList &hostList, XrdCl::ResponseHandler *handler ) :
60  opcode( None ), hosts( new XrdCl::HostList( hostList ) ), handler( handler )
61  {
62  aiocb *ptr = new aiocb();
63  memset( ptr, 0, sizeof( aiocb ) );
64 
66  int useSignals = XrdCl::DefaultAioSignal;
67  env->GetInt( "AioSignal", useSignals );
68 
69  if( useSignals )
70  {
71  static SignalHandlerRegistrator registrator; // registers the signal handler
72 
73  ptr->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
74  ptr->aio_sigevent.sigev_signo = SIGUSR1;
75  }
76  else
77  {
78  ptr->aio_sigevent.sigev_notify = SIGEV_THREAD;
79  ptr->aio_sigevent.sigev_notify_function = ThreadHandler;
80  }
81 
82  ptr->aio_sigevent.sigev_value.sival_ptr = this;
83 
84  cb.reset( ptr );
85  }
86 
87 
88  void SetWrite( int fd, size_t offset, size_t size, const void *buffer )
89  {
90  cb->aio_fildes = fd;
91  cb->aio_offset = offset;
92  cb->aio_buf = const_cast<void*>( buffer );
93  cb->aio_nbytes = size;
94  opcode = Opcode::Write;
95  }
96 
97  void SetRead( int fd, size_t offset, size_t size, void *buffer )
98  {
99  cb->aio_fildes = fd;
100  cb->aio_offset = offset;
101  cb->aio_buf = buffer;
102  cb->aio_nbytes = size;
103  opcode = Opcode::Read;
104  }
105 
106  void SetFsync( int fd )
107  {
108  cb->aio_fildes = fd;
109  opcode = Opcode::Sync;
110  }
111 
112  static void ThreadHandler( sigval arg )
113  {
114  std::unique_ptr<AioCtx> me( reinterpret_cast<AioCtx*>( arg.sival_ptr ) );
115  Handler( std::move( me ) );
116  }
117 
118  static void SignalHandler( int sig, siginfo_t *info, void *ucontext )
119  {
120  std::unique_ptr<AioCtx> me( reinterpret_cast<AioCtx*>( info->si_value.sival_ptr ) );
121  Handler( std::move( me ) );
122  }
123 
124  operator aiocb*()
125  {
126  return cb.get();
127  }
128 
129  private:
130 
131  struct SignalHandlerRegistrator
132  {
133  SignalHandlerRegistrator()
134  {
135  struct sigaction newact, oldact;
136  newact.sa_sigaction = SignalHandler;
137  sigemptyset( &newact.sa_mask );
138  newact.sa_flags = SA_SIGINFO;
139  int rc = sigaction( SIGUSR1, &newact, &oldact );
140  if( rc < 0 )
141  throw std::runtime_error( XrdSysE2T( errno ) );
142  }
143  };
144 
145  static void Handler( std::unique_ptr<AioCtx> me )
146  {
147  if( me->opcode == Opcode::None )
148  return;
149 
150  using namespace XrdCl;
151 
152  int rc = aio_return( me->cb.get() );
153  if( rc < 0 )
154  {
155  int errcode = aio_error( me->cb.get() );
156  Log *log = DefaultEnv::GetLog();
157  log->Error( FileMsg, GetErrMsg( me->opcode ), XrdSysE2T( errcode ) );
158  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errcode ) ;
159  QueueTask( error, 0, me->hosts, me->handler );
160  }
161  else
162  {
163  AnyObject *resp = 0;
164 
165  if( me->opcode == Opcode::Read )
166  {
167  ChunkInfo *chunk = new ChunkInfo( me->cb->aio_offset, rc,
168  const_cast<void*>( me->cb->aio_buf ) );
169  resp = new AnyObject();
170  resp->Set( chunk );
171  }
172 
173  QueueTask( new XRootDStatus(), resp, me->hosts, me->handler );
174  }
175  }
176 
177  static const char* GetErrMsg( Opcode opcode )
178  {
179  static const char readmsg[] = "Read: failed %s";
180  static const char writemsg[] = "Write: failed %s";
181  static const char syncmsg[] = "Sync: failed %s";
182 
183  switch( opcode )
184  {
185  case Opcode::Read: return readmsg;
186 
187  case Opcode::Write: return writemsg;
188 
189  case Opcode::Sync: return syncmsg;
190 
191  default: return 0;
192  }
193  }
194 
195  static void QueueTask( XrdCl::XRootDStatus *status, XrdCl::AnyObject *resp,
196  XrdCl::HostList *hosts, XrdCl::ResponseHandler *handler )
197  {
198  using namespace XrdCl;
199 
200  // if it is simply the sync handler we can release the semaphore
201  // and return there is no need to execute this in the thread-pool
202  SyncResponseHandler *syncHandler =
203  dynamic_cast<SyncResponseHandler*>( handler );
204  if( syncHandler || DefaultEnv::GetPostMaster() == nullptr )
205  {
206  syncHandler->HandleResponse( status, resp );
207  }
208  else
209  {
210  JobManager *jmngr = DefaultEnv::GetPostMaster()->GetJobManager();
211  LocalFileTask *task = new LocalFileTask( status, resp, hosts, handler );
212  jmngr->QueueJob( task );
213  }
214  }
215 
216  std::unique_ptr<aiocb> cb;
217  Opcode opcode;
218  XrdCl::HostList *hosts;
219  XrdCl::ResponseHandler *handler;
220  };
221 
222 };
223 
224 namespace XrdCl
225 {
226 
227  //------------------------------------------------------------------------
228  // Constructor
229  //------------------------------------------------------------------------
230  LocalFileHandler::LocalFileHandler() :
231  fd( -1 )
232  {
233  }
234 
235  //------------------------------------------------------------------------
236  // Destructor
237  //------------------------------------------------------------------------
239  {
240 
241  }
242 
243  //------------------------------------------------------------------------
244  // Open
245  //------------------------------------------------------------------------
246  XRootDStatus LocalFileHandler::Open( const std::string& url, uint16_t flags,
247  uint16_t mode, ResponseHandler* handler, uint16_t timeout )
248  {
249  AnyObject *resp = 0;
250  XRootDStatus st = OpenImpl( url, flags, mode, resp );
251  if( !st.IsOK() && st.code != errLocalError )
252  return st;
253 
254  return QueueTask( new XRootDStatus( st ), resp, handler );
255  }
256 
257  XRootDStatus LocalFileHandler::Open( const URL *url, const Message *req, AnyObject *&resp )
258  {
259  const ClientOpenRequest* request =
260  reinterpret_cast<const ClientOpenRequest*>( req->GetBuffer() );
261  uint16_t flags = ntohs( request->options );
262  uint16_t mode = ntohs( request->mode );
263  return OpenImpl( url->GetURL(), flags, mode, resp );
264  }
265 
266  //------------------------------------------------------------------------
267  // Close
268  //------------------------------------------------------------------------
270  uint16_t timeout )
271  {
272  if( close( fd ) == -1 )
273  {
274  Log *log = DefaultEnv::GetLog();
275  log->Error( FileMsg, "Close: file fd: %i %s", fd, XrdSysE2T( errno ) );
276  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
277  return QueueTask( error, 0, handler );
278  }
279 
280  return QueueTask( new XRootDStatus(), 0, handler );
281  }
282 
283  //------------------------------------------------------------------------
284  // Stat
285  //------------------------------------------------------------------------
287  uint16_t timeout )
288  {
289  Log *log = DefaultEnv::GetLog();
290 
291  struct stat ssp;
292  if( fstat( fd, &ssp ) == -1 )
293  {
294  log->Error( FileMsg, "Stat: failed fd: %i %s", fd, XrdSysE2T( errno ) );
295  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
296  return QueueTask( error, 0, handler );
297  }
298  std::ostringstream data;
299  data << ssp.st_dev << " " << ssp.st_size << " " << ssp.st_mode << " "
300  << ssp.st_mtime;
301  log->Debug( FileMsg, "%s", data.str().c_str() );
302 
303  StatInfo *statInfo = new StatInfo();
304  if( !statInfo->ParseServerResponse( data.str().c_str() ) )
305  {
306  log->Error( FileMsg, "Stat: ParseServerResponse failed." );
307  delete statInfo;
309  0, handler );
310  }
311 
312  AnyObject *resp = new AnyObject();
313  resp->Set( statInfo );
314  return QueueTask( new XRootDStatus(), resp, handler );
315  }
316 
317  //------------------------------------------------------------------------
318  // Read
319  //------------------------------------------------------------------------
320  XRootDStatus LocalFileHandler::Read( uint64_t offset, uint32_t size,
321  void* buffer, ResponseHandler* handler, uint16_t timeout )
322  {
323 #if defined(__APPLE__)
324  Log *log = DefaultEnv::GetLog();
325  int read = 0;
326  if( ( read = pread( fd, buffer, size, offset ) ) == -1 )
327  {
328  log->Error( FileMsg, "Read: failed %s", XrdSysE2T( errno ) );
329  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
330  return QueueTask( error, 0, handler );
331  }
332  ChunkInfo *chunk = new ChunkInfo( offset, read, buffer );
333  AnyObject *resp = new AnyObject();
334  resp->Set( chunk );
335  return QueueTask( new XRootDStatus(), resp, handler );
336 #else
337  AioCtx *ctx = new AioCtx( pHostList, handler );
338  ctx->SetRead( fd, offset, size, buffer );
339 
340  int rc = aio_read( *ctx );
341 
342  if( rc < 0 )
343  {
344  Log *log = DefaultEnv::GetLog();
345  log->Error( FileMsg, "Read: failed %s", XrdSysE2T( errno ) );
346  return XRootDStatus( stError, errLocalError, errno );
347  }
348 
349  return XRootDStatus();
350 #endif
351  }
352 
353 
354  //------------------------------------------------------------------------
355  // ReadV
356  //------------------------------------------------------------------------
358  struct iovec *iov,
359  int iovcnt,
360  ResponseHandler *handler,
361  uint16_t timeout )
362  {
363  Log *log = DefaultEnv::GetLog();
364 #if defined(__APPLE__)
365  ssize_t ret = lseek( fd, offset, SEEK_SET );
366  if( ret >= 0 )
367  ret = readv( fd, iov, iovcnt );
368 #else
369  ssize_t ret = preadv( fd, iov, iovcnt, offset );
370 #endif
371  if( ret == -1 )
372  {
373  log->Error( FileMsg, "ReadV: failed %s", XrdSysE2T( errno ) );
374  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
375  return QueueTask( error, 0, handler );
376  }
377  VectorReadInfo *info = new VectorReadInfo();
378  info->SetSize( ret );
379  uint64_t choff = offset;
380  uint32_t left = ret;
381  for( int i = 0; i < iovcnt; ++i )
382  {
383  uint32_t chlen = iov[i].iov_len;
384  if( chlen > left ) chlen = left;
385  info->GetChunks().emplace_back( choff, chlen, iov[i].iov_base);
386  left -= chlen;
387  choff += chlen;
388  }
389  AnyObject *resp = new AnyObject();
390  resp->Set( info );
391  return QueueTask( new XRootDStatus(), resp, handler );
392  }
393 
394  //------------------------------------------------------------------------
395  // Write
396  //------------------------------------------------------------------------
397  XRootDStatus LocalFileHandler::Write( uint64_t offset, uint32_t size,
398  const void* buffer, ResponseHandler* handler, uint16_t timeout )
399  {
400 #if defined(__APPLE__)
401  const char *buff = reinterpret_cast<const char*>( buffer );
402  size_t bytesWritten = 0;
403  while( bytesWritten < size )
404  {
405  ssize_t ret = pwrite( fd, buff, size, offset );
406  if( ret < 0 )
407  {
408  Log *log = DefaultEnv::GetLog();
409  log->Error( FileMsg, "Write: failed %s", XrdSysE2T( errno ) );
410  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
411  return QueueTask( error, 0, handler );
412  }
413  offset += ret;
414  buff += ret;
415  bytesWritten += ret;
416  }
417  return QueueTask( new XRootDStatus(), 0, handler );
418 #else
419  AioCtx *ctx = new AioCtx( pHostList, handler );
420  ctx->SetWrite( fd, offset, size, buffer );
421 
422  int rc = aio_write( *ctx );
423 
424  if( rc < 0 )
425  {
426  Log *log = DefaultEnv::GetLog();
427  log->Error( FileMsg, "Write: failed %s", XrdSysE2T( errno ) );
428  return XRootDStatus( stError, errLocalError, errno );
429  }
430 
431  return XRootDStatus();
432 #endif
433  }
434 
435  //------------------------------------------------------------------------
436  // Sync
437  //------------------------------------------------------------------------
439  uint16_t timeout )
440  {
441 #if defined(__APPLE__)
442  if( fsync( fd ) )
443  {
444  Log *log = DefaultEnv::GetLog();
445  log->Error( FileMsg, "Sync: failed %s", XrdSysE2T( errno ) );
447  XProtocol::mapError( errno ),
448  XrdSysE2T( errno ) );
449  return QueueTask( error, 0, handler );
450  }
451  return QueueTask( new XRootDStatus(), 0, handler );
452 #else
453  AioCtx *ctx = new AioCtx( pHostList, handler );
454  ctx->SetFsync( fd );
455  int rc = aio_fsync( O_SYNC, *ctx );
456  if( rc < 0 )
457  {
458  Log *log = DefaultEnv::GetLog();
459  log->Error( FileMsg, "Sync: failed %s", XrdSysE2T( errno ) );
460  return XRootDStatus( stError, errLocalError, errno );
461  }
462 #endif
463  return XRootDStatus();
464  }
465 
466  //------------------------------------------------------------------------
467  // Truncate
468  //------------------------------------------------------------------------
470  ResponseHandler* handler, uint16_t timeout )
471  {
472  if( ftruncate( fd, size ) )
473  {
474  Log *log = DefaultEnv::GetLog();
475  log->Error( FileMsg, "Truncate: failed, file descriptor: %i, %s", fd,
476  XrdSysE2T( errno ) );
477  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
478  return QueueTask( error, 0, handler );
479  }
480 
481  return QueueTask( new XRootDStatus( stOK ), 0, handler );
482  }
483 
484  //------------------------------------------------------------------------
485  // VectorRead
486  //------------------------------------------------------------------------
488  void* buffer, ResponseHandler* handler, uint16_t timeout )
489  {
490  std::unique_ptr<VectorReadInfo> info( new VectorReadInfo() );
491  size_t totalSize = 0;
492  bool useBuffer( buffer );
493 
494  for( auto itr = chunks.begin(); itr != chunks.end(); ++itr )
495  {
496  auto &chunk = *itr;
497  if( !useBuffer )
498  buffer = chunk.buffer;
499  ssize_t bytesRead = pread( fd, buffer, chunk.length,
500  chunk.offset );
501  if( bytesRead < 0 )
502  {
503  Log *log = DefaultEnv::GetLog();
504  log->Error( FileMsg, "VectorRead: failed, file descriptor: %i, %s",
505  fd, XrdSysE2T( errno ) );
506  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
507  return QueueTask( error, 0, handler );
508  }
509  totalSize += bytesRead;
510  info->GetChunks().push_back( ChunkInfo( chunk.offset, bytesRead, buffer ) );
511  if( useBuffer )
512  buffer = reinterpret_cast<char*>( buffer ) + bytesRead;
513  }
514 
515  info->SetSize( totalSize );
516  AnyObject *resp = new AnyObject();
517  resp->Set( info.release() );
518  return QueueTask( new XRootDStatus(), resp, handler );
519  }
520 
521  //------------------------------------------------------------------------
522  // VectorWrite
523  //------------------------------------------------------------------------
525  ResponseHandler *handler, uint16_t timeout )
526  {
527 
528  for( auto itr = chunks.begin(); itr != chunks.end(); ++itr )
529  {
530  auto &chunk = *itr;
531  ssize_t bytesWritten = pwrite( fd, chunk.buffer, chunk.length,
532  chunk.offset );
533  if( bytesWritten < 0 )
534  {
535  Log *log = DefaultEnv::GetLog();
536  log->Error( FileMsg, "VectorWrite: failed, file descriptor: %i, %s",
537  fd, XrdSysE2T( errno ) );
538  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
539  return QueueTask( error, 0, handler );
540  }
541  }
542 
543  return QueueTask( new XRootDStatus(), 0, handler );
544  }
545 
546  //------------------------------------------------------------------------
547  // WriteV
548  //------------------------------------------------------------------------
550  ChunkList *chunks,
551  ResponseHandler *handler,
552  uint16_t timeout )
553  {
554  size_t iovcnt = chunks->size();
555  iovec iovcp[iovcnt];
556  ssize_t size = 0;
557  for( size_t i = 0; i < iovcnt; ++i )
558  {
559  iovcp[i].iov_base = (*chunks)[i].buffer;
560  iovcp[i].iov_len = (*chunks)[i].length;
561  size += (*chunks)[i].length;
562  }
563  iovec *iovptr = iovcp;
564 
565  ssize_t bytesWritten = 0;
566  while( bytesWritten < size )
567  {
568 #ifdef __APPLE__
569  ssize_t ret = lseek( fd, offset, SEEK_SET );
570  if( ret >= 0 )
571  ret = writev( fd, iovptr, iovcnt );
572 #else
573  ssize_t ret = pwritev( fd, iovptr, iovcnt, offset );
574 #endif
575  if( ret < 0 )
576  {
577  Log *log = DefaultEnv::GetLog();
578  log->Error( FileMsg, "WriteV: failed %s", XrdSysE2T( errno ) );
579  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
580  return QueueTask( error, 0, handler );
581  }
582 
583  bytesWritten += ret;
584  while( ret )
585  {
586  if( size_t( ret ) > iovptr[0].iov_len )
587  {
588  ret -= iovptr[0].iov_len;
589  --iovcnt;
590  ++iovptr;
591  }
592  else
593  {
594  iovptr[0].iov_len -= ret;
595  iovptr[0].iov_base = reinterpret_cast<char*>( iovptr[0].iov_base ) + ret;
596  ret = 0;
597  }
598  }
599  }
600 
601  return QueueTask( new XRootDStatus(), 0, handler );
602  }
603 
604  //------------------------------------------------------------------------
605  // Fcntl
606  //------------------------------------------------------------------------
608  ResponseHandler *handler, uint16_t timeout )
609  {
611  }
612 
613  //------------------------------------------------------------------------
614  // Visa
615  //------------------------------------------------------------------------
617  uint16_t timeout )
618  {
620  }
621 
622  //------------------------------------------------------------------------
623  // Set extended attributes - async
624  //------------------------------------------------------------------------
625  XRootDStatus LocalFileHandler::SetXAttr( const std::vector<xattr_t> &attrs,
626  ResponseHandler *handler,
627  uint16_t timeout )
628  {
629  XrdSysXAttr *xattr = XrdSysFAttr::Xat;
630  std::vector<XAttrStatus> response;
631 
632  auto itr = attrs.begin();
633  for( ; itr != attrs.end(); ++itr )
634  {
635  std::string name = std::get<xattr_name>( *itr );
636  std::string value = std::get<xattr_value>( *itr );
637  int err = xattr->Set( name.c_str(), value.c_str(), value.size(), 0, fd );
638  XRootDStatus status = err < 0 ? XRootDStatus( stError, errLocalError, -err ) :
639  XRootDStatus();
640 
641  response.push_back( XAttrStatus( name, status ) );
642  }
643 
644  AnyObject *resp = new AnyObject();
645  resp->Set( new std::vector<XAttrStatus>( std::move( response ) ) );
646 
647  return QueueTask( new XRootDStatus(), resp, handler );
648  }
649 
650  //------------------------------------------------------------------------
651  // Get extended attributes - async
652  //------------------------------------------------------------------------
653  XRootDStatus LocalFileHandler::GetXAttr( const std::vector<std::string> &attrs,
654  ResponseHandler *handler,
655  uint16_t timeout )
656  {
657  XrdSysXAttr *xattr = XrdSysFAttr::Xat;
658  std::vector<XAttr> response;
659 
660  auto itr = attrs.begin();
661  for( ; itr != attrs.end(); ++itr )
662  {
663  std::string name = *itr;
664  std::unique_ptr<char[]> buffer;
665 
666  int size = xattr->Get( name.c_str(), 0, 0, 0, fd );
667  if( size < 0 )
668  {
669  XRootDStatus status( stError, errLocalError, -size );
670  response.push_back( XAttr( *itr, "", status ) );
671  continue;
672  }
673  buffer.reset( new char[size] );
674  int ret = xattr->Get( name.c_str(), buffer.get(), size, 0, fd );
675 
676  XRootDStatus status;
677  std::string value;
678 
679  if( ret >= 0 )
680  value.append( buffer.get(), ret );
681  else if( ret < 0 )
682  status = XRootDStatus( stError, errLocalError, -ret );
683 
684  response.push_back( XAttr( *itr, value, status ) );
685  }
686 
687  AnyObject *resp = new AnyObject();
688  resp->Set( new std::vector<XAttr>( std::move( response ) ) );
689 
690  return QueueTask( new XRootDStatus(), resp, handler );
691  }
692 
693  //------------------------------------------------------------------------
694  // Delete extended attributes - async
695  //------------------------------------------------------------------------
696  XRootDStatus LocalFileHandler::DelXAttr( const std::vector<std::string> &attrs,
697  ResponseHandler *handler,
698  uint16_t timeout )
699  {
700  XrdSysXAttr *xattr = XrdSysFAttr::Xat;
701  std::vector<XAttrStatus> response;
702 
703  auto itr = attrs.begin();
704  for( ; itr != attrs.end(); ++itr )
705  {
706  std::string name = *itr;
707  int err = xattr->Del( name.c_str(), 0, fd );
708  XRootDStatus status = err < 0 ? XRootDStatus( stError, errLocalError, -err ) :
709  XRootDStatus();
710 
711  response.push_back( XAttrStatus( name, status ) );
712  }
713 
714  AnyObject *resp = new AnyObject();
715  resp->Set( new std::vector<XAttrStatus>( std::move( response ) ) );
716 
717  return QueueTask( new XRootDStatus(), resp, handler );
718  }
719 
720  //------------------------------------------------------------------------
721  // List extended attributes - async
722  //------------------------------------------------------------------------
724  uint16_t timeout )
725  {
726  XrdSysXAttr *xattr = XrdSysFAttr::Xat;
727  std::vector<XAttr> response;
728 
729  XrdSysXAttr::AList *alist = 0;
730  int err = xattr->List( &alist, 0, fd, 1 );
731 
732  if( err < 0 )
733  {
734  XRootDStatus *status = new XRootDStatus( stError, XProtocol::mapError( -err ) );
735  return QueueTask( status, 0, handler );
736  }
737 
738  XrdSysXAttr::AList *ptr = alist;
739  while( ptr )
740  {
741  std::string name( ptr->Name, ptr->Nlen );
742  int vlen = ptr->Vlen;
743  ptr = ptr->Next;
744 
745  std::unique_ptr<char[]> buffer( new char[vlen] );
746  int ret = xattr->Get( name.c_str(),
747  buffer.get(), vlen, 0, fd );
748 
749  std::string value = ret >= 0 ? std::string( buffer.get(), ret ) :
750  std::string();
751  XRootDStatus status = ret >= 0 ? XRootDStatus() :
753  response.push_back( XAttr( name, value, status ) );
754  }
755  xattr->Free( alist );
756 
757  AnyObject *resp = new AnyObject();
758  resp->Set( new std::vector<XAttr>( std::move( response ) ) );
759 
760  return QueueTask( new XRootDStatus(), resp, handler );
761  }
762 
763  //------------------------------------------------------------------------
764  // QueueTask - queues error/success tasks for all operations.
765  // Must always return stOK.
766  // Is always creating the same HostList containing only localhost.
767  //------------------------------------------------------------------------
769  ResponseHandler *handler )
770  {
771  // if it is simply the sync handler we can release the semaphore and
772  // return; there is no need to execute this in the thread-pool
773  SyncResponseHandler *syncHandler =
774  dynamic_cast<SyncResponseHandler*>( handler );
775  if( syncHandler || DefaultEnv::GetPostMaster() == nullptr )
776  {
777  syncHandler->HandleResponse( st, resp );
778  return XRootDStatus();
779  }
780 
781  HostList *hosts = pHostList.empty() ? 0 : new HostList( pHostList );
782  LocalFileTask *task = new LocalFileTask( st, resp, hosts, handler );
784  return XRootDStatus();
785  }
786 
787  //------------------------------------------------------------------------
788  // MkdirPath - creates the folders specified in file_path
789  // called if kXR_mkdir flag is set
790  //------------------------------------------------------------------------
791  XRootDStatus LocalFileHandler::MkdirPath( const std::string &path )
792  {
793  // first find the most up-front component that exists
794  size_t pos = path.rfind( '/' );
795  while( pos != std::string::npos && pos != 0 )
796  {
797  std::string tmp = path.substr( 0, pos );
798  struct stat st;
799  int rc = lstat( tmp.c_str(), &st );
800  if( rc == 0 ) break;
801  if( errno != ENOENT )
802  return XRootDStatus( stError, errLocalError, errno );
803  pos = path.rfind( '/', pos - 1 );
804  }
805 
806  pos = path.find( '/', pos + 1 );
807  while( pos != std::string::npos && pos != 0 )
808  {
809  std::string tmp = path.substr( 0, pos );
810  if( mkdir( tmp.c_str(), 0755 ) )
811  {
812  if( errno != EEXIST )
813  return XRootDStatus( stError, errLocalError, errno );
814  }
815  pos = path.find( '/', pos + 1 );
816  }
817  return XRootDStatus();
818  }
819 
820  XRootDStatus LocalFileHandler::OpenImpl( const std::string &url, uint16_t flags,
821  uint16_t mode, AnyObject *&resp)
822  {
823  Log *log = DefaultEnv::GetLog();
824 
825  // save the file URL for the HostList for later
826  pUrl = url;
827 
828  URL fileUrl( url );
829  if( !fileUrl.IsValid() )
831 
832  if( fileUrl.GetHostName() != "localhost" )
834 
835  std::string path = fileUrl.GetPath();
836 
837  //---------------------------------------------------------------------
838  // Prepare Flags
839  //---------------------------------------------------------------------
840  uint16_t openflags = 0;
841  if( flags & kXR_new )
842  openflags |= O_CREAT | O_EXCL;
843  if( flags & kXR_open_wrto )
844  openflags |= O_WRONLY;
845  else if( flags & kXR_open_updt )
846  openflags |= O_RDWR;
847  else
848  openflags |= O_RDONLY;
849  if( flags & kXR_delete )
850  openflags |= O_CREAT | O_TRUNC;
851 
852  if( flags & kXR_mkdir )
853  {
854  XRootDStatus st = MkdirPath( path );
855  if( !st.IsOK() )
856  {
857  log->Error( FileMsg, "Open MkdirPath failed %s: %s", path.c_str(),
858  XrdSysE2T( st.errNo ) );
859  return st;
860  }
861 
862  }
863  //---------------------------------------------------------------------
864  // Open File
865  //---------------------------------------------------------------------
866  if( mode == Access::Mode::None)
867  mode = 0644;
868  fd = XrdSysFD_Open( path.c_str(), openflags, mode );
869  if( fd == -1 )
870  {
871  log->Error( FileMsg, "Open: open failed: %s: %s", path.c_str(),
872  XrdSysE2T( errno ) );
873 
875  XProtocol::mapError( errno ) );
876  }
877  //---------------------------------------------------------------------
878  // Stat File and cache statInfo in openInfo
879  //---------------------------------------------------------------------
880  struct stat ssp;
881  if( fstat( fd, &ssp ) == -1 )
882  {
883  log->Error( FileMsg, "Open: stat failed." );
885  XProtocol::mapError( errno ) );
886  }
887 
888  std::ostringstream data;
889  data << ssp.st_dev << " " << ssp.st_size << " " << ssp.st_mode << " "
890  << ssp.st_mtime;
891 
892  StatInfo *statInfo = new StatInfo();
893  if( !statInfo->ParseServerResponse( data.str().c_str() ) )
894  {
895  log->Error( FileMsg, "Open: ParseServerResponse failed." );
896  delete statInfo;
898  }
899 
900  // add the URL to hosts list
901  pHostList.push_back( HostInfo( pUrl, false ) );
902 
903  //All went well
904  uint32_t ufd = fd;
905  OpenInfo *openInfo = new OpenInfo( (uint8_t*)&ufd, 1, statInfo );
906  resp = new AnyObject();
907  resp->Set( openInfo );
908  return XRootDStatus();
909  }
910 
911  //------------------------------------------------------------------------
912  // Parses kXR_fattr request and calls respective XAttr operation
913  //------------------------------------------------------------------------
914  XRootDStatus LocalFileHandler::XAttrImpl( kXR_char code,
915  kXR_char numattr,
916  size_t bodylen,
917  char *body,
918  ResponseHandler *handler )
919  {
920  // shift body by 1 to omit the empty path
921  if( bodylen > 0 )
922  {
923  ++body;
924  --bodylen;
925  }
926 
927  switch( code )
928  {
929  case kXR_fattrGet:
930  case kXR_fattrDel:
931  {
932  std::vector<std::string> attrs;
933  // parse namevec
934  for( kXR_char i = 0; i < numattr; ++i )
935  {
936  if( bodylen < sizeof( kXR_unt16 ) ) return XRootDStatus( stError, errDataError );
937  // shift by RC size
938  body += sizeof( kXR_unt16 );
939  bodylen -= sizeof( kXR_unt16 );
940  // get the size of attribute name
941  size_t len = strlen( body );
942  if( len > bodylen ) return XRootDStatus( stError, errDataError );
943  attrs.push_back( std::string( body, len ) );
944  body += len + 1; // +1 for the null terminating the string
945  bodylen -= len + 1; // +1 for the null terminating the string
946  }
947 
948  if( code == kXR_fattrGet )
949  return GetXAttr( attrs, handler );
950 
951  return DelXAttr( attrs, handler );
952  }
953 
954  case kXR_fattrSet:
955  {
956  std::vector<xattr_t> attrs;
957  // parse namevec
958  for( kXR_char i = 0; i < numattr; ++i )
959  {
960  if( bodylen < sizeof( kXR_unt16 ) ) return XRootDStatus( stError, errDataError );
961  // shift by RC size
962  body += sizeof( kXR_unt16 );
963  bodylen -= sizeof( kXR_unt16 );
964  // get the size of attribute name
965  char *name = 0;
966  body = ClientFattrRequest::NVecRead( body, name );
967  attrs.push_back( std::make_tuple( std::string( name ), std::string() ) );
968  bodylen -= strlen( name ) + 1; // +1 for the null terminating the string
969  free( name );
970  }
971  // parse valuevec
972  for( kXR_char i = 0; i < numattr; ++i )
973  {
974  // get value length
975  if( bodylen < sizeof( kXR_int32 ) ) return XRootDStatus( stError, errDataError );
976  kXR_int32 len = 0;
977  body = ClientFattrRequest::VVecRead( body, len );
978  bodylen -= sizeof( kXR_int32 );
979  // get value
980  if( size_t( len ) > bodylen ) return XRootDStatus( stError, errDataError );
981  char *value = 0;
982  body = ClientFattrRequest::VVecRead( body, len, value );
983  bodylen -= len;
984  std::get<xattr_value>( attrs[i] ) = value;
985  free( value );
986  }
987 
988  return SetXAttr( attrs, handler );
989  }
990 
991  case kXR_fattrList:
992  {
993  return ListXAttr( handler );
994  }
995 
996  default:
998  }
999 
1000  return XRootDStatus();
1001  }
1002 
1004  Message *msg,
1005  ResponseHandler *handler,
1006  MessageSendParams &sendParams )
1007  {
1008  ClientRequest *req = reinterpret_cast<ClientRequest*>( msg->GetBuffer() );
1009 
1010  switch( req->header.requestid )
1011  {
1012  case kXR_open:
1013  {
1014  XRootDStatus st = Open( url.GetURL(), req->open.options,
1015  req->open.mode, handler, sendParams.timeout );
1016  delete msg; // in case of other operations msg is owned by the handler
1017  return st;
1018  }
1019 
1020  case kXR_close:
1021  {
1022  return Close( handler, sendParams.timeout );
1023  }
1024 
1025  case kXR_stat:
1026  {
1027  return Stat( handler, sendParams.timeout );
1028  }
1029 
1030  case kXR_read:
1031  {
1032  if( msg->GetVirtReqID() == kXR_virtReadv )
1033  {
1034  auto &chunkList = *sendParams.chunkList;
1035  struct iovec iov[chunkList.size()];
1036  for( size_t i = 0; i < chunkList.size() ; ++i )
1037  {
1038  iov[i].iov_base = chunkList[i].buffer;
1039  iov[i].iov_len = chunkList[i].length;
1040  }
1041  return ReadV( chunkList.front().offset, iov, chunkList.size(),
1042  handler, sendParams.timeout );
1043  }
1044 
1045  return Read( req->read.offset, req->read.rlen,
1046  sendParams.chunkList->front().buffer,
1047  handler, sendParams.timeout );
1048  }
1049 
1050  case kXR_write:
1051  {
1052  ChunkList *chunks = sendParams.chunkList;
1053  if( chunks->size() == 1 )
1054  {
1055  // it's an ordinary write
1056  return Write( req->write.offset, req->write.dlen,
1057  chunks->front().buffer, handler,
1058  sendParams.timeout );
1059  }
1060  // it's WriteV call
1061  return WriteV( req->write.offset, sendParams.chunkList,
1062  handler, sendParams.timeout );
1063  }
1064 
1065  case kXR_sync:
1066  {
1067  return Sync( handler, sendParams.timeout );
1068  }
1069 
1070  case kXR_truncate:
1071  {
1072  return Truncate( req->truncate.offset, handler, sendParams.timeout );
1073  }
1074 
1075  case kXR_writev:
1076  {
1077  return VectorWrite( *sendParams.chunkList, handler,
1078  sendParams.timeout );
1079  }
1080 
1081  case kXR_readv:
1082  {
1083  return VectorRead( *sendParams.chunkList, 0,
1084  handler, sendParams.timeout );
1085  }
1086 
1087  case kXR_fattr:
1088  {
1089  return XAttrImpl( req->fattr.subcode, req->fattr.numattr, req->fattr.dlen,
1090  msg->GetBuffer( sizeof(ClientRequestHdr ) ), handler );
1091  }
1092 
1093  default:
1094  {
1096  }
1097  }
1098  }
1099 }
@ kXR_FSError
Definition: XProtocol.hh:995
struct ClientTruncateRequest truncate
Definition: XProtocol.hh:875
@ kXR_fattrDel
Definition: XProtocol.hh:270
@ kXR_fattrSet
Definition: XProtocol.hh:273
@ kXR_fattrList
Definition: XProtocol.hh:272
@ kXR_fattrGet
Definition: XProtocol.hh:271
struct ClientFattrRequest fattr
Definition: XProtocol.hh:854
kXR_int64 offset
Definition: XProtocol.hh:646
@ kXR_virtReadv
Definition: XProtocol.hh:150
kXR_unt16 options
Definition: XProtocol.hh:481
@ kXR_open_wrto
Definition: XProtocol.hh:469
@ kXR_delete
Definition: XProtocol.hh:453
@ kXR_open_updt
Definition: XProtocol.hh:457
@ kXR_new
Definition: XProtocol.hh:455
struct ClientOpenRequest open
Definition: XProtocol.hh:860
struct ClientRequestHdr header
Definition: XProtocol.hh:846
kXR_unt16 requestid
Definition: XProtocol.hh:157
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_open
Definition: XProtocol.hh:122
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_mkdir
Definition: XProtocol.hh:120
@ kXR_sync
Definition: XProtocol.hh:128
@ kXR_fattr
Definition: XProtocol.hh:132
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_stat
Definition: XProtocol.hh:129
@ kXR_close
Definition: XProtocol.hh:115
struct ClientReadRequest read
Definition: XProtocol.hh:867
kXR_unt16 mode
Definition: XProtocol.hh:480
kXR_int64 offset
Definition: XProtocol.hh:808
struct ClientWriteRequest write
Definition: XProtocol.hh:876
kXR_int32 rlen
Definition: XProtocol.hh:647
int kXR_int32
Definition: XPtypes.hh:89
unsigned short kXR_unt16
Definition: XPtypes.hh:67
unsigned char kXR_char
Definition: XPtypes.hh:65
ssize_t pwrite(int fildes, const void *buf, size_t nbyte, off_t offset)
off_t lseek(int fildes, off_t offset, int whence)
int stat(const char *path, struct stat *buf)
int ftruncate(int fildes, off_t offset)
ssize_t pread(int fildes, void *buf, size_t nbyte, off_t offset)
int fstat(int fildes, struct stat *buf)
int lstat(const char *path, struct stat *buf)
int mkdir(const char *path, mode_t mode)
int fsync(int fildes)
ssize_t readv(int fildes, const struct iovec *iov, int iovcnt)
ssize_t writev(int fildes, const struct iovec *iov, int iovcnt)
ssize_t read(int fildes, void *buf, size_t nbyte)
#define close(a)
Definition: XrdPosix.hh:43
struct sigevent aio_sigevent
Definition: XrdSfsAio.hh:51
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
static int mapError(int rc)
Definition: XProtocol.hh:1361
void Set(Type object, bool own=true)
Binary blob representation.
Definition: XrdClBuffer.hh:34
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
XRootDStatus Truncate(uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus VectorRead(const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Stat(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Sync(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ReadV(uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus QueueTask(XRootDStatus *st, AnyObject *obj, ResponseHandler *handler)
XRootDStatus SetXAttr(const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus MkdirPath(const std::string &path)
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus WriteV(uint64_t offset, ChunkList *chunks, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus DelXAttr(const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ListXAttr(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Fcntl(const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Visa(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus VectorWrite(const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ExecRequest(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams)
Translate an XRootD request into LocalFileHandler call.
XRootDStatus Open(const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus GetXAttr(const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
uint16_t GetVirtReqID() const
Get virtual request ID for the message.
Open operation.
Information returned by file open operation.
JobManager * GetJobManager()
Get the job manager object used by the post master.
Handle an async response.
Object stat info.
bool ParseServerResponse(const char *data)
Parse server response and fill up the object.
Synchronize the response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Handle the response.
URL representation.
Definition: XrdClURL.hh:31
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
void SetSize(uint32_t size)
Set size.
ChunkList & GetChunks()
Get chunks.
static XrdSysXAttr * Xat
Definition: XrdSysFAttr.hh:51
char Name[1]
Start of the name (size of struct is dynamic)
Definition: XrdSysXAttr.hh:56
int Vlen
The length of the attribute value.
Definition: XrdSysXAttr.hh:54
virtual int List(AList **aPL, const char *Path, int fd=-1, int getSz=0)=0
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0
int Nlen
The length of the attribute name that follows.
Definition: XrdSysXAttr.hh:55
virtual void Free(AList *aPL)=0
virtual int Del(const char *Aname, const char *Path, int fd=-1)=0
AList * Next
-> next element.
Definition: XrdSysXAttr.hh:53
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint64_t FileMsg
const uint16_t errOSError
Definition: XrdClStatus.hh:61
SyncImpl< false > Sync(Ctx< File > file, uint16_t timeout=0)
Factory for creating SyncImpl objects.
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
const uint16_t errLocalError
Definition: XrdClStatus.hh:107
const int DefaultAioSignal
static char * NVecRead(char *buffer, kXR_unt16 &rc)
Definition: XProtocol.cc:205
static char * VVecRead(char *buffer, kXR_int32 &len)
Definition: XProtocol.cc:224
Describe a data chunk for vector read.
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
uint32_t errNo
Errno, if any.
Definition: XrdClStatus.hh:148
Extended attribute operation status.
Extended attributes with status.