XRootD
XrdSendQ.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d S e n d Q . c c */
4 /* */
5 /* (c) 2016 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdio>
32 #include <cstring>
33 #include <unistd.h>
34 #include <sys/uio.h>
35 
36 #include "Xrd/XrdLink.hh"
37 #include "Xrd/XrdScheduler.hh"
38 #include "Xrd/XrdSendQ.hh"
39 
40 #include "XrdSys/XrdSysError.hh"
41 #include "XrdSys/XrdSysPthread.hh"
42 
43 /******************************************************************************/
44 /* L o c a l C l a s s e s */
45 /******************************************************************************/
46 
47 class LinkShutdown : public XrdJob
48 {
49 public:
50 
51 virtual void DoIt() {myLink->Shutdown(true);
52  myLink->setRef(-1);
53  delete this;
54  }
55 
57  : XrdJob("SendQ Shutdown"), myLink(link) {}
58 
59 virtual ~LinkShutdown() {}
60 
61 private:
62 
63 XrdLink *myLink;
64 };
65 
66 
67 /******************************************************************************/
68 /* G l o b a l O b j e c t s */
69 /******************************************************************************/
70 
71 namespace XrdGlobal
72 {
73 extern XrdSysError Log;
74 extern XrdScheduler Sched;
75 };
76 
77 using namespace XrdGlobal;
78 
79 /******************************************************************************/
80 /* S t a t i c O b j e c t s */
81 /******************************************************************************/
82 
83 unsigned int XrdSendQ::qWarn = 3;
84 unsigned int XrdSendQ::qMax = 0xffffffff;
85 bool XrdSendQ::qPerm = false;
86 
87 /******************************************************************************/
88 /* C o n s t r u c t o r */
89 /******************************************************************************/
90 
92  : XrdJob("sendQ runner"),
93  mLink(lP), wMutex(mP),
94  fMsg(0), lMsg(0), delQ(0), theFD(lP.FDnum()),
95  inQ(0), qWmsg(qWarn), discards(0),
96  active(false), terminate(false) {}
97 
98 /******************************************************************************/
99 /* D o I t */
100 /******************************************************************************/
101 
103 {
104  mBuff *theMsg;
105  int myFD, rc;
106  bool theEnd;
107 
108 // Obtain the lock
109 //
110  wMutex.Lock();
111 
112 // Before we start check if we should delete any messages
113 //
114  if (delQ) {RelMsgs(delQ); delQ = 0;}
115 
116 // Send all queued messages (we can use a blocking send here)
117 //
118  while(!terminate && (theMsg = fMsg))
119  {if (!(fMsg = fMsg->next)) lMsg = 0;
120  inQ--; myFD = theFD;
121  wMutex.UnLock();
122  rc = send(myFD, theMsg->mData, theMsg->mLen, 0);
123  free(theMsg);
124  wMutex.Lock();
125  if (rc < 0) {Scuttle(); break;}
126  }
127 
128 // Before we exit check if we should delete any messages
129 //
130  if (delQ) {RelMsgs(delQ); delQ = 0;}
131  if ((theEnd = terminate) && fMsg) RelMsgs(fMsg);
132  active = false;
133  qWmsg = qWarn;
134 
135 // Release any messages that need to be released. Note that we may have been
136 // deleted at this point so we cannot reference anything via "this" once we
137 // unlock the mutex. We may also need to delete ourselves.
138 //
139  wMutex.UnLock();
140  if (theEnd) delete this;
141 }
142 
143 /******************************************************************************/
144 /* Private: Q M s g */
145 /******************************************************************************/
146 
147 bool XrdSendQ::QMsg(XrdSendQ::mBuff *theMsg)
148 {
149 // Check if we reached the max number of messages
150 //
151  if (inQ >= qMax)
152  {discards++;
153  if ((discards & 0xff) == 0x01)
154  {char qBuff[80];
155  snprintf(qBuff, sizeof(qBuff),
156  "%u) reached; %hu message(s) discarded!", qMax, discards);
157  Log.Emsg("SendQ", mLink.Host(),
158  "appears to be slow; queue limit (", qBuff);
159  }
160  return false;
161  }
162 
163 // Add the message at the end of the queue
164 //
165  theMsg->next = 0;
166  if (lMsg) lMsg->next = theMsg;
167  else fMsg = theMsg;
168  lMsg = theMsg;
169  inQ++;
170 
171 // If there is no active thread handling this queue, schedule one
172 //
173  if (!active)
174  {Sched.Schedule((XrdJob *)this);
175  active = true;
176  }
177 
178 // Check if we should issue a warning.
179 //
180  if (inQ >= qWmsg)
181  {char qBuff[32];
182  qWmsg += qWarn;
183  snprintf(qBuff, sizeof(qBuff), "%ud messages queued!", inQ);
184  Log.Emsg("SendQ", mLink.Host(), "appears to be slow;", qBuff);
185  } else {
186  if (inQ < qWarn && qWmsg != qWarn) qWmsg = qWarn;
187  }
188 
189 // All done
190 //
191  return true;
192 }
193 
194 /******************************************************************************/
195 /* Private: R e l M s g s */
196 /******************************************************************************/
197 
198 void XrdSendQ::RelMsgs(XrdSendQ::mBuff *mP)
199 {
200  mBuff *freeMP;
201 
202  while((freeMP = mP))
203  {mP = mP->next;
204  free(freeMP);
205  }
206 }
207 
208 /******************************************************************************/
209 /* Private: S c u t t l e */
210 /******************************************************************************/
211 
212 void XrdSendQ::Scuttle() // qMutex must be locked!
213 {
214 // Simply move any outsanding messages to the deletion queue
215 //
216  if (fMsg)
217  {lMsg->next = delQ;
218  delQ = fMsg;
219  fMsg = lMsg = 0;
220  inQ = 0;
221  }
222 }
223 
224 /******************************************************************************/
225 /* S e n d */
226 /******************************************************************************/
227 
228 // Called with wMutex locked.
229 
230 int XrdSendQ::Send(const char *buff, int blen)
231 {
232  mBuff *theMsg;
233  int bleft, bsent;
234 
235 // If there is an active thread handling messages then we have to queue it.
236 // Otherwise try to send it. We need to hold the lock here to prevent messing
237 // up the message is only part of it could be sent. This is a non-blocking call.
238 //
239  if (active) bleft = blen;
240  else if ((bleft = SendNB(buff, blen)) <= 0) return (bleft ? -1 : blen);
241 
242 // Allocate buffer for the message
243 //
244  if (!(theMsg = (mBuff *)malloc(sizeof(mBuff) + bleft)))
245  {errno = ENOMEM; return -1;}
246 
247 // Copy the unsent message fragment
248 //
249  bsent = blen - bleft;
250  memcpy(theMsg->mData, buff+bsent, bleft);
251  theMsg->mLen = bleft;
252 
253 // Queue the message.
254 //
255  return (QMsg(theMsg) ? blen : -1);
256 }
257 
258 /******************************************************************************/
259 
260 // Called with wMutex locked.
261 
262 int XrdSendQ::Send(const struct iovec *iov, int iovcnt, int iotot)
263 {
264  mBuff *theMsg;
265  char *body;
266  int bleft, bmore, iovX;
267 
268 // If there is an active thread handling messages then we have to queue it.
269 // Otherwise try to send it. We need to hold the lock here to prevent messing
270 // up the message is only part of it could be sent. This is a non-blocking call.
271 //
272  if (active)
273  {bleft = 0;
274  for (iovX = 0; iovX < iovcnt; iovX++)
275  if ((bleft = iov[iovX].iov_len)) break;
276  if (!bleft) return iotot;
277  } else {
278  if ((bleft = SendNB(iov, iovcnt, iotot, iovX)) <= 0)
279  return (bleft ? -1 : 0);
280  }
281 
282 // Readjust the total amount not sent based on where we stopped in the iovec.
283 //
284  bmore = bleft;
285  for (int i = iovX+1; i < iovcnt; i++) bmore += iov[i].iov_len;
286 
287 // Copy the unsent message (for simplicity we will copy the whole iovec stop).
288 //
289  if (!(theMsg = (mBuff *)malloc(bmore+sizeof(mBuff))))
290  {errno = ENOMEM; return -1;}
291 
292 // Setup the message length
293 //
294  theMsg->mLen = bmore;
295 
296 // Copy the first fragment (it cannot be zero length)
297 //
298  body = theMsg->mData;
299  memcpy(body, ((char *)iov[iovX].iov_base)+(iov[iovX].iov_len-bleft), bleft);
300  body += bleft;
301 
302 // All remaining items
303 //
304  for (int i = iovX+1; i < iovcnt; i++)
305  {if (iov[i].iov_len)
306  {memcpy(body, iov[i].iov_base, iov[i].iov_len);
307  body += iov[i].iov_len;
308  }
309  }
310 
311 // Queue the message.
312 //
313  return (QMsg(theMsg) ? iotot : 0);
314 }
315 
316 /******************************************************************************/
317 /* S e n d N B */
318 /******************************************************************************/
319 
320 // Called with wMutex locked.
321 
322 int XrdSendQ::SendNB(const char *Buff, int Blen)
323 {
324 #if !defined(__linux__)
325  return -1;
326 #else
327  ssize_t retc = 0, bytesleft = Blen;
328 
329 // Write the data out
330 //
331  while(bytesleft)
332  {do {retc = send(theFD, Buff, bytesleft, MSG_DONTWAIT);}
333  while(retc < 0 && errno == EINTR);
334  if (retc <= 0) break;
335  bytesleft -= retc; Buff += retc;
336  }
337 
338 // All done
339 //
340  if (retc <= 0)
341  {if (!retc || errno == EAGAIN || retc == EWOULDBLOCK) return bytesleft;
342  Log.Emsg("SendQ", errno, "send to", mLink.ID);
343  return -1;
344  }
345  return bytesleft;
346 #endif
347 }
348 
349 /******************************************************************************/
350 
351 // Called with wMutex locked.
352 
353 int XrdSendQ::SendNB(const struct iovec *iov, int iocnt, int bytes, int &iovX)
354 {
355 
356 #if !defined(__linux__)
357  return -1;
358 #else
359  char *msgP;
360  ssize_t retc;
361  int msgL, msgF = MSG_DONTWAIT|MSG_MORE, ioLast = iocnt-1;
362 
363 // Write the data out. The following code only works in Linux as we use the
364 // new POSIX flags deined for send() which currently is only implemented in
365 // Linux. This allows us to selectively use non-blocking I/O.
366 //
367  for (iovX = 0; iovX < iocnt; iovX++)
368  {msgP = (char *)iov[iovX].iov_base;
369  msgL = iov[iovX].iov_len;
370  if (iovX == ioLast) msgF &= ~MSG_MORE;
371  while(msgL)
372  {do {retc = send(theFD, msgP, msgL, msgF);}
373  while(retc < 0 && errno == EINTR);
374  if (retc <= 0)
375  {if (!retc || errno == EAGAIN || retc == EWOULDBLOCK)
376  return msgL;
377  Log.Emsg("SendQ", errno, "send to", mLink.ID);
378  return -1;
379  }
380  msgL -= retc;
381  }
382  }
383 
384 // All done
385 //
386  return 0;
387 #endif
388 }
389 
390 /******************************************************************************/
391 /* T e r m i n a t e */
392 /******************************************************************************/
393 
394 // This must be called with wMutex locked!
395 
397 {
398 // First step is to see if we need to schedule a shutdown prior to quiting
399 //
400  if (lP) Sched.Schedule((XrdJob *)new LinkShutdown(lP));
401 
402 // If there is an active thread then we need to let the thread handle the
403 // termination of this object. Otherwise, we can do it now.
404 //
405  if (active)
406  {Scuttle();
407  terminate = true;
408  theFD =-1;
409  } else {
410  if (fMsg) {RelMsgs(fMsg); fMsg = lMsg = 0;}
411  if (delQ) {RelMsgs(delQ); delQ = 0;}
412  delete this;
413  }
414 }
virtual ~LinkShutdown()
Definition: XrdSendQ.cc:59
virtual void DoIt()
Definition: XrdSendQ.cc:51
LinkShutdown(XrdLink *link)
Definition: XrdSendQ.cc:56
Definition: XrdJob.hh:43
void Schedule(XrdJob *jp)
XrdSendQ(XrdLink &lP, XrdSysMutex &mP)
Definition: XrdSendQ.cc:91
void Terminate(XrdLink *lP=0)
Definition: XrdSendQ.cc:396
int Send(const char *buff, int blen)
Definition: XrdSendQ.cc:230
virtual void DoIt()
Definition: XrdSendQ.cc:102
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSysError Log
Definition: XrdConfig.cc:112
XrdScheduler Sched
Definition: XrdLinkCtl.cc:54