59 #include <uuid/uuid.h>
69 friend class PgReadRetryHandler;
76 PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
78 uint64_t orgOffset ) :
79 stateHandler( stateHandler ),
80 userHandler( userHandler ),
81 orgOffset( orgOffset ),
95 using namespace XrdCl;
97 std::unique_lock<std::mutex> lck( mtx );
105 if( !status->
IsOK() )
120 PageInfo &pginf = XrdCl::To<PageInfo>( *resp );
122 userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
125 userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
136 if( !status->
IsOK() )
141 userHandler->HandleResponseWithHosts( status, response, hostList );
153 response->
Get( pginf );
157 std::vector<uint32_t> &cksums = pginf->
GetCksums();
158 char *buffer =
reinterpret_cast<char*
>( pginf->
GetBuffer() );
161 if( pgsize > bytesRead ) pgsize = bytesRead;
163 for(
size_t pgnb = 0; pgnb < nbpages; ++pgnb )
166 if( crcval != cksums[pgnb] )
168 Log *log = DefaultEnv::GetLog();
169 log->
Info(
FileMsg,
"[%p@%s] Received corrupted page, will retry page #%zu.",
170 this, stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
185 if( pgsize > bytesRead ) pgsize = bytesRead;
194 userHandler->HandleResponseWithHosts( status, response, hostList );
203 resp.reset( response );
204 hosts.reset( hostList );
208 void UpdateCksum(
size_t pgnb, uint32_t crcval )
220 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
224 std::unique_ptr<XrdCl::AnyObject> resp;
225 std::unique_ptr<XrdCl::HostList> hosts;
226 std::unique_ptr<XrdCl::XRootDStatus> st;
242 PgReadRetryHandler( PgReadHandler *pgReadHandler,
size_t pgnb ) : pgReadHandler( pgReadHandler ),
255 using namespace XrdCl;
257 if( !status->
IsOK() )
259 Log *log = DefaultEnv::GetLog();
260 log->
Info(
FileMsg,
"[%p@%s] Failed to recover page #%zu.",
261 this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
262 pgReadHandler->HandleResponseWithHosts( status, response, hostList );
268 response->
Get( pginf );
271 Log *log = DefaultEnv::GetLog();
272 log->
Info(
FileMsg,
"[%p@%s] Failed to recover page #%zu.",
273 this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
275 DeleteArgs( status, response, hostList );
282 if( crcval != pginf->
GetCksums().front() )
284 Log *log = DefaultEnv::GetLog();
285 log->
Info(
FileMsg,
"[%p@%s] Failed to recover page #%zu.",
286 this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
287 DeleteArgs( status, response, hostList );
293 Log *log = DefaultEnv::GetLog();
294 log->
Info(
FileMsg,
"[%p@%s] Successfully recovered page #%zu.",
295 this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
297 DeleteArgs( 0, response, hostList );
298 pgReadHandler->UpdateCksum( pgnb, crcval );
299 pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
314 PgReadHandler *pgReadHandler;
330 stateHandler( stateHandler ),
331 userHandler( userHandler )
342 if( !status->
IsOK() )
349 using namespace XrdCl;
352 rdresp->
Get( chunk );
354 std::vector<uint32_t> cksums;
355 if( stateHandler->pIsChannelEncrypted )
360 cksums.reserve( nbpages );
362 size_t size = chunk->
length;
363 char *buffer =
reinterpret_cast<char*
>( chunk->
buffer );
365 for(
size_t pg = 0; pg < nbpages; ++pg )
368 if( pgsize > size ) pgsize = size;
370 cksums.push_back( crcval );
377 chunk->
buffer, std::move( cksums ) );
380 response->
Set( pages );
381 userHandler->HandleResponseWithHosts( status, response, hostList );
388 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
402 OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
404 pStateHandler( stateHandler ),
405 pUserHandler( userHandler )
416 using namespace XrdCl;
423 response->
Get( openInfo );
435 pStateHandler->pPlugin = ecHandler;
436 ecHandler->
Open( pStateHandler->pOpenFlags, pUserHandler, 0 );
444 pStateHandler->OnOpen( status, openInfo, hostList );
447 pUserHandler->HandleResponseWithHosts( status, 0, hostList );
457 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
471 CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
474 pStateHandler( stateHandler ),
475 pUserHandler( userHandler ),
483 virtual ~CloseHandler()
495 pStateHandler->OnClose( status );
497 pUserHandler->HandleResponseWithHosts( status, response, hostList );
509 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
523 StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
527 pStateHandler( stateHandler ),
528 pUserHandler( userHandler ),
530 pSendParams( sendParams )
537 virtual ~StatefulHandler()
540 delete pSendParams.chunkList;
541 delete pSendParams.kbuff;
551 using namespace XrdCl;
552 std::unique_ptr<AnyObject> responsePtr( response );
553 pSendParams.hostList = hostList;
558 if( !status->
IsOK() )
567 responsePtr.release();
570 pUserHandler->HandleResponseWithHosts( status, response, hostList );
589 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
606 buffer( std::move( buffer ) ),
642 pFileState( Closed ),
648 pWrtRecoveryRedir( 0 ),
653 pDoRecoverRead( true ),
654 pDoRecoverWrite( true ),
655 pFollowRedirects( true ),
656 pUseVirtRedirector( true ),
657 pIsChannelEncrypted( false ),
658 pAllowBundledClose( false ),
661 pFileHandle =
new uint8_t[4];
662 ResetMonitoringVars();
675 pFileState( Closed ),
681 pWrtRecoveryRedir( 0 ),
686 pDoRecoverRead( true ),
687 pDoRecoverWrite( true ),
688 pFollowRedirects( true ),
689 pUseVirtRedirector( useVirtRedirector ),
690 pAllowBundledClose( false ),
693 pFileHandle =
new uint8_t[4];
694 ResetMonitoringVars();
725 ResetMonitoringVars();
739 delete pLoadBalancer;
740 delete [] pFileHandle;
741 delete pLFileHandler;
748 const std::string &url,
759 if( self->pFileState ==
Error )
760 return self->pStatus;
778 if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
781 registry.
Release( *self->pFileUrl );
783 delete self->pFileUrl;
787 self->pFileUrl =
new URL( url );
795 char requuid[37]= {0};
796 uuid_generate( uuid );
797 uuid_unparse( uuid, requuid );
798 cgi[
"xrdcl.requuid"] = requuid;
799 self->pFileUrl->SetParams( cgi );
801 if( !self->pFileUrl->IsValid() )
803 log->
Error(
FileMsg,
"[%p@%s] Trying to open invalid url: %s",
804 self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
806 self->pFileState =
Closed;
807 return self->pStatus;
814 URL::ParamsMap::const_iterator it;
815 it = urlParams.find(
"xrdcl.recover-reads" );
816 if( (it != urlParams.end() && it->second ==
"false") ||
817 !self->pDoRecoverRead )
819 self->pDoRecoverRead =
false;
820 log->
Debug(
FileMsg,
"[%p@%s] Read recovery procedures are disabled",
821 self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
824 it = urlParams.find(
"xrdcl.recover-writes" );
825 if( (it != urlParams.end() && it->second ==
"false") ||
826 !self->pDoRecoverWrite )
828 self->pDoRecoverWrite =
false;
829 log->
Debug(
FileMsg,
"[%p@%s] Write recovery procedures are disabled",
830 self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
836 log->
Debug(
FileMsg,
"[%p@%s] Sending an open command",
self.get(),
837 self->pFileUrl->GetObfuscatedURL().c_str() );
839 self->pOpenMode = mode;
840 self->pOpenFlags = flags;
841 OpenHandler *openHandler =
new OpenHandler(
self, handler );
845 std::string path =
self->pFileUrl->GetPathWithFilteredParams();
851 req->
dlen = path.length();
852 msg->
Append( path.c_str(), path.length(), 24 );
859 XRootDStatus st =
self->IssueRequest( *self->pFileUrl, msg, openHandler, params );
865 self->pFileState =
Closed;
883 if( self->pFileState ==
Error )
884 return self->pStatus;
889 if( self->pFileState ==
Closed )
895 if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
901 log->
Debug(
FileMsg,
"[%p@%s] Sending a close command for handle %#x to %s",
902 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
903 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
913 memcpy( req->
fhandle, self->pFileHandle, 4 );
917 CloseHandler *closeHandler =
new CloseHandler(
self, handler, msg );
924 XRootDStatus st =
self->IssueRequest( *self->pDataServer, msg, closeHandler, params );
934 self->pFileState =
Closed;
943 self->pFileState =
Error;
959 if( self->pFileState ==
Error )
return self->pStatus;
977 log->
Debug(
FileMsg,
"[%p@%s] Sending a stat command for handle %#x to %s",
978 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
979 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
988 std::string path =
self->pFileUrl->GetPath();
992 memcpy( req->
fhandle, self->pFileHandle, 4 );
1001 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
1003 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
1018 if( self->pFileState ==
Error )
return self->pStatus;
1024 log->
Debug(
FileMsg,
"[%p@%s] Sending a read command for handle %#x to %s",
1025 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1026 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1035 memcpy( req->
fhandle, self->pFileHandle, 4 );
1038 list->push_back(
ChunkInfo( offset, size, buffer ) );
1047 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
1049 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
1062 int issupported =
true;
1075 issupported =
false;
1080 self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
1082 auto st =
Read(
self, offset, size, buffer, substitHandler, timeout );
1083 if( !st.
IsOK() )
delete substitHandler;
1087 ResponseHandler* pgHandler =
new PgReadHandler(
self, handler, offset );
1089 if( !st.
IsOK() )
delete pgHandler;
1098 PgReadHandler *handler,
1103 "PgRead retry size exceeded 4KB." );
1105 ResponseHandler *retryHandler =
new PgReadRetryHandler( handler, pgnb );
1107 if( !st.
IsOK() )
delete retryHandler;
1121 if( self->pFileState ==
Error )
return self->pStatus;
1127 log->
Debug(
FileMsg,
"[%p@%s] Sending a pgread command for handle %#x to %s",
1128 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1129 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1138 memcpy( req->
fhandle, self->pFileHandle, 4 );
1151 list->push_back(
ChunkInfo( offset, size, buffer ) );
1160 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
1162 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
1177 if( self->pFileState ==
Error )
return self->pStatus;
1183 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
1184 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1185 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1194 memcpy( req->
fhandle, self->pFileHandle, 4 );
1197 list->push_back(
ChunkInfo( 0, size, (
char*)buffer ) );
1208 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
1210 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
1229 log->
Info(
FileMsg,
"[%p@%s] Buffer for handle %#x is not page aligned (4KB), "
1230 "cannot convert it to kernel space buffer.",
self.get(),
1231 self->pFileUrl->GetObfuscatedURL().c_str(), *((uint32_t*)self->pFileHandle) );
1233 void *buff = buffer.GetBuffer();
1234 uint32_t size = buffer.GetSize();
1235 ReleaseBufferHandler *wrtHandler =
1236 new ReleaseBufferHandler( std::move( buffer ), handler );
1237 XRootDStatus st =
self->Write(
self, offset, size, buff, wrtHandler, timeout );
1240 buffer = std::move( wrtHandler->GetBuffer() );
1249 uint32_t length = buffer.GetSize();
1250 char *ubuff = buffer.Release();
1260 return WriteKernelBuffer(
self, offset, ret, std::move( kbuff ), handler, timeout );
1278 ssize_t ret = fdoff ?
XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1286 return WriteKernelBuffer(
self, offset, ret, std::move( kbuff ), handler, timeout );
1296 std::vector<uint32_t> &cksums,
1313 if( cksums.empty() )
1315 const char *data =
static_cast<const char*
>( buffer );
1321 if( crc32cCnt != cksums.size() )
1344 static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1346 if( pgoff == offset )
return 0;
1352 if( !status ) status = s;
1359 auto pgwrt = std::make_shared<pgwrt_t>( handler );
1363 uint32_t fstpglen = fLen;
1365 time_t start = ::time(
nullptr );
1368 std::unique_ptr<AnyObject> scoped( r );
1373 pgwrt->SetStatus( s );
1382 pgwrt->SetStatus( s );
1387 uint16_t elapsed = ::time(
nullptr ) - start;
1388 if( elapsed >= timeout )
1393 else timeout -= elapsed;
1395 for(
size_t i = 0; i < inf->
Size(); ++i )
1397 auto tpl = inf->
At( i );
1398 uint64_t pgoff = std::get<0>( tpl );
1399 uint32_t pglen = std::get<1>( tpl );
1400 const void *pgbuf =
static_cast<const char*
>( buffer ) + ( pgoff - offset );
1401 uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
1404 std::unique_ptr<AnyObject> scoped( r );
1408 pgwrt->SetStatus( s );
1418 "page: pgoff=%llu, pglen=%u, pgdigest=%u",
self.get(),
1419 self->pFileUrl->GetObfuscatedURL().c_str(), (
unsigned long long) pgoff, pglen, pgdigest );
1421 "Failed to retransmit corrupted page" ) );
1425 "page: pgoff=%llu, pglen=%u, pgdigest=%u",
self.get(),
1426 self->pFileUrl->GetObfuscatedURL().c_str(), (
unsigned long long) pgoff, pglen, pgdigest );
1428 auto st =
PgWriteRetry(
self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
1429 if( !st.IsOK() ) pgwrt->SetStatus(
new XRootDStatus( st ) );
1431 "pgoff=%llu, pglen=%u, pgdigest=%u",
self.get(),
1432 self->pFileUrl->GetObfuscatedURL().c_str(), (
unsigned long long) pgoff, pglen, pgdigest );
1436 auto st =
PgWriteImpl(
self, offset, size, buffer, cksums, 0, h, timeout );
1439 pgwrt->handler =
nullptr;
1456 std::vector<uint32_t> cksums{ digest };
1467 std::vector<uint32_t> &cksums,
1474 if( self->pFileState ==
Error )
return self->pStatus;
1480 log->
Debug(
FileMsg,
"[%p@%s] Sending a pgwrite command for handle %#x to %s",
1481 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1482 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1493 req->
dlen = size + cksums.size() *
sizeof( uint32_t );
1495 memcpy( req->
fhandle, self->pFileHandle, 4 );
1498 list->push_back(
ChunkInfo( offset, size, (
char*)buffer ) );
1510 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
1512 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
1524 if( self->pFileState ==
Error )
return self->pStatus;
1530 log->
Debug(
FileMsg,
"[%p@%s] Sending a sync command for handle %#x to %s",
1531 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1532 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1539 memcpy( req->
fhandle, self->pFileHandle, 4 );
1548 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
1550 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
1563 if( self->pFileState ==
Error )
return self->pStatus;
1569 log->
Debug(
FileMsg,
"[%p@%s] Sending a truncate command for handle %#x to %s",
1570 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1571 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1578 memcpy( req->
fhandle, self->pFileHandle, 4 );
1588 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
1590 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
1607 if( self->pFileState ==
Error )
return self->pStatus;
1613 log->
Debug(
FileMsg,
"[%p@%s] Sending a vector read command for handle %#x to %s",
1614 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1615 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1628 char *cursor = (
char*)buffer;
1634 for(
size_t i = 0; i < chunks.size(); ++i )
1636 dataChunk[i].
rlen = chunks[i].length;
1637 dataChunk[i].
offset = chunks[i].offset;
1638 memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
1643 chunkBuffer = cursor;
1644 cursor += chunks[i].length;
1647 chunkBuffer = chunks[i].buffer;
1649 list->push_back(
ChunkInfo( chunks[i].offset,
1665 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
1667 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
1683 if( self->pFileState ==
Error )
return self->pStatus;
1689 log->
Debug(
FileMsg,
"[%p@%s] Sending a vector write command for handle %#x to %s",
1690 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1691 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1720 for(
size_t i = 0; i < chunks.size(); ++i )
1722 writeList[i].
wlen = chunks[i].length;
1723 writeList[i].
offset = chunks[i].offset;
1724 memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1726 list->push_back(
ChunkInfo( chunks[i].offset,
1728 chunks[i].buffer ) );
1742 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
1744 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
1752 const struct iovec *
iov,
1759 if( self->pFileState ==
Error )
return self->pStatus;
1765 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
1766 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1767 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1776 for(
int i = 0; i < iovcnt; ++i )
1778 if(
iov[i].iov_len == 0 )
continue;
1779 size +=
iov[i].iov_len;
1781 (
char*)
iov[i].iov_base ) );
1787 memcpy( req->
fhandle, self->pFileHandle, 4 );
1798 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
1800 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
1815 if( self->pFileState ==
Error )
return self->pStatus;
1821 log->
Debug(
FileMsg,
"[%p@%s] Sending a read command for handle %#x to %s",
1822 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1823 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1830 size_t size = std::accumulate(
iov,
iov + iovcnt, 0, [](
size_t acc, iovec &rhs )
1832 return acc + rhs.iov_len;
1838 memcpy( req->
fhandle, self->pFileHandle, 4 );
1841 list->reserve( iovcnt );
1842 uint64_t choff = offset;
1843 for(
int i = 0; i < iovcnt; ++i )
1845 list->emplace_back( choff,
iov[i].iov_len,
iov[i].iov_base );
1846 choff +=
iov[i].iov_len;
1856 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
1858 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
1872 if( self->pFileState ==
Error )
return self->pStatus;
1878 log->
Debug(
FileMsg,
"[%p@%s] Sending a fcntl command for handle %#x to %s",
1879 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1880 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1889 memcpy( req->
fhandle, self->pFileHandle, 4 );
1899 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
1901 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
1913 if( self->pFileState ==
Error )
return self->pStatus;
1919 log->
Debug(
FileMsg,
"[%p@%s] Sending a visa command for handle %#x to %s",
1920 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1921 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1929 memcpy( req->
fhandle, self->pFileHandle, 4 );
1938 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
1940 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
1947 const std::vector<xattr_t> &attrs,
1953 if( self->pFileState ==
Error )
return self->pStatus;
1959 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr set command for handle %#x to %s",
1960 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1961 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1966 return XAttrOperationImpl(
self,
kXR_fattrSet, 0, attrs, handler, timeout );
1973 const std::vector<std::string> &attrs,
1979 if( self->pFileState ==
Error )
return self->pStatus;
1985 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr get command for handle %#x to %s",
1986 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1987 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1992 return XAttrOperationImpl(
self,
kXR_fattrGet, 0, attrs, handler, timeout );
1999 const std::vector<std::string> &attrs,
2005 if( self->pFileState ==
Error )
return self->pStatus;
2011 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr del command for handle %#x to %s",
2012 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2013 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2018 return XAttrOperationImpl(
self,
kXR_fattrDel, 0, attrs, handler, timeout );
2030 if( self->pFileState ==
Error )
return self->pStatus;
2036 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr list command for handle %#x to %s",
2037 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2038 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2043 static const std::vector<std::string> nothing;
2045 nothing, handler, timeout );
2066 if( self->pFileState ==
Error )
return self->pStatus;
2072 log->
Debug(
FileMsg,
"[%p@%s] Sending a checkpoint command for handle %#x to %s",
2073 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2074 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2082 memcpy( req->
fhandle, self->pFileHandle, 4 );
2092 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
2094 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
2117 if( self->pFileState ==
Error )
return self->pStatus;
2123 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
2124 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2125 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2134 memcpy( req->
fhandle, self->pFileHandle, 4 );
2139 wrtreq->
dlen = size;
2140 memcpy( wrtreq->
fhandle, self->pFileHandle, 4 );
2143 list->push_back(
ChunkInfo( 0, size, (
char*)buffer ) );
2154 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
2156 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
2172 const struct iovec *
iov,
2179 if( self->pFileState ==
Error )
return self->pStatus;
2185 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
2186 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2187 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2196 memcpy( req->
fhandle, self->pFileHandle, 4 );
2200 for(
int i = 0; i < iovcnt; ++i )
2202 if(
iov[i].iov_len == 0 )
continue;
2203 size +=
iov[i].iov_len;
2205 (
char*)
iov[i].iov_base ) );
2211 wrtreq->
dlen = size;
2212 memcpy( wrtreq->
fhandle, self->pFileHandle, 4 );
2223 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
2225 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
2244 const std::string &value )
2247 if( name ==
"ReadRecovery" )
2249 if( value ==
"true" ) pDoRecoverRead =
true;
2250 else pDoRecoverRead =
false;
2253 else if( name ==
"WriteRecovery" )
2255 if( value ==
"true" ) pDoRecoverWrite =
true;
2256 else pDoRecoverWrite =
false;
2259 else if( name ==
"FollowRedirects" )
2261 if( value ==
"true" ) pFollowRedirects =
true;
2262 else pFollowRedirects =
false;
2265 else if( name ==
"BundledClose" )
2267 if( value ==
"true" ) pAllowBundledClose =
true;
2268 else pAllowBundledClose =
false;
2278 std::string &value )
const
2281 if( name ==
"ReadRecovery" )
2283 if( pDoRecoverRead ) value =
"true";
2284 else value =
"false";
2287 else if( name ==
"WriteRecovery" )
2289 if( pDoRecoverWrite ) value =
"true";
2290 else value =
"false";
2293 else if( name ==
"FollowRedirects" )
2295 if( pFollowRedirects ) value =
"true";
2296 else value =
"false";
2299 else if( name ==
"DataServer" && pDataServer )
2300 { value = pDataServer->
GetHostId();
return true; }
2301 else if( name ==
"LastURL" && pDataServer )
2302 { value = pDataServer->
GetURL();
return true; }
2303 else if( name ==
"WrtRecoveryRedir" && pWrtRecoveryRedir )
2304 { value = pWrtRecoveryRedir->
GetHostId();
return true; }
2322 std::string lastServer = pFileUrl->
GetHostId();
2326 delete pLoadBalancer;
2328 delete pWrtRecoveryRedir;
2329 pWrtRecoveryRedir = 0;
2331 pDataServer =
new URL( hostList->back().url );
2335 HostList::const_iterator itC;
2337 for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2340 itC->url.GetParams(),
2345 HostList::const_reverse_iterator it;
2346 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2347 if( it->loadBalancer )
2349 pLoadBalancer =
new URL( it->url );
2353 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2356 pWrtRecoveryRedir =
new URL( it->url );
2361 log->
Debug(
FileMsg,
"[%p@%s] Open has returned with status %s",
2375 isencobj.
Get( isenc );
2376 pIsChannelEncrypted = *isenc;
2385 if( !pStatus.
IsOK() || !openInfo )
2387 log->
Debug(
FileMsg,
"[%p@%s] Error while opening at %s: %s",
2389 pStatus.
ToStr().c_str() );
2390 FailQueuedMessages( pStatus );
2422 log->
Debug(
FileMsg,
"[%p@%s] successfully opened at %s, handle: %#x, "
2424 pDataServer->
GetHostId().c_str(), *((uint32_t*)pFileHandle),
2425 (
unsigned long long) pSessionId );
2430 gettimeofday( &pOpenTime, 0 );
2445 ReSendQueuedMessages();
2458 log->
Debug(
FileMsg,
"[%p@%s] Close returned from %s with: %s",
this,
2460 status->
ToStr().c_str() );
2462 log->
Dump(
FileMsg,
"[%p@%s] Items in the fly %zu, queued for recovery %zu",
2463 this, pFileUrl->
GetObfuscatedURL().c_str(), pInTheFly.size(), pToBeRecovered.size() );
2465 MonitorClose( status );
2466 ResetMonitoringVars();
2486 static const std::string root =
"root", xroot =
"xroot", file =
"file",
2487 roots =
"roots", xroots =
"xroots";
2489 if( !msg.compare( 0, root.size(), root ) ||
2490 !msg.compare( 0, xroot.size(), xroot ) ||
2491 !msg.compare( 0, file.size(), file ) ||
2492 !msg.compare( 0, roots.size(), roots ) ||
2493 !msg.compare( 0, xroots.size(), xroots ) )
2505 self->pInTheFly.erase( message );
2507 log->
Dump(
FileMsg,
"[%p@%s] File state error encountered. Message %s "
2508 "returned with %s",
self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2518 i.
file =
self->pFileUrl;
2540 if( !self->IsRecoverable( *status ) || sendParams.
kbuff )
2542 log->
Error(
FileMsg,
"[%p@%s] Fatal file state error. Message %s "
2543 "returned with %s",
self.get(),
self->pFileUrl->GetObfuscatedURL().c_str(),
2546 self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
2555 self->pCloseReason = *status;
2556 RecoverMessage(
self, RequestData( message, userHandler, sendParams ) );
2564 const std::string &redirectUrl,
2570 self->pInTheFly.erase( message );
2576 if( !self->pStateRedirect )
2578 std::ostringstream o;
2579 self->pStateRedirect =
new URL( redirectUrl );
2582 self->pStateRedirect->GetParams(),
2584 self->pFileUrl->SetParams( params );
2587 RecoverMessage(
self, RequestData( message, userHandler, sendParams ) );
2602 log->
Dump(
FileMsg,
"[%p@%s] Got state response for message %s",
2603 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2610 self->pInTheFly.erase( message );
2611 RunRecovery(
self );
2626 response->
Get( info );
2627 delete self->pStatInfo;
2628 self->pStatInfo =
new StatInfo( *info );
2660 for(
size_t i = 0; i < segs; ++i )
2661 self->pVRBytes += dataChunk[i].
rlen;
2662 self->pVSegs += segs;
2695 for(
size_t i = 0; i < size; ++i )
2696 self->pVWBytes += wrtList[i].
wlen;
2718 if( !pToBeRecovered.empty() )
2721 log->
Dump(
FileMsg,
"[%p@%s] Got a timer event",
this,
2723 RequestList::iterator it;
2725 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2727 if( it->params.expires <= now )
2732 0, it->params.hostList ) );
2733 it = pToBeRecovered.erase( it );
2751 if( (IsReadOnly() && pDoRecoverRead) ||
2752 (!IsReadOnly() && pDoRecoverWrite) )
2754 log->
Debug(
FileMsg,
"[%p@%s] Putting the file in recovery state in "
2758 pToBeRecovered.clear();
2771 if( self->pFileState !=
Opened || !self->pLoadBalancer )
2777 log->
Debug(
FileMsg,
"[%p@%s] Reopen file at next data server.",
2778 self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
2781 auto lbcgi =
self->pLoadBalancer->GetParams();
2782 auto dtcgi =
self->pDataServer->GetParams();
2785 auto itr = lbcgi.find(
"tried" );
2786 if( itr == lbcgi.end() )
2787 lbcgi[
"tried"] =
self->pDataServer->GetHostName();
2790 std::string tried = itr->second;
2791 tried +=
"," +
self->pDataServer->GetHostName();
2792 lbcgi[
"tried"] = tried;
2794 self->pLoadBalancer->SetParams( lbcgi );
2796 return ReOpenFileAtServer(
self, *self->pLoadBalancer, timeout );
2802 template<
typename T>
2803 Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &
self,
2806 const std::vector<T> &attrs,
2821 memcpy( req->
fhandle, self->pFileHandle, 4 );
2823 if( !st.
IsOK() )
return st;
2832 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
2834 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
2840 Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &
self,
2851 return RecoverMessage(
self, RequestData( msg, handler, sendParams ),
false );
2857 if( self->pFileState ==
Opened )
2860 XRootDStatus st =
self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
2867 return RecoverMessage(
self, RequestData( msg, handler, sendParams ),
false );
2870 self->pInTheFly.insert(msg);
2881 bool FileStateHandler::IsRecoverable(
const XRootDStatus &status )
const
2883 const auto recoverable_errors = {
2892 if (pDoRecoverRead || pDoRecoverWrite)
2893 for (
const auto error : recoverable_errors)
2894 if (status.
code == error)
2895 return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
2903 bool FileStateHandler::IsReadOnly()
const
2914 Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &
self,
2916 bool callbackOnFailure )
2921 log->
Dump(
FileMsg,
"[%p@%s] Putting message %s in the recovery list",
2922 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2923 rd.request->GetObfuscatedDescription().c_str() );
2925 Status st = RunRecovery(
self );
2928 self->pToBeRecovered.push_back( rd );
2932 if( callbackOnFailure )
2933 self->FailMessage( rd, st );
2941 Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &
self )
2946 if( !self->pInTheFly.empty() )
2950 log->
Debug(
FileMsg,
"[%p@%s] Running the recovery procedure",
self.get(),
2951 self->pFileUrl->GetObfuscatedURL().c_str() );
2954 if( self->pStateRedirect )
2956 SendClose(
self, 0 );
2957 st = ReOpenFileAtServer(
self, *self->pStateRedirect, 0 );
2958 delete self->pStateRedirect;
self->pStateRedirect = 0;
2960 else if( self->IsReadOnly() && self->pLoadBalancer )
2961 st = ReOpenFileAtServer(
self, *self->pLoadBalancer, 0 );
2963 st = ReOpenFileAtServer(
self, *self->pDataServer, 0 );
2967 self->pFileState =
Error;
2969 self->FailQueuedMessages( st );
2978 XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &
self,
2986 memcpy( req->
fhandle, self->pFileHandle, 4 );
2994 params.followRedirects =
false;
2995 params.stateful =
true;
2999 return self->IssueRequest( *self->pDataServer, msg, handler, params );
3005 XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &
self,
3010 log->
Dump(
FileMsg,
"[%p@%s] Sending a recovery open command to %s",
3011 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(), url.
GetObfuscatedURL().c_str() );
3031 u.
SetPath( self->pFileUrl->GetPath() );
3037 req->
mode =
self->pOpenMode;
3038 req->
options =
self->pOpenFlags;
3039 req->
dlen = path.length();
3040 msg->
Append( path.c_str(), path.length(), 24 );
3045 OpenHandler *openHandler =
new OpenHandler(
self, 0 );
3053 XRootDStatus st =
self->IssueRequest( url, msg, openHandler, params );
3060 self->pFileState =
Closed;
3068 void FileStateHandler::FailMessage( RequestData rd,
XRootDStatus status )
3071 log->
Dump(
FileMsg,
"[%p@%s] Failing message %s with %s",
3073 rd.request->GetObfuscatedDescription().c_str(),
3074 status.
ToStr().c_str() );
3076 StatefulHandler *sh =
dynamic_cast<StatefulHandler*
>(rd.handler);
3080 log->
Error(
FileMsg,
"[%p@%s] Internal error while recovering %s",
3082 rd.request->GetObfuscatedDescription().c_str() );
3091 0, rd.params.hostList ) );
3099 void FileStateHandler::FailQueuedMessages(
XRootDStatus status )
3101 RequestList::iterator it;
3102 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3103 FailMessage( *it, status );
3104 pToBeRecovered.clear();
3110 void FileStateHandler::ReSendQueuedMessages()
3112 RequestList::iterator it;
3113 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3115 it->request->SetSessionId( pSessionId );
3116 ReWriteFileHandle( it->request );
3117 XRootDStatus st = IssueRequest( *pDataServer, it->request,
3118 it->handler, it->params );
3120 FailMessage( *it, st );
3122 pToBeRecovered.clear();
3128 void FileStateHandler::ReWriteFileHandle(
Message *msg )
3136 memcpy( req->
fhandle, pFileHandle, 4 );
3142 memcpy( req->
fhandle, pFileHandle, 4 );
3148 memcpy( req->
fhandle, pFileHandle, 4 );
3154 memcpy( req->
fhandle, pFileHandle, 4 );
3162 memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
3172 for(
size_t i = 0; i < size; ++i )
3173 memcpy( wrtList[i].fhandle, pFileHandle, 4 );
3179 memcpy( req->
fhandle, pFileHandle, 4 );
3185 memcpy( req->
fhandle, pFileHandle, 4 );
3191 log->
Dump(
FileMsg,
"[%p@%s] Rewritten file handle for %s to %#x",
3193 *((uint32_t*)pFileHandle) );
3200 void FileStateHandler::MonitorClose(
const XRootDStatus *status )
3208 gettimeofday( &i.
cTOD, 0 );
3230 sendParams, pLFileHandler );
3234 return pLFileHandler->
ExecRequest( url, msg, handler, sendParams );
3238 sendParams, pLFileHandler );
3244 XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &
self,
3247 std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3260 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
3261 self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3262 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3271 memcpy( req->
fhandle, self->pFileHandle, 4 );
3277 params.
kbuff = kbuff.release();
3283 StatefulHandler *stHandler =
new StatefulHandler(
self, handler, msg, params );
3285 return SendOrQueue(
self, *self->pDataServer, msg, stHandler, params );
struct ClientPgReadRequest pgread
static const int kXR_ckpXeq
struct ClientPgWriteRequest pgwrite
struct ClientRequestHdr header
struct ClientReadRequest read
#define kXR_PROTPGRWVERSION
struct ClientWriteRequest write
static int mapError(int rc)
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
uint32_t GetSize() const
Get the size of the message.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
bool GetInt(const std::string &key, int &value)
An interface for file plug-ins.
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, uint16_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
void Tick(time_t now)
Tick.
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
FileStateHandler(FilePlugIn *&plugin)
Constructor.
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, uint16_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Closed
The file is closed.
@ Opened
Opening has succeeded.
@ Error
Opening has failed.
@ Recovering
Recovering from an error.
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
~FileStateHandler()
Destructor.
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool IsOpen() const
Check if the file is open.
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, uint16_t timeout)
Try other data server.
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
XRootDStatus ExecRequest(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams)
Translate an XRootD request into LocalFileHandler call.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Info(uint64_t topic, const char *format,...)
Print an info.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
An abstract class to describe the client-side monitoring plugin interface.
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4 bytes)
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object used by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
uint64_t GetSize() const
Get size (in bytes)
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
bool IsMetalink() const
Is it a URL to a metalink.
std::map< std::string, std::string > ParamsMap
void SetParams(const std::string &params)
Set params.
std::string GetPathWithFilteredParams() const
Get the path with params, filters out 'xrdcl.'.
std::string GetURL() const
Get the URL.
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
void SetPath(const std::string &path)
Set the path.
const ParamsMap & GetParams() const
Get the URL params.
const std::string & GetPath() const
Get the path.
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Set the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
const uint16_t errTlsError
const uint16_t errOperationExpired
const uint16_t errPollerError
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInProgress
const uint16_t errSocketTimeout
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidOp
const uint16_t suAlreadyDone
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
const uint16_t errInvalidArgs
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
const uint16_t errSocketError
const uint16_t errOperationInterrupted
const uint16_t errInvalidSession
const uint16_t errRedirect
const uint16_t errSocketDisconnected
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
Describe a file close event.
uint64_t vwBytes
Total number of bytes written via writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
std::string dataServer
Actual data server.
uint16_t oFlags
OpenFlags.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted