生产者-消费者模式设计要求

这种模式满足三点要求:
(1)生产者生产数据到缓冲区中,消费者从缓冲区中取数据。
(2)缓冲区满时,生产者线程阻塞,进入等待状态。这期间消费者一旦取走数据,队列未满,就会唤醒阻塞的生产者。
(3)缓冲区空时,消费者线程阻塞,进入等待状态。这期间生产者一旦往队列中放入数据,就会唤醒阻塞的消费者。

模式组成:
公共的缓存队列(给予缓存上限)+ 生产者线程 + 消费者线程。
特点:
1.实现了生产者、消费者的解耦:通过共享的数据缓冲区域,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。
2.实现了线程间的并发协作:如果共享数据区已满的话,阻塞生产者继续生产数据放置入内;如果共享数据区为空的话,阻塞消费者继续消费数据。
应用场景:
模式解耦、消息队列、分布式场景中很常见。

通常情况下,有5种方式来实现

  • synchronized + wait() + notify() 方式
  • 可重入锁ReentrantLock (配合Condition)
  • BlockingQueue 阻塞队列方式
  • 信号量Semaphore 方式
  • 管道输入输出流PipedInputStream和PipedOutputStream 方式

第一种方式(synchronized )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class MyBlockingQueue {
private int maxSize;
private LinkedList<Integer> queue;

public MyBlockingQueue(int size) {
this.maxSize = size;
queue = new LinkedList<Integer>();
}
public synchronized void put() throws InterruptedException{
while(queue.size() == maxSize) {
System.out.println("队列已满,生产者:" + Thread.currentThread().getName() + "进入等待");
wait();
}
Random random = new Random();
int i = random.nextInt(100);
System.out.println("队列未满,生产者:"+Thread.currentThread().getName() + "放入数据" + i);
if(queue.size() == 0) {
notifyAll();
}
queue.add(i);
}
public synchronized void take() throws InterruptedException{
while(queue.size() == 0) {
System.out.println("队列为空,消费者:" + Thread.currentThread().getName() + "进入等待");
wait();
}
if(queue.size() == maxSize) {
notifyAll();
}
System.out.println("队列有数据,消费者:"+Thread.currentThread().getName() + "取出数据" + queue.remove());//删除第一个数据,最早放入的数据
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Producer implements Runnable{
private MyBlockingQueue myBlockingQueue;
public Producer(MyBlockingQueue myBlockingQueue) {
this.myBlockingQueue = myBlockingQueue;
}
public void run() {
for(int i = 0 ; i < 5 ; i++) {
try {
myBlockingQueue.put();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Consumer implements Runnable{
private MyBlockingQueue myBlockingQueue;
public Consumer(MyBlockingQueue myBlockingQueue) {
this.myBlockingQueue = myBlockingQueue;
}
public void run() {
for(int i = 0 ; i < 5 ; i++) {
try {
myBlockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class producer_consumer_1 {
public static void main(String[] args) {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue(8);
Producer producer = new Producer(myBlockingQueue);
Producer producer2 = new Producer(myBlockingQueue);
Consumer consumer = new Consumer(myBlockingQueue);
Consumer consumer2 = new Consumer(myBlockingQueue);
new Thread(producer).start();//生产者线程1,Thread-0
new Thread(producer2).start();//生产者线程2,Thread-1
new Thread(consumer).start();//消费者线程1,Thread-3
new Thread(consumer2).start();//消费者线程2,Thread-4
}
}

在这里插入图片描述

补充说明:
1.使用Linkedlist+等待唤醒机制(wait、notify/notifyAll)+Synchronized实现线程安全。

2.为什么使用while不是if?

判断线程是否进入等待状态时,判断需要while,不能用if。在生产者、消费者线程只有一个时,if可以使用。但是多个线程的情况时就会出现问题。

例如:假设有两个消费者线程,一个生产者线程。队列为空时,消费者1进入等待状态,释放锁。消费者2抢到锁,进去后判断也进入等待,释放锁。这时生产者抢到锁生产数据,队列中有数据了,反过来唤醒两个消费者。
消费者1抢到锁执行wait()的逻辑,取出数据并释放锁。这时消费者2拿到锁,执行wait()后的逻辑取数据,但是此时队列中的数据已经被消费者1取出了,没有数据,这时就会出现线程不安全的情况。
利用while实现多次判断,不管消费者1还是2抢到锁,执行循环体的逻辑之前,会再一次判断条件是否成立,而if不会,所以使用while。

第二种方式(ReentrantLock )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class MyBlockingQueueForCondition {
private Queue<Integer> queue;
private int max;
private ReentrantLock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public MyBlockingQueueForCondition(int max) {
this.max = max;
queue = new LinkedList<Integer>();
}
public void put(int i) throws InterruptedException{
lock.lock();
try {
while(queue.size() == max) {
System.out.println("队列已满,生产者:" + Thread.currentThread().getName() + "进入等待");
notFull.await();
}
if(queue.size() == 0) {
notEmpty.signalAll();
}
System.out.println("队列未满,生产者:"+Thread.currentThread().getName() + "放入数据" + i);
queue.add(i);
} catch (Exception e) {
// TODO: handle exception
}finally {
lock.unlock();
}
}
public int take() throws InterruptedException{
lock.lock();
try {
while(queue.size() == 0) {
System.out.println("队列为空,消费者:" + Thread.currentThread().getName() + "进入等待");
notEmpty.await();
}
if(queue.size() == max) {
notFull.signalAll();
}

} catch (Exception e) {
// TODO: handle exception
}finally {
lock.unlock();
}
int i = queue.remove();
System.out.println("队列有数据,消费者:"+Thread.currentThread().getName() + "取出数据" + i);
return i;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ProducerForCondition implements Runnable{
private MyBlockingQueueForCondition myBlockingQueueForCondition;

public ProducerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) {
this.myBlockingQueueForCondition = myBlockingQueueForCondition;
}
public void run() {
for(int i = 0 ; i < 5 ; i++) {
try {
myBlockingQueueForCondition.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ConsumerForCondition implements Runnable{
private MyBlockingQueueForCondition myBlockingQueueForCondition;

public ConsumerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) {
this.myBlockingQueueForCondition = myBlockingQueueForCondition;
}
public void run() {
for(int i = 0 ; i < 5 ; i++) {
try {
myBlockingQueueForCondition.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class producer_consumer_2 {
public static void main(String[] args) {
// TODO Auto-generated method stub
MyBlockingQueueForCondition myBlockingQueueForCondition = new MyBlockingQueueForCondition(8);
ProducerForCondition producerForCondition1 = new ProducerForCondition(myBlockingQueueForCondition);
ProducerForCondition producerForCondition2 = new ProducerForCondition(myBlockingQueueForCondition);
ConsumerForCondition consumerForCondition1 = new ConsumerForCondition(myBlockingQueueForCondition);
ConsumerForCondition consumerForCondition2 = new ConsumerForCondition(myBlockingQueueForCondition);
new Thread(producerForCondition1).start();
new Thread(producerForCondition2).start();
new Thread(consumerForCondition1).start();
new Thread(consumerForCondition2).start();
}
}

在这里插入图片描述

第三种方式(BlockingQueue )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ArrayBlockingQueueTest {
private static BlockingQueue<Integer> queue = new ArrayBlockingQueue(10);
public static void main(String[] args) {
Producer producer1 = new Producer(queue);
Consumer consumer1 = new Consumer(queue);
new Thread(producer1).start();
new Thread(consumer1).start();
}
static class Producer implements Runnable{
private BlockingQueue queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
// TODO Auto-generated method stub
try {
for(int i = 0 ; i < 20;i++) {
Random random = new Random();
int element = random.nextInt(100);
System.out.println("生产者" + Thread.currentThread().getName() + "生产数据:" + element);
queue.put(element);
}
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
static class Consumer implements Runnable{
private BlockingQueue queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
// TODO Auto-generated method stub
try {
for(int i = 0 ; i < 20;i++) {
Integer element = (Integer) queue.take();
System.out.println("消费者" + Thread.currentThread().getName() + "消费数据:" + element);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
}