57 :
XrdJob(
"SendQ Shutdown"), myLink(link) {}
// Class-wide settings shared by every XrdSendQ instance.
//
// Initial value of each queue's qWmsg member (see the constructor's
// initializer list); QMsg() also resets qWmsg to it once inQ is below it.
83 unsigned int XrdSendQ::qWarn = 3;
// Printed as the "queue limit" in the discard warning issued by QMsg().
// NOTE(review): presumably the cap on queued messages; the check that
// enforces it is not visible here -- confirm.
84 unsigned int XrdSendQ::qMax = 0xffffffff;
// NOTE(review): not referenced in the visible code; purpose to be confirmed.
85 bool XrdSendQ::qPerm =
false;
93 mLink(lP), wMutex(mP),
94 fMsg(0), lMsg(0), delQ(0), theFD(lP.FDnum()),
95 inQ(0), qWmsg(qWarn), discards(0),
96 active(false), terminate(false) {}
114 if (delQ) {RelMsgs(delQ); delQ = 0;}
118 while(!terminate && (theMsg = fMsg))
119 {
if (!(fMsg = fMsg->next)) lMsg = 0;
122 rc = send(myFD, theMsg->mData, theMsg->mLen, 0);
125 if (rc < 0) {Scuttle();
break;}
130 if (delQ) {RelMsgs(delQ); delQ = 0;}
131 if ((theEnd = terminate) && fMsg) RelMsgs(fMsg);
140 if (theEnd)
delete this;
147 bool XrdSendQ::QMsg(XrdSendQ::mBuff *theMsg)
153 if ((discards & 0xff) == 0x01)
155 snprintf(qBuff,
sizeof(qBuff),
156 "%u) reached; %hu message(s) discarded!", qMax, discards);
158 "appears to be slow; queue limit (", qBuff);
166 if (lMsg) lMsg->next = theMsg;
// Format the current queue depth for the "appears to be slow" warning.
// The specifier must be plain "%u": the former "%ud" is "%u" followed by a
// literal 'd', which rendered as e.g. "12d messages queued!".
183 snprintf(qBuff, sizeof(qBuff), "%u messages queued!", inQ);
184 Log.
Emsg(
"SendQ", mLink.
Host(),
"appears to be slow;", qBuff);
186 if (inQ < qWarn && qWmsg != qWarn) qWmsg = qWarn;
198 void XrdSendQ::RelMsgs(XrdSendQ::mBuff *mP)
212 void XrdSendQ::Scuttle()
239 if (active) bleft = blen;
240 else if ((bleft = SendNB(buff, blen)) <= 0)
return (bleft ? -1 : blen);
244 if (!(theMsg = (mBuff *)malloc(
sizeof(mBuff) + bleft)))
245 {errno = ENOMEM;
return -1;}
249 bsent = blen - bleft;
250 memcpy(theMsg->mData, buff+bsent, bleft);
251 theMsg->mLen = bleft;
255 return (QMsg(theMsg) ? blen : -1);
266 int bleft, bmore, iovX;
274 for (iovX = 0; iovX < iovcnt; iovX++)
275 if ((bleft =
iov[iovX].iov_len))
break;
276 if (!bleft)
return iotot;
278 if ((bleft = SendNB(
iov, iovcnt, iotot, iovX)) <= 0)
279 return (bleft ? -1 : 0);
285 for (
int i = iovX+1; i < iovcnt; i++) bmore +=
iov[i].iov_len;
289 if (!(theMsg = (mBuff *)malloc(bmore+
sizeof(mBuff))))
290 {errno = ENOMEM;
return -1;}
294 theMsg->mLen = bmore;
298 body = theMsg->mData;
299 memcpy(body, ((
char *)
iov[iovX].iov_base)+(
iov[iovX].iov_len-bleft), bleft);
304 for (
int i = iovX+1; i < iovcnt; i++)
306 {memcpy(body,
iov[i].iov_base,
iov[i].iov_len);
307 body +=
iov[i].iov_len;
313 return (QMsg(theMsg) ? iotot : 0);
322 int XrdSendQ::SendNB(
const char *Buff,
int Blen)
324 #if !defined(__linux__)
327 ssize_t retc = 0, bytesleft = Blen;
332 {
do {retc = send(theFD, Buff, bytesleft, MSG_DONTWAIT);}
333 while(retc < 0 && errno == EINTR);
334 if (retc <= 0)
break;
335 bytesleft -= retc; Buff += retc;
341 {
// Nothing sent, or the socket would block: report the unsent byte count so
// the caller can deal with the remainder. The would-block condition is
// reported through errno; retc only holds send()'s return value (a byte
// count or -1), so it must not be compared with EWOULDBLOCK.
if (!retc || errno == EAGAIN || errno == EWOULDBLOCK)
return bytesleft;
342 Log.
Emsg(
"SendQ", errno,
"send to", mLink.
ID);
353 int XrdSendQ::SendNB(
const struct iovec *
iov,
int iocnt,
int bytes,
int &iovX)
356 #if !defined(__linux__)
361 int msgL, msgF = MSG_DONTWAIT|MSG_MORE, ioLast = iocnt-1;
367 for (iovX = 0; iovX < iocnt; iovX++)
368 {msgP = (
char *)
iov[iovX].iov_base;
369 msgL =
iov[iovX].iov_len;
370 if (iovX == ioLast) msgF &= ~MSG_MORE;
372 {
do {retc = send(theFD, msgP, msgL, msgF);}
373 while(retc < 0 && errno == EINTR);
375 {
// A would-block condition is reported through errno, not through send()'s
// return value, so both EAGAIN and EWOULDBLOCK are tested against errno.
if (!retc || errno == EAGAIN || errno == EWOULDBLOCK)
377 Log.
Emsg(
"SendQ", errno,
"send to", mLink.
ID);
410 if (fMsg) {RelMsgs(fMsg); fMsg = lMsg = 0;}
411 if (delQ) {RelMsgs(delQ); delQ = 0;}
LinkShutdown(XrdLink *link)
const char * Host() const
char * ID
Pointer to the client's link identity.
void Shutdown(bool getLock)
void Schedule(XrdJob *jp)
XrdSendQ(XrdLink &lP, XrdSysMutex &mP)
void Terminate(XrdLink *lP=0)
int Send(const char *buff, int blen)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)