XRootD
XrdClAsyncMsgWriter.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef SRC_XRDCL_XRDCLASYNCMSGWRITER_HH_
20 #define SRC_XRDCL_XRDCLASYNCMSGWRITER_HH_
21 
22 #include "XrdCl/XrdClMessage.hh"
25 #include "XrdCl/XrdClSocket.hh"
26 #include "XrdCl/XrdClConstants.hh"
27 #include "XrdCl/XrdClStream.hh"
28 #include "XrdSys/XrdSysE2T.hh"
29 
30 #include <memory>
31 
32 namespace XrdCl
33 {
34  //----------------------------------------------------------------------------
36  //----------------------------------------------------------------------------
38  {
39  public:
40  //------------------------------------------------------------------------
48  //------------------------------------------------------------------------
50  Socket &socket,
51  const std::string &strmname,
52  Stream &strm,
53  uint16_t substrmnb,
54  AnyObject &chdata ) : writestage( WriteStart ),
55  xrdTransport( xrdTransport ),
56  socket( socket ),
57  strmname( strmname ),
58  strm( strm ),
59  substrmnb( substrmnb ),
60  chdata( chdata ),
61  outmsg( nullptr ),
62  outmsgsize( 0 ),
63  outhandler( nullptr )
64  {
65  }
66 
67  //------------------------------------------------------------------------
69  //------------------------------------------------------------------------
70  inline void Reset()
71  {
72  writestage = WriteStart;
73  outmsg = nullptr;
74  outmsgsize = 0;;
75  outhandler = nullptr;
76  outsign.reset();
77  }
78 
79  //------------------------------------------------------------------------
81  //------------------------------------------------------------------------
83  {
84  Log *log = DefaultEnv::GetLog();
85  while( true )
86  {
87  switch( writestage )
88  {
89  //------------------------------------------------------------------
90  // Pick up a message if we're not in process of writing something
91  //------------------------------------------------------------------
92  case WriteStart:
93  {
94  std::pair<Message *, MsgHandler *> toBeSent;
95  toBeSent = strm.OnReadyToWrite( substrmnb );
96  outmsg = toBeSent.first;
97  outhandler = toBeSent.second;
98  if( !outmsg ) return XRootDStatus( stOK, suAlreadyDone );
99 
100  outmsg->SetCursor( 0 );
101  outmsgsize = outmsg->GetSize();
102 
103  //----------------------------------------------------------------
104  // Secure the message if necessary
105  //----------------------------------------------------------------
106  Message *signature = nullptr;
107  XRootDStatus st = xrdTransport.GetSignature( outmsg, signature, chdata );
108  if( !st.IsOK() ) return st;
109  outsign.reset( signature );
110 
111  if( outsign )
112  outmsgsize += outsign->GetSize();
113 
114  //----------------------------------------------------------------
115  // The next step is to write the signature
116  //----------------------------------------------------------------
117  writestage = WriteSign;
118  continue;
119  }
120  //------------------------------------------------------------------
121  // First write the signature (if there is one)
122  //------------------------------------------------------------------
123  case WriteSign:
124  {
125  //----------------------------------------------------------------
126  // If there is a signature for the request send it over the socket
127  //----------------------------------------------------------------
128  if( outsign )
129  {
130  XRootDStatus st = socket.Send( *outsign, strmname );
131  if( !st.IsOK() || st.code == suRetry ) return st;
132  }
133  //----------------------------------------------------------------
134  // The next step is to write the signature
135  //----------------------------------------------------------------
136  writestage = WriteRequest;
137  continue;
138  }
139  //------------------------------------------------------------------
140  // Then write the request itself
141  //------------------------------------------------------------------
142  case WriteRequest:
143  {
144  XRootDStatus st = socket.Send( *outmsg, strmname );
145  if( !st.IsOK() || st.code == suRetry ) return st;
146  //----------------------------------------------------------------
147  // The next step is to write the signature
148  //----------------------------------------------------------------
149  writestage = WriteRawData;
150  continue;
151  }
152  //------------------------------------------------------------------
153  // And then write the raw data (if any)
154  //------------------------------------------------------------------
155  case WriteRawData:
156  {
157  if( outhandler->IsRaw() )
158  {
159  uint32_t wrtcnt = 0;
160  XRootDStatus st = outhandler->WriteMessageBody( &socket, wrtcnt );
161  if( !st.IsOK() || st.code == suRetry ) return st;
162  outmsgsize += wrtcnt;
163  log->Dump( AsyncSockMsg, "[%s] Wrote %d bytes of raw data of message"
164  "(%p) body.", strmname.c_str(), wrtcnt, outmsg );
165  }
166  //----------------------------------------------------------------
167  // The next step is to finalize the write operation
168  //----------------------------------------------------------------
169  writestage = WriteDone;
170  continue;
171  }
172  //------------------------------------------------------------------
173  // Finally, finalize the write operation
174  //------------------------------------------------------------------
175  case WriteDone:
176  {
177  XRootDStatus st = socket.Flash();
178  if( !st.IsOK() )
179  {
180  log->Error( AsyncSockMsg, "[%s] Unable to flash the socket: %s",
181  strmname.c_str(), XrdSysE2T( st.errNo ) );
182  return st;
183  }
184 
185  log->Dump( AsyncSockMsg, "[%s] Successfully sent message: %s (%p).",
186  strmname.c_str(), outmsg->GetObfuscatedDescription().c_str(), outmsg );
187 
188  strm.OnMessageSent( substrmnb, outmsg, outmsgsize );
189  return XRootDStatus();
190  }
191  }
192  // just in case ...
193  break;
194  }
195  //----------------------------------------------------------------------
196  // We are done
197  //----------------------------------------------------------------------
198  return XRootDStatus();
199  }
200 
201  private:
202 
203  //------------------------------------------------------------------------
205  //------------------------------------------------------------------------
206  enum Stage
207  {
208  WriteStart, //< the next step is to initialize the read
209  WriteSign, //< the next step is to write the signature
210  WriteRequest, //< the next step is to write the request
211  WriteRawData, //< the next step is to write the raw data
212  WriteDone //< the next step is to finalize the write
213  };
214 
215  //------------------------------------------------------------------------
216  // Current read stage
217  //------------------------------------------------------------------------
218  Stage writestage;
219 
220  //------------------------------------------------------------------------
221  // The context of the read operation
222  //------------------------------------------------------------------------
223  TransportHandler &xrdTransport;
224  Socket &socket;
225  const std::string &strmname;
226  Stream &strm;
227  uint16_t substrmnb;
228  AnyObject &chdata;
229 
230  //------------------------------------------------------------------------
231  // The internal state of the the reader
232  //------------------------------------------------------------------------
233  Message *outmsg; //< we don't own the message
234  uint32_t outmsgsize;
235  MsgHandler *outhandler;
236  std::unique_ptr<Message> outsign;
237  };
238 
239 }
240 
241 #endif /* SRC_XRDCL_XRDCLASYNCMSGWRITER_HH_ */
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
Utility class encapsulating writing request logic.
XRootDStatus Write()
Write the request into the socket.
AsyncMsgWriter(TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb, AnyObject &chdata)
void Reset()
Reset the state of the object (makes it ready to write out the next message)
void SetCursor(uint32_t cursor)
Set the cursor.
Definition: XrdClBuffer.hh:148
uint32_t GetSize() const
Get the size of the message.
Definition: XrdClBuffer.hh:132
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
virtual XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
virtual bool IsRaw() const
A network socket.
Definition: XrdClSocket.hh:43
virtual XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)
Definition: XrdClSocket.cc:461
XRootDStatus Flash()
Definition: XrdClSocket.cc:818
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
Definition: XrdClStream.cc:584
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
Definition: XrdClStream.cc:545
Perform the handshake and the authentication for each physical stream.
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)=0
Get signature for given message.
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint64_t AsyncSockMsg
const uint16_t suAlreadyDone
Definition: XrdClStatus.hh:42
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
uint32_t errNo
Errno, if any.
Definition: XrdClStatus.hh:148