XRootD
XrdCl::AsyncMsgReader Class Reference

Utility class encapsulating reading response message logic. More...

#include <XrdClAsyncMsgReader.hh>

+ Collaboration diagram for XrdCl::AsyncMsgReader:

Public Member Functions

 AsyncMsgReader (TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb)
 
virtual ~AsyncMsgReader ()
 Destructor. More...
 
XRootDStatus Read ()
 Read out the response from the socket. More...
 
void Reset ()
 Reset the state of the object (makes it ready to read out next msg) More...
 

Detailed Description

Utility class encapsulating reading response message logic.

Definition at line 36 of file XrdClAsyncMsgReader.hh.

Constructor & Destructor Documentation

◆ AsyncMsgReader()

XrdCl::AsyncMsgReader::AsyncMsgReader ( TransportHandler xrdTransport,
Socket socket,
const std::string &  strmname,
Stream strm,
uint16_t  substrmnb 
)
inline

Constructor

Parameters
xrdTransport: the (xrootd) transport layer
socket: the socket with the message to be read out
strmname: stream name
strm: the stream encapsulating the connection
substrmnb: the substream number

Definition at line 48 of file XrdClAsyncMsgReader.hh.

52  : readstage( ReadStart ),
53  xrdTransport( xrdTransport ),
54  socket( socket ),
55  strmname( strmname ),
56  strm( strm ),
57  substrmnb( substrmnb ),
58  inmsgsize( 0 ),
59  inhandler( nullptr )
60  {
61  }

◆ ~AsyncMsgReader()

virtual XrdCl::AsyncMsgReader::~AsyncMsgReader ( )
inlinevirtual

Destructor.

Definition at line 66 of file XrdClAsyncMsgReader.hh.

66 { }

Member Function Documentation

◆ Read()

XRootDStatus XrdCl::AsyncMsgReader::Read ( )
inline

Read out the response from the socket.

Definition at line 82 of file XrdClAsyncMsgReader.hh.

