XRootD
XrdSendQ Class Reference

#include <XrdSendQ.hh>

+ Inheritance diagram for XrdSendQ:
+ Collaboration diagram for XrdSendQ:

Public Member Functions

 XrdSendQ (XrdLink &lP, XrdSysMutex &mP)
 
unsigned int Backlog ()
 
virtual void DoIt ()
 
int Send (const char *buff, int blen)
 
int Send (const struct iovec *iov, int iovcnt, int iotot)
 
void Terminate (XrdLink *lP=0)
 
- Public Member Functions inherited from XrdJob
 XrdJob (const char *desc="")
 
virtual ~XrdJob ()
 

Static Public Member Functions

static void SetAQ (bool onoff)
 
static void SetQM (unsigned int qmVal)
 
static void SetQW (unsigned int qwVal)
 

Additional Inherited Members

- Public Attributes inherited from XrdJob
const char * Comment
 
XrdJob * NextJob
 

Detailed Description

Definition at line 42 of file XrdSendQ.hh.

Constructor & Destructor Documentation

◆ XrdSendQ()

XrdSendQ::XrdSendQ ( XrdLink & lP,
XrdSysMutex & mP 
)

Definition at line 91 of file XrdSendQ.cc.

92  : XrdJob("sendQ runner"),
93  mLink(lP), wMutex(mP),
94  fMsg(0), lMsg(0), delQ(0), theFD(lP.FDnum()),
95  inQ(0), qWmsg(qWarn), discards(0),
96  active(false), terminate(false) {}
XrdJob(const char *desc="")
Definition: XrdJob.hh:51

Member Function Documentation

◆ Backlog()

unsigned int XrdSendQ::Backlog ( )
inline

Definition at line 46 of file XrdSendQ.hh.

46 {return inQ;}

Referenced by XrdLinkXeq::Backlog().

+ Here is the caller graph for this function:

◆ DoIt()

void XrdSendQ::DoIt ( )
virtual

Implements XrdJob.

Definition at line 102 of file XrdSendQ.cc.

103 {
104  mBuff *theMsg;
105  int myFD, rc;
106  bool theEnd;
107 
108 // Obtain the lock
109 //
110  wMutex.Lock();
111 
112 // Before we start check if we should delete any messages
113 //
114  if (delQ) {RelMsgs(delQ); delQ = 0;}
115 
116 // Send all queued messages (we can use a blocking send here)
117 //
118  while(!terminate && (theMsg = fMsg))
119  {if (!(fMsg = fMsg->next)) lMsg = 0;
120  inQ--; myFD = theFD;
121  wMutex.UnLock();
122  rc = send(myFD, theMsg->mData, theMsg->mLen, 0);
123  free(theMsg);
124  wMutex.Lock();
125  if (rc < 0) {Scuttle(); break;}
126  }
127 
128 // Before we exit check if we should delete any messages
129 //
130  if (delQ) {RelMsgs(delQ); delQ = 0;}
131  if ((theEnd = terminate) && fMsg) RelMsgs(fMsg);
132  active = false;
133  qWmsg = qWarn;
134 
135 // Release any messages that need to be released. Note that we may have been
136 // deleted at this point so we cannot reference anything via "this" once we
137 // unlock the mutex. We may also need to delete ourselves.
138 //
139  wMutex.UnLock();
140  if (theEnd) delete this;
141 }

References XrdSysMutex::Lock(), and XrdSysMutex::UnLock().

+ Here is the call graph for this function:

◆ Send() [1/2]

int XrdSendQ::Send ( const char *  buff,
int  blen 
)

Definition at line 230 of file XrdSendQ.cc.

231 {
232  mBuff *theMsg;
233  int bleft, bsent;
234 
235 // If there is an active thread handling messages then we have to queue it.
236 // Otherwise try to send it. We need to hold the lock here to prevent messing
237 // up the message if only part of it could be sent. This is a non-blocking call.
238 //
239  if (active) bleft = blen;
240  else if ((bleft = SendNB(buff, blen)) <= 0) return (bleft ? -1 : blen);
241 
242 // Allocate buffer for the message
243 //
244  if (!(theMsg = (mBuff *)malloc(sizeof(mBuff) + bleft)))
245  {errno = ENOMEM; return -1;}
246 
247 // Copy the unsent message fragment
248 //
249  bsent = blen - bleft;
250  memcpy(theMsg->mData, buff+bsent, bleft);
251  theMsg->mLen = bleft;
252 
253 // Queue the message.
254 //
255  return (QMsg(theMsg) ? blen : -1);
256 }

