生产者-消费者模式设计要求
这种模式满足三点要求:
(1)生产者生产数据到缓冲区中,消费者从缓冲区中取数据。
(2)缓冲区满时,生产者线程阻塞,进入等待状态。这期间消费者一旦取走数据,队列未满,就会唤醒阻塞的生产者。
(3)缓冲区空时,消费者线程阻塞,进入等待状态。这期间生产者一旦往队列中放入数据,就会唤醒阻塞的消费者。
模式组成:
公共的缓存队列(给予缓存上限)+ 生产者线程 + 消费者线程。
特点:
1.实现了生产者、消费者的解耦:通过共享的数据缓冲区域,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。
2.实现了线程间的并发协作:如果共享数据区已满的话,阻塞生产者继续生产数据放置入内;如果共享数据区为空的话,阻塞消费者继续消费数据。
应用场景:
模式解耦、消息队列、分布式场景中很常见。
通常情况下,有5种方式来实现
- synchronized + wait() + notify() 方式
- 可重入锁ReentrantLock (配合Condition)
- BlockingQueue 阻塞队列方式
- 信号量Semaphore 方式
- 管道输入输出流PipedInputStream和PipedOutputStream 方式
第一种方式(synchronized )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class MyBlockingQueue { private int maxSize; private LinkedList<Integer> queue; public MyBlockingQueue(int size) { this.maxSize = size; queue = new LinkedList<Integer>(); } public synchronized void put() throws InterruptedException{ while(queue.size() == maxSize) { System.out.println("队列已满,生产者:" + Thread.currentThread().getName() + "进入等待"); wait(); } Random random = new Random(); int i = random.nextInt(100); System.out.println("队列未满,生产者:"+Thread.currentThread().getName() + "放入数据" + i); if(queue.size() == 0) { notifyAll(); } queue.add(i); } public synchronized void take() throws InterruptedException{ while(queue.size() == 0) { System.out.println("队列为空,消费者:" + Thread.currentThread().getName() + "进入等待"); wait(); } if(queue.size() == maxSize) { notifyAll(); } System.out.println("队列有数据,消费者:"+Thread.currentThread().getName() + "取出数据" + queue.remove()); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class Producer implements Runnable{ private MyBlockingQueue myBlockingQueue; public Producer(MyBlockingQueue myBlockingQueue) { this.myBlockingQueue = myBlockingQueue; } public void run() { for(int i = 0 ; i < 5 ; i++) { try { myBlockingQueue.put(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class Consumer implements Runnable{ private MyBlockingQueue myBlockingQueue; public Consumer(MyBlockingQueue myBlockingQueue) { this.myBlockingQueue = myBlockingQueue; } public void run() { for(int i = 0 ; i < 5 ; i++) { try { myBlockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class producer_consumer_1 { public static void main(String[] args) { MyBlockingQueue myBlockingQueue = new MyBlockingQueue(8); Producer producer = new Producer(myBlockingQueue); Producer producer2 = new Producer(myBlockingQueue); Consumer consumer = new Consumer(myBlockingQueue); Consumer consumer2 = new Consumer(myBlockingQueue); new Thread(producer).start(); new Thread(producer2).start(); new Thread(consumer).start(); new Thread(consumer2).start(); } }
|
补充说明:
1.使用Linkedlist+等待唤醒机制(wait、notify/notifyAll)+Synchronized实现线程安全。
2.为什么使用while不是if?
判断线程是否进入等待状态时,判断需要while,不能用if。在生产者、消费者线程只有一个时,if可以使用。但是多个线程的情况时就会出现问题。
例如:假设有两个消费者线程,一个生产者线程。队列为空时,消费者1进入等待状态,释放锁。消费者2抢到锁,进去后判断也进入等待,释放锁。这时生产者抢到锁生产数据,队列中有数据了,反过来唤醒两个消费者。
消费者1抢到锁执行wait()的逻辑,取出数据并释放锁。这时消费者2拿到锁,执行wait()后的逻辑取数据,但是此时队列中的数据已经被消费者1取出了,没有数据,这时就会出现线程不安全的情况。
利用while实现多次判断,不管消费者1还是2抢到锁,执行循环体的逻辑之前,会再一次判断条件是否成立,而if不会,所以使用while。
第二种方式(ReentrantLock )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public class MyBlockingQueueForCondition { private Queue<Integer> queue; private int max; private ReentrantLock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition(); private Condition notFull = lock.newCondition(); public MyBlockingQueueForCondition(int max) { this.max = max; queue = new LinkedList<Integer>(); } public void put(int i) throws InterruptedException{ lock.lock(); try { while(queue.size() == max) { System.out.println("队列已满,生产者:" + Thread.currentThread().getName() + "进入等待"); notFull.await(); } if(queue.size() == 0) { notEmpty.signalAll(); } System.out.println("队列未满,生产者:"+Thread.currentThread().getName() + "放入数据" + i); queue.add(i); } catch (Exception e) { }finally { lock.unlock(); } } public int take() throws InterruptedException{ lock.lock(); try { while(queue.size() == 0) { System.out.println("队列为空,消费者:" + Thread.currentThread().getName() + "进入等待"); notEmpty.await(); } if(queue.size() == max) { notFull.signalAll(); } } catch (Exception e) { }finally { lock.unlock(); } int i = queue.remove(); System.out.println("队列有数据,消费者:"+Thread.currentThread().getName() + "取出数据" + i); return i; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class ProducerForCondition implements Runnable{ private MyBlockingQueueForCondition myBlockingQueueForCondition;
public ProducerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) { this.myBlockingQueueForCondition = myBlockingQueueForCondition; } public void run() { for(int i = 0 ; i < 5 ; i++) { try { myBlockingQueueForCondition.put(i); } catch (InterruptedException e) { e.printStackTrace(); } } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class ConsumerForCondition implements Runnable{ private MyBlockingQueueForCondition myBlockingQueueForCondition;
public ConsumerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) { this.myBlockingQueueForCondition = myBlockingQueueForCondition; } public void run() { for(int i = 0 ; i < 5 ; i++) { try { myBlockingQueueForCondition.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class producer_consumer_2 { public static void main(String[] args) { MyBlockingQueueForCondition myBlockingQueueForCondition = new MyBlockingQueueForCondition(8); ProducerForCondition producerForCondition1 = new ProducerForCondition(myBlockingQueueForCondition); ProducerForCondition producerForCondition2 = new ProducerForCondition(myBlockingQueueForCondition); ConsumerForCondition consumerForCondition1 = new ConsumerForCondition(myBlockingQueueForCondition); ConsumerForCondition consumerForCondition2 = new ConsumerForCondition(myBlockingQueueForCondition); new Thread(producerForCondition1).start(); new Thread(producerForCondition2).start(); new Thread(consumerForCondition1).start(); new Thread(consumerForCondition2).start(); } }
|
第三种方式(BlockingQueue )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public class ArrayBlockingQueueTest { private static BlockingQueue<Integer> queue = new ArrayBlockingQueue(10); public static void main(String[] args) { Producer producer1 = new Producer(queue); Consumer consumer1 = new Consumer(queue); new Thread(producer1).start(); new Thread(consumer1).start(); } static class Producer implements Runnable{ private BlockingQueue queue; public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { try { for(int i = 0 ; i < 20;i++) { Random random = new Random(); int element = random.nextInt(100); System.out.println("生产者" + Thread.currentThread().getName() + "生产数据:" + element); queue.put(element); } } catch (InterruptedException e) { e.printStackTrace(); } } } static class Consumer implements Runnable{ private BlockingQueue queue; public Consumer(BlockingQueue queue) { this.queue = queue; } public void run() { try { for(int i = 0 ; i < 20;i++) { Integer element = (Integer) queue.take(); System.out.println("消费者" + Thread.currentThread().getName() + "消费数据:" + element); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } } }
|