XRootD
XrdFrmXfrQueue.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d F r m X f r Q u e u e . c c */
4 /* */
5 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstring>
32 #include <strings.h>
33 #include <cstdio>
34 #include <fcntl.h>
35 #include <unistd.h>
36 #include <utime.h>
37 #include <sys/param.h>
38 #include <sys/types.h>
39 #include <sys/stat.h>
40 
41 #include "XrdFrc/XrdFrcReqFile.hh"
42 #include "XrdFrc/XrdFrcTrace.hh"
43 #include "XrdFrm/XrdFrmConfig.hh"
44 #include "XrdFrm/XrdFrmXfrJob.hh"
45 #include "XrdFrm/XrdFrmXfrQueue.hh"
46 #include "XrdNet/XrdNetMsg.hh"
47 #include "XrdOss/XrdOss.hh"
48 #include "XrdOuc/XrdOucTList.hh"
49 #include "XrdSys/XrdSysError.hh"
50 #include "XrdSys/XrdSysFD.hh"
51 #include "XrdSys/XrdSysTimer.hh"
52 #include "XrdSys/XrdSysPlatform.hh"
53 
54 using namespace XrdFrc;
55 using namespace XrdFrm;
56 
57 /******************************************************************************/
58 /* S t a t i c s */
59 /******************************************************************************/
60 
// hMutex is held around every lookup, insertion and removal on hTab in this
// file. hTab maps a request's file name key to the job that is currently
// pending or active for it.
//
61 XrdSysMutex XrdFrmXfrQueue::hMutex;
62 XrdOucHash<XrdFrmXfrJob> XrdFrmXfrQueue::hTab;
63 
// qMutex is held while the per-queue job lists in xfrQ are examined or changed.
// qReady is posted when a job is placed on a transfer queue and is waited on
// before a job is pulled.
//
64 XrdSysMutex XrdFrmXfrQueue::qMutex;
65 XrdSysSemaphore XrdFrmXfrQueue::qReady(0);
66 
// One queue descriptor per queue number (XrdFrcRequest::numQ entries).
//
67 XrdFrmXfrQueue::theQueue XrdFrmXfrQueue::xfrQ[XrdFrcRequest::numQ];
68 
69 /******************************************************************************/
70 /* Public: A d d */
71 /******************************************************************************/
72 
74 {
75  XrdFrmXfrJob *xP;
76  struct stat buf;
77  const char *xfrType = xfrName(*rP, qNum);
78  char *Lfn, lclpath[MAXPATHLEN];
79  int Outgoing = (qNum & XrdFrcRequest::outQ);
80 
81 // Validate queue number
82 //
83  if (qNum < 0 || qNum >= XrdFrcRequest::numQ-1)
84  {sprintf(lclpath, "%d", qNum);
85  Say.Emsg("Queue", lclpath, " is an invalid queue; skipping", rP->LFN);
86  if (reqFQ) reqFQ->Del(rP);
87  return 0;
88  }
89 
90 // First check if this request is active or pending. If it's an inbound request
91 // then only the lfn matters regardless of source. For outgoing requests then
92 // the lfn plus the target only matters.
93 //
94  Lfn = (Outgoing ? rP->LFN : (rP->LFN)+rP->LFO);
95  hMutex.Lock();
96  if ((xP = hTab.Find(Lfn)))
98  && strcmp(xP->reqData.Notify, rP->Notify))
99  {XrdOucTList *tP = new XrdOucTList(rP->Notify, 0, xP->NoteList);
100  xP->NoteList = tP;
101  }
102  hMutex.UnLock();
103  if (Config.Verbose || Trace.What & TRACE_Debug)
104  {sprintf(lclpath, " in progress; %s skipped for ", xfrType);
105  Say.Say(0, xP->Type, xP->reqData.LFN, lclpath, rP->User);
106  }
107  if (reqFQ) reqFQ->Del(rP);
108  return 0;
109  }
110  hMutex.UnLock();
111 
112 // Obtain the local name
113 //
114  if (!Config.LocalPath((rP->LFN)+rP->LFO, lclpath, sizeof(lclpath)-16))
115  {if (reqFQ) reqFQ->Del(rP);
116  return Notify(rP, qNum, 1, "Unable to generate pfn");
117  }
118 
119 // Check if the file exists or not. For incoming requests, the file must not
120 // exist. For outgoing requests the file must exist.
121 //
122  if (Config.Stat((rP->LFN)+rP->LFO, lclpath, &buf))
123  {if (Outgoing)
124  {if (Config.Verbose || Trace.What & TRACE_Debug)
125  Say.Say(0, xfrType,"skipped; ",lclpath," not resident.");
126  if (reqFQ) reqFQ->Del(rP);
127  return Notify(rP, qNum, 2, "file not resident");
128  }
129  } else {
130  if (!Outgoing)
131  {if (Config.Verbose || Trace.What & TRACE_Debug)
132  Say.Say(0, xfrType, "skipped; ", lclpath, " exists.");
133  if (reqFQ) reqFQ->Del(rP);
134  return Notify(rP, qNum, 0);
135  }
136  }
137 
138 // Obtain a queue slot, we may block until one is available
139 //
140  do {qMutex.Lock();
141  if ((xP = xfrQ[qNum].Free)) break;
142  qMutex.UnLock();
143  xfrQ[qNum].Avail.Wait();
144  } while(!xP);
145  xfrQ[qNum].Free = xP->Next;
146  qMutex.UnLock();
147 
148 // Initialize the slot
149 //
150  xP->Next = 0;
151  xP->NoteList = 0;
152  xP->reqFQ = reqFQ;
153  xP->reqData = *rP;
154  xP->reqFile = (Outgoing ? xP->reqData.LFN : (xP->reqData.LFN)+rP->LFO);
155  strcpy(xP->PFN, lclpath);
156  xP->pfnEnd = strlen(lclpath);
157  xP->RetCode = 0;
158  xP->qNum = qNum;
159  xP->Act =*xfrType;
160  xP->Type = xfrType+1;
161 
162 // Add this to the table of requests
163 //
164  hMutex.Lock();
165  hTab.Add(xP->reqFile, xP, 0, Hash_keep);
166  hMutex.UnLock();
167 
168 // Place request in the appropriate transfer queue
169 //
170  qMutex.Lock();
171  if (xfrQ[qNum].Last) {xfrQ[qNum].Last->Next = xP; xfrQ[qNum].Last = xP;}
172  else xfrQ[qNum].Last = xfrQ[qNum].First = xP;
173  qMutex.UnLock();
174  qReady.Post();
175 
176 // All done
177 //
178  return 1;
179 }
180 
181 /******************************************************************************/
182 /* Public: D o n e */
183 /******************************************************************************/
184 
185 void XrdFrmXfrQueue::Done(XrdFrmXfrJob *xP, const char *Msg)
186 {
187  XrdOucTList *tP;
188 
189 // Send notifications to everyone that wants it that this job is done
190 //
191  do {Notify(&(xP->reqData), xP->qNum, xP->RetCode, Msg);
192  if ((tP = xP->NoteList))
193  {strcpy(xP->reqData.Notify, tP->text);
194  xP->NoteList = tP->next;
195  delete tP;
196  }
197  } while(tP);
198 
199 // Remove this job from the queue file
200 //
201  if (xP->reqFQ) xP->reqFQ->Del(&(xP->reqData));
202 
203 // Remove this job from the active table
204 //
205  hMutex.Lock(); hTab.Del(xP->reqFile); hMutex.UnLock();
206 
207 // Place job element on the free queue
208 //
209  qMutex.Lock();
210  xP->Next = xfrQ[xP->qNum].Free;
211  xfrQ[xP->qNum].Free = xP;
212  xfrQ[xP->qNum].Avail.Post();
213  qMutex.UnLock();
214 }
215 
216 /******************************************************************************/
217 /* Public: G e t */
218 /******************************************************************************/
219 
221 {
222  XrdFrmXfrJob *xfrP;
223 
224 // Wait for an available job and return it
225 //
226  do {qReady.Wait();} while(!(xfrP = Pull(ioQType)));
227  return xfrP;
228 }
229 
230 /******************************************************************************/
231 /* I n i t */
232 /******************************************************************************/
233 
234 void *InitStop(void *parg)
236  return (void *)0;
237 }
238 
240 {
241  static const char *StopFN[] = {"STAGE", "MIGR", "COPYIN", "COPYOUT"};
242  static const char *StopQN[] = {"stage", "migr", "copyin", "copyout"};
243  XrdFrmXfrJob *xP;
244  pthread_t tid;
245  char StopFile[1024], *fnSfx;
246  int n, qNum, retc;
247 
248 // Prepare to initialize the queues
249 //
250  strcpy(StopFile, Config.AdminPath);
251  strcat(StopFile, "STOP");
252  fnSfx = StopFile + strlen(StopFile);
253 
254 // Initialize each queue
255 //
256  for (qNum= 0; qNum < XrdFrcRequest::numQ-1; qNum++)
257  {
258 
259  // Initialize the stop file name and set the queue name and number
260  //
261  strcpy(fnSfx, StopFN[qNum]);
262  xfrQ[qNum].File = strdup(StopFile);
263  xfrQ[qNum].Name = StopQN[qNum];
264  xfrQ[qNum].qNum = qNum;
265 
266  // Start the stop file monitor thread for this queue
267  //
268  if ((retc = XrdSysThread::Run(&tid, InitStop, (void *)&xfrQ[qNum],
269  XRDSYSTHREAD_BIND, "Stopfile monitor")))
270  {Say.Emsg("main", retc, "create stopfile thread"); return 0;}
271 
272  // Create twice as many free queue elements as we have xfr agents for the
273  // queue. This prevents stalls when a particular queue is stopped but keeps
274  // us from exceeding internal resources when we get flooded with requests.
275  //
276  n = Config.xfrMax*2;
277  while(n--)
278  {xP = new XrdFrmXfrJob;
279  xP->Next = xfrQ[qNum].Free;
280  xfrQ[qNum].Free = xP;
281  xfrQ[qNum].Avail.Post();
282  }
283  }
284 
285 // All done
286 //
287  return 1;
288 }
289 
290 /******************************************************************************/
291 /* Private: P u l l */
292 /******************************************************************************/
293 
294 XrdFrmXfrJob *XrdFrmXfrQueue::Pull(int ioQType)
295 {
296  static bool ioX = false, prevQ[2] = {0,0};
297  XrdFrmXfrJob *xfrP;
298  int pikQ, theQ, Q1, Q2, nSel = 1;
299 
300 // Setup to pick a request equally multiplexing between all possible queues
301 //
302  qMutex.Lock();
303 do{if (!ioQType) ioX = !ioX;
304  else {ioX = (ioQType < 0 ? 1 : 0); nSel = 0;}
305  if (ioX) {Q1 = XrdFrcRequest::migQ; Q2 = XrdFrcRequest::putQ; pikQ = 1;}
306  else {Q1 = XrdFrcRequest::stgQ; Q2 = XrdFrcRequest::getQ; pikQ = 0;}
307 
308 // Check if we should avoid either queue because it is stopped
309 //
310  if (xfrQ[Q1].Stop || Stopped(Q1)) Q1 = XrdFrcRequest::nilQ;
311  if (xfrQ[Q2].Stop || Stopped(Q2)) Q2 = XrdFrcRequest::nilQ;
312 
313 // Pick the oldest possible request
314 //
315  if (xfrQ[Q1].First && xfrQ[Q2].First)
316  { if (xfrQ[Q1].First->reqData.addTOD < xfrQ[Q2].First->reqData.addTOD)
317  theQ = Q1;
318  else if (xfrQ[Q1].First->reqData.addTOD > xfrQ[Q2].First->reqData.addTOD)
319  theQ = Q2;
320  else theQ = (prevQ[pikQ] == Q1 ? Q2 : Q1);
321  }else theQ = (xfrQ[Q1].First ? Q1 : Q2);
322 
323 // Dequeue the request (we may have an empty selectoin here)
324 //
325  if ((xfrP = xfrQ[theQ].First)
326  && !(xfrQ[theQ].First = xfrP->Next)) xfrQ[theQ].Last = 0;
327  } while(!xfrP && nSel--);
328 
329 // Return the job, if any
330 //
331  prevQ[pikQ] = theQ;
332  qMutex.UnLock();
333  return xfrP;
334 }
335 
336 /******************************************************************************/
337 /* Private: N o t i f y */
338 /******************************************************************************/
339 
340 int XrdFrmXfrQueue::Notify(XrdFrcRequest *rP, int qNum, int rc, const char *msg)
341 {
342  static const char *isFile = "file:///";
343  static const int lnFile = 8;
344  static const char *isUDP = "udp://";
345  static const int lnUDP = 6;
346  static const char *qOpr[] = {"stage", "migr", "get", "put"};
347  char msgbuff[4096], *nP, *mP = rP->Notify;
348  int n;
349 
350 // Check if message really needs to be sent
351 //
352  if ((!rc && !(rP->Options & XrdFrcRequest::msgSucc))
353  || ( rc && !(rP->Options & XrdFrcRequest::msgFail))) return 0;
354 
355 // Multiple destinations can be specified, each destination separated by a
356 // carriable rturn. We don't screen out duplicates.
357 //
358 do{if ((nP = index(rP->Notify, '\r'))) *nP++ = '\0';
359 
360 // Check for file destination
361 //
362  if (!strncmp(mP, isFile, lnFile))
363  {if (rc) n = sprintf(msgbuff, "%s %s %s %s\n", qOpr[qNum],
364  (rc > 1 ? "ENOENT":"BAD"), rP->LFN, (msg ? msg:"?"));
365  else n = sprintf(msgbuff, "stage OK %s\n", rP->LFN);
366  Send2File(mP+lnFile, msgbuff, n);
367  }
368 
369 // Check for udp destination
370 //
371  else if (!strncmp(mP, isUDP, lnUDP))
372  {char *txtP, *dstP = mP+lnUDP;
373  if ((txtP = index(dstP, '/'))) *txtP++ = '\0';
374  else txtP = (char *)"";
375  n = sprintf(msgbuff, "%s %s %s %s", (rc ? "unprep" : "ready"),
376  rP->ID, txtP, rP->LFN);
377  Send2UDP(dstP, msgbuff, n);
378  }
379 
380 // Issue warning as we don't yet support mail or tcp notifications
381 //
382  else if (*mP != '-')
383  Say.Emsg("Notify", "Unsupported notification path '", mP, "'.");
384  } while((mP = nP));
385 
386 // All done
387 //
388  return 0;
389 }
390 
391 /******************************************************************************/
392 /* Private: S e n d 2 F i l e */
393 /******************************************************************************/
394 
395 void XrdFrmXfrQueue::Send2File(char *Dest, char *Msg, int Mln)
396 {
397  EPNAME("Notify");
398  int FD;
399 
400 // Do some debugging
401 //
402  DEBUG("sending '" <<Msg <<"' via " <<Dest);
403 
404 // Open the file
405 //
406  if ((FD = XrdSysFD_Open(Dest, O_WRONLY)) < 0)
407  {Say.Emsg("Notify", errno, "send notification via", Dest); return;}
408 
409 // Write the message
410 //
411  if (write(FD, Msg, Mln) < 0)
412  Say.Emsg("Notify", errno, "send notification via", Dest);
413  close(FD);
414 }
415 
416 /******************************************************************************/
417 /* Private: S e n d 2 U D P */
418 /******************************************************************************/
419 
420 void XrdFrmXfrQueue::Send2UDP(char *Dest, char *Msg, int Mln)
421 {
422  EPNAME("Notify");
423  static XrdNetMsg Relay(&Say, 0);
424 
425 // Do some debugging
426 //
427  DEBUG("sending '" <<Msg <<"' via " <<Dest);
428 
429 // Send off the message
430 //
431  Relay.Send(Msg, Mln, Dest);
432 }
433 
434 /******************************************************************************/
435 /* Public: S t o p M o n */
436 /******************************************************************************/
437 
438 void XrdFrmXfrQueue::StopMon(void *parg)
439 {
440  struct theQueue *monQ = (struct theQueue *)parg;
441  XrdFrmXfrJob *xP;
442  struct stat buf;
443  char theMsg[80];
444  int Cnt;
445 
446 // Establish which message to produce
447 //
448  sprintf(theMsg, "exists; %s transfers suspended.", monQ->Name);
449 
450 // Wait until someone needs to tell us to check for a stop file
451 //
452  while(1)
453  {monQ->Alert.Wait();
454  Cnt = 0;
455  while(!stat(monQ->File, &buf))
456  {if (!Cnt--) {Say.Emsg("StopMon", monQ->File, theMsg); Cnt = 12;}
458  }
459  qMutex.Lock();
460  monQ->Stop = 0;
461  xP = monQ->First;
462  while(xP) {qReady.Post(); xP = xP->Next;}
463  qMutex.UnLock();
464  }
465 }
466 
467 /******************************************************************************/
468 /* Private: S t o p p e d */
469 /******************************************************************************/
470 
471 int XrdFrmXfrQueue::Stopped(int qNum) // Called with qMutex locked!
472 {
473  struct stat buf;
474 
475 // Check for stop file existence. If it exists and the queue has not been
476 // stopped; stop it and alert the stop file monitor.
477 //
478  if (stat(xfrQ[qNum].File, &buf)) return 0;
479  if (!xfrQ[qNum].Stop) {xfrQ[qNum].Stop = 1; xfrQ[qNum].Alert.Post();}
480  return 1;
481 }
482 
483 /******************************************************************************/
484 /* Private: x f r N a m e */
485 /******************************************************************************/
486 
487 const char *XrdFrmXfrQueue::xfrName(XrdFrcRequest &reqData, int qNum)
488 {
489 
490 // Return a human name for this transfer:
491 // Migrate
492 // Migr+rm
493 // Staging
494 // CopyIn
495 // CopyOut
496 // Copy+rm
497 //
498  switch(qNum)
499  {case XrdFrcRequest::getQ:
500  return "1CopyIn ";
501  break;
502  case XrdFrcRequest::migQ:
503  return (reqData.Options & XrdFrcRequest::Purge ?
504  "3Migr+rm ":"2Migrate ");
505  break;
506  case XrdFrcRequest::putQ:
507  return (reqData.Options&XrdFrcRequest::Purge ?
508  "5Copy+rm " : "4CopyOut ");
509  break;
510  case XrdFrcRequest::stgQ:
511  return "6Staging ";
512  break;
513  default: break;
514  }
515 
516  return "0Unknown ";
517 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
#define TRACE_Debug
Definition: XrdCmsTrace.hh:37
XrdOucTList * NoteList
Definition: XrdFrmXfrJob.hh:45
char PFN[MAXPATHLEN+16]
Definition: XrdFrmXfrJob.hh:50
const char * Type
Definition: XrdFrmXfrJob.hh:49
XrdFrmXfrJob * Next
Definition: XrdFrmXfrJob.hh:44
XrdFrcRequest reqData
Definition: XrdFrmXfrJob.hh:48
XrdFrcReqFile * reqFQ
Definition: XrdFrmXfrJob.hh:46
char * reqFile
Definition: XrdFrmXfrJob.hh:47
void * InitStop(void *parg)
@ Hash_keep
Definition: XrdOucHash.hh:55
int stat(const char *path, struct stat *buf)
ssize_t write(int fildes, const void *buf, size_t nbyte)
#define close(a)
Definition: XrdPosix.hh:43
XrdOucString File
#define XRDSYSTHREAD_BIND
void Del(XrdFrcRequest *rP)
static const int stgQ
static const int getQ
char LFN[3072]
static const int migQ
static const int putQ
static const int Purge
static const int msgFail
char User[256]
static const int msgSucc
static const int outQ
static const int numQ
char Notify[512]
static const int nilQ
static void Done(XrdFrmXfrJob *xP, const char *Msg)
static int Add(XrdFrcRequest *rP, XrdFrcReqFile *reqF, int theQ)
static void StopMon(void *parg)
static XrdFrmXfrJob * Get(int ioQType)
static int Init()
XrdOucTList * next
Definition: XrdOucTList.hh:45
char * text
Definition: XrdOucTList.hh:46
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Snooze(int seconds)
Definition: XrdSysTimer.cc:168
XrdCmsConfig Config
XrdOucTrace Trace
XrdSysError Say