00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00036 #include "OW_config.h"
00037 #include "OW_PollingManager.hpp"
00038 #include "OW_NonRecursiveMutexLock.hpp"
00039 #include "OW_DateTime.hpp"
00040 #include "OW_CIMOMHandleIFC.hpp"
00041 #include "OW_Format.hpp"
00042 #include "OW_ConfigOpts.hpp"
00043 #include "OW_PolledProviderIFC.hpp"
00044 #include "OW_ProviderManager.hpp"
00045 #include "OW_Platform.hpp"
00046 #include "OW_TimeoutException.hpp"
00047 #include "OW_OperationContext.hpp"
00048 #include "OW_RepositoryIFC.hpp"
00049 #include "OW_ServiceIFCNames.hpp"
00050
00051
00052 #include <limits>
00053
00054 namespace
00055 {
00056 template <bool b> struct compile_time_assert;
00057 template <> struct compile_time_assert<true> { };
00058 using OpenWBEM::Int32;
00059
00060
00061 time_t safe_add(time_t x, Int32 y)
00062 {
00063 compile_time_assert<(sizeof(time_t) >= sizeof(Int32))> dummy;
00064 time_t const max_time = std::numeric_limits<time_t>::max();
00065 return (
00066 y <= 0 ? x :
00067 x > max_time - y ? max_time :
00068 x + y
00069 );
00070 }
00071 }
00072
00073 namespace OW_NAMESPACE
00074 {
00075
00076 namespace
00077 {
00078 const String COMPONENT_NAME("ow.owcimomd.PollingManager");
00079 }
00080
00082 PollingManager::PollingManager(const ProviderManagerRef& providerManager)
00083 : m_pollingManagerThread(new PollingManagerThread(providerManager))
00084 {
00085
00086 }
00087
00089 PollingManager::~PollingManager()
00090 {
00091 }
00092
00094 void
00095 PollingManager::init(const ServiceEnvironmentIFCRef& env)
00096 {
00097 m_pollingManagerThread->init(env);
00098 }
00099
00101 void
00102 PollingManager::start()
00103 {
00104 m_pollingManagerThread->start();
00105 m_pollingManagerThread->waitUntilReady();
00106 }
00107
00109 void
00110 PollingManager::shutdown()
00111 {
00112 m_pollingManagerThread->shutdown();
00113 }
00114
00116 void
00117 PollingManager::addPolledProvider(const PolledProviderIFCRef& p)
00118 {
00119 m_pollingManagerThread->addPolledProvider(p);
00120 }
00121
00122
00123
00125 PollingManagerThread::PollingManagerThread(const ProviderManagerRef& providerManager)
00126 : Thread()
00127 , m_shuttingDown(false)
00128 , m_providerManager(providerManager)
00129 , m_startedBarrier(2)
00130 {
00131 }
00133 PollingManagerThread::~PollingManagerThread()
00134 {
00135 }
00137 String
00138 PollingManager::getName() const
00139 {
00140 return ServiceIFCNames::PollingManager;
00141 }
00142
00144 StringArray
00145 PollingManager::getDependencies() const
00146 {
00147 StringArray rv;
00148 rv.push_back(ServiceIFCNames::CIMServer);
00149 return rv;
00150 }
00151
00153 void
00154 PollingManagerThread::init(const ServiceEnvironmentIFCRef& env)
00155 {
00156 m_env = env;
00157 m_logger = m_env->getLogger(COMPONENT_NAME);
00158 Int32 maxThreads;
00159 try
00160 {
00161 maxThreads = env->getConfigItem(ConfigOpts::POLLING_MANAGER_MAX_THREADS_opt, OW_DEFAULT_POLLING_MANAGER_MAX_THREADS).toInt32();
00162 }
00163 catch (const StringConversionException&)
00164 {
00165 maxThreads = String(OW_DEFAULT_POLLING_MANAGER_MAX_THREADS).toInt32();
00166 }
00167
00168 m_triggerRunnerThreadPool = ThreadPoolRef(new ThreadPool(ThreadPool::DYNAMIC_SIZE, maxThreads, maxThreads * 10,
00169 m_logger, "Polling Manager"));
00170 }
00171
00173 namespace
00174 {
00175 class PollingManagerProviderEnvironment : public ProviderEnvironmentIFC
00176 {
00177 public:
00178 PollingManagerProviderEnvironment(ServiceEnvironmentIFCRef env)
00179 : m_context()
00180 , m_env(env)
00181 {}
00182 virtual CIMOMHandleIFCRef getCIMOMHandle() const
00183 {
00184 return m_env->getCIMOMHandle(m_context);
00185 }
00186 virtual CIMOMHandleIFCRef getRepositoryCIMOMHandle() const
00187 {
00188 return m_env->getCIMOMHandle(m_context, ServiceEnvironmentIFC::E_BYPASS_PROVIDERS);
00189 }
00190 virtual RepositoryIFCRef getRepository() const
00191 {
00192 return m_env->getRepository();
00193 }
00194 virtual String getConfigItem(const String& name, const String& defRetVal="") const
00195 {
00196 return m_env->getConfigItem(name, defRetVal);
00197 }
00198 virtual StringArray getMultiConfigItem(const String &itemName,
00199 const StringArray& defRetVal, const char* tokenizeSeparator = 0) const
00200 {
00201 return m_env->getMultiConfigItem(itemName, defRetVal, tokenizeSeparator);
00202 }
00203
00204 virtual LoggerRef getLogger() const
00205 {
00206 return m_env->getLogger(COMPONENT_NAME);
00207 }
00208 virtual LoggerRef getLogger(const String& componentName) const
00209 {
00210 return m_env->getLogger(componentName);
00211 }
00212 virtual String getUserName() const
00213 {
00214 return Platform::getCurrentUserName();
00215 }
00216 virtual OperationContext& getOperationContext()
00217 {
00218 return m_context;
00219 }
00220 virtual ProviderEnvironmentIFCRef clone() const
00221 {
00222 return ProviderEnvironmentIFCRef(new PollingManagerProviderEnvironment(m_env));
00223 }
00224 private:
00225 mutable OperationContext m_context;
00226 ServiceEnvironmentIFCRef m_env;
00227 };
00228 ProviderEnvironmentIFCRef createProvEnvRef(ServiceEnvironmentIFCRef env)
00229 {
00230 return ProviderEnvironmentIFCRef(new PollingManagerProviderEnvironment(env));
00231 }
00232 }
00234 Int32
00235 PollingManagerThread::run()
00236 {
00237
00238 m_startedBarrier.wait();
00239
00240 bool doInit = true;
00241
00242
00243 PolledProviderIFCRefArray itpra =
00244 m_providerManager->getPolledProviders(createProvEnvRef(m_env));
00245
00246 OW_LOG_DEBUG(m_logger, Format("PollingManager found %1 polled providers",
00247 itpra.size()));
00248 {
00249
00250 NonRecursiveMutexLock ml(m_triggerGuard);
00251 for (size_t i = 0; i < itpra.size(); ++i)
00252 {
00253 TriggerRunnerRef tr(new TriggerRunner(this, m_env));
00254 tr->m_pollInterval =
00255 itpra[i]->getInitialPollingInterval(createProvEnvRef(m_env));
00256 OW_LOG_DEBUG(m_logger, Format("PollingManager poll interval for provider"
00257 " %1: %2", i, tr->m_pollInterval));
00258 if (!tr->m_pollInterval)
00259 {
00260 continue;
00261 }
00262 tr->m_itp = itpra[i];
00263 m_triggerRunners.append(tr);
00264 }
00265 }
00266 {
00267 NonRecursiveMutexLock l(m_triggerGuard);
00268 while (!m_shuttingDown)
00269 {
00270 bool rightNow;
00271 UInt32 sleepTime = calcSleepTime(rightNow, doInit);
00272 doInit = false;
00273 if (!rightNow)
00274 {
00275 if (sleepTime == 0)
00276 {
00277 m_triggerCondition.wait(l);
00278 }
00279 else
00280 {
00281 m_triggerCondition.timedWait(l, sleepTime);
00282 }
00283 }
00284 if (m_shuttingDown)
00285 {
00286 break;
00287 }
00288 processTriggers();
00289 }
00290 }
00291
00292 m_triggerRunnerThreadPool->shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, 60);
00293 m_triggerRunners.clear();
00294 return 0;
00295 }
00297 UInt32
00298 PollingManagerThread::calcSleepTime(bool& rightNow, bool doInit)
00299 {
00300 rightNow = false;
00301 DateTime dtm;
00302 dtm.setToCurrent();
00303 time_t tm = dtm.get();
00304
00305 Int32 const int32_max = std::numeric_limits<Int32>::max();
00306 time_t const time_t_max = std::numeric_limits<time_t>::max();
00307 time_t leastTime = (time_t_max > int32_max ? int32_max : time_t_max);
00308
00309
00310
00311 int checkedCount = 0;
00312
00313 for (size_t i = 0; i < m_triggerRunners.size(); i++)
00314 {
00315 if (m_triggerRunners[i]->m_isRunning
00316 || m_triggerRunners[i]->m_pollInterval == 0)
00317 {
00318 continue;
00319 }
00320 if (doInit)
00321 {
00322 m_triggerRunners[i]->m_nextPoll =
00323 safe_add(tm, m_triggerRunners[i]->m_pollInterval);
00324 }
00325 else if (m_triggerRunners[i]->m_nextPoll <= tm)
00326 {
00327 rightNow = true;
00328 return 0;
00329 }
00330
00331 checkedCount++;
00332 time_t diff = m_triggerRunners[i]->m_nextPoll - tm;
00333 if (diff < leastTime)
00334 {
00335 leastTime = diff;
00336 }
00337 }
00338 return (checkedCount == 0) ? 0 : UInt32(leastTime);
00339 }
00341 void
00342 PollingManagerThread::processTriggers()
00343 {
00344 DateTime dtm;
00345 dtm.setToCurrent();
00346 time_t tm = dtm.get();
00347 for (size_t i = 0; i < m_triggerRunners.size(); i++)
00348 {
00349 if (m_triggerRunners[i]->m_isRunning)
00350 {
00351 continue;
00352 }
00353 if (m_triggerRunners[i]->m_pollInterval == 0)
00354 {
00355
00356 m_triggerRunners.remove(i--);
00357 continue;
00358 }
00359 if (tm >= m_triggerRunners[i]->m_nextPoll)
00360 {
00361 m_triggerRunners[i]->m_isRunning = true;
00362 if (!m_triggerRunnerThreadPool->tryAddWork(m_triggerRunners[i]))
00363 {
00364 OW_LOG_INFO(m_logger, "Failed to run polled provider, because there are too many already running!");
00365 }
00366 }
00367 }
00368 }
00370 void
00371 PollingManagerThread::shutdown()
00372 {
00373 {
00374 NonRecursiveMutexLock l(m_triggerGuard);
00375 m_shuttingDown = true;
00376 m_triggerCondition.notifyAll();
00377 }
00378
00379 this->join();
00380
00381
00382 m_triggerRunners.clear();
00383 m_env = 0;
00384 m_providerManager = 0;
00385 m_triggerRunnerThreadPool = 0;
00386
00387 }
00389 void
00390 PollingManagerThread::addPolledProvider(const PolledProviderIFCRef& p)
00391 {
00392 NonRecursiveMutexLock l(m_triggerGuard);
00393 if (m_shuttingDown)
00394 return;
00395 TriggerRunnerRef tr(new TriggerRunner(this, m_env));
00396 tr->m_pollInterval =
00397 p->getInitialPollingInterval(createProvEnvRef(m_env));
00398 OW_LOG_DEBUG(m_logger, Format("PollingManager poll interval for provider"
00399 " %1", tr->m_pollInterval));
00400 if (!tr->m_pollInterval)
00401 {
00402 return;
00403 }
00404 DateTime dtm;
00405 dtm.setToCurrent();
00406 time_t tm = dtm.get();
00407 tr->m_nextPoll = safe_add(tm, tr->m_pollInterval);
00408 tr->m_itp = p;
00409 m_triggerRunners.append(tr);
00410 m_triggerCondition.notifyAll();
00411 }
00413 PollingManagerThread::TriggerRunner::TriggerRunner(PollingManagerThread* svr,
00414 ServiceEnvironmentIFCRef env)
00415 : Runnable()
00416 , m_itp(0)
00417 , m_nextPoll(0)
00418 , m_isRunning(false)
00419 , m_pollInterval(0)
00420 , m_pollMan(svr)
00421 , m_env(env)
00422 , m_logger(env->getLogger(COMPONENT_NAME))
00423 {
00424 }
00426 void
00427 PollingManagerThread::TriggerRunner::run()
00428 {
00429 Int32 nextInterval = 0;
00430 try
00431 {
00432 nextInterval = m_itp->poll(createProvEnvRef(m_env));
00433 }
00434 catch(std::exception& e)
00435 {
00436 OW_LOG_ERROR(m_logger, Format("Caught Exception while running poll: %1",
00437 e.what()));
00438 }
00439 catch(ThreadCancelledException& e)
00440 {
00441 throw;
00442 }
00443 catch(...)
00444 {
00445 OW_LOG_ERROR(m_logger, "Caught Unknown Exception while running poll");
00446 }
00447 NonRecursiveMutexLock l(m_pollMan->m_triggerGuard);
00448 if (nextInterval == 0 || m_pollInterval == 0)
00449 {
00450 m_pollInterval = 0;
00451 m_nextPoll = 0;
00452 }
00453 else
00454 {
00455 if (nextInterval > 0)
00456 {
00457 m_pollInterval = nextInterval;
00458 }
00459 DateTime dtm;
00460 dtm.setToCurrent();
00461 m_nextPoll = safe_add(dtm.get(), m_pollInterval);
00462 }
00463 m_isRunning = false;
00464 m_pollMan->m_triggerCondition.notifyOne();
00465 }
00466
00468 void
00469 PollingManagerThread::TriggerRunner::doCooperativeCancel()
00470 {
00471 m_itp->doCooperativeCancel();
00472 }
00473
00475 void
00476 PollingManagerThread::TriggerRunner::doDefinitiveCancel()
00477 {
00478 m_itp->doDefinitiveCancel();
00479 }
00480
00482 void
00483 PollingManagerThread::doCooperativeCancel()
00484 {
00485 NonRecursiveMutexLock l(m_triggerGuard);
00486 m_shuttingDown = true;
00487 m_triggerCondition.notifyAll();
00488 }
00489
00490 }
00491