XRootD
XrdSsiSessReal.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d S s i S e s s R e a l . c c */
4 /* */
5 /* (c) 2013 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* Produced by Andrew Hanushevsky for Stanford University under contract */
7 /* DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 #include <cerrno>
31 #include <cinttypes>
32 #include <cstdio>
33 #include <cstdlib>
34 #include <cstring>
35 #include <string>
36 #include <sys/types.h>
37 #include <netinet/in.h>
38 
39 #include "XrdSsi/XrdSsiAtomics.hh"
40 #include "XrdSsi/XrdSsiRequest.hh"
41 #include "XrdSsi/XrdSsiRRAgent.hh"
42 #include "XrdSsi/XrdSsiRRInfo.hh"
43 #include "XrdSsi/XrdSsiScale.hh"
44 #include "XrdSsi/XrdSsiServReal.hh"
45 #include "XrdSsi/XrdSsiSessReal.hh"
46 #include "XrdSsi/XrdSsiTaskReal.hh"
47 #include "XrdSsi/XrdSsiTrace.hh"
48 #include "XrdSsi/XrdSsiUtils.hh"
49 
50 #include "XrdSys/XrdSysError.hh"
51 #include "XrdSys/XrdSysHeaders.hh"
52 #include "Xrd/XrdScheduler.hh"
53 
54 using namespace XrdSsi;
55 
56 /******************************************************************************/
57 /* L o c a l D e f i n e s */
58 /******************************************************************************/
59 
// Helpers for circular, doubly linked lists whose links live in an embedded
// member (dlvar) that has 'next' and 'prev' pointers. An unlinked item points
// to itself in both directions. The statement macros are wrapped in
// do {...} while(0) so that each one behaves as a single statement, even when
// it is the body of an unbraced if/else. Arguments are evaluated more than
// once, so they must be free of side effects.

// True when theitem is the only element on its list (its next is itself).
//
#define SINGLETON(dlvar, theitem) \
        ((theitem)->dlvar.next == (theitem))

// Link newitem into the list immediately ahead of curitem.
//
#define INSERT(dlvar, curitem, newitem) \
        do {(newitem)->dlvar.next = (curitem); \
            (newitem)->dlvar.prev = (curitem)->dlvar.prev; \
            (curitem)->dlvar.prev->dlvar.next = (newitem); \
            (curitem)->dlvar.prev = (newitem); \
           } while(0)

// Unlink curitem from its list and make it self-referencing again. When
// curitem is the list anchor (dlbase), the anchor moves to the next item or
// becomes null if curitem was the only element.
//
#define REMOVE(dlbase, dlvar, curitem) \
        do {if ((dlbase) == (curitem)) \
               (dlbase) = (SINGLETON(dlvar, curitem) \
                        ? 0 : (curitem)->dlvar.next); \
            (curitem)->dlvar.prev->dlvar.next = (curitem)->dlvar.next; \
            (curitem)->dlvar.next->dlvar.prev = (curitem)->dlvar.prev; \
            (curitem)->dlvar.next = (curitem); \
            (curitem)->dlvar.prev = (curitem); \
           } while(0)
