XRootD
XrdFrmXfrQueue Class Reference

#include <XrdFrmXfrQueue.hh>

+ Collaboration diagram for XrdFrmXfrQueue:

Public Member Functions

 XrdFrmXfrQueue ()
 
 ~XrdFrmXfrQueue ()
 

Static Public Member Functions

static int Add (XrdFrcRequest *rP, XrdFrcReqFile *reqF, int theQ)
 
static void Done (XrdFrmXfrJob *xP, const char *Msg)
 
static XrdFrmXfrJob * Get (int ioQType)
 
static int Init ()
 
static void StopMon (void *parg)
 

Static Public Attributes

static const int useAnyQ = 0
 
static const int useInpQ = 1
 
static const int useOutQ = -1
 

Detailed Description

Definition at line 41 of file XrdFrmXfrQueue.hh.

Constructor & Destructor Documentation

◆ XrdFrmXfrQueue()

XrdFrmXfrQueue::XrdFrmXfrQueue ( )
inline

Definition at line 59 of file XrdFrmXfrQueue.hh.

59 {}

◆ ~XrdFrmXfrQueue()

XrdFrmXfrQueue::~XrdFrmXfrQueue ( )
inline

Definition at line 60 of file XrdFrmXfrQueue.hh.

60 {}

Member Function Documentation

◆ Add()

int XrdFrmXfrQueue::Add ( XrdFrcRequest * rP,
XrdFrcReqFile * reqF,
int  theQ 
)
static

Definition at line 73 of file XrdFrmXfrQueue.cc.

74 {
75  XrdFrmXfrJob *xP;
76  struct stat buf;
77  const char *xfrType = xfrName(*rP, qNum);
78  char *Lfn, lclpath[MAXPATHLEN];
79  int Outgoing = (qNum & XrdFrcRequest::outQ);
80 
81 // Validate queue number
82 //
83  if (qNum < 0 || qNum >= XrdFrcRequest::numQ-1)
84  {sprintf(lclpath, "%d", qNum);
85  Say.Emsg("Queue", lclpath, " is an invalid queue; skipping", rP->LFN);
86  if (reqFQ) reqFQ->Del(rP);
87  return 0;
88  }
89 
90 // First check if this request is active or pending. If it's an inbound request
91 // then only the lfn matters regardless of source. For outgoing requests then
92 // the lfn plus the target only matters.
93 //
94  Lfn = (Outgoing ? rP->LFN : (rP->LFN)+rP->LFO);
95  hMutex.Lock();
96  if ((xP = hTab.Find(Lfn)))
97  {if (rP->Options & (XrdFrcRequest::msgFail | XrdFrcRequest::msgSucc)
98  && strcmp(xP->reqData.Notify, rP->Notify))
99  {XrdOucTList *tP = new XrdOucTList(rP->Notify, 0, xP->NoteList);
100  xP->NoteList = tP;
101  }
102  hMutex.UnLock();
103  if (Config.Verbose || Trace.What & TRACE_Debug)
104  {sprintf(lclpath, " in progress; %s skipped for ", xfrType);
105  Say.Say(0, xP->Type, xP->reqData.LFN, lclpath, rP->User);
106  }
107  if (reqFQ) reqFQ->Del(rP);
108  return 0;
109  }
110  hMutex.UnLock();
111 
112 // Obtain the local name
113 //
114  if (!Config.LocalPath((rP->LFN)+rP->LFO, lclpath, sizeof(lclpath)-16))
115  {if (reqFQ) reqFQ->Del(rP);
116  return Notify(rP, qNum, 1, "Unable to generate pfn");
117  }
118 
119 // Check if the file exists or not. For incoming requests, the file must not
120 // exist. For outgoing requests the file must exist.
121 //
122  if (Config.Stat((rP->LFN)+rP->LFO, lclpath, &buf))
123  {if (Outgoing)
124  {if (Config.Verbose || Trace.What & TRACE_Debug)
125  Say.Say(0, xfrType,"skipped; ",lclpath," not resident.");
126  if (reqFQ) reqFQ->Del(rP);
127  return Notify(rP, qNum, 2, "file not resident");
128  }
129  } else {
130  if (!Outgoing)
131  {if (Config.Verbose || Trace.What & TRACE_Debug)
132  Say.Say(0, xfrType, "skipped; ", lclpath, " exists.");
133  if (reqFQ) reqFQ->Del(rP);
134  return Notify(rP, qNum, 0);
135  }
136  }
137 
138 // Obtain a queue slot, we may block until one is available
139 //
140  do {qMutex.Lock();
141  if ((xP = xfrQ[qNum].Free)) break;
142  qMutex.UnLock();
143  xfrQ[qNum].Avail.Wait();
144  } while(!xP);
145  xfrQ[qNum].Free = xP->Next;
146  qMutex.UnLock();
147 
148 // Initialize the slot
149 //
150  xP->Next = 0;
151  xP->NoteList = 0;
152  xP->reqFQ = reqFQ;
153  xP->reqData = *rP;
154  xP->reqFile = (Outgoing ? xP->reqData.LFN : (xP->reqData.LFN)+rP->LFO);
155  strcpy(xP->PFN, lclpath);
156  xP->pfnEnd = strlen(lclpath);
157  xP->RetCode = 0;
158  xP->qNum = qNum;
159  xP->Act =*xfrType;
160  xP->Type = xfrType+1;
161 
162 // Add this to the table of requests
163 //
164  hMutex.Lock();
165  hTab.Add(xP->reqFile, xP, 0, Hash_keep);
166  hMutex.UnLock();
167 
168 // Place request in the appropriate transfer queue
169 //
170  qMutex.Lock();
171  if (xfrQ[qNum].Last) {xfrQ[qNum].Last->Next = xP; xfrQ[qNum].Last = xP;}
172  else xfrQ[qNum].Last = xfrQ[qNum].First = xP;
173  qMutex.UnLock();
174  qReady.Post();
175 
176 // All done
177 //
178  return 1;
179 }
#define TRACE_Debug
Definition: XrdCmsTrace.hh:37
XrdOucTList * NoteList
Definition: XrdFrmXfrJob.hh:45
char PFN[MAXPATHLEN+16]
Definition: XrdFrmXfrJob.hh:50
const char * Type
Definition: XrdFrmXfrJob.hh:49
XrdFrmXfrJob * Next
Definition: XrdFrmXfrJob.hh:44
XrdFrcRequest reqData
Definition: XrdFrmXfrJob.hh:48
XrdFrcReqFile * reqFQ
Definition: XrdFrmXfrJob.hh:46
char * reqFile
Definition: XrdFrmXfrJob.hh:47
@ Hash_keep
Definition: XrdOucHash.hh:55
int stat(const char *path, struct stat *buf)
char LFN[3072]
static const int msgFail
char User[256]
static const int msgSucc
static const int outQ
static const int numQ
char Notify[512]
T * Add(const char *KeyVal, T *KeyData, const int LifeTime=0, XrdOucHash_Options opt=Hash_default)
Definition: XrdOucHash.icc:61
T * Find(const char *KeyVal, time_t *KeyTime=0)
Definition: XrdOucHash.icc:160
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
XrdSysError Say
XrdSysTrace Trace("cms")
XrdCmsConfig Config

