XRootD
XrdClInQueue.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #include "XProtocol/XProtocol.hh"
20 #include "XrdCl/XrdClInQueue.hh"
22 #include "XrdCl/XrdClMessage.hh"
23 #include "XrdCl/XrdClLog.hh"
24 #include "XrdCl/XrdClDefaultEnv.hh"
25 #include "XrdCl/XrdClConstants.hh"
26 
27 #include <arpa/inet.h> // for network unmarshalling stuff
28 
29 namespace XrdCl
30 {
31  //----------------------------------------------------------------------------
32  // Filter messages
33  //----------------------------------------------------------------------------
34  bool InQueue::DiscardMessage( Message& msg, uint16_t& sid) const
35  {
36  if( msg.GetSize() < 8 )
37  return true;
38 
39  ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
40 
41  // We only care about async responses, but those are extracted now
42  // in the SocketHandler
43  if( rsp->hdr.status == kXR_attn )
44  return true;
45  else
46  sid = ((uint16_t)rsp->hdr.streamid[1] << 8) | (uint16_t)rsp->hdr.streamid[0];
47 
48  return false;
49  }
50 
51  //----------------------------------------------------------------------------
52  // Add a listener that should be notified about incoming messages
53  //----------------------------------------------------------------------------
54  void InQueue::AddMessageHandler( MsgHandler *handler, time_t expires, bool &rmMsg )
55  {
56  uint16_t handlerSid = handler->GetSid();
57  XrdSysMutexHelper scopedLock( pMutex );
58 
59  pHandlers[handlerSid] = HandlerAndExpire( handler, expires );
60  }
61 
62  //----------------------------------------------------------------------------
63  // Get a message handler interested in receiving message whose header
64  // is stored in msg
65  //----------------------------------------------------------------------------
66  MsgHandler *InQueue::GetHandlerForMessage( std::shared_ptr<Message> &msg,
67  time_t &expires,
68  uint16_t &action )
69  {
70  time_t exp = 0;
71  uint16_t act = 0;
72  uint16_t msgSid = 0;
73  MsgHandler* handler = 0;
74 
75  if (DiscardMessage(*msg, msgSid))
76  {
77  return handler;
78  }
79 
80  XrdSysMutexHelper scopedLock( pMutex );
81  HandlerMap::iterator it = pHandlers.find(msgSid);
82 
83  if (it != pHandlers.end())
84  {
85  Log *log = DefaultEnv::GetLog();
86  handler = it->second.first;
87  act = handler->Examine( msg );
88  exp = it->second.second;
89  log->Debug( ExDbgMsg, "[msg: %p] Assigned MsgHandler: %p.",
90  msg.get(), handler );
91 
92 
93  if( act & MsgHandler::RemoveHandler )
94  {
95  pHandlers.erase( it );
96  log->Debug( ExDbgMsg, "[handler: %p] Removed MsgHandler: %p from the in-queue.",
97  handler, handler );
98  }
99  }
100 
101  if( handler )
102  {
103  expires = exp;
104  action = act;
105  }
106 
107  return handler;
108  }
109 
110  //----------------------------------------------------------------------------
111  // Re-insert the handler without scanning the cached messages
112  //----------------------------------------------------------------------------
114  time_t expires )
115  {
116  uint16_t handlerSid = handler->GetSid();
117  XrdSysMutexHelper scopedLock( pMutex );
118  pHandlers[handlerSid] = HandlerAndExpire( handler, expires );
119  }
120 
121  //----------------------------------------------------------------------------
122  // Remove a listener
123  //----------------------------------------------------------------------------
125  {
126  uint16_t handlerSid = handler->GetSid();
127  XrdSysMutexHelper scopedLock( pMutex );
128  pHandlers.erase(handlerSid);
129  Log *log = DefaultEnv::GetLog();
130  log->Debug( ExDbgMsg, "[handler: %p] Removed MsgHandler: %p from the in-queue.",
131  handler, handler );
132 
133  }
134 
135  //----------------------------------------------------------------------------
136  // Report an event to the handlers
137  //----------------------------------------------------------------------------
139  XRootDStatus status )
140  {
141  uint8_t action = 0;
142  XrdSysMutexHelper scopedLock( pMutex );
143  for( HandlerMap::iterator it = pHandlers.begin(); it != pHandlers.end(); )
144  {
145  action = it->second.first->OnStreamEvent( event, status );
146 
147  if( action & MsgHandler::RemoveHandler )
148  {
149  auto next = it; ++next;
150  pHandlers.erase( it );
151  it = next;
152  }
153  else
154  ++it;
155  }
156  }
157 
158  //----------------------------------------------------------------------------
159  // Timeout handlers
160  //----------------------------------------------------------------------------
161  void InQueue::ReportTimeout( time_t now )
162  {
163  if( !now )
164  now = ::time(0);
165 
166  XrdSysMutexHelper scopedLock( pMutex );
167  HandlerMap::iterator it = pHandlers.begin();
168  while( it != pHandlers.end() )
169  {
170  if( it->second.second <= now )
171  {
172  uint8_t act = it->second.first->OnStreamEvent( MsgHandler::Timeout,
174  auto next = it; ++next;
175  if( act & MsgHandler::RemoveHandler )
176  pHandlers.erase( it );
177  it = next;
178  }
179  else
180  ++it;
181  }
182  }
183 }
kXR_char streamid[2]
Definition: XProtocol.hh:914
@ kXR_attn
Definition: XProtocol.hh:901
ServerResponseHeader hdr
Definition: XProtocol.hh:1287
static Log * GetLog()
Get default log.
void ReportTimeout(time_t now=0)
Timeout handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
Definition: XrdClInQueue.cc:66
void AddMessageHandler(MsgHandler *handler, time_t expires, bool &rmMsg)
Definition: XrdClInQueue.cc:54
Handle diagnostics.
Definition: XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
virtual uint16_t Examine(std::shared_ptr< Message > &msg)=0
virtual uint16_t GetSid() const =0
StreamEvent
Events that may have occurred to the stream.
@ Timeout
The declared timeout has occurred.
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t ExDbgMsg
Procedure execution status.
Definition: XrdClStatus.hh:115