信号量、锁模块
1 引入
加锁的目的是保证共享资源在任意时间里,只有一个线程访问,这样就可以避免多线程导致共享数据错乱的问题。
1.1 Linux的4种锁机制:
互斥锁:mutex,用于保证在任何时刻,都只能有一个线程访问该对象。当获取锁操作失败时,线程会进入睡眠,等待锁释放时被唤醒
读写锁:rwlock,分为读锁和写锁。处于读操作时,可以允许多个线程同时获得读操作。但是同一时刻只能有一个线程可以获得写锁。其它获取写锁失败的线程都会进入睡眠状态,直到写锁释放时被唤醒。 注意:写锁会阻塞其它读写锁。当有一个线程获得写锁在写时,读锁也不能被其它线程获取;写者优先于读者(一旦有写者,则后续读者必须等待,唤醒时优先考虑写者)。适用于读取数据的频率远远大于写数据的频率的场合。
自旋锁:spinlock,在任何时刻同样只能有一个线程访问对象。但是当获取锁操作失败时,不会进入睡眠,而是会在原地自旋,直到锁被释放。这样节省了线程从睡眠状态到被唤醒期间的消耗,在加锁时间短暂的环境下会极大的提高效率。但如果加锁时间过长,则会非常浪费CPU资源。
RCU:即read-copy-update,在修改数据时,首先需要读取数据,然后生成一个副本,对副本进行修改。修改完成后,再将老数据update成新的数据。使用RCU时,读者几乎不需要同步开销,既不需要获得锁,也不使用原子指令,不会导致锁竞争,因此就不用考虑死锁问题了。而对于写者的同步开销较大,它需要复制被修改的数据,还必须使用锁机制同步并行其它写者的修改操作。在有大量读操作,少量写操作的情况下效率非常高。
互斥锁和读写锁的区别:
1)读写锁区分读者和写者,而互斥锁不区分
2)互斥锁同一时间只允许一个线程访问该对象,无论读写;读写锁同一时间内只允许一个写者,但是允许多个读者同时读对象。
1.2 区域锁思想
区域锁(Scoped Locking)并非一种锁的种类,而是使用锁的一种模式。这种概念是C++中RAII(Resource Acquisition Is Initialization)模式具体体现之一,"资源需要即初始化"。基本思想:C++中的资源应该交由对象管理,资源在对象的构造函数中完成初始化,在对象的析构函数中被释放。
本项目采用的就是区域锁的思想,需要加锁时,初始化一个锁的对象来保护数据,结束后析构掉。在构造函数中完成对锁的初始化,在析构函数中完成对锁的资源释放。
1.3 实现思路
通过局域锁的思想实现加锁解锁。设计各种锁的类LockType,抽象如下面的代码所示,包含
- 一个LockType类型的成员POSIX信号量
- 构造析构函数 -- 构造函数初始化信号量,析构函数析构信号量
- 加锁解锁函数 -- 实现信号量的加锁解锁
- 一个类型定义 -- 利用类模板初始化一个T为LockType的成员,构造析构实现加锁解锁
class LockType : Noncopyable {
public:
typedef ScopedLockImpl<Mutex> Lock; //局部锁
Mutex() {
pthread_LockType_init(&m_mutex, nullptr);
}
~Mutex() {
pthread_LockType_destroy(&m_mutex);
}
void lock() {
pthread_LockType_lock(&m_mutex);
}
void unlock() {
pthread_LockType_unlock(&m_mutex);
}
private:
pthread_LockType_t m_mutex;
}; //互斥量一般都是在一个局部范围
//为了防止漏掉解锁,一般写一个类,通过构造函数加锁,析构函数解锁
2 信号量
信号量是一种特殊的变量,对其操作访问都是源原子操作。且只允许等待wait和发送post操作。
2.1 POSIX信号量API
POSIX 信号量,定义在头文件<semaphore.h>, 底层是一个非负整数,通过原子操作对其加减,控制线程(进程)对共享资源的访问
int sem_init(sem_t *sem, int pshared, unsigned int value); 初始化-----构造 int sem_wait(sem_t *sem); 减1操作-------wait int sem_post(sem_t *sem); 加1操作-------notify int sem_destroy(sem_t *sem); 释放-------析构
2.2 信号量
class Semaphore : Noncopyable {
public:
Semaphore(uint32_t count = 0); //构造函数 count 信号量值的大小
~Semaphore();
void wait(); //获取信号量,数-1
void notify(); //释放信号量,数+1
private:
sem_t m_semaphore;
};
Semaphore::Semaphore(uint32_t count) {
if(sem_init(&m_semaphore, 0, count)) {
throw std::logic_error("sem_init error");
}
}
Semaphore::~Semaphore() {
sem_destroy(&m_semaphore);
}
void Semaphore::wait() {
if(sem_wait(&m_semaphore)) {
throw std::logic_error("sem_wait error");
}
}
void Semaphore::notify() {
if(sem_post(&m_semaphore)) {
throw std::logic_error("sem_post error");
}
} 2.3 协程信号量
class Scheduler;
class FiberSemaphore : Noncopyable {
public:
typedef Spinlock MutexType;
FiberSemaphore(size_t initial_concurrency = 0);
~FiberSemaphore();
bool tryWait();
void wait();
void notify();
size_t getConcurrency() const { return m_concurrency;}
void reset() { m_concurrency = 0;}
private:
MutexType m_mutex;
std::list<std::pair<Scheduler*, Fiber::ptr> > m_waiters;
size_t m_concurrency;
}; 3 互斥锁
//局部区域互斥锁、自旋锁、原子锁模板类
//T:锁的类型
template<class T>
struct ScopedLockImpl {
public:
ScopedLockImpl(T& mutex)
:m_mutex(mutex) {
m_mutex.lock(); //构造时加锁
m_locked = true;
}
~ScopedLockImpl() {
unlock(); //自动释放锁
}
void lock() {
if(!m_locked) {
m_mutex.lock();
m_locked = true;
}
}
void unlock() {
if(m_locked) {
m_mutex.unlock();
m_locked = false;
}
}
private:
T& m_mutex; //mutex
bool m_locked; //是否已上锁
};
//互斥锁类
class Mutex : Noncopyable {
public:
typedef ScopedLockImpl<Mutex> Lock; //局部锁
Mutex() {
pthread_mutex_init(&m_mutex, nullptr);
}
~Mutex() {
pthread_mutex_destroy(&m_mutex);
}
void lock() {
pthread_mutex_lock(&m_mutex);
}
void unlock() {
pthread_mutex_unlock(&m_mutex);
}
private:
pthread_mutex_t m_mutex;
}; 4 读写锁
//局部读锁模板实现
template<class T>
struct ReadScopedLockImpl {
public:
ReadScopedLockImpl(T& mutex)
:m_mutex(mutex) {
m_mutex.rdlock();
m_locked = true;
}
~ReadScopedLockImpl() {
unlock(); //自动释放锁
}
void lock() {
if(!m_locked) {
m_mutex.rdlock();
m_locked = true;
}
}
void unlock() {
if(m_locked) {
m_mutex.unlock();
m_locked = false;
}
}
private:
T& m_mutex; //mutex
bool m_locked; //是否已上锁
};
//局部写锁模板实现
template<class T>
struct WriteScopedLockImpl {
public:
WriteScopedLockImpl(T& mutex)
:m_mutex(mutex) {
m_mutex.wrlock();
m_locked = true;
}
~WriteScopedLockImpl() {
unlock();
}
void lock() {
if(!m_locked) {
m_mutex.wrlock();
m_locked = true;
}
}
void unlock() {
if(m_locked) {
m_mutex.unlock();
m_locked = false;
}
}
private:
T& m_mutex; //Mutex
bool m_locked; //是否已上锁
};
//读写互斥量
class RWMutex : Noncopyable{
public:
typedef ReadScopedLockImpl<RWMutex> ReadLock; //局部读锁
typedef WriteScopedLockImpl<RWMutex> WriteLock; //局部写锁
RWMutex() {
pthread_rwlock_init(&m_lock, nullptr);
}
~RWMutex() {
pthread_rwlock_destroy(&m_lock);
}
void rdlock() {
pthread_rwlock_rdlock(&m_lock);
}
void wrlock() {
pthread_rwlock_wrlock(&m_lock);
}
void unlock() {
pthread_rwlock_unlock(&m_lock);
}
private:
pthread_rwlock_t m_lock; //读写锁
}; 5 自旋锁
class Spinlock : Noncopyable {
public:
typedef ScopedLockImpl<Spinlock> Lock; //局部锁
Spinlock() {
pthread_spin_init(&m_mutex, 0);
}
~Spinlock() {
pthread_spin_destroy(&m_mutex);
}
void lock() {
pthread_spin_lock(&m_mutex);
}
void unlock() {
pthread_spin_unlock(&m_mutex);
}
private:
//自旋锁
pthread_spinlock_t m_mutex;
}; 6 原子锁
class CASLock : Noncopyable {
public:
typedef ScopedLockImpl<CASLock> Lock; //局部锁
CASLock() {
m_mutex.clear();
}
~CASLock() {
}
void lock() {
//执行本次原子操作之前 所有 读 原子操作必须全部完成
while(std::atomic_flag_test_and_set_explicit(&m_mutex, std::memory_order_acquire));
}
void unlock() {
//执行本次原子操作之前 所有 写 原子操作必须全部完成
std::atomic_flag_clear_explicit(&m_mutex, std::memory_order_release);
}
private:
//原子状态
volatile std::atomic_flag m_mutex;
}; 

