25 #ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
26 #define __XRD_CL_XROOTD_MSG_HANDLER_HH__
51 #include <arpa/inet.h>
64 class LocalFileHandler;
100 case EntryRedirect:
return "Redirected from: " + fromstr +
" to: "
104 "Falling back to virtual redirector: " + tostr;
108 case EntryWait:
return "Waited at server request. Resending: "
112 return "Failed at: " + fromstr +
", retrying at: " + tostr;
137 std::shared_ptr<SIDManager> sidMgr,
140 pResponseHandler( respHandler ),
142 pEffectiveDataServerUrl( 0 ),
144 pLFileHandler( lFileHandler ),
146 pRedirectAsAnswer( false ),
147 pOksofarAsAnswer( false ),
148 pHasLoadBalancer( false ),
149 pHasSessionId( false ),
152 pRedirectCounter( 0 ),
153 pNotAuthorizedCounter( 0 ),
156 pAsyncChunkIndex( 0 ),
158 pPgWrtCksumBuff( 4 ),
159 pPgWrtCurrentPageOffset( 0 ),
160 pPgWrtCurrentPageNb( 0 ),
162 pOtherRawStarted( false ),
164 pFollowMetalink( false ),
168 pAggregatedWaitTime( 0 ),
172 pTimeoutFence( false ),
174 pDirListStarted( false ),
175 pDirListWithStat( false ),
183 pHasSessionId =
true;
186 log->
Debug(
ExDbgMsg,
"[%s] MsgHandler created: %p (message: %s ).",
195 ntohl( pgrdreq->
rlen ) ) );
// Teardown sequence. The enclosing function header is not visible in this
// listing; NOTE(review): presumably the body of ~XRootDMsgHandler - confirm.
// In the visible lines, DumpRedirectTraceBack() is invoked ahead of the
// delete and the pointer overwrites that follow.
211 DumpRedirectTraceBack();
// pEffectiveDataServerUrl is a raw URL* member; it is released here.
215 delete pEffectiveDataServerUrl;
// The raw pointer members below are overwritten with the sentinel value
// 0xDEADBEEF instead of being nulled. NOTE(review): presumably so that any
// use of a handler after teardown is easy to recognise - confirm intent.
217 pRequest =
reinterpret_cast<Message*
>( 0xDEADBEEF );
219 pPostMaster =
reinterpret_cast<PostMaster*
>( 0xDEADBEEF );
221 pChunkList =
reinterpret_cast<ChunkList*
>( 0xDEADBEEF );
// Overwritten after the delete above, so the freed address is not retained.
222 pEffectiveDataServerUrl =
reinterpret_cast<URL*
>( 0xDEADBEEF );
//! Examine a message handed in as a reference to a shared_ptr. The generated
//! reference text describes Process() as handling the message if it was
//! "taken" by the examine action, i.e. by this call.
//! NOTE(review): the meaning of the returned uint16_t value is not visible
//! in this listing - confirm against the definition.
236 virtual uint16_t
Examine( std::shared_ptr<Message> &msg );
//! Const accessor returning a 16-bit identifier.
//! NOTE(review): presumably the stream id (SID) associated with the handled
//! request - the definition is not visible here, confirm.
255 virtual uint16_t
GetSid()
const;
277 uint32_t &bytesRead );
//! Report whether this handler is a raw writer (wording taken from the
//! generated reference text). Declared const, so it does not modify the
//! handler's observable state.
297 virtual bool IsRaw()
const;
310 uint32_t &bytesWritten );
324 pExpiration = expiration;
341 pRedirectAsAnswer = redirectAsAnswer;
350 pOksofarAsAnswer = oksofarAsAnswer;
368 pLoadBalancer = loadBalancer;
369 pHasLoadBalancer =
true;
377 pHosts.reset( hostList );
385 pChunkList = chunkList;
387 pBodyReader->SetChunkList( chunkList );
389 pChunkStatus.resize( chunkList->size() );
391 pChunkStatus.clear();
396 pCrc32cDigests = std::move( crc32cDigests );
412 pRedirectCounter = redirectCounter;
417 pFollowMetalink = followMetalink;
422 pStateful = stateful;
// Internal helper declarations. Their definitions are not part of this
// listing, so the notes below state only what the signatures show; anything
// about purpose is marked as an assumption to be confirmed.

// NOTE(review): presumably delivers the final response - confirm.
447 void HandleResponse();
// `response` is a reference to a pointer, so the callee can set the caller's
// pointer. NOTE(review): presumably receives the parsed object - confirm.
464 Status ParseXAttrResponse(
char *data,
size_t len,
AnyObject *&response );
// Takes the new URL by const reference and reports the outcome as a Status.
470 Status RewriteRequestRedirect(
const URL &newUrl );
475 Status RewriteRequestWait();
// errNo is optional and defaults to 0.
480 void UpdateTriedCGI(uint32_t errNo=0);
485 void SwitchOnRefreshFlag();
491 void HandleRspOrQueue();
// Takes a non-const URL pointer. NOTE(review): ownership of `url` is not
// visible from the declaration - confirm.
496 void HandleLocalRedirect(
URL *url );
// Boolean decision taken from a request and a URL.
// NOTE(review): presumably true means the wait can be skipped - confirm.
514 bool OmitWait(
Message &request,
const URL &url );
// Boolean classification of a Status.
// NOTE(review): presumably true means the error may be retried - confirm.
523 bool RetriableErrorResponse(
const Status &status );
// Invoked from the teardown fragment at listing line 211.
528 void DumpRedirectTraceBack();
// Three ReadFromBuffer overloads. Each takes the buffer pointer and the
// buffer length by reference, so the callee is able to update both.
// NOTE(review): presumably they advance `buffer` and reduce `buflen` by the
// amount consumed - confirm against the definitions.