References XrdFrmXfrJob::Act, XrdCms::Config, XrdFrcReqFile::Del(), XrdSysError::Emsg(), Hash_keep, XrdFrcRequest::LFN, XrdFrcRequest::LFO, XrdFrcRequest::msgFail, XrdFrcRequest::msgSucc, XrdFrmXfrJob::Next, XrdFrmXfrJob::NoteList, XrdFrcRequest::Notify, XrdFrcRequest::numQ, XrdFrcRequest::Options, XrdFrcRequest::outQ, XrdFrmXfrJob::PFN, XrdFrmXfrJob::pfnEnd, XrdFrmXfrJob::qNum, XrdFrmXfrJob::reqData, XrdFrmXfrJob::reqFile, XrdFrmXfrJob::reqFQ, XrdFrmXfrJob::RetCode, XrdFrc::Say, XrdSysError::Say(), stat(), XrdFrc::Trace, TRACE_Debug, XrdFrmXfrJob::Type, XrdFrcRequest::User, and XrdOucTrace::What.

Referenced by XrdFrmReqBoss::Process(), and XrdFrmMigrate::Queue().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Done()

void XrdFrmXfrQueue::Done ( XrdFrmXfrJob * xP,
const char *  Msg 
)
static

Definition at line 185 of file XrdFrmXfrQueue.cc.

186 {
187  XrdOucTList *tP;
188 
189 // Send notifications to everyone that wants it that this job is done
190 //
191  do {Notify(&(xP->reqData), xP->qNum, xP->RetCode, Msg);
192  if ((tP = xP->NoteList))
193  {strcpy(xP->reqData.Notify, tP->text);
194  xP->NoteList = tP->next;
195  delete tP;
196  }
197  } while(tP);
198 
199 // Remove this job from the queue file
200 //
201  if (xP->reqFQ) xP->reqFQ->Del(&(xP->reqData));
202 
203 // Remove this job from the active table
204 //
205  hMutex.Lock(); hTab.Del(xP->reqFile); hMutex.UnLock();
206 
207 // Place job element on the free queue
208 //
209  qMutex.Lock();
210  xP->Next = xfrQ[xP->qNum].Free;
211  xfrQ[xP->qNum].Free = xP;
212  xfrQ[xP->qNum].Avail.Post();
213  qMutex.UnLock();
214 }
void Del(XrdFrcRequest *rP)
int Del(const char *KeyVal, XrdOucHash_Options opt=Hash_default)
Definition: XrdOucHash.icc:136
XrdOucTList * next
Definition: XrdOucTList.hh:45
char * text
Definition: XrdOucTList.hh:46

References XrdFrcReqFile::Del(), XrdFrmXfrJob::Next, XrdOucTList::next, XrdFrmXfrJob::NoteList, XrdFrcRequest::Notify, XrdFrmXfrJob::qNum, XrdFrmXfrJob::reqData, XrdFrmXfrJob::reqFile, XrdFrmXfrJob::reqFQ, XrdFrmXfrJob::RetCode, and XrdOucTList::text.

Referenced by XrdFrmTransfer::Start().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Get()

XrdFrmXfrJob * XrdFrmXfrQueue::Get ( int  ioQType)
static

Definition at line 220 of file XrdFrmXfrQueue.cc.

221 {
222  XrdFrmXfrJob *xfrP;
223 
224 // Wait for an available job and return it
225 //
226  do {qReady.Wait();} while(!(xfrP = Pull(ioQType)));
227  return xfrP;
228 }

Referenced by XrdFrmTransfer::Start().

+ Here is the caller graph for this function:

◆ Init()

int XrdFrmXfrQueue::Init ( )
static

Definition at line 239 of file XrdFrmXfrQueue.cc.

240 {
241  static const char *StopFN[] = {"STAGE", "MIGR", "COPYIN", "COPYOUT"};
242  static const char *StopQN[] = {"stage", "migr", "copyin", "copyout"};
243  XrdFrmXfrJob *xP;
244  pthread_t tid;
245  char StopFile[1024], *fnSfx;
246  int n, qNum, retc;
247 
248 // Prepare to initialize the queues
249 //
250  strcpy(StopFile, Config.AdminPath);
251  strcat(StopFile, "STOP");
252  fnSfx = StopFile + strlen(StopFile);
253 
254 // Initialize each queue
255 //
256  for (qNum= 0; qNum < XrdFrcRequest::numQ-1; qNum++)
257  {
258 
259  // Initialize the stop file name and set the queue name and number
260  //
261  strcpy(fnSfx, StopFN[qNum]);
262  xfrQ[qNum].File = strdup(StopFile);
263  xfrQ[qNum].Name = StopQN[qNum];
264  xfrQ[qNum].qNum = qNum;
265 
266  // Start the stop file monitor thread for this queue
267  //
268  if ((retc = XrdSysThread::Run(&tid, InitStop, (void *)&xfrQ[qNum],
269  XRDSYSTHREAD_BIND, "Stopfile monitor")))
270  {Say.Emsg("main", retc, "create stopfile thread"); return 0;}
271 
272  // Create twice as many free queue elements as we have xfr agents for the
273  // queue. This prevents stalls when a particular queue is stopped but keeps
274  // us from exceeding internal resources when we get flooded with requests.
275  //
276  n = Config.xfrMax*2;
277  while(n--)
278  {xP = new XrdFrmXfrJob;
279  xP->Next = xfrQ[qNum].Free;
280  xfrQ[qNum].Free = xP;
281  xfrQ[qNum].Avail.Post();
282  }
283  }
284 
285 // All done
286 //
287  return 1;
288 }
void * InitStop(void *parg)
#define XRDSYSTHREAD_BIND
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)

References XrdCms::Config, XrdSysError::Emsg(), InitStop(), XrdFrmXfrJob::Next, XrdFrcRequest::numQ, XrdSysThread::Run(), XrdFrc::Say, and XRDSYSTHREAD_BIND.

Referenced by XrdFrmTransfer::Init().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ StopMon()

void XrdFrmXfrQueue::StopMon ( void *  parg)
static

Definition at line 438 of file XrdFrmXfrQueue.cc.

439 {
440  struct theQueue *monQ = (struct theQueue *)parg;
441  XrdFrmXfrJob *xP;
442  struct stat buf;
443  char theMsg[80];
444  int Cnt;
445 
446 // Establish which message to produce
447 //
448  sprintf(theMsg, "exists; %s transfers suspended.", monQ->Name);
449 
450 // Wait until someone needs to tell us to check for a stop file
451 //
452  while(1)
453  {monQ->Alert.Wait();
454  Cnt = 0;
455  while(!stat(monQ->File, &buf))
456  {if (!Cnt--) {Say.Emsg("StopMon", monQ->File, theMsg); Cnt = 12;}
457  XrdSysTimer::Snooze(5);
458  }
459  qMutex.Lock();
460  monQ->Stop = 0;
461  xP = monQ->First;
462  while(xP) {qReady.Post(); xP = xP->Next;}
463  qMutex.UnLock();
464  }
465 }
static void Snooze(int seconds)
Definition: XrdSysTimer.cc:168

References XrdSysError::Emsg(), XrdFrmXfrJob::Next, XrdFrc::Say, XrdSysTimer::Snooze(), and stat().

Referenced by InitStop().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Member Data Documentation

◆ useAnyQ

const int XrdFrmXfrQueue::useAnyQ = 0
static

Definition at line 50 of file XrdFrmXfrQueue.hh.

Referenced by XrdFrmTransfer::Init().

◆ useInpQ

const int XrdFrmXfrQueue::useInpQ = 1
static

Definition at line 49 of file XrdFrmXfrQueue.hh.

Referenced by XrdFrmTransfer::Init().

◆ useOutQ

const int XrdFrmXfrQueue::useOutQ = -1
static

Definition at line 51 of file XrdFrmXfrQueue.hh.

Referenced by XrdFrmTransfer::Init().


The documentation for this class was generated from the following files: