XRootD
XrdXrootdAioPgrw.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d X r o o t d A i o P g r w . c c */
4 /* */
5 /* (c) 2021 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <unistd.h>
32 #include <arpa/inet.h>
33 
34 #include "Xrd/XrdBuffer.hh"
37 #include "XrdSys/XrdSysPthread.hh"
42 
43 #define TRACELINK reqP
44 #define ID ID()
45 
46 /******************************************************************************/
47 /* G l o b a l S t a t i c s */
48 /******************************************************************************/
49 
51 
52 namespace XrdXrootd
53 {
55 }
56 
57 using namespace XrdXrootd;
58 
59 /******************************************************************************/
60 /* L o c a l S t a t i c s */
61 /******************************************************************************/
62 
63 const char *XrdXrootdAioPgrw::TraceID = "PgrwBuff";
64 
65 namespace
66 {
67 XrdSysMutex fqMutex;
68 XrdXrootdAioBuff *fqFirst = 0;
69 int numFree = 0;
70 
71 static const int csLen = sizeof(uint32_t);
72 
73 static const int maxKeep = 64; // 4MB in the pocket
74 }
75 
76 /******************************************************************************/
77 /* C o n s t r u c t o r */
78 /******************************************************************************/
79 
81  : XrdXrootdAioBuff(this, tP, bP)
82 {
83  char *buff = bP->buff;
84  uint32_t *csV = csVec;
85 
86 // Fill out the iovec
87 //
88  for (int i = 1; i <= acsSZ<<1; i+= 2)
89  {ioVec[i ].iov_base = csV;
90  ioVec[i ].iov_len = csLen;
91  ioVec[i+1].iov_base = buff;
92  ioVec[i+1].iov_len = XrdProto::kXR_pgPageSZ;
93  csV++;
94  buff += XrdProto::kXR_pgPageSZ;
95  }
96 
97 // Complete initialization
98 //
99  Result = 0;
100  iovReset = 0;
101  cksVec = csVec;
102  TIdent = "AioPgrw";
103 }
104 
105 /******************************************************************************/
106 /* D e s t r u c t o r */
107 /******************************************************************************/
108 
110 {
111 // Recycle the buffer if we have one
112 //
113  if (buffP) BPool->Release(buffP);
114 }
115 
116 /******************************************************************************/
117 /* A l l o c */
118 /******************************************************************************/
119 
121 {
122  XrdXrootdAioBuff *aiobuff;
123 
124 // Obtain a preallocated aio object
125 //
126  fqMutex.Lock();
127  if ((aiobuff = fqFirst))
128  {fqFirst = aiobuff->next;
129  numFree--;
130  }
131  fqMutex.UnLock();
132 
133 // If we have no object, create a new one. Otherwise initialize n old one
134 //
135  if (!aiobuff)
136  {XrdBuffer *bP = BPool->Obtain(aioSZ);
137  if (!bP) return 0;
138  aiobuff = new XrdXrootdAioPgrw(arp, bP);
139  } else {
140  aiobuff->Result = 0;
141  aiobuff->cksVec = aiobuff->pgrwP->csVec;
142  aiobuff->pgrwP->reqP = arp;
143  }
144 
145 // Update aio counters
146 //
147  arp->urProtocol()->aioUpdate(1);
148 
149 // All done
150 //
151  return aiobuff->pgrwP;
152 }
153 
154 /******************************************************************************/
155 /* i o v 4 R e c v */
156 /******************************************************************************/
157 
158 struct iovec *XrdXrootdAioPgrw::iov4Recv(int &iovNum)
159 {
160 // Readjust ioVec as needed
161 //
162  if (aioSZ != (int)sfsAio.aio_nbytes)
163  {int fLen, lLen;
165  fLen, lLen);
166  ioVec[2].iov_len = fLen;
167  if (csNum > 1 && lLen != XrdProto::kXR_pgPageSZ)
168  {iovReset = csNum<<1;
169  ioVec[iovReset].iov_len = lLen;
170  }
171  } else csNum = acsSZ;
172 
173 // Return the iovec reception args
174 //
175  iovNum = (csNum<<1);
176  return &ioVec[1];
177 }
178 
179 /******************************************************************************/
180 /* i o v 4 S e n d */
181 /******************************************************************************/
182 
183 struct iovec *XrdXrootdAioPgrw::iov4Send(int &iovNum, int &iovLen, bool cs2net)
184 {
185  int fLen, lLen;
186 
187 // Recalculate the iovec values for first and last read and summary values
188 //
189  if (Result > 0)
190  {csNum = XrdOucPgrwUtils::csNum(sfsAio.aio_offset, Result, fLen, lLen);
191  iovNum = (csNum<<1) + 1;
192  iovLen = Result + (csNum * sizeof(uint32_t));
193  ioVec[2].iov_len = fLen;
194  if (csNum > 1 && lLen != XrdProto::kXR_pgPageSZ)
195  {iovReset = csNum<<1;
196  ioVec[iovReset].iov_len = lLen;
197  }
198  } else csNum = 0;
199 
200 // Convert checksums to net order if so requested
201 //
202  if (cs2net) for (int i = 0; i < csNum; i++) csVec[i] = htonl(csVec[i]);
203 
204 // Return the iovec
205 //
206  return ioVec;
207 }
208 
209 /******************************************************************************/
210 /* R e c y c l e */
211 /******************************************************************************/
212 
214 {
215 // Do some tracing
216 //
217  TRACE(FSAIO, " Recycle " <<sfsAio.aio_nbytes<<'@'
218  <<sfsAio.aio_offset<<" numF="<<numFree);
219 
220 // Update aio counters
221 //
222  reqP->urProtocol()->aioUpdate(-1);
223 
224 // Place the object on the free queue if possible
225 //
226  fqMutex.Lock();
227  if (numFree >= maxKeep)
228  {fqMutex.UnLock();
229  delete this;
230  } else {
231  next = fqFirst;
232  fqFirst = this;
233  numFree++;
234  fqMutex.UnLock();
235  }
236 }
237 
238 /******************************************************************************/
239 /* S e t u p 2 R e c v */
240 /******************************************************************************/
241 
242 int XrdXrootdAioPgrw::Setup2Recv(off_t offs, int dlen, const char *&eMsg)
243 {
245 
246 // Reset any truncated segement in the iov vector
247 //
248  if (iovReset)
249  {ioVec[iovReset].iov_len = XrdProto::kXR_pgPageSZ;
250  iovReset = 0;
251  }
252 
253 // Get the layout for the iovec
254 //
255  if (!(csNum = XrdOucPgrwUtils::recvLayout(layout, offs, dlen, aioSZ)))
256  {eMsg = layout.eWhy;
257  return 0;
258  }
259  eMsg = 0;
260 
261 // Set the length of the first and last segments. Note that our iovec has
262 // an extra leading element meant for writing to the network.
263 //
264  ioVec[2].iov_len = layout.fLen;
265  if (csNum > 1 && layout.lLen < XrdProto::kXR_pgPageSZ)
266  {iovReset = csNum<<1;
267  ioVec[iovReset].iov_len = layout.lLen;
268  }
269 
270 // Setup for actual writing of the data
271 //
272  sfsAio.aio_buf = ioVec[2].iov_base = buffP->buff + layout.bOffset;
273  sfsAio.aio_nbytes = layout.dataLen;
274  sfsAio.aio_offset = offs;
275 
276 // Return how much we can read from the socket
277 //
278  return layout.sockLen;
279 }
280 
281 /******************************************************************************/
282 /* S e t u p 2 S e n d */
283 /******************************************************************************/
284 
285 int XrdXrootdAioPgrw::Setup2Send(off_t offs, int dlen, const char *&eMsg)
286 {
288 
289 // Reset any truncated segement in the iov vector
290 //
291  if (iovReset)
292  {ioVec[iovReset].iov_len = XrdProto::kXR_pgPageSZ;
293  iovReset = 0;
294  }
295 
296 // Get the layout for the iovec
297 //
298  if (!(csNum = XrdOucPgrwUtils::sendLayout(layout, offs, dlen, aioSZ)))
299  {eMsg = layout.eWhy;
300  return 0;
301  }
302  eMsg = 0;
303 
304 // Set the length of the first and last segments. Note that our iovec has
305 // an extra leading element meant for writing to the network.
306 //
307  ioVec[2].iov_len = layout.fLen;
308  if (csNum > 1 && layout.lLen < XrdProto::kXR_pgPageSZ)
309  {iovReset = csNum<<1;
310  ioVec[iovReset].iov_len = layout.lLen;
311  }
312 
313 // Setup for actual writing of the data
314 //
315  sfsAio.aio_buf = ioVec[2].iov_base = buffP->buff + layout.bOffset;
316  sfsAio.aio_nbytes = layout.dataLen;
317  sfsAio.aio_offset = offs;
318 
319 // Return how much we can write to the socket
320 //
321  return layout.dataLen;
322 }
#define eMsg(x)
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
void * aio_buf
Definition: XrdSfsAio.hh:47
#define TRACE(act, x)
Definition: XrdTrace.hh:63
XrdSysTrace XrdXrootdTrace
void Release(XrdBuffer *bp)
Definition: XrdBuffer.cc:221
XrdBuffer * Obtain(int bsz)
Definition: XrdBuffer.cc:140
char * buff
Definition: XrdBuffer.hh:45
off_t bOffset
Buffer offset to apply iov[1].iov_base.
int dataLen
Total number of filesys bytes the iovec will handle.
static int sendLayout(Layout &layout, off_t offs, int dlen, int bsz=0)
int fLen
Length to use for iov[1].iov_len.
int sockLen
Total number of network bytes the iovec will handle.
const char * eWhy
Reason for failure when zero is returned.
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
int lLen
Length to use for iov[csnum*2-1].iov_len)
static int recvLayout(Layout &layout, off_t offs, int dlen, int bsz=0)
Compute the layout for an iovec that receives network bytes applying.
uint32_t * cksVec
Definition: XrdSfsAio.hh:63
ssize_t Result
Definition: XrdSfsAio.hh:65
const char * TIdent
Definition: XrdSfsAio.hh:67
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62
XrdXrootdAioBuff * next
XrdXrootdAioPgrw *const pgrwP
XrdXrootdAioTask * reqP
int Setup2Send(off_t offs, int dlen, const char *&eMsg)
struct iovec * iov4Send(int &iovNum, int &iovLen, bool cs2net=false)
static const int acsSZ
static const int aioSZ
XrdXrootdAioPgrw(XrdXrootdAioTask *tP, XrdBuffer *bP)
struct iovec * iov4Recv(int &iovNum)
void Recycle() override
static XrdXrootdAioPgrw * Alloc(XrdXrootdAioTask *arp)
int Setup2Recv(off_t offs, int dlen, const char *&eMsg)
XrdXrootdProtocol * urProtocol()
void aioUpdate(int val)
static const int kXR_pgPageSZ
Definition: XProtocol.hh:494
XrdBuffManager * BPool