76 
77 /******************************************************************************/
78 /* L o c a l S t a t i c s */
79 /******************************************************************************/
80 
81 namespace
82 {
83  std::string dsProperty("DataServer");
84  XrdSsiMutex sidMutex;
85 
86  Atomic(uint32_t) sidVal(0);
87 }
88 
89 /******************************************************************************/
90 /* G l o b a l s */
91 /******************************************************************************/
92 
93 namespace XrdSsi
94 {
95 extern XrdScheduler *schedP;
96 
97 extern XrdSysError Log;
98 extern XrdSsiScale sidScale;
99 }
100 
101 /******************************************************************************/
102 /* L o c a l C l a s s e s */
103 /******************************************************************************/
104 
105 namespace
106 {
107 class CleanUp : public XrdJob
108 {
109 public:
110 
111 void DoIt() {sessP->Lock();
112  sessP->Unprovision();
113  delete this;
114  }
115 
116  CleanUp(XrdSsiSessReal *sP) : sessP(sP) {}
117  ~CleanUp() {}
118 
119 private:
120 XrdSsiSessReal *sessP;
121 };
122 }
123 
124 /******************************************************************************/
125 /* D e s t r u c t o r */
126 /******************************************************************************/
127 
129 {
130  XrdSsiTaskReal *tP;
131 
132  if (resKey) free(resKey);
133  if (sessName) free(sessName);
134  if (sessNode) free(sessNode);
135 
136  while((tP = freeTask)) {freeTask = tP->attList.next; delete tP;}
137 }
138 
139 /******************************************************************************/
140 /* I n i t S e s s i o n */
141 /******************************************************************************/
142 
143 void XrdSsiSessReal::InitSession(XrdSsiServReal *servP, const char *sName,
144  int uent, bool hold, bool newSID)
145 {
146  EPNAME("InitSession");
147  requestP = 0;
148  uEnt = uent;
149  attBase = 0;
150  freeTask = 0;
151  myService = servP;
152  nextTID = 0;
153  alocLeft = XrdSsiRRInfo::idMax;
154  isHeld = hold;
155  inOpen = false;
156  noReuse = false;
157  if (resKey) {free(resKey); resKey = 0;}
158  if (sessName) free(sessName);
159  sessName = (sName ? strdup(sName) : 0);
160  if (sessNode) free(sessNode);
161  sessNode = 0;
162  if (newSID)
163  {if (servP == 0) sessID = 0xffffffff;
164  else {Atomic_BEG(sidMutex);
165  sessID = Atomic_INC(sidVal);
166  Atomic_END(sidMutex);
167  snprintf(tident, sizeof(tident), "S %u#", sessID);
168  DEBUG("new sess for "<<sName<<" uent="<<uent<<" hold="<<hold);
169  }
170  } else {
171  DEBUG("reuse sess for "<<sName<<" uent="<<uent<<" hold="<<hold);
172  }
173 }
174 
175 /******************************************************************************/
176 /* Private: N e w T a s k */
177 /******************************************************************************/
178 
179 // Must be called with sessMutex locked!
180 
181 XrdSsiTaskReal *XrdSsiSessReal::NewTask(XrdSsiRequest *reqP)
182 {
183  EPNAME("NewTask");
184  XrdSsiTaskReal *ptP, *tP;
185 
186 // Allocate a task object for this request
187 //
188  if ((tP = freeTask)) freeTask = tP->attList.next;
189  else {if (!alocLeft || !(tP = new XrdSsiTaskReal(this)))
190  {XrdSsiUtils::RetErr(*reqP, "Too many active requests.", EMLINK);
191  return 0;
192  }
193  alocLeft--;
194  }
195 
196 // We always set a new task ID to avoid ID collisions. This is good for over
197 // 194 days if we have 1 request/second. In practice. this will work for a
198 // couple of years before wrapping. By then the ID's should be free.
199 //
200  tP->SetTaskID(nextTID++, sessID);
201  nextTID &= XrdSsiRRInfo::idMax;
202 
203 // Initialize the task and return its pointer
204 //
205  tP->Init(reqP, reqP->GetTimeOut());
206  DEBUG("New task=" <<tP <<" id=" <<tP->ID());
207 
208 // Insert the task into our list of tasks
209 //
210  if ((ptP = attBase)) {INSERT(attList, ptP, tP);}
211  else attBase = tP;
212 
213 // We will be using the session mutex for serialization. Afterwards, bind the
214 // task to the request and return the task pointer.
215 //
216  XrdSsiRRAgent::SetMutex(reqP, &sessMutex);
217  tP->BindRequest(*reqP);
218  return tP;
219 }
220 
221 /******************************************************************************/
222 /* P r o v i s i o n */
223 /******************************************************************************/
224 
225 bool XrdSsiSessReal::Provision(XrdSsiRequest *reqP, const char *epURL)
226 {
227  EPNAME("Provision");
228  XrdCl::XRootDStatus epStatus;
229  XrdSsiMutexMon rHelp(&sessMutex);
231 
232 // Set retry flag as appropriate
233 //
234  if (XrdSsiRRAgent::isaRetry(reqP, true)) oFlags |= XrdCl::OpenFlags::Refresh;
235 
236 // Issue the open and if the open was started, return success.
237 //
238  DEBUG("Provisioning " <<epURL);
239  epStatus = epFile.Open((const std::string)epURL, oFlags,
241  (XrdCl::ResponseHandler *)this,
242  reqP->GetTimeOut());
243 
244 // If there was an error, scuttle the request. Note that errors will be returned
245 // on a separate thread to avoid hangs here.
246 //
247  if (!epStatus.IsOK())
248  {std::string eTxt;
249  int eNum = XrdSsiUtils::GetErr(epStatus, eTxt);
250  XrdSsiUtils::RetErr(*reqP, eTxt.c_str(), eNum);
251  XrdSsi::sidScale.retEnt(uEnt);
252  return false;
253  }
254 
255 // Queue a new task and indicate our state
256 //
257  NewTask(reqP);
258  inOpen = true;
259  return true;
260 }
261 
262 /******************************************************************************/
263 /* Private: R e l T a s k */
264 /******************************************************************************/
265 
266 void XrdSsiSessReal::RelTask(XrdSsiTaskReal *tP) // sessMutex locked!
267 {
268  EPNAME("RelTask");
269 
270 // Do some debugging here
271 //
272  DEBUG((isHeld ? "Recycling":"Deleting")<<" task="<<tP<<" id=" <<tP->ID());
273 
274 // Delete this task or place it on the free list
275 //
276  if (!isHeld) delete tP;
277  else {tP->ClrEvent();
278  tP->attList.next = freeTask;
279  freeTask = tP;
280  }
281 }
282 
283 /******************************************************************************/
284 /* R u n */
285 /******************************************************************************/
286 
288 {
289  XrdSsiMutexMon sessMon(sessMutex);
290  XrdSsiTaskReal *tP;
291 
292 // If we are not allowed to be reused, return to indicated try someone else
293 //
294  if (noReuse) return false;
295 
296 // Reserve a stream ID. If we cannot then indicate we cannot be reused
297 //
298  if (!XrdSsi::sidScale.rsvEnt(uEnt)) return false;
299 
300 // Queue a new task
301 //
302  tP = NewTask(reqP);
303 
304 // If we are already open and we have a task, send off the request
305 //
306  if (!inOpen && tP && !tP->SendRequest(sessNode)) noReuse = true;
307  return true;
308 }
309 
310 /******************************************************************************/
311 /* Private: S h u t d o w n */
312 /******************************************************************************/
313 
314 // Called with sessMutex locked and return with it unlocked
315 
316 void XrdSsiSessReal::Shutdown(XrdCl::XRootDStatus &epStatus, bool onClose)
317 {
318  XrdSsiTaskReal *tP, *ntP = freeTask;
319 
320 // Delete all acccumulated tasks
321 //
322  while((tP = ntP)) {ntP = tP->attList.next; delete tP;}
323  freeTask = 0;
324 
325 // If the close failed then we cannot recycle this object as it is not reusable
326 //
327  if (onClose && !epStatus.IsOK())
328  {std::string eText;
329  int eNum = XrdSsiUtils::GetErr(epStatus, eText);
330  char mBuff[1024];
331  snprintf(mBuff, sizeof(mBuff), "Unprovision: %s@%s error; %d",
332  sessName, sessNode, eNum);
333  Log.Emsg("Shutdown", mBuff, eText.c_str());
334  sessMutex.UnLock();
335  myService->Recycle(this, false);
336  } else {
337  if (sessName) {free(sessName); sessName = 0;}
338  if (sessNode) {free(sessNode); sessNode = 0;}
339  sessMutex.UnLock();
340  myService->Recycle(this, !noReuse);
341  }
342 }
343 
344 /******************************************************************************/
345 /* T a s k F i n i s h e d */
346 /******************************************************************************/
347 
349 {
350  EPNAME("TaskFin");
351 // Lock our mutex
352 //
353  sessMutex.Lock();
354 
355 // Remove task from the task list if it's in it and release the task object.
356 //
357  if (tP == attBase || tP->attList.next != tP)
358  {REMOVE(attBase, attList, tP);}
359  RelTask(tP);
360 
361 // Return the request entry number
362 //
363  XrdSsi::sidScale.retEnt(uEnt);
364 
365 // If we are waiting for a provision to finish, simply exit as the event
366 // handler will notice that there is no task and will unprovision. Otherwise
367 //
368 
369 
370 // If we can shutdown, then unprovision which will drive a shutdown. Note
371 // that Unprovision() returns without the sessMutex, otherwise we must
372 // unlock it before we return. A shutdown invalidates this object!
373 //
374  if (!inOpen)
375  {if (!isHeld && !attBase) Unprovision();
376  else sessMutex.UnLock();
377  } else {
378  DEBUG("Unprovision deferred for " <<sessName);
379  sessMutex.UnLock();
380  }
381 }
382 
383 /******************************************************************************/
384 /* U n H o l d */
385 /******************************************************************************/
386 
387 void XrdSsiSessReal::UnHold(bool cleanup)
388 {
389  XrdSsiMutexMon sessMon(sessMutex);
390 
391 // Immediately stopo reuse of this object
392 //
393  if (isHeld && resKey && myService) myService->StopReuse(resKey);
394 
395 // Turn off the hold flag and if we have no attached tasks, schedule shutdown
396 //
397  isHeld = false;
398  if (cleanup && !attBase) XrdSsi::schedP->Schedule(new CleanUp(this));
399 }
400 
401 /******************************************************************************/
402 /* Private: U n p r o v i s i o n */
403 /******************************************************************************/
404 
405 // Called with sessMutex locked and returns with it unlocked
406 // Returns false if a shutdown occurred (i.e. session object no longer valid)
407 
408 bool XrdSsiSessReal::Unprovision() // Called with sessMutex locked!
409 {
410  EPNAME("Unprovision");
411  XrdCl::XRootDStatus uStat;
412 
413 // Clear any pending events
414 //
415  DEBUG("Closing " <<sessName);
416 
417 // If the file is not open (it might be due to an open error) then do a
418 // shutdown right away. Otherwise, try to close if successful the event
419 // handler will do the shutdown, Otherwise, we do a Futterwacken dance.
420 //
421  if (!epFile.IsOpen()) {Shutdown(uStat, false); return false;}
422  else {uStat = epFile.Close((XrdCl::ResponseHandler *)this);
423  if (!uStat.IsOK()) {Shutdown(uStat, true); return false;}
424  else sessMutex.UnLock();
425  }
426  return true;
427 }
428 
429 /******************************************************************************/
430 /* X e q E v e n t */
431 /******************************************************************************/
432 
434  XrdCl::AnyObject **respP)
435 {
436 // Lock out mutex. Note that events like shutdown unlock the mutex. The only
437 // events handled here are open() and close().
438 //
439  sessMutex.Lock();
440  XrdSsiTaskReal *ztP, *ntP, *tP = attBase;
441 
442 // If we are not in the open phase then this is due to a close event. Simply
443 // do a shutdown and return to stop event processing.
444 //
445  if (!inOpen)
446  {Shutdown(*status, true); // sessMutex gets unlocked!
447  return -1; // This object no longer valid!
448  }
449 
450 // We are no longer in open. However, if open encounetered an error then this
451 // session cannot be reused because the file object is in a bad state.
452 //
453  inOpen = false;
454  noReuse = !status->IsOK();
455 
456 // If we have no requests then we may want to simply shoutdown.
457 // Note that shutdown and unprovision unlock the sessMutex.
458 //
459  if (!tP)
460  {if (isHeld)
461  {sessMutex.UnLock();
462  return 1;
463  }
464  if (!status->IsOK()) Shutdown(*status, false);
465  else {if (!isHeld) return (Unprovision() ? 1 : -1);
466  else sessMutex.UnLock();
467  }
468  return 1; // Flush events and continue
469  }
470 
471 // We are here because the open finally completed. If the open failed, then
472 // schedule an error for all pending tasks. The Finish() call on each will
473 // drive the cleanup of this session.
474 //
475  if (!status->IsOK())
476  {XrdSsiErrInfo eInfo;
477  XrdSsiUtils::SetErr(*status, eInfo);
478  do {tP->SchedError(&eInfo); tP = tP->attList.next;}
479  while(tP != attBase);
480  sessMutex.UnLock();
481  return 1;
482  }
483 
484 // Obtain the endpoint name
485 //
486  std::string currNode;
487  if (epFile.GetProperty(dsProperty, currNode))
488  {if (sessNode) free(sessNode);
489  sessNode = strdup(currNode.c_str());
490  } else sessNode = strdup("Unknown!");
491 
492 // Execute each pending request. Make sure not to reference the task object
493 // chain pointer after invoking SendRequest() as it may become invalid.
494 //
495  ztP = attBase;
496  do {ntP = tP->attList.next;
497  if (!tP->SendRequest(sessNode)) noReuse = true;
498  tP = ntP;
499  } while(tP != ztP);
500 
501 // We are done, field the next event
502 //
503  sessMutex.UnLock();
504  return 0;
505 }
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
Definition: XrdAccTest.cc:262
#define tident
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
#define Atomic_INC(x)
#define Atomic_END(x)
#define Atomic_BEG(x)
#define REMOVE(dlbase, dlvar, curitem)
#define INSERT(dlvar, curitem, newitem)
Handle an async response.
Definition: XrdJob.hh:43
void Schedule(XrdJob *jp)
void ClrEvent()
Definition: XrdSsiEvent.hh:42
static void SetMutex(XrdSsiRequest *rP, XrdSsiMutex *mP)
static bool isaRetry(XrdSsiRequest *rP, bool reset=false)
static const unsigned int idMax
Definition: XrdSsiRRInfo.hh:41
uint16_t GetTimeOut()
void BindRequest(XrdSsiRequest &rqstR)
void retEnt(int xEnt)
Definition: XrdSsiScale.cc:95
void InitSession(XrdSsiServReal *servP, const char *sName, int uent, bool hold, bool newSID=false)
bool Provision(XrdSsiRequest *reqP, const char *epURL)
bool Run(XrdSsiRequest *reqP)
int XeqEvent(XrdCl::XRootDStatus *status, XrdCl::AnyObject **respP)
void UnHold(bool cleanup=true)
void TaskFinished(XrdSsiTaskReal *tP)
void Init(XrdSsiRequest *rP, unsigned short tmo=0)
XrdSsiTaskReal * next
void SchedError(XrdSsiErrInfo *eInfo=0)
bool SendRequest(const char *node)
void SetTaskID(uint32_t tid, uint32_t sid)
static int GetErr(XrdCl::XRootDStatus &Status, std::string &eText)
Definition: XrdSsiUtils.cc:191
static void RetErr(XrdSsiRequest &reqP, const char *eTxt, int eNum)
Definition: XrdSsiUtils.cc:220
static void SetErr(XrdCl::XRootDStatus &Status, XrdSsiErrInfo &eInfo)
Definition: XrdSsiUtils.cc:232
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdScheduler * schedP
Definition: XrdSsiClient.cc:74
XrdSsiScale sidScale
XrdSysError Log
Atomic(int) contactN(1)
Mode
Access mode.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124