RWMutex:共享/专有的递归互斥锁
具有共享/独占访问权限,且具有升级/降级功能的互斥锁
介绍
我的目标是创建可以充当读/写锁定机制的对象。任何线程都可以锁定它以进行读取,但是只有一个线程可以锁定它以进行写入。在写入线程释放它之前,所有其他线程都将等待。在释放任何其他线程之前,写线程不会获取互斥体。
我可以使用Slim Reader / Writer锁,但是:
- 它们不是递归的,例如,AcquireSRWLockExclusive()如果同一线程较早调用同一函数,则对的调用将阻塞。
- 它们不可升级,例如,已将锁锁定为读取访问权限的线程无法将其锁定为写入操作。
- 它们不是可复制的句柄。
我可以尝试C ++ 14,shared_lock但是我仍然需要C ++ 11支持。此外,我还不确定它是否可以真正满足我的要求。
因此,我不得不手动实现它。由于缺少,删除了普通的C ++ 11方法WaitForMultipleObjects (nyi)。现在具有升级/降级功能。
RWMUTEX
这一节很简单。
class RWMUTEX { private: HANDLE hChangeMap; std::map<DWORD, HANDLE> Threads; RWMUTEX(const RWMUTEX&) = delete; RWMUTEX(RWMUTEX&&) = delete;
我需要std::map<DWORD,HANDLE>为所有尝试访问共享资源的线程存储一个句柄,并且还需要一个互斥锁以确保对此映射的所有更改都是线程安全的。
构造函数
RWMUTEX(const RWMUTEX&) = delete; void operator =(const RWMUTEX&) = delete; RWMUTEX() { hChangeMapWrite = CreateMutex(0,0,0); }
简单地创建一个映射互斥的句柄。对象不可复制。
创建
HANDLE CreateIf(bool KeepReaderLocked = false) { WaitForSingleObject(hChangeMap, INFINITE); DWORD id = GetCurrentThreadId(); if (Threads[id] == 0) { HANDLE e0 = CreateMutex(0, 0, 0); Threads[id] = e0; } HANDLE e = Threads[id]; if (!KeepReaderLocked) ReleaseMutex(hChangeMap); return e; }
当调用LockRead()或LockWrite()来锁定对象时,将调用这个私有函数。如果当前线程还没有将自己变为可能访问这个互斥锁的线程中,这个函数将为该线程创建一个互斥锁。如果其他线程已经锁定这个互斥对象进行写访问,那么这个函数就会阻塞,直到写线程释放这个对象为止。这个函数返回当前线程的互斥句柄。
锁定读取/释放读取
HANDLE LockRead() { auto f = CreateIf(); WaitForSingleObject(f,INFINITE); return f; } void ReleaseRead(HANDLE f) { ReleaseMutex(f); }
当您要锁定对象以进行读取访问并稍后释放它时,将调用这些函数。
锁/释放
void LockWrite() { CreateIf(true); // Wait for all vector<HANDLE> AllThreads; AllThreads.reserve(Threads.size()); for (auto& a : Threads) { AllThreads.push_back(a.second); } WaitForMultipleObjects((DWORD)AllThreads.size(), AllThreads.data(), TRUE, INFINITE); // Reader is locked } void ReleaseWrite() { // Release All for (auto& a : Threads) ReleaseMutex(a.second); ReleaseMutex(hChangeMap); }
当您希望锁定对象以进行写访问并在稍后释放它时,将调用这些函数。函数的作用是:
1.在锁期间没有注册新线程
2.任何读取线程都释放了锁
析构函数
RWMUTEX() { CloseHandle(hChangeMap); hChangeMap = 0; for (auto& a : Threads) CloseHandle(a.second); Threads.clear(); }
析构函数确保清除所有句柄。
可升级/可升级锁
有时,您希望将读锁升级为写锁,而不先解锁,以提高效率。因此,LockWrite被修改为:
void LockWrite(DWORD updThread = 0) { CreateIf(true); // Wait for all AllThreads.reserve(Threads.size()); AllThreads.clear(); for (auto& a : Threads) { if (updThread == a.first) // except ourself if in upgrade operation continue; AllThreads.push_back(a.second); } auto tim = WaitForMultipleObjects((DWORD)AllThreads.size(), AllThreads.data(), TRUE, wi); if (tim == WAIT_TIMEOUT && wi != INFINITE) OutputDebugString(L"LockWrite debug timeout!"); // We don't want to keep threads, the hChangeMap is enough // We also release the handle to the upgraded thread, if any for (auto& a : Threads) ReleaseMutex(a.second); // Reader is locked } void Upgrade() { LockWrite(GetCurrentThreadId()); } HANDLE Downgrade() { DWORD id = GetCurrentThreadId(); auto z = Threads[id]; auto tim = WaitForSingleObject(z, wi); if (tim == WAIT_TIMEOUT && wi != INFINITE) OutputDebugString(L"Downgrade debug timeout!"); ReleaseMutex(hChangeMap); return z; }
调用Upgrade()现在的结果是:
更改被锁定的映射
等待除我们自己的线程之外的所有读线程退出
然后我们释放我们自己的线程互斥锁,因为更改锁定的映射就足够了。
调用Downgrade()结果:
- 直接从映射上获取手柄,无需重新锁定
- 锁定此句柄,就像我们处于读取模式一样
- 发布变更映射
因此,整个代码是(带有一些调试帮助):
// RWMUTEX class RWMUTEX { private: HANDLE hChangeMap = 0; std::map<DWORD, HANDLE> Threads; DWORD wi = INFINITE; RWMUTEX(const RWMUTEX&) = delete; RWMUTEX(RWMUTEX&&) = delete; operator=(const RWMUTEX&) = delete; public: RWMUTEX(bool D = false) { if (D) wi = 10000; else wi = INFINITE; hChangeMap = CreateMutex(0, 0, 0); } ~RWMUTEX() { CloseHandle(hChangeMap); hChangeMap = 0; for (auto& a : Threads) CloseHandle(a.second); Threads.clear(); } HANDLE CreateIf(bool KeepReaderLocked = false) { auto tim = WaitForSingleObject(hChangeMap, INFINITE); if (tim == WAIT_TIMEOUT && wi != INFINITE) OutputDebugString(L"LockRead debug timeout!"); DWORD id = GetCurrentThreadId(); if (Threads[id] == 0) { HANDLE e0 = CreateMutex(0, 0, 0); Threads[id] = e0; } HANDLE e = Threads[id]; if (!KeepReaderLocked) ReleaseMutex(hChangeMap); return e; } HANDLE LockRead() { auto z = CreateIf(); auto tim = WaitForSingleObject(z, wi); if (tim == WAIT_TIMEOUT && wi != INFINITE) OutputDebugString(L"LockRead debug timeout!"); return z; } void LockWrite(DWORD updThread = 0) { CreateIf(true); // Wait for all AllThreads.reserve(Threads.size()); AllThreads.clear(); for (auto& a : Threads) { if (updThread == a.first) // except ourself if in upgrade operation continue; AllThreads.push_back(a.second); } auto tim = WaitForMultipleObjects((DWORD)AllThreads.size(), AllThreads.data(), TRUE, wi); if (tim == WAIT_TIMEOUT && wi != INFINITE) OutputDebugString(L"LockWrite debug timeout!"); // We don't want to keep threads, the hChangeMap is enough // We also release the handle to the upgraded thread, if any for (auto& a : Threads) ReleaseMutex(a.second); // Reader is locked } void ReleaseWrite() { ReleaseMutex(hChangeMap); } void ReleaseRead(HANDLE f) { ReleaseMutex(f); } void Upgrade() { LockWrite(GetCurrentThreadId()); } HANDLE Downgrade() { DWORD id = GetCurrentThreadId(); auto z = Threads[id]; auto tim = WaitForSingleObject(z, wi); if (tim == WAIT_TIMEOUT && wi != INFINITE) OutputDebugString(L"Downgrade debug timeout!"); ReleaseMutex(hChangeMap); return z; } };
要使用RWMUTEX,可以简单地创建锁定类:
class RWMUTEXLOCKREAD { private: RWMUTEX* mm = 0; public: RWMUTEXLOCKREAD(const RWMUTEXLOCKREAD&) = delete; void operator =(const RWMUTEXLOCKREAD&) = delete; RWMUTEXLOCKREAD(RWMUTEX*m) { if (m) { mm = m; mm->LockRead(); } } ~RWMUTEXLOCKREAD() { if (mm) { mm->ReleaseRead(); mm = 0; } } }; class RWMUTEXLOCKWRITE { private: RWMUTEX* mm = 0; public: RWMUTEXLOCKWRITE(RWMUTEX*m) { if (m) { mm = m; mm->LockWrite(); } } ~RWMUTEXLOCKWRITE() { if (mm) { mm->ReleaseWrite(); mm = 0; } } };
还有一个用于升级机制的新类:
class RWMUTEXLOCKREADWRITE { private: RWMUTEX* mm = 0; HANDLE lm = 0; bool U = false; public: RWMUTEXLOCKREADWRITE(const RWMUTEXLOCKREADWRITE&) = delete; void operator =(const RWMUTEXLOCKREADWRITE&) = delete; RWMUTEXLOCKREADWRITE(RWMUTEX*m) { if (m) { mm = m; lm = mm->LockRead(); } } void Upgrade() { if (mm && !U) { mm->Upgrade(); lm = 0; U = 1; } } void Downgrade() { if (mm && U) { lm = mm->Downgrade(); U = 0; } } ~RWMUTEXLOCKREADWRITE() { if (mm) { if (U) mm->ReleaseWrite(); else mm->ReleaseRead(lm); lm = 0; mm = 0; } } };
用法示例:
RWMUTEX m; // ... other code void foo1() { RWMUTEXLOCKREAD lock(&m); } void foo2() { RWMUTEXLOCKWRITE lock(&m); }