XRootD
XrdCephPosix.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2014-2015 by European Organization for Nuclear Research (CERN)
3 // Author: Sebastien Ponce <sebastien.ponce@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 /*
26  * This interface provides wrapper methods for using ceph through a POSIX API.
27  */
28 
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <cerrno>
32 #include <fcntl.h>
33 #include <unistd.h>
34 #include <stdlib.h>
35 #include <stdarg.h>
36 #include <memory>
37 #include <radosstriper/libradosstriper.hpp>
38 #include <map>
39 #include <stdexcept>
40 #include <string>
41 #include <sstream>
42 #include <sys/xattr.h>
43 #include <time.h>
44 #include <limits>
45 #include <pthread.h>
46 #include "XrdSfs/XrdSfsAio.hh"
47 #include "XrdSys/XrdSysPthread.hh"
49 #include "XrdSys/XrdSysPlatform.hh"
50 
51 #include "XrdCeph/XrdCephPosix.hh"
52 
/// Description of a file stored through radosstriper: the object name plus
/// the pool, user and striping layout used to access it.
/// Member order matters: g_defaultParams is aggregate-initialized in this order.
struct CephFile {
  std::string name;               // name of the striped object
  std::string pool;               // ceph pool holding the object
  std::string userId;             // ceph user id used to connect
  unsigned int nbStripes;         // stripe count of the layout
  unsigned long long stripeUnit;  // stripe unit, in bytes
  unsigned long long objectSize;  // object size, in bytes
};
62 
64  int flags;
65  mode_t mode;
66  uint64_t offset;
67  // This mutex protects against parallel updates of the stats.
69  uint64_t maxOffsetWritten;
71  uint64_t bytesWritten;
72  unsigned rdcount;
73  unsigned wrcount;
81 };
82 
84 struct DirIterator {
85  librados::NObjectIterator m_iterator;
86  librados::IoCtx *m_ioctx;
87 };
88 
90 struct AioArgs {
91  AioArgs(XrdSfsAio* a, AioCB *b, size_t n, int _fd, ceph::bufferlist *_bl=0) :
92  aiop(a), callback(b), nbBytes(n), fd(_fd), bl(_bl) { ::gettimeofday(&startTime, nullptr); }
95  size_t nbBytes;
96  int fd;
97  ::timeval startTime;
98  ceph::bufferlist *bl;
99 };
100 
104 typedef std::map<std::string, libradosstriper::RadosStriper*> StriperDict;
105 std::vector<StriperDict> g_radosStripers;
106 typedef std::map<std::string, librados::IoCtx*> IOCtxDict;
107 std::vector<IOCtxDict> g_ioCtx;
108 std::vector<librados::Rados*> g_cluster;
112 unsigned int g_cephPoolIdx = 0;
116 unsigned int g_maxCephPoolIdx = 1;
120 
122 std::multiset<std::string> g_filesOpenForWrite;
124 std::map<unsigned int, CephFileRef> g_fds;
126 unsigned int g_nextCephFd = 0;
131 
136  if (g_radosStripers.size() == 0) {
137  // make sure we do not have a race condition here
139  // double check now that we have the lock
140  if (g_radosStripers.size() == 0) {
141  // initialization phase : allocate corresponding places in the vectors
142  for (unsigned int i = 0; i < g_maxCephPoolIdx; i++) {
143  g_radosStripers.push_back(StriperDict());
144  g_ioCtx.push_back(IOCtxDict());
145  g_cluster.push_back(0);
146  }
147  }
148  }
149  unsigned int res = g_cephPoolIdx;
150  unsigned nextValue = g_cephPoolIdx+1;
151  if (nextValue >= g_maxCephPoolIdx) {
152  nextValue = 0;
153  }
154  g_cephPoolIdx = nextValue;
155  return res;
156 }
157 
159 bool isOpenForWrite(std::string& name) {
161  return g_filesOpenForWrite.find(name) != g_filesOpenForWrite.end();
162 }
163 
167  std::map<unsigned int, CephFileRef>::iterator it = g_fds.find(fd);
168  if (it != g_fds.end()) {
169  // We will release the lock upon exiting this function.
170  // The structure here is not protected from deletion, but we trust xrootd to
171  // ensure close (which does the deletion) will not be called before all previous
172  // calls are complete (including the async ones).
173  return &(it->second);
174  } else {
175  return 0;
176  }
177 }
178 
180 void deleteFileRef(int fd, const CephFileRef &fr) {
182  if (fr.flags & (O_WRONLY|O_RDWR)) {
184  }
185  std::map<unsigned int, CephFileRef>::iterator it = g_fds.find(fd);
186  if (it != g_fds.end()) {
187  g_fds.erase(it);
188  }
189 }
190 
197  g_fds[g_nextCephFd] = fr;
198  g_nextCephFd++;
199  if (fr.flags & (O_WRONLY|O_RDWR)) {
200  g_filesOpenForWrite.insert(fr.name);
201  }
202  return g_nextCephFd-1;
203 }
204 
207  "default", // default pool
208  "admin", // default user
209  1, // default nbStripes
210  4 * 1024 * 1024, // default stripeUnit : 4 MB
211  4 * 1024 * 1024}; // default objectSize : 4 MB
212 
// Fallback user id and pool names.
std::string g_defaultUserId("admin");
std::string g_defaultPool("default");

// Log sink installed by ceph_posix_set_logfunc; logwrapper drops messages while it is null.
static void (*g_logfunc)(char *, va_list argp) = nullptr;
218 
219 static void logwrapper(char* format, ...) {
220  if (0 == g_logfunc) return;
221  va_list arg;
222  va_start(arg, format);
223  (*g_logfunc)(format, arg);
224  va_end(arg);
225 }
226 
/// Parse s as a base-10 unsigned long long (local replacement for std::stoull).
/// Like strtoull, leading whitespace and a leading sign are accepted.
/// @throws std::invalid_argument if s contains no number at all (e.g. "" or " ")
///         or has characters left after the number
/// @throws std::out_of_range if the value does not fit in unsigned long long
static unsigned long long int stoull(const std::string &s) {
  const char* start = s.c_str();
  char* end;
  errno = 0;
  unsigned long long int res = strtoull(start, &end, 10);
  // When nothing is converted strtoull returns 0 and leaves end == start; for an
  // empty string *end is then already 0, so that case must be checked explicitly.
  if (end == start || 0 != *end) {
    throw std::invalid_argument(s);
  }
  if (ERANGE == errno) {
    throw std::out_of_range(s);
  }
  return res;
}
240 
/// Parse s as a base-10 unsigned int.
/// Like strtoul, leading whitespace and a leading sign are accepted.
/// @throws std::invalid_argument if s contains no number at all (e.g. "" or " ")
///         or has characters left after the number
/// @throws std::out_of_range if the value does not fit in unsigned int
static unsigned int stoui(const std::string &s) {
  const char* start = s.c_str();
  char* end;
  errno = 0;
  unsigned long int res = strtoul(start, &end, 10);
  // When nothing is converted strtoul returns 0 and leaves end == start; for an
  // empty string *end is then already 0, so that case must be checked explicitly.
  if (end == start || 0 != *end) {
    throw std::invalid_argument(s);
  }
  if (ERANGE == errno || res > std::numeric_limits<unsigned int>::max()) {
    throw std::out_of_range(s);
  }
  return (unsigned int)res;
}
254 
257 static int fillCephUserId(const std::string &params, XrdOucEnv *env, CephFile &file) {
258  // default
260  // parsing
261  size_t atPos = params.find('@');
262  if (std::string::npos != atPos) {
263  file.userId = params.substr(0, atPos);
264  return atPos+1;
265  } else {
266  if (0 != env) {
267  char* cuser = env->Get("cephUserId");
268  if (0 != cuser) {
269  file.userId = cuser;
270  }
271  }
272  return 0;
273  }
274 }
275 
278 static int fillCephPool(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file) {
279  // default
280  file.pool = g_defaultParams.pool;
281  // parsing
282  size_t comPos = params.find(',', offset);
283  if (std::string::npos == comPos) {
284  if (params.size() == offset) {
285  if (NULL != env) {
286  char* cpool = env->Get("cephPool");
287  if (0 != cpool) {
288  file.pool = cpool;
289  }
290  }
291  } else {
292  file.pool = params.substr(offset);
293  }
294  return params.size();
295  } else {
296  file.pool = params.substr(offset, comPos-offset);
297  return comPos+1;
298  }
299 }
300 
303 // this may raise std::invalid_argument and std::out_of_range
304 static int fillCephNbStripes(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file) {
305  // default
307  // parsing
308  size_t comPos = params.find(',', offset);
309  if (std::string::npos == comPos) {
310  if (params.size() == offset) {
311  if (NULL != env) {
312  char* cNbStripes = env->Get("cephNbStripes");
313  if (0 != cNbStripes) {
314  file.nbStripes = stoui(cNbStripes);
315  }
316  }
317  } else {
318  file.nbStripes = stoui(params.substr(offset));
319  }
320  return params.size();
321  } else {
322  file.nbStripes = stoui(params.substr(offset, comPos-offset));
323  return comPos+1;
324  }
325 }
326 
329 // this may raise std::invalid_argument and std::out_of_range
330 static int fillCephStripeUnit(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file) {
331  // default
333  // parsing
334  size_t comPos = params.find(',', offset);
335  if (std::string::npos == comPos) {
336  if (params.size() == offset) {
337  if (NULL != env) {
338  char* cStripeUnit = env->Get("cephStripeUnit");
339  if (0 != cStripeUnit) {
340  file.stripeUnit = ::stoull(cStripeUnit);
341  }
342  }
343  } else {
344  file.stripeUnit = ::stoull(params.substr(offset));
345  }
346  return params.size();
347  } else {
348  file.stripeUnit = ::stoull(params.substr(offset, comPos-offset));
349  return comPos+1;
350  }
351 }
352 
355 // this may raise std::invalid_argument and std::out_of_range
356 static void fillCephObjectSize(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file) {
357  // default
359  // parsing
360  if (params.size() == offset) {
361  if (NULL != env) {
362  char* cObjectSize = env->Get("cephObjectSize");
363  if (0 != cObjectSize) {
364  file.objectSize = ::stoull(cObjectSize);
365  }
366  }
367  } else {
368  file.objectSize = ::stoull(params.substr(offset));
369  }
370 }
371 
374 void fillCephFileParams(const std::string &params, XrdOucEnv *env, CephFile &file) {
375  // parse the params one by one
376  unsigned int afterUser = fillCephUserId(params, env, file);
377  unsigned int afterPool = fillCephPool(params, afterUser, env, file);
378  unsigned int afterNbStripes = fillCephNbStripes(params, afterPool, env, file);
379  unsigned int afterStripeUnit = fillCephStripeUnit(params, afterNbStripes, env, file);
380  fillCephObjectSize(params, afterStripeUnit, env, file);
381 }
382 
386 void ceph_posix_set_defaults(const char* value) {
387  if (value) {
388  CephFile newdefault;
389  fillCephFileParams(value, NULL, newdefault);
390  g_defaultParams = newdefault;
391  }
392 }
393 
395 void translateFileName(std::string &physName, std::string logName){
396  if (0 != g_namelib) {
397  char physCName[MAXPATHLEN+1];
398  int retc = g_namelib->lfn2pfn(logName.c_str(), physCName, sizeof(physCName));
399  if (retc) {
400  logwrapper((char*)"ceph_namelib : failed to translate %s using namelib plugin, using it as is", logName.c_str());
401  physName = logName;
402  } else {
403  physName = physCName;
404  }
405  } else {
406  physName = logName;
407  }
408 }
409 
411 void fillCephFile(const char *path, XrdOucEnv *env, CephFile &file) {
412  // Syntax of the given path is :
413  // [[userId@]pool[,nbStripes[,stripeUnit[,objectSize]]]:]<actual path>
414  // for the missing parts, if env is not null the entries
415  // cephUserId, cephPool, cephNbStripes, cephStripeUnit, and cephObjectSize
416  // of env will be used.
417  // If env is null or no entry is found for what is missing, defaults are
418  // applied. These defaults are initially set to 'admin', 'default', 1, 4MB and 4MB
419  // but can be changed via a call to ceph_posix_set_defaults
420  std::string spath = path;
421  size_t colonPos = spath.find(':');
422  if (std::string::npos == colonPos) {
423  // deal with name translation
424  translateFileName(file.name, spath);
425  fillCephFileParams("", env, file);
426  } else {
427  translateFileName(file.name, spath.substr(colonPos+1));
428  fillCephFileParams(spath.substr(0, colonPos), env, file);
429  }
430 }
431 
432 static CephFile getCephFile(const char *path, XrdOucEnv *env) {
433  CephFile file;
434  fillCephFile(path, env, file);
435  return file;
436 }
437 
438 static CephFileRef getCephFileRef(const char *path, XrdOucEnv *env, int flags,
439  mode_t mode, unsigned long long offset) {
440  CephFileRef fr;
441  fillCephFile(path, env, fr);
442  fr.flags = flags;
443  fr.mode = mode;
444  fr.offset = 0;
445  fr.maxOffsetWritten = 0;
446  fr.bytesAsyncWritePending = 0;
447  fr.bytesWritten = 0;
448  fr.rdcount = 0;
449  fr.wrcount = 0;
450  fr.asyncRdStartCount = 0;
451  fr.asyncRdCompletionCount = 0;
452  fr.asyncWrStartCount = 0;
453  fr.asyncWrCompletionCount = 0;
454  fr.lastAsyncSubmission.tv_sec = 0;
455  fr.lastAsyncSubmission.tv_usec = 0;
456  fr.longestAsyncWriteTime = 0.0l;
457  fr.longestCallbackInvocation = 0.0l;
458  return fr;
459 }
460 
461 inline librados::Rados* checkAndCreateCluster(unsigned int cephPoolIdx,
462  std::string userId = g_defaultParams.userId) {
463  if (0 == g_cluster[cephPoolIdx]) {
464  // create connection to cluster
465  librados::Rados *cluster = new librados::Rados;
466  if (0 == cluster) {
467  return 0;
468  }
469  int rc = cluster->init(userId.c_str());
470  if (rc) {
471  logwrapper((char*)"checkAndCreateCluster : cluster init failed");
472  delete cluster;
473  return 0;
474  }
475  rc = cluster->conf_read_file(NULL);
476  if (rc) {
477  logwrapper((char*)"checkAndCreateCluster : cluster read config failed, rc = %d", rc);
478  cluster->shutdown();
479  delete cluster;
480  return 0;
481  }
482  cluster->conf_parse_env(NULL);
483  rc = cluster->connect();
484  if (rc) {
485  logwrapper((char*)"checkAndCreateCluster : cluster connect failed, rc = %d", rc);
486  cluster->shutdown();
487  delete cluster;
488  return 0;
489  }
490  g_cluster[cephPoolIdx] = cluster;
491  }
492  return g_cluster[cephPoolIdx];
493 }
494 
495 int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, const CephFile& file) {
496  StriperDict &sDict = g_radosStripers[cephPoolIdx];
497  StriperDict::iterator it = sDict.find(userAtPool);
498  if (it == sDict.end()) {
499  // we need to create a new radosStriper
500  // Get a cluster
501  librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx, file.userId);
502  if (0 == cluster) {
503  logwrapper((char*)"checkAndCreateStriper : checkAndCreateCluster failed");
504  return 0;
505  }
506  // create IoCtx for our pool
507  librados::IoCtx *ioctx = new librados::IoCtx;
508  if (0 == ioctx) {
509  logwrapper((char*)"checkAndCreateStriper : IoCtx instantiation failed");
510  cluster->shutdown();
511  delete cluster;
512  g_cluster[cephPoolIdx] = 0;
513  return 0;
514  }
515  int rc = g_cluster[cephPoolIdx]->ioctx_create(file.pool.c_str(), *ioctx);
516  if (rc != 0) {
517  logwrapper((char*)"checkAndCreateStriper : ioctx_create failed, rc = %d", rc);
518  cluster->shutdown();
519  delete cluster;
520  g_cluster[cephPoolIdx] = 0;
521  delete ioctx;
522  return 0;
523  }
524  // create RadosStriper connection
525  libradosstriper::RadosStriper *striper = new libradosstriper::RadosStriper;
526  if (0 == striper) {
527  logwrapper((char*)"checkAndCreateStriper : RadosStriper instantiation failed");
528  delete ioctx;
529  cluster->shutdown();
530  delete cluster;
531  g_cluster[cephPoolIdx] = 0;
532  return 0;
533  }
534  rc = libradosstriper::RadosStriper::striper_create(*ioctx, striper);
535  if (rc != 0) {
536  logwrapper((char*)"checkAndCreateStriper : striper_create failed, rc = %d", rc);
537  delete striper;
538  delete ioctx;
539  cluster->shutdown();
540  delete cluster;
541  g_cluster[cephPoolIdx] = 0;
542  return 0;
543  }
544  // setup layout
545  rc = striper->set_object_layout_stripe_count(file.nbStripes);
546  if (rc != 0) {
547  logwrapper((char*)"checkAndCreateStriper : invalid nbStripes %d", file.nbStripes);
548  delete striper;
549  delete ioctx;
550  cluster->shutdown();
551  delete cluster;
552  g_cluster[cephPoolIdx] = 0;
553  return 0;
554  }
555  rc = striper->set_object_layout_stripe_unit(file.stripeUnit);
556  if (rc != 0) {
557  logwrapper((char*)"checkAndCreateStriper : invalid stripeUnit %d (must be non 0, multiple of 64K)", file.stripeUnit);
558  delete striper;
559  delete ioctx;
560  cluster->shutdown();
561  delete cluster;
562  g_cluster[cephPoolIdx] = 0;
563  return 0;
564  }
565  rc = striper->set_object_layout_object_size(file.objectSize);
566  if (rc != 0) {
567  logwrapper((char*)"checkAndCreateStriper : invalid objectSize %d (must be non 0, multiple of stripe_unit)", file.objectSize);
568  delete striper;
569  delete ioctx;
570  cluster->shutdown();
571  delete cluster;
572  g_cluster[cephPoolIdx] = 0;
573  return 0;
574  }
575  IOCtxDict & ioDict = g_ioCtx[cephPoolIdx];
576  ioDict.emplace(userAtPool, ioctx);
577  sDict.emplace(userAtPool, striper);
578  }
579  return 1;
580 }
581 
582 static libradosstriper::RadosStriper* getRadosStriper(const CephFile& file) {
584  std::stringstream ss;
585  ss << file.userId << '@' << file.pool << ',' << file.nbStripes << ','
586  << file.stripeUnit << ',' << file.objectSize;
587  std::string userAtPool = ss.str();
588  unsigned int cephPoolIdx = getCephPoolIdxAndIncrease();
589  if (checkAndCreateStriper(cephPoolIdx, userAtPool, file) == 0) {
590  logwrapper((char*)"getRadosStriper : checkAndCreateStriper failed");
591  return 0;
592  }
593  return g_radosStripers[cephPoolIdx][userAtPool];
594 }
595 
596 static librados::IoCtx* getIoCtx(const CephFile& file) {
598  std::stringstream ss;
599  ss << file.userId << '@' << file.pool << ',' << file.nbStripes << ','
600  << file.stripeUnit << ',' << file.objectSize;
601  std::string userAtPool = ss.str();
602  unsigned int cephPoolIdx = getCephPoolIdxAndIncrease();
603  if (checkAndCreateStriper(cephPoolIdx, userAtPool, file) == 0) {
604  return 0;
605  }
606  return g_ioCtx[cephPoolIdx][userAtPool];
607 }
608 
611  for (unsigned int i= 0; i < g_maxCephPoolIdx; i++) {
612  for (StriperDict::iterator it2 = g_radosStripers[i].begin();
613  it2 != g_radosStripers[i].end();
614  it2++) {
615  delete it2->second;
616  }
617  for (IOCtxDict::iterator it2 = g_ioCtx[i].begin();
618  it2 != g_ioCtx[i].end();
619  it2++) {
620  delete it2->second;
621  }
622  delete g_cluster[i];
623  }
624  g_radosStripers.clear();
625  g_ioCtx.clear();
626  g_cluster.clear();
627 }
628 
629 void ceph_posix_set_logfunc(void (*logfunc) (char *, va_list argp)) {
630  g_logfunc = logfunc;
631 };
632 
633 static int ceph_posix_internal_truncate(const CephFile &file, unsigned long long size);
634 
649 int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode){
650 
651  CephFileRef fr = getCephFileRef(pathname, env, flags, mode, 0);
652 
653  struct stat buf;
654  libradosstriper::RadosStriper *striper = getRadosStriper(fr); //Get a handle to the RADOS striper API
655 
656  if (NULL == striper) {
657  logwrapper((char*)"Cannot create striper");
658  return -EINVAL;
659  }
660 
661  int rc = striper->stat(fr.name, (uint64_t*)&(buf.st_size), &(buf.st_atime)); //Get details about a file
662 
663 
664  bool fileExists = (rc != -ENOENT); //Make clear what condition we are testing
665 
666  if ((flags&O_ACCMODE) == O_RDONLY) { // Access mode is READ
667 
668  if (fileExists) {
669  int fd = insertFileRef(fr);
670  logwrapper((char*)"File descriptor %d associated to file %s opened in read mode", fd, pathname);
671  return fd;
672  } else {
673  return -ENOENT;
674  }
675 
676  } else { // Access mode is WRITE
677  if (fileExists) {
678  if (flags & O_TRUNC) {
679  int rc = ceph_posix_unlink(env, pathname);
680  if (rc < 0 && rc != -ENOENT) {
681  return rc;
682  }
683  } else {
684  return -EEXIST;
685  }
686  }
687  // At this point, we know either the target file didn't exist, or the ceph_posix_unlink above removed it
688  int fd = insertFileRef(fr);
689  logwrapper((char*)"File descriptor %d associated to file %s opened in write mode", fd, pathname);
690  return fd;
691 
692  }
693 
694 }
695 
696 int ceph_posix_close(int fd) {
697  CephFileRef* fr = getFileRef(fd);
698  if (fr) {
699  ::timeval now;
700  ::gettimeofday(&now, nullptr);
701  XrdSysMutexHelper lock(fr->statsMutex);
702  double lastAsyncAge = 0.0;
703  // Only compute an age if the starting point was set.
704  if (fr->lastAsyncSubmission.tv_sec && fr->lastAsyncSubmission.tv_usec) {
705  lastAsyncAge = 1.0 * (now.tv_sec - fr->lastAsyncSubmission.tv_sec)
706  + 0.000001 * (now.tv_usec - fr->lastAsyncSubmission.tv_usec);
707  }
708  logwrapper((char*)"ceph_close: closed fd %d for file %s, read ops count %d, write ops count %d, "
709  "async write ops %d/%d, async pending write bytes %ld, "
710  "async read ops %d/%d, bytes written/max offset %ld/%ld, "
711  "longest async write %f, longest callback invocation %f, last async op age %f",
712  fd, fr->name.c_str(), fr->rdcount, fr->wrcount,
715  fr->longestAsyncWriteTime, fr->longestCallbackInvocation, (lastAsyncAge));
716  deleteFileRef(fd, *fr);
717  return 0;
718  } else {
719  return -EBADF;
720  }
721 }
722 
723 static off64_t lseek_compute_offset(CephFileRef &fr, off64_t offset, int whence) {
724  switch (whence) {
725  case SEEK_SET:
726  fr.offset = offset;
727  break;
728  case SEEK_CUR:
729  fr.offset += offset;
730  break;
731  default:
732  return -EINVAL;
733  }
734  return fr.offset;
735 }
736 
737 off_t ceph_posix_lseek(int fd, off_t offset, int whence) {
738  CephFileRef* fr = getFileRef(fd);
739  if (fr) {
740  logwrapper((char*)"ceph_lseek: for fd %d, offset=%lld, whence=%d", fd, offset, whence);
741  return (off_t)lseek_compute_offset(*fr, offset, whence);
742  } else {
743  return -EBADF;
744  }
745 }
746 
747 off64_t ceph_posix_lseek64(int fd, off64_t offset, int whence) {
748  CephFileRef* fr = getFileRef(fd);
749  if (fr) {
750  logwrapper((char*)"ceph_lseek64: for fd %d, offset=%lld, whence=%d", fd, offset, whence);
751  return lseek_compute_offset(*fr, offset, whence);
752  } else {
753  return -EBADF;
754  }
755 }
756 
757 ssize_t ceph_posix_write(int fd, const void *buf, size_t count) {
758  CephFileRef* fr = getFileRef(fd);
759  if (fr) {
760  logwrapper((char*)"ceph_write: for fd %d, count=%d", fd, count);
761  if ((fr->flags & (O_WRONLY|O_RDWR)) == 0) {
762  return -EBADF;
763  }
764  libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
765  if (0 == striper) {
766  return -EINVAL;
767  }
768  ceph::bufferlist bl;
769  bl.append((const char*)buf, count);
770  int rc = striper->write(fr->name, bl, count, fr->offset);
771  if (rc) return rc;
772  fr->offset += count;
773  XrdSysMutexHelper lock(fr->statsMutex);
774  fr->wrcount++;
775  fr->bytesWritten+=count;
776  if (fr->offset) fr->maxOffsetWritten = std::max(fr->offset - 1, fr->maxOffsetWritten);
777  return count;
778  } else {
779  return -EBADF;
780  }
781 }
782 
783 ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset) {
784  CephFileRef* fr = getFileRef(fd);
785  if (fr) {
786  // TODO implement proper logging level for this plugin - this should be only debug
787  //logwrapper((char*)"ceph_write: for fd %d, count=%d", fd, count);
788  if ((fr->flags & (O_WRONLY|O_RDWR)) == 0) {
789  return -EBADF;
790  }
791  libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
792  if (0 == striper) {
793  return -EINVAL;
794  }
795  ceph::bufferlist bl;
796  bl.append((const char*)buf, count);
797  int rc = striper->write(fr->name, bl, count, offset);
798  if (rc) return rc;
799  XrdSysMutexHelper lock(fr->statsMutex);
800  fr->wrcount++;
801  fr->bytesWritten+=count;
802  if (offset + count) fr->maxOffsetWritten = std::max(uint64_t(offset + count - 1), fr->maxOffsetWritten);
803  return count;
804  } else {
805  return -EBADF;
806  }
807 }
808 
809 static void ceph_aio_write_complete(rados_completion_t c, void *arg) {
810  AioArgs *awa = reinterpret_cast<AioArgs*>(arg);
811  size_t rc = rados_aio_get_return_value(c);
812  // Compute statistics before reportng to xrootd, so that a close cannot happen
813  // in the meantime.
814  CephFileRef* fr = getFileRef(awa->fd);
815  if (fr) {
816  XrdSysMutexHelper lock(fr->statsMutex);
818  fr->bytesAsyncWritePending -= awa->nbBytes;
819  fr->bytesWritten += awa->nbBytes;
820  if (awa->aiop->sfsAio.aio_nbytes)
821  fr->maxOffsetWritten = std::max(fr->maxOffsetWritten, uint64_t(awa->aiop->sfsAio.aio_offset + awa->aiop->sfsAio.aio_nbytes - 1));
822  ::timeval now;
823  ::gettimeofday(&now, nullptr);
824  double writeTime = 0.000001 * (now.tv_usec - awa->startTime.tv_usec) + 1.0 * (now.tv_sec - awa->startTime.tv_sec);
825  fr->longestAsyncWriteTime = std::max(fr->longestAsyncWriteTime, writeTime);
826  }
827  ::timeval before, after;
828  if (fr) ::gettimeofday(&before, nullptr);
829  awa->callback(awa->aiop, rc == 0 ? awa->nbBytes : rc);
830  if (fr) {
831  ::gettimeofday(&after, nullptr);
832  double callbackInvocationTime = 0.000001 * (after.tv_usec - before.tv_usec) + 1.0 * (after.tv_sec - before.tv_sec);
833  XrdSysMutexHelper lock(fr->statsMutex);
834  fr->longestCallbackInvocation = std::max(fr->longestCallbackInvocation, callbackInvocationTime);
835  }
836  delete(awa);
837 }
838 
839 ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) {
840  CephFileRef* fr = getFileRef(fd);
841  if (fr) {
842  // get the parameters from the Xroot aio object
843  size_t count = aiop->sfsAio.aio_nbytes;
844  const char *buf = (const char*)aiop->sfsAio.aio_buf;
845  size_t offset = aiop->sfsAio.aio_offset;
846  // TODO implement proper logging level for this plugin - this should be only debug
847  //logwrapper((char*)"ceph_aio_write: for fd %d, count=%d", fd, count);
848  if ((fr->flags & (O_WRONLY|O_RDWR)) == 0) {
849  return -EBADF;
850  }
851  // get the striper object
852  libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
853  if (0 == striper) {
854  return -EINVAL;
855  }
856  // prepare a bufferlist around the given buffer
857  ceph::bufferlist bl;
858  bl.append(buf, count);
859  // get the poolIdx to use
860  int cephPoolIdx = getCephPoolIdxAndIncrease();
861  // Get the cluster to use
862  librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx);
863  if (0 == cluster) {
864  return -EINVAL;
865  }
866  // prepare a ceph AioCompletion object and do async call
867  AioArgs *args = new AioArgs(aiop, cb, count, fd);
868  librados::AioCompletion *completion =
869  cluster->aio_create_completion(args, ceph_aio_write_complete, NULL);
870  // do the write
871  int rc = striper->aio_write(fr->name, completion, bl, count, offset);
872  completion->release();
873  XrdSysMutexHelper lock(fr->statsMutex);
874  fr->asyncWrStartCount++;
875  ::gettimeofday(&fr->lastAsyncSubmission, nullptr);
876  fr->bytesAsyncWritePending+=count;
877  return rc;
878  } else {
879  return -EBADF;
880  }
881 }
882 
883 ssize_t ceph_posix_read(int fd, void *buf, size_t count) {
884  CephFileRef* fr = getFileRef(fd);
885  if (fr) {
886  // TODO implement proper logging level for this plugin - this should be only debug
887  //logwrapper((char*)"ceph_read: for fd %d, count=%d", fd, count);
888  if ((fr->flags & O_WRONLY) != 0) {
889  return -EBADF;
890  }
891  libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
892  if (0 == striper) {
893  return -EINVAL;
894  }
895  ceph::bufferlist bl;
896  int rc = striper->read(fr->name, &bl, count, fr->offset);
897  if (rc < 0) return rc;
898  bl.begin().copy(rc, (char*)buf);
899  XrdSysMutexHelper lock(fr->statsMutex);
900  fr->offset += rc;
901  fr->rdcount++;
902  return rc;
903  } else {
904  return -EBADF;
905  }
906 }
907 
908 ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset) {
909  CephFileRef* fr = getFileRef(fd);
910  if (fr) {
911  // TODO implement proper logging level for this plugin - this should be only debug
912  //logwrapper((char*)"ceph_read: for fd %d, count=%d", fd, count);
913  if ((fr->flags & O_WRONLY) != 0) {
914  return -EBADF;
915  }
916  libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
917  if (0 == striper) {
918  return -EINVAL;
919  }
920  ceph::bufferlist bl;
921  int rc = striper->read(fr->name, &bl, count, offset);
922  if (rc < 0) return rc;
923  bl.begin().copy(rc, (char*)buf);
924  XrdSysMutexHelper lock(fr->statsMutex);
925  fr->rdcount++;
926  return rc;
927  } else {
928  return -EBADF;
929  }
930 }
931 
932 static void ceph_aio_read_complete(rados_completion_t c, void *arg) {
933  AioArgs *awa = reinterpret_cast<AioArgs*>(arg);
934  size_t rc = rados_aio_get_return_value(c);
935  if (awa->bl) {
936  if (rc > 0) {
937  awa->bl->begin().copy(rc, (char*)awa->aiop->sfsAio.aio_buf);
938  }
939  delete awa->bl;
940  awa->bl = 0;
941  }
942  // Compute statistics before reportng to xrootd, so that a close cannot happen
943  // in the meantime.
944  CephFileRef* fr = getFileRef(awa->fd);
945  if (fr) {
946  XrdSysMutexHelper lock(fr->statsMutex);
948  }
949  awa->callback(awa->aiop, rc );
950  delete(awa);
951 }
952 
953 ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb) {
954  CephFileRef* fr = getFileRef(fd);
955  if (fr) {
956  // get the parameters from the Xroot aio object
957  size_t count = aiop->sfsAio.aio_nbytes;
958  size_t offset = aiop->sfsAio.aio_offset;
959  // TODO implement proper logging level for this plugin - this should be only debug
960  //logwrapper((char*)"ceph_aio_read: for fd %d, count=%d", fd, count);
961  if ((fr->flags & O_WRONLY) != 0) {
962  return -EBADF;
963  }
964  // get the striper object
965  libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
966  if (0 == striper) {
967  return -EINVAL;
968  }
969  // prepare a bufferlist to receive data
970  ceph::bufferlist *bl = new ceph::bufferlist();
971  // get the poolIdx to use
972  int cephPoolIdx = getCephPoolIdxAndIncrease();
973  // Get the cluster to use
974  librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx);
975  if (0 == cluster) {
976  return -EINVAL;
977  }
978  // prepare a ceph AioCompletion object and do async call
979  AioArgs *args = new AioArgs(aiop, cb, count, fd, bl);
980  librados::AioCompletion *completion =
981  cluster->aio_create_completion(args, ceph_aio_read_complete, NULL);
982  // do the read
983  int rc = striper->aio_read(fr->name, completion, bl, count, offset);
984  completion->release();
985  XrdSysMutexHelper lock(fr->statsMutex);
986  fr->asyncRdStartCount++;
987  return rc;
988  } else {
989  return -EBADF;
990  }
991 }
992 
993 int ceph_posix_fstat(int fd, struct stat *buf) {
994  CephFileRef* fr = getFileRef(fd);
995  if (fr) {
996  logwrapper((char*)"ceph_stat: fd %d", fd);
997  // minimal stat : only size and times are filled
998  // atime, mtime and ctime are set all to the same value
999  // mode is set arbitrarily to 0666 | S_IFREG
1000  libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
1001  if (0 == striper) {
1002  logwrapper((char*)"ceph_stat: getRadosStriper failed");
1003  return -EINVAL;
1004  }
1005  memset(buf, 0, sizeof(*buf));
1006  int rc = striper->stat(fr->name, (uint64_t*)&(buf->st_size), &(buf->st_atime));
1007  if (rc != 0) {
1008  return -rc;
1009  }
1010  buf->st_mtime = buf->st_atime;
1011  buf->st_ctime = buf->st_atime;
1012  buf->st_mode = 0666 | S_IFREG;
1013  return 0;
1014  } else {
1015  return -EBADF;
1016  }
1017 }
1018 
1019 int ceph_posix_stat(XrdOucEnv* env, const char *pathname, struct stat *buf) {
1020  logwrapper((char*)"ceph_stat: %s", pathname);
1021  // minimal stat : only size and times are filled
1022  // atime, mtime and ctime are set all to the same value
1023  // mode is set arbitrarily to 0666 | S_IFREG
1024  CephFile file = getCephFile(pathname, env);
1025  libradosstriper::RadosStriper *striper = getRadosStriper(file);
1026  if (0 == striper) {
1027  return -EINVAL;
1028  }
1029  memset(buf, 0, sizeof(*buf));
1030  int rc = striper->stat(file.name, (uint64_t*)&(buf->st_size), &(buf->st_atime));
1031  if (rc != 0) {
1032  // for non existing file. Check that we did not open it for write recently
1033  // in that case, we return 0 size and current time
1034  if (-ENOENT == rc && isOpenForWrite(file.name)) {
1035  buf->st_size = 0;
1036  buf->st_atime = time(NULL);
1037  } else {
1038  return -rc;
1039  }
1040  }
1041  buf->st_mtime = buf->st_atime;
1042  buf->st_ctime = buf->st_atime;
1043  buf->st_mode = 0666 | S_IFREG;
1044  return 0;
1045 }
1046 
1047 int ceph_posix_fsync(int fd) {
1048  CephFileRef* fr = getFileRef(fd);
1049  if (fr) {
1050  // no locking of fr as it is not used.
1051  logwrapper((char*)"ceph_sync: fd %d", fd);
1052  return 0;
1053  } else {
1054  return -EBADF;
1055  }
1056 }
1057 
1058 int ceph_posix_fcntl(int fd, int cmd, ... /* arg */ ) {
1059  CephFileRef* fr = getFileRef(fd);
1060  if (fr) {
1061  logwrapper((char*)"ceph_fcntl: fd %d cmd=%d", fd, cmd);
1062  // minimal implementation
1063  switch (cmd) {
1064  case F_GETFL:
1065  return fr->mode;
1066  default:
1067  return -EINVAL;
1068  }
1069  } else {
1070  return -EBADF;
1071  }
1072 }
1073 
1074 static ssize_t ceph_posix_internal_getxattr(const CephFile &file, const char* name,
1075  void* value, size_t size) {
1076  libradosstriper::RadosStriper *striper = getRadosStriper(file);
1077  if (0 == striper) {
1078  return -EINVAL;
1079  }
1080  ceph::bufferlist bl;
1081  int rc = striper->getxattr(file.name, name, bl);
1082  if (rc < 0) return rc;
1083  size_t returned_size = (size_t)rc<size?rc:size;
1084  bl.begin().copy(returned_size, (char*)value);
1085  return returned_size;
1086 }
1087 
1088 ssize_t ceph_posix_getxattr(XrdOucEnv* env, const char* path,
1089  const char* name, void* value,
1090  size_t size) {
1091  logwrapper((char*)"ceph_getxattr: path %s name=%s", path, name);
1092  return ceph_posix_internal_getxattr(getCephFile(path, env), name, value, size);
1093 }
1094 
1095 ssize_t ceph_posix_fgetxattr(int fd, const char* name,
1096  void* value, size_t size) {
1097  CephFileRef* fr = getFileRef(fd);
1098  if (fr) {
1099  logwrapper((char*)"ceph_fgetxattr: fd %d name=%s", fd, name);
1100  return ceph_posix_internal_getxattr(*fr, name, value, size);
1101  } else {
1102  return -EBADF;
1103  }
1104 }
1105 
1106 static ssize_t ceph_posix_internal_setxattr(const CephFile &file, const char* name,
1107  const void* value, size_t size, int flags) {
1108  libradosstriper::RadosStriper *striper = getRadosStriper(file);
1109  if (0 == striper) {
1110  return -EINVAL;
1111  }
1112  ceph::bufferlist bl;
1113  bl.append((const char*)value, size);
1114  int rc = striper->setxattr(file.name, name, bl);
1115  if (rc) {
1116  return -rc;
1117  }
1118  return 0;
1119 }
1120 
1121 ssize_t ceph_posix_setxattr(XrdOucEnv* env, const char* path,
1122  const char* name, const void* value,
1123  size_t size, int flags) {
1124  logwrapper((char*)"ceph_setxattr: path %s name=%s value=%s", path, name, value);
1125  return ceph_posix_internal_setxattr(getCephFile(path, env), name, value, size, flags);
1126 }
1127 
1129  const char* name, const void* value,
1130  size_t size, int flags) {
1131  CephFileRef* fr = getFileRef(fd);
1132  if (fr) {
1133  logwrapper((char*)"ceph_fsetxattr: fd %d name=%s value=%s", fd, name, value);
1134  return ceph_posix_internal_setxattr(*fr, name, value, size, flags);
1135  } else {
1136  return -EBADF;
1137  }
1138 }
1139 
1140 static int ceph_posix_internal_removexattr(const CephFile &file, const char* name) {
1141  libradosstriper::RadosStriper *striper = getRadosStriper(file);
1142  if (0 == striper) {
1143  return -EINVAL;
1144  }
1145  int rc = striper->rmxattr(file.name, name);
1146  if (rc) {
1147  return -rc;
1148  }
1149  return 0;
1150 }
1151 
1152 int ceph_posix_removexattr(XrdOucEnv* env, const char* path,
1153  const char* name) {
1154  logwrapper((char*)"ceph_removexattr: path %s name=%s", path, name);
1155  return ceph_posix_internal_removexattr(getCephFile(path, env), name);
1156 }
1157 
1158 int ceph_posix_fremovexattr(int fd, const char* name) {
1159  CephFileRef* fr = getFileRef(fd);
1160  if (fr) {
1161  logwrapper((char*)"ceph_fremovexattr: fd %d name=%s", fd, name);
1162  return ceph_posix_internal_removexattr(*fr, name);
1163  } else {
1164  return -EBADF;
1165  }
1166 }
1167 
1168 static int ceph_posix_internal_listxattrs(const CephFile &file, XrdSysXAttr::AList **aPL, int getSz) {
1169  libradosstriper::RadosStriper *striper = getRadosStriper(file);
1170  if (0 == striper) {
1171  return -EINVAL;
1172  }
1173  // call ceph
1174  std::map<std::string, ceph::bufferlist> attrset;
1175  int rc = striper->getxattrs(file.name, attrset);
1176  if (rc) {
1177  return -rc;
1178  }
1179  // build result
1180  *aPL = 0;
1181  int maxSize = 0;
1182  for (std::map<std::string, ceph::bufferlist>::const_iterator it = attrset.begin();
1183  it != attrset.end();
1184  it++) {
1185  XrdSysXAttr::AList* newItem = (XrdSysXAttr::AList*)malloc(sizeof(XrdSysXAttr::AList)+it->first.size());
1186  newItem->Next = *aPL;
1187  newItem->Vlen = it->second.length();
1188  if (newItem->Vlen > maxSize) {
1189  maxSize = newItem->Vlen;
1190  }
1191  newItem->Nlen = it->first.size();
1192  strncpy(newItem->Name, it->first.c_str(), newItem->Vlen+1);
1193  *aPL = newItem;
1194  }
1195  if (getSz) {
1196  return 0;
1197  } else {
1198  return maxSize;
1199  }
1200 }
1201 
1202 int ceph_posix_listxattrs(XrdOucEnv* env, const char* path, XrdSysXAttr::AList **aPL, int getSz) {
1203  logwrapper((char*)"ceph_listxattrs: path %s", path);
1204  return ceph_posix_internal_listxattrs(getCephFile(path, env), aPL, getSz);
1205 }
1206 
1207 int ceph_posix_flistxattrs(int fd, XrdSysXAttr::AList **aPL, int getSz) {
1208  CephFileRef* fr = getFileRef(fd);
1209  if (fr) {
1210  logwrapper((char*)"ceph_flistxattrs: fd %d", fd);
1211  return ceph_posix_internal_listxattrs(*fr, aPL, getSz);
1212  } else {
1213  return -EBADF;
1214  }
1215 }
1216 
1218  while (aPL) {
1219  free(aPL->Name);
1220  XrdSysXAttr::AList *cur = aPL;
1221  aPL = aPL->Next;
1222  free(cur);
1223  }
1224 }
1225 
1226 int ceph_posix_statfs(long long *totalSpace, long long *freeSpace) {
1227  logwrapper((char*)"ceph_posix_statfs");
1228  // get the poolIdx to use
1229  int cephPoolIdx = getCephPoolIdxAndIncrease();
1230  // Get the cluster to use
1231  librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx);
1232  if (0 == cluster) {
1233  return -EINVAL;
1234  }
1235  // call ceph stat
1236  librados::cluster_stat_t result;
1237  int rc = cluster->cluster_stat(result);
1238  if (0 == rc) {
1239  *totalSpace = result.kb * 1024;
1240  *freeSpace = result.kb_avail * 1024;
1241  }
1242  return rc;
1243 }
1244 
1245 static int ceph_posix_internal_truncate(const CephFile &file, unsigned long long size) {
1246  libradosstriper::RadosStriper *striper = getRadosStriper(file);
1247  if (0 == striper) {
1248  return -EINVAL;
1249  }
1250  return striper->trunc(file.name, size);
1251 }
1252 
1253 int ceph_posix_ftruncate(int fd, unsigned long long size) {
1254  CephFileRef* fr = getFileRef(fd);
1255  if (fr) {
1256  logwrapper((char*)"ceph_posix_ftruncate: fd %d, size %d", fd, size);
1257  return ceph_posix_internal_truncate(*fr, size);
1258  } else {
1259  return -EBADF;
1260  }
1261 }
1262 
1263 int ceph_posix_truncate(XrdOucEnv* env, const char *pathname, unsigned long long size) {
1264  logwrapper((char*)"ceph_posix_truncate : %s", pathname);
1265  // minimal stat : only size and times are filled
1266  CephFile file = getCephFile(pathname, env);
1267  return ceph_posix_internal_truncate(file, size);
1268 }
1269 
1270 int ceph_posix_unlink(XrdOucEnv* env, const char *pathname) {
1271  logwrapper((char*)"ceph_posix_unlink : %s", pathname);
1272  // minimal stat : only size and times are filled
1273  CephFile file = getCephFile(pathname, env);
1274  libradosstriper::RadosStriper *striper = getRadosStriper(file);
1275  if (0 == striper) {
1276  return -EINVAL;
1277  }
1278  return striper->remove(file.name);
1279 }
1280 
1281 DIR* ceph_posix_opendir(XrdOucEnv* env, const char *pathname) {
1282  logwrapper((char*)"ceph_posix_opendir : %s", pathname);
1283  // only accept root dir, as there is no concept of dirs in object stores
1284  CephFile file = getCephFile(pathname, env);
1285  if (file.name.size() != 1 || file.name[0] != '/') {
1286  errno = -ENOENT;
1287  return 0;
1288  }
1289  librados::IoCtx *ioctx = getIoCtx(file);
1290  if (0 == ioctx) {
1291  errno = EINVAL;
1292  return 0;
1293  }
1294  DirIterator* res = new DirIterator();
1295  res->m_iterator = ioctx->nobjects_begin();
1296  res->m_ioctx = ioctx;
1297  return (DIR*)res;
1298 }
1299 
1300 int ceph_posix_readdir(DIR *dirp, char *buff, int blen) {
1301  librados::NObjectIterator &iterator = ((DirIterator*)dirp)->m_iterator;
1302  librados::IoCtx *ioctx = ((DirIterator*)dirp)->m_ioctx;
1303  while (iterator->get_oid().compare(iterator->get_oid().size()-17, 17, ".0000000000000000") &&
1304  iterator != ioctx->nobjects_end()) {
1305  iterator++;
1306  }
1307  if (iterator == ioctx->nobjects_end()) {
1308  buff[0] = 0;
1309  } else {
1310  int l = iterator->get_oid().size()-17;
1311  if (l < blen) blen = l;
1312  strncpy(buff, iterator->get_oid().c_str(), blen-1);
1313  buff[blen-1] = 0;
1314  iterator++;
1315  }
1316  return 0;
1317 }
1318 
1319 int ceph_posix_closedir(DIR *dirp) {
1320  delete ((DirIterator*)dirp);
1321  return 0;
1322 }
unsigned int g_cephPoolIdx
index of current Striper/IoCtx to be used
ssize_t ceph_posix_write(int fd, const void *buf, size_t count)
unsigned int getCephPoolIdxAndIncrease()
void ceph_posix_set_logfunc(void(*logfunc)(char *, va_list argp))
int ceph_posix_truncate(XrdOucEnv *env, const char *pathname, unsigned long long size)
CephFile g_defaultParams
global variable containing defaults for CephFiles
ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb)
int ceph_posix_unlink(XrdOucEnv *env, const char *pathname)
XrdOucName2Name * g_namelib
void fillCephFileParams(const std::string &params, XrdOucEnv *env, CephFile &file)
ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb)
int ceph_posix_readdir(DIR *dirp, char *buff, int blen)
std::multiset< std::string > g_filesOpenForWrite
global variable holding a list of files currently opened for write
int ceph_posix_fcntl(int fd, int cmd,...)
static ssize_t ceph_posix_internal_setxattr(const CephFile &file, const char *name, const void *value, size_t size, int flags)
ssize_t ceph_posix_getxattr(XrdOucEnv *env, const char *path, const char *name, void *value, size_t size)
DIR * ceph_posix_opendir(XrdOucEnv *env, const char *pathname)
std::vector< librados::Rados * > g_cluster
static int fillCephStripeUnit(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
static void ceph_aio_write_complete(rados_completion_t c, void *arg)
static CephFile getCephFile(const char *path, XrdOucEnv *env)
XrdSysMutex g_striper_mutex
mutex protecting the striper and ioctx maps
std::map< std::string, libradosstriper::RadosStriper * > StriperDict
ssize_t ceph_posix_read(int fd, void *buf, size_t count)
static int fillCephPool(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset)
int ceph_posix_listxattrs(XrdOucEnv *env, const char *path, XrdSysXAttr::AList **aPL, int getSz)
int ceph_posix_fstat(int fd, struct stat *buf)
static unsigned int stoui(const std::string &s)
simple integer parsing, to be replaced by std::stoi when C++11 can be used
void ceph_posix_disconnect_all()
int ceph_posix_fsync(int fd)
XrdSysMutex g_init_mutex
mutex protecting initialization of ceph clusters
int ceph_posix_closedir(DIR *dirp)
static int fillCephNbStripes(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
std::map< unsigned int, CephFileRef > g_fds
global variable holding a map of file descriptor to file reference
unsigned int g_nextCephFd
global variable remembering the next available file descriptor
static ssize_t ceph_posix_internal_getxattr(const CephFile &file, const char *name, void *value, size_t size)
std::vector< IOCtxDict > g_ioCtx
int insertFileRef(CephFileRef &fr)
librados::Rados * checkAndCreateCluster(unsigned int cephPoolIdx, std::string userId=g_defaultParams.userId)
static libradosstriper::RadosStriper * getRadosStriper(const CephFile &file)
int ceph_posix_statfs(long long *totalSpace, long long *freeSpace)
int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, const CephFile &file)
unsigned int g_maxCephPoolIdx
static void fillCephObjectSize(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
static librados::IoCtx * getIoCtx(const CephFile &file)
static CephFileRef getCephFileRef(const char *path, XrdOucEnv *env, int flags, mode_t mode, unsigned long long offset)
int ceph_posix_fremovexattr(int fd, const char *name)
void translateFileName(std::string &physName, std::string logName)
converts a logical filename to physical one if needed
int ceph_posix_close(int fd)
librados::IoCtx * m_ioctx
Definition: XrdCephPosix.cc:86
std::map< std::string, librados::IoCtx * > IOCtxDict
off_t ceph_posix_lseek(int fd, off_t offset, int whence)
void ceph_posix_set_defaults(const char *value)
void deleteFileRef(int fd, const CephFileRef &fr)
deletes a FileRef from the global table of file descriptors
int ceph_posix_fsetxattr(int fd, const char *name, const void *value, size_t size, int flags)
static void ceph_aio_read_complete(rados_completion_t c, void *arg)
int ceph_posix_ftruncate(int fd, unsigned long long size)
static int ceph_posix_internal_removexattr(const CephFile &file, const char *name)
std::vector< StriperDict > g_radosStripers
int ceph_posix_open(XrdOucEnv *env, const char *pathname, int flags, mode_t mode)
librados::NObjectIterator m_iterator
Definition: XrdCephPosix.cc:85
static unsigned long long int stoull(const std::string &s)
simple integer parsing, to be replaced by std::stoll when C++11 can be used
static void logwrapper(char *format,...)
int ceph_posix_removexattr(XrdOucEnv *env, const char *path, const char *name)
off64_t ceph_posix_lseek64(int fd, off64_t offset, int whence)
int ceph_posix_stat(XrdOucEnv *env, const char *pathname, struct stat *buf)
bool isOpenForWrite(std::string &name)
check whether a file is open for write
static int fillCephUserId(const std::string &params, XrdOucEnv *env, CephFile &file)
void fillCephFile(const char *path, XrdOucEnv *env, CephFile &file)
fill a ceph file struct from a path and an environment
ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset)
std::string g_defaultPool
XrdSysMutex g_fd_mutex
mutex protecting the map of file descriptors and the openForWrite multiset
ssize_t ceph_posix_setxattr(XrdOucEnv *env, const char *path, const char *name, const void *value, size_t size, int flags)
static int ceph_posix_internal_truncate(const CephFile &file, unsigned long long size)
static void(* g_logfunc)(char *, va_list argp)=0
global variable for the log function
std::string g_defaultUserId
CephFileRef * getFileRef(int fd)
look for a FileRef from its file descriptor
int ceph_posix_flistxattrs(int fd, XrdSysXAttr::AList **aPL, int getSz)
ssize_t ceph_posix_fgetxattr(int fd, const char *name, void *value, size_t size)
static int ceph_posix_internal_listxattrs(const CephFile &file, XrdSysXAttr::AList **aPL, int getSz)
static off64_t lseek_compute_offset(CephFileRef &fr, off64_t offset, int whence)
void ceph_posix_freexattrlist(XrdSysXAttr::AList *aPL)
small struct for directory listing
Definition: XrdCephPosix.cc:84
void() AioCB(XrdSfsAio *, size_t)
Definition: XrdCephPosix.hh:39
int stat(const char *path, struct stat *buf)
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
void * aio_buf
Definition: XrdSfsAio.hh:47
if(Avsz)
char * Get(const char *varname)
Definition: XrdOucEnv.hh:69
virtual int lfn2pfn(const char *lfn, char *buff, int blen)=0
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62
char Name[1]
Start of the name (size of struct is dynamic)
Definition: XrdSysXAttr.hh:56
int Vlen
The length of the attribute value.
Definition: XrdSysXAttr.hh:54
int Nlen
The length of the attribute name that follows.
Definition: XrdSysXAttr.hh:55
AList * Next
-> next element.
Definition: XrdSysXAttr.hh:53
small struct for aio API callbacks
Definition: XrdCephPosix.cc:90
size_t nbBytes
Definition: XrdCephPosix.cc:95
::timeval startTime
Definition: XrdCephPosix.cc:97
AioCB * callback
Definition: XrdCephPosix.cc:94
AioArgs(XrdSfsAio *a, AioCB *b, size_t n, int _fd, ceph::bufferlist *_bl=0)
Definition: XrdCephPosix.cc:91
XrdSfsAio * aiop
Definition: XrdCephPosix.cc:93
ceph::bufferlist * bl
Definition: XrdCephPosix.cc:98
uint64_t bytesAsyncWritePending
Definition: XrdCephPosix.cc:70
unsigned asyncRdStartCount
Definition: XrdCephPosix.cc:74
unsigned asyncWrStartCount
Definition: XrdCephPosix.cc:76
uint64_t maxOffsetWritten
Definition: XrdCephPosix.cc:69
::timeval lastAsyncSubmission
Definition: XrdCephPosix.cc:78
double longestCallbackInvocation
Definition: XrdCephPosix.cc:80
uint64_t bytesWritten
Definition: XrdCephPosix.cc:71
uint64_t offset
Definition: XrdCephPosix.cc:66
double longestAsyncWriteTime
Definition: XrdCephPosix.cc:79
unsigned wrcount
Definition: XrdCephPosix.cc:73
unsigned rdcount
Definition: XrdCephPosix.cc:72
XrdSysMutex statsMutex
Definition: XrdCephPosix.cc:68
unsigned asyncWrCompletionCount
Definition: XrdCephPosix.cc:77
unsigned asyncRdCompletionCount
Definition: XrdCephPosix.cc:75
small structs to store file metadata
Definition: XrdCephPosix.cc:54
std::string userId
Definition: XrdCephPosix.cc:57
unsigned int nbStripes
Definition: XrdCephPosix.cc:58
std::string pool
Definition: XrdCephPosix.cc:56
unsigned long long stripeUnit
Definition: XrdCephPosix.cc:59
std::string name
Definition: XrdCephPosix.cc:55
unsigned long long objectSize
Definition: XrdCephPosix.cc:60