83  {
84  Log *log = DefaultEnv::GetLog();
85 
86  while( true )
87  {
88  switch( readstage )
89  {
90  //------------------------------------------------------------------
91  // There is no incoming message currently being processed so we
92  // create a new one
93  //------------------------------------------------------------------
94  case ReadStart:
95  {
96  inmsg = std::make_shared<Message>();
97  //----------------------------------------------------------------
98  // The next step is to read the header
99  //----------------------------------------------------------------
100  readstage = ReadHeader;
101  continue;
102  }
103  //------------------------------------------------------------------
104  // We need to read the header
105  //------------------------------------------------------------------
106  case ReadHeader:
107  {
108  XRootDStatus st = xrdTransport.GetHeader( *inmsg, &socket );
109  if( !st.IsOK() || st.code == suRetry )
110  return st;
111 
112  log->Dump( AsyncSockMsg, "[%s] Received message header for %p size: %d",
113  strmname.c_str(), inmsg.get(), inmsg->GetCursor() );
114 
115  ServerResponse *rsp = (ServerResponse*)inmsg->GetBuffer();
116  if( rsp->hdr.status == kXR_attn )
117  {
118  log->Dump( AsyncSockMsg, "[%s] Will readout the attn action code "
119  "of message %p", strmname.c_str(), inmsg.get() );
120  inmsg->ReAllocate( 16 ); // header (bytes 8) + action code (8 bytes)
121  readstage = ReadAttn;
122  continue;
123  }
124 
125  inmsgsize = inmsg->GetCursor();
126  inhandler = strm.InstallIncHandler( inmsg, substrmnb );
127 
128  if( inhandler )
129  {
130  log->Dump( AsyncSockMsg, "[%s] Will use the raw handler to read body "
131  "of message %p", strmname.c_str(), inmsg.get() );
132  //--------------------------------------------------------------
133  // The next step is to read raw data
134  //--------------------------------------------------------------
135  readstage = ReadRawData;
136  continue;
137  }
138 
139  //----------------------------------------------------------------
140  // The next step is to read the message body
141  //----------------------------------------------------------------
142  readstage = ReadMsgBody;
143  continue;
144  }
145  //------------------------------------------------------------------
146  // Before proceeding we need to figure out the attn action code
147  //------------------------------------------------------------------
148  case ReadAttn:
149  {
150  XRootDStatus st = ReadAttnActnum();
151  if( !st.IsOK() || st.code == suRetry )
152  return st;
153 
154  //----------------------------------------------------------------
155  // There is an embedded response, overwrite the message with that
156  //----------------------------------------------------------------
157  if( HasEmbeddedRsp() )
158  {
159  inmsg->Free();
160  readstage = ReadHeader;
161  continue;
162  }
163 
164  //----------------------------------------------------------------
165  // Readout the rest of the body
166  //----------------------------------------------------------------
167  inmsgsize = inmsg->GetCursor();
168  readstage = ReadMsgBody;
169  continue;
170  }
171  //------------------------------------------------------------------
172  // kXR_status is special as it can have both body and raw data,
173  // handle it separately
174  //------------------------------------------------------------------
175  case ReadMore:
176  {
177  XRootDStatus st = xrdTransport.GetMore( *inmsg, &socket );
178  if( !st.IsOK() || st.code == suRetry )
179  return st;
180  inmsgsize = inmsg->GetCursor();
181 
182  //----------------------------------------------------------------
183  // The next step is to finalize the read
184  //----------------------------------------------------------------
185  readstage = ReadDone;
186  continue;
187  }
188  //------------------------------------------------------------------
189  // We need to call a raw message handler to get the data from the
190  // socket
191  //------------------------------------------------------------------
192  case ReadRawData:
193  {
194  uint32_t bytesRead = 0;
195  XRootDStatus st = inhandler->ReadMessageBody( inmsg.get(), &socket, bytesRead );
196  if( !st.IsOK() )
197  return st;
198  inmsgsize += bytesRead;
199  if( st.code == suRetry )
200  return st;
201  //----------------------------------------------------------------
202  // The next step is to finalize the read
203  //----------------------------------------------------------------
204  readstage = ReadDone;
205  continue;
206  }
207  //------------------------------------------------------------------
208  // No raw handler, so we read the message to the buffer
209  //------------------------------------------------------------------
210  case ReadMsgBody:
211  {
212  XRootDStatus st = xrdTransport.GetBody( *inmsg, &socket );
213  if( !st.IsOK() || st.code == suRetry )
214  return st;
215  inmsgsize = inmsg->GetCursor();
216 
217 
218  //----------------------------------------------------------------
219  // kXR_status response needs special handling as it can have
220  // either (body + raw data) or (body + additional body data)
221  //----------------------------------------------------------------
222  if( IsStatusRsp() )
223  {
224  uint16_t action = strm.InspectStatusRsp( substrmnb,
225  inhandler );
226 
227  if( action & MsgHandler::Corrupted )
228  return XRootDStatus( stError, errCorruptedHeader );
229 
230  if( action & MsgHandler::Raw )
231  {
232  //--------------------------------------------------------------
233  // The next step is to read the raw data
234  //--------------------------------------------------------------
235  readstage = ReadRawData;
236  continue;
237  }
238 
239  if( action & MsgHandler::More )
240  {
241 
242  //--------------------------------------------------------------
243  // The next step is to read the additional data in the message
244  // body
245  //--------------------------------------------------------------
246  readstage = ReadMore;
247  continue;
248  }
249  }
250 
251  //----------------------------------------------------------------
252  // The next step is to finalize the read
253  //----------------------------------------------------------------
254  readstage = ReadDone;
255  continue;
256  }
257 
258  case ReadDone:
259  {
260  //----------------------------------------------------------------
261  // Report the incoming message
262  //----------------------------------------------------------------
263  log->Dump( AsyncSockMsg, "[%s] Received message %p of %d bytes",
264  strmname.c_str(), inmsg.get(), inmsgsize );
265 
266  strm.OnIncoming( substrmnb, std::move( inmsg ), inmsgsize );
267  }
268  }
269  // just in case
270  break;
271  }
272 
273  //----------------------------------------------------------------------
274  // We are done
275  //----------------------------------------------------------------------
276  return XRootDStatus();
277  }
@ kXR_attn
Definition: XProtocol.hh:901
ServerResponseHeader hdr
Definition: XProtocol.hh:1287
static Log * GetLog()
Get default log.
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
@ More
there are more (non-raw) data to be read
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
Definition: XrdClStream.cc:471
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
virtual XRootDStatus GetBody(Message &message, Socket *socket)=0
virtual XRootDStatus GetHeader(Message &message, Socket *socket)=0
virtual XRootDStatus GetMore(Message &message, Socket *socket)=0
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t AsyncSockMsg
const uint16_t errCorruptedHeader
Definition: XrdClStatus.hh:103
XrdSysError Log
Definition: XrdConfig.cc:112

References XrdCl::AsyncSockMsg, XrdCl::Status::code, XrdCl::MsgHandler::Corrupted, XrdCl::Log::Dump(), XrdCl::errCorruptedHeader, XrdCl::TransportHandler::GetBody(), XrdCl::TransportHandler::GetHeader(), XrdCl::DefaultEnv::GetLog(), XrdCl::TransportHandler::GetMore(), ServerResponse::hdr, XrdCl::Stream::InspectStatusRsp(), XrdCl::Stream::InstallIncHandler(), XrdCl::Status::IsOK(), kXR_attn, XrdCl::MsgHandler::More, XrdCl::Stream::OnIncoming(), XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::ReadMessageBody(), ServerResponseHeader::status, XrdCl::stError, and XrdCl::suRetry.

+ Here is the call graph for this function:

◆ Reset()

void XrdCl::AsyncMsgReader::Reset ( )
inline

Reset the state of the object (makes it ready to read out next msg)

Definition at line 71 of file XrdClAsyncMsgReader.hh.

72  {
73  readstage = ReadStart;
74  inmsg.reset();
75  inmsgsize = 0;
76  inhandler = nullptr;
77  }

The documentation for this class was generated from the following file: