XRootD
XrdCl::JobManager Class Reference

A synchronized queue. More...

#include <XrdClJobManager.hh>

+ Collaboration diagram for XrdCl::JobManager:

Public Member Functions

 JobManager (uint32_t workers)
 Constructor. More...
 
 ~JobManager ()
 Destructor. More...
 
bool Finalize ()
 Finalize the job manager, clear the queues. More...
 
bool Initialize ()
 Initialize the job manager. More...
 
bool IsWorker ()
 
void QueueJob (Job *job, void *arg=0)
 Add a job to be run. More...
 
void RunJobs ()
 Run the jobs. More...
 
bool Start ()
 Start the workers. More...
 
bool Stop ()
 Stop the workers. More...
 

Detailed Description

A synchronized queue.

Definition at line 50 of file XrdClJobManager.hh.

Constructor & Destructor Documentation

◆ JobManager()

XrdCl::JobManager::JobManager ( uint32_t  workers)
inline

Constructor.

Definition at line 56 of file XrdClJobManager.hh.

57  {
58  pRunning = false;
59  pWorkers.resize( workers );
60  }

◆ ~JobManager()

XrdCl::JobManager::~JobManager ( )
inline

Destructor.

Definition at line 65 of file XrdClJobManager.hh.

66  {
67  }

Member Function Documentation

◆ Finalize()

bool XrdCl::JobManager::Finalize ( )

Finalize the job manager, clear the queues.

Definition at line 52 of file XrdClJobManager.cc.

53  {
54  pJobs.Clear();
55  return true;
56  }
void Clear()
Clear the queue.

References XrdCl::SyncQueue< Item >::Clear().

Referenced by XrdEc::ThreadPool::~ThreadPool(), and XrdCl::CopyProcess::Run().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Initialize()

bool XrdCl::JobManager::Initialize ( )

Initialize the job manager.

Definition at line 44 of file XrdClJobManager.cc.

45  {
46  return true;
47  }

Referenced by XrdCl::CopyProcess::Run().

+ Here is the caller graph for this function:

◆ IsWorker()

bool XrdCl::JobManager::IsWorker ( )
inline

Definition at line 102 of file XrdClJobManager.hh.

103  {
104  pthread_t thread = pthread_self();
105  std::vector<pthread_t>::iterator itr =
106  std::find( pWorkers.begin(), pWorkers.end(), thread );
107  return itr != pWorkers.end();
108  }

◆ QueueJob()

void XrdCl::JobManager::QueueJob ( Job job,
void *  arg = 0 
)
inline

Add a job to be run.

Definition at line 92 of file XrdClJobManager.hh.

93  {
94  pJobs.Put( JobHelper( job, arg ) );
95  }
void Put(const Item &item)
Put the item in the queue.

References XrdCl::SyncQueue< Item >::Put().

Referenced by XrdCl::FileStateHandler::Close(), XrdEc::ThreadPool::Execute(), XrdCl::Stream::OnConnect(), XrdCl::Stream::OnIncoming(), XrdCl::LocalFileHandler::QueueTask(), XrdCl::CopyProcess::Run(), XrdCl::Operation< HasHndl >::Run(), XrdEc::ScheduleHandler(), and XrdCl::FileStateHandler::TimeOutRequests().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RunJobs()

void XrdCl::JobManager::RunJobs ( )

Run the jobs.

Definition at line 146 of file XrdClJobManager.cc.

147  {
148  pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, 0 );
149  for( ;; )
150  {
151  JobHelper h = pJobs.Get();
152  pthread_setcancelstate( PTHREAD_CANCEL_DISABLE, 0 );
153  h.job->Run( h.arg );
154  pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, 0 );
155  }
156  }
Item Get()
Get the item from the front of the queue.

References XrdCl::SyncQueue< Item >::Get().

Referenced by RunRunnerThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Start()

bool XrdCl::JobManager::Start ( )

Start the workers.

Definition at line 61 of file XrdClJobManager.cc.

62  {
63  XrdSysMutexHelper scopedLock( pMutex );
64  Log *log = DefaultEnv::GetLog();
65  log->Debug( JobMgrMsg, "Starting the job manager..." );
66 
67  if( pRunning )
68  {
69  log->Error( JobMgrMsg, "The job manager is already running" );
70  return false;
71  }
72 
73  for( uint32_t i = 0; i < pWorkers.size(); ++i )
74  {
75  int ret = ::pthread_create( &pWorkers[i], 0, ::RunRunnerThread, this );
76  if( ret != 0 )
77  {
78  log->Error( JobMgrMsg, "Unable to spawn a job worker thread: %s",
79  XrdSysE2T( errno ) );
80  if( i > 0 )
81  StopWorkers( i );
82  return false;
83  }
84  }
85  pRunning = true;
86  log->Debug( JobMgrMsg, "Job manager started, %zu workers", pWorkers.size() );
87  return true;
88  }
static void * RunRunnerThread(void *arg)
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
const uint64_t JobMgrMsg

References XrdCl::Log::Debug(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::JobMgrMsg, RunRunnerThread(), and XrdSysE2T().

Referenced by XrdCl::CopyProcess::Run().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Stop()

bool XrdCl::JobManager::Stop ( )

Stop the workers.

Definition at line 93 of file XrdClJobManager.cc.

94  {
95  XrdSysMutexHelper scopedLock( pMutex );
96  Log *log = DefaultEnv::GetLog();
97  log->Debug( JobMgrMsg, "Stopping the job manager..." );
98  if( !pRunning )
99  {
100  log->Error( JobMgrMsg, "The job manager is not running" );
101  return false;
102  }
103 
104  StopWorkers( pWorkers.size() );
105 
106  pRunning = false;
107  log->Debug( JobMgrMsg, "Job manager stopped" );
108  return true;
109  }

References XrdCl::Log::Debug(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), and XrdCl::JobMgrMsg.

Referenced by XrdEc::ThreadPool::~ThreadPool(), and XrdCl::CopyProcess::Run().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: