Published by orzz.org(). (https://orzz.org/%e5%a4%9a%e7%ba%bf%e7%a8%8b%e6%93%8d%e4%bd%9c%e5%b0%8f%e7%bb%93/)
线程操作比较多也比较繁杂,线程创建之后就一直是一个活动的对象,管理起来也比较麻烦(尤其是在多线程同步的时候).
下面简单总结一下多线程中的各种操作和概念.
首先是如何创建多线程.线程需要一个入口,所以需要一个静态的函数指针作为线程入口.Win32下的入口函数定义如下:
1 2 3 4 |
typedef DWORD (WINAPI *PTHREAD_START_ROUTINE)( LPVOID lpThreadParameter ); typedef PTHREAD_START_ROUTINE LPTHREAD_START_ROUTINE; |
然后再调用CreateThread就可以创建线程:
1 2 |
HANDLE hThread = ::CreateThread(NULL, 0, lpStartAddr, lpParam, dwFlag, lpIDThread); ::CloseHandle(hThread); // 不需要的时候记得关闭句柄 |
就算关闭了线程句柄线程本身也不会停止,只是外部没办法通过句柄操作线程.拿到线程句柄之后的一些操作:
1 2 3 |
::SuspendThread(hThread); // 挂起线程 ::ResumeThread(hThread); // 继续线程 ::TerminateThread(hThread, dwExitCode); // 强制结束线程 |
在线程函数里的一些常用操作:
1 2 3 |
HANDLE hThread = ::GetCurrentThread(); // 获得当前线程句柄 DWORD dwThread = ::GetCurrentThreadId(); // 获得当前线程ID ::ExitThread(0); // 直接退出线程.其实一般在线程函数里用reture就可以了 |
对于Win32,可以创建UI线程.创建方法是在线程函数内跑一个消息循环,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
DWORD WINAPI ThreadProc(LPVOID) { MSG msg = {0}; // 代表一条消息 BOOL bRet == FALSE; // 从UI线程消息队列中取出一条消息 while( (bRet = ::GetMessage( &msg, NULL, 0, 0 )) != FALSE ) { if (bRet == -1) return 0; else { ::TranslateMessage(&msg); // 转换消息格式 ::DispatchMessage(&msg); // 分发消息给相应的窗体 } } return 0; } |
上面是一个带消息循环的UI线程入口函数定义.我们可以在前面注册一个窗口类,这样消息就会被路由到窗口的回调函数中;也可以在消息循环中写回调转发消息.之后可以在线程外部通过API给线程里的窗口或者消息回调发送消息:
1 2 3 4 |
::PostThreadMessage(idThread, WM_QUIT, 0, 0); // 线程退出 ::PostQuitMessage(0); // 线程对自身发退出消息 ::PostMessage(hWnd, WM_CLOSE, 0, 0); // 对窗口发关闭消息 ::SendMessage(hWnd, WM_CLOSE, 0, 0); |
然后是如何做线程同步.线程同步的部分比较复杂,因为往往需要针对具体情况做具体分析,不存在通用的算法.比较高级的一些线程同步技术,如lock-free,讲起来也比较麻烦.下面只简单的整理一下相关概念,并给出一些例子.
线程同步的基本操作是加锁.锁有很多种类型,互斥量(Mutex);递归锁(Recursive lock);读写锁(Read-Write lock);旋转锁(Spin Lock)...下面是个多线程锁模型的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
struct _LockExchangePolicy { typedef long lock_t; static void IntLock(lock_t& /*lc*/) {} static void DelLock(lock_t& /*lc*/) {} static void Lock(lock_t& lc) { while( ::InterlockedExchange(&lc, 1) ) #if (_WIN32_WINNT >= 0x0400) && !defined(_WIN32_WCE) SwitchToThread(); #else Sleep(5); #endif/*(_WIN32_WINNT >= 0x0400)*/ } static void Unlock(lock_t& lc) { ::InterlockedExchange(&lc, 0); } }; struct _LockCriticalPolicy { typedef CRITICAL_SECTION lock_t; static void IntLock(lock_t& lc) { InitializeCriticalSection(&lc); } static void DelLock(lock_t& lc) { DeleteCriticalSection(&lc); } static void Lock(lock_t& lc) { EnterCriticalSection(&lc); } static void Unlock(lock_t& lc) { LeaveCriticalSection(&lc); } }; |
_LockExchangePolicy是旋转锁,_LockCriticalPolicy是Win32中的临界区,属于可重入的递归锁.使用起来也很简单,在多个线程之外定义对象:
1 |
_LockCriticalPolicy::lock_t lc; |
在需要加锁的时候,如某个数据对象需要被使用时调用Lock方法,使用完毕后调用Unlock:
1 2 3 4 5 |
// ... _LockCriticalPolicy::Lock(lc); // 使用某个数据对象 _LockCriticalPolicy::Unlock(lc); // ... |
读写锁定义比较复杂,貌似Win32中没有现成的单写多读锁可以用.示例实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
struct _LockSharedPolicy { // 可重入的共享锁 typedef _LockCriticalPolicy policy_t; typedef struct { CEvent rw_event; // 读写事件 long rc_w, rc_r; // 读写状态 DWORD id; // 当前线程id policy_t::lock_t rc_lock; // 状态锁 } lock_t; EXP_INLINE static void IntLock(lock_t& lc) { lc.rw_event.Create(true, true); lc.rc_r = lc.rc_w = 0; lc.id = 0; policy_t::IntLock(lc.rc_lock); } EXP_INLINE static void DelLock(lock_t& lc) { lc.rw_event.Close(); policy_t::DelLock(lc.rc_lock); } EXP_INLINE static void Lock(lock_t& lc, bool bRead) { policy_t::Lock(lc.rc_lock, false); if (bRead) { // 加读锁 if (lc.id == GetCurrentThreadId()) ++ lc.rc_r; else { while (lc.rc_w > 0) { // 正在被写 policy_t::Unlock(lc.rc_lock, false); lc.rw_event.Wait(); // 等待写结束 policy_t::Lock(lc.rc_lock, false); } // 正在被读或无访问 ++ lc.rc_r; lc.id = GetCurrentThreadId(); } } else { // 加写锁 if (lc.id == GetCurrentThreadId()) ++ lc.rc_w; else { while (lc.rc_w > 0 || lc.rc_r > 0) { // 正在被写或正在被读 policy_t::Unlock(lc.rc_lock, false); lc.rw_event.Wait(); // 等待访问结束 lc.rw_event.Reset(); // 复位事件 policy_t::Lock(lc.rc_lock, false); } // 无访问 ++ lc.rc_w; lc.id = GetCurrentThreadId(); } } policy_t::Unlock(lc.rc_lock, false); } EXP_INLINE static void Unlock(lock_t& lc, bool bRead) { policy_t::Lock(lc.rc_lock, false); if (bRead) { if (lc.rc_r == 0) { policy_t::Unlock(lc.rc_lock, false); return; } else -- lc.rc_r; } else { if (lc.rc_w == 0) { policy_t::Unlock(lc.rc_lock, false); return; } else -- lc.rc_w; } if (lc.id != GetCurrentThreadId()) lc.id = GetCurrentThreadId(); policy_t::Unlock(lc.rc_lock, false); // 通知等待结束 lc.rw_event.Set(); } }; |
使用上需要注意该读的时候调用读参数,该写的时候调用写参数.读的时候是不会加锁的,但是一旦有写操作,所有的其他读写操作都会被锁住.
实际使用中,这样的操作还是比较复杂,我们往往可以统一了上面的操作接口后,用自动化的方式帮助我们操作锁对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
template <typename PolicyT = EXP_THREAD_MODEL::_LockPolicy> class CLockT : INonCopyable { public: typedef PolicyT policy_t; typedef typename policy_t::lock_t lock_t; typedef CLockT mutex_t; private: lock_t m_lc; lock_t& m_Lock; public: CLockT() : m_Lock(m_lc) { PolicyT::IntLock(m_Lock); } CLockT(lock_t& lc) : m_Lock(lc) { PolicyT::IntLock(m_Lock); } ~CLockT() { PolicyT::DelLock(m_Lock); } EXP_INLINE void Lock(bool bRead = false) { PolicyT::Lock(m_Lock, bRead); } EXP_INLINE void Unlock(bool bRead = false) { PolicyT::Unlock(m_Lock, bRead); } }; typedef CLockT<> CMutex; typedef CLockT<EXP_THREAD_MODEL::_ExcPolicy> CExcMutex; typedef CLockT<EXP_THREAD_MODEL::_CriPolicy> CCriMutex; typedef CLockT<EXP_THREAD_MODEL::_ShrPolicy> CShrMutex; ////////////////////////////////////////////////////////////////// // 自动化的加锁/解锁类 template <typename MutexT = CMutex> class CLockerT : INonCopyable { public: typedef typename MutexT::mutex_t mutex_t; private: mutex_t& m_Lock; bool m_bRead; public: CLockerT(mutex_t& lock, bool bRead = false) : m_Lock(lock), m_bRead(bRead) { m_Lock.Lock(m_bRead); } ~CLockerT() { m_Lock.Unlock(m_bRead); } }; typedef CLockerT<> CLocker; ////////////////////////////////////////////////////////////////// #define ExLock(mutex, is_read, mutex_t) CLockerT<mutex_t> locker(mutex, is_read) #define ExLockThis(policy_t) static CLockT<policy_t>::lock_t lc; static CLockT<policy_t> mutex(lc); ExLock(mutex, false, CLockT<policy_t>) |
此时前面的锁抽象为基本的线程模型,由下面的锁对象负责调用.现在我们可以直接在需要加锁的位置这样写:
1 2 3 4 |
CMutex mutex; // ... ExLock(mutex, false, CMutex); // ... |
当过程结束时此锁会被自动释放.
然后就是事件对象,它不仅可以用来做类似锁的同步,还可以用来传递消息.基本实现如下:
首先定义同步对象接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
interface ISyncObject : INonCopyable { protected: HANDLE m_hSync; public: ISyncObject() : m_hSync(INVALID_HANDLE_VALUE) {} virtual ~ISyncObject() { Close(); } public: bool IsClosed() { return (!m_hSync || m_hSync == INVALID_HANDLE_VALUE); } bool Close() { bool ret = true; if(!IsClosed()) ret = CloseHandle(m_hSync); m_hSync = INVALID_HANDLE_VALUE; return ret; } DWORD Wait(DWORD dwWaitTime = INFINITE) { if (IsClosed()) return WAIT_FAILED; return WaitForSingleObject(m_hSync, dwWaitTime); } static DWORD Wait(HANDLE* Syncs, DWORD nCount, bool bWaitAll, DWORD dwWaitTime = INFINITE) { if (Syncs == NULL || nCount == 0) return WAIT_FAILED; return WaitForMultipleObjects(nCount, Syncs, bWaitAll, dwWaitTime); } static DWORD Wait(ISyncObject* Syncs, DWORD nCount, bool bWaitAll, DWORD dwWaitTime = INFINITE) { HANDLE syncs[MAXIMUM_WAIT_OBJECTS]; DWORD limit = min(MAXIMUM_WAIT_OBJECTS, nCount); for(DWORD i = 0; i < limit; ++i) syncs[i] = Syncs[i].GetHandle(); return Wait(syncs, limit, bWaitAll, dwWaitTime); } template <DWORD SizeT> static DWORD Wait(HANDLE (&Syncs)[SizeT], bool bWaitAll, DWORD dwWaitTime = INFINITE) { return Wait(Syncs, SizeT, bWaitAll, dwWaitTime); } template <DWORD SizeT> static DWORD Wait(ISyncObject (&Syncs)[SizeT], bool bWaitAll, DWORD dwWaitTime = INFINITE) { return Wait(Syncs, SizeT, bWaitAll, dwWaitTime); } HANDLE GetHandle() { return m_hSync; } operator HANDLE() { return GetHandle(); } }; |
然后定义事件对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
class CEvent : public ISyncObject { public: CEvent() : ISyncObject() {} CEvent(bool bManualReset, bool bInitialState = false) : ISyncObject() { Create(bManualReset, bInitialState); } ~CEvent() {} public: bool Create(bool bManualReset = false, bool bInitialState = false) { Close(); m_hSync = CreateEvent(0, bManualReset, bInitialState, NULL); if (m_hSync == NULL) m_hSync = INVALID_HANDLE_VALUE; if (m_hSync == INVALID_HANDLE_VALUE) return false; return true; } bool Set() { if (IsClosed()) return false; return SetEvent(m_hSync); } bool Reset() { if (IsClosed()) return false; return ResetEvent(m_hSync); } bool IsSetted() { return (Wait(0) != WAIT_TIMEOUT); } }; |
通过设置一个事件对象,可以同时让多个线程恢复运行,锁就只能恢复一个线程的运行.
最后是信号量,相当与一个线程计数器.基本实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
class CSemaphore : public ISyncObject { public: CSemaphore(LONG lInitialCount = 1, LONG lMaxCount = 1, LPCTSTR pstrName = NULL, LPSECURITY_ATTRIBUTES lpsaAttributes = NULL) : ISyncObject() { Create(lInitialCount, lMaxCount, pstrName, lpsaAttributes); } ~CSemaphore() {} public: bool Create(LONG lInitialCount = 1, LONG lMaxCount = 1, LPCTSTR pstrName = NULL, LPSECURITY_ATTRIBUTES lpsaAttributes = NULL) { Close(); m_hSync = CreateSemaphore(lpsaAttributes, lInitialCount, lMaxCount, pstrName); if (m_hSync == NULL) m_hSync = INVALID_HANDLE_VALUE; if (m_hSync == INVALID_HANDLE_VALUE) return false; return true; } bool Release(LONG lReleaseCount = 1, LPLONG lpPreviousCount = NULL) { if (IsClosed()) return false; return ReleaseSemaphore(m_hSync, lReleaseCount, lpPreviousCount); } }; |
当需要控制访问某个数据对象的线程个数时,通过对一个CSemaphore对象做Release()操作,此时在这个CSemaphore上Wait的线程就会收到通知.当进入的线程个数达到Release的数量时,此时CSemaphore将阻止新的线程继续访问.
不管上面哪种线程同步的方法,其实本质就是在需要暂停线程的时候把线程挂起,在需要恢复运行的时候放行线程.比较高级一点的线程同步方式就是不对线程做挂起操作,而又能够保证数据的一致性,此时就需要使用无锁编程的相关技术,比如lock-free.
lock-free的话题在网络上已经讨论得比较深入了,其实现方式主要是依赖CAS原子操作:
1 2 3 4 |
EXP_INLINE static bool CAS(volatile LONG* lpTarget, LONG lComperand, LONG lValue) { return (lComperand == ::InterlockedCompareExchange(lpTarget, lValue, lComperand)); } |
对对象做循环检测,并在能够交换对象的时候,将改写之后的数据替换掉原来的旧数据.比如下面的一个多线程计数器(Counter)实现(参考:http://www.cnblogs.com/lucifer1982/archive/2009/04/08/1431992.html):
1 2 3 4 5 6 7 8 9 10 11 12 13 |
static volatile int counter = 0; static void CASCounter() { for (int i = 0; i < Count; i++) { int oldValue; do { oldValue = counter; } while (!CAS(counter, oldValue, oldValue + 1)); } } |
上面是一个简单的示例,往往在实际实现中需要配合引用计数指针或者GC来完成缓存数据部分的内存清理工作.
实际上各种不同的无锁编程方式,本质上就是依赖缓存的机制,对新的数据做缓存,并在老数据访问完毕的某个时间上替换数据访问点.由于问题本身的复杂性,无锁编程很难有统一的编程模型,往往只有针对具体需求做具体的编程工作才能得到比较优良的解.