00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00035 #include "OW_config.h"
00036 #include "OW_ThreadPool.hpp"
00037 #include "OW_Array.hpp"
00038 #include "OW_Thread.hpp"
00039 #include "OW_NonRecursiveMutex.hpp"
00040 #include "OW_NonRecursiveMutexLock.hpp"
00041 #include "OW_Condition.hpp"
00042 #include "OW_Format.hpp"
00043 #include "OW_Mutex.hpp"
00044 #include "OW_MutexLock.hpp"
00045 #include "OW_NullLogger.hpp"
00046
00047 #include <deque>
00048
00049 #ifdef OW_DEBUG
00050 #include <iostream>
00051 #endif
00052
00053 namespace OW_NAMESPACE
00054 {
00055
00056 OW_DEFINE_EXCEPTION(ThreadPool);
00057
00058
00059 #define OW_POOL_LOG_DEBUG(logger, arg) do { if ((logger)) OW_LOG_DEBUG(logger, m_poolName + ": " + arg); } while (0)
00060 #define OW_POOL_LOG_FATAL_ERROR(logger, arg) do { if ((logger)) OW_LOG_FATAL_ERROR(logger, m_poolName + ": " + arg); } while (0)
00061
00063 class ThreadPoolImpl : public IntrusiveCountableBase
00064 {
00065 public:
00066
00067 virtual bool addWork(const RunnableRef& work, bool blockWhenFull) = 0;
00068 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs) = 0;
00069 virtual void waitForEmptyQueue() = 0;
00070 virtual ~ThreadPoolImpl()
00071 {
00072 }
00073 };
00074 namespace {
00075 class FixedSizePoolImpl;
00077 class FixedSizePoolWorkerThread : public Thread
00078 {
00079 public:
00080 FixedSizePoolWorkerThread(FixedSizePoolImpl* thePool)
00081 : Thread()
00082 , m_thePool(thePool)
00083 {
00084 }
00085 virtual Int32 run();
00086 private:
00087 virtual void doCooperativeCancel()
00088 {
00089 MutexLock lock(m_guard);
00090 if (m_currentRunnable)
00091 {
00092 m_currentRunnable->doCooperativeCancel();
00093 }
00094 }
00095 virtual void doDefinitiveCancel()
00096 {
00097 MutexLock lock(m_guard);
00098 if (m_currentRunnable)
00099 {
00100 m_currentRunnable->doCooperativeCancel();
00101 }
00102 }
00103
00104 FixedSizePoolImpl* m_thePool;
00105
00106 Mutex m_guard;
00107 RunnableRef m_currentRunnable;
00108
00109
00110 FixedSizePoolWorkerThread(const FixedSizePoolWorkerThread&);
00111 FixedSizePoolWorkerThread& operator=(const FixedSizePoolWorkerThread&);
00112 };
00114 class CommonPoolImpl : public ThreadPoolImpl
00115 {
00116 protected:
00117 CommonPoolImpl(UInt32 maxQueueSize, const LoggerRef& logger, const String& poolName)
00118 : m_maxQueueSize(maxQueueSize)
00119 , m_queueClosed(false)
00120 , m_shutdown(false)
00121 , m_logger(logger)
00122 , m_poolName(poolName)
00123 {
00124 }
00125
00126 virtual ~CommonPoolImpl()
00127 {
00128 }
00129
00130
00131 virtual bool queueIsFull() const
00132 {
00133 return ((m_maxQueueSize > 0) && (m_queue.size() == m_maxQueueSize));
00134 }
00135
00136
00137 bool queueClosed() const
00138 {
00139 return m_shutdown || m_queueClosed;
00140 }
00141
00142 bool finishOffWorkInQueue(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00143 {
00144 NonRecursiveMutexLock l(m_queueLock);
00145
00146 if (queueClosed())
00147 {
00148 OW_POOL_LOG_DEBUG(m_logger, "Queue is already closed. Why are you trying to shutdown again?");
00149 return false;
00150 }
00151 m_queueClosed = true;
00152 OW_POOL_LOG_DEBUG(m_logger, "Queue closed");
00153
00154 if (finishWorkInQueue)
00155 {
00156 while (m_queue.size() != 0)
00157 {
00158 if (shutdownSecs < 0)
00159 {
00160 OW_POOL_LOG_DEBUG(m_logger, "Waiting forever for queue to empty");
00161 m_queueEmpty.wait(l);
00162 }
00163 else
00164 {
00165 OW_POOL_LOG_DEBUG(m_logger, "Waiting w/timout for queue to empty");
00166 if (!m_queueEmpty.timedWait(l, shutdownSecs))
00167 {
00168 OW_POOL_LOG_DEBUG(m_logger, "Wait timed out. Work in queue will be discarded.");
00169 break;
00170 }
00171 }
00172 }
00173 }
00174 m_shutdown = true;
00175 return true;
00176 }
00177
00178 virtual void waitForEmptyQueue()
00179 {
00180 NonRecursiveMutexLock l(m_queueLock);
00181 while (m_queue.size() != 0)
00182 {
00183 OW_POOL_LOG_DEBUG(m_logger, "Waiting for empty queue");
00184 m_queueEmpty.wait(l);
00185 }
00186 OW_POOL_LOG_DEBUG(m_logger, "Queue empty: the wait is over");
00187 }
00188
00189 void shutdownThreads(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00190 {
00191 if (!finishOffWorkInQueue(finishWorkInQueue, shutdownSecs))
00192 {
00193 return;
00194 }
00195
00196
00197 m_queueNotEmpty.notifyAll();
00198 m_queueNotFull.notifyAll();
00199
00200 if (shutdownSecs >= 0)
00201 {
00202
00203 for (UInt32 i = 0; i < m_threads.size(); ++i)
00204 {
00205 OW_POOL_LOG_DEBUG(m_logger, Format("Calling cooperativeCancel on thread %1", i));
00206 m_threads[i]->cooperativeCancel();
00207 }
00208
00209 for (UInt32 i = 0; i < m_threads.size(); ++i)
00210 {
00211 OW_POOL_LOG_DEBUG(m_logger, Format("Calling definitiveCancel on thread %1", i));
00212 if (!m_threads[i]->definitiveCancel(shutdownSecs))
00213 {
00214 OW_POOL_LOG_FATAL_ERROR(m_logger, Format("Thread %1 was forcibly cancelled.", i));
00215 }
00216 }
00217 }
00218
00219 for (UInt32 i = 0; i < m_threads.size(); ++i)
00220 {
00221 OW_POOL_LOG_DEBUG(m_logger, Format("calling join() on thread %1", i));
00222 m_threads[i]->join();
00223 OW_POOL_LOG_DEBUG(m_logger, Format("join() finished for thread %1", i));
00224 }
00225 }
00226
00227 RunnableRef getWorkFromQueue(bool waitForWork)
00228 {
00229 NonRecursiveMutexLock l(m_queueLock);
00230 while ((m_queue.size() == 0) && (!m_shutdown))
00231 {
00232 if (waitForWork)
00233 {
00234 OW_POOL_LOG_DEBUG(m_logger, "Waiting for work");
00235 m_queueNotEmpty.wait(l);
00236 }
00237 else
00238 {
00239
00240
00241 if (!m_queueNotEmpty.timedWait(l,1))
00242 {
00243 OW_POOL_LOG_DEBUG(m_logger, "No work after 1 sec. I'm not waiting any longer");
00244 return RunnableRef();
00245 }
00246 }
00247 }
00248
00249 if (m_shutdown)
00250 {
00251 OW_POOL_LOG_DEBUG(m_logger, "The pool is shutdown, not getting any more work");
00252 return RunnableRef();
00253 }
00254
00255 RunnableRef work = m_queue.front();
00256 m_queue.pop_front();
00257
00258
00259 if (!queueIsFull())
00260 {
00261 m_queueNotFull.notifyAll();
00262 }
00263
00264
00265 if (m_queue.size() == 0)
00266 {
00267 m_queueEmpty.notifyAll();
00268 }
00269 OW_POOL_LOG_DEBUG(m_logger, "A thread got some work to do");
00270 return work;
00271 }
00272
00273
00274 UInt32 m_maxQueueSize;
00275
00276 Array<ThreadRef> m_threads;
00277 std::deque<RunnableRef> m_queue;
00278 bool m_queueClosed;
00279 bool m_shutdown;
00280
00281 NonRecursiveMutex m_queueLock;
00282 Condition m_queueNotFull;
00283 Condition m_queueEmpty;
00284 Condition m_queueNotEmpty;
00285 LoggerRef m_logger;
00286 String m_poolName;
00287 };
00288 class FixedSizePoolImpl : public CommonPoolImpl
00289 {
00290 public:
00291 FixedSizePoolImpl(UInt32 numThreads, UInt32 maxQueueSize, const LoggerRef& logger, const String& poolName)
00292 : CommonPoolImpl(maxQueueSize, logger, poolName)
00293 {
00294
00295 m_threads.reserve(numThreads);
00296 for (UInt32 i = 0; i < numThreads; ++i)
00297 {
00298 m_threads.push_back(ThreadRef(new FixedSizePoolWorkerThread(this)));
00299 }
00300 for (UInt32 i = 0; i < numThreads; ++i)
00301 {
00302 m_threads[i]->start();
00303 }
00304 OW_POOL_LOG_DEBUG(m_logger, "Threads are started and ready to go");
00305 }
00306
00307 virtual bool addWork(const RunnableRef& work, bool blockWhenFull)
00308 {
00309
00310 if (!work)
00311 {
00312 OW_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
00313 return false;
00314 }
00315 NonRecursiveMutexLock l(m_queueLock);
00316 if (!blockWhenFull && queueIsFull())
00317 {
00318 OW_POOL_LOG_DEBUG(m_logger, "Queue is full. Not adding work and returning false");
00319 return false;
00320 }
00321 while ( queueIsFull() && !queueClosed() )
00322 {
00323 OW_POOL_LOG_DEBUG(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
00324 m_queueNotFull.wait(l);
00325 }
00326
00327 if (queueClosed())
00328 {
00329 OW_POOL_LOG_DEBUG(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
00330 return false;
00331 }
00332 m_queue.push_back(work);
00333
00334
00335 if (m_queue.size() == 1)
00336 {
00337 OW_POOL_LOG_DEBUG(m_logger, "Waking up sleepy workers");
00338 m_queueNotEmpty.notifyAll();
00339 }
00340
00341 OW_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
00342 return true;
00343 }
00344
00345
00346 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00347 {
00348 shutdownThreads(finishWorkInQueue, shutdownSecs);
00349 }
00350 virtual ~FixedSizePoolImpl()
00351 {
00352
00353 try
00354 {
00355
00356 if (!queueClosed())
00357 {
00358
00359
00360 this->FixedSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, 1);
00361 }
00362 }
00363 catch (...)
00364 {
00365 }
00366 }
00367 private:
00368 friend class FixedSizePoolWorkerThread;
00369 };
00370 void runRunnable(const RunnableRef& work)
00371 {
00372
00373 try
00374 {
00375 work->run();
00376 }
00377 catch (ThreadCancelledException&)
00378 {
00379 throw;
00380 }
00381 catch (Exception& ex)
00382 {
00383 #ifdef OW_DEBUG
00384 std::cerr << "!!! Exception: " << ex.type() << " caught in ThreadPool worker: " << ex << std::endl;
00385 #endif
00386 }
00387 catch(std::exception& ex)
00388 {
00389 #ifdef OW_DEBUG
00390 std::cerr << "!!! std::exception what = " << ex.what() << std::endl;
00391 #endif
00392 }
00393 catch (...)
00394 {
00395 #ifdef OW_DEBUG
00396 std::cerr << "!!! Unknown Exception caught in ThreadPool worker" << std::endl;
00397 #endif
00398 }
00399 }
00400 Int32 FixedSizePoolWorkerThread::run()
00401 {
00402 while (true)
00403 {
00404
00405 RunnableRef work = m_thePool->getWorkFromQueue(true);
00406 if (!work)
00407 {
00408 return 0;
00409 }
00410
00411 {
00412 MutexLock lock(m_guard);
00413 m_currentRunnable = work;
00414 }
00415 runRunnable(work);
00416 {
00417 MutexLock lock(m_guard);
00418 m_currentRunnable = 0;
00419 }
00420 }
00421 return 0;
00422 }
00423 class DynamicSizePoolImpl;
00425 class DynamicSizePoolWorkerThread : public Thread
00426 {
00427 public:
00428 DynamicSizePoolWorkerThread(DynamicSizePoolImpl* thePool)
00429 : Thread()
00430 , m_thePool(thePool)
00431 {
00432 }
00433 virtual Int32 run();
00434 private:
00435 virtual void doCooperativeCancel()
00436 {
00437 MutexLock lock(m_guard);
00438 if (m_currentRunnable)
00439 {
00440 m_currentRunnable->doCooperativeCancel();
00441 }
00442 }
00443 virtual void doDefinitiveCancel()
00444 {
00445 MutexLock lock(m_guard);
00446 if (m_currentRunnable)
00447 {
00448 m_currentRunnable->doCooperativeCancel();
00449 }
00450 }
00451
00452 DynamicSizePoolImpl* m_thePool;
00453
00454 Mutex m_guard;
00455 RunnableRef m_currentRunnable;
00456
00457
00458 DynamicSizePoolWorkerThread(const DynamicSizePoolWorkerThread&);
00459 DynamicSizePoolWorkerThread& operator=(const DynamicSizePoolWorkerThread&);
00460 };
00462 class DynamicSizePoolImpl : public CommonPoolImpl
00463 {
00464 public:
00465 DynamicSizePoolImpl(UInt32 maxThreads, UInt32 maxQueueSize, const LoggerRef& logger, const String& poolName)
00466 : CommonPoolImpl(maxQueueSize, logger, poolName)
00467 , m_maxThreads(maxThreads)
00468 {
00469 }
00470
00471 virtual bool addWork(const RunnableRef& work, bool blockWhenFull)
00472 {
00473
00474 if (!work)
00475 {
00476 OW_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
00477 return false;
00478 }
00479 NonRecursiveMutexLock l(m_queueLock);
00480
00481
00482 if (queueClosed())
00483 {
00484 OW_POOL_LOG_DEBUG(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
00485 return false;
00486 }
00487
00488
00489
00490
00491 size_t i = 0;
00492 while (i < m_threads.size())
00493 {
00494 if (!m_threads[i]->isRunning())
00495 {
00496 OW_POOL_LOG_DEBUG(m_logger, Format("Thread %1 is finished. Cleaning up it's remains.", i));
00497 m_threads[i]->join();
00498 m_threads.remove(i);
00499 }
00500 else
00501 {
00502 ++i;
00503 }
00504 }
00505
00506 if (!blockWhenFull && queueIsFull())
00507 {
00508 OW_POOL_LOG_DEBUG(m_logger, "Queue is full. Not adding work and returning false");
00509 return false;
00510 }
00511 while ( queueIsFull() && !queueClosed() )
00512 {
00513 OW_POOL_LOG_DEBUG(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
00514 m_queueNotFull.wait(l);
00515 }
00516
00517 m_queue.push_back(work);
00518
00519 OW_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
00520
00521
00522
00523
00524
00525
00526
00527 l.release();
00528 m_queueNotEmpty.notifyOne();
00529 Thread::yield();
00530 l.lock();
00531
00532
00533 if (!m_queue.empty() && m_threads.size() < m_maxThreads)
00534 {
00535 ThreadRef theThread(new DynamicSizePoolWorkerThread(this));
00536 m_threads.push_back(theThread);
00537 OW_POOL_LOG_DEBUG(m_logger, "About to start a new thread");
00538 theThread->start();
00539 OW_POOL_LOG_DEBUG(m_logger, "New thread started");
00540 }
00541 return true;
00542 }
00543
00544
00545 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00546 {
00547 shutdownThreads(finishWorkInQueue, shutdownSecs);
00548 }
00549 virtual ~DynamicSizePoolImpl()
00550 {
00551
00552 try
00553 {
00554
00555 if (!queueClosed())
00556 {
00557
00558
00559 this->DynamicSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, 1);
00560 }
00561 }
00562 catch (...)
00563 {
00564 }
00565 }
00566 protected:
00567 UInt32 getMaxThreads() const
00568 {
00569 return m_maxThreads;
00570 }
00571
00572 private:
00573
00574 UInt32 m_maxThreads;
00575 friend class DynamicSizePoolWorkerThread;
00576 };
00577 Int32 DynamicSizePoolWorkerThread::run()
00578 {
00579 while (true)
00580 {
00581
00582 RunnableRef work = m_thePool->getWorkFromQueue(false);
00583 if (!work)
00584 {
00585 return 0;
00586 }
00587
00588 {
00589 MutexLock lock(m_guard);
00590 m_currentRunnable = work;
00591 }
00592 runRunnable(work);
00593 {
00594 MutexLock lock(m_guard);
00595 m_currentRunnable = 0;
00596 }
00597 }
00598 return 0;
00599 }
00600
00602 class DynamicSizeNoQueuePoolImpl : public DynamicSizePoolImpl
00603 {
00604 public:
00605 DynamicSizeNoQueuePoolImpl(UInt32 maxThreads, const LoggerRef& logger, const String& poolName)
00606 : DynamicSizePoolImpl(maxThreads, maxThreads, logger, poolName)
00607 {
00608 }
00609
00610 virtual ~DynamicSizeNoQueuePoolImpl()
00611 {
00612 }
00613
00614
00615 virtual bool queueIsFull() const
00616 {
00617
00618
00619 size_t freeThreads = getMaxThreads() - m_threads.size();
00620 return (freeThreads <= m_queue.size());
00621 }
00622
00623 };
00624
00625 }
00627 ThreadPool::ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const LoggerRef& logger_, const String& poolName)
00628 {
00629 LoggerRef logger(logger_);
00630 if (!logger)
00631 {
00632 logger = LoggerRef(new NullLogger());
00633 }
00634 switch (poolType)
00635 {
00636 case FIXED_SIZE:
00637 m_impl = new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00638 break;
00639 case DYNAMIC_SIZE:
00640 m_impl = new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00641 break;
00642 case DYNAMIC_SIZE_NO_QUEUE:
00643 m_impl = new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
00644 break;
00645 }
00646 }
00648 bool ThreadPool::addWork(const RunnableRef& work)
00649 {
00650 return m_impl->addWork(work, true);
00651 }
00653 bool ThreadPool::tryAddWork(const RunnableRef& work)
00654 {
00655 return m_impl->addWork(work, false);
00656 }
00658 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00659 {
00660 m_impl->shutdown(finishWorkInQueue, shutdownSecs);
00661 }
00663 void ThreadPool::waitForEmptyQueue()
00664 {
00665 m_impl->waitForEmptyQueue();
00666 }
00668 ThreadPool::~ThreadPool()
00669 {
00670 }
00672 ThreadPool::ThreadPool(const ThreadPool& x)
00673 : IntrusiveCountableBase(x)
00674 , m_impl(x.m_impl)
00675 {
00676 }
00678 ThreadPool& ThreadPool::operator=(const ThreadPool& x)
00679 {
00680 m_impl = x.m_impl;
00681 return *this;
00682 }
00683
00684 }
00685