XRootD
XrdClTaskManager.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
20 #include "XrdCl/XrdClLog.hh"
21 #include "XrdCl/XrdClUtils.hh"
22 #include "XrdCl/XrdClDefaultEnv.hh"
23 #include "XrdCl/XrdClConstants.hh"
24 #include "XrdSys/XrdSysE2T.hh"
25 #include "XrdSys/XrdSysTimer.hh"
26 
27 #include <iostream>
28 
29 //------------------------------------------------------------------------------
30 // The thread
31 //------------------------------------------------------------------------------
32 extern "C"
33 {
34  static void *RunRunnerThread( void *arg )
35  {
36  using namespace XrdCl;
37  TaskManager *mgr = (TaskManager*)arg;
38  mgr->RunTasks();
39  return 0;
40  }
41 }
42 
43 namespace XrdCl
44 {
45  //----------------------------------------------------------------------------
46  // Constructor
47  //----------------------------------------------------------------------------
48  TaskManager::TaskManager(): pResolution(1), pRunnerThread(0), pRunning(false)
49  {}
50 
51  //----------------------------------------------------------------------------
52  // Destructor
53  //----------------------------------------------------------------------------
55  {
56  TaskSet::iterator it, itE;
57  for( it = pTasks.begin(); it != pTasks.end(); ++it )
58  if( it->own )
59  delete it->task;
60  }
61 
62  //----------------------------------------------------------------------------
63  // Start the manager
64  //----------------------------------------------------------------------------
66  {
67  XrdSysMutexHelper scopedLock( pOpMutex );
68  Log *log = DefaultEnv::GetLog();
69  log->Debug( TaskMgrMsg, "Starting the task manager..." );
70 
71  if( pRunning )
72  {
73  log->Error( TaskMgrMsg, "The task manager is already running" );
74  return false;
75  }
76 
77  int ret = ::pthread_create( &pRunnerThread, 0, ::RunRunnerThread, this );
78  if( ret != 0 )
79  {
80  log->Error( TaskMgrMsg, "Unable to spawn the task runner thread: %s",
81  XrdSysE2T( errno ) );
82  return false;
83  }
84  pRunning = true;
85  log->Debug( TaskMgrMsg, "Task manager started" );
86  return true;
87  }
88 
89  //----------------------------------------------------------------------------
90  // Stop the manager
91  //----------------------------------------------------------------------------
93  {
94  XrdSysMutexHelper scopedLock( pOpMutex );
95  Log *log = DefaultEnv::GetLog();
96  log->Debug( TaskMgrMsg, "Stopping the task manager..." );
97  if( !pRunning )
98  {
99  log->Error( TaskMgrMsg, "The task manager is not running" );
100  return false;
101  }
102 
103  if( ::pthread_cancel( pRunnerThread ) != 0 )
104  {
105  log->Error( TaskMgrMsg, "Unable to cancel the task runner thread: %s",
106  XrdSysE2T( errno ) );
107  return false;
108  }
109 
110  void *threadRet;
111  int ret = pthread_join( pRunnerThread, (void **)&threadRet );
112  if( ret != 0 )
113  {
114  log->Error( TaskMgrMsg, "Failed to join the task runner thread: %s",
115  XrdSysE2T( errno ) );
116  return false;
117  }
118 
119  pRunning = false;
120  log->Debug( TaskMgrMsg, "Task manager stopped" );
121  return true;
122  }
123 
124  //----------------------------------------------------------------------------
125  // Run the given task at the given time
126  //----------------------------------------------------------------------------
127  void TaskManager::RegisterTask( Task *task, time_t time, bool own )
128  {
129  Log *log = DefaultEnv::GetLog();
130 
131  log->Debug( TaskMgrMsg, "Registering task: \"%s\" to be run at: [%s]",
132  task->GetName().c_str(), Utils::TimeToString(time).c_str() );
133 
134  XrdSysMutexHelper scopedLock( pMutex );
135  pTasks.insert( TaskHelper( task, time, own ) );
136  }
137 
138  //--------------------------------------------------------------------------
139  // Remove a task if it hasn't run yet
140  //--------------------------------------------------------------------------
142  {
143  Log *log = DefaultEnv::GetLog();
144  log->Debug( TaskMgrMsg, "Requesting unregistration of: \"%s\"",
145  task->GetName().c_str() );
146  XrdSysMutexHelper scopedLock( pMutex );
147  pToBeUnregistered.push_back( task );
148  }
149 
150  //----------------------------------------------------------------------------
151  // Run tasks
152  //----------------------------------------------------------------------------
154  {
155  Log *log = DefaultEnv::GetLog();
156 
157  //--------------------------------------------------------------------------
158  // We want the thread to be cancelable only when we sleep between tasks
159  // execution
160  //--------------------------------------------------------------------------
161  pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, 0 );
162 
163  for(;;)
164  {
165  pthread_setcancelstate( PTHREAD_CANCEL_DISABLE, 0 );
166  pMutex.Lock();
167 
168  //------------------------------------------------------------------------
169  // Remove the tasks from the active set - super inefficient,
170  // but, hopefully, never really necessary. We first need to build a list
171  // of iterators because it is impossible to remove elements from
172  // a multiset when iterating over it
173  //------------------------------------------------------------------------
174  TaskList::iterator listIt = pToBeUnregistered.begin();
175  TaskSet::iterator it, itE;
176  std::list<TaskSet::iterator> iteratorList;
177  std::list<TaskSet::iterator>::iterator itRem;
178  for( ; listIt != pToBeUnregistered.end(); ++listIt )
179  {
180  for( it = pTasks.begin(); it != pTasks.end(); ++it )
181  {
182  if( it->task == *listIt )
183  iteratorList.push_back( it );
184  }
185  }
186 
187  for( itRem = iteratorList.begin(); itRem != iteratorList.end(); ++itRem )
188  {
189  Task *tsk = (*itRem)->task;
190  bool own = (*itRem)->own;
191  log->Debug( TaskMgrMsg, "Removing task: \"%s\"", tsk->GetName().c_str() );
192  pTasks.erase( *itRem );
193  if( own )
194  delete tsk;
195  }
196 
197  pToBeUnregistered.clear();
198 
199  //------------------------------------------------------------------------
200  // Select the tasks to be run
201  //------------------------------------------------------------------------
202  time_t now = time(0);
203  std::list<TaskHelper> toRun;
204  std::list<TaskHelper>::iterator trIt;
205 
206  it = pTasks.begin();
207  itE = pTasks.upper_bound( TaskHelper( 0, now ) );
208 
209  for( ; it != itE; ++it )
210  toRun.push_back( TaskHelper( it->task, 0, it->own ) );
211 
212  pTasks.erase( pTasks.begin(), itE );
213  pMutex.UnLock();
214 
215  //------------------------------------------------------------------------
216  // Run the tasks and reinsert them if necessary
217  //------------------------------------------------------------------------
218  for( trIt = toRun.begin(); trIt != toRun.end(); ++trIt )
219  {
220  log->Dump( TaskMgrMsg, "Running task: \"%s\"",
221  trIt->task->GetName().c_str() );
222  time_t schedule = trIt->task->Run( now );
223  if( schedule )
224  {
225  log->Dump( TaskMgrMsg, "Will rerun task \"%s\" at [%s]",
226  trIt->task->GetName().c_str(),
227  Utils::TimeToString(schedule).c_str() );
228  pMutex.Lock();
229  pTasks.insert( TaskHelper( trIt->task, schedule, trIt->own ) );
230  pMutex.UnLock();
231  }
232  else
233  {
234  log->Debug( TaskMgrMsg, "Done with task: \"%s\"",
235  trIt->task->GetName().c_str() );
236  if( trIt->own )
237  delete trIt->task;
238  }
239  }
240 
241  //------------------------------------------------------------------------
242  // Enable the cancellation and go to sleep
243  //------------------------------------------------------------------------
244  pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, 0 );
245  pthread_testcancel();
246  XrdSysTimer::Wait( pResolution*1000 );
247  }
248  }
249 }
static void * RunRunnerThread(void *arg)
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
void RegisterTask(Task *task, time_t time, bool own=true)
void RunTasks()
Run the tasks - this loops infinitely.
TaskManager()
Constructor.
bool Start()
Start the manager.
~TaskManager()
Destructor.
void UnregisterTask(Task *task)
Interface for a task to be run by the TaskManager.
const std::string & GetName() const
Name of the task.
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
Definition: XrdClUtils.cc:256
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227
const uint64_t TaskMgrMsg