00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00035 #include "OW_config.h"
00036 #include "OW_ThreadImpl.hpp"
00037 #include "OW_Mutex.hpp"
00038 #include "OW_Assertion.hpp"
00039 #include "OW_Thread.hpp"
00040 #include "OW_NonRecursiveMutexLock.hpp"
00041 #include "OW_Format.hpp"
00042 #if defined(OW_WIN32)
00043 #include "OW_Map.hpp"
00044 #include "OW_MutexLock.hpp"
00045 #endif
00046 #include <cassert>
00047 #include <cstring>
00048 #include <cstddef>
00049
00050 extern "C"
00051 {
00052 #ifdef OW_HAVE_SYS_TIME_H
00053 #include <sys/time.h>
00054 #endif
00055
00056 #include <sys/types.h>
00057
00058 #ifdef OW_HAVE_UNISTD_H
00059 #include <unistd.h>
00060 #endif
00061
00062 #include <errno.h>
00063 #include <signal.h>
00064
00065 #ifdef OW_USE_PTHREAD
00066 #include <pthread.h>
00067 #endif
00068
00069 #ifdef OW_WIN32
00070 #include <process.h>
00071 #endif
00072 }
00073
00074 namespace OW_NAMESPACE
00075 {
00076
00077 namespace ThreadImpl {
00078
00080
00081 void
00082 sleep(UInt32 milliSeconds)
00083 {
00084 ThreadImpl::testCancel();
00085 #if defined(OW_HAVE_NANOSLEEP)
00086 struct timespec wait;
00087 wait.tv_sec = milliSeconds / 1000;
00088 wait.tv_nsec = (milliSeconds % 1000) * 1000000;
00089 while (nanosleep(&wait, &wait) == -1 && errno == EINTR)
00090 {
00091 ThreadImpl::testCancel();
00092 }
00093 #elif OW_WIN32
00094 Sleep(milliSeconds);
00095 #else
00096 timeval now, end;
00097 unsigned long microSeconds = milliSeconds * 1000;
00098 const UInt32 loopMicroSeconds = 100 * 1000;
00099 gettimeofday(&now, NULL);
00100 end = now;
00101 end.tv_sec += microSeconds / 1000000;
00102 end.tv_usec += microSeconds % 1000000;
00103 while ((now.tv_sec < end.tv_sec)
00104 || ((now.tv_sec == end.tv_sec) && (now.tv_usec < end.tv_usec)))
00105 {
00106 timeval tv;
00107 tv.tv_sec = end.tv_sec - now.tv_sec;
00108 if (end.tv_usec >= now.tv_usec)
00109 {
00110 tv.tv_usec = end.tv_usec - now.tv_usec;
00111 }
00112 else
00113 {
00114 tv.tv_sec--;
00115 tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00116 }
00117 if (tv.tv_sec > 0 || tv.tv_usec > loopMicroSeconds)
00118 {
00119 tv.tv_sec = 0;
00120 tv.tv_usec = loopMicroSeconds;
00121 }
00122 ThreadImpl::testCancel();
00123 select(0, NULL, NULL, NULL, &tv);
00124 gettimeofday(&now, NULL);
00125 }
00126 #endif
00127 }
00129
00130 void
00131 yield()
00132 {
00133 #if defined(OW_HAVE_SCHED_YIELD)
00134 sched_yield();
00135 #elif defined(OW_WIN32)
00136 ThreadImpl::testCancel();
00137 ::SwitchToThread();
00138 #else
00139 ThreadImpl::sleep(1);
00140 #endif
00141 }
00142
00143 #if defined(OW_USE_PTHREAD)
00144 namespace {
00145 struct LocalThreadParm
00146 {
00147 ThreadFunction m_func;
00148 void* m_funcParm;
00149 };
00150 extern "C" {
00151 static void*
00152 threadStarter(void* arg)
00153 {
00154
00155
00156 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
00157 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
00158
00159
00160 sigset_t signalSet;
00161 int rv = sigfillset(&signalSet);
00162 OW_ASSERT(rv == 0);
00163 rv = sigdelset(&signalSet, SIGUSR1);
00164 OW_ASSERT(rv == 0);
00165 rv = pthread_sigmask(SIG_SETMASK, &signalSet, 0);
00166 OW_ASSERT(rv == 0);
00167
00168 LocalThreadParm* parg = static_cast<LocalThreadParm*>(arg);
00169 ThreadFunction func = parg->m_func;
00170 void* funcParm = parg->m_funcParm;
00171 delete parg;
00172 Int32 rval = (*func)(funcParm);
00173 void* prval = reinterpret_cast<void*>(static_cast<ptrdiff_t>(rval));
00174 pthread_exit(prval);
00175 return prval;
00176 }
00177 }
00178
00179
// Computes, once at static-initialisation time, the stack size that
// createThread() should request for new threads.
//
//   val          - stack size in bytes (0 if it could not be determined)
//   needsSetting - true when val differs from the platform default and must
//                  therefore be applied with pthread_attr_setstacksize()
struct default_stack_size
{
    default_stack_size()
    {
        val = 0;
        needsSetting = false;

#ifdef _POSIX_THREAD_ATTR_STACKSIZE
        // A freshly initialised attribute object reports the platform's
        // default stack size.
        pthread_attr_t stack_size_attr;
        if (pthread_attr_init(&stack_size_attr) != 0)
        {
            return;
        }
        const int rc = pthread_attr_getstacksize(&stack_size_attr, &val);
        // Release the attribute object on every path; it was previously
        // never destroyed.
        pthread_attr_destroy(&stack_size_attr);
        if (rc != 0)
        {
            val = 0;
            return;
        }

        // Enforce a floor of 1 MiB.
        const size_t minimumStackSize = 1048576;
        if (val < minimumStackSize)
        {
            val = minimumStackSize;
            needsSetting = true;
        }
#ifdef PTHREAD_STACK_MIN
        // PTHREAD_STACK_MIN may expand to a signed expression; compare as size_t.
        const size_t platformMinimum = static_cast<size_t>(PTHREAD_STACK_MIN);
        if (platformMinimum > val)
        {
            val = platformMinimum;
            needsSetting = true;
        }
#endif
#endif
    }
    static size_t val;
    static bool needsSetting;
};
size_t default_stack_size::val = 0;
bool default_stack_size::needsSetting(false);
// Constructing this object runs the computation above during static
// initialisation of this translation unit.
default_stack_size g_theDefaultStackSize;
// Guard used with pthread_once() so initializeTheKey() runs only once, and
// the thread-specific-data key under which saveThreadInTLS() stores a
// pointer and testCancel() reads it back.
pthread_once_t once_control = PTHREAD_ONCE_INIT;
pthread_key_t theKey;
extern "C" {
// One-time initialiser: creates theKey (no destructor) and makes sure the
// process-wide disposition of SIGUSR1 is SIG_IGN.
static void initializeTheKey()
{
    pthread_key_create(&theKey, NULL);

    // Query the current SIGUSR1 disposition without changing it.
    struct sigaction disposition;
    memset(&disposition, '\0', sizeof(disposition));
    sigaction(SIGUSR1, 0, &disposition);
    if (disposition.sa_handler == SIG_IGN)
    {
        return; // already ignored, nothing to install
    }

    disposition.sa_handler = SIG_IGN;
    sigemptyset(&disposition.sa_mask);
    disposition.sa_flags = 0;
    sigaction(SIGUSR1, &disposition, NULL);
}
}
00243 }
00245
00246 int
00247 createThread(Thread_t& handle, ThreadFunction func,
00248 void* funcParm, UInt32 threadFlags)
00249 {
00250 int cc = 0;
00251 pthread_attr_t attr;
00252 pthread_attr_init(&attr);
00253 if (!(threadFlags & OW_THREAD_FLG_JOINABLE))
00254 {
00255 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00256 }
00257
00258 #if !defined(OW_VALGRIND_SUPPORT) // valgrind doesn't like us to set the stack size
00259
00260 if (default_stack_size::needsSetting)
00261 {
00262 pthread_attr_setstacksize(&attr, default_stack_size::val);
00263 }
00264 #endif
00265
00266 LocalThreadParm* parg = new LocalThreadParm;
00267 parg->m_func = func;
00268 parg->m_funcParm = funcParm;
00269 if (pthread_create(&handle, &attr, threadStarter, parg) != 0)
00270 {
00271 cc = -1;
00272 }
00273 pthread_attr_destroy(&attr);
00274 return cc;
00275 }
00277
00278 void
00279 exitThread(Thread_t&, Int32 rval)
00280 {
00281 void* prval = reinterpret_cast<void*>(static_cast<ptrdiff_t>(rval));
00282 pthread_exit(prval);
00283 }
00284
00285
00286 #if defined(OW_SIZEOF_PTHREAD_T)
00287 #if OW_SIZEOF_PTHREAD_T == 2
00288 #define OW_THREAD_CONVERTER UInt16
00289 #elif OW_SIZEOF_PTHREAD_T == 4
00290 #define OW_THREAD_CONVERTER UInt32
00291 #elif OW_SIZEOF_PTHREAD_T == 8
00292 #define OW_THREAD_CONVERTER UInt64
00293 #else
00294 #error Unexpected size for pthread_t
00295 #endif
00296 #else
00297 #error No pthread_t size was found!
00298 #endif
00299
00300 UInt64 thread_t_ToUInt64(Thread_t thr)
00301 {
00302 return UInt64(OW_THREAD_CONVERTER(thr));
00303 }
00304 #undef OW_THREAD_CONVERTER
00305
00307
00308 void
00309 destroyThread(Thread_t& )
00310 {
00311 }
00313
00314 int
00315 setThreadDetached(Thread_t& handle)
00316 {
00317 int cc = pthread_detach(handle);
00318 if (cc != 0)
00319 {
00320 if (cc != EINVAL)
00321 {
00322 cc = -1;
00323 }
00324 }
00325 return cc;
00326 }
00328
00329 int
00330 joinThread(Thread_t& handle, Int32& rval)
00331 {
00332 void* prval(0);
00333 if ((errno = pthread_join(handle, &prval)) == 0)
00334 {
00335 rval = static_cast<Int32>(reinterpret_cast<ptrdiff_t>(prval));
00336 return 0;
00337 }
00338 else
00339 {
00340 return 1;
00341 }
00342 }
00344 void
00345 testCancel()
00346 {
00347
00348 pthread_once(&once_control, &initializeTheKey);
00349 Thread* theThread = reinterpret_cast<Thread*>(pthread_getspecific(theKey));
00350 if (theThread == 0)
00351 {
00352 return;
00353 }
00354 NonRecursiveMutexLock l(theThread->m_cancelLock);
00355 if (theThread->m_cancelRequested)
00356 {
00357
00358
00359
00360
00361
00362 throw ThreadCancelledException();
00363 }
00364 }
00366 void saveThreadInTLS(void* pTheThread)
00367 {
00368
00369 pthread_once(&once_control, &initializeTheKey);
00370 int rc;
00371 if ((rc = pthread_setspecific(theKey, pTheThread)) != 0)
00372 {
00373 OW_THROW(ThreadException, Format("pthread_setspecific failed. error = %1(%2)", rc, strerror(rc)).c_str());
00374 }
00375 }
00377 void sendSignalToThread(Thread_t threadID, int signo)
00378 {
00379 int rc;
00380 if ((rc = pthread_kill(threadID, signo)) != 0)
00381 {
00382 OW_THROW(ThreadException, Format("pthread_kill failed. error = %1(%2)", rc, strerror(rc)).c_str());
00383 }
00384 }
00386 void cancel(Thread_t threadID)
00387 {
00388 int rc;
00389 if ((rc = pthread_cancel(threadID)) != 0)
00390 {
00391 OW_THROW(ThreadException, Format("pthread_cancel failed. error = %1(%2)", rc, strerror(rc)).c_str());
00392 }
00393 }
00394 #endif // #ifdef OW_USE_PTHREAD
00395
00396 #if defined(OW_WIN32)
00397
00398 namespace {
00399
00400 struct WThreadInfo
00401 {
00402 HANDLE handle;
00403 OW_NAMESPACE::Thread* pTheThread;
00404 };
00405
00406 typedef Map<DWORD, WThreadInfo> Win32ThreadMap;
00407 Win32ThreadMap g_threads;
00408 Mutex g_threadsGuard;
00409
00410 struct LocalThreadParm
00411 {
00412 ThreadFunction m_func;
00413 void* m_funcParm;
00414 };
00415
00417 extern "C" {
00418 unsigned __stdcall threadStarter(void* arg)
00419 {
00420 LocalThreadParm* parg = reinterpret_cast<LocalThreadParm*>(arg);
00421 ThreadFunction func = parg->m_func;
00422 void* funcParm = parg->m_funcParm;
00423 delete parg;
00424 Int32 rval = (*func)(funcParm);
00425 ::_endthreadex(static_cast<unsigned>(rval));
00426 return rval;
00427 }
00428 }
00429
00431 void
00432 addThreadToMap(DWORD threadId, HANDLE threadHandle)
00433 {
00434 MutexLock ml(g_threadsGuard);
00435 WThreadInfo wi;
00436 wi.handle = threadHandle;
00437 wi.pTheThread = 0;
00438 g_threads[threadId] = wi;
00439 }
00440
00442 HANDLE
00443 getThreadHandle(DWORD threadId)
00444 {
00445 MutexLock ml(g_threadsGuard);
00446 HANDLE chdl = 0;
00447 Win32ThreadMap::iterator it = g_threads.find(threadId);
00448 if (it != g_threads.end())
00449 {
00450 chdl = it->second.handle;
00451 }
00452 return chdl;
00453 }
00454
00456 void
00457 setThreadPointer(DWORD threadId, Thread* pTheThread)
00458 {
00459 MutexLock ml(g_threadsGuard);
00460 Win32ThreadMap::iterator it = g_threads.find(threadId);
00461 if (it != g_threads.end())
00462 {
00463 it->second.pTheThread = pTheThread;
00464 }
00465 }
00466
00468 HANDLE
00469 removeThreadFromMap(DWORD threadId)
00470 {
00471 MutexLock ml(g_threadsGuard);
00472 HANDLE chdl = 0;
00473 Win32ThreadMap::iterator it = g_threads.find(threadId);
00474 if (it != g_threads.end())
00475 {
00476 chdl = it->second.handle;
00477 g_threads.erase(it);
00478 }
00479 return chdl;
00480 }
00481
00483 Thread*
00484 getThreadObject(DWORD threadId)
00485 {
00486 Thread* pTheThread = 0;
00487 MutexLock ml(g_threadsGuard);
00488 Win32ThreadMap::iterator it = g_threads.find(threadId);
00489 if (it != g_threads.end())
00490 {
00491 pTheThread = it->second.pTheThread;
00492 }
00493 return pTheThread;
00494 }
00495
00496 }
00497
00499
00500 int
00501 createThread(Thread_t& handle, ThreadFunction func,
00502 void* funcParm, UInt32 threadFlags)
00503 {
00504 int cc = -1;
00505 HANDLE hThread;
00506 unsigned threadId;
00507
00508 LocalThreadParm* parg = new LocalThreadParm;
00509 parg->m_func = func;
00510 parg->m_funcParm = funcParm;
00511 hThread = reinterpret_cast<HANDLE>(::_beginthreadex(NULL, 0, threadStarter,
00512 parg, 0, &threadId));
00513 if (hThread != 0)
00514 {
00515 addThreadToMap(threadId, hThread);
00516 handle = threadId;
00517 cc = 0;
00518 }
00519
00520 return cc;
00521 }
00523
00524 void
00525 exitThread(Thread_t&, Int32 rval)
00526 {
00527 ::_endthreadex(static_cast<unsigned>(rval));
00528 }
00529
00531
00532 UInt64 thread_t_ToUInt64(Thread_t thr)
00533 {
00534
00535 OW_ASSERTMSG(sizeof(unsigned long) >= sizeof(Thread_t)," Thread_t truncated!");
00536 return static_cast<UInt64>(thr);
00537 }
00538
00540
00541 void
00542 destroyThread(Thread_t& threadId)
00543 {
00544 HANDLE thdl = removeThreadFromMap(threadId);
00545 if (thdl != 0)
00546 {
00547 ::CloseHandle(thdl);
00548 }
00549 }
00551
00552 int
00553 setThreadDetached(Thread_t& handle)
00554 {
00555
00556 return 0;
00557 }
00559
00560 int
00561 joinThread(Thread_t& threadId, Int32& rvalArg)
00562 {
00563 int cc = -1;
00564 DWORD rval;
00565 HANDLE thdl = getThreadHandle(threadId);
00566 if (thdl != 0)
00567 {
00568 if (::WaitForSingleObject(thdl, INFINITE) != WAIT_FAILED)
00569 {
00570 if (::GetExitCodeThread(thdl, &rval) != 0)
00571 {
00572 rvalArg = static_cast<Int32>(rval);
00573 cc = 0;
00574 }
00575 }
00576 }
00577 return cc;
00578 }
00579
00581 void
00582 testCancel()
00583 {
00584 DWORD threadId = ThreadImpl::currentThread();
00585 Thread* pTheThread = getThreadObject(threadId);
00586 if (pTheThread)
00587 {
00588 NonRecursiveMutexLock l(pTheThread->m_cancelLock);
00589 if (pTheThread->m_cancelRequested)
00590 {
00591
00592
00593
00594
00595
00596 throw ThreadCancelledException();
00597 }
00598 }
00599 }
00601 void saveThreadInTLS(void* pThreadArg)
00602 {
00603 Thread* pThread = static_cast<Thread*>(pThreadArg);
00604 DWORD threadId = pThread->getId();
00605 setThreadPointer(threadId, pThread);
00606 }
00608 void sendSignalToThread(Thread_t threadID, int signo)
00609 {
00610 }
00612 void cancel(Thread_t threadId)
00613 {
00614 HANDLE thdl = getThreadHandle(threadId);
00615 if (thdl != 0)
00616 {
00617 ::TerminateThread(thdl, -1);
00618 }
00619 }
00620
00621 #endif // #ifdef OW_WIN32
00622 }
00623
00624 }
00625