生产消费者模式
多线程知识点
- synchronized修饰在成员方法上,其实相当于
synchronized(this) { ... },即锁在当前对象的monitor上。
public synchronized void sync {
/* process */
}
// 相当于下面
public void sync {
synchronized(this) {
/* process */
}
}
- 在synchronized方法里,或者synchronized同步代码块中,只要正在执行中,就说明currentThread已经获得当前对象的monitor了。
- 当synchronized方法或synchronized同步代码块结束,currentThread会自动释放对象的monitor。
- wait() \ notify() \ notifyAll()这三方法必须在synchronized方法或synchronized同步代码块中执行,因为这样就说明currentThread已经获得了对象的monitor。
- wait()使得当前线程释放已获得的对象monitor,并陷入一种等待。这种等待必须依靠别的获得同一个对象monitor的线程来调用notify() \ notifyAll()才会重新唤醒,但重新唤醒后需要继续执行没执行完的同步代码,而执行同步代码的前提是获得被调用成员方法的对象的monitor。所以,一个被notify() \ notifyAll()的调用而从wait()中被唤醒后的线程,是不一定会马上执行wait()的下一句代码的,因为它需要和其他竞争同一个对象monitor的线程进行竞争,如果竞争失败了,那么该线程还是只有阻塞在wait()这里,直到它竞争到对象monitor。
- notify()使得wait在同一个对象monitor上的某一个线程被唤醒。另外,synchronized代码执行完毕后,会释放对象monitor,当然,这一点跟notify()无关,因为本来就是这样。
- notifyAll()使得wait在同一个对象monitor上的所有线程被唤醒。
生产者消费者示例
public class SynProCon {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
class Data {
private int number = 0;
// +1
public synchronized void increment() throws InterruptedException {
if (number != 0) {
// 等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "执行加1完毕,number=" + number + ",准备通知其他线程");
// 通知
this.notifyAll();
}
// -1
public synchronized void decrement() throws InterruptedException {
if (number == 0) {
// 等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "执行减1完毕,number=" + number + ",准备通知其他线程");
// 通知
this.notifyAll();
}
}
/**
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
*/
当使用两条线程调用Data的加一和减一方法时,结果没有出现错误。
现在使用四条线程:
public class SynProCon {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
class Data {
private int number = 0;
// +1
public synchronized void increment() throws InterruptedException {
if (number != 0) {
// 等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "执行加1完毕,number=" + number + ",准备通知其他线程");
// 通知
this.notifyAll();
}
// -1
public synchronized void decrement() throws InterruptedException {
if (number == 0) {
// 等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "执行减1完毕,number=" + number + ",准备通知其他线程");
// 通知
this.notifyAll();
}
}
/**
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-0执行加1完毕,number=2,准备通知其他线程
* Thread-2执行加1完毕,number=3,准备通知其他线程
* Thread-1执行减1完毕,number=2,准备通知其他线程
* Thread-1执行减1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-0执行加1完毕,number=2,准备通知其他线程
* Thread-3执行减1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-2执行加1完毕,number=2,准备通知其他线程
* Thread-1执行减1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-0执行加1完毕,number=2,准备通知其他线程
* Thread-3执行减1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-2执行加1完毕,number=2,准备通知其他线程
* Thread-1执行减1完毕,number=1,准备通知其他线程
* Thread-2执行加1完毕,number=2,准备通知其他线程
* Thread-0执行加1完毕,number=3,准备通知其他线程
* Thread-3执行减1完毕,number=2,准备通知其他线程
* Thread-3执行减1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
*/
可以看到number的值有时候会变成3或者2,这并不是所期望的答案。
是什么原因导致的?
虚假唤醒问题
什么是虚假唤醒?
举个例子,我们现在有一个生产者、消费者、队列和三个线程。
1号线程和2号线程扮演消费者
3号线程扮演生产者
假设3号线程已经在队列添加了一个元素。
现在:
1) 1号线程从队列中获取了一个元素,此时队列变为空。
2) 2号线程也想从队列中获取一个元素,但此时队列为空,2号线程便只能进入阻塞(cond.wait()),等待队列非空。
3) 这时,3号线程将一个元素入队,并调用cond.notify()唤醒条件变量。
4) 处于等待状态的2号线程接收到3号线程的唤醒信号,便准备解除阻塞状态,执行接下来的任务(获取队列中的元素)。
5) 然而可能出现这样的情况:当2号线程准备获得队列的锁,去获取队列中的元素时,1号线程获得了队列的锁,检查到队列非空,就获取到了3号线程刚刚入队的元素,然后释放队列锁。
6) 等到2号线程获得队列锁,此时,如果是if判断,那么上面的等待代码执行完毕之后,便直接跳出if代码块,直接执行消费逻辑,者就引发了队列元素数量不正确的问题。1号线程“偷走了”这个元素,所以对于2号线程而言,这次唤醒就是“虚假”的,它需要再次等待队列非空,所以需要使用while再次判断,如果队列为空就继续等待。
修改后的代码:
public class SynProCon {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
class Data {
private int number = 0;
// +1
public synchronized void increment() throws InterruptedException {
while (number != 0) {
// 等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "执行加1完毕,number=" + number + ",准备通知其他线程");
// 通知
this.notifyAll();
}
// -1
public synchronized void decrement() throws InterruptedException {
while (number == 0) {
// 等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "执行减1完毕,number=" + number + ",准备通知其他线程");
// 通知
this.notifyAll();
}
}
/**
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-1执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
* Thread-0执行加1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
* Thread-2执行加1完毕,number=1,准备通知其他线程
* Thread-3执行减1完毕,number=0,准备通知其他线程
*/
使用Lock方式实现生产消费问题
public class LockProCon {
public static void main(String[] args) {
Data2 data2 = new Data2();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data2.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()-> {
for (int i = 0; i < 10; i++) {
try {
data2.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
data2.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()-> {
for (int i = 0; i < 10; i++) {
try {
data2.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
class Data2 {
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
// +1
public void increment() throws InterruptedException {
// 加锁
lock.lock();
try {
while (number != 0) {
// 等待
System.out.println("生产线程" + Thread.currentThread().getName() + ",number=" + number + ",不能执行加1操作,需要等待");
condition.await();
}
//
System.out.println("生产线程" + Thread.currentThread().getName() + "执行加一,number=" + ++number + ",通知其他线程工作");
// 通知
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// -1
public void decrement() throws InterruptedException {
// 加锁
lock.lock();
try {
while (number == 0) {
System.out.println("消费线程" + Thread.currentThread().getName() + "number=" + number + ",不能执行减1操作,需要等待");
// 等待
condition.await();
}
System.out.println("消费线程" + Thread.currentThread().getName() + "执行减一,number=" + --number + ",通知其他线程工作");
// 通知
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 解锁
lock.unlock();
}
}
}
/**
* 生产线程Thread-0执行加一,number=1,通知其他线程工作
* 生产线程Thread-0,number=1,不能执行加1操作,需要等待
* 消费线程Thread-1执行减一,number=0,通知其他线程工作
* 消费线程Thread-1number=0,不能执行减1操作,需要等待
* 生产线程Thread-0执行加一,number=1,通知其他线程工作
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-1执行减一,number=0,通知其他线程工作
* 消费线程Thread-1number=0,不能执行减1操作,需要等待
* 生产线程Thread-0执行加一,number=1,通知其他线程工作
* 生产线程Thread-0,number=1,不能执行加1操作,需要等待
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-1执行减一,number=0,通知其他线程工作
* 消费线程Thread-1number=0,不能执行减1操作,需要等待
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-0执行加一,number=1,通知其他线程工作
* 生产线程Thread-0,number=1,不能执行加1操作,需要等待
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-1执行减一,number=0,通知其他线程工作
* 消费线程Thread-1number=0,不能执行减1操作,需要等待
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-0执行加一,number=1,通知其他线程工作
* 生产线程Thread-0,number=1,不能执行加1操作,需要等待
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-1执行减一,number=0,通知其他线程工作
* 消费线程Thread-1number=0,不能执行减1操作,需要等待
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-0执行加一,number=1,通知其他线程工作
* 生产线程Thread-0,number=1,不能执行加1操作,需要等待
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-1执行减一,number=0,通知其他线程工作
* 消费线程Thread-1number=0,不能执行减1操作,需要等待
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-0执行加一,number=1,通知其他线程工作
* 生产线程Thread-0,number=1,不能执行加1操作,需要等待
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-1执行减一,number=0,通知其他线程工作
* 消费线程Thread-1number=0,不能执行减1操作,需要等待
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-0执行加一,number=1,通知其他线程工作
* 生产线程Thread-0,number=1,不能执行加1操作,需要等待
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-1执行减一,number=0,通知其他线程工作
* 消费线程Thread-1number=0,不能执行减1操作,需要等待
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-0执行加一,number=1,通知其他线程工作
* 生产线程Thread-0,number=1,不能执行加1操作,需要等待
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-1执行减一,number=0,通知其他线程工作
* 消费线程Thread-1number=0,不能执行减1操作,需要等待
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-0执行加一,number=1,通知其他线程工作
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-1执行减一,number=0,通知其他线程工作
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-2执行加一,number=1,通知其他线程工作
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-3执行减一,number=0,通知其他线程工作
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-2执行加一,number=1,通知其他线程工作
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-3执行减一,number=0,通知其他线程工作
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-2执行加一,number=1,通知其他线程工作
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-3执行减一,number=0,通知其他线程工作
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-2执行加一,number=1,通知其他线程工作
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-3执行减一,number=0,通知其他线程工作
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-2执行加一,number=1,通知其他线程工作
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-3执行减一,number=0,通知其他线程工作
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-2执行加一,number=1,通知其他线程工作
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-3执行减一,number=0,通知其他线程工作
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-2执行加一,number=1,通知其他线程工作
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-3执行减一,number=0,通知其他线程工作
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-2执行加一,number=1,通知其他线程工作
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-3执行减一,number=0,通知其他线程工作
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-2执行加一,number=1,通知其他线程工作
* 生产线程Thread-2,number=1,不能执行加1操作,需要等待
* 消费线程Thread-3执行减一,number=0,通知其他线程工作
* 消费线程Thread-3number=0,不能执行减1操作,需要等待
* 生产线程Thread-2执行加一,number=1,通知其他线程工作
* 消费线程Thread-3执行减一,number=0,通知其他线程工作
*/
可以看到上面的输出没有问题,但是线程的调用混乱,现在新增一个需要,要精准通知某条线程运行。
使用Condition实现精准通知唤醒线程
package com.pan.productconsumer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @program: juc
* @description: 精准通知唤醒线程
* @author:
* @create: 2023-04-01 21:39
**/
public class ConditionTest {
public static void main(String[] args) {
Data3 data3 = new Data3();
new Thread(()->{for (int i = 0; i < 10; i++) data3.printA();}, "A").start();
new Thread(()->{for (int i = 0; i < 10; i++) data3.printB();}, "B").start();
new Thread(()->{for (int i = 0; i < 10; i++) data3.printC();}, "C").start();
}
}
class Data3 {
private int number = 1;
private Lock lock = new ReentrantLock();
// 使用三个监视器,实现精准通知
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void printA() {
lock.lock();
try {
while (number != 1) {
// 监视器1进入等待状态
condition1.await();
}
number = 2;
System.out.println(Thread.currentThread().getName() + "->AAAAAAA");
// 通知监视器2
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
while (number != 2) {
// 监视器2进入等待状态
condition2.await();
}
number = 3;
System.out.println(Thread.currentThread().getName() + "->BBBBBBB");
// 通知监视器3
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
while (number != 3) {
// 监视器3进入等待状态
condition3.await();
}
number = 1;
System.out.println(Thread.currentThread().getName() + "->CCCCCCC");
// 通知监视器1
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
#JUC#