XRootD
XrdFfsQueue.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* XrdFfsQueue.cc functions to run independent tasks in queue */
3 /* */
4 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
5 /* All Rights Reserved */
6 /* Author: Wei Yang (SLAC National Accelerator Laboratory, 2009) */
7 /* Contract DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 #include "XrdFfs/XrdFfsQueue.hh"
31 #include <cstdlib>
32 
33 /* queue operation */
34 
35 #ifdef __cplusplus
36  extern "C" {
37 #endif
38 
41 unsigned int XrdFfsQueueNext_task_id = 0;
42 pthread_mutex_t XrdFfsQueueTaskque_mutex = PTHREAD_MUTEX_INITIALIZER;
43 pthread_cond_t XrdFfsQueueTaskque_cond = PTHREAD_COND_INITIALIZER;
44 
46 {
47  pthread_mutex_lock(&XrdFfsQueueTaskque_mutex);
48 
49  task->id = XrdFfsQueueNext_task_id + 1;
51  if (XrdFfsQueueTaskque_tail == NULL)
52  {
55  task->next = NULL;
56  pthread_cond_broadcast(&XrdFfsQueueTaskque_cond);
57  }
58  else
59  {
61  task->next = NULL;
64  }
65 
66  pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
67  return;
68 }
69 
71 {
72  struct XrdFfsQueueTasks *head;
73  while (pthread_mutex_lock(&XrdFfsQueueTaskque_mutex) == 0)
74  if (XrdFfsQueueTaskque_head == NULL)
75  {
77  pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
78  }
79  else
80  break;
81 
84 
85  head->next = NULL;
86  head->prev = NULL;
87 
88  if (XrdFfsQueueTaskque_head == NULL)
90 
91  pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
92  return head;
93 }
94 
95 /* create, wait and free(delete) a task */
96 
97 struct XrdFfsQueueTasks* XrdFfsQueue_create_task(void* (*func)(void*), void **args, short initstat)
98 {
99  struct XrdFfsQueueTasks *task = (struct XrdFfsQueueTasks*) malloc(sizeof(struct XrdFfsQueueTasks));
100  task->func = func;
101  task->args = args;
102  task->done = ( (initstat == -1)? -1 : 0); /* -1 means this task is meant to kill a worker thread */
103 
104  pthread_mutex_init(&task->mutex, NULL);
105  pthread_cond_init(&task->cond, NULL);
106 
107  XrdFfsQueue_enqueue(task);
108  return task;
109 }
110 
112 {
113  pthread_mutex_destroy(&task->mutex);
114  pthread_cond_destroy(&task->cond);
115  task->func = NULL;
116  task->args = NULL;
117  task->next = NULL;
118  task->prev = NULL;
119  free(task);
120  task = NULL;
121 }
122 
124 {
125  pthread_mutex_lock(&task->mutex);
126  if (task->done != 1)
127  pthread_cond_wait(&task->cond, &task->mutex);
128  pthread_mutex_unlock(&task->mutex);
129 }
130 
132 {
133  unsigned int que_len = 0;
134  pthread_mutex_lock(&XrdFfsQueueTaskque_mutex);
135  if (XrdFfsQueueTaskque_head != NULL && XrdFfsQueueTaskque_tail != NULL) {
138  else
139 // this is wrong:
140 // que_len = (unsigned int)2147483647 - (XrdFfsQueueTaskque_head->id - XrdFfsQueueTaskque_tail->id) + 1;
141 
142 //not accepted by c89
143 // que_len = (unsigned int)4294967295 - (XrdFfsQueueTaskque_head->id - XrdFfsQueueTaskque_tail->id) + 1;
144 
145 //this is not quite correct, but I imagine that the queue will never by so long >= 2147483647
146  que_len = (unsigned int)2147483647 - (XrdFfsQueueTaskque_head->id - XrdFfsQueueTaskque_tail->id) + 1+(unsigned int)2147483647+1;
147  }
148  pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
149  return que_len;
150 }
151 
152 /* workers */
153 
154 void *XrdFfsQueue_worker(void* x)
155 {
156  struct XrdFfsQueueTasks *task;
157  short quit = 0;
158 
159  loop:
160  task = XrdFfsQueue_dequeue();
161 
162  if (task->done == -1) // terminate this worker thread
163  quit = 1;
164 
165  pthread_mutex_lock(&task->mutex);
166 #ifdef QUEDEBUG
167  printf("worker %d on task %d\n", wid, task->id);
168 #endif
169  if (!quit)
170  (task->func)(task->args);
171 
172  task->done = 1;
173  pthread_cond_signal(&task->cond);
174  pthread_mutex_unlock(&task->mutex);
175  if (quit)
176  {
177 #ifdef QUEDEBUG
178  printf("worker %d is leaving\n", wid);
179 #endif
180  free(x);
181 // pthread_exit(NULL);
182  return(NULL);
183  }
184  else
185  goto loop;
186 }
187 
188 pthread_mutex_t XrdFfsQueueWorker_mutex;
189 unsigned short XrdFfsQueueNworkers = 0;
190 unsigned int XrdFfsQueueWorker_id = 0;
191 
193 {
194  int i, rc, *id;
195  pthread_t *thread;
196  pthread_attr_t attr;
197  size_t stacksize = 2*1024*1024;
198 
199  pthread_attr_init(&attr);
200  pthread_attr_setstacksize(&attr, stacksize);
201  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
202 
203  pthread_mutex_lock(&XrdFfsQueueWorker_mutex);
204  for (i = 0; i < n; i++)
205  {
206  id = (int*) malloc(sizeof(int));
207  *id = XrdFfsQueueWorker_id++;
208  thread = (pthread_t*) malloc(sizeof(pthread_t));
209  if (thread == NULL)
210  {
212  break;
213  }
214  rc = pthread_create(thread, &attr, XrdFfsQueue_worker, id);
215  if (rc != 0)
216  {
218  break;
219  }
220  pthread_detach(*thread);
221  free(thread);
222  }
223  pthread_attr_destroy(&attr);
224  XrdFfsQueueNworkers += i;
225  pthread_mutex_unlock(&XrdFfsQueueWorker_mutex);
226  return i;
227 }
228 
230 {
231  int i;
232  struct XrdFfsQueueTasks *task;
233 
234  pthread_mutex_lock(&XrdFfsQueueWorker_mutex);
235  if (XrdFfsQueueNworkers == 0)
236  n = 0;
237  else if (n > XrdFfsQueueNworkers)
238  {
241  }
242  else
243  XrdFfsQueueNworkers -= n;
244  for (i = 0; i < n; i++)
245  {
246  task = XrdFfsQueue_create_task(NULL, NULL, -1);
247  XrdFfsQueue_wait_task(task);
248  XrdFfsQueue_free_task(task);
249  }
250  pthread_mutex_unlock(&XrdFfsQueueWorker_mutex);
251  return n;
252 }
253 
255 {
256  int i;
257  pthread_mutex_lock(&XrdFfsQueueWorker_mutex);
259  pthread_mutex_unlock(&XrdFfsQueueWorker_mutex);
260  return i;
261 }
262 
263 
264 /* Test program below
265  ==================
266 
267 struct jobargs {
268  int i;
269  int XrdFfsQueueWorker_id;
270 };
271 
272 void* job1(void *arg)
273 {
274  int i = ((struct jobargs*)arg)->i;
275 // int wid = ((struct jobargs*)arg)->XrdFfsQueueWorker_id;
276 
277 // if (i == 10 || i == 20 || i == 30 || i == 40)
278 // sleep(2);
279  printf("hello from job1 ( %d )\n", i);
280 }
281 
282 int main()
283 {
284  int i;
285 
286  XrdFfsQueue_create_workers(20);
287 #define N 500
288  struct XrdFfsQueueTasks *myjob1[N];
289  struct jobargs myarg1[N];
290 
291  sleep(1);
292  printf("1st round ...\n");
293  for (i = 0; i < N; i++)
294  {
295  myarg1[i].i = i;
296  myjob1[i] = XrdFfsQueue_create_task((void*) &job1, (void*) &myarg1[i], 0);
297  }
298  for (i = 0; i < N; i++)
299  {
300  XrdFfsQueue_wait_task(myjob1[i]);
301  XrdFfsQueue_free_task(myjob1[i]);
302  }
303 
304  printf("there are %d workers after 1st round\n", XrdFfsQueue_count_workers());
305  printf("remove %d workers\n", XrdFfsQueue_remove_workers(8));
306  printf("add 1 worker\n");
307  XrdFfsQueue_create_workers(10);
308 
309  sleep(2);
310  printf("2nd round ...\n");
311 
312  for (i = 0; i < N; i++)
313  {
314  myarg1[i].i = i;
315  myjob1[i] = XrdFfsQueue_create_task((void*) &job1, (void*) &myarg1[i], 0);
316  }
317  for (i = 0; i < N; i++)
318  {
319  XrdFfsQueue_wait_task(myjob1[i]);
320  XrdFfsQueue_free_task(myjob1[i]);
321  }
322 
323  XrdFfsQueue_remove_workers(XrdFfsQueue_count_workers());
324  printf("bye ...\n");
325  return 0;
326 }
327 
328 */
329 
330 #ifdef __cplusplus
331  }
332 #endif
void XrdFfsQueue_free_task(struct XrdFfsQueueTasks *task)
Definition: XrdFfsQueue.cc:111
struct XrdFfsQueueTasks * XrdFfsQueueTaskque_head
Definition: XrdFfsQueue.cc:39
int XrdFfsQueue_remove_workers(int n)
Definition: XrdFfsQueue.cc:229
pthread_mutex_t XrdFfsQueueWorker_mutex
Definition: XrdFfsQueue.cc:188
unsigned int XrdFfsQueueWorker_id
Definition: XrdFfsQueue.cc:190
unsigned int XrdFfsQueueNext_task_id
Definition: XrdFfsQueue.cc:41
struct XrdFfsQueueTasks * XrdFfsQueue_dequeue()
Definition: XrdFfsQueue.cc:70
struct XrdFfsQueueTasks * XrdFfsQueue_create_task(void *(*func)(void *), void **args, short initstat)
Definition: XrdFfsQueue.cc:97
pthread_mutex_t XrdFfsQueueTaskque_mutex
Definition: XrdFfsQueue.cc:42
void XrdFfsQueue_enqueue(struct XrdFfsQueueTasks *task)
Definition: XrdFfsQueue.cc:45
int XrdFfsQueue_count_workers()
Definition: XrdFfsQueue.cc:254
int XrdFfsQueue_create_workers(int n)
Definition: XrdFfsQueue.cc:192
struct XrdFfsQueueTasks * XrdFfsQueueTaskque_tail
Definition: XrdFfsQueue.cc:40
pthread_cond_t XrdFfsQueueTaskque_cond
Definition: XrdFfsQueue.cc:43
unsigned short XrdFfsQueueNworkers
Definition: XrdFfsQueue.cc:189
unsigned int XrdFfsQueue_count_tasks()
Definition: XrdFfsQueue.cc:131
void * XrdFfsQueue_worker(void *x)
Definition: XrdFfsQueue.cc:154
void XrdFfsQueue_wait_task(struct XrdFfsQueueTasks *task)
Definition: XrdFfsQueue.cc:123
pthread_cond_t cond
Definition: XrdFfsQueue.hh:38
unsigned int id
Definition: XrdFfsQueue.hh:43
struct XrdFfsQueueTasks * prev
Definition: XrdFfsQueue.hh:45
struct XrdFfsQueueTasks * next
Definition: XrdFfsQueue.hh:44
pthread_mutex_t mutex
Definition: XrdFfsQueue.hh:37
void *(* func)(void *)
Definition: XrdFfsQueue.hh:40