XRootD
XrdClAsyncRawReader.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef SRC_XRDCL_XRDCLASYNCRAWREADER_HH_
20 #define SRC_XRDCL_XRDCLASYNCRAWREADER_HH_
21 
22 
24 #include "XrdCl/XrdClSocket.hh"
25 #include "XrdCl/XrdClStream.hh"
27 
28 namespace XrdCl
29 {
30 
31  //----------------------------------------------------------------------------
33  //----------------------------------------------------------------------------
35  {
36  public:
37  //------------------------------------------------------------------------
42  //------------------------------------------------------------------------
43  AsyncRawReader( const URL &url, const Message &request ) :
45  {
46  }
47 
48  //------------------------------------------------------------------------
54  //------------------------------------------------------------------------
55  XRootDStatus Read( Socket &socket, uint32_t &btsret )
56  {
57  while( true )
58  {
59  switch( readstage )
60  {
61  //------------------------------------------------------------------
62  // Prepare to readout a new response
63  //------------------------------------------------------------------
64  case ReadStart:
65  {
66  msgbtsrd = 0;
67  chlen = ( *chunks )[0].length;
69  continue;
70  }
71 
72  //------------------------------------------------------------------
73  // Readout the raw data
74  //------------------------------------------------------------------
75  case ReadRaw:
76  {
77  //----------------------------------------------------------------
78  // Make sure we are not reading past the end of the read response
79  //----------------------------------------------------------------
80  if( msgbtsrd + chlen > dlen )
81  chlen = dlen - msgbtsrd;
82 
83  //----------------------------------------------------------------
84  // Readout the raw data from the socket
85  //----------------------------------------------------------------
86  uint32_t btsrd = 0;
87  char *buff = static_cast<char*>( ( *chunks )[chidx].buffer );
88  Status st = ReadBytesAsync( socket, buff + choff, chlen, btsrd );
89  choff += btsrd;
90  chlen -= btsrd;
91  msgbtsrd += btsrd;
92  rawbtsrd += btsrd;
93  btsret += btsrd;
94 
95  if( !st.IsOK() || st.code == suRetry )
96  return st;
97 
98  //----------------------------------------------------------------
99  // If the chunk is full, move to the next buffer
100  //----------------------------------------------------------------
101  if( choff == ( *chunks )[chidx].length )
102  {
103  ++chidx;
104  choff = 0;
105  chlen = ( chidx < chunks->size() ? ( *chunks )[chidx].length : 0 );
106  }
107  //----------------------------------------------------------------
108  // Check if there are some data left in the response to be readout
109  // from the socket.
110  //----------------------------------------------------------------
111  if( msgbtsrd < dlen )
112  {
113  //--------------------------------------------------------------
114  // We run out of space, the server has send too much data
115  //--------------------------------------------------------------
116  if( chidx >= chunks->size() )
117  {
119  continue;
120  }
121  readstage = ReadRaw;
122  continue;
123  }
124  //----------------------------------------------------------------
125  // We are done
126  //----------------------------------------------------------------
128  continue;
129  }
130 
131  //------------------------------------------------------------------
132  // We've had an error and we are in the discarding mode
133  //------------------------------------------------------------------
134  case ReadDiscard:
135  {
136  DefaultEnv::GetLog()->Error( XRootDMsg, "[%s] RawReader: Handling "
137  "response to %s: user supplied buffer is "
138  "too small for the received data.",
139  url.GetHostId().c_str(),
140  request.GetObfuscatedDescription().c_str() );
141  // Just drop the connection, we don't know if the stream is sane
142  // anymore. Recover with a reconnect.
144  }
145 
146  //------------------------------------------------------------------
147  // Finalize the read
148  //------------------------------------------------------------------
149  case ReadDone:
150  {
151  break;
152  }
153 
154  //------------------------------------------------------------------
155  // Others should not happen
156  //------------------------------------------------------------------
157  default : return XRootDStatus( stError, errInternal );
158  }
159 
160  // just in case
161  break;
162  }
163  //----------------------------------------------------------------------
164  // We are done
165  //----------------------------------------------------------------------
166  return XRootDStatus();
167  }
168 
169  //------------------------------------------------------------------------
171  //------------------------------------------------------------------------
173  {
174  if( dataerr )
176  std::unique_ptr<AnyObject> rsp( new AnyObject() );
178  rsp->Set( GetVectorReadInfo() );
179  else
180  rsp->Set( GetChunkInfo() );
181  response = rsp.release();
182  return XRootDStatus();
183  }
184 
185  private:
186 
187  inline ChunkInfo* GetChunkInfo()
188  {
189  ChunkInfo *info = new ChunkInfo( chunks->front() );
190  info->length = rawbtsrd;
191  return info;
192  }
193 
194  inline VectorReadInfo* GetVectorReadInfo()
195  {
196  VectorReadInfo *info = new VectorReadInfo();
197  info->SetSize( rawbtsrd );
198  int btsleft = rawbtsrd;
199  for( auto &chunk : *chunks )
200  {
201  int length = uint32_t( btsleft ) >= chunk.length ? chunk.length : btsleft;
202  info->GetChunks().emplace_back( chunk.offset, length, chunk.buffer );
203  btsleft -= length;
204  }
205  return info;
206  }
207  };
208 
209 } /* namespace XrdCl */
210 
211 #endif /* SRC_XRDCL_XRDCLASYNCVECTORREADER_HH_ */
@ kXR_virtReadv
Definition: XProtocol.hh:150
Base class for any message's body reader.
XRootDStatus ReadBytesAsync(Socket &socket, char *buffer, uint32_t toBeRead, uint32_t &bytesRead)
Object for reading out data from the kXR_read response.
XRootDStatus Read(Socket &socket, uint32_t &btsret)
XRootDStatus GetResponse(AnyObject *&response)
Get the response.
AsyncRawReader(const URL &url, const Message &request)
static Log * GetLog()
Get default log.
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint16_t GetVirtReqID() const
Get virtual request ID for the message.
A network socket.
Definition: XrdClSocket.hh:43
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t XRootDMsg
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t errInvalidResponse
Definition: XrdClStatus.hh:99
const uint16_t errCorruptedHeader
Definition: XrdClStatus.hh:103
Describe a data chunk for vector read.
uint32_t length
offset in the file
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124