// DUMPIT(x,y): expands to a stream fragment consisting of the value returned
// by XrdSsiUtils::b2x() for buffer x of length y, immediately followed by
// dotBuff. The expansion names hexBuff and dotBuff directly, so both must be
// declared in the scope where the macro is used (hexBuff as an array, because
// sizeof(hexBuff) is passed as its capacity).
// NOTE(review): presumably b2x() renders a hex preview into hexBuff and leaves
// a truncation marker in dotBuff -- confirm against XrdSsiUtils.
53 #define DUMPIT(x,y) XrdSsiUtils::b2x(x,y,hexBuff,sizeof(hexBuff),dotBuff)<<dotBuff
// Printable task-state names. The table is indexed with the task's tStat
// value when building trace and error messages, so the entry order must stay
// in step with the numeric values of the state enumeration.
const char *statName[] = {"isPend",  "isWrite", "isSync",
                          "isReady", "isDone",  "isDead"};
// Name of the endpoint-file property that is set before asking for a response.
std::string pName = "ReadRecovery";
// Value passed together with pName when the endpoint-file property is set.
std::string pValue = "false";
92 void RecycleMsg(
bool sent=
true) {
delete respObj;
delete this;}
105 class SchedEmsg :
public XrdJob
109 void DoIt() {taskP->SendError();
127 bool XrdSsiTaskReal::Ask4Resp()
137 sessP->epFile.SetProperty(pName, pValue);
142 memcpy(qBuff.GetBuffer(), rInfo.
Data(),
sizeof(
long long));
146 DEBUG(
"Calling fcntl for response.");
150 epStatus = sessP->epFile.Fcntl(qBuff, (ResponseHandler *)
this, tmOut);
155 if (!epStatus.
IsOK())
return RespErr(&epStatus);
168 if (force) sessP = &voidSession;
186 DEBUG(
"Request="<<&rqstR<<
" cancel="<<cancel);
196 if (Kill()) sessP->TaskFinished(
this);
197 else {
DEBUG(
"Task removal deferred.");}
205 char *&dbuff,
int &dbL)
212 unsigned int mdL, pxL, n;
217 response->
Get(buffP);
218 if (!buffP || !(cdP = buffP->
GetBuffer()))
219 {
DEBUG(
"Responding with stream.");
227 mdL = ntohl(mdP->
mdLen);
235 {
char hexBuff[16],dotBuff[4];
236 dbuff = cdP+pxL; dbL = mdL;
237 DEBUG(
"Posting "<<dbL<<
" byte alert (0x"<<
DUMPIT(dbuff,dbL)<<
")");
244 {
char hexBuff[16],dotBuff[4];
245 DEBUG(mdL <<
" byte metadata set (0x" <<
DUMPIT(cdP+pxL,mdL)<<
")");
246 SetMetadata(cdP+pxL, mdL);
252 {dbuff = (dbL ? cdP+mdL+pxL : &zedData);
254 DEBUG(
"Responding with " <<dbL <<
" data bytes.");
256 else {xResp = isStream;
257 DEBUG(
"Responding with stream.");
283 DEBUG(
"Status="<<statName[tStat]<<
" defer=" <<defer<<
" mhPend="<<mhPend);
288 {
case isWrite:
break;
291 case isDone: tStat = isDead;
292 return !(mhPend || defer);
294 case isDead:
return !(mhPend || defer);
296 case isPend: tStat = isDead;
297 return !(mhPend || defer);
299 default:
char mBuff[32];
300 snprintf(mBuff,
sizeof(mBuff),
"%d", tStat);
301 Log.
Emsg(
"TaskKill",
"Invalid state", mBuff);
312 if (tStat == isWrite && mhPend)
315 DEBUG(
"Waiting for write event.");
326 DEBUG(
"Sending cancel request.");
334 DEBUG(
"Returning " <<!(mhPend || defer));
335 return !(mhPend || defer);
355 {sessP->UnHold(
false);
361 DEBUG(
"Posting error " <<eNum <<
": " <<eTxt.c_str());
362 SetErrResponse(eTxt.c_str(), eNum);
376 if (eInfo) errInfo = *eInfo;
396 DEBUG(
"Status="<<statName[tStat]<<
" defer=" <<defer<<
" mhPend="<<mhPend);
404 const char *eTxt = errInfo.Get(eNum).c_str();
406 SetErrResponse(eTxt, eNum);
419 {
DEBUG(
"Defering TaskFinished."<<
" defer=" <<defer<<
" mhPend="<<mhPend);
422 DEBUG(
"Calling TaskFinished");
424 sessP->TaskFinished(
this);
446 {
if (tStat == isDead) sessP->TaskFinished(
this);
447 else Log.
Emsg(
"SendRequest",
"Invalid state", statName[tStat],
448 "; should be isPend!");
465 {sessP->TaskFinished(
this);
473 rrInfo.
Size(reqBlen);
486 Status = sessP->epFile.Write(rrInfo.
Info(), (uint32_t)reqBlen, reqBuff,
509 char *buff,
int blen,
bool &last)
515 union {uint32_t ubRead;
int ibRead;};
519 DEBUG(
"ReadSync status=" <<statName[tStat]);
520 if (tStat != isReady)
521 {
if (tStat == isDone)
return 0;
522 eRef.
Set(
"Stream is not active", ENODEV);
532 epStatus = sessP->epFile.Read(rrInfo.
Info(),(uint32_t)blen,buff,ubRead,tmOut);
534 {
if (ibRead < blen) {tStat = isDone; last =
true;}
535 DEBUG(
"ReadSync returning " <<ibRead <<
" bytes.");
558 DEBUG(
"ReadAsync Status=" <<statName[tStat]);
559 if (tStat != isReady)
560 {eRef.
Set(
"Stream is not active", ENODEV);
567 {eRef.
Set(
"Stream is already active", EINPROGRESS);
574 {eRef.
Set(
"Buffer length invalid", EINVAL);
584 dataBuff = buff; dataRlen = blen;
585 epStatus = sessP->epFile.Read(rrInfo.
Info(), (uint32_t)blen, buff,
590 if (epStatus.
IsOK()) {mhPend =
true;
return true;}
596 DEBUG(
"ReadAsync error; " <<epStatus.
ToStr());
612 union {uint32_t ubRead;
int ibRead;};
615 bool last, aOK = status->
IsOK();
627 DEBUG(
"aOK="<<aOK<<
" status="<<statName[tStat]<<
" defer=" <<defer);
638 DEBUG(
"Write completed.");
640 {
DEBUG(
"Posting killer.");
641 wPost->Post(); wPost = 0;
644 DEBUG(
"Calling RelBuff.");
645 ReleaseRequestBuffer();
646 if (tStat == isWrite)
648 return (Ask4Resp() ? 0 : 1);
654 if (!aOK)
return (RespErr(status) ? 0 : 1);
656 if (response)
switch(GetResp(respP, dBuff, dLen))
657 {
case isAlert: aMsg =
new AlertMsg(*respP, dBuff, dLen);
663 return (Ask4Resp() ? 0 : 1);
666 return (Tstat != isDead ? 0 : 1);
668 case isData: tStat = isDone; sessP->UnLock();
669 SetResponse(dBuff, dLen);
671 case isStream: tStat = isReady; sessP->UnLock();
674 default: tStat = isDone; sessP->UnLock();
675 SetErrResponse(
"Invalid response", EFAULT);
678 tStat = isDone; sessP->UnLock();
679 SetErrResponse(
"Missing response", EFAULT);
689 default:
char mBuff[32];
690 snprintf(mBuff,
sizeof(mBuff),
"%d", tStat);
691 Log.
Emsg(
"TaskXeqEvent",
"Invalid state", mBuff);
697 if (!aOK || !response)
703 response->
Get(cInfo);
704 ubRead = (cInfo ? cInfo->
length : 0);
710 if (ibRead < dataRlen) {tStat = isDone; dataRlen = ibRead;}
712 last = tStat == isDone;
714 DEBUG(
"Calling ProcessResponseData; len="<<ibRead<<
" last="<<last);
716 dBuff, ibRead, last);
735 DEBUG(
"Status="<<statName[tStat]<<
" defer=" <<defer<<
" mhPend="<<mhPend);
742 {
if (sessP != &voidSession)
743 {
if (mhPend || defer) {
DEBUG(
"Defering TaskFinished.");}
744 else {
DEBUG(
"Calling TaskFinished");
746 sessP->TaskFinished(
this);
749 DEBUG(
"Deleting orphaned task.");
753 }
else sessP->UnLock();
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
uint32_t GetSize() const
Get the size of the message.
Handle an async response.
std::string ToStr() const
Convert to string.
void Schedule(XrdJob *jp)
void Set(const char *eMsg=0, int eNum=0, int eArg=0)
static XrdSsiRequest * Request(XrdSsiResponder *rP)
static XrdSsiErrInfo & ErrInfoRef(XrdSsiRequest *rP)
static void Alert(XrdSsiRequest &reqR, XrdSsiRespInfoMsg &aMsg)
static void ResetResponder(XrdSsiResponder *rP)
static void SetNode(XrdSsiRequest *rP, const char *name)
void Size(unsigned int sz)
const unsigned char * Data()
unsigned long long Info()
virtual char * GetRequest(int &dlen)=0
void Finished(XrdSsiRequest &rqstR, const XrdSsiRespInfo &rInfo, bool cancel=false)
int SetBuff(XrdSsiErrInfo &eRef, char *buff, int blen, bool &last)
int XeqEvent(XrdCl::XRootDStatus *status, XrdCl::AnyObject **respP)
void SchedError(XrdSsiErrInfo *eInfo=0)
bool SendRequest(const char *node)
void Detach(bool force=false)
static int GetErr(XrdCl::XRootDStatus &Status, std::string &eText)
static void SetErr(XrdCl::XRootDStatus &Status, XrdSsiErrInfo &eInfo)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Describe a data chunk for vector read.
uint32_t length
offset in the file
bool IsOK() const
We're fine.
static const int fullResp
static const int alrtResp