XRootD
XrdXrootdXeqChkPnt.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d X r o o t d X e q C h k P n t . c c */
4 /* */
5 /* */
6 /* (c) 2020 by the Board of Trustees of the Leland Stanford, Jr., University */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include "XProtocol/XProtocol.hh"
32 #include "Xrd/XrdBuffer.hh"
33 #include "Xrd/XrdLink.hh"
34 #include "XrdOuc/XrdOucErrInfo.hh"
43 
44 /******************************************************************************/
45 /* G l o b a l s */
46 /******************************************************************************/
47 
49 
50 /******************************************************************************/
51 /* d o _ C h k P n t */
52 /******************************************************************************/
53 
// Process a kXR_chkpoint request whose subcode is begin, commit, query or
// rollback; the xeq subcode is handed off to do_ChkPntXeq().
//
// Returns the result of the Response.Send() or fsError() call that answers
// the client (or whatever do_ChkPntXeq() returns for the xeq subcode).
//
// NOTE(review): this copy of the function comes from a rendered listing in
// which listing lines 68, 73, 80, 83, 88 and 96 are absent. The statements
// that were on those lines (the declaration of fh, the "file not open"
// response call, the begin/commit/rollback checkpoint calls and the
// declaration of ckpQResp) are therefore not visible here -- confirm against
// the original file before changing any code in this block.
//
54 int XrdXrootdProtocol::do_ChkPnt()
55 {
// Printable subcode names, indexed by Request.chkpoint.opcode in the trace and
// error message below. Presumably kXR_ckpBegin..kXR_ckpXeq are 0..4 in this
// order -- confirm against XProtocol.hh.
//
56  static const char *ckpName[] = {"begin","commit","query","rollback","xeq"};
57 
58 // Keep statistics
59 //
60  SI->Bump(SI->miscCnt);
61 
62 // The kXR_ckpXeq is far too complicated to process here so we do it elsewhere.
63 //
64  if (Request.chkpoint.opcode == kXR_ckpXeq) return do_ChkPntXeq();
65 
66 // Validate the filehandle
67 //
// NOTE(review): listing line 68 (presumably the declaration of fh) is missing
// from this extract.
//
69  struct iov ckpVec;
70  int rc;
71 
// The file table lookup also records the target file in IO.File, which every
// subcode below uses. NOTE(review): listing line 73 (the start of the response
// sent when the lookup fails) is missing from this extract.
//
72  if (!FTab || !(IO.File = FTab->Get(fh.handle)))
74  "chkpoint does not refer to an open file");
75 
76 // Handle each subcode
77 //
78  switch(Request.chkpoint.opcode)
79  {case kXR_ckpBegin:
81  break;
82  case kXR_ckpCommit:
84  break;
85  case kXR_ckpQuery:
// On success the query answers the client right here with the values returned
// in ckpVec (size and offset), converted to network byte order; the tracing
// and generic response code further down is then skipped.
//
86  rc = IO.File->XrdSfsp->checkpoint(XrdSfsFile::cpQuery,&ckpVec,1);
87  if (!rc)
89  ckpQResp.maxCkpSize = htonl(ckpVec.size);
90  ckpQResp.useCkpSize =
91  htonl(static_cast<uint32_t>(ckpVec.offset));
92  return Response.Send(&ckpQResp, sizeof(ckpQResp));
93  }
94  break;
95  case kXR_ckpRollback:
97  break;
98  default: return Response.Send(kXR_ArgInvalid,
99  "chkpoint subcode is invalid");
100  };
101 
102 // Do some tracing
103 //
104  TRACEP(FS, "fh=" <<fh.handle <<" chkpnt " <<ckpName[Request.chkpoint.opcode]
105  <<" rc=" <<rc);
106 
107 // Check for error and invalid return codes from checkpoint note that writev's
108 // aren't flushed, we simply close the connection to get rid of pending data.
109 //
// Any return code other than SFS_OK or SFS_ERROR is logged and replaced by an
// ENODEV "logic error" in the file's error object, so that the client always
// receives an SFS_ERROR style response.
//
110  if (SFS_OK != rc)
111  {if (rc != SFS_ERROR)
112  {char eBuff[128];
113  snprintf(eBuff, sizeof(eBuff), "chkpoint %s returned invalid rc=%d!",
114  ckpName[Request.chkpoint.opcode], rc);
115  eDest.Emsg("Xeq", eBuff);
116  IO.File->XrdSfsp->error.setErrInfo(ENODEV, "logic error");
117  }
118  return fsError(SFS_ERROR, 0, IO.File->XrdSfsp->error, 0, 0);
119  }
120 
121 // Respond that all went well
122 //
123  return Response.Send();
124 }
125 
126 /******************************************************************************/
127 /* d o _ C h k P n t X e q */
128 /******************************************************************************/
129 
130 int XrdXrootdProtocol::do_ChkPntXeq()
131 {
132  static const int sidSZ = sizeof(Request.header.streamid);
133  int rc;
134 
135 // If this is the first pass, check that streamid's match and setup the
136 // request to be that of the chkpnt request. Note that kXR_writev requires
137 // an additional fetch of data which may cause re-entry as pass2.
138 //
140  {ClientRequestHdr *Subject = (ClientRequestHdr *)(argp->buff);
141  if (memcmp(Request.header.streamid, Subject->streamid, sidSZ))
142  {Response.Send(kXR_ArgInvalid, "Request streamid mismatch");
143  return -1;
144  }
145  if (Request.header.dlen != sizeof(Request.header))
146  {Response.Send(kXR_ArgInvalid, "Request length invalid");
147  return -1;
148  }
149 
150  memcpy(Request.header.body, Subject->body, sizeof(Request.header.body));
151  Request.header.requestid = ntohs(Subject->requestid);
152  Request.header.dlen = ntohl(Subject->dlen);
153 
156  {Response.Send(kXR_ArgInvalid,"chkpoint request is invalid");
157  return -1;
158  }
159 
161  {if (!Request.header.dlen) return Response.Send();
163  {Response.Send(kXR_ArgTooLong,"chkpoint write vector is too long");
164  return -1;
165  }
166  if ( Request.header.dlen > argp->bsize)
167  {BPool->Release(argp);
168  if (!(argp = BPool->Obtain(Request.header.dlen)))
170  "Insufficient memory for chkpoint request");
171  return -1;
172  }
173  hcNow = hcPrev; halfBSize = argp->bsize >> 1;
174  }
175  if ((rc = getData("arg", argp->buff, Request.header.dlen)))
176  {Resume = &XrdXrootdProtocol::do_ChkPntXeq; return rc;}
177  }
178  }
179 
180 // Prepare to process the actual request
181 //
182  const char *xeqOp;
183  struct iov ckpVec;
184  XrdXrootdFHandle fh;
185  kXR_unt16 reqID;
186 
187  reqID = Request.header.requestid;
189 
190 // Obtain the filehandle that we should check
191 //
192  switch(reqID)
193  {case kXR_pgwrite:
194  xeqOp = "pgwrite";
195  fh.Set(Request.pgwrite.fhandle);
196  break;
197  case kXR_truncate:
198  xeqOp = "trunc";
199  fh.Set(Request.truncate.fhandle);
200  break;
201  case kXR_write:
202  xeqOp = "write";
203  fh.Set(Request.write.fhandle);
204  break;
205  case kXR_writev:
206  xeqOp = "writev";
207  if ((rc = do_WriteV())) return rc;
208  if (!wvInfo) return 0;
209  fh.handle = wvInfo->curFH;
210  for (int i = 0; i < wvInfo->vEnd; i++)
211  if (wvInfo->ioVec[i].info != fh.handle)
212  {free(wvInfo); wvInfo = 0;
214  "multi-file chkpoint writev not supported");
215  return -1;
216  }
217  break;
218  default: return Response.Send(kXR_ArgInvalid,
219  "chkpoint request is invalid");
220  }
221 
222 // Make sure we have the target file
223 //
224  if (!FTab || !(IO.File = FTab->Get(fh.handle)))
226  "chkpoint does not refer to an open file");
227  if (reqID != kXR_truncate)
228  return Link->setEtext("chkpnt xeq write protocol violation");
229  return rc;
230  }
231 
232 // If this is a packaged request, create a checkpoint
233 //
234 
235 // Now perform the action
236 //
237  switch(reqID)
238  {case kXR_pgwrite:
239  ckpVec.size = Request.header.dlen;
240  n2hll(Request.pgwrite.offset, ckpVec.offset);
241  ckpVec.info = 0;
242  ckpVec.data = 0;
243  rc = IO.File->XrdSfsp->checkpoint(XrdSfsFile::cpWrite,&ckpVec,1);
244  if (!rc) return do_PgWrite();
245  break;
246  case kXR_truncate:
247  n2hll(Request.write.offset, ckpVec.offset);
248  ckpVec.info = 0;
249  ckpVec.data = 0;
250  rc = IO.File->XrdSfsp->checkpoint(XrdSfsFile::cpTrunc,&ckpVec,1);
251  if (!rc) return do_Truncate();
252  break;
253  case kXR_write:
254  ckpVec.size = Request.header.dlen;
255  n2hll(Request.write.offset, ckpVec.offset);
256  ckpVec.info = 0;
257  ckpVec.data = 0;
258  rc = IO.File->XrdSfsp->checkpoint(XrdSfsFile::cpWrite,&ckpVec,1);
259  if (!rc) return do_Write();
260  break;
261  default: // kXR_writev
263  (iov *)wvInfo->ioVec, wvInfo->vEnd);
264  if (!rc)
265  {for (int i = 0; i < wvInfo->vEnd; i++)
266  wvInfo->ioVec[i].info = fh.handle;
267  return do_WriteVec();
268  }
269  break;
270  }
271 
272 // Do some tracing
273 //
274  TRACEP(FS, "fh=" <<fh.handle <<" chkpnt " <<xeqOp <<" rc=" <<rc);
275 
276 // Check for error and invalid return codes from checkpoint note that writev's
277 // aren't flushed, we simply close the connection to get rid of pending data.
278 //
279  if (SFS_OK != rc)
280  {if (rc != SFS_ERROR)
281  {char eBuff[128];
282  snprintf(eBuff, sizeof(eBuff),
283  "chkpoint xeq %s returned invalid rc=%d!", xeqOp, rc);
284  eDest.Emsg("Xeq", eBuff);
285  IO.File->XrdSfsp->error.setErrInfo(ENODEV, "logic error");
286  }
287  if (reqID == kXR_pgwrite)
288  {IO.EInfo[0] = SFS_ERROR; IO.EInfo[0] = 0;
289  return do_WriteNone(static_cast<int>(Request.pgwrite.pathid));
290  }
291  if (reqID == kXR_write)
292  {IO.EInfo[0] = SFS_ERROR; IO.EInfo[0] = 0;
293  return do_WriteNone(static_cast<int>(Request.write.pathid));
294  }
295  rc = fsError(SFS_ERROR, 0, IO.File->XrdSfsp->error, 0, 0);
296  return (reqID != kXR_truncate ? -1 : rc);
297  }
298 
299 // Respond that all went well
300 //
301  return Response.Send();
302 }
static const int kXR_ckpRollback
Definition: XProtocol.hh:215
@ kXR_ArgInvalid
Definition: XProtocol.hh:990
@ kXR_FileNotOpen
Definition: XProtocol.hh:994
@ kXR_Unsupported
Definition: XProtocol.hh:1003
@ kXR_ArgTooLong
Definition: XProtocol.hh:992
@ kXR_NoMemory
Definition: XProtocol.hh:998
kXR_char body[16]
Definition: XProtocol.hh:158
struct ClientTruncateRequest truncate
Definition: XProtocol.hh:875
kXR_char fhandle[4]
Definition: XProtocol.hh:531
kXR_char fhandle[4]
Definition: XProtocol.hh:807
kXR_char streamid[2]
Definition: XProtocol.hh:156
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
struct ClientPgWriteRequest pgwrite
Definition: XProtocol.hh:862
struct ClientRequestHdr header
Definition: XProtocol.hh:846
kXR_unt16 requestid
Definition: XProtocol.hh:157
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_chkpoint
Definition: XProtocol.hh:124
@ kXR_pgwrite
Definition: XProtocol.hh:138
struct ClientChkPointRequest chkpoint
Definition: XProtocol.hh:849
kXR_char fhandle[4]
Definition: XProtocol.hh:794
static const int kXR_ckpCommit
Definition: XProtocol.hh:213
kXR_char fhandle[4]
Definition: XProtocol.hh:204
static const int kXR_ckpQuery
Definition: XProtocol.hh:214
kXR_int64 offset
Definition: XProtocol.hh:808
struct ClientWriteRequest write
Definition: XProtocol.hh:876
kXR_int32 dlen
Definition: XProtocol.hh:159
static const int kXR_ckpBegin
Definition: XProtocol.hh:212
unsigned short kXR_unt16
Definition: XPtypes.hh:67
#define SFS_ERROR
#define SFS_OK
#define TRACEP(act, x)
XrdOucIOVec ioVec[1]
XrdSysTrace XrdXrootdTrace
void Release(XrdBuffer *bp)
Definition: XrdBuffer.cc:221
XrdBuffer * Obtain(int bsz)
Definition: XrdBuffer.cc:140
int bsize
Definition: XrdBuffer.hh:46
char * buff
Definition: XrdBuffer.hh:45
int setErrInfo(int code, const char *emsg)
void Bump(int &val)
Definition: XrdOucStats.hh:47
XrdOucErrInfo & error
virtual int checkpoint(cpAct act, struct iov *range=0, int n=0)
@ cpTrunc
Truncate a file within checkpoint.
@ cpDelete
Delete an existing checkpoint.
@ cpRestore
Restore an active checkpoint and delete it.
@ cpWrite
Add data to an existing checkpoint.
@ cpQuery
Return checkpoint limits.
@ cpCreate
Create a checkpoint, one must not be active.
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdXrootdFile * Get(int fnum)
XrdSfsFile * XrdSfsp
static XrdXrootdStats * SI
XrdXrootd::IOParms IO
XrdXrootdWVInfo * wvInfo
XrdXrootdFileTable * FTab
static XrdSysError & eDest
int getData(gdCallBack *gdcbP, const char *dtype, char *buff, int blen)
int(XrdXrootdProtocol::* Resume)()
XrdXrootdResponse Response
static XrdBuffManager * BPool
static const int maxWvecln
Definition: XProtocol.hh:837
XrdXrootdFile * File