XRootD
XrdClAsyncMsgReader.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
20 #define SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
21 
22 #include "XrdCl/XrdClMessage.hh"
25 #include "XrdCl/XrdClSocket.hh"
26 #include "XrdCl/XrdClConstants.hh"
27 #include "XrdCl/XrdClStream.hh"
28 
29 #include <memory>
30 
31 namespace XrdCl
32 {
33  //----------------------------------------------------------------------------
34  //! Utility class encapsulating reading response message logic.
35  //----------------------------------------------------------------------------
36  class AsyncMsgReader
37  {
38  public:
39  //------------------------------------------------------------------------
47  //------------------------------------------------------------------------
49  Socket &socket,
50  const std::string &strmname,
51  Stream &strm,
52  uint16_t substrmnb) : readstage( ReadStart ),
53  xrdTransport( xrdTransport ),
54  socket( socket ),
55  strmname( strmname ),
56  strm( strm ),
57  substrmnb( substrmnb ),
58  inmsgsize( 0 ),
59  inhandler( nullptr )
60  {
61  }
62 
63  //------------------------------------------------------------------------
65  //------------------------------------------------------------------------
66  virtual ~AsyncMsgReader(){ }
67 
68  //------------------------------------------------------------------------
70  //------------------------------------------------------------------------
71  inline void Reset()
72  {
73  readstage = ReadStart;
74  inmsg.reset();
75  inmsgsize = 0;
76  inhandler = nullptr;
77  }
78 
79  //------------------------------------------------------------------------
81  //------------------------------------------------------------------------
83  {
84  Log *log = DefaultEnv::GetLog();
85 
86  while( true )
87  {
88  switch( readstage )
89  {
90  //------------------------------------------------------------------
91  // There is no incoming message currently being processed so we
92  // create a new one
93  //------------------------------------------------------------------
94  case ReadStart:
95  {
96  inmsg = std::make_shared<Message>();
97  //----------------------------------------------------------------
98  // The next step is to read the header
99  //----------------------------------------------------------------
100  readstage = ReadHeader;
101  continue;
102  }
103  //------------------------------------------------------------------
104  // We need to read the header
105  //------------------------------------------------------------------
106  case ReadHeader:
107  {
108  XRootDStatus st = xrdTransport.GetHeader( *inmsg, &socket );
109  if( !st.IsOK() || st.code == suRetry )
110  return st;
111 
112  log->Dump( AsyncSockMsg, "[%s] Received message header for %p size: %d",
113  strmname.c_str(), inmsg.get(), inmsg->GetCursor() );
114 
115  ServerResponse *rsp = (ServerResponse*)inmsg->GetBuffer();
116  if( rsp->hdr.status == kXR_attn )
117  {
118  log->Dump( AsyncSockMsg, "[%s] Will readout the attn action code "
119  "of message %p", strmname.c_str(), inmsg.get() );
120  inmsg->ReAllocate( 16 ); // header (bytes 8) + action code (8 bytes)
121  readstage = ReadAttn;
122  continue;
123  }
124 
125  inmsgsize = inmsg->GetCursor();
126  inhandler = strm.InstallIncHandler( inmsg, substrmnb );
127 
128  if( inhandler )
129  {
130  log->Dump( AsyncSockMsg, "[%s] Will use the raw handler to read body "
131  "of message %p", strmname.c_str(), inmsg.get() );
132  //--------------------------------------------------------------
133  // The next step is to read raw data
134  //--------------------------------------------------------------
135  readstage = ReadRawData;
136  continue;
137  }
138 
139  //----------------------------------------------------------------
140  // The next step is to read the message body
141  //----------------------------------------------------------------
142  readstage = ReadMsgBody;
143  continue;
144  }
145  //------------------------------------------------------------------
146  // Before proceeding we need to figure out the attn action code
147  //------------------------------------------------------------------
148  case ReadAttn:
149  {
150  XRootDStatus st = ReadAttnActnum();
151  if( !st.IsOK() || st.code == suRetry )
152  return st;
153 
154  //----------------------------------------------------------------
155  // There is an embedded response, overwrite the message with that
156  //----------------------------------------------------------------
157  if( HasEmbeddedRsp() )
158  {
159  inmsg->Free();
160  readstage = ReadHeader;
161  continue;
162  }
163 
164  //----------------------------------------------------------------
165  // Readout the rest of the body
166  //----------------------------------------------------------------
167  inmsgsize = inmsg->GetCursor();
168  readstage = ReadMsgBody;
169  continue;
170  }
171  //------------------------------------------------------------------
172  // kXR_status is special as it can have both body and raw data,
173  // handle it separately
174  //------------------------------------------------------------------
175  case ReadMore:
176  {
177  XRootDStatus st = xrdTransport.GetMore( *inmsg, &socket );
178  if( !st.IsOK() || st.code == suRetry )
179  return st;
180  inmsgsize = inmsg->GetCursor();
181 
182  //----------------------------------------------------------------
183  // The next step is to finalize the read
184  //----------------------------------------------------------------
185  readstage = ReadDone;
186  continue;
187  }
188  //------------------------------------------------------------------
189  // We need to call a raw message handler to get the data from the
190  // socket
191  //------------------------------------------------------------------
192  case ReadRawData:
193  {
194  uint32_t bytesRead = 0;
195  XRootDStatus st = inhandler->ReadMessageBody( inmsg.get(), &socket, bytesRead );
196  if( !st.IsOK() )
197  return st;
198  inmsgsize += bytesRead;
199  if( st.code == suRetry )
200  return st;
201  //----------------------------------------------------------------
202  // The next step is to finalize the read
203  //----------------------------------------------------------------
204  readstage = ReadDone;
205  continue;
206  }
207  //------------------------------------------------------------------
208  // No raw handler, so we read the message to the buffer
209  //------------------------------------------------------------------
210  case ReadMsgBody:
211  {
212  XRootDStatus st = xrdTransport.GetBody( *inmsg, &socket );
213  if( !st.IsOK() || st.code == suRetry )
214  return st;
215  inmsgsize = inmsg->GetCursor();
216 
217 
218  //----------------------------------------------------------------
219  // kXR_status response needs special handling as it can have
220  // either (body + raw data) or (body + additional body data)
221  //----------------------------------------------------------------
222  if( IsStatusRsp() )
223  {
224  uint16_t action = strm.InspectStatusRsp( substrmnb,
225  inhandler );
226 
227  if( action & MsgHandler::Corrupted )
229 
230  if( action & MsgHandler::Raw )
231  {
232  //--------------------------------------------------------------
233  // The next step is to read the raw data
234  //--------------------------------------------------------------
235  readstage = ReadRawData;
236  continue;
237  }
238 
239  if( action & MsgHandler::More )
240  {
241 
242  //--------------------------------------------------------------
243  // The next step is to read the additional data in the message
244  // body
245  //--------------------------------------------------------------
246  readstage = ReadMore;
247  continue;
248  }
249  }
250 
251  //----------------------------------------------------------------
252  // The next step is to finalize the read
253  //----------------------------------------------------------------
254  readstage = ReadDone;
255  continue;
256  }
257 
258  case ReadDone:
259  {
260  //----------------------------------------------------------------
261  // Report the incoming message
262  //----------------------------------------------------------------
263  log->Dump( AsyncSockMsg, "[%s] Received message %p of %d bytes",
264  strmname.c_str(), inmsg.get(), inmsgsize );
265 
266  strm.OnIncoming( substrmnb, std::move( inmsg ), inmsgsize );
267  }
268  }
269  // just in case
270  break;
271  }
272 
273  //----------------------------------------------------------------------
274  // We are done
275  //----------------------------------------------------------------------
276  return XRootDStatus();
277  }
278 
279  private:
280 
281  XRootDStatus ReadAttnActnum()
282  {
283  //----------------------------------------------------------------------
284  // Readout the action code from the socket. We are reading out 8 bytes
285  // into the message, the 8 byte header is already there.
286  //----------------------------------------------------------------------
287  size_t btsleft = 8 - ( inmsg->GetCursor() - 8 );
288  while( btsleft > 0 )
289  {
290  int btsrd = 0;
291  XRootDStatus st = socket.Read( inmsg->GetBufferAtCursor(), btsleft, btsrd );
292  if( !st.IsOK() || st.code == suRetry )
293  return st;
294  btsleft -= btsrd;
295  inmsg->AdvanceCursor( btsrd );
296  }
297 
298  //----------------------------------------------------------------------
299  // Marshal the action code
300  //----------------------------------------------------------------------
301  ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
302  attn->actnum = ntohl( attn->actnum );
303 
304  return XRootDStatus();
305  }
306 
307  inline bool IsStatusRsp()
308  {
309  ServerResponseHeader *hdr = (ServerResponseHeader*)inmsg->GetBuffer();
310  return ( hdr->status == kXR_status );
311  }
312 
313  inline bool HasEmbeddedRsp()
314  {
315  ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
316  return ( attn->actnum == kXR_asynresp );
317  }
318 
//------------------------------------------------------------------------
//! Stages of the read state machine; each value names the step that is
//! going to be executed next
//------------------------------------------------------------------------
enum Stage
{
  ReadStart,   //!< next: create a fresh message object
  ReadHeader,  //!< next: read the response header
  ReadAttn,    //!< next: read the attn action code
  ReadMore,    //!< next: read the additional status body data
  ReadMsgBody, //!< next: read the message body
  ReadRawData, //!< next: read the raw data through the handler
  ReadDone     //!< next: report the message and finish the read
};
332 
333  //------------------------------------------------------------------------
334  // Current read stage
335  //------------------------------------------------------------------------
336  Stage readstage;
337 
338  //------------------------------------------------------------------------
339  // The context of the read operation
340  //------------------------------------------------------------------------
341  TransportHandler &xrdTransport;
342  Socket &socket;
343  const std::string &strmname;
344  Stream &strm;
345  uint16_t substrmnb;
346 
347 
348  //------------------------------------------------------------------------
349  // The internal state of the the reader
350  //------------------------------------------------------------------------
351  std::shared_ptr<Message> inmsg; //< the ownership is shared with MsgHandler
352  uint32_t inmsgsize;
353  MsgHandler *inhandler;
354 
355  };
356 
357 } /* namespace XrdCl */
358 
359 #endif /* SRC_XRDCL_XRDCLASYNCMSGREADER_HH_ */
@ kXR_asynresp
Definition: XProtocol.hh:938
@ kXR_status
Definition: XProtocol.hh:907
@ kXR_attn
Definition: XProtocol.hh:901
ServerResponseHeader hdr
Definition: XProtocol.hh:1287
Utility class encapsulating reading response message logic.
void Reset()
Reset the state of the object (makes it ready to read out next msg)
XRootDStatus Read()
Read out the response from the socket.
virtual ~AsyncMsgReader()
Destructor.
AsyncMsgReader(TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb)
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
@ More
there are more (non-raw) data to be read
A network socket.
Definition: XrdClSocket.hh:43
virtual XRootDStatus Read(char *buffer, size_t size, int &bytesRead)
Definition: XrdClSocket.cc:740
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
Definition: XrdClStream.cc:471
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
Perform the handshake and the authentication for each physical stream.
virtual XRootDStatus GetBody(Message &message, Socket *socket)=0
virtual XRootDStatus GetHeader(Message &message, Socket *socket)=0
virtual XRootDStatus GetMore(Message &message, Socket *socket)=0
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t AsyncSockMsg
const uint16_t errCorruptedHeader
Definition: XrdClStatus.hh:103
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124