00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00034 #include "OW_config.h"
00035 #include "OW_Condition.hpp"
00036 #include "OW_NonRecursiveMutexLock.hpp"
00037 #include "OW_ExceptionIds.hpp"
00038
00039 #include <cassert>
00040 #include <cerrno>
00041 #ifdef OW_HAVE_SYS_TIME_H
00042 #include <sys/time.h>
00043 #endif
00044
00045 namespace OW_NAMESPACE
00046 {
00047
00048 OW_DEFINE_EXCEPTION_WITH_ID(ConditionLock);
00049 OW_DEFINE_EXCEPTION_WITH_ID(ConditionResource);
00050 #if defined(OW_USE_PTHREAD)
00051
00052 Condition::Condition()
00053 {
00054 int res = pthread_cond_init(&m_condition, 0);
00055 if (res != 0)
00056 {
00057 OW_THROW(ConditionResourceException, "Failed initializing condition variable");
00058 }
00059 }
00061 Condition::~Condition()
00062 {
00063 int res = pthread_cond_destroy(&m_condition);
00064 assert(res == 0);
00065 }
00067 void
00068 Condition::notifyOne()
00069 {
00070 int res = pthread_cond_signal(&m_condition);
00071 assert(res == 0);
00072 }
00074 void
00075 Condition::notifyAll()
00076 {
00077 int res = pthread_cond_broadcast(&m_condition);
00078 assert(res == 0);
00079 }
00081 void
00082 Condition::doWait(NonRecursiveMutex& mutex)
00083 {
00084 int res;
00085 NonRecursiveMutexLockState state;
00086 mutex.conditionPreWait(state);
00087 res = pthread_cond_wait(&m_condition, state.pmutex);
00088 mutex.conditionPostWait(state);
00089 assert(res == 0);
00090 }
00092 bool
00093 Condition::doTimedWait(NonRecursiveMutex& mutex, UInt32 sTimeout, UInt32 usTimeout)
00094 {
00095 int res;
00096 NonRecursiveMutexLockState state;
00097 mutex.conditionPreWait(state);
00098 bool ret = false;
00099 timespec ts;
00100 struct timeval now;
00101 ::gettimeofday(&now, NULL);
00102
00103 ts.tv_sec = now.tv_sec + sTimeout;
00104
00105 const int NANOSECONDS_PER_MICROSECOND = 1000;
00106 const int NANOSECONDS_PER_SECOND = 1000000000;
00107 int nsec = (now.tv_usec + usTimeout) * NANOSECONDS_PER_MICROSECOND;
00108 ts.tv_sec += nsec / NANOSECONDS_PER_SECOND;
00109 ts.tv_nsec = nsec % NANOSECONDS_PER_SECOND;
00110
00111 res = pthread_cond_timedwait(&m_condition, state.pmutex, &ts);
00112 mutex.conditionPostWait(state);
00113 assert(res == 0 || res == ETIMEDOUT);
00114 ret = res != ETIMEDOUT;
00115 return ret;
00116 }
00117 #elif defined (OW_WIN32)
00118
00119 Condition::Condition()
00120 : m_condition(new ConditionInfo_t)
00121 {
00122 m_condition->waitersCount = 0;
00123 m_condition->wasBroadcast = false;
00124 m_condition->queue = ::CreateSemaphore(
00125 NULL,
00126 0,
00127 0x7fffffff,
00128 NULL);
00129 ::InitializeCriticalSection(&m_condition->waitersCountLock);
00130 m_condition->waitersDone = ::CreateEvent(
00131 NULL,
00132 false,
00133 false,
00134 NULL);
00135 }
00137 Condition::~Condition()
00138 {
00139 ::CloseHandle(m_condition->queue);
00140 ::DeleteCriticalSection(&m_condition->waitersCountLock);
00141 ::CloseHandle(m_condition->waitersDone);
00142 delete m_condition;
00143 }
00145 void
00146 Condition::notifyOne()
00147 {
00148 ::EnterCriticalSection(&m_condition->waitersCountLock);
00149 bool haveWaiters = m_condition->waitersCount > 0;
00150 ::LeaveCriticalSection(&m_condition->waitersCountLock);
00151
00152
00153 if (haveWaiters)
00154 {
00155 ::ReleaseSemaphore(m_condition->queue, 1, 0);
00156 }
00157 }
00159 void
00160 Condition::notifyAll()
00161 {
00162 ::EnterCriticalSection(&m_condition->waitersCountLock);
00163 bool haveWaiters = false;
00164 if (m_condition->waitersCount > 0)
00165 {
00166
00167 haveWaiters = m_condition->wasBroadcast = true;
00168 }
00169
00170 if (haveWaiters)
00171 {
00172
00173 ::ReleaseSemaphore(m_condition->queue, m_condition->waitersCount, 0);
00174 ::LeaveCriticalSection(&m_condition->waitersCountLock);
00175
00176
00177 ::WaitForSingleObject(m_condition->waitersDone, INFINITE);
00178 m_condition->wasBroadcast = false;
00179 }
00180 else
00181 {
00182 ::LeaveCriticalSection(&m_condition->waitersCountLock);
00183 }
00184 }
00186 void
00187 Condition::doWait(NonRecursiveMutex& mutex)
00188 {
00189 doTimedWait(mutex, INFINITE, 0);
00190 }
00192 bool
00193 Condition::doTimedWait(NonRecursiveMutex& mutex, UInt32 sTimeout, UInt32 usTimeout)
00194 {
00195 bool cc = true;
00196 NonRecursiveMutexLockState state;
00197 mutex.conditionPreWait(state);
00198
00199 ::EnterCriticalSection(&m_condition->waitersCountLock);
00200 m_condition->waitersCount++;
00201 ::LeaveCriticalSection(&m_condition->waitersCountLock);
00202
00203
00204 if (sTimeout != INFINITE)
00205 {
00206 sTimeout *= 1000;
00207 sTimeout += usTimeout / 1000;
00208 }
00209
00210
00211
00212 if (::SignalObjectAndWait(mutex.m_mutex, m_condition->queue, sTimeout,
00213 false) == WAIT_TIMEOUT)
00214 {
00215 cc = false;
00216 }
00217
00218 ::EnterCriticalSection(&m_condition->waitersCountLock);
00219 m_condition->waitersCount--;
00220
00221
00222 bool isLastWaiter = (m_condition->wasBroadcast && m_condition->waitersCount == 0
00223 && cc == true);
00224
00225 ::LeaveCriticalSection(&m_condition->waitersCountLock);
00226
00227
00228
00229 if (isLastWaiter)
00230 {
00231
00232
00233 ::SignalObjectAndWait(m_condition->waitersDone, mutex.m_mutex,
00234 INFINITE, false);
00235 }
00236 else
00237 {
00238
00239 ::WaitForSingleObject(mutex.m_mutex, INFINITE);
00240 }
00241 mutex.conditionPostWait(state);
00242 return cc;
00243 }
00244 #else
00245 #error "port me!"
00246 #endif
00247
00248 void
00249 Condition::wait(NonRecursiveMutexLock& lock)
00250 {
00251 if (!lock.isLocked())
00252 {
00253 OW_THROW(ConditionLockException, "Lock must be locked");
00254 }
00255 doWait(*(lock.m_mutex));
00256 }
00258 bool
00259 Condition::timedWait(NonRecursiveMutexLock& lock, UInt32 sTimeout, UInt32 usTimeout)
00260 {
00261 if (!lock.isLocked())
00262 {
00263 OW_THROW(ConditionLockException, "Lock must be locked");
00264 }
00265 return doTimedWait(*(lock.m_mutex), sTimeout, usTimeout);
00266 }
00267
00268 }
00269