多线程实现生产者/消费者模型
生产者/消费者模型不多讲，默认大家都已了解，直接上代码。
一对一(wait/notify)
package chapter3.producerconsumer;
/**
 * Mutable holder for the shared item counter used by the producer and
 * consumer examples.
 *
 * <p>The class performs no synchronization of its own.
 */
public class Count {

    /** Current counter value; starts at the {@code int} default of 0. */
    private int count;

    /**
     * Returns the current counter value.
     *
     * @return the value last stored by {@link #setCount(int)}, or 0 if none
     */
    public int getCount() {
        return this.count;
    }

    /**
     * Replaces the counter value.
     *
     * @param count the new value; stored as-is without range checking
     */
    public void setCount(int count) {
        this.count = count;
    }
}
package chapter3.producerconsumer;
/**
 * Consumer side of the wait/notify example.
 *
 * <p>Every access to the shared {@link Count} is made while holding the
 * monitor of the {@code lock} object passed to the constructor.
 */
public class Consumer {

    /** Shared counter that this consumer decrements. */
    private Count c;

    /** Monitor object used for mutual exclusion and for wait/notify. */
    private Object lock;

    /**
     * Creates a consumer.
     *
     * @param c    the shared counter to decrement
     * @param lock the monitor object guarding {@code c}
     */
    public Consumer(Count c, Object lock) {
        this.c = c;
        this.lock = lock;
    }

    /**
     * Makes a single consumption attempt.
     *
     * <p>If the counter is not positive, the calling thread waits on the
     * monitor and the method returns once the wait ends, without consuming;
     * the caller is expected to invoke this method again. Otherwise the
     * thread sleeps for one second (while still holding the monitor),
     * decrements the counter, prints the new value and notifies one thread
     * waiting on the monitor.
     *
     * <p>An {@link InterruptedException} from either the sleep or the wait is
     * only logged via {@code printStackTrace()}.
     */
    public void sub() {
        synchronized (lock) {
            if (c.getCount() <= 0) {
                // Nothing to consume: give up the monitor until notified.
                try {
                    lock.wait();
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
                return;
            }

            // Simulated work; note that the monitor stays held during the sleep.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }

            c.setCount(c.getCount() - 1);
            System.out.println("消费-1,当前count = " + c.getCount());
            lock.notify();
        }
    }
}
package chapter3.producerconsumer;
/**
 * Producer side of the wait/notify example.
 *
 * <p>Every access to the shared {@link Count} is made while holding the
 * monitor of the {@code lock} object passed to the constructor.
 */
public class Producer {

    /** Shared counter that this producer increments. */
    private Count c;

    /** Monitor object used for mutual exclusion and for wait/notify. */
    private Object lock;

    /**
     * Creates a producer.
     *
     * @param c    the shared counter to increment
     * @param lock the monitor object guarding {@code c}
     */
    public Producer(Count c, Object lock) {
        this.c = c;
        this.lock = lock;
    }

    /**
     * Makes a single production attempt.
     *
     * <p>If the counter has already reached 5, the calling thread waits on
     * the monitor and the method returns once the wait ends, without
     * producing; the caller is expected to invoke this method again.
     * Otherwise the thread sleeps for one second (while still holding the
     * monitor), increments the counter, prints the new value and notifies one
     * thread waiting on the monitor.
     *
     * <p>An {@link InterruptedException} from either the sleep or the wait is
     * only logged via {@code printStackTrace()}.
     */
    public void add() {
        synchronized (lock) {
            if (c.getCount() >= 5) {
                // Buffer is full: give up the monitor until notified.
                try {
                    lock.wait();
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
                return;
            }

            // Simulated work; note that the monitor stays held during the sleep.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }

            c.setCount(c.getCount() + 1);
            System.out.println("生产+1,当前count = " + c.getCount());
            lock.notify();
        }
    }
}
package chapter3.producerconsumer;
/**
 * Entry point for the one-producer / one-consumer wait/notify example.
 */
public class Run {

    /**
     * Starts one consumer thread named "C" and one producer thread named "P".
     * Both share a single {@link Count} and a single monitor object, and both
     * loop forever, so this program does not terminate on its own.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Count sharedCount = new Count();
        Object monitor = new Object();
        Consumer consumer = new Consumer(sharedCount, monitor);
        Producer producer = new Producer(sharedCount, monitor);

        // Each sub()/add() call is a single attempt, hence the endless loops.
        Runnable consumeForever = () -> {
            while (true) {
                consumer.sub();
            }
        };
        Thread consumerThread = new Thread(consumeForever, "C");
        consumerThread.start();

        Runnable produceForever = () -> {
            while (true) {
                producer.add();
            }
        };
        Thread producerThread = new Thread(produceForever, "P");
        producerThread.start();
    }
}
多对多(wait/notifyAll)
package chapter3.producerconsumer.copy;
/**
 * Consumer side of the wait/notifyAll example.
 *
 * <p>Every access to the shared {@link Count} is made while holding the
 * monitor of the {@code lock} object passed to the constructor.
 */
public class Consumer {

    /** Shared counter that this consumer decrements. */
    private Count c;

    /** Monitor object used for mutual exclusion and for wait/notifyAll. */
    private Object lock;

    /**
     * Creates a consumer.
     *
     * @param c    the shared counter to decrement
     * @param lock the monitor object guarding {@code c}
     */
    public Consumer(Count c, Object lock) {
        this.c = c;
        this.lock = lock;
    }

    /**
     * Makes a single consumption attempt.
     *
     * <p>If the counter is not positive, the calling thread waits on the
     * monitor and the method returns once the wait ends, without consuming;
     * the caller is expected to invoke this method again. Otherwise the
     * thread sleeps for one second (while still holding the monitor),
     * decrements the counter, prints the calling thread's name with the new
     * value, and wakes every thread waiting on the monitor.
     *
     * <p>An {@link InterruptedException} from either the sleep or the wait is
     * only logged via {@code printStackTrace()}.
     */
    public void sub() {
        synchronized (lock) {
            if (c.getCount() <= 0) {
                // Nothing to consume: give up the monitor until notified.
                try {
                    lock.wait();
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
                return;
            }

            // Simulated work; note that the monitor stays held during the sleep.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }

            c.setCount(c.getCount() - 1);
            System.out.println(Thread.currentThread().getName() + " 消费-1,当前count = " + c.getCount());
            // notifyAll wakes all waiters on this monitor, producers and consumers alike.
            lock.notifyAll();
        }
    }
}
package chapter3.producerconsumer.copy;
/**
 * Producer side of the wait/notifyAll example.
 *
 * <p>Every access to the shared {@link Count} is made while holding the
 * monitor of the {@code lock} object passed to the constructor.
 */
public class Producer {

    /** Shared counter that this producer increments. */
    private Count c;

    /** Monitor object used for mutual exclusion and for wait/notifyAll. */
    private Object lock;

    /**
     * Creates a producer.
     *
     * @param c    the shared counter to increment
     * @param lock the monitor object guarding {@code c}
     */
    public Producer(Count c, Object lock) {
        this.c = c;
        this.lock = lock;
    }

    /**
     * Makes a single production attempt.
     *
     * <p>If the counter has already reached 5, the calling thread waits on
     * the monitor and the method returns once the wait ends, without
     * producing; the caller is expected to invoke this method again.
     * Otherwise the thread sleeps for one second (while still holding the
     * monitor), increments the counter, prints the calling thread's name with
     * the new value, and wakes every thread waiting on the monitor.
     *
     * <p>An {@link InterruptedException} from either the sleep or the wait is
     * only logged via {@code printStackTrace()}.
     */
    public void add() {
        synchronized (lock) {
            if (c.getCount() >= 5) {
                // Buffer is full: give up the monitor until notified.
                try {
                    lock.wait();
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
                return;
            }

            // Simulated work; note that the monitor stays held during the sleep.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }

            c.setCount(c.getCount() + 1);
            System.out.println(Thread.currentThread().getName() + " 生产+1,当前count = " + c.getCount());
            // notifyAll wakes all waiters on this monitor, producers and consumers alike.
            lock.notifyAll();
        }
    }
}
package chapter3.producerconsumer.copy;
/**
 * Entry point for the multi-producer / multi-consumer wait/notifyAll example.
 */
public class Run {

