XRootD
XrdCl::TaskManager Class Reference

#include <XrdClTaskManager.hh>

+ Collaboration diagram for XrdCl::TaskManager:

Public Member Functions

 TaskManager ()
 Constructor. More...
 
 ~TaskManager ()
 Destructor. More...
 
void RegisterTask (Task *task, time_t time, bool own=true)
 
void RunTasks ()
 Run the tasks - this loops infinitely. More...
 
bool Start ()
 Start the manager. More...
 
bool Stop ()
 
void UnregisterTask (Task *task)
 

Detailed Description

Run short tasks at a given time in the future

The task manager runs just one extra thread, so the execution of one task may interfere with the execution of another.

Definition at line 75 of file XrdClTaskManager.hh.

Constructor & Destructor Documentation

◆ TaskManager()

XrdCl::TaskManager::TaskManager ( )

Constructor.

Definition at line 48 of file XrdClTaskManager.cc.

48  : pResolution(1), pRunnerThread(0), pRunning(false)
49  {}

◆ ~TaskManager()

XrdCl::TaskManager::~TaskManager ( )

Destructor.

Definition at line 54 of file XrdClTaskManager.cc.

55  {
56  TaskSet::iterator it, itE;
57  for( it = pTasks.begin(); it != pTasks.end(); ++it )
58  if( it->own )
59  delete it->task;
60  }

Member Function Documentation

◆ RegisterTask()

void XrdCl::TaskManager::RegisterTask ( Task * task,
time_t  time,
bool  own = true 
)

Run the given task at the given time.

Parameters
task: task to be run
time: time at which the task should be run
own: determines whether the task object should be destroyed when no longer needed

Definition at line 127 of file XrdClTaskManager.cc.

128  {
129  Log *log = DefaultEnv::GetLog();
130 
131  log->Debug( TaskMgrMsg, "Registering task: \"%s\" to be run at: [%s]",
132  task->GetName().c_str(), Utils::TimeToString(time).c_str() );
133 
134  XrdSysMutexHelper scopedLock( pMutex );
135  pTasks.insert( TaskHelper( task, time, own ) );
136  }
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
const std::string & GetName() const
Name of the task.
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
Definition: XrdClUtils.cc:256
const uint64_t TaskMgrMsg

References XrdCl::Log::Debug(), XrdCl::DefaultEnv::GetLog(), XrdCl::Task::GetName(), XrdCl::TaskMgrMsg, and XrdCl::Utils::TimeToString().

Referenced by XrdCl::Channel::Channel(), XrdCl::ForkHandler::Child(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Stream::OnConnectError(), and XrdCl::XRootDMsgHandler::Process().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RunTasks()

void XrdCl::TaskManager::RunTasks ( )

Run the tasks - this loops infinitely.

Definition at line 153 of file XrdClTaskManager.cc.

154  {
155  Log *log = DefaultEnv::GetLog();
156 
157  //--------------------------------------------------------------------------
158  // We want the thread to be cancelable only when we sleep between tasks
159  // execution
160  //--------------------------------------------------------------------------
161  pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, 0 );
162 
163  for(;;)
164  {
165  pthread_setcancelstate( PTHREAD_CANCEL_DISABLE, 0 );
166  pMutex.Lock();
167 
168  //------------------------------------------------------------------------
169  // Remove the tasks from the active set - super inefficient,
170  // but, hopefully, never really necessary. We first need to build a list
171  // of iterators because it is impossible to remove elements from
172  // a multiset when iterating over it
173  //------------------------------------------------------------------------
174  TaskList::iterator listIt = pToBeUnregistered.begin();
175  TaskSet::iterator it, itE;
176  std::list<TaskSet::iterator> iteratorList;
177  std::list<TaskSet::iterator>::iterator itRem;
178  for( ; listIt != pToBeUnregistered.end(); ++listIt )
179  {
180  for( it = pTasks.begin(); it != pTasks.end(); ++it )
181  {
182  if( it->task == *listIt )
183  iteratorList.push_back( it );
184  }
185  }
186 
187  for( itRem = iteratorList.begin(); itRem != iteratorList.end(); ++itRem )
188  {
189  Task *tsk = (*itRem)->task;
190  bool own = (*itRem)->own;
191  log->Debug( TaskMgrMsg, "Removing task: \"%s\"", tsk->GetName().c_str() );
192  pTasks.erase( *itRem );
193  if( own )
194  delete tsk;
195  }
196 
197  pToBeUnregistered.clear();
198 
199  //------------------------------------------------------------------------
200  // Select the tasks to be run
201  //------------------------------------------------------------------------
202  time_t now = time(0);
203  std::list<TaskHelper> toRun;
204  std::list<TaskHelper>::iterator trIt;
205 
206  it = pTasks.begin();
207  itE = pTasks.upper_bound( TaskHelper( 0, now ) );
208 
209  for( ; it != itE; ++it )
210  toRun.push_back( TaskHelper( it->task, 0, it->own ) );
211 
212  pTasks.erase( pTasks.begin(), itE );
213  pMutex.UnLock();
214 
215  //------------------------------------------------------------------------
216  // Run the tasks and reinsert them if necessary
217  //------------------------------------------------------------------------
218  for( trIt = toRun.begin(); trIt != toRun.end(); ++trIt )
219  {
220  log->Dump( TaskMgrMsg, "Running task: \"%s\"",
221  trIt->task->GetName().c_str() );
222  time_t schedule = trIt->task->Run( now );
223  if( schedule )
224  {
225  log->Dump( TaskMgrMsg, "Will rerun task \"%s\" at [%s]",
226  trIt->task->GetName().c_str(),
227  Utils::TimeToString(schedule).c_str() );
228  pMutex.Lock();
229  pTasks.insert( TaskHelper( trIt->task, schedule, trIt->own ) );
230  pMutex.UnLock();
231  }
232  else
233  {
234  log->Debug( TaskMgrMsg, "Done with task: \"%s\"",
235  trIt->task->GetName().c_str() );
236  if( trIt->own )
237  delete trIt->task;
238  }
239  }
240 
241  //------------------------------------------------------------------------
242  // Enable the cancellation and go to sleep
243  //------------------------------------------------------------------------
244  pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, 0 );
245  pthread_testcancel();
246  XrdSysTimer::Wait( pResolution*1000 );
247  }
248  }
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
Interface for a task to be run by the TaskManager.
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227

