// PgReadHandler (excerpt): receives the response of a page read, compares a
// per-page CRC value with the checksums carried in the response and passes the
// result on to userHandler.
// NOTE(review): only scattered lines of the class are visible in this excerpt;
// the comments describe the visible statements only.
69 friend class PgReadRetryHandler;
76 PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
77 XrdCl::ResponseHandler *userHandler,
78 uint64_t orgOffset ) :
79 stateHandler( stateHandler ),
80 userHandler( userHandler ),
81 orgOffset( orgOffset ),
91 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
92 XrdCl::AnyObject *response,
95 using namespace XrdCl;
// PgReadRetryHandler also calls this method, so the handler state is
// presumably what mtx protects - confirm against the full source.
97 std::unique_lock<std::mutex> lck( mtx );
105 if( !status->
IsOK() )
// release() hands the stored status, response and host list to the user
// handler as raw pointers.
122 userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
// Variant that passes on only the stored status (response and hosts are 0).
125 userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
136 if( !status->
IsOK() )
141 userHandler->HandleResponseWithHosts( status, response, hostList );
153 response->
Get( pginf );
157 std::vector<uint32_t> &cksums = pginf->
GetCksums();
158 char *buffer =
reinterpret_cast<char*
>( pginf->
GetBuffer() );
// Never treat more bytes as a page than were actually read.
161 if( pgsize > bytesRead ) pgsize = bytesRead;
163 for(
size_t pgnb = 0; pgnb < nbpages; ++pgnb )
// CRC mismatch for page pgnb: the page is reported as corrupted and, per the
// log message, will be retried.
166 if( crcval != cksums[pgnb] )
168 Log *log = DefaultEnv::GetLog();
169 log->
Info( FileMsg,
"[%p@%s] Received corrupted page, will retry page #%zu.",
170 (
void*)
this, stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
185 if( pgsize > bytesRead ) pgsize = bytesRead;
194 userHandler->HandleResponseWithHosts( status, response, hostList );
// Keep the response and host list in the unique_ptr members resp / hosts.
203 resp.reset( response );
204 hosts.reset( hostList );
// Called by PgReadRetryHandler with the CRC of a page it has recovered.
208 void UpdateCksum(
size_t pgnb, uint32_t crcval )
212 XrdCl::PageInfo *pginf = 0;
220 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
221 XrdCl::ResponseHandler *userHandler;
// Owning pointers; their contents are released to userHandler (see above).
224 std::unique_ptr<XrdCl::AnyObject> resp;
225 std::unique_ptr<XrdCl::HostList> hosts;
226 std::unique_ptr<XrdCl::XRootDStatus> st;
// PgReadRetryHandler (excerpt): handles the response to the re-read of page
// number pgnb and reports the outcome back to the owning PgReadHandler.
242 PgReadRetryHandler( PgReadHandler *pgReadHandler,
size_t pgnb ) : pgReadHandler( pgReadHandler ),
251 void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
252 XrdCl::AnyObject *response,
255 using namespace XrdCl;
// The retry request itself failed: log it and pass status, response and host
// list straight on to the PgReadHandler.
257 if( !status->
IsOK() )
259 Log *log = DefaultEnv::GetLog();
260 log->
Info( FileMsg,
"[%p@%s] Failed to recover page #%zu.",
261 (
void*)
this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
262 pgReadHandler->HandleResponseWithHosts( status, response, hostList );
267 XrdCl::PageInfo *pginf = 0;
268 response->
Get( pginf );
// (Condition not visible in this excerpt.) Recovery is declared failed: the
// arguments go to DeleteArgs and a fresh errDataError status is reported.
271 Log *log = DefaultEnv::GetLog();
272 log->
Info( FileMsg,
"[%p@%s] Failed to recover page #%zu.",
273 (
void*)
this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
275 DeleteArgs( status, response, hostList );
276 pgReadHandler->HandleResponseWithHosts(
new XRootDStatus( stError, errDataError ), 0, 0 );
// The re-read page still does not match the first checksum of the response:
// give up with errDataError.
282 if( crcval != pginf->
GetCksums().front() )
284 Log *log = DefaultEnv::GetLog();
285 log->
Info( FileMsg,
"[%p@%s] Failed to recover page #%zu.",
286 (
void*)
this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
287 DeleteArgs( status, response, hostList );
288 pgReadHandler->HandleResponseWithHosts(
new XRootDStatus( stError, errDataError ), 0, 0 );
// Success: response and host list go to DeleteArgs (status is passed as 0 and
// is forwarded afterwards), the stored checksum for pgnb is updated, and only
// the status is handed to the PgReadHandler.
293 Log *log = DefaultEnv::GetLog();
294 log->
Info( FileMsg,
"[%p@%s] Successfully recovered page #%zu.",
295 (
void*)
this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
297 DeleteArgs( 0, response, hostList );
298 pgReadHandler->UpdateCksum( pgnb, crcval );
299 pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
// Helper receiving the three callback arguments; body not visible here
// (presumably deletes the non-null ones - confirm).
305 inline void DeleteArgs( XrdCl::XRootDStatus *status,
306 XrdCl::AnyObject *response,
314 PgReadHandler *pgReadHandler;
// Excerpt of a handler that receives a plain read response (ChunkInfo) and
// hands the user a page-read style response (PageInfo with checksums).
// NOTE(review): the class name and several statements are outside this excerpt.
329 XrdCl::ResponseHandler *userHandler ) :
330 stateHandler( stateHandler ),
331 userHandler( userHandler )
339 XrdCl::AnyObject *rdresp,
342 using namespace XrdCl;
// On failure the arguments are forwarded to the user untouched.
344 if( !status->
IsOK() )
346 userHandler->HandleResponseWithHosts( status, rdresp, hostList );
352 ChunkInfo *chunk =
nullptr;
353 rdresp->
Get( chunk );
357 userHandler->HandleResponseWithHosts( status, rdresp, hostList );
362 std::vector<uint32_t> cksums;
// NOTE(review): the extent of this branch is not visible; which of the
// following statements depend on the channel being encrypted must be checked
// in the full source.
363 if( stateHandler->pIsChannelEncrypted )
368 cksums.reserve( nbpages );
370 size_t size = chunk->
length;
371 char *buffer =
reinterpret_cast<char*
>( chunk->
buffer );
373 for(
size_t pg = 0; pg < nbpages; ++pg )
// Clamp the page size to size.
376 if( pgsize > size ) pgsize = size;
378 cksums.push_back( crcval );
// Repackage offset, length and buffer of the chunk, plus the checksums, as a
// PageInfo stored in a fresh AnyObject, and pass that to the user handler.
384 PageInfo *pages =
new PageInfo( chunk->
offset, chunk->
length,
385 chunk->
buffer, std::move( cksums ) );
387 AnyObject *response =
new AnyObject();
388 response->
Set( pages );
389 userHandler->HandleResponseWithHosts( status, response, hostList );
396 std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
397 XrdCl::ResponseHandler *userHandler;
// OpenHandler (excerpt): response handler for an open request. It lets the
// FileStateHandler process the result (OnOpen) and notifies the user handler.
410 OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
411 XrdCl::ResponseHandler *userHandler ):
412 pStateHandler( stateHandler ),
413 pUserHandler( userHandler )
420 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
421 XrdCl::AnyObject *response,
424 using namespace XrdCl;
429 OpenInfo *openInfo = 0;
431 response->
Get( openInfo );
// Redirect status: an EcHandler obtained for the first host of the list is
// installed as plug-in and the open is repeated through it.
// NOTE(review): surrounding conditions are not visible in this excerpt.
437 if( status->
code == errRedirect )
440 EcHandler *ecHandler =
GetEcHandler( hostList->front().url, ecurl );
443 pStateHandler->pPlugin = ecHandler;
444 ecHandler->
Open( pStateHandler->pOpenFlags, pUserHandler, 0 );
452 pStateHandler->OnOpen( status, openInfo, hostList );
// The user handler gets the status and host list but no response object.
455 pUserHandler->HandleResponseWithHosts( status, 0, hostList );
465 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
466 XrdCl::ResponseHandler *pUserHandler;
// CloseHandler (excerpt): tells the FileStateHandler that the close finished
// (OnClose) and then forwards the callback arguments to the user handler.
479 CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
480 XrdCl::ResponseHandler *userHandler,
481 XrdCl::Message *message ):
482 pStateHandler( stateHandler ),
483 pUserHandler( userHandler ),
491 virtual ~CloseHandler()
499 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
500 XrdCl::AnyObject *response,
503 pStateHandler->OnClose( status );
505 pUserHandler->HandleResponseWithHosts( status, response, hostList );
517 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
518 XrdCl::ResponseHandler *pUserHandler;
519 XrdCl::Message *pMessage;
// StatefulHandler (excerpt): wraps the user handler of a request sent on an
// open file, keeping the message and a copy of its send parameters.
531 StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
532 XrdCl::ResponseHandler *userHandler,
533 XrdCl::Message *message,
534 const XrdCl::MessageSendParams &sendParams ):
535 pStateHandler( stateHandler ),
536 pUserHandler( userHandler ),
538 pSendParams( sendParams )
545 virtual ~StatefulHandler()
// The chunk list and kernel buffer referenced by the send parameters are
// deleted together with this handler.
548 delete pSendParams.chunkList;
549 delete pSendParams.kbuff;
555 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
556 XrdCl::AnyObject *response,
559 using namespace XrdCl;
// Scoped guard for the response; it is released again further down before the
// raw pointer is handed to the user handler.
560 std::unique_ptr<AnyObject> responsePtr( response );
561 pSendParams.hostList = hostList;
566 if( !status->
IsOK() )
575 responsePtr.release();
578 pUserHandler->HandleResponseWithHosts( status, response, hostList );
591 XrdCl::ResponseHandler *GetUserHandler()
597 std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
598 XrdCl::ResponseHandler *pUserHandler;
599 XrdCl::Message *pMessage;
600 XrdCl::MessageSendParams pSendParams;
// ReleaseBufferHandler (excerpt): stores an XrdCl::Buffer that was moved in,
// next to the wrapped handler, and forwards the response to that handler.
613 ReleaseBufferHandler( XrdCl::Buffer &&buffer, XrdCl::ResponseHandler *handler ) :
614 buffer( std::move( buffer ) ),
622 virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
623 XrdCl::AnyObject *response,
627 handler->HandleResponseWithHosts( status, response, hostList );
// Returns a reference to the stored buffer.
633 XrdCl::Buffer& GetBuffer()
639 XrdCl::Buffer buffer;
640 XrdCl::ResponseHandler *handler;
// FileStateHandler constructors and destructor (excerpt).
// First constructor: read/write recovery, redirect following and the virtual
// redirector default to on; channel encryption and bundled close to off.
656 pWrtRecoveryRedir( 0 ),
661 pDoRecoverRead( true ),
662 pDoRecoverWrite( true ),
663 pFollowRedirects( true ),
664 pUseVirtRedirector( true ),
665 pIsChannelEncrypted( false ),
666 pAllowBundledClose( false ),
// Storage for the 4-byte file handle; freed with delete [] in the destructor.
669 pFileHandle =
new uint8_t[4];
670 ResetMonitoringVars();
// Second constructor: same defaults, except that the virtual-redirector flag
// is taken from the useVirtRedirector argument.
// NOTE(review): unlike the first constructor, no pIsChannelEncrypted
// initialiser appears between pUseVirtRedirector (697) and pAllowBundledClose
// (698) - confirm that the member is initialised elsewhere.
689 pWrtRecoveryRedir( 0 ),
694 pDoRecoverRead( true ),
695 pDoRecoverWrite( true ),
696 pFollowRedirects( true ),
697 pUseVirtRedirector( useVirtRedirector ),
698 pAllowBundledClose( false ),
701 pFileHandle =
new uint8_t[4];
702 ResetMonitoringVars();
733 ResetMonitoringVars();
// Guard for metalink handling (body not visible): requires an available log,
// the virtual redirector, and a metalink file URL.
738 if(
DefaultEnv::GetLog() && pUseVirtRedirector && pFileUrl && pFileUrl->IsMetalink() )
// Release the objects owned by the handler.
747 delete pLoadBalancer;
748 delete [] pFileHandle;
749 delete pLFileHandler;
// Fragment of the routine that sends the open request (per its log messages):
// it rebuilds the file URL, evaluates recovery switches from the URL
// parameters and issues the request.
756 const std::string &url,
// A handler in the Error state returns the stored status right away.
767 if( self->pFileState ==
Error )
768 return self->pStatus;
// A metalink URL used before is released from the registry before the old URL
// object is deleted and replaced.
786 if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
789 registry.
Release( *self->pFileUrl );
791 delete self->pFileUrl;
795 self->pFileUrl =
new URL( url );
// Tag the request with a freshly generated UUID in the xrdcl.requuid CGI
// parameter; 37 bytes hold the 36-character text form plus the NUL.
803 char requuid[37]= {0};
804 uuid_generate( uuid );
805 uuid_unparse( uuid, requuid );
806 cgi[
"xrdcl.requuid"] = requuid;
807 self->pFileUrl->SetParams( cgi );
// Invalid URL: log it, return to the Closed state and hand back pStatus.
809 if( !self->pFileUrl->IsValid() )
811 log->
Error(
FileMsg,
"[%p@%s] Trying to open invalid url: %s",
812 (
void*)self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
814 self->pFileState =
Closed;
815 return self->pStatus;
// Read recovery is switched off when the URL carries
// xrdcl.recover-reads=false, or when it was already off.
822 URL::ParamsMap::const_iterator it;
823 it = urlParams.find(
"xrdcl.recover-reads" );
824 if( (it != urlParams.end() && it->second ==
"false") ||
825 !self->pDoRecoverRead )
827 self->pDoRecoverRead =
false;
828 log->
Debug(
FileMsg,
"[%p@%s] Read recovery procedures are disabled",
829 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
// Same rule for write recovery with xrdcl.recover-writes.
832 it = urlParams.find(
"xrdcl.recover-writes" );
833 if( (it != urlParams.end() && it->second ==
"false") ||
834 !self->pDoRecoverWrite )
836 self->pDoRecoverWrite =
false;
837 log->
Debug(
FileMsg,
"[%p@%s] Write recovery procedures are disabled",
838 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
844 log->
Debug(
FileMsg,
"[%p@%s] Sending an open command", (
void*)self.get(),
845 self->pFileUrl->GetObfuscatedURL().c_str() );
// Remember mode and flags of this open.
847 self->pOpenMode = mode;
848 self->pOpenFlags = flags;
// The path (with filtered parameters) is appended to the message and dlen is
// set to its length. The third Append argument (24) is presumably the offset
// behind the request header - confirm.
853 std::string path = self->pFileUrl->GetPathWithFilteredParams();
859 req->
dlen = path.length();
860 msg->
Append( path.c_str(), path.length(), 24 );
867 XRootDStatus st = self->IssueRequest( *self->pFileUrl, msg, openHandler, params );
// NOTE(review): the condition is not visible; presumably the state goes back
// to Closed when the request could not be issued.
873 self->pFileState =
Closed;
// Fragment of the routine that sends the close request (per its log message).
// A handler in the Error state returns the stored status right away.
891 if( self->pFileState ==
Error )
892 return self->pStatus;
// Already closed (handling not visible in this excerpt).
897 if( self->pFileState ==
Closed )
// Requests still in flight matter only when bundled close is not allowed
// (handling not visible in this excerpt).
903 if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
// The 4-byte file handle is logged as one 32-bit value.
909 log->
Debug(
FileMsg,
"[%p@%s] Sending a close command for handle %#x to %s",
910 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
911 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
921 memcpy( req->
fhandle, self->pFileHandle, 4 );
// The close is issued directly to the data server with a CloseHandler
// wrapping the user handler.
925 CloseHandler *closeHandler =
new CloseHandler( self, handler, msg );
932 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, closeHandler, params );
// NOTE(review): the conditions leading to these two state changes are not
// visible in this excerpt.
942 self->pFileState =
Closed;
951 self->pFileState =
Error;
// Fragment of the routine that sends a stat request for the open handle (per
// its log message).
// A handler in the Error state returns the stored status right away.
967 if( self->pFileState ==
Error )
return self->pStatus;
985 log->
Debug(
FileMsg,
"[%p@%s] Sending a stat command for handle %#x to %s",
986 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
987 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
996 std::string path = self->pFileUrl->GetPath();
// Copy the 4-byte file handle into the request.
1000 memcpy( req->
fhandle, self->pFileHandle, 4 );
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
1009 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1011 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment of the routine that sends a read request for the open handle (per
// its log message).
// A handler in the Error state returns the stored status right away.
1026 if( self->pFileState ==
Error )
return self->pStatus;
1032 log->
Debug(
FileMsg,
"[%p@%s] Sending a read command for handle %#x to %s",
1033 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1034 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
// Copy the 4-byte file handle into the request.
1043 memcpy( req->
fhandle, self->pFileHandle, 4 );
// One chunk describing where the requested range goes in the caller's buffer.
1046 list->push_back(
ChunkInfo( offset, size, buffer ) );
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
1055 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1057 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragments of the page-read entry point and of the page-read retry routine.
// NOTE(review): how issupported is determined is not visible in this excerpt.
1070 int issupported =
true;
1083 issupported =
false;
1088 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
// An ordinary Read is issued with substitHandler as its handler; the handler
// is deleted here when the request could not be issued.
1090 auto st =
Read( self, offset, size, buffer, substitHandler, timeout );
1091 if( !st.
IsOK() )
delete substitHandler;
// Same clean-up rule for the page-read handler.
1097 if( !st.
IsOK() )
delete pgHandler;
1111 "PgRead retry size exceeded 4KB." );
// Same clean-up rule for the retry handler.
1115 if( !st.
IsOK() )
delete retryHandler;
// Fragment of the routine that sends a pgread request for the open handle
// (per its log message).
// A handler in the Error state returns the stored status right away.
1129 if( self->pFileState ==
Error )
return self->pStatus;
1135 log->
Debug(
FileMsg,
"[%p@%s] Sending a pgread command for handle %#x to %s",
1136 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1137 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
// Copy the 4-byte file handle into the request.
1146 memcpy( req->
fhandle, self->pFileHandle, 4 );
1159 list->push_back(
ChunkInfo( offset, size, buffer ) );
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
1168 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1170 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment of the routine that sends a write request for the open handle (per
// its log message).
// A handler in the Error state returns the stored status right away.
1185 if( self->pFileState ==
Error )
return self->pStatus;
1191 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
1192 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1193 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
// Copy the 4-byte file handle into the request.
1202 memcpy( req->
fhandle, self->pFileHandle, 4 );
// The chunk is created with offset 0 (not the file offset) and the caller's
// buffer.
1205 list->push_back(
ChunkInfo( 0, size, (
char*)buffer ) );
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
1216 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1218 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragments of the write variants that take an XrdCl::Buffer or read the data
// from a file descriptor.
// The buffer is not 4KB page aligned, so (per the log message) it cannot be
// turned into a kernel-space buffer; the ordinary Write is used instead.
1237 log->
Info(
FileMsg,
"[%p@%s] Buffer for handle %#x is not page aligned (4KB), "
1238 "cannot convert it to kernel space buffer.", (
void*)self.get(),
1239 self->pFileUrl->GetObfuscatedURL().c_str(), *((uint32_t*)self->pFileHandle) );
// Pointer and size are taken before the buffer is moved into the handler.
1241 void *buff = buffer.GetBuffer();
1242 uint32_t size = buffer.GetSize();
1243 ReleaseBufferHandler *wrtHandler =
1244 new ReleaseBufferHandler( std::move( buffer ), handler );
1245 XRootDStatus st = self->Write( self, offset, size, buff, wrtHandler, timeout );
// Move the buffer back to the caller (condition not visible; presumably when
// the write could not be issued).
1248 buffer = std::move( wrtHandler->GetBuffer() );
// Take the raw memory out of the Buffer object.
1257 uint32_t length = buffer.GetSize();
1258 char *ubuff = buffer.Release();
1268 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
// Read from fd at *fdoff when an offset pointer is supplied (the alternative
// is not visible in this excerpt).
1286 ssize_t ret = fdoff ?
XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1294 return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
// Fragments of the page-write routine and its helper type pgwrt_t.
// NOTE(review): many statements between the visible lines are missing.
1304 std::vector<uint32_t> &cksums,
// No checksums supplied by the caller (handling not visible in this excerpt).
1321 if( cksums.empty() )
1323 const char *data =
static_cast<const char*
>( buffer );
// The number of checksums is compared with crc32cCnt.
1329 if( crc32cCnt != cksums.size() )
// Maps a page offset to a page number; the page starting exactly at offset is
// page 0 (the remaining cases are not visible in this excerpt).
1352 static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1354 if( pgoff == offset )
return 0;
// Only stored while no status has been recorded yet.
1360 if( !status ) status = s;
1367 auto pgwrt = std::make_shared<pgwrt_t>( handler );
1371 uint32_t fstpglen = fLen;
1373 time_t start = ::time(
nullptr );
1376 std::unique_ptr<AnyObject> scoped( r );
1381 pgwrt->SetStatus( s );
1390 pgwrt->SetStatus( s );
// Deduct the time already spent from the timeout.
// NOTE(review): elapsed is a uint16_t, so a time_t difference above 65535
// seconds would be truncated.
1395 uint16_t elapsed = ::time(
nullptr ) - start;
1396 if( elapsed >= timeout )
1401 else timeout -= elapsed;
// For every page listed in inf: locate its data inside buffer and look up its
// checksum by page number.
1403 for(
size_t i = 0; i < inf->
Size(); ++i )
1405 auto tpl = inf->
At( i );
1406 uint64_t pgoff = std::get<0>( tpl );
1407 uint32_t pglen = std::get<1>( tpl );
1408 const void *pgbuf =
static_cast<const char*
>( buffer ) + ( pgoff - offset );
1409 uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
1412 std::unique_ptr<AnyObject> scoped( r );
1416 pgwrt->SetStatus( s );
1426 "page: pgoff=%llu, pglen=%u, pgdigest=%u", (
void*)self.get(),
1427 self->pFileUrl->GetObfuscatedURL().c_str(), (
unsigned long long) pgoff, pglen, pgdigest );
1429 "Failed to retransmit corrupted page" ) );
1433 "page: pgoff=%llu, pglen=%u, pgdigest=%u", (
void*)self.get(),
1434 self->pFileUrl->GetObfuscatedURL().c_str(), (
unsigned long long) pgoff, pglen, pgdigest );
// Re-send the page; a failure to issue the retry is recorded through
// SetStatus.
1436 auto st =
PgWriteRetry( self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
1437 if( !st.IsOK() ) pgwrt->SetStatus(
new XRootDStatus( st ) );
1439 "pgoff=%llu, pglen=%u, pgdigest=%u", (
void*)self.get(),
1440 self->pFileUrl->GetObfuscatedURL().c_str(), (
unsigned long long) pgoff, pglen, pgdigest );
1444 auto st =
PgWriteImpl( self, offset, size, buffer, cksums, 0, h, timeout );
// Detach the user handler from the shared context (condition not visible).
1447 pgwrt->handler =
nullptr;
// Fragments of the page-write retry routine and of the routine that sends a
// pgwrite request (per its log message).
// A retry carries exactly one checksum.
1464 std::vector<uint32_t> cksums{ digest };
1475 std::vector<uint32_t> &cksums,
// A handler in the Error state returns the stored status right away.
1482 if( self->pFileState ==
Error )
return self->pStatus;
1488 log->
Debug(
FileMsg,
"[%p@%s] Sending a pgwrite command for handle %#x to %s",
1489 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1490 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
// Payload length: the data plus one 32-bit checksum per entry in cksums.
1501 req->
dlen = size + cksums.size() *
sizeof( uint32_t );
1503 memcpy( req->
fhandle, self->pFileHandle, 4 );
1506 list->push_back(
ChunkInfo( offset, size, (
char*)buffer ) );
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
1518 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1520 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment of the routine that sends a sync request for the open handle (per
// its log message).
// A handler in the Error state returns the stored status right away.
1532 if( self->pFileState ==
Error )
return self->pStatus;
1538 log->
Debug(
FileMsg,
"[%p@%s] Sending a sync command for handle %#x to %s",
1539 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1540 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
// Copy the 4-byte file handle into the request.
1547 memcpy( req->
fhandle, self->pFileHandle, 4 );
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
1556 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1558 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment of the routine that sends a truncate request for the open handle
// (per its log message).
// A handler in the Error state returns the stored status right away.
1571 if( self->pFileState ==
Error )
return self->pStatus;
1577 log->
Debug(
FileMsg,
"[%p@%s] Sending a truncate command for handle %#x to %s",
1578 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1579 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
// Copy the 4-byte file handle into the request.
1586 memcpy( req->
fhandle, self->pFileHandle, 4 );
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
1596 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1598 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment of the routine that sends a vector read request (per its log
// message).
// A handler in the Error state returns the stored status right away.
1615 if( self->pFileState ==
Error )
return self->pStatus;
1621 log->
Debug(
FileMsg,
"[%p@%s] Sending a vector read command for handle %#x to %s",
1622 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1623 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1636 char *cursor = (
char*)buffer;
// One request entry per chunk: length, offset and the 4-byte file handle.
1642 for(
size_t i = 0; i < chunks.size(); ++i )
1644 dataChunk[i].
rlen = chunks[i].length;
1645 dataChunk[i].
offset = chunks[i].offset;
1646 memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
// Two ways to pick the destination (selecting condition not visible): chunks
// packed back to back starting at buffer, or each chunk's own buffer.
1651 chunkBuffer = cursor;
1652 cursor += chunks[i].length;
1655 chunkBuffer = chunks[i].buffer;
1657 list->push_back(
ChunkInfo( chunks[i].offset,
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
1673 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1675 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment of the routine that sends a vector write request (per its log
// message).
// A handler in the Error state returns the stored status right away.
1691 if( self->pFileState ==
Error )
return self->pStatus;
1697 log->
Debug(
FileMsg,
"[%p@%s] Sending a vector write command for handle %#x to %s",
1698 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1699 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
// One write-list entry per chunk: length, offset and the 4-byte file handle;
// the chunk itself is also appended to the chunk list.
1728 for(
size_t i = 0; i < chunks.size(); ++i )
1730 writeList[i].
wlen = chunks[i].length;
1731 writeList[i].
offset = chunks[i].offset;
1732 memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1734 list->push_back(
ChunkInfo( chunks[i].offset,
1736 chunks[i].buffer ) );
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
1750 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1752 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment of the routine that writes the contents of an iovec array with a
// single write request (per its log message).
1760 const struct iovec *
iov,
// A handler in the Error state returns the stored status right away.
1767 if( self->pFileState ==
Error )
return self->pStatus;
1773 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
1774 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1775 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
// Empty iovec entries are skipped; the lengths of the others are summed up.
1784 for(
int i = 0; i < iovcnt; ++i )
1786 if(
iov[i].iov_len == 0 )
continue;
1787 size +=
iov[i].iov_len;
1789 (
char*)
iov[i].iov_base ) );
1795 memcpy( req->
fhandle, self->pFileHandle, 4 );
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
1806 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1808 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment of the routine that reads one contiguous file range into an iovec
// array with a single read request (per its log message).
// A handler in the Error state returns the stored status right away.
1823 if( self->pFileState ==
Error )
return self->pStatus;
1829 log->
Debug(
FileMsg,
"[%p@%s] Sending a read command for handle %#x to %s",
1830 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1831 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
// Total number of bytes to read.
// NOTE(review): the initial value 0 is an int, so std::accumulate keeps the
// running sum in an int; a total above INT_MAX would not survive. A size_t
// initial value would avoid that - confirm before changing.
1838 size_t size = std::accumulate(
iov,
iov + iovcnt, 0, [](
size_t acc, iovec &rhs )
1840 return acc + rhs.iov_len;
1846 memcpy( req->
fhandle, self->pFileHandle, 4 );
// One chunk per iovec entry; the file offsets follow each other without gaps,
// starting at offset.
1849 list->reserve( iovcnt );
1850 uint64_t choff = offset;
1851 for(
int i = 0; i < iovcnt; ++i )
1853 list->emplace_back( choff,
iov[i].iov_len,
iov[i].iov_base );
1854 choff +=
iov[i].iov_len;
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
1864 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1866 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment of the routine that sends a fcntl request for the open handle (per
// its log message).
// A handler in the Error state returns the stored status right away.
1880 if( self->pFileState ==
Error )
return self->pStatus;
1886 log->
Debug(
FileMsg,
"[%p@%s] Sending a fcntl command for handle %#x to %s",
1887 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1888 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
// Copy the 4-byte file handle into the request.
1897 memcpy( req->
fhandle, self->pFileHandle, 4 );
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
1907 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1909 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment of the routine that sends a visa request for the open handle (per
// its log message).
// A handler in the Error state returns the stored status right away.
1921 if( self->pFileState ==
Error )
return self->pStatus;
1927 log->
Debug(
FileMsg,
"[%p@%s] Sending a visa command for handle %#x to %s",
1928 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1929 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
// Copy the 4-byte file handle into the request.
1937 memcpy( req->
fhandle, self->pFileHandle, 4 );
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
1946 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
1948 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment of the routine that sets extended attributes: after the state
// check and logging it delegates to XAttrOperationImpl with kXR_fattrSet.
1955 const std::vector<xattr_t> &attrs,
// A handler in the Error state returns the stored status right away.
1961 if( self->pFileState ==
Error )
return self->pStatus;
1967 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr set command for handle %#x to %s",
1968 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1969 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1974 return XAttrOperationImpl( self,
kXR_fattrSet, 0, attrs, handler, timeout );
// Fragment of the routine that reads extended attributes: after the state
// check and logging it delegates to XAttrOperationImpl with kXR_fattrGet.
1981 const std::vector<std::string> &attrs,
// A handler in the Error state returns the stored status right away.
1987 if( self->pFileState ==
Error )
return self->pStatus;
1993 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr get command for handle %#x to %s",
1994 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1995 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2000 return XAttrOperationImpl( self,
kXR_fattrGet, 0, attrs, handler, timeout );
// Fragment of the routine that deletes extended attributes: after the state
// check and logging it delegates to XAttrOperationImpl with kXR_fattrDel.
2007 const std::vector<std::string> &attrs,
// A handler in the Error state returns the stored status right away.
2013 if( self->pFileState ==
Error )
return self->pStatus;
2019 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr del command for handle %#x to %s",
2020 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2021 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2026 return XAttrOperationImpl( self,
kXR_fattrDel, 0, attrs, handler, timeout );
// Fragment of the routine that lists extended attributes (per its log
// message).
// A handler in the Error state returns the stored status right away.
2038 if( self->pFileState ==
Error )
return self->pStatus;
2044 log->
Debug(
FileMsg,
"[%p@%s] Sending a fattr list command for handle %#x to %s",
2045 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2046 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
// A list request names no attributes, so an empty vector is passed on.
2051 static const std::vector<std::string> nothing;
2053 nothing, handler, timeout );
// Fragment of the routine that sends a checkpoint request for the open handle
// (per its log message).
// A handler in the Error state returns the stored status right away.
2074 if( self->pFileState ==
Error )
return self->pStatus;
2080 log->
Debug(
FileMsg,
"[%p@%s] Sending a checkpoint command for handle %#x to %s",
2081 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2082 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
// Copy the 4-byte file handle into the request.
2090 memcpy( req->
fhandle, self->pFileHandle, 4 );
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
2100 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2102 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment of a routine that sends a write request in which two request
// structures (req and wrtreq) both receive the file handle.
// NOTE(review): the routine's name and the relation between req and wrtreq
// are not visible in this excerpt.
// A handler in the Error state returns the stored status right away.
2125 if( self->pFileState ==
Error )
return self->pStatus;
2131 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
2132 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2133 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2142 memcpy( req->
fhandle, self->pFileHandle, 4 );
// The write request carries the payload length and the same file handle.
2147 wrtreq->
dlen = size;
2148 memcpy( wrtreq->
fhandle, self->pFileHandle, 4 );
2151 list->push_back(
ChunkInfo( 0, size, (
char*)buffer ) );
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
2162 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2164 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment of a routine that sends the contents of an iovec array in a write
// request in which two request structures (req and wrtreq) both receive the
// file handle.
// NOTE(review): the routine's name is not visible in this excerpt.
2180 const struct iovec *
iov,
// A handler in the Error state returns the stored status right away.
2187 if( self->pFileState ==
Error )
return self->pStatus;
2193 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
2194 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2195 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2204 memcpy( req->
fhandle, self->pFileHandle, 4 );
// Empty iovec entries are skipped; the lengths of the others are summed up.
2208 for(
int i = 0; i < iovcnt; ++i )
2210 if(
iov[i].iov_len == 0 )
continue;
2211 size +=
iov[i].iov_len;
2213 (
char*)
iov[i].iov_base ) );
// The write request carries the summed payload length and the file handle.
2219 wrtreq->
dlen = size;
2220 memcpy( wrtreq->
fhandle, self->pFileHandle, 4 );
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
2231 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2233 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment of the property setter. Recognised names in the visible lines:
// ReadRecovery, WriteRecovery, FollowRedirects and BundledClose.
// For each of them only the exact string "true" switches the flag on; any
// other value switches it off.
2252 const std::string &
value )
2255 if( name ==
"ReadRecovery" )
2257 if(
value ==
"true" ) pDoRecoverRead =
true;
2258 else pDoRecoverRead =
false;
2261 else if( name ==
"WriteRecovery" )
2263 if(
value ==
"true" ) pDoRecoverWrite =
true;
2264 else pDoRecoverWrite =
false;
2267 else if( name ==
"FollowRedirects" )
2269 if(
value ==
"true" ) pFollowRedirects =
true;
2270 else pFollowRedirects =
false;
2273 else if( name ==
"BundledClose" )
2275 if(
value ==
"true" ) pAllowBundledClose =
true;
2276 else pAllowBundledClose =
false;
// Fragment of the property getter. The boolean flags are reported as the
// strings "true" / "false".
2286 std::string &
value )
const
2289 if( name ==
"ReadRecovery" )
2291 if( pDoRecoverRead )
value =
"true";
2292 else value =
"false";
2295 else if( name ==
"WriteRecovery" )
2297 if( pDoRecoverWrite )
value =
"true";
2298 else value =
"false";
2301 else if( name ==
"FollowRedirects" )
2303 if( pFollowRedirects )
value =
"true";
2304 else value =
"false";
// The URL-based properties are answered only while the corresponding pointer
// is non-null; otherwise the chain falls through to whatever follows.
2307 else if( name ==
"DataServer" && pDataServer )
2308 {
value = pDataServer->GetHostId();
return true; }
2309 else if( name ==
"LastURL" && pDataServer )
2310 {
value = pDataServer->GetURL();
return true; }
2311 else if( name ==
"WrtRecoveryRedir" && pWrtRecoveryRedir )
2312 {
value = pWrtRecoveryRedir->GetHostId();
return true; }
// Fragment of the routine that processes the result of an open request (per
// its log messages): it derives data server, load balancer and write-recovery
// redirector from the host list and records the outcome.
// lastServer is used in the log output; it starts as the host of the file URL.
2330 std::string lastServer = pFileUrl->GetHostId();
2334 delete pLoadBalancer;
2336 delete pWrtRecoveryRedir;
2337 pWrtRecoveryRedir = 0;
// The last entry of the host list becomes the data server and gets the CGI
// parameters of the file URL.
2339 pDataServer =
new URL( hostList->back().url );
2340 pDataServer->SetParams( pFileUrl->GetParams() );
// Except for metalinks handled through the virtual redirector, the data
// server URL also gets the path of the file URL.
2341 if( !( pUseVirtRedirector && pFileUrl->IsMetalink() ) ) pDataServer->SetPath( pFileUrl->GetPath() );
2342 lastServer = pDataServer->GetHostId();
2343 HostList::const_iterator itC;
// Parameters of the URLs in the host list are combined into params (the
// combining call is only partly visible) and set on the data server.
2345 for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2348 itC->url.GetParams(),
2351 pDataServer->SetParams( params );
// Search the host list from the back for an entry flagged as load balancer.
2353 HostList::const_reverse_iterator it;
2354 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2355 if( it->loadBalancer )
2357 pLoadBalancer =
new URL( it->url );
// Second backward search for the write-recovery redirector (its condition is
// not visible in this excerpt).
2361 for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2364 pWrtRecoveryRedir =
new URL( it->url );
2369 log->
Debug(
FileMsg,
"[%p@%s] Open has returned with status %s",
2370 (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(), status->
ToStr().c_str() );
2372 if( pDataServer && !pDataServer->IsLocalFile() )
// A null isenc is treated as "channel not encrypted".
2383 isencobj.
Get( isenc );
2384 pIsChannelEncrypted = isenc ? *isenc :
false;
// Failure (bad status or no open info): log and fail the queued messages.
2393 if( !pStatus.IsOK() || !openInfo )
2395 log->
Debug(
FileMsg,
"[%p@%s] Error while opening at %s: %s",
2396 (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(), lastServer.c_str(),
2397 pStatus.ToStr().c_str() );
2398 FailQueuedMessages( pStatus );
2430 log->
Debug(
FileMsg,
"[%p@%s] successfully opened at %s, handle: %#x, "
2431 "session id: %llu", (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(),
2432 pDataServer->GetHostId().c_str(), *((uint32_t*)pFileHandle),
2433 (
unsigned long long) pSessionId );
// Record the time of the open.
2438 gettimeofday( &pOpenTime, 0 );
// File size reported in i; 0 when no stat info is available.
2446 i.
fSize = pStatInfo ? pStatInfo->GetSize() : 0;
2453 ReSendQueuedMessages();
// Fragment of the routine that processes the result of a close request (per
// its log message): it logs the outcome and the number of outstanding items,
// reports to monitoring and resets the monitoring counters.
2466 log->
Debug(
FileMsg,
"[%p@%s] Close returned from %s with: %s", (
void*)
this,
2467 pFileUrl->GetObfuscatedURL().c_str(), pDataServer->GetHostId().c_str(),
2468 status->
ToStr().c_str() );
2470 log->
Dump(
FileMsg,
"[%p@%s] Items in the fly %zu, queued for recovery %zu",
2471 (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(), pInTheFly.size(), pToBeRecovered.size() );
2473 MonitorClose( status );
2474 ResetMonitoringVars();
// Tests whether msg begins with one of the scheme names root, xroot, file,
// roots or xroots. compare() returns 0 on a match, hence the negation.
// NOTE(review): every string that begins with "roots" / "xroots" also begins
// with "root" / "xroot", so the last two tests cannot change the result.
2494 static const std::string root =
"root", xroot =
"xroot", file =
"file",
2495 roots =
"roots", xroots =
"xroots";
2497 if( !msg.compare( 0, root.size(), root ) ||
2498 !msg.compare( 0, xroot.size(), xroot ) ||
2499 !msg.compare( 0, file.size(), file ) ||
2500 !msg.compare( 0, roots.size(), roots ) ||
2501 !msg.compare( 0, xroots.size(), xroots ) )
// Fragment of the routine that handles a failed stateful request (per its log
// messages).
// The message is no longer in flight.
2513 self->pInTheFly.erase( message );
2515 log->
Dump(
FileMsg,
"[%p@%s] File state error encountered. Message %s "
2516 "returned with %s", (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2526 i.
file = self->pFileUrl;
// Not recoverable, or the request carries a kernel buffer: the message is
// failed with the given status instead of being recovered.
2548 if( !self->IsRecoverable( *status ) || sendParams.
kbuff )
2550 log->
Error(
FileMsg,
"[%p@%s] Fatal file state error. Message %s "
2551 "returned with %s", (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2554 self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
// Recoverable: remember the status as close reason and put the message on the
// recovery path.
2563 self->pCloseReason = *status;
2564 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
// Fragment of the routine that handles a redirect received for a stateful
// request: the message leaves the in-flight set and is put on the recovery
// path.
2572 const std::string &redirectUrl,
2578 self->pInTheFly.erase( message );
// Only the first redirect is remembered; a pending one is not overwritten.
2584 if( !self->pStateRedirect )
2586 std::ostringstream o;
2587 self->pStateRedirect =
new URL( redirectUrl );
2590 self->pStateRedirect->GetParams(),
2592 self->pFileUrl->SetParams( params );
2595 RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
// Fragment of the routine that processes the response to a stateful request
// (per its log message) and updates cached stat info and I/O counters.
2610 log->
Dump(
FileMsg,
"[%p@%s] Got state response for message %s",
2611 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
// The message is no longer in flight; RunRecovery is invoked afterwards.
2618 self->pInTheFly.erase( message );
2619 RunRecovery( self );
// Replace the cached stat info with a copy of the one in the response.
2634 response->
Get( info );
2635 delete self->pStatInfo;
2636 self->pStatInfo =
new StatInfo( *info );
// Vector read accounting: bytes per segment plus the number of segments.
2668 for(
size_t i = 0; i < segs; ++i )
2669 self->pVRBytes += dataChunk[i].
rlen;
2670 self->pVSegs += segs;
// Vector write accounting: bytes per write-list entry.
2703 for(
size_t i = 0; i < size; ++i )
2704 self->pVWBytes += wrtList[i].
wlen;
// Fragment of the timer callback (per its log message): requests waiting for
// recovery whose expiry time has passed are removed from the recovery list.
// NOTE(review): CondLock() is presumably a non-blocking lock attempt on
// pMutex - confirm.
2715 if (pMutex.CondLock())
2726 if( !pToBeRecovered.empty() )
2729 log->
Dump(
FileMsg,
"[%p@%s] Got a timer event", (
void*)
this,
2730 pFileUrl->GetObfuscatedURL().c_str() );
2731 RequestList::iterator it;
// No increment in the loop header: erase() supplies the next iterator for
// removed entries.
2733 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2735 if( it->params.expires <= now )
2740 0, it->params.hostList ) );
2741 it = pToBeRecovered.erase( it );
// Fragment of a routine that puts the file into the recovery state (per its
// log message, which includes the process id).
// NOTE(review): the routine's name is not visible in this excerpt.
// This happens only when recovery is enabled for the way the file is open:
// read recovery for read-only files, write recovery otherwise.
2759 if( (IsReadOnly() && pDoRecoverRead) ||
2760 (!IsReadOnly() && pDoRecoverWrite) )
2762 log->
Debug(
FileMsg,
"[%p@%s] Putting the file in recovery state in "
2763 "process %d", (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(), getpid() );
2766 pToBeRecovered.clear();
// Fragment of the routine that reopens the file at another data server (per
// its log message) by going back to the load balancer.
// NOTE(review): the routine's name is not visible in this excerpt.
// Requires an open file and a known load balancer (handling of the opposite
// case is not visible).
2779 if( self->pFileState !=
Opened || !self->pLoadBalancer )
2785 log->
Debug(
FileMsg,
"[%p@%s] Reopen file at next data server.",
2786 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
2789 auto lbcgi = self->pLoadBalancer->GetParams();
2790 auto dtcgi = self->pDataServer->GetParams();
// The host name of the current data server is added to the "tried" CGI
// parameter of the load balancer URL: as the only entry when the parameter
// does not exist yet, otherwise appended after a comma.
2793 auto itr = lbcgi.find(
"tried" );
2794 if( itr == lbcgi.end() )
2795 lbcgi[
"tried"] = self->pDataServer->GetHostName();
2798 std::string tried = itr->second;
2799 tried +=
"," + self->pDataServer->GetHostName();
2800 lbcgi[
"tried"] = tried;
2802 self->pLoadBalancer->SetParams( lbcgi );
2804 return ReOpenFileAtServer( self, *self->pLoadBalancer, timeout );
// Fragment of the shared implementation of the extended-attribute requests.
// T is the element type of the attrs vector.
2810 template<
typename T>
2811 Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &self,
2814 const std::vector<T> &attrs,
// Copy the 4-byte file handle into the request.
2829 memcpy( req->
fhandle, self->pFileHandle, 4 );
// A failure of the preceding step (not visible here) is returned unchanged.
2831 if( !st.
IsOK() )
return st;
// Wrap the user handler in a StatefulHandler and pass the message to
// SendOrQueue.
2840 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
2842 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
// Fragment of SendOrQueue: depending on the file state the message is issued
// immediately or handed to RecoverMessage.
// NOTE(review): the conditions for the two RecoverMessage calls are not
// visible; both pass false as the callbackOnFailure argument.
2848 Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &self,
2859 return RecoverMessage( self, RequestData( msg, handler, sendParams ),
false );
// File is open: issue the request to the data server.
2865 if( self->pFileState ==
Opened )
2868 XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
2875 return RecoverMessage( self, RequestData( msg, handler, sendParams ),
false );
// Track the message as being in flight.
2878 self->pInTheFly.insert(msg);
// Fragment of IsRecoverable: decides whether a failed request may be
// recovered, based on the status code and the recovery switches.
2889 bool FileStateHandler::IsRecoverable(
const XRootDStatus &status )
const
// List of status codes regarded as recoverable (contents not visible here).
2891 const auto recoverable_errors = {
// The list is consulted only when at least one recovery switch is on. For a
// listed code, the switch matching the open mode decides: read recovery for
// read-only files, write recovery otherwise.
2900 if (pDoRecoverRead || pDoRecoverWrite)
2901 for (
const auto error : recoverable_errors)
2902 if (status.
code == error)
2903 return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
// Const query used above; its body is not visible in this excerpt.
2911 bool FileStateHandler::IsReadOnly()
const
2922 Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &self,
2924 bool callbackOnFailure )
2929 log->
Dump(
FileMsg,
"[%p@%s] Putting message %s in the recovery list",
2930 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2931 rd.request->GetObfuscatedDescription().c_str() );
2933 Status st = RunRecovery( self );
2936 self->pToBeRecovered.push_back( rd );
2940 if( callbackOnFailure )
2941 self->FailMessage( rd, st );
2949 Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &self )
2954 if( !self->pInTheFly.empty() )
2958 log->
Debug(
FileMsg,
"[%p@%s] Running the recovery procedure", (
void*)self.get(),
2959 self->pFileUrl->GetObfuscatedURL().c_str() );
2962 if( self->pStateRedirect )
2964 SendClose( self, 0 );
2965 st = ReOpenFileAtServer( self, *self->pStateRedirect, 0 );
2966 delete self->pStateRedirect; self->pStateRedirect = 0;
2968 else if( self->IsReadOnly() && self->pLoadBalancer )
2969 st = ReOpenFileAtServer( self, *self->pLoadBalancer, 0 );
2971 st = ReOpenFileAtServer( self, *self->pDataServer, 0 );
2975 self->pFileState =
Error;
2977 self->FailQueuedMessages( st );
2986 XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &self,
2990 ClientCloseRequest *req;
2994 memcpy( req->
fhandle, self->pFileHandle, 4 );
2999 [self]( XRootDStatus&, AnyObject& )
mutable { self.reset(); } );
3000 MessageSendParams params;
3001 params.timeout = timeout;
3002 params.followRedirects =
false;
3003 params.stateful =
true;
3007 return self->IssueRequest( *self->pDataServer, msg, handler, params );
3013 XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &self,
3018 log->
Dump(
FileMsg,
"[%p@%s] Sending a recovery open command to %s",
3019 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(), url.
GetObfuscatedURL().c_str() );
3028 self->pOpenFlags &= ~kXR_delete;
3032 self->pOpenFlags &= ~kXR_new;
3035 ClientOpenRequest *req;
3039 u.
SetPath( self->pFileUrl->GetPath() );
3045 req->
mode = self->pOpenMode;
3046 req->
options = self->pOpenFlags;
3047 req->
dlen = path.length();
3048 msg->
Append( path.c_str(), path.length(), 24 );
3054 MessageSendParams params; params.
timeout = timeout;
3061 XRootDStatus st = self->IssueRequest( url, msg, openHandler, params );
3068 self->pFileState =
Closed;
3076 void FileStateHandler::FailMessage( RequestData rd,
XRootDStatus status )
3079 log->
Dump(
FileMsg,
"[%p@%s] Failing message %s with %s",
3080 (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(),
3081 rd.request->GetObfuscatedDescription().c_str(),
3082 status.
ToStr().c_str() );
3084 StatefulHandler *sh =
dynamic_cast<StatefulHandler*
>(rd.handler);
3088 log->
Error(
FileMsg,
"[%p@%s] Internal error while recovering %s",
3089 (
void*)
this, pFileUrl->GetObfuscatedURL().c_str(),
3090 rd.request->GetObfuscatedDescription().c_str() );
3095 ResponseHandler *userHandler = sh->GetUserHandler();
3098 new XRootDStatus( status ),
3099 0, rd.params.hostList ) );
3107 void FileStateHandler::FailQueuedMessages(
XRootDStatus status )
3109 RequestList::iterator it;
3110 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3111 FailMessage( *it, status );
3112 pToBeRecovered.clear();
3118 void FileStateHandler::ReSendQueuedMessages()
3120 RequestList::iterator it;
3121 for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3123 it->request->SetSessionId( pSessionId );
3124 ReWriteFileHandle( it->request );
3125 XRootDStatus st = IssueRequest( *pDataServer, it->request,
3126 it->handler, it->params );
3128 FailMessage( *it, st );
3130 pToBeRecovered.clear();
3136 void FileStateHandler::ReWriteFileHandle(
Message *msg )
3138 ClientRequestHdr *hdr = (ClientRequestHdr*)msg->
GetBuffer();
3143 ClientReadRequest *req = (ClientReadRequest*)msg->
GetBuffer();
3144 memcpy( req->
fhandle, pFileHandle, 4 );
3149 ClientWriteRequest *req = (ClientWriteRequest*)msg->
GetBuffer();
3150 memcpy( req->
fhandle, pFileHandle, 4 );
3155 ClientSyncRequest *req = (ClientSyncRequest*)msg->
GetBuffer();
3156 memcpy( req->
fhandle, pFileHandle, 4 );
3161 ClientTruncateRequest *req = (ClientTruncateRequest*)msg->
GetBuffer();
3162 memcpy( req->
fhandle, pFileHandle, 4 );
3167 ClientReadVRequest *req = (ClientReadVRequest*)msg->
GetBuffer();
3168 readahead_list *dataChunk = (readahead_list*)msg->
GetBuffer( 24 );
3169 for(
size_t i = 0; i < req->
dlen/
sizeof(readahead_list); ++i )
3170 memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
3175 ClientWriteVRequest *req =
3176 reinterpret_cast<ClientWriteVRequest*
>( msg->
GetBuffer() );
3177 XrdProto::write_list *wrtList =
3178 reinterpret_cast<XrdProto::write_list*
>( msg->
GetBuffer( 24 ) );
3179 size_t size = req->
dlen /
sizeof(XrdProto::write_list);
3180 for(
size_t i = 0; i < size; ++i )
3181 memcpy( wrtList[i].fhandle, pFileHandle, 4 );
3186 ClientPgReadRequest *req = (ClientPgReadRequest*) msg->
GetBuffer();
3187 memcpy( req->
fhandle, pFileHandle, 4 );
3192 ClientPgWriteRequest *req = (ClientPgWriteRequest*) msg->
GetBuffer();
3193 memcpy( req->
fhandle, pFileHandle, 4 );
3199 log->
Dump(
FileMsg,
"[%p@%s] Rewritten file handle for %s to %#x",
3201 *((uint32_t*)pFileHandle) );
3208 void FileStateHandler::MonitorClose(
const XRootDStatus *status )
3213 Monitor::CloseInfo i;
3216 gettimeofday( &i.
cTOD, 0 );
3238 sendParams, pLFileHandler );
3242 return pLFileHandler->ExecRequest( url, msg, handler, sendParams );
3246 sendParams, pLFileHandler );
3252 XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &self,
3255 std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3262 XrdSysMutexHelper scopedLock( self->pMutex );
3268 log->
Debug(
FileMsg,
"[%p@%s] Sending a write command for handle %#x to %s",
3269 (
void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3270 *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3273 ClientWriteRequest *req;
3279 memcpy( req->
fhandle, self->pFileHandle, 4 );
3281 MessageSendParams params;
3285 params.
kbuff = kbuff.release();
3291 StatefulHandler *stHandler =
new StatefulHandler( self, handler, msg, params );
3293 return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
struct ClientPgReadRequest pgread
static const int kXR_ckpXeq
struct ClientPgWriteRequest pgwrite
struct ClientRequestHdr header
struct ClientReadRequest read
#define kXR_PROTPGRWVERSION
struct ClientWriteRequest write
static int mapError(int rc)
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
uint32_t GetSize() const
Get the size of the message.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
bool GetInt(const std::string &key, int &value)
An interface for file plug-ins.
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, uint16_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
friend class ::PgReadRetryHandler
void Tick(time_t now)
Tick.
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
FileStateHandler(FilePlugIn *&plugin)
Constructor.
friend class ::PgReadHandler
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, uint16_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Closed
The file is closed.
@ Opened
Opening has succeeded.
@ Error
Opening has failed.
@ Recovering
Recovering from an error.
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
~FileStateHandler()
Destructor.
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
friend class ::PgReadSubstitutionHandler
bool IsOpen() const
Check if the file is open.
friend class ::OpenHandler
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, uint16_t timeout)
Try other data server.
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Info(uint64_t topic, const char *format,...)
Print an info.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
An abstract class to describe the client-side monitoring plugin interface.
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4 bytes)
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
PgReadSubstitutionHandler(XrdCl::ResponseHandler *a, bool isHttps)
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object used by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
const std::string & GetPath() const
Get the path.
bool IsMetalink() const
Is it a URL to a metalink.
std::map< std::string, std::string > ParamsMap
std::string GetPathWithFilteredParams() const
Get the path with params, filters out 'xrdcl.'.
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
void SetPath(const std::string &path)
Set the path.
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Set the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
const uint16_t errTlsError
const uint16_t errOperationExpired
const uint16_t errPollerError
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInProgress
const uint16_t errSocketTimeout
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidOp
const uint16_t suAlreadyDone
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
const uint16_t errInvalidArgs
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
const uint16_t errSocketError
const uint16_t errOperationInterrupted
const uint16_t errInvalidSession
const uint16_t errRedirect
const uint16_t errSocketDisconnected
Response NullRef< Response >::value
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
uint64_t vwBytes
Total number of bytes written via writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
std::string dataServer
Actual data server.
uint16_t oFlags
OpenFlags.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted