29 #include <sys/types.h>
37 #include <radosstriper/libradosstriper.hpp>
42 #include <sys/xattr.h>
104 typedef std::map<std::string, libradosstriper::RadosStriper*>
StriperDict;
106 typedef std::map<std::string, librados::IoCtx*>
IOCtxDict;
124 std::map<unsigned int, CephFileRef>
g_fds;
167 std::map<unsigned int, CephFileRef>::iterator it =
g_fds.find(fd);
168 if (it !=
g_fds.end()) {
173 return &(it->second);
182 if (fr.
flags & (O_WRONLY|O_RDWR)) {
185 std::map<unsigned int, CephFileRef>::iterator it =
g_fds.find(fd);
186 if (it !=
g_fds.end()) {
199 if (fr.
flags & (O_WRONLY|O_RDWR)) {
222 va_start(arg, format);
223 (*g_logfunc)(format, arg);
228 static unsigned long long int stoull(
const std::string &s) {
231 unsigned long long int res = strtoull(s.c_str(), &end, 10);
233 throw std::invalid_argument(s);
235 if (ERANGE == errno) {
236 throw std::out_of_range(s);
242 static unsigned int stoui(
const std::string &s) {
245 unsigned long int res = strtoul(s.c_str(), &end, 10);
247 throw std::invalid_argument(s);
249 if (ERANGE == errno || res > std::numeric_limits<unsigned int>::max()) {
250 throw std::out_of_range(s);
252 return (
unsigned int)res;
261 size_t atPos = params.find(
'@');
262 if (std::string::npos != atPos) {
263 file.
userId = params.substr(0, atPos);
267 char* cuser = env->
Get(
"cephUserId");
282 size_t comPos = params.find(
',', offset);
283 if (std::string::npos == comPos) {
284 if (params.size() == offset) {
286 char* cpool = env->
Get(
"cephPool");
292 file.
pool = params.substr(offset);
294 return params.size();
296 file.
pool = params.substr(offset, comPos-offset);
308 size_t comPos = params.find(
',', offset);
309 if (std::string::npos == comPos) {
310 if (params.size() == offset) {
312 char* cNbStripes = env->
Get(
"cephNbStripes");
313 if (0 != cNbStripes) {
320 return params.size();
334 size_t comPos = params.find(
',', offset);
335 if (std::string::npos == comPos) {
336 if (params.size() == offset) {
338 char* cStripeUnit = env->
Get(
"cephStripeUnit");
339 if (0 != cStripeUnit) {
346 return params.size();
360 if (params.size() == offset) {
362 char* cObjectSize = env->
Get(
"cephObjectSize");
363 if (0 != cObjectSize) {
377 unsigned int afterPool =
fillCephPool(params, afterUser, env, file);
397 char physCName[MAXPATHLEN+1];
398 int retc =
g_namelib->
lfn2pfn(logName.c_str(), physCName,
sizeof(physCName));
400 logwrapper((
char*)
"ceph_namelib : failed to translate %s using namelib plugin, using it as is", logName.c_str());
403 physName = physCName;
420 std::string spath = path;
421 size_t colonPos = spath.find(
':');
422 if (std::string::npos == colonPos) {
439 mode_t mode,
unsigned long long offset) {
465 librados::Rados *cluster =
new librados::Rados;
469 int rc = cluster->init(userId.c_str());
471 logwrapper((
char*)
"checkAndCreateCluster : cluster init failed");
475 rc = cluster->conf_read_file(NULL);
477 logwrapper((
char*)
"checkAndCreateCluster : cluster read config failed, rc = %d", rc);
482 cluster->conf_parse_env(NULL);
483 rc = cluster->connect();
485 logwrapper((
char*)
"checkAndCreateCluster : cluster connect failed, rc = %d", rc);
497 StriperDict::iterator it = sDict.find(userAtPool);
498 if (it == sDict.end()) {
503 logwrapper((
char*)
"checkAndCreateStriper : checkAndCreateCluster failed");
507 librados::IoCtx *ioctx =
new librados::IoCtx;
509 logwrapper((
char*)
"checkAndCreateStriper : IoCtx instantiation failed");
515 int rc =
g_cluster[cephPoolIdx]->ioctx_create(file.
pool.c_str(), *ioctx);
517 logwrapper((
char*)
"checkAndCreateStriper : ioctx_create failed, rc = %d", rc);
525 libradosstriper::RadosStriper *striper =
new libradosstriper::RadosStriper;
527 logwrapper((
char*)
"checkAndCreateStriper : RadosStriper instantiation failed");
534 rc = libradosstriper::RadosStriper::striper_create(*ioctx, striper);
536 logwrapper((
char*)
"checkAndCreateStriper : striper_create failed, rc = %d", rc);
545 rc = striper->set_object_layout_stripe_count(file.
nbStripes);
555 rc = striper->set_object_layout_stripe_unit(file.
stripeUnit);
557 logwrapper((
char*)
"checkAndCreateStriper : invalid stripeUnit %d (must be non 0, multiple of 64K)", file.
stripeUnit);
565 rc = striper->set_object_layout_object_size(file.
objectSize);
567 logwrapper((
char*)
"checkAndCreateStriper : invalid objectSize %d (must be non 0, multiple of stripe_unit)", file.
objectSize);
576 ioDict.emplace(userAtPool, ioctx);
577 sDict.emplace(userAtPool, striper);
584 std::stringstream ss;
587 std::string userAtPool = ss.str();
590 logwrapper((
char*)
"getRadosStriper : checkAndCreateStriper failed");
598 std::stringstream ss;
601 std::string userAtPool = ss.str();
606 return g_ioCtx[cephPoolIdx][userAtPool];
617 for (IOCtxDict::iterator it2 =
g_ioCtx[i].begin();
656 if (NULL == striper) {
661 int rc = striper->stat(fr.
name, (uint64_t*)&(buf.st_size), &(buf.st_atime));
664 bool fileExists = (rc != -ENOENT);
666 if ((flags&O_ACCMODE) == O_RDONLY) {
670 logwrapper((
char*)
"File descriptor %d associated to file %s opened in read mode", fd, pathname);
678 if (flags & O_TRUNC) {
680 if (rc < 0 && rc != -ENOENT) {
689 logwrapper((
char*)
"File descriptor %d associated to file %s opened in write mode", fd, pathname);
700 ::gettimeofday(&now,
nullptr);
702 double lastAsyncAge = 0.0;
708 logwrapper((
char*)
"ceph_close: closed fd %d for file %s, read ops count %d, write ops count %d, "
709 "async write ops %d/%d, async pending write bytes %ld, "
710 "async read ops %d/%d, bytes written/max offset %ld/%ld, "
711 "longest async write %f, longest callback invocation %f, last async op age %f",
740 logwrapper((
char*)
"ceph_lseek: for fd %d, offset=%lld, whence=%d", fd, offset, whence);
750 logwrapper((
char*)
"ceph_lseek64: for fd %d, offset=%lld, whence=%d", fd, offset, whence);
760 logwrapper((
char*)
"ceph_write: for fd %d, count=%d", fd, count);
761 if ((fr->
flags & (O_WRONLY|O_RDWR)) == 0) {
769 bl.append((
const char*)buf, count);
770 int rc = striper->write(fr->
name, bl, count, fr->
offset);
788 if ((fr->
flags & (O_WRONLY|O_RDWR)) == 0) {
796 bl.append((
const char*)buf, count);
797 int rc = striper->write(fr->
name, bl, count, offset);
811 size_t rc = rados_aio_get_return_value(c);
823 ::gettimeofday(&now,
nullptr);
824 double writeTime = 0.000001 * (now.tv_usec - awa->
startTime.tv_usec) + 1.0 * (now.tv_sec - awa->
startTime.tv_sec);
827 ::timeval before, after;
828 if (fr) ::gettimeofday(&before,
nullptr);
831 ::gettimeofday(&after,
nullptr);
832 double callbackInvocationTime = 0.000001 * (after.tv_usec - before.tv_usec) + 1.0 * (after.tv_sec - before.tv_sec);
848 if ((fr->
flags & (O_WRONLY|O_RDWR)) == 0) {
858 bl.append(buf, count);
868 librados::AioCompletion *completion =
871 int rc = striper->aio_write(fr->
name, completion, bl, count, offset);
872 completion->release();
888 if ((fr->
flags & O_WRONLY) != 0) {
896 int rc = striper->read(fr->
name, &bl, count, fr->
offset);
897 if (rc < 0)
return rc;
898 bl.begin().copy(rc, (
char*)buf);
913 if ((fr->
flags & O_WRONLY) != 0) {
921 int rc = striper->read(fr->
name, &bl, count, offset);
922 if (rc < 0)
return rc;
923 bl.begin().copy(rc, (
char*)buf);
934 size_t rc = rados_aio_get_return_value(c);
961 if ((fr->
flags & O_WRONLY) != 0) {
970 ceph::bufferlist *bl =
new ceph::bufferlist();
980 librados::AioCompletion *completion =
983 int rc = striper->aio_read(fr->
name, completion, bl, count, offset);
984 completion->release();
1002 logwrapper((
char*)
"ceph_stat: getRadosStriper failed");
1005 memset(buf, 0,
sizeof(*buf));
1006 int rc = striper->stat(fr->
name, (uint64_t*)&(buf->st_size), &(buf->st_atime));
1010 buf->st_mtime = buf->st_atime;
1011 buf->st_ctime = buf->st_atime;
1012 buf->st_mode = 0666 | S_IFREG;
1020 logwrapper((
char*)
"ceph_stat: %s", pathname);
1029 memset(buf, 0,
sizeof(*buf));
1030 int rc = striper->stat(file.
name, (uint64_t*)&(buf->st_size), &(buf->st_atime));
1036 buf->st_atime = time(NULL);
1041 buf->st_mtime = buf->st_atime;
1042 buf->st_ctime = buf->st_atime;
1043 buf->st_mode = 0666 | S_IFREG;
1061 logwrapper((
char*)
"ceph_fcntl: fd %d cmd=%d", fd, cmd);
1075 void* value,
size_t size) {
1080 ceph::bufferlist bl;
1081 int rc = striper->getxattr(file.
name, name, bl);
1082 if (rc < 0)
return rc;
1083 size_t returned_size = (size_t)rc<size?rc:size;
1084 bl.begin().copy(returned_size, (
char*)value);
1085 return returned_size;
1089 const char* name,
void* value,
1091 logwrapper((
char*)
"ceph_getxattr: path %s name=%s", path, name);
1096 void* value,
size_t size) {
1099 logwrapper((
char*)
"ceph_fgetxattr: fd %d name=%s", fd, name);
1107 const void* value,
size_t size,
int flags) {
1112 ceph::bufferlist bl;
1113 bl.append((
const char*)value, size);
1114 int rc = striper->setxattr(file.
name, name, bl);
1122 const char* name,
const void* value,
1123 size_t size,
int flags) {
1124 logwrapper((
char*)
"ceph_setxattr: path %s name=%s value=%s", path, name, value);
1129 const char* name,
const void* value,
1130 size_t size,
int flags) {
1133 logwrapper((
char*)
"ceph_fsetxattr: fd %d name=%s value=%s", fd, name, value);
1145 int rc = striper->rmxattr(file.
name, name);
1154 logwrapper((
char*)
"ceph_removexattr: path %s name=%s", path, name);
1161 logwrapper((
char*)
"ceph_fremovexattr: fd %d name=%s", fd, name);
1174 std::map<std::string, ceph::bufferlist> attrset;
1175 int rc = striper->getxattrs(file.
name, attrset);
1182 for (std::map<std::string, ceph::bufferlist>::const_iterator it = attrset.begin();
1183 it != attrset.end();
1186 newItem->
Next = *aPL;
1187 newItem->
Vlen = it->second.length();
1188 if (newItem->
Vlen > maxSize) {
1189 maxSize = newItem->
Vlen;
1191 newItem->
Nlen = it->first.size();
1192 strncpy(newItem->
Name, it->first.c_str(), newItem->
Vlen+1);
1203 logwrapper((
char*)
"ceph_listxattrs: path %s", path);
1210 logwrapper((
char*)
"ceph_flistxattrs: fd %d", fd);
1236 librados::cluster_stat_t result;
1237 int rc = cluster->cluster_stat(result);
1239 *totalSpace = result.kb * 1024;
1240 *freeSpace = result.kb_avail * 1024;
1250 return striper->trunc(file.
name, size);
1256 logwrapper((
char*)
"ceph_posix_ftruncate: fd %d, size %d", fd, size);
1264 logwrapper((
char*)
"ceph_posix_truncate : %s", pathname);
1271 logwrapper((
char*)
"ceph_posix_unlink : %s", pathname);
1278 return striper->remove(file.
name);
1282 logwrapper((
char*)
"ceph_posix_opendir : %s", pathname);
1285 if (file.
name.size() != 1 || file.
name[0] !=
'/') {
1289 librados::IoCtx *ioctx =
getIoCtx(file);
1301 librados::NObjectIterator &iterator = ((
DirIterator*)dirp)->m_iterator;
1302 librados::IoCtx *ioctx = ((
DirIterator*)dirp)->m_ioctx;
1303 while (iterator->get_oid().compare(iterator->get_oid().size()-17, 17,
".0000000000000000") &&
1304 iterator != ioctx->nobjects_end()) {
1307 if (iterator == ioctx->nobjects_end()) {
1310 int l = iterator->get_oid().size()-17;
1311 if (l < blen) blen = l;
1312 strncpy(buff, iterator->get_oid().c_str(), blen-1);
unsigned int g_cephPoolIdx
index of current Striper/IoCtx to be used
ssize_t ceph_posix_write(int fd, const void *buf, size_t count)
unsigned int getCephPoolIdxAndIncrease()
void ceph_posix_set_logfunc(void(*logfunc)(char *, va_list argp))
int ceph_posix_truncate(XrdOucEnv *env, const char *pathname, unsigned long long size)
CephFile g_defaultParams
global variable containing defaults for CephFiles
ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb)
int ceph_posix_unlink(XrdOucEnv *env, const char *pathname)
XrdOucName2Name * g_namelib
void fillCephFileParams(const std::string &params, XrdOucEnv *env, CephFile &file)
ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb)
int ceph_posix_readdir(DIR *dirp, char *buff, int blen)
std::multiset< std::string > g_filesOpenForWrite
global variable holding a list of files currently opened for write
int ceph_posix_fcntl(int fd, int cmd,...)
static ssize_t ceph_posix_internal_setxattr(const CephFile &file, const char *name, const void *value, size_t size, int flags)
ssize_t ceph_posix_getxattr(XrdOucEnv *env, const char *path, const char *name, void *value, size_t size)
DIR * ceph_posix_opendir(XrdOucEnv *env, const char *pathname)
std::vector< librados::Rados * > g_cluster
static int fillCephStripeUnit(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
static void ceph_aio_write_complete(rados_completion_t c, void *arg)
static CephFile getCephFile(const char *path, XrdOucEnv *env)
XrdSysMutex g_striper_mutex
mutex protecting the striper and ioctx maps
std::map< std::string, libradosstriper::RadosStriper * > StriperDict
ssize_t ceph_posix_read(int fd, void *buf, size_t count)
static int fillCephPool(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset)
int ceph_posix_listxattrs(XrdOucEnv *env, const char *path, XrdSysXAttr::AList **aPL, int getSz)
int ceph_posix_fstat(int fd, struct stat *buf)
static unsigned int stoui(const std::string &s)
simple unsigned integer parsing, to be replaced by std::stoul when C++11 can be used
void ceph_posix_disconnect_all()
int ceph_posix_fsync(int fd)
XrdSysMutex g_init_mutex
mutex protecting initialization of ceph clusters
int ceph_posix_closedir(DIR *dirp)
static int fillCephNbStripes(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
std::map< unsigned int, CephFileRef > g_fds
global variable holding a map of file descriptor to file reference
unsigned int g_nextCephFd
global variable remembering the next available file descriptor
static ssize_t ceph_posix_internal_getxattr(const CephFile &file, const char *name, void *value, size_t size)
std::vector< IOCtxDict > g_ioCtx
int insertFileRef(CephFileRef &fr)
librados::Rados * checkAndCreateCluster(unsigned int cephPoolIdx, std::string userId=g_defaultParams.userId)
static libradosstriper::RadosStriper * getRadosStriper(const CephFile &file)
int ceph_posix_statfs(long long *totalSpace, long long *freeSpace)
int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, const CephFile &file)
unsigned int g_maxCephPoolIdx
static void fillCephObjectSize(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
static librados::IoCtx * getIoCtx(const CephFile &file)
static CephFileRef getCephFileRef(const char *path, XrdOucEnv *env, int flags, mode_t mode, unsigned long long offset)
int ceph_posix_fremovexattr(int fd, const char *name)
void translateFileName(std::string &physName, std::string logName)
converts a logical filename to physical one if needed
int ceph_posix_close(int fd)
librados::IoCtx * m_ioctx
std::map< std::string, librados::IoCtx * > IOCtxDict
off_t ceph_posix_lseek(int fd, off_t offset, int whence)
void ceph_posix_set_defaults(const char *value)
void deleteFileRef(int fd, const CephFileRef &fr)
deletes a FileRef from the global table of file descriptors
int ceph_posix_fsetxattr(int fd, const char *name, const void *value, size_t size, int flags)
static void ceph_aio_read_complete(rados_completion_t c, void *arg)
int ceph_posix_ftruncate(int fd, unsigned long long size)
static int ceph_posix_internal_removexattr(const CephFile &file, const char *name)
std::vector< StriperDict > g_radosStripers
int ceph_posix_open(XrdOucEnv *env, const char *pathname, int flags, mode_t mode)
librados::NObjectIterator m_iterator
static unsigned long long int stoull(const std::string &s)
simple unsigned integer parsing, to be replaced by std::stoull when C++11 can be used
static void logwrapper(char *format,...)
int ceph_posix_removexattr(XrdOucEnv *env, const char *path, const char *name)
off64_t ceph_posix_lseek64(int fd, off64_t offset, int whence)
int ceph_posix_stat(XrdOucEnv *env, const char *pathname, struct stat *buf)
bool isOpenForWrite(std::string &name)
check whether a file is open for write
static int fillCephUserId(const std::string &params, XrdOucEnv *env, CephFile &file)
void fillCephFile(const char *path, XrdOucEnv *env, CephFile &file)
fill a ceph file struct from a path and an environment
ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset)
std::string g_defaultPool
XrdSysMutex g_fd_mutex
mutex protecting the map of file descriptors and the openForWrite multiset
ssize_t ceph_posix_setxattr(XrdOucEnv *env, const char *path, const char *name, const void *value, size_t size, int flags)
static int ceph_posix_internal_truncate(const CephFile &file, unsigned long long size)
static void(* g_logfunc)(char *, va_list argp)=0
global variable for the log function
std::string g_defaultUserId
CephFileRef * getFileRef(int fd)
look for a FileRef from its file descriptor
int ceph_posix_flistxattrs(int fd, XrdSysXAttr::AList **aPL, int getSz)
ssize_t ceph_posix_fgetxattr(int fd, const char *name, void *value, size_t size)
static int ceph_posix_internal_listxattrs(const CephFile &file, XrdSysXAttr::AList **aPL, int getSz)
static off64_t lseek_compute_offset(CephFileRef &fr, off64_t offset, int whence)
void ceph_posix_freexattrlist(XrdSysXAttr::AList *aPL)
small struct for directory listing
void() AioCB(XrdSfsAio *, size_t)
int stat(const char *path, struct stat *buf)
char * Get(const char *varname)
virtual int lfn2pfn(const char *lfn, char *buff, int blen)=0
char Name[1]
Start of the name (size of struct is dynamic)
int Vlen
The length of the attribute value.
int Nlen
The length of the attribute name that follows.
AList * Next
-> next element.
small struct for aio API callbacks
AioArgs(XrdSfsAio *a, AioCB *b, size_t n, int _fd, ceph::bufferlist *_bl=0)
uint64_t bytesAsyncWritePending
unsigned asyncRdStartCount
unsigned asyncWrStartCount
uint64_t maxOffsetWritten
::timeval lastAsyncSubmission
double longestCallbackInvocation
double longestAsyncWriteTime
unsigned asyncWrCompletionCount
unsigned asyncRdCompletionCount
small structs to store file metadata
unsigned long long stripeUnit
unsigned long long objectSize