XRootD
XrdClJobManager.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2013 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #include "XrdCl/XrdClJobManager.hh"
20 #include "XrdCl/XrdClLog.hh"
21 #include "XrdCl/XrdClDefaultEnv.hh"
22 #include "XrdCl/XrdClConstants.hh"
23 #include "XrdSys/XrdSysE2T.hh"
24 
25 //------------------------------------------------------------------------------
26 // The thread
27 //------------------------------------------------------------------------------
28 extern "C"
29 {
30  static void *RunRunnerThread( void *arg )
31  {
32  using namespace XrdCl;
33  JobManager *mgr = (JobManager*)arg;
34  mgr->RunJobs();
35  return 0;
36  }
37 }
38 
39 namespace XrdCl
40 {
41  //----------------------------------------------------------------------------
42  // Initialize the job manager
43  //----------------------------------------------------------------------------
45  {
46  return true;
47  }
48 
49  //----------------------------------------------------------------------------
50  // Finalize the job manager, clear the queues
51  //----------------------------------------------------------------------------
53  {
54  pJobs.Clear();
55  return true;
56  }
57 
58  //----------------------------------------------------------------------------
59  // Start the workers
60  //----------------------------------------------------------------------------
62  {
63  XrdSysMutexHelper scopedLock( pMutex );
64  Log *log = DefaultEnv::GetLog();
65  log->Debug( JobMgrMsg, "Starting the job manager..." );
66 
67  if( pRunning )
68  {
69  log->Error( JobMgrMsg, "The job manager is already running" );
70  return false;
71  }
72 
73  for( uint32_t i = 0; i < pWorkers.size(); ++i )
74  {
75  int ret = ::pthread_create( &pWorkers[i], 0, ::RunRunnerThread, this );
76  if( ret != 0 )
77  {
78  log->Error( JobMgrMsg, "Unable to spawn a job worker thread: %s",
79  XrdSysE2T( errno ) );
80  if( i > 0 )
81  StopWorkers( i );
82  return false;
83  }
84  }
85  pRunning = true;
86  log->Debug( JobMgrMsg, "Job manager started, %zu workers", pWorkers.size() );
87  return true;
88  }
89 
90  //----------------------------------------------------------------------------
91  // Stop the workers
92  //----------------------------------------------------------------------------
94  {
95  XrdSysMutexHelper scopedLock( pMutex );
96  Log *log = DefaultEnv::GetLog();
97  log->Debug( JobMgrMsg, "Stopping the job manager..." );
98  if( !pRunning )
99  {
100  log->Error( JobMgrMsg, "The job manager is not running" );
101  return false;
102  }
103 
104  StopWorkers( pWorkers.size() );
105 
106  pRunning = false;
107  log->Debug( JobMgrMsg, "Job manager stopped" );
108  return true;
109  }
110 
111  //----------------------------------------------------------------------------
112  // Stop all workers up to n'th
113  //----------------------------------------------------------------------------
114  void JobManager::StopWorkers( uint32_t n )
115  {
116  Log *log = DefaultEnv::GetLog();
117  for( uint32_t i = 0; i < n; ++i )
118  {
119  void *threadRet;
120  log->Dump( JobMgrMsg, "Stopping worker #%d...", i );
121  int rc = pthread_cancel( pWorkers[i] );
122  if( rc != 0 )
123  {
124  log->Error( TaskMgrMsg, "Unable to cancel worker #%d: %s", i,
125  XrdSysE2T( errno ) );
126  if( rc == ESRCH ) continue;
127  abort();
128  }
129 
130  rc = pthread_join( pWorkers[i], (void**)&threadRet );
131  if( rc != 0 )
132  {
133  log->Error( TaskMgrMsg, "Unable to join worker #%d: %s", i,
134  XrdSysE2T( errno ) );
135  if( rc == ESRCH ) continue;
136  abort();
137  }
138 
139  log->Dump( JobMgrMsg, "Worker #%d stopped", i );
140  }
141  }
142 
143  //----------------------------------------------------------------------------
144  // Run the jobs
145  //----------------------------------------------------------------------------
147  {
148  pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, 0 );
149  for( ;; )
150  {
151  JobHelper h = pJobs.Get();
152  pthread_setcancelstate( PTHREAD_CANCEL_DISABLE, 0 );
153  h.job->Run( h.arg );
154  pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, 0 );
155  }
156  }
157 }
static void * RunRunnerThread(void *arg)
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
static Log * GetLog()
Get default log.
A synchronized queue.
bool Finalize()
Finalize the job manager, clear the queues.
bool Start()
Start the workers.
bool Initialize()
Initialize the job manager.
void RunJobs()
Run the jobs.
bool Stop()
Stop the workers.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
void Clear()
Clear the queue.
Item Get()
Get the item from the front of the queue.
const uint64_t TaskMgrMsg
const uint64_t JobMgrMsg