8 #ifndef SRC_XRDCL_XRDCLECHANDLER_HH_
9 #define SRC_XRDCL_XRDCLECHANDLER_HH_
56 std::vector<FreeSpace> ServerList;
57 std::vector<std::string> ExportPaths;
58 time_t lastUpdateT = 0;
61 bool initExportPaths =
false;
63 void TryInitExportPaths();
64 uint64_t GetFreeSpace(
const std::string addr);
66 void UpdateSpaceInfo();
98 std::vector<uint32_t> cksums;
102 cksums.reserve( nbpages );
104 size_t size = chunk->
length;
105 char *buffer =
reinterpret_cast<char*
>( chunk->
buffer );
107 for(
size_t pg = 0; pg < nbpages; ++pg )
110 if( pgsize > size ) pgsize = size;
112 cksums.push_back( crcval );
120 response->
Set( pages );
132 std::unique_ptr<CheckSumHelper> cksHelper ) : redir( redir ),
136 cksHelper( std::move( cksHelper ) )
156 if( objcfg->plgr.empty() )
159 if( !st.
IsOK() )
return st;
162 writer->Open( handler, timeout );
171 if( objcfg->plgr.empty() )
174 if( !st.
IsOK() )
return st;
177 reader->Open( handler, timeout );
190 (void)url; (void)mode;
191 return Open( flags, handler, timeout );
206 if( st->
IsOK() &&
bool( cksHelper ) )
208 std::string commit = redir.GetPath()
209 +
"?xrdec.objid=" + objcfg->obj
210 +
"&xrdec.close=true&xrdec.size=" + std::to_string( curroff );
213 std::string ckstype = cksHelper->GetType();
215 auto st = cksHelper->GetCheckSum( cksval, ckstype );
218 handler->HandleResponse( new XRootDStatus( st ), nullptr );
221 commit +=
"&xrdec.cksum=" + cksval;
238 handler->HandleResponse( st, rsp );
240 return XRootDStatus();
254 if( !objcfg->nomtfile )
255 return fs.Stat( redir.GetPath(), handler, timeout );
257 if( !force && statcache )
259 auto rsp = StatRsp( statcache->GetSize() );
260 Schedule( handler, rsp );
267 statcache->SetSize( writer->GetSize() );
268 auto rsp = StatRsp( statcache->GetSize() );
269 Schedule( handler, rsp );
276 statcache->SetSize( reader->GetSize() );
277 auto rsp = StatRsp( statcache->GetSize() );
278 Schedule( handler, rsp );
296 reader->Read( offset, size, buffer, handler, timeout );
323 cksHelper->Update( buffer, size );
327 writer->Write( size, buffer, handler );
338 std::vector<uint32_t> &cksums,
340 uint16_t timeout = 0 )
342 if(! cksums.empty() )
344 const char *data =
static_cast<const char*
>( buffer );
345 std::vector<uint32_t> local_cksums;
347 if (data)
delete data;
348 if (local_cksums != cksums)
351 return Write(offset, size, buffer, handler, timeout);
359 return writer || reader;
368 std::unique_ptr<LocationInfo> ptr( infoAll );
369 if( !st.
IsOK() )
return st;
372 std::unique_ptr<LocationInfo> ptr1( info );
377 if( info->
GetSize() < objcfg->nbchunks )
379 unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
380 shuffle (info->
Begin(), info->
End(), std::default_random_engine(seed));
381 for(
size_t i = 0; i < objcfg->nbchunks; ++i )
383 auto &location = info->
At( i );
384 objcfg->plgr.emplace_back(
"root://" + location.GetAddress() +
'/' );
386 return XRootDStatus();
389 inline XRootDStatus LoadPlacement(
const std::string &path )
391 LocationInfo *info =
nullptr;
393 std::unique_ptr<LocationInfo> ptr( info );
394 if( !st.IsOK() )
return st;
396 if( info->GetSize() < objcfg->nbdata )
399 uint64_t verNumMax = 0;
400 std::vector<uint64_t> verNums;
401 std::vector<std::string> xattrkeys;
402 std::vector<XrdCl::XAttr> xattrvals;
403 xattrkeys.push_back(
"xrdec.strpver");
404 for(
size_t i = 0; i < info->GetSize(); ++i )
406 FileSystem *fs_i =
new FileSystem(info->At( i ).GetAddress());
408 st = fs_i->GetXAttr(path, xattrkeys, xattrvals, 0);
409 if (st.IsOK() && ! xattrvals[0].value.empty())
411 std::stringstream sstream(xattrvals[0].value);
414 verNums.push_back(verNum);
415 if (verNum > verNumMax)
419 verNums.push_back(0);
424 for(
size_t i = 0; i < info->GetSize(); ++i )
426 if ( verNums.at(i) == 0 || verNums.at(i) != verNumMax )
430 auto &location = info->At( i );
431 objcfg->plgr.emplace_back(
"root://" + location.GetAddress() +
'/' );
433 if (n < objcfg->nbdata )
435 return XRootDStatus();
438 inline static AnyObject* StatRsp( uint64_t size )
440 StatInfo *info =
new StatInfo();
441 info->SetSize( size );
442 AnyObject *rsp =
new AnyObject();
447 inline static void Schedule( ResponseHandler *handler, AnyObject *rsp )
449 ResponseJob *job =
new ResponseJob( handler,
new XRootDStatus(), rsp,
nullptr );
455 std::unique_ptr<XrdEc::ObjCfg> objcfg;
456 std::unique_ptr<XrdEc::StrmWriter> writer;
457 std::unique_ptr<XrdEc::Reader> reader;
459 std::unique_ptr<CheckSumHelper> cksHelper;
460 std::unique_ptr<StatInfo> statcache;
474 std::vector<std::string> && plgr ) :
475 nbdta( nbdta ), nbprt( nbprt ), chsz( chsz ), plgr( std::move( plgr ) )
494 objcfg->
plgr = std::move( plgr );
495 return new EcHandler( url, objcfg,
nullptr );
510 std::vector<std::string> plgr;
513 EcHandler*
GetEcHandler(
const URL &headnode,
const URL &redirurl );
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
void FromString(const std::string str)
Fill the buffer from a string.
static PostMaster * GetPostMaster()
Get default post master.
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout)
XRootDStatus PgWrite(uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout)
XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout)
XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout)
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout)
XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout)
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
EcHandler(const URL &redir, XrdEc::ObjCfg *objcfg, std::unique_ptr< CheckSumHelper > cksHelper)
EcPgReadResponseHandler(ResponseHandler *a)
void HandleResponse(XRootDStatus *status, AnyObject *rdresp)
EcPlugInFactory(uint8_t nbdta, uint8_t nbprt, uint64_t chsz, std::vector< std::string > &&plgr)
Constructor.
virtual FilePlugIn * CreateFile(const std::string &u)
Create a file plug-in for the given URL.
virtual FileSystemPlugIn * CreateFileSystem(const std::string &url)
Create a file system plug-in for the given URL.
virtual ~EcPlugInFactory()
Destructor.
An interface for file plug-ins.
An interface for file plug-ins.
XRootDStatus Query(QueryCode::Code queryCode, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
bool operator<(const FreeSpace &a) const
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
uint32_t GetSize() const
Get number of locations.
Iterator Begin()
Get the location begin iterator.
Location & At(uint32_t index)
Get the location at index.
Iterator End()
Get the location end iterator.
JobManager * GetJobManager()
Get the job manager object user by the post master.
Handle an async response.
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
void SelectLocations(XrdCl::LocationInfo &oldList, XrdCl::LocationInfo &newList, uint32_t n)
const std::string & GetPath() const
Get the path.
static Config & Instance()
Singleton access.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
const uint16_t stError
An error occurred that could potentially be retried.
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
const uint16_t errInternal
Internal error.
const uint16_t errInvalidOp
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
const uint16_t errInvalidArgs
const uint16_t errNotSupported
static const int PageSize
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Write
Open only for writing.
@ Update
Open for reading and writing.
@ OpaqueFile
Implementation dependent.
bool IsOK() const
We're fine.
std::vector< std::string > plgr