References XrdCl::Log::Debug(), XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::Task::GetName(), XrdSysMutex::Lock(), XrdCl::TaskMgrMsg, XrdCl::Utils::TimeToString(), XrdSysMutex::UnLock(), and XrdSysTimer::Wait().

Referenced by RunRunnerThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Start()

bool XrdCl::TaskManager::Start ( )

Start the manager.

Definition at line 65 of file XrdClTaskManager.cc.

66  {
67  XrdSysMutexHelper scopedLock( pOpMutex );
68  Log *log = DefaultEnv::GetLog();
69  log->Debug( TaskMgrMsg, "Starting the task manager..." );
70 
71  if( pRunning )
72  {
73  log->Error( TaskMgrMsg, "The task manager is already running" );
74  return false;
75  }
76 
77  int ret = ::pthread_create( &pRunnerThread, 0, ::RunRunnerThread, this );
78  if( ret != 0 )
79  {
80  log->Error( TaskMgrMsg, "Unable to spawn the task runner thread: %s",
81  XrdSysE2T( errno ) );
82  return false;
83  }
84  pRunning = true;
85  log->Debug( TaskMgrMsg, "Task manager started" );
86  return true;
87  }
static void * RunRunnerThread(void *arg)
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231

References XrdCl::Log::Debug(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), RunRunnerThread(), XrdCl::TaskMgrMsg, and XrdSysE2T().

+ Here is the call graph for this function:

◆ Stop()

bool XrdCl::TaskManager::Stop ( )

Stop the manager.

This call waits until the currently running task completes.

Definition at line 92 of file XrdClTaskManager.cc.

93  {
94  XrdSysMutexHelper scopedLock( pOpMutex );
95  Log *log = DefaultEnv::GetLog();
96  log->Debug( TaskMgrMsg, "Stopping the task manager..." );
97  if( !pRunning )
98  {
99  log->Error( TaskMgrMsg, "The task manager is not running" );
100  return false;
101  }
102 
103  if( ::pthread_cancel( pRunnerThread ) != 0 )
104  {
105  log->Error( TaskMgrMsg, "Unable to cancel the task runner thread: %s",
106  XrdSysE2T( errno ) );
107  return false;
108  }
109 
110  void *threadRet;
111  int ret = pthread_join( pRunnerThread, (void **)&threadRet );
112  if( ret != 0 )
113  {
114  log->Error( TaskMgrMsg, "Failed to join the task runner thread: %s",
115  XrdSysE2T( errno ) );
116  return false;
117  }
118 
119  pRunning = false;
120  log->Debug( TaskMgrMsg, "Task manager stopped" );
121  return true;
122  }

References XrdCl::Log::Debug(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::TaskMgrMsg, and XrdSysE2T().

+ Here is the call graph for this function:

◆ UnregisterTask()

void XrdCl::TaskManager::UnregisterTask ( Task * task )

Remove a task. The unregistration process is asynchronous and may be performed at any point in the future; the function just queues the request. An unregistered task gets destroyed if it was owned by the task manager.

Definition at line 141 of file XrdClTaskManager.cc.

142  {
143  Log *log = DefaultEnv::GetLog();
144  log->Debug( TaskMgrMsg, "Requesting unregistration of: \"%s\"",
145  task->GetName().c_str() );
146  XrdSysMutexHelper scopedLock( pMutex );
147  pToBeUnregistered.push_back( task );
148  }

References XrdCl::Log::Debug(), XrdCl::DefaultEnv::GetLog(), XrdCl::Task::GetName(), and XrdCl::TaskMgrMsg.

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: