OW_ThreadImpl.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2001-2004 Vintela, Inc. All rights reserved.
00003 *
00004 * Redistribution and use in source and binary forms, with or without
00005 * modification, are permitted provided that the following conditions are met:
00006 *
00007 *  - Redistributions of source code must retain the above copyright notice,
00008 *    this list of conditions and the following disclaimer.
00009 *
00010 *  - Redistributions in binary form must reproduce the above copyright notice,
00011 *    this list of conditions and the following disclaimer in the documentation
00012 *    and/or other materials provided with the distribution.
00013 *
00014 *  - Neither the name of Vintela, Inc. nor the names of its
00015 *    contributors may be used to endorse or promote products derived from this
00016 *    software without specific prior written permission.
00017 *
00018 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
00019 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00020 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00021 * ARE DISCLAIMED. IN NO EVENT SHALL Vintela, Inc. OR THE CONTRIBUTORS
00022 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00023 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00024 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00025 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00026 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00027 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00028 * POSSIBILITY OF SUCH DAMAGE.
00029 *******************************************************************************/
00030 
00035 #include "OW_config.h"
00036 #include "OW_ThreadImpl.hpp"
00037 #include "OW_Mutex.hpp"
00038 #include "OW_Assertion.hpp"
00039 #include "OW_Thread.hpp"
00040 #include "OW_NonRecursiveMutexLock.hpp"
00041 #include "OW_Format.hpp"
00042 #if defined(OW_WIN32)
00043 #include "OW_Map.hpp"
00044 #include "OW_MutexLock.hpp"
00045 #endif
00046 #include <cassert>
00047 #include <cstring>
00048 #include <cstddef>
00049 
00050 extern "C"
00051 {
00052 #ifdef OW_HAVE_SYS_TIME_H
00053 #include <sys/time.h>
00054 #endif
00055 
00056 #include <sys/types.h>
00057 
00058 #ifdef OW_HAVE_UNISTD_H
00059 #include <unistd.h>
00060 #endif
00061 
00062 #include <errno.h>
00063 #include <signal.h>
00064 
00065 #ifdef OW_USE_PTHREAD
00066 #include <pthread.h>
00067 #endif
00068 
00069 #ifdef OW_WIN32
00070 #include <process.h>
00071 #endif
00072 }
00073 
00074 namespace OW_NAMESPACE
00075 {
00076 
00077 namespace ThreadImpl {
00078 
00080 // STATIC
00081 void
00082 sleep(UInt32 milliSeconds)
00083 {
00084    ThreadImpl::testCancel();
00085 #if defined(OW_HAVE_NANOSLEEP)
00086    struct timespec wait;
00087    wait.tv_sec = milliSeconds / 1000;
00088    wait.tv_nsec = (milliSeconds % 1000) * 1000000;
00089    while (nanosleep(&wait, &wait) == -1 && errno == EINTR)
00090    {
00091       ThreadImpl::testCancel();
00092    }
00093 #elif OW_WIN32
00094    Sleep(milliSeconds);
00095 #else
00096    timeval now, end;
00097    unsigned long microSeconds = milliSeconds * 1000;
00098    const UInt32 loopMicroSeconds = 100 * 1000; // 1/10 of a second
00099    gettimeofday(&now, NULL);
00100    end = now;
00101    end.tv_sec  += microSeconds / 1000000;
00102    end.tv_usec += microSeconds % 1000000;
00103    while ((now.tv_sec < end.tv_sec)
00104        || ((now.tv_sec == end.tv_sec) && (now.tv_usec < end.tv_usec)))
00105    {
00106       timeval tv;
00107       tv.tv_sec = end.tv_sec - now.tv_sec;
00108       if (end.tv_usec >= now.tv_usec)
00109       {
00110          tv.tv_usec = end.tv_usec - now.tv_usec;
00111       }
00112       else
00113       {
00114          tv.tv_sec--;
00115          tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00116       }
00117       if (tv.tv_sec > 0 || tv.tv_usec > loopMicroSeconds)
00118       {
00119          tv.tv_sec = 0;
00120          tv.tv_usec = loopMicroSeconds;
00121       }
00122       ThreadImpl::testCancel();
00123       select(0, NULL, NULL, NULL, &tv);
00124       gettimeofday(&now, NULL);
00125    }
00126 #endif
00127 }
00129 // STATIC
00130 void
00131 yield()
00132 {
00133 #if defined(OW_HAVE_SCHED_YIELD)
00134    sched_yield();
00135 #elif defined(OW_WIN32)
00136    ThreadImpl::testCancel();
00137    ::SwitchToThread();
00138 #else
00139    ThreadImpl::sleep(1);
00140 #endif
00141 }
00142 
00143 #if defined(OW_USE_PTHREAD)
00144 namespace {
00145 struct LocalThreadParm
00146 {
00147    ThreadFunction m_func;
00148    void* m_funcParm;
00149 };
00150 extern "C" {
00151 static void*
00152 threadStarter(void* arg)
00153 {
00154    // set our cancellation state to asynchronous, so we can actually be
00155    // killed if need be.
00156    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
00157    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
00158 
00159    // block all signals except SIGUSR1, which is used to signal termination
00160    sigset_t signalSet;
00161    int rv = sigfillset(&signalSet);
00162    OW_ASSERT(rv == 0);
00163    rv = sigdelset(&signalSet, SIGUSR1);
00164    OW_ASSERT(rv == 0);
00165    rv = pthread_sigmask(SIG_SETMASK, &signalSet, 0);
00166    OW_ASSERT(rv == 0);
00167 
00168    LocalThreadParm* parg = static_cast<LocalThreadParm*>(arg);
00169    ThreadFunction func = parg->m_func;
00170    void* funcParm = parg->m_funcParm;
00171    delete parg;
00172    Int32 rval = (*func)(funcParm);
00173    void* prval = reinterpret_cast<void*>(static_cast<ptrdiff_t>(rval));
00174    pthread_exit(prval);
00175    return prval;
00176 }
00177 }
// The purpose of this class is to retrieve the default stack size only once
// at library load time and re-use it thereafter.
//
// Once g_theDefaultStackSize has been constructed:
//   val          - stack size in bytes to use for new threads, or 0 if the
//                  platform default could not be queried.
//   needsSetting - true when val was raised above the platform default and
//                  therefore has to be applied explicitly to the attributes
//                  of each new thread.
struct default_stack_size
{
   default_stack_size()
   {
      // if anything in this function fails, we'll just leave val == 0.
      val = 0;
      needsSetting = false;

// make sure we have a big enough stack.  OpenWBEM can use quite a bit, so we'll try to make sure we get at least 1 MB.
// 1 MB is just an arbitrary number.  The default on Linux is 2 MB which has never been a problem.  However, on UnixWare
// the default is really low (16K IIRC) and that isn't enough. It would be good to do some sort of measurement...
#ifdef _POSIX_THREAD_ATTR_STACKSIZE
      pthread_attr_t stack_size_attr;
      if (pthread_attr_init(&stack_size_attr) != 0)
      {
         return;
      }
      // Query into a local so that val really stays 0 if the call fails.
      size_t platformDefault = 0;
      const int queryResult = pthread_attr_getstacksize(&stack_size_attr, &platformDefault);
      // The attribute object was only needed for the query; every
      // successful pthread_attr_init() must be paired with a destroy.
      pthread_attr_destroy(&stack_size_attr);
      if (queryResult != 0)
      {
         return;
      }
      val = platformDefault;

      if (val < 1048576)
      {
         val = 1048576; // 1 MB
         needsSetting = true;
      }
#ifdef PTHREAD_STACK_MIN
      // Never request less than the platform's minimum.
      if (static_cast<size_t>(PTHREAD_STACK_MIN) > val)
      {
         val = static_cast<size_t>(PTHREAD_STACK_MIN);
         needsSetting = true;
      }
#endif
#endif
   }
   static size_t val;
   static bool needsSetting;
};
size_t default_stack_size::val = 0;
bool default_stack_size::needsSetting(false);
// Constructing this object at load time performs the one-time computation.
default_stack_size g_theDefaultStackSize;
// Ensures initializeTheKey() runs exactly once (used with pthread_once()).
pthread_once_t once_control = PTHREAD_ONCE_INIT;
// Thread-specific-data key under which each thread's Thread* is stored.
pthread_key_t theKey;
extern "C" {
// One-time initialisation executed through pthread_once(): creates theKey
// and makes sure SIGUSR1 is ignored by the process.
static void initializeTheKey()
{
   pthread_key_create(&theKey, NULL);

   // SIGUSR1 is sent to threads we want to cancel, so its disposition has to
   // be SIG_IGN for that to be safe. Only install it if it is not already set.
   struct sigaction disposition;
   memset(&disposition, '\0', sizeof(disposition));
   sigaction(SIGUSR1, 0, &disposition);
   if (disposition.sa_handler == SIG_IGN)
   {
      return;
   }
   disposition.sa_handler = SIG_IGN;
   sigemptyset(&disposition.sa_mask);
   disposition.sa_flags = 0;
   sigaction(SIGUSR1, &disposition, NULL);
}
} // end extern "C"
00243 } // end unnamed namespace
00245 // STATIC
00246 int
00247 createThread(Thread_t& handle, ThreadFunction func,
00248    void* funcParm, UInt32 threadFlags)
00249 {
00250    int cc = 0;
00251    pthread_attr_t attr;
00252    pthread_attr_init(&attr);
00253    if (!(threadFlags & OW_THREAD_FLG_JOINABLE))
00254    {
00255       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00256    }
00257 
00258 #if !defined(OW_VALGRIND_SUPPORT) // valgrind doesn't like us to set the stack size
00259    // Won't be set to true unless _POSIX_THREAD_ATTR_STACKSIZE is defined
00260    if (default_stack_size::needsSetting)
00261    {
00262       pthread_attr_setstacksize(&attr, default_stack_size::val);
00263    }
00264 #endif
00265 
00266    LocalThreadParm* parg = new LocalThreadParm;
00267    parg->m_func = func;
00268    parg->m_funcParm = funcParm;
00269    if (pthread_create(&handle, &attr, threadStarter, parg) != 0)
00270    {
00271       cc = -1;
00272    }
00273    pthread_attr_destroy(&attr);
00274    return cc;
00275 }
00277 // STATIC
00278 void
00279 exitThread(Thread_t&, Int32 rval)
00280 {
00281    void* prval = reinterpret_cast<void*>(static_cast<ptrdiff_t>(rval));
00282    pthread_exit(prval);
00283 }
00284 
00285 
00286 #if defined(OW_SIZEOF_PTHREAD_T)
00287 #if OW_SIZEOF_PTHREAD_T == 2
00288 #define OW_THREAD_CONVERTER UInt16
00289 #elif OW_SIZEOF_PTHREAD_T == 4
00290 #define OW_THREAD_CONVERTER UInt32
00291 #elif OW_SIZEOF_PTHREAD_T == 8
00292 #define OW_THREAD_CONVERTER UInt64
00293 #else
00294 #error Unexpected size for pthread_t
00295 #endif /* OW_SIZEOF_PTHREAD_T */
00296 #else
00297 #error No pthread_t size was found!
00298 #endif /* defined(OW_SIZEOF_PTHREAD_T) */
00299 
00300 UInt64 thread_t_ToUInt64(Thread_t thr)
00301 {
00302    return UInt64(OW_THREAD_CONVERTER(thr));
00303 }
00304 #undef OW_THREAD_CONVERTER
00305 
00307 // STATIC
00308 void
00309 destroyThread(Thread_t& )
00310 {
00311 }
00313 // STATIC
00314 int
00315 setThreadDetached(Thread_t& handle)
00316 {
00317    int cc = pthread_detach(handle);
00318    if (cc != 0)
00319    {
00320       if (cc != EINVAL)
00321       {
00322          cc = -1;
00323       }
00324    }
00325    return cc;
00326 }
00328 // STATIC
00329 int
00330 joinThread(Thread_t& handle, Int32& rval)
00331 {
00332    void* prval(0);
00333    if ((errno = pthread_join(handle, &prval)) == 0)
00334    {
00335       rval = static_cast<Int32>(reinterpret_cast<ptrdiff_t>(prval));
00336       return 0;
00337    }
00338    else
00339    {
00340       return 1;
00341    }
00342 }
00344 void
00345 testCancel()
00346 {
00347    // set up our TLS which will be used to store the Thread* in.
00348    pthread_once(&once_control, &initializeTheKey);
00349    Thread* theThread = reinterpret_cast<Thread*>(pthread_getspecific(theKey));
00350    if (theThread == 0)
00351    {
00352       return;
00353    }
00354    NonRecursiveMutexLock l(theThread->m_cancelLock);
00355    if (theThread->m_cancelRequested)
00356    {
00357       // We don't use OW_THROW here because 
00358       // ThreadCancelledException is special.  It's not derived
00359       // from Exception on purpose so it can be propagated up
00360       // the stack easier. This exception shouldn't be caught and not
00361       // re-thrown anywhere except in Thread::threadRunner()
00362       throw ThreadCancelledException();
00363    }
00364 }
00366 void saveThreadInTLS(void* pTheThread)
00367 {
00368    // set up our TLS which will be used to store the Thread* in.
00369    pthread_once(&once_control, &initializeTheKey);
00370    int rc;
00371    if ((rc = pthread_setspecific(theKey, pTheThread)) != 0)
00372    {
00373       OW_THROW(ThreadException, Format("pthread_setspecific failed.  error = %1(%2)", rc, strerror(rc)).c_str());
00374    }
00375 }
00377 void sendSignalToThread(Thread_t threadID, int signo)
00378 {
00379    int rc;
00380    if ((rc = pthread_kill(threadID, signo)) != 0)
00381    {
00382       OW_THROW(ThreadException, Format("pthread_kill failed.  error = %1(%2)", rc, strerror(rc)).c_str());
00383    }
00384 }
00386 void cancel(Thread_t threadID)
00387 {
00388    int rc;
00389    if ((rc = pthread_cancel(threadID)) != 0)
00390    {
00391       OW_THROW(ThreadException, Format("pthread_cancel failed.  error = %1(%2)", rc, strerror(rc)).c_str());
00392    }
00393 }
00394 #endif // #ifdef OW_USE_PTHREAD
00395 
00396 #if defined(OW_WIN32)
00397 
00398 namespace {
00399 
00400 struct WThreadInfo
00401 {
00402    HANDLE   handle;
00403    OW_NAMESPACE::Thread* pTheThread;
00404 };
00405 
00406 typedef Map<DWORD, WThreadInfo> Win32ThreadMap;
00407 Win32ThreadMap g_threads;
00408 Mutex g_threadsGuard;
00409 
00410 struct LocalThreadParm
00411 {
00412    ThreadFunction m_func;
00413    void* m_funcParm;
00414 };
00415 
00417 extern "C" {
00418 unsigned __stdcall threadStarter(void* arg)
00419 {
00420    LocalThreadParm* parg = reinterpret_cast<LocalThreadParm*>(arg);
00421    ThreadFunction func = parg->m_func;
00422    void* funcParm = parg->m_funcParm;
00423    delete parg;
00424    Int32 rval = (*func)(funcParm);
00425    ::_endthreadex(static_cast<unsigned>(rval));
00426    return rval;
00427 }
00428 }  // End extern "C"
00429 
00431 void
00432 addThreadToMap(DWORD threadId, HANDLE threadHandle)
00433 {
00434    MutexLock ml(g_threadsGuard);
00435    WThreadInfo wi;
00436    wi.handle = threadHandle;
00437    wi.pTheThread = 0;
00438    g_threads[threadId] = wi;
00439 }
00440 
00442 HANDLE
00443 getThreadHandle(DWORD threadId)
00444 {
00445    MutexLock ml(g_threadsGuard);
00446    HANDLE chdl = 0;
00447    Win32ThreadMap::iterator it = g_threads.find(threadId);
00448    if (it != g_threads.end())
00449    {
00450       chdl = it->second.handle;
00451    }
00452    return chdl;
00453 }
00454 
00456 void
00457 setThreadPointer(DWORD threadId, Thread* pTheThread)
00458 {
00459    MutexLock ml(g_threadsGuard);
00460    Win32ThreadMap::iterator it = g_threads.find(threadId);
00461    if (it != g_threads.end())
00462    {
00463       it->second.pTheThread = pTheThread;
00464    }
00465 }
00466 
00468 HANDLE
00469 removeThreadFromMap(DWORD threadId)
00470 {
00471    MutexLock ml(g_threadsGuard);
00472    HANDLE chdl = 0;
00473    Win32ThreadMap::iterator it = g_threads.find(threadId);
00474    if (it != g_threads.end())
00475    {
00476       chdl = it->second.handle;
00477       g_threads.erase(it);
00478    }
00479    return chdl;
00480 }
00481 
00483 Thread*
00484 getThreadObject(DWORD threadId)
00485 {
00486    Thread* pTheThread = 0;
00487    MutexLock ml(g_threadsGuard);
00488    Win32ThreadMap::iterator it = g_threads.find(threadId);
00489    if (it != g_threads.end())
00490    {
00491       pTheThread = it->second.pTheThread;
00492    }
00493    return pTheThread;
00494 }
00495 
00496 }  // End unnamed namespace
00497 
00499 // STATIC
00500 int
00501 createThread(Thread_t& handle, ThreadFunction func,
00502    void* funcParm, UInt32 threadFlags)
00503 {
00504    int cc = -1;
00505    HANDLE hThread;
00506    unsigned threadId;
00507 
00508    LocalThreadParm* parg = new LocalThreadParm;
00509    parg->m_func = func;
00510    parg->m_funcParm = funcParm;
00511    hThread = reinterpret_cast<HANDLE>(::_beginthreadex(NULL, 0, threadStarter,
00512       parg, 0, &threadId));
00513    if (hThread != 0)
00514    {
00515       addThreadToMap(threadId, hThread);
00516       handle = threadId;
00517       cc = 0;
00518    }
00519 
00520    return cc;
00521 }
00523 // STATIC
00524 void
00525 exitThread(Thread_t&, Int32 rval)
00526 {
00527    ::_endthreadex(static_cast<unsigned>(rval));
00528 }
00529 
00531 // STATIC
00532 UInt64 thread_t_ToUInt64(Thread_t thr)
00533 {
00534    //  This should really be a compile time assert.
00535    OW_ASSERTMSG(sizeof(unsigned long) >= sizeof(Thread_t),"  Thread_t truncated!");
00536    return static_cast<UInt64>(thr);
00537 }
00538 
00540 // STATIC
00541 void
00542 destroyThread(Thread_t& threadId)
00543 {
00544    HANDLE thdl = removeThreadFromMap(threadId);
00545    if (thdl != 0)
00546    {
00547       ::CloseHandle(thdl);
00548    }
00549 }
00551 // STATIC
00552 int
00553 setThreadDetached(Thread_t& handle)
00554 {
00555    // No need for this on Win32
00556    return 0;
00557 }
00559 // STATIC
00560 int
00561 joinThread(Thread_t& threadId, Int32& rvalArg)
00562 {
00563    int cc = -1;
00564    DWORD rval;
00565    HANDLE thdl = getThreadHandle(threadId);
00566    if (thdl != 0)
00567    {
00568       if (::WaitForSingleObject(thdl, INFINITE) != WAIT_FAILED)
00569       {
00570          if (::GetExitCodeThread(thdl, &rval) != 0)
00571          {
00572             rvalArg = static_cast<Int32>(rval);
00573             cc = 0;
00574          }
00575       }
00576    }
00577    return cc;
00578 }
00579 
00581 void
00582 testCancel()
00583 {
00584    DWORD threadId = ThreadImpl::currentThread();
00585    Thread* pTheThread = getThreadObject(threadId);
00586    if (pTheThread)
00587    {
00588       NonRecursiveMutexLock l(pTheThread->m_cancelLock);
00589       if (pTheThread->m_cancelRequested)
00590       {
00591          // We don't use OW_THROW here because 
00592          // ThreadCancelledException is special.  It's not derived
00593          // from Exception on purpose so it can be propagated up
00594          // the stack easier. This exception shouldn't be caught and not
00595          // re-thrown anywhere except in Thread::threadRunner()
00596          throw ThreadCancelledException();
00597       }
00598    }
00599 }
00601 void saveThreadInTLS(void* pThreadArg)
00602 {
00603    Thread* pThread = static_cast<Thread*>(pThreadArg);
00604    DWORD threadId = pThread->getId();
00605     setThreadPointer(threadId, pThread);
00606 }
00608 void sendSignalToThread(Thread_t threadID, int signo)
00609 {
00610 }
00612 void cancel(Thread_t threadId)
00613 {
00614    HANDLE thdl = getThreadHandle(threadId);
00615    if (thdl != 0)
00616    {
00617       ::TerminateThread(thdl, -1);
00618    }
00619 }
00620 
00621 #endif // #ifdef OW_WIN32
00622 } // end namespace ThreadImpl
00623 
00624 } // end namespace OW_NAMESPACE
00625 

Generated on Thu Feb 9 08:48:17 2006 for openwbem by  doxygen 1.4.6