Referenced by XrdLinkXeq::Send(), and XrdLinkXeq::TLS_Send().

+ Here is the caller graph for this function:

◆ Send() [2/2]

int XrdSendQ::Send ( const struct iovec *  iov,
int  iovcnt,
int  iotot 
)

Definition at line 262 of file XrdSendQ.cc.

263 {
264  mBuff *theMsg;
265  char *body;
266  int bleft, bmore, iovX;
267 
268 // If there is an active thread handling messages then we have to queue it.
269 // Otherwise try to send it. We need to hold the lock here to prevent messing
270 // up the message if only part of it could be sent. This is a non-blocking call.
271 //
272  if (active)
273  {bleft = 0;
274  for (iovX = 0; iovX < iovcnt; iovX++)
275  if ((bleft = iov[iovX].iov_len)) break;
276  if (!bleft) return iotot;
277  } else {
278  if ((bleft = SendNB(iov, iovcnt, iotot, iovX)) <= 0)
279  return (bleft ? -1 : 0);
280  }
281 
282 // Readjust the total amount not sent based on where we stopped in the iovec.
283 //
284  bmore = bleft;
285  for (int i = iovX+1; i < iovcnt; i++) bmore += iov[i].iov_len;
286 
287 // Copy the unsent message (for simplicity we will copy the whole iovec stop).
288 //
289  if (!(theMsg = (mBuff *)malloc(bmore+sizeof(mBuff))))
290  {errno = ENOMEM; return -1;}
291 
292 // Setup the message length
293 //
294  theMsg->mLen = bmore;
295 
296 // Copy the first fragment (it cannot be zero length)
297 //
298  body = theMsg->mData;
299  memcpy(body, ((char *)iov[iovX].iov_base)+(iov[iovX].iov_len-bleft), bleft);
300  body += bleft;
301 
302 // All remaining items
303 //
304  for (int i = iovX+1; i < iovcnt; i++)
305  {if (iov[i].iov_len)
306  {memcpy(body, iov[i].iov_base, iov[i].iov_len);
307  body += iov[i].iov_len;
308  }
309  }
310 
311 // Queue the message.
312 //
313  return (QMsg(theMsg) ? iotot : 0);
314 }

◆ SetAQ()

static void XrdSendQ::SetAQ ( bool  onoff)
inline static

Definition at line 54 of file XrdSendQ.hh.

54 {qPerm = onoff;}

◆ SetQM()

static void XrdSendQ::SetQM ( unsigned int  qmVal)
inlinestatic

Definition at line 56 of file XrdSendQ.hh.

56 {qMax = qmVal;}

Referenced by XrdCmsConfig::Configure1().

+ Here is the caller graph for this function:

◆ SetQW()

static void XrdSendQ::SetQW ( unsigned int  qwVal)
inline static

Definition at line 58 of file XrdSendQ.hh.

58 {qWarn = qwVal;}

◆ Terminate()

void XrdSendQ::Terminate ( XrdLink * lP = 0)

Definition at line 396 of file XrdSendQ.cc.

397 {
398 // First step is to see if we need to schedule a shutdown prior to quitting
399 //
400  if (lP) Sched.Schedule((XrdJob *)new LinkShutdown(lP));
401 
402 // If there is an active thread then we need to let the thread handle the
403 // termination of this object. Otherwise, we can do it now.
404 //
405  if (active)
406  {Scuttle();
407  terminate = true;
408  theFD =-1;
409  } else {
410  if (fMsg) {RelMsgs(fMsg); fMsg = lMsg = 0;}
411  if (delQ) {RelMsgs(delQ); delQ = 0;}
412  delete this;
413  }
414 }
Definition: XrdJob.hh:43
void Schedule(XrdJob *jp)
XrdScheduler Sched
Definition: XrdLinkCtl.cc:54

References XrdGlobal::Sched, and XrdScheduler::Schedule().

Referenced by XrdLinkXeq::Close().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: