XRootD
XrdClAsyncVectorReader.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef SRC_XRDCL_XRDCLASYNCVECTORREADER_HH_
20 #define SRC_XRDCL_XRDCLASYNCVECTORREADER_HH_
21 
23 #include "XrdCl/XrdClSocket.hh"
25 
26 namespace XrdCl
27 {
28 
29  //----------------------------------------------------------------------------
31  //----------------------------------------------------------------------------
33  {
34  public:
35  //------------------------------------------------------------------------
39  //------------------------------------------------------------------------
// Constructor: starts with an empty read-list cursor (rdlstoff and rdlstlen
// both 0) and zero-fills the rdlst record so no stale header bytes are seen
// before the first record has been read from the socket.
// NOTE(review): listing line 41 is absent from this listing (the embedded
// numbering jumps from 40 to 42) - presumably the base-class initializer that
// consumes `url` and `request`; confirm against the real header.
40 AsyncVectorReader( const URL &url, const Message &request ) :
42 rdlstoff( 0 ),
43 rdlstlen( 0 )
44 {
// rdlst is cleared as raw bytes, sized by its type (readahead_list)
45 memset( &rdlst, 0, sizeof( readahead_list ) );
46 }
47 
48  //------------------------------------------------------------------------
54  //------------------------------------------------------------------------
// Reads the body of a vector-read response from `socket`, one
// (read-list record, raw data) pair at a time, driven by the `readstage`
// state machine (stages visible here: ReadStart, ReadRdLst, ReadRaw,
// ReadDiscard, ReadDone).
//
// socket - socket the bytes are pulled from via ReadBytesAsync
// btsret - incremented by every byte count ReadBytesAsync reports, for both
//          read-list records and raw chunk data
//
// Returns the ReadBytesAsync status as soon as it is not OK or its code is
// suRetry; returns XRootDStatus( stError, errInternal ) for an unknown stage;
// otherwise falls out of the loop and returns a default XRootDStatus.
//
// NOTE(review): listing lines 71, 90, 140, 165, 200, 204 and 215 are absent
// from this listing (the embedded numbering skips them). They are presumably
// the `readstage = ...` transitions and the statement of the ReadDiscard
// case; confirm against the real header before relying on the flow below.
// NOTE(review): readstage, msgbtsrd, rawbtsrd, dlen, chunks, chstatus, chidx,
// choff, chlen and url are not declared in this listing - presumably
// inherited from the base class; verify.
55 XRootDStatus Read( Socket &socket, uint32_t &btsret )
56 {
57 Log *log = DefaultEnv::GetLog();
58 
// Every `continue` below re-enters the switch with the current readstage;
// the loop is only left through a `return` or the `break` after the switch.
59 while( true )
60 {
61 switch( readstage )
62 {
63 //------------------------------------------------------------------
64 // Prepare to readout a new response
65 //------------------------------------------------------------------
66 case ReadStart:
67 {
// reset the per-message byte counter and arm the cursor for one full
// readahead_list record
68 msgbtsrd = 0;
69 rdlstoff = 0;
70 rdlstlen = sizeof( readahead_list );
// NOTE(review): listing line 71 missing here - presumably the switch to
// the ReadRdLst stage
72 continue;
73 }
74 
75 //------------------------------------------------------------------
76 // Readout the read_list
77 //------------------------------------------------------------------
78 case ReadRdLst:
79 {
80 //----------------------------------------------------------------
81 // We cannot afford to read the next header from the stream
82 // because we will cross the message boundary
83 //----------------------------------------------------------------
// rdlstlen is the part of the record still outstanding, so this check
// also holds when the stage is re-entered after a partial read
84 if( msgbtsrd + rdlstlen > dlen )
85 {
86 uint32_t btsleft = dlen - msgbtsrd;
87 log->Error( XRootDMsg, "[%s] VectorReader: No enough data to read "
88 "another chunk header. Discarding %d bytes.",
89 url.GetHostId().c_str(), btsleft );
// NOTE(review): listing line 90 missing here - presumably the switch to
// the ReadDiscard stage
91 continue;
92 }
93 
94 //----------------------------------------------------------------
95 // Let's readout the read list record from the socket
96 //----------------------------------------------------------------
97 uint32_t btsrd = 0;
98 char *buff = reinterpret_cast<char*>( &rdlst );
99 Status st = ReadBytesAsync( socket, buff + rdlstoff, rdlstlen, btsrd );
// the cursors and counters are advanced before the status is inspected,
// so bytes from a partial read are accounted for even on early return
100 rdlstoff += btsrd;
101 rdlstlen -= btsrd;
102 msgbtsrd += btsrd;
103 btsret += btsrd;
104 
105 if( !st.IsOK() || st.code == suRetry )
106 return st;
107 
108 //----------------------------------------------------------------
109 // We have a complete read list record, now we need to marshal it
110 //----------------------------------------------------------------
// convert both fields from network to host byte order, in place
111 rdlst.rlen = ntohl( rdlst.rlen );
112 rdlst.offset = ntohll( rdlst.offset );
// arm the raw-data cursor for the chunk this record announces
113 choff = 0;
114 chlen = rdlst.rlen;
115 
116 //----------------------------------------------------------------
117 // Find the buffer corresponding to the chunk
118 //----------------------------------------------------------------
// linear scan; a chunk matches only when both offset and length are
// equal, and the first match wins
119 bool chfound = false;
120 for( size_t i = 0; i < chunks->size(); ++i )
121 {
122 if( ( *chunks )[i].offset == uint64_t( rdlst.offset ) &&
123 ( *chunks )[i].length == uint32_t( rdlst.rlen ) )
124 {
125 chfound = true;
126 chidx = i;
127 break;
128 }
129 }
130 
131 //----------------------------------------------------------------
132 // If the chunk was not found this is a bogus response, switch
133 // to discard mode
134 //----------------------------------------------------------------
135 if( !chfound )
136 {
137 log->Error( XRootDMsg, "[%s] VectorReader: Impossible to find chunk "
138 "buffer corresponding to %d bytes at %lld",
139 url.GetHostId().c_str(), rdlst.rlen, rdlst.offset );
// NOTE(review): listing line 140 missing here - presumably the switch to
// the ReadDiscard stage announced by the comment above
141 continue;
142 }
143 
144 readstage = ReadRaw;
145 continue;
146 }
147 
148 //------------------------------------------------------------------
149 // Readout the raw data
150 //------------------------------------------------------------------
151 case ReadRaw:
152 {
153 //----------------------------------------------------------------
154 // The chunk was found, but reading all the data will cross the
155 // message boundary
156 //----------------------------------------------------------------
// chlen is the part of the chunk still outstanding
157 if( msgbtsrd + chlen > dlen )
158 {
159 uint32_t btsleft = dlen - msgbtsrd;
160 log->Error( XRootDMsg, "[%s] VectorReader: Malformed chunk header: "
161 "reading %d bytes from message would cross the message "
162 "boundary, discarding %d bytes.", url.GetHostId().c_str(),
163 rdlst.rlen, btsleft );
// record the size problem against the matched chunk
164 chstatus[chidx].sizeerr = true;
// NOTE(review): listing line 165 missing here - presumably the switch to
// the ReadDiscard stage
166 continue;
167 }
168 
169 //----------------------------------------------------------------
170 // Readout the raw data from the socket
171 //----------------------------------------------------------------
// data goes straight into the matched chunk's buffer, continuing at choff
172 uint32_t btsrd = 0;
173 char *buff = static_cast<char*>( ( *chunks )[chidx].buffer );
174 Status st = ReadBytesAsync( socket, buff + choff, chlen, btsrd );
// as for the record above, bookkeeping happens before the status check;
// rawbtsrd counts raw chunk data only (record bytes are not added to it)
175 choff += btsrd;
176 chlen -= btsrd;
177 msgbtsrd += btsrd;
178 rawbtsrd += btsrd;
179 btsret += btsrd;
180 
181 if( !st.IsOK() || st.code == suRetry )
182 return st;
183 
184 log->Dump( XRootDMsg, "[%s] VectorReader: read buffer for chunk %d@%lld",
185 url.GetHostId().c_str(), rdlst.rlen, rdlst.offset );
186 
187 //----------------------------------------------------------------
188 // Mark chunk as done
189 //----------------------------------------------------------------
190 chstatus[chidx].done = true;
191 
192 //----------------------------------------------------------------
193 // There is still data to be read, we need to readout the next
194 // read list record.
195 //----------------------------------------------------------------
196 if( msgbtsrd < dlen )
197 {
// re-arm the cursor for the next full readahead_list record
198 rdlstoff = 0;
199 rdlstlen = sizeof( readahead_list );
// NOTE(review): listing line 200 missing here - presumably the switch
// back to the ReadRdLst stage
201 continue;
202 }
203 
// NOTE(review): listing line 204 missing here - presumably the switch to
// the ReadDone stage once the whole message has been consumed
205 continue;
206 }
207 
208 //------------------------------------------------------------------
209 // We've had an error and we are in the discarding mode
210 //------------------------------------------------------------------
211 case ReadDiscard:
212 {
213 // Just drop the connection, we don't know if the stream is sane
214 // anymore. Recover with a reconnect.
// NOTE(review): listing line 215 missing here - the only statement of this
// case is not shown; presumably an error return that makes the caller drop
// the connection. As listed, control would fall through into ReadDone.
216 }
217 
218 //------------------------------------------------------------------
219 // Finalize the read
220 //------------------------------------------------------------------
221 case ReadDone:
222 {
// clear the chunk and read-list cursors; msgbtsrd and rawbtsrd are left
// untouched here
223 chidx = 0;
224 choff = 0;
225 chlen = 0;
226 rdlstoff = 0;
227 rdlstlen = 0;
// leaves the switch only; the `break` after the switch leaves the loop
228 break;
229 }
230 
231 //------------------------------------------------------------------
232 // Others should not happen
233 //------------------------------------------------------------------
234 default : return XRootDStatus( stError, errInternal );
235 }
236 
237 // just in case
238 break;
239 }
240 //----------------------------------------------------------------------
241 // We are done
242 //----------------------------------------------------------------------
243 return XRootDStatus();
244 }
245 
246  //------------------------------------------------------------------------
248  //------------------------------------------------------------------------
// Body of GetResponse( AnyObject *&response ).
// NOTE(review): the signature line (listing line 249) is absent from this
// listing; the name, parameter and XRootDStatus return type are taken from
// the member index printed at the end of this page - confirm against the
// real header.
//
// Packs every requested chunk (offset, length, buffer) into a new
// VectorReadInfo, sets its size to rawbtsrd, and stores it in a freshly
// allocated AnyObject that is written to `response`.
//
// Returns Status( stFatal, errInvalidResponse ) at the first chunk whose
// chstatus[i].done flag is not set; `response` is not assigned in that case
// and the partially built VectorReadInfo is released by the unique_ptr.
// Returns a default XRootDStatus on success.
250 {
251 //--------------------------------------------------------------------------
252 // See if all the chunks are OK and put them in the response
253 //--------------------------------------------------------------------------
254 std::unique_ptr<VectorReadInfo> ptr( new VectorReadInfo() );
255 for( uint32_t i = 0; i < chunks->size(); ++i )
256 {
// a single unfinished chunk invalidates the whole response
257 if( !chstatus[i].done )
258 return Status( stFatal, errInvalidResponse );
259 ptr->GetChunks().emplace_back( ( *chunks )[i].offset,
260 ( *chunks )[i].length, ( *chunks )[i].buffer );
261 }
// rawbtsrd is the raw-data byte count accumulated by Read
262 ptr->SetSize( rawbtsrd );
// `response` receives a raw `new`-allocated AnyObject - presumably the
// caller is responsible for deleting it; verify against callers
263 response = new AnyObject();
// the unique_ptr gives up ownership; Set is called without its second
// argument, which the index lists as `bool own=true`
264 response->Set( ptr.release() );
265 return XRootDStatus();
266 }
267 
268  private:
269 
270 size_t rdlstoff; //< byte offset into rdlst at which the next socket read continues
271 readahead_list rdlst; //< read-list record (rlen, offset) describing the chunk currently being read
272 size_t rdlstlen; //< bytes of the current read-list record still to be read from the socket
273  };
274 
275 } /* namespace XrdCl */
276 
277 #endif /* SRC_XRDCL_XRDCLASYNCVECTORREADER_HH_ */
kXR_int32 rlen
Definition: XProtocol.hh:660
kXR_int64 offset
Definition: XProtocol.hh:661
void Set(Type object, bool own=true)
Base class for any message's body reader.
std::vector< ChunkStatus > chstatus
XRootDStatus ReadBytesAsync(Socket &socket, char *buffer, uint32_t toBeRead, uint32_t &bytesRead)
Object for reading out data from the VectorRead response.
AsyncVectorReader(const URL &url, const Message &request)
XRootDStatus Read(Socket &socket, uint32_t &btsret)
XRootDStatus GetResponse(AnyObject *&response)
Get the response.
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
A network socket.
Definition: XrdClSocket.hh:43
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t XRootDMsg
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t errInvalidResponse
Definition: XrdClStatus.hh:99
const uint16_t errCorruptedHeader
Definition: XrdClStatus.hh:103
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124