RWMutex:共享/专有的递归互斥锁

具有共享/独占访问权限,且具有升级/降级功能的互斥锁


介绍

我的目标是创建可以充当读/写锁定机制的对象。任何线程都可以锁定它以进行读取,但是只有一个线程可以锁定它以进行写入。在写入线程释放它之前,所有其他线程都将等待。在释放任何其他线程之前,写线程不会获取互斥体。


我可以使用Slim Reader / Writer锁,但是:

  • 它们不是递归的,例如,AcquireSRWLockExclusive()如果同一线程较早调用同一函数,则对的调用将阻塞。
  • 它们不可升级,例如,已将锁锁定为读取访问权限的线程无法将其锁定为写入操作。
  • 它们不是可复制的句柄。

我可以尝试C ++ 14,shared_lock但是我仍然需要C ++ 11支持。此外,我还不确定它是否可以真正满足我的要求。

因此,我不得不手动实现它。由于缺少,删除了普通的C ++ 11方法WaitForMultipleObjects (nyi)。现在具有升级/降级功能。


RWMUTEX

这一节很简单。

class RWMUTEX { private: HANDLE hChangeMap; std::map<DWORD, HANDLE> Threads; RWMUTEX(const RWMUTEX&) = delete; RWMUTEX(RWMUTEX&&) = delete; 

我需要std::map<DWORD,HANDLE>为所有尝试访问共享资源的线程存储一个句柄,并且还需要一个互斥锁以确保对此映射的所有更改都是线程安全的。

构造函数

RWMUTEX(const RWMUTEX&) = delete; void operator =(const RWMUTEX&) = delete; RWMUTEX() { hChangeMapWrite = CreateMutex(0,0,0); } 

简单地创建一个映射互斥的句柄。对象不可复制。

创建

HANDLE CreateIf(bool KeepReaderLocked = false) { WaitForSingleObject(hChangeMap, INFINITE);  DWORD id = GetCurrentThreadId(); if (Threads[id] == 0) { HANDLE e0 = CreateMutex(0, 0, 0); Threads[id] = e0; } HANDLE e = Threads[id]; if (!KeepReaderLocked) ReleaseMutex(hChangeMap); return e; } 

当调用LockRead()或LockWrite()来锁定对象时,将调用这个私有函数。如果当前线程还没有将自己变为可能访问这个互斥锁的线程中,这个函数将为该线程创建一个互斥锁。如果其他线程已经锁定这个互斥对象进行写访问,那么这个函数就会阻塞,直到写线程释放这个对象为止。这个函数返回当前线程的互斥句柄。

锁定读取/释放读取

HANDLE LockRead() { auto f = CreateIf();  WaitForSingleObject(f,INFINITE); return f; } void ReleaseRead(HANDLE f) { ReleaseMutex(f); } 

当您要锁定对象以进行读取访问并稍后释放它时,将调用这些函数。

锁/释放

void LockWrite() { CreateIf(true); // Wait for all  vector<HANDLE> AllThreads; AllThreads.reserve(Threads.size()); for (auto& a : Threads) { AllThreads.push_back(a.second); } WaitForMultipleObjects((DWORD)AllThreads.size(), AllThreads.data(), TRUE, INFINITE);  // Reader is locked  } void ReleaseWrite() { // Release All  for (auto& a : Threads) ReleaseMutex(a.second); ReleaseMutex(hChangeMap); } 

当您希望锁定对象以进行写访问并在稍后释放它时,将调用这些函数。函数的作用是:

1.在锁期间没有注册新线程

2.任何读取线程都释放了锁


析构函数

RWMUTEX() { CloseHandle(hChangeMap);  hChangeMap = 0; for (auto& a : Threads) CloseHandle(a.second); Threads.clear(); } 

析构函数确保清除所有句柄。

可升级/可升级锁

有时,您希望将读锁升级为写锁,而不先解锁,以提高效率。因此,LockWrite被修改为:

void LockWrite(DWORD updThread = 0) { CreateIf(true);  // Wait for all  AllThreads.reserve(Threads.size());  AllThreads.clear(); for (auto& a : Threads) { if (updThread == a.first) // except ourself if in upgrade operation  continue; AllThreads.push_back(a.second); } auto tim = WaitForMultipleObjects((DWORD)AllThreads.size(), AllThreads.data(), TRUE, wi);  if (tim == WAIT_TIMEOUT && wi != INFINITE) OutputDebugString(L"LockWrite debug timeout!"); // We don't want to keep threads, the hChangeMap is enough  // We also release the handle to the upgraded thread, if any  for (auto& a : Threads) ReleaseMutex(a.second); // Reader is locked } void Upgrade() { LockWrite(GetCurrentThreadId()); }  HANDLE Downgrade() { DWORD id = GetCurrentThreadId(); auto z = Threads[id]; auto tim = WaitForSingleObject(z, wi); if (tim == WAIT_TIMEOUT && wi != INFINITE) OutputDebugString(L"Downgrade debug timeout!"); ReleaseMutex(hChangeMap); return z; } 

调用Upgrade()现在的结果是:

更改被锁定的映射

等待除我们自己的线程之外的所有读线程退出

然后我们释放我们自己的线程互斥锁,因为更改锁定的映射就足够了。

调用Downgrade()结果:

  • 直接从映射上获取手柄,无需重新锁定
  • 锁定此句柄,就像我们处于读取模式一样
  • 发布变更映射

因此,整个代码是(带有一些调试帮助):

// RWMUTEX  class RWMUTEX { private: HANDLE hChangeMap = 0; std::map<DWORD, HANDLE> Threads; DWORD wi = INFINITE; RWMUTEX(const RWMUTEX&) = delete; RWMUTEX(RWMUTEX&&) = delete;  operator=(const RWMUTEX&) = delete; public: RWMUTEX(bool D = false) { if (D) wi = 10000; else wi = INFINITE; hChangeMap = CreateMutex(0, 0, 0); } ~RWMUTEX() { CloseHandle(hChangeMap);  hChangeMap = 0; for (auto& a : Threads) CloseHandle(a.second); Threads.clear(); } HANDLE CreateIf(bool KeepReaderLocked = false) { auto tim = WaitForSingleObject(hChangeMap, INFINITE); if (tim == WAIT_TIMEOUT && wi != INFINITE) OutputDebugString(L"LockRead debug timeout!"); DWORD id = GetCurrentThreadId(); if (Threads[id] == 0) { HANDLE e0 = CreateMutex(0, 0, 0); Threads[id] = e0; } HANDLE e = Threads[id];  if (!KeepReaderLocked) ReleaseMutex(hChangeMap); return e; } HANDLE LockRead() { auto z = CreateIf(); auto tim = WaitForSingleObject(z, wi);  if (tim == WAIT_TIMEOUT && wi != INFINITE) OutputDebugString(L"LockRead debug timeout!"); return z; } void LockWrite(DWORD updThread = 0) { CreateIf(true); // Wait for all  AllThreads.reserve(Threads.size()); AllThreads.clear();  for (auto& a : Threads) { if (updThread == a.first) // except ourself if in upgrade operation  continue; AllThreads.push_back(a.second); } auto tim = WaitForMultipleObjects((DWORD)AllThreads.size(), AllThreads.data(), TRUE, wi); if (tim == WAIT_TIMEOUT && wi != INFINITE) OutputDebugString(L"LockWrite debug timeout!"); // We don't want to keep threads, the hChangeMap is enough  // We also release the handle to the upgraded thread, if any  for (auto& a : Threads) ReleaseMutex(a.second); // Reader is locked  } void ReleaseWrite() { ReleaseMutex(hChangeMap); } void ReleaseRead(HANDLE f) { ReleaseMutex(f); } void Upgrade() { LockWrite(GetCurrentThreadId()); } HANDLE Downgrade() { DWORD id = GetCurrentThreadId(); auto z = Threads[id]; auto tim = WaitForSingleObject(z, wi); if (tim == WAIT_TIMEOUT && wi != INFINITE) OutputDebugString(L"Downgrade debug timeout!"); ReleaseMutex(hChangeMap); return z; } }; 

要使用RWMUTEX,可以简单地创建锁定类:

class RWMUTEXLOCKREAD { private: RWMUTEX* mm = 0; public: RWMUTEXLOCKREAD(const RWMUTEXLOCKREAD&) = delete;  void operator =(const RWMUTEXLOCKREAD&) = delete; RWMUTEXLOCKREAD(RWMUTEX*m) { if (m) { mm = m; mm->LockRead(); } } ~RWMUTEXLOCKREAD() { if (mm) { mm->ReleaseRead(); mm = 0; } } }; class RWMUTEXLOCKWRITE { private: RWMUTEX* mm = 0; public: RWMUTEXLOCKWRITE(RWMUTEX*m) { if (m) { mm = m; mm->LockWrite(); } } ~RWMUTEXLOCKWRITE() { if (mm) { mm->ReleaseWrite(); mm = 0; } } }; 


还有一个用于升级机制的新类:

class RWMUTEXLOCKREADWRITE { private: RWMUTEX* mm = 0; HANDLE lm = 0; bool U = false;  public: RWMUTEXLOCKREADWRITE(const RWMUTEXLOCKREADWRITE&) = delete; void operator =(const RWMUTEXLOCKREADWRITE&) = delete;  RWMUTEXLOCKREADWRITE(RWMUTEX*m) { if (m) { mm = m; lm = mm->LockRead(); } } void Upgrade() { if (mm && !U) { mm->Upgrade(); lm = 0; U = 1; } } void Downgrade() { if (mm && U) { lm = mm->Downgrade(); U = 0; } } ~RWMUTEXLOCKREADWRITE() { if (mm) { if (U) mm->ReleaseWrite(); else mm->ReleaseRead(lm); lm = 0; mm = 0; } } }; 

用法示例:

RWMUTEX m; // ... other code void foo1() { RWMUTEXLOCKREAD lock(&m); } void foo2() { RWMUTEXLOCKWRITE lock(&m); }
全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务