XRootD
XrdOfsTPCJob.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d O f s T P C J o b . c c */
4 /* */
5 /* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include "XrdOfs/XrdOfsStats.hh"
32 #include "XrdOfs/XrdOfsTPCJob.hh"
33 #include "XrdOfs/XrdOfsTPCProg.hh"
34 #include "XrdOuc/XrdOucCallBack.hh"
36 
37 /******************************************************************************/
38 /* G l o b a l O b j e c t s */
39 /******************************************************************************/
40 
41 extern XrdSysError OfsEroute;
42 extern XrdOfsStats OfsStats;
43 
44 /******************************************************************************/
45 /* S t a t i c O b j e c t s */
46 /******************************************************************************/
47 
48 XrdSysMutex XrdOfsTPCJob::jobMutex;
49 XrdOfsTPCJob *XrdOfsTPCJob::jobQ = 0;
50 XrdOfsTPCJob *XrdOfsTPCJob::jobLast = 0;
51 
52 /******************************************************************************/
53 /* C o n s t r u c t o r */
54 /******************************************************************************/
55 
56 XrdOfsTPCJob::XrdOfsTPCJob(const char *Url, const char *Org,
57  const char *Lfn, const char *Pfn,
58  const char *Cks, short lfnLoc[2],
59  const char *Spr, const char *Tpr)
60  : XrdOfsTPC(Url, Org, Lfn, Pfn, Cks, Spr, Tpr),
61  Next(0), myProg(0), eCode(0), Status(isWaiting)
62 { lfnPos[0] = lfnLoc[0]; lfnPos[1] = lfnLoc[1]; }
63 
64 /******************************************************************************/
65 /* D e l */
66 /******************************************************************************/
67 
69 {
70  XrdOfsTPCJob *pP = 0;
71  bool tpcCan = false;
72 
73 // Remove from queue if we are still in the queue
74 //
75  jobMutex.Lock();
76  if (inQ)
77  {if (this == jobQ) jobQ = Next;
78  else {pP = jobQ;
79  while(pP && pP->Next != this) pP = pP->Next;
80  if (pP) pP->Next = Next;
81  }
82  if (this == jobLast) jobLast = pP;
83  inQ = 0; tpcCan = true;
84  } else if (Status == isRunning && myProg)
85  {myProg->Cancel(); tpcCan = true;}
86 
87  if (tpcCan && Info.cbP)
88  {Refs++; // Make sure this object cannot get deleted
89  Info.Reply(SFS_ERROR, ECANCELED, "destination file prematurely closed",
90  &jobMutex); // Mutex is unlocked upon return!
91  jobMutex.Lock();
92  Refs--; // Undo the extra ref increase
93  }
94 
95 // Delete the element if possible
96 //
97  if (Refs <= 1) {jobMutex.UnLock(); delete this;}
98  else {Refs--; jobMutex.UnLock();}
99 }
100 
101 /******************************************************************************/
102 /* D o n e */
103 /******************************************************************************/
104 
105 XrdOfsTPCJob *XrdOfsTPCJob::Done(XrdOfsTPCProg *pgmP, const char *eTxt, int rc)
106 {
107  XrdSysMutexHelper jobMon(&jobMutex);
108  XrdOfsTPCJob *jP;
109 
110 // Indicate job status
111 //
112  eCode = rc; Status = isDone;
113  if (Info.Key) free(Info.Key);
114  Info.Key = (rc ? strdup(eTxt) : 0);
115 
116 // Check if we need to do a callback
117 //
118  if (Info.cbP)
119  {if (rc) Info.Reply(SFS_ERROR, rc, eTxt);
120  else Info.Reply(SFS_OK, 0, "");
121  }
122 
123 // Check if anyone is waiting for a program
124 //
125  if ((jP = jobQ))
126  {if (jP == jobLast) jobQ = jobLast = 0;
127  else jobQ = jP->Next;
128  jP->myProg = pgmP; jP->Refs++; jP->inQ = 0; jP->Status = isRunning;
129  if (jP->Info.cbP) jP->Info.Reply(SFS_OK, 0, "");
130  }
131 
132 // Free up this job and return the next job, if any
133 //
134  myProg = 0;
135  if (Refs <= 1) delete this;
136  else Refs--;
137  return jP;
138 }
139 
140 /******************************************************************************/
141 /* S y n c */
142 /******************************************************************************/
143 
145 {
146  static const int cbWaitTime = 1800;
147  XrdSysMutexHelper jobMon(&jobMutex);
148  int rc;
149 
150 // If we are running then simply wait for the copy to complete
151 //
152  if (Status == isRunning)
153  {if (Info.SetCB(eRR)) return SFS_ERROR;
154  eRR->setErrCode(cbWaitTime);
155  Info.Engage();
156  return SFS_STARTED;
157  }
158 
159 // If we are done then return what we have (this can't change)
160 //
161  if (Status == isDone)
162  {if (eCode) {eRR->setErrInfo(eCode, Info.Key); return SFS_ERROR;}
163  return SFS_OK;
164  }
165 
166 // The only thing left is that we are an unstarted job, so try to start it.
167 //
168  if (inQ) {myProg = 0; rc = 0;}
169  else if ((myProg = XrdOfsTPCProg::Start(this, rc)))
170  {Refs++; Status = isRunning; return SFS_OK;}
171 
172 // We could not allocate a program to this job. Check if this is due to an err
173 //
174  if (rc)
175  {OfsEroute.Emsg("TPC", rc, "create tpc job thread");
176  Status = isDone;
177  eCode = ECANCELED;
178  if (Info.Key) free(Info.Key);
179  Info.Key = strdup("Copy failed; resources unavailable.");
180  return Info.Fail(eRR, "resources unavailable", ECANCELED);
181  }
182 
183 // No programs available, place this job in callback mode
184 //
185  if (Info.SetCB(eRR)) return SFS_ERROR;
186  if (jobLast) {jobLast->Next = this; jobLast = this;}
187  else jobQ = jobLast = this;
188  inQ = 1;
189  eRR->setErrCode(cbWaitTime);
190  Info.Engage();
191  return SFS_STARTED;
192 }
XrdOfsStats OfsStats
Definition: XrdOfs.cc:113
XrdSysError OfsEroute
#define SFS_ERROR
#define SFS_STARTED
#define SFS_OK
XrdOucCallBack * cbP
int SetCB(XrdOucErrInfo *eRR)
void Reply(int rC, int eC, const char *eMsg, XrdSysMutex *mP=0)
int Fail(XrdOucErrInfo *eRR, const char *eMsg, int eCode)
XrdOfsTPCJob(const char *Url, const char *Org, const char *Lfn, const char *Pfn, const char *Cks, short lfnLoc[2], const char *Spr, const char *Tpr)
Definition: XrdOfsTPCJob.cc:56
XrdOfsTPCJob * Done(XrdOfsTPCProg *pgmP, const char *eTxt, int rc)
int Sync(XrdOucErrInfo *eRR)
static XrdOfsTPCProg * Start(XrdOfsTPCJob *jP, int &rc)
char Refs
Definition: XrdOfsTPC.hh:129
XrdOfsTPCInfo Info
Definition: XrdOfsTPC.hh:109
char inQ
Definition: XrdOfsTPC.hh:130
int setErrInfo(int code, const char *emsg)
int setErrCode(int code)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95