// Generic overload writing into a result of type T (the template header
// line is not visible in this listing).
538 Status ReadFromBuffer(
char *&buffer,
size_t &buflen, T& result );
// Overload writing into a std::string.
548 Status ReadFromBuffer(
char *&buffer,
size_t &buflen, std::string &result );
// Overload writing into a std::string with an explicit `size` argument.
// NOTE(review): presumably the number of bytes to read - confirm.
559 Status ReadFromBuffer(
char *&buffer,
size_t &buflen,
size_t size,
560 std::string &result );
// Default constructor: both flags, sizeError and done, start out false.
567 ChunkStatus(): sizeError( false ), done( false ) {}
// A std::list whose elements are std::unique_ptr<RedirectEntry>, i.e. the
// list owns the entries it holds.
572 typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
// Size in bytes of a uint32_t, the same element type as the pCrc32cDigests
// vector. NOTE(review): presumably the size of one page checksum - confirm.
574 static const size_t CksumSize =
sizeof( uint32_t );
// Fixed limit of 3. NOTE(review): presumably the maximum number of retries
// after an SSL error - its use is not visible in this listing, confirm.
576 static const size_t MaxSslErrRetry = 3;
578 inline static size_t NbPgPerRsp( uint64_t offset, uint32_t dlen )
588 if( _1stpg + CksumSize > dlen )
589 _1stpg = dlen - CksumSize;
590 dlen -= _1stpg + CksumSize;
592 pgcnt += dlen / PageWithCksum;
593 if( dlen % PageWithCksum )
// Data members. Notes on initial values refer to the initializer-list
// fragment visible earlier in this listing; notes on assignments refer to
// the setter and teardown fragments visible earlier in this listing.
599 std::shared_ptr<Message> pResponse;
600 std::vector<std::shared_ptr<Message>> pPartialResps;
// Initialised from the respHandler constructor argument.
601 ResponseHandler *pResponseHandler;
// Starts as a null pointer (initialised with 0) and is released with
// delete in the teardown fragment.
603 URL *pEffectiveDataServerUrl;
// Overwritten with 0xDEADBEEF in the teardown fragment.
604 PostMaster *pPostMaster;
605 std::shared_ptr<SIDManager> pSidMgr;
// Initialised from the lFileHandler constructor argument.
606 LocalFileHandler *pLFileHandler;
607 XRootDStatus pStatus;
// Both flags start false; each is assigned from the same-named setter
// argument (redirectAsAnswer / oksofarAsAnswer).
610 bool pRedirectAsAnswer;
611 bool pOksofarAsAnswer;
// Populated with pHosts.reset( hostList ), so the unique_ptr takes over the
// raw HostList pointer it is given.
612 std::unique_ptr<HostList> pHosts;
// Starts false; set to true right after pLoadBalancer is assigned.
613 bool pHasLoadBalancer;
614 HostInfo pLoadBalancer;
616 std::string pRedirectUrl;
// Filled by move-assignment from the crc32cDigests argument.
618 std::vector<uint32_t> pCrc32cDigests;
// Either resized to chunkList->size() or cleared when the chunk list is set
// (the condition selecting between the two is not visible in this listing).
620 std::vector<ChunkStatus> pChunkStatus;
// Starts at 0; assigned from the redirectCounter setter argument.
621 uint16_t pRedirectCounter;
// Starts at 0.
622 uint16_t pNotAuthorizedCounter;
624 uint32_t pAsyncOffset;
// Starts at 0.
625 uint32_t pAsyncChunkIndex;
627 std::unique_ptr<AsyncPageReader> pPageReader;
// Receives the chunk list via pBodyReader->SetChunkList( chunkList ).
628 std::unique_ptr<AsyncRawReaderIntfc> pBodyReader;
// Constructed with the argument 4.
630 Buffer pPgWrtCksumBuff;
// Both start at 0.
631 uint32_t pPgWrtCurrentPageOffset;
632 uint32_t pPgWrtCurrentPageNb;
// Starts false.
634 bool pOtherRawStarted;
// Starts false; assigned from the followMetalink setter argument.
636 bool pFollowMetalink;
// Starts at 0.
639 int pAggregatedWaitTime;
641 std::unique_ptr<RedirectEntry> pRdirEntry;
642 RedirectTraceBack pRedirectTraceBack;
// Atomic flag, starts false.
651 std::atomic<bool> pTimeoutFence;
// Both start false.
658 bool pDirListStarted;
659 bool pDirListWithStat;
Object for discarding data.
Object for reading out data from the kXR_read response.
Object for reading out data from the VectorRead response.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
StreamEvent
Events that may have occurred to the stream.
A hub for dispatching and receiving messages.
Handle an async response.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
std::string GetLocation() const
Get location (protocol://host:port/path)
bool IsValid() const
Is the url valid.
Handle/Process/Forward XRootD messages.
void SetRedirectCounter(uint16_t redirectCounter)
Set the redirect counter.
virtual uint16_t Examine(std::shared_ptr< Message > &msg)
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
void SetFollowMetalink(bool followMetalink)
const Message * GetRequest() const
Get the request pointer.
void SetChunkList(ChunkList *chunkList)
Set the chunk list.
void SetHostList(HostList *hostList)
Set host list.
void SetCrc32cDigests(std::vector< uint32_t > &&crc32cDigests)
void SetLoadBalancer(const HostInfo &loadBalancer)
Set the load balancer.
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status)
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
XRootDMsgHandler(Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
virtual void OnStatusReady(const Message *message, XRootDStatus status)
The requested action has been performed and the status is available.
~XRootDMsgHandler()
Destructor.
void WaitDone(time_t now)
void SetStateful(bool stateful)
virtual bool IsRaw() const
Are we a raw writer or not?
time_t GetExpiration()
Get a timestamp after which we give up.
void SetOksofarAsAnswer(bool oksofarAsAnswer)
void SetKernelBuffer(XrdSys::KernelBuffer *kbuff)
Set the kernel buffer.
virtual void Process()
Process the message if it was "taken" by the examine action.
void SetExpiration(time_t expiration)
Set a timestamp after which we give up.
void SetRedirectAsAnswer(bool redirectAsAnswer)
virtual uint16_t InspectStatusRsp()
virtual uint16_t GetSid() const
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
std::vector< HostInfo > HostList
std::vector< ChunkInfo > ChunkList
List of chunks.
static const int PageSize
RedirectEntry(const URL &from, const URL &to, Type type)
std::string ToString(bool prevok=true)
Procedure execution status.