    /**
     * Creates two producer threads and two consumer threads that share one
     * {@link Producer}, one {@link Consumer}, one {@link Count} and one
     * monitor object, then starts them pairwise. All threads loop forever,
     * so this program does not terminate on its own.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final int pairs = 2;

        Count sharedCount = new Count();
        Object monitor = new Object();
        Consumer consumer = new Consumer(sharedCount, monitor);
        Producer producer = new Producer(sharedCount, monitor);

        // Each add()/sub() call is a single attempt, hence the endless loops.
        Runnable produceForever = () -> {
            while (true) {
                producer.add();
            }
        };
        Runnable consumeForever = () -> {
            while (true) {
                consumer.sub();
            }
        };

        Thread[] producers = new Thread[pairs];
        Thread[] consumers = new Thread[pairs];
        for (int idx = 0; idx < pairs; idx++) {
            int number = idx + 1;
            producers[idx] = new Thread(produceForever, "生产者" + number);
            consumers[idx] = new Thread(consumeForever, "消费者" + number);
        }

        // Start only after every thread has been constructed.
        for (int idx = 0; idx < pairs; idx++) {
            producers[idx].start();
            consumers[idx].start();
        }
    }
}
通过Lock对象实现
package chapter4.locks.producerConsumer;
/**
 * Holder for the global item counter used by the Lock/Condition examples.
 *
 * <p>The field is a plain public static {@code int}; this class adds no
 * synchronization around it.
 */
public class Count {

    /** Shared counter; initialised to 1. */
    public static int count = 1;
}
package chapter4.locks.producerConsumer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
 * Consumer side of the Lock/Condition example.
 *
 * <p>Reads and writes the static {@code Count.count} only while holding
 * {@code lock}.
 */
public class Consumer {

    /** Lock guarding every access to {@code Count.count}. */
    private final Lock lock;

    /**
     * Condition used both to wait and to signal.
     * NOTE(review): expected to have been created from {@link #lock};
     * confirm at the call site.
     */
    private final Condition c;

    /**
     * Creates a consumer.
     *
     * @param lock the lock guarding the shared counter
     * @param c    the condition to await and signal on
     */
    public Consumer(Lock lock, Condition c) {
        this.lock = lock;
        this.c = c;
    }

