00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00035 #ifndef OW_INDICATION_SERVER_IMPL_HPP_
00036 #define OW_INDICATION_SERVER_IMPL_HPP_
00037 #include "OW_config.h"
00038 #include "OW_Types.hpp"
00039 #include "OW_CIMFwd.hpp"
00040 #include "OW_IfcsFwd.hpp"
00041 #include "OW_List.hpp"
00042 #include "OW_Condition.hpp"
00043 #include "OW_CIMInstance.hpp"
00044 #include "OW_CIMObjectPath.hpp"
00045 #include "OW_IndicationServer.hpp"
00046 #include "OW_HashMultiMap.hpp"
00047 #include "OW_WQLSelectStatement.hpp"
00048 #include "OW_WQLCompile.hpp"
00049 #include "OW_ThreadBarrier.hpp"
00050 #include "OW_ProviderFwd.hpp"
00051 #include "OW_SortedVectorMap.hpp"
00052 #include "OW_Map.hpp"
00053 #include "OW_Mutex.hpp"
00054 #include "OW_Thread.hpp"
00055
00056 namespace OW_NAMESPACE
00057 {
00058
// Forward declarations for types this header only names through a reference
// or a smart-pointer template argument:
//  - NotifyTrans is taken by reference in IndicationServerImplThread::getNewTrans.
//  - LifecycleIndicationPoller appears inside the LifecycleIndicationPollerRef typedef.
//  - IndicationServerImplThread is declared in full further down, but
//    IndicationServerImpl (declared first) holds an IntrusiveReference to it.
00059 class NotifyTrans;
00060 class LifecycleIndicationPoller;
00061 class IndicationServerImplThread;
00062
// IndicationServer subclass whose only data member is a reference-counted
// handle to an IndicationServerImplThread.
//
// NOTE(review): shutdown, processIndication, the six subscription methods and
// modifyFilter have same-signature counterparts on IndicationServerImplThread,
// so this class presumably forwards those calls to m_indicationServerThread.
// The definitions are not in this header -- confirm in the implementation file.
00064 class IndicationServerImpl : public IndicationServer
00065 {
00066 public:
00067 IndicationServerImpl();
// NOTE(review): not spelled 'virtual' here; whether it is implicitly virtual
// depends on the IndicationServer base class, which this header does not show.
00068 ~IndicationServerImpl();
// Returns a name for this object as a String (the value itself is defined
// out of line).
00069 virtual String getName() const;
// Returns a StringArray; presumably the names of the things this object
// depends on -- confirm against the base-class contract.
00070 virtual StringArray getDependencies() const;
// Initializes the object with the given service environment reference.
00071 virtual void init(const ServiceEnvironmentIFCRef& env);
00072 virtual void start();
// Declared without 'virtual' in this class, unlike init() and start().
00073 void shutdown();
// Accepts one indication instance together with the namespace string it
// belongs to (instNS).
00074 void processIndication(const CIMInstance& instance,
00075 const String& instNS);
00076
// Virtual "start" entry points for subscription delete / create / modify.
// NOTE(review): presumably these only initiate the operation and the
// non-virtual methods of the same base name below carry it out -- confirm.
//   ns       - namespace string for the subscription
//   subPath  - object path identifying the subscription to delete
//   subInst  - the subscription instance to create or modify
//   username - user name associated with the create request
00077 virtual void startDeleteSubscription(const String& ns, const CIMObjectPath& subPath);
00078 virtual void startCreateSubscription(const String& ns, const CIMInstance& subInst, const String& username);
00079 virtual void startModifySubscription(const String& ns, const CIMInstance& subInst);
00080
00081
// Non-virtual counterparts of the start* methods above; parameters have the
// same types and names.
00082 void deleteSubscription(const String& ns, const CIMObjectPath& subPath);
00083 void createSubscription(const String& ns, const CIMInstance& subInst, const String& username);
00084 void modifySubscription(const String& ns, const CIMInstance& subInst);
00085
// Takes a filter instance, its namespace string, and a user name. Note the
// parameter is spelled 'userName' here but 'username' in the subscription
// methods above.
00086 virtual void modifyFilter(const String& ns, const CIMInstance& filterInst, const String& userName);
00087
00088 private:
// Reference-counted handle to the thread object; the sole data member.
00089 IntrusiveReference<IndicationServerImplThread> m_indicationServerThread;
00090 };
00091
// Thread subclass that owns the indication-server state declared below: the
// subscription table, the export-provider map, the poller map, a list of
// pending ProcIndicationTrans entries, two thread pools, and the mutexes /
// condition / barrier that go with them.
//
// All member functions are defined out of line; the comments here describe
// only what the declarations show and mark everything else as an assumption.
00092 class IndicationServerImplThread : public Thread
00093 {
00094 public:
00095 IndicationServerImplThread();
00096 ~IndicationServerImplThread();
// Initializes the thread object with a CIMOM environment reference
// (presumably stored in m_env -- confirm).
00097 virtual void init(const CIMOMEnvironmentRef& env);
// NOTE(review): presumably blocks on m_startedBarrier until run() has
// finished its startup -- confirm in the implementation file.
00098 virtual void waitUntilReady();
// Thread body; returns an Int32 status.
00099 virtual Int32 run();
00100 void shutdown();
// Accepts one indication instance together with its namespace string.
// NOTE(review): presumably queues a ProcIndicationTrans on m_procTrans for
// the main loop rather than processing inline -- confirm.
00101 void processIndication(const CIMInstance& instance,
00102 const String& instNS);
// Returns a CIMOMEnvironmentRef by value.
00103 CIMOMEnvironmentRef getEnvironment() const;
// Fills outTrans through the reference parameter; the bool result presumably
// reports whether a transaction was available -- confirm.
00104 bool getNewTrans(NotifyTrans& outTrans);
00105
// Virtual "start" entry points for subscription delete / create / modify.
// NOTE(review): presumably these schedule the matching non-virtual method
// below (the class owns m_subscriptionPool) -- confirm.
00106 virtual void startDeleteSubscription(const String& ns, const CIMObjectPath& subPath);
00107 virtual void startCreateSubscription(const String& ns, const CIMInstance& subInst, const String& username);
00108 virtual void startModifySubscription(const String& ns, const CIMInstance& subInst);
00109
00110
// Non-virtual counterparts of the start* methods above, with identical
// parameter lists.
00111 void deleteSubscription(const String& ns, const CIMObjectPath& subPath);
00112 void createSubscription(const String& ns, const CIMInstance& subInst, const String& username);
00113 void modifySubscription(const String& ns, const CIMInstance& subInst);
00114
// Takes a filter instance, its namespace string, and a user name.
00115 virtual void modifyFilter(const String& ns, const CIMInstance& filterInst, const String& userName);
00116
// Virtual cancellation hook; presumably an override of a Thread virtual --
// the base class is not visible here.
00117 virtual void doCooperativeCancel();
00118
00119 private:
// Reference-counted record describing one subscription. The default
// constructor sets the three CIM objects (m_subPath, m_sub, m_filter) to
// CIMNULL; the remaining members are default-constructed.
00120 struct Subscription : public IntrusiveCountableBase
00121 {
00122 Subscription()
00123 : m_subPath(CIMNULL)
00124 , m_sub(CIMNULL)
00125 , m_filter(CIMNULL)
00126 {}
00127 CIMObjectPath m_subPath;
00128 CIMInstance m_sub;
// Array of indication provider references. Distinct from the enclosing
// class's m_providers, which is a map of export providers.
00129 IndicationProviderIFCRefArray m_providers;
00130 CIMInstance m_filter;
// Parsed and compiled WQL forms (presumably of the filter's query -- confirm).
00131 WQLSelectStatement m_selectStmt;
00132 WQLCompile m_compiledStmt;
00133 StringArray m_classes;
00134 String m_filterSourceNameSpace;
// NOTE(review): presumably one flag per entry of m_providers -- confirm.
00135 Array<bool> m_isPolled;
00136 };
00137 typedef IntrusiveReference<Subscription> SubscriptionRef;
00138
00139
00140
00141
// Multimap from a String key to subscription references. The meaning of the
// key is not stated in this header.
00142 typedef HashMultiMap<String, SubscriptionRef> subscriptions_t;
00143
// Type used for copies of the subscription table.
// NOTE(review): std::vector<T>::value_type is T, so the non-AIX branch as
// written also names subscriptions_t and both branches are the same type. If
// a vector of map entries was intended, this spelling does not express it --
// confirm against the upstream header before changing anything.
00144 #if defined(OW_AIX)
00145 typedef subscriptions_t subscriptions_copy_t;
00146 #else
00147
00148 typedef std::vector<subscriptions_t>::value_type subscriptions_copy_t;
00149 #endif // AIX
00150 typedef subscriptions_copy_t::iterator subscriptions_iterator;
00151
// Private counterpart of the public processIndication, same parameters.
00152 void _processIndication(const CIMInstance& instance,
00153 const String& instNS);
00154
// Handles an indication against the iterator range [first, last) --
// presumably half-open; confirm. Note that instNS is declared as a by-value
// 'const String' here, unlike the 'const String&' used by every other method;
// changing it would require changing the out-of-line definition too.
00155 void _processIndicationRange(
00156 const CIMInstance& instanceArg, const String instNS,
00157 subscriptions_iterator first, subscriptions_iterator last);
00158
// Takes the indication, handler and subscription instances plus an export
// provider reference (passed by value). NOTE(review): presumably enqueues a
// NotifyTrans later handed out by getNewTrans -- confirm.
00159 void addTrans(const String& ns, const CIMInstance& indication,
00160 const CIMInstance& handler,
00161 const CIMInstance& subscription,
00162 IndicationExportProviderIFCRef provider);
00163
// Returns an export provider reference for a class name; its return type
// matches the mapped type of provider_map_t below.
00164 IndicationExportProviderIFCRef getProvider(const CIMName& className);
00165
00166 void deactivateAllSubscriptions();
00167
// Value pair of an indication instance and its namespace string; the
// constructor copies both arguments into the members.
00168 struct ProcIndicationTrans
00169 {
00170 ProcIndicationTrans(const CIMInstance& inst,
00171 const String& ns)
00172 : instance(inst)
00173 , nameSpace(ns) {}
00174 CIMInstance instance;
00175 String nameSpace;
00176 };
// Export providers keyed by CIMName.
00177 typedef SortedVectorMap<CIMName, IndicationExportProviderIFCRef> provider_map_t;
00178 provider_map_t m_providers;
00179
00180
00181
// List of indication/namespace pairs.
00182 List<ProcIndicationTrans> m_procTrans;
00183 bool m_shuttingDown;
// NOTE(review): the names suggest this mutex/condition pair synchronizes the
// run() loop (m_procTrans, m_shuttingDown) -- confirm in the implementation.
00184 NonRecursiveMutex m_mainLoopGuard;
00185 Condition m_mainLoopCondition;
00186 CIMOMEnvironmentRef m_env;
00187 ThreadBarrier m_startedBarrier;
00188 subscriptions_t m_subscriptions;
// NOTE(review): presumably guards m_subscriptions -- confirm.
00189 Mutex m_subGuard;
// Lifecycle-indication pollers keyed by CIMName; each mapped value is an
// IntrusiveReference to a poller wrapped in a SharedLibraryReference.
00190 typedef SharedLibraryReference< IntrusiveReference<LifecycleIndicationPoller> > LifecycleIndicationPollerRef;
00191 typedef Map<CIMName, LifecycleIndicationPollerRef > poller_map_t;
00192 poller_map_t m_pollers;
// Two separate thread pools; the names suggest one for notification delivery
// and one for subscription operations -- confirm.
00193 ThreadPoolRef m_notifierThreadPool;
00194 ThreadPoolRef m_subscriptionPool;
00195 WQLIFCRef m_wqlRef;
00196 LoggerRef m_logger;
00197 };
00198
00199 }
00200
00201 #endif