OW_PollingManager.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2001-2004 Vintela, Inc. All rights reserved.
00003 *
00004 * Redistribution and use in source and binary forms, with or without
00005 * modification, are permitted provided that the following conditions are met:
00006 *
00007 *  - Redistributions of source code must retain the above copyright notice,
00008 *    this list of conditions and the following disclaimer.
00009 *
00010 *  - Redistributions in binary form must reproduce the above copyright notice,
00011 *    this list of conditions and the following disclaimer in the documentation
00012 *    and/or other materials provided with the distribution.
00013 *
00014 *  - Neither the name of Vintela, Inc. nor the names of its
00015 *    contributors may be used to endorse or promote products derived from this
00016 *    software without specific prior written permission.
00017 *
00018 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
00019 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00020 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00021 * ARE DISCLAIMED. IN NO EVENT SHALL Vintela, Inc. OR THE CONTRIBUTORS
00022 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00023 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00024 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00025 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00026 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00027 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00028 * POSSIBILITY OF SUCH DAMAGE.
00029 *******************************************************************************/
00030 
00036 #include "OW_config.h"
00037 #include "OW_PollingManager.hpp"
00038 #include "OW_NonRecursiveMutexLock.hpp"
00039 #include "OW_DateTime.hpp"
00040 #include "OW_CIMOMHandleIFC.hpp"
00041 #include "OW_Format.hpp"
00042 #include "OW_ConfigOpts.hpp"
00043 #include "OW_PolledProviderIFC.hpp"
00044 #include "OW_ProviderManager.hpp"
00045 #include "OW_Platform.hpp"
00046 #include "OW_TimeoutException.hpp"
00047 #include "OW_OperationContext.hpp"
00048 #include "OW_RepositoryIFC.hpp"
00049 #include "OW_ServiceIFCNames.hpp"
00050 
00051 //#include <climits>
00052 #include <limits>
00053 
00054 namespace
00055 {
00056    template <bool b> struct compile_time_assert;
00057    template <> struct compile_time_assert<true> { };
00058    using OpenWBEM::Int32;
00059 
00060    // RETURNS: max(x, min(x+y, TIME_T_MAX)) where TIME_T_MAX is largest time_t
00061    time_t safe_add(time_t x, Int32 y)
00062    {
00063       compile_time_assert<(sizeof(time_t) >= sizeof(Int32))> dummy;
00064       time_t const max_time = std::numeric_limits<time_t>::max();
00065       return (
00066          y <= 0 ? x :
00067          x > max_time - y ? max_time :
00068          x + y
00069       );
00070    }
00071 }
00072 
00073 namespace OW_NAMESPACE
00074 {
00075 
00076 namespace
00077 {
00078 const String COMPONENT_NAME("ow.owcimomd.PollingManager");
00079 }
00080 
00082 PollingManager::PollingManager(const ProviderManagerRef& providerManager)
00083    : m_pollingManagerThread(new PollingManagerThread(providerManager))
00084 {
00085 
00086 }
00087 
00089 PollingManager::~PollingManager()
00090 {
00091 }
00092 
00094 void
00095 PollingManager::init(const ServiceEnvironmentIFCRef& env)
00096 {
00097    m_pollingManagerThread->init(env);
00098 }
00099 
00101 void
00102 PollingManager::start()
00103 {
00104    m_pollingManagerThread->start();
00105    m_pollingManagerThread->waitUntilReady();
00106 }
00107 
00109 void
00110 PollingManager::shutdown()
00111 {
00112    m_pollingManagerThread->shutdown();
00113 }
00114 
00116 void
00117 PollingManager::addPolledProvider(const PolledProviderIFCRef& p)
00118 {
00119    m_pollingManagerThread->addPolledProvider(p);
00120 }
00121 
00122 
00123 
00125 PollingManagerThread::PollingManagerThread(const ProviderManagerRef& providerManager)
00126    : Thread()
00127    , m_shuttingDown(false)
00128    , m_providerManager(providerManager)
00129    , m_startedBarrier(2)
00130 {
00131 }
00133 PollingManagerThread::~PollingManagerThread()
00134 {
00135 }
00137 String
00138 PollingManager::getName() const
00139 {
00140    return ServiceIFCNames::PollingManager;
00141 }
00142 
00144 StringArray
00145 PollingManager::getDependencies() const
00146 {
00147    StringArray rv;
00148    rv.push_back(ServiceIFCNames::CIMServer);
00149    return rv;
00150 }
00151 
00153 void
00154 PollingManagerThread::init(const ServiceEnvironmentIFCRef& env)
00155 {
00156    m_env = env;
00157    m_logger = m_env->getLogger(COMPONENT_NAME);
00158    Int32 maxThreads;
00159    try
00160    {
00161       maxThreads = env->getConfigItem(ConfigOpts::POLLING_MANAGER_MAX_THREADS_opt, OW_DEFAULT_POLLING_MANAGER_MAX_THREADS).toInt32();
00162    }
00163    catch (const StringConversionException&)
00164    {
00165       maxThreads = String(OW_DEFAULT_POLLING_MANAGER_MAX_THREADS).toInt32();
00166    }
00167    
00168    m_triggerRunnerThreadPool = ThreadPoolRef(new ThreadPool(ThreadPool::DYNAMIC_SIZE, maxThreads, maxThreads * 10,
00169       m_logger, "Polling Manager"));
00170 }
00171 
00173 namespace
00174 {
00175    class PollingManagerProviderEnvironment : public ProviderEnvironmentIFC
00176    {
00177    public:
00178       PollingManagerProviderEnvironment(ServiceEnvironmentIFCRef env)
00179          : m_context()
00180          , m_env(env)
00181       {}
00182       virtual CIMOMHandleIFCRef getCIMOMHandle() const
00183       {
00184          return m_env->getCIMOMHandle(m_context);
00185       }
00186       virtual CIMOMHandleIFCRef getRepositoryCIMOMHandle() const
00187       {
00188          return m_env->getCIMOMHandle(m_context, ServiceEnvironmentIFC::E_BYPASS_PROVIDERS);
00189       }
00190       virtual RepositoryIFCRef getRepository() const
00191       {
00192          return m_env->getRepository();
00193       }
00194       virtual String getConfigItem(const String& name, const String& defRetVal="") const
00195       {
00196          return m_env->getConfigItem(name, defRetVal);
00197       }
00198       virtual StringArray getMultiConfigItem(const String &itemName, 
00199          const StringArray& defRetVal, const char* tokenizeSeparator = 0) const
00200       {
00201          return m_env->getMultiConfigItem(itemName, defRetVal, tokenizeSeparator);
00202       }
00203       
00204       virtual LoggerRef getLogger() const
00205       {
00206          return m_env->getLogger(COMPONENT_NAME);
00207       }
00208       virtual LoggerRef getLogger(const String& componentName) const
00209       {
00210          return m_env->getLogger(componentName);
00211       }
00212       virtual String getUserName() const
00213       {
00214          return Platform::getCurrentUserName();
00215       }
00216       virtual OperationContext& getOperationContext()
00217       {
00218          return m_context;
00219       }
00220       virtual ProviderEnvironmentIFCRef clone() const
00221       {
00222          return ProviderEnvironmentIFCRef(new PollingManagerProviderEnvironment(m_env));
00223       }
00224    private:
00225       mutable OperationContext m_context;
00226       ServiceEnvironmentIFCRef m_env;
00227    };
00228    ProviderEnvironmentIFCRef createProvEnvRef(ServiceEnvironmentIFCRef env)
00229    {
00230       return ProviderEnvironmentIFCRef(new PollingManagerProviderEnvironment(env));
00231    }
00232 }
00234 Int32
00235 PollingManagerThread::run()
00236 {
00237    // let CIMOMEnvironment know we're running and ready to go.
00238    m_startedBarrier.wait();
00239 
00240    bool doInit = true;
00241 
00242    // Get all of the indication trigger providers
00243    PolledProviderIFCRefArray itpra =
00244          m_providerManager->getPolledProviders(createProvEnvRef(m_env));
00245 
00246    OW_LOG_DEBUG(m_logger, Format("PollingManager found %1 polled providers",
00247       itpra.size()));
00248    {
00249       // Get initial polling interval from all polled providers
00250       NonRecursiveMutexLock ml(m_triggerGuard);
00251       for (size_t i = 0; i < itpra.size(); ++i)
00252       {
00253          TriggerRunnerRef tr(new TriggerRunner(this, m_env));
00254          tr->m_pollInterval =
00255             itpra[i]->getInitialPollingInterval(createProvEnvRef(m_env));
00256          OW_LOG_DEBUG(m_logger, Format("PollingManager poll interval for provider"
00257             " %1: %2", i, tr->m_pollInterval));
00258          if (!tr->m_pollInterval)
00259          {
00260             continue;
00261          }
00262          tr->m_itp = itpra[i];
00263          m_triggerRunners.append(tr);
00264       }
00265    }
00266    {
00267       NonRecursiveMutexLock l(m_triggerGuard);
00268       while (!m_shuttingDown)
00269       {
00270          bool rightNow;
00271          UInt32 sleepTime = calcSleepTime(rightNow, doInit);
00272          doInit = false;
00273          if (!rightNow)
00274          {
00275             if (sleepTime == 0)
00276             {
00277                m_triggerCondition.wait(l);
00278             }
00279             else
00280             {
00281                m_triggerCondition.timedWait(l, sleepTime);
00282             }
00283          }
00284          if (m_shuttingDown)
00285          {
00286             break;
00287          }
00288          processTriggers();
00289       }
00290    }
00291    // wait until all the threads exit
00292    m_triggerRunnerThreadPool->shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, 60);
00293    m_triggerRunners.clear();
00294    return 0;
00295 }
00297 UInt32
00298 PollingManagerThread::calcSleepTime(bool& rightNow, bool doInit)
00299 {
00300    rightNow = false;
00301    DateTime dtm;
00302    dtm.setToCurrent();
00303    time_t tm = dtm.get();
00304 
00305    Int32 const int32_max = std::numeric_limits<Int32>::max();
00306    time_t const time_t_max = std::numeric_limits<time_t>::max();
00307    time_t leastTime = (time_t_max > int32_max ? int32_max : time_t_max);
00308    // leastTime is now a large positive time_t value that will fit into an
00309    // Int32, and hence into a UInt32.
00310 
00311    int checkedCount = 0;
00312    // LOOP INVARIANT: 0 <= leastTime <= int32_max
00313    for (size_t i = 0; i < m_triggerRunners.size(); i++)
00314    {
00315       if (m_triggerRunners[i]->m_isRunning
00316          || m_triggerRunners[i]->m_pollInterval == 0)
00317       {
00318          continue;
00319       }
00320       if (doInit)
00321       {
00322          m_triggerRunners[i]->m_nextPoll =
00323             safe_add(tm, m_triggerRunners[i]->m_pollInterval);
00324       }
00325       else if (m_triggerRunners[i]->m_nextPoll <= tm)
00326       {
00327          rightNow = true;
00328          return 0;
00329       }
00330       // GUARANTEED: m_triggerRunners[i]->m_nextPoll >= tm
00331       checkedCount++;
00332       time_t diff = m_triggerRunners[i]->m_nextPoll - tm;
00333       if (diff < leastTime)
00334       {
00335          leastTime = diff;
00336       }
00337    }
00338    return (checkedCount == 0) ? 0 : UInt32(leastTime);
00339 }
00341 void
00342 PollingManagerThread::processTriggers()
00343 {
00344    DateTime dtm;
00345    dtm.setToCurrent();
00346    time_t tm = dtm.get();
00347    for (size_t i = 0; i < m_triggerRunners.size(); i++)
00348    {
00349       if (m_triggerRunners[i]->m_isRunning)
00350       {
00351          continue;
00352       }
00353       if (m_triggerRunners[i]->m_pollInterval == 0)
00354       {
00355          // Stopped running - remove it
00356          m_triggerRunners.remove(i--);
00357          continue;
00358       }
00359       if (tm >= m_triggerRunners[i]->m_nextPoll)
00360       {
00361          m_triggerRunners[i]->m_isRunning = true;
00362          if (!m_triggerRunnerThreadPool->tryAddWork(m_triggerRunners[i]))
00363          {
00364             OW_LOG_INFO(m_logger, "Failed to run polled provider, because there are too many already running!");
00365          }
00366       }
00367    }
00368 }
00370 void
00371 PollingManagerThread::shutdown()
00372 {
00373    {
00374       NonRecursiveMutexLock l(m_triggerGuard);
00375       m_shuttingDown = true;
00376       m_triggerCondition.notifyAll();
00377    }
00378    // wait until the main thread exits.
00379    this->join();
00380 
00381    // clear out variables to avoid circular reference counts.
00382    m_triggerRunners.clear();
00383    m_env = 0;
00384    m_providerManager = 0;
00385    m_triggerRunnerThreadPool = 0;
00386 
00387 }
00389 void
00390 PollingManagerThread::addPolledProvider(const PolledProviderIFCRef& p)
00391 {
00392    NonRecursiveMutexLock l(m_triggerGuard);
00393    if (m_shuttingDown)
00394       return;
00395    TriggerRunnerRef tr(new TriggerRunner(this, m_env));
00396    tr->m_pollInterval = 
00397       p->getInitialPollingInterval(createProvEnvRef(m_env));
00398    OW_LOG_DEBUG(m_logger, Format("PollingManager poll interval for provider"
00399       " %1", tr->m_pollInterval));
00400    if (!tr->m_pollInterval)
00401    {
00402       return;
00403    }
00404    DateTime dtm;
00405    dtm.setToCurrent();
00406    time_t tm = dtm.get();
00407    tr->m_nextPoll = safe_add(tm, tr->m_pollInterval);
00408    tr->m_itp = p;
00409    m_triggerRunners.append(tr);
00410    m_triggerCondition.notifyAll();
00411 }
00413 PollingManagerThread::TriggerRunner::TriggerRunner(PollingManagerThread* svr,
00414    ServiceEnvironmentIFCRef env)
00415    : Runnable()
00416    , m_itp(0)
00417    , m_nextPoll(0)
00418    , m_isRunning(false)
00419    , m_pollInterval(0)
00420    , m_pollMan(svr)
00421    , m_env(env)
00422    , m_logger(env->getLogger(COMPONENT_NAME))
00423 {
00424 }
00426 void
00427 PollingManagerThread::TriggerRunner::run()
00428 {
00429    Int32 nextInterval = 0;
00430    try
00431    {
00432       nextInterval = m_itp->poll(createProvEnvRef(m_env));
00433    }
00434    catch(std::exception& e)
00435    {
00436       OW_LOG_ERROR(m_logger, Format("Caught Exception while running poll: %1",
00437          e.what()));
00438    }
00439    catch(ThreadCancelledException& e)
00440    {
00441       throw;
00442    }
00443    catch(...)
00444    {
00445       OW_LOG_ERROR(m_logger, "Caught Unknown Exception while running poll");
00446    }
00447    NonRecursiveMutexLock l(m_pollMan->m_triggerGuard);
00448    if (nextInterval == 0 || m_pollInterval == 0) // m_pollInterval == 0 means this poller has been instructed to stop
00449    {
00450       m_pollInterval = 0;
00451       m_nextPoll = 0;
00452    }
00453    else
00454    {
00455       if (nextInterval > 0)
00456       {
00457          m_pollInterval = nextInterval;
00458       }
00459       DateTime dtm;
00460       dtm.setToCurrent();
00461       m_nextPoll = safe_add(dtm.get(), m_pollInterval);
00462    }
00463    m_isRunning = false;
00464    m_pollMan->m_triggerCondition.notifyOne();
00465 }
00466 
00468 void
00469 PollingManagerThread::TriggerRunner::doCooperativeCancel()
00470 {
00471    m_itp->doCooperativeCancel();
00472 }
00473 
00475 void
00476 PollingManagerThread::TriggerRunner::doDefinitiveCancel()
00477 {
00478    m_itp->doDefinitiveCancel();
00479 }
00480 
00482 void
00483 PollingManagerThread::doCooperativeCancel()
00484 {
00485    NonRecursiveMutexLock l(m_triggerGuard);
00486    m_shuttingDown = true;
00487    m_triggerCondition.notifyAll();
00488 }
00489 
00490 } // end namespace OW_NAMESPACE
00491 

Generated on Thu Feb 9 08:48:08 2006 for openwbem by  doxygen 1.4.6