    /**
     * Makes a single consumption attempt.
     *
     * <p>If {@code Count.count} is positive it is decremented, the calling
     * thread's name and the new value are printed, and all threads waiting on
     * the condition are signalled. Otherwise the calling thread awaits the
     * condition and the method returns once the await ends, without
     * consuming; the caller is expected to invoke this method again.
     *
     * <p>An {@link InterruptedException} from the await is only logged via
     * {@code printStackTrace()}.
     */
    public void sub() {
        // Acquire BEFORE entering try: if lock() itself fails, the finally
        // block must not call unlock() on a lock this thread never held
        // (that would raise IllegalMonitorStateException and hide the
        // original failure).
        lock.lock();
        try {
            if (Count.count > 0) {
                Count.count--;
                System.out.print(Thread.currentThread().getName() + "-1");
                System.out.println(" Current count = " + Count.count);
                c.signalAll();
            } else {
                c.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
package chapter4.locks.producerConsumer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
 * Producer side of the Lock/Condition example.
 *
 * <p>Reads and writes the static {@code Count.count} only while holding
 * {@code lock}.
 */
public class Producer {

    /** Lock guarding every access to {@code Count.count}. */
    private final Lock lock;

    /**
     * Condition used both to wait and to signal.
     * NOTE(review): expected to have been created from {@link #lock};
     * confirm at the call site.
     */
    private final Condition c;

    /**
     * Creates a producer.
     *
     * @param lock the lock guarding the shared counter
     * @param c    the condition to await and signal on
     */
    public Producer(Lock lock, Condition c) {
        this.lock = lock;
        this.c = c;
    }

    /**
     * Makes a single production attempt.
     *
     * <p>If {@code Count.count} is below 5 it is incremented, the calling
     * thread's name and the new value are printed, and all threads waiting on
     * the condition are signalled. Otherwise the calling thread awaits the
     * condition and the method returns once the await ends, without
     * producing; the caller is expected to invoke this method again.
     *
     * <p>An {@link InterruptedException} from the await is only logged via
     * {@code printStackTrace()}.
     */
    public void add() {
        // Acquire BEFORE entering try: if lock() itself fails, the finally
        // block must not call unlock() on a lock this thread never held
        // (that would raise IllegalMonitorStateException and hide the
        // original failure).
        lock.lock();
        try {
            if (Count.count < 5) {
                Count.count++;
                System.out.print(Thread.currentThread().getName() + "+1");
                System.out.println(" Current count = " + Count.count);
                c.signalAll();
            } else {
                c.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
package chapter4.locks.producerConsumer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Title: Run.java
 *
 * <p>Description: implements the producer/consumer model with {@link Lock}
 * and {@link Condition} objects, using a single producer and a single
 * consumer.
 *
 * @author tianqb
 * @mail tqb820965236@163.com
 * @date 2019-06-12 17:39:42
 * @version 1.0
 */
public class Run {

    /**
     * Starts one thread named "Consumer" and one named "Producer". Each
     * thread alternates between one attempt on the shared counter and a
     * one-second pause, forever, so this program does not terminate on its
     * own.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Consumer consumer = new Consumer(lock, condition);
        Producer producer = new Producer(lock, condition);

        Runnable consumeLoop = () -> {
            while (true) {
                consumer.sub();
                pauseOneSecond();
            }
        };
        new Thread(consumeLoop, "Consumer").start();

        Runnable produceLoop = () -> {
            while (true) {
                producer.add();
                pauseOneSecond();
            }
        };
        new Thread(produceLoop, "Producer").start();
    }

    /** Sleeps the calling thread for one second; an interrupt is only logged. */
    private static void pauseOneSecond() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
多对多(Lock/Condition)
package chapter4.locks.producerConsumer.moreandmore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Title: Run.java
 *
 * <p>Description: implements the producer/consumer model with {@link Lock}
 * and {@link Condition} objects, using multiple producers and multiple
 * consumers.
 *
 * @author tianqb
 * @mail tqb820965236@163.com
 * @date 2019-06-12 17:39:42
 * @version 1.0
 */
public class Run {

    /**
     * Creates and starts three producer threads and three consumer threads
     * that share one {@link Producer}, one {@link Consumer}, one lock and one
     * condition. Each thread alternates between one attempt on the shared
     * counter and a one-second pause, forever, so this program does not
     * terminate on its own.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final int pairs = 3;

        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Consumer consumer = new Consumer(lock, condition);
        Producer producer = new Producer(lock, condition);

        Runnable produceLoop = () -> {
            while (true) {
                producer.add();
                pauseOneSecond();
            }
        };
        Runnable consumeLoop = () -> {
            while (true) {
                consumer.sub();
                pauseOneSecond();
            }
        };

        Thread[] producers = new Thread[pairs];
        Thread[] consumers = new Thread[pairs];
        for (int idx = 0; idx < pairs; idx++) {
            int number = idx + 1;
            producers[idx] = new Thread(produceLoop, "Producer-" + number);
            consumers[idx] = new Thread(consumeLoop, "Consumer-" + number);
            // Each pair is started as soon as it has been created.
            producers[idx].start();
            consumers[idx].start();
        }
    }

    /** Sleeps the calling thread for one second; an interrupt is only logged. */
    private static void pauseOneSecond() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}