生产者消费者模式

简介

  最近在学习操作系统这门课程,在学习的过程中遇到了生产者与消费者模式这个知识。同样的,在以前学习Java多线程编程时也遇到过相同的知识点。这让我
不得不对这个知识点深入的理一理。
  什么是消费者与生产者模式?在操作系统这门课中,这是一个描述进程之间的问题。系统中有一组生产者进程和一组消费者进程,生产者进程每次生产一个产品
放入缓冲区,消费者进程每次从缓冲区中取出一个产品并使用。生产者与消费者共享一个初始为空,大小为n的缓冲区。而且两个进程要满足:只有缓冲区没满时,
生产者才能把产品放入缓冲区,否则阻塞等待;同样只有缓冲区不为空时,消费者才能从中取出产品,否则阻塞等待。
  综上所述,我们必须使这两个进程满足:1.互斥地访问缓冲区;2.两个进程之间保证两个同步关系:缓冲区满时,需要消费者消费后生产者才能生产;缓冲区为
空时,生产者生产后,消费者才能消费。理清了进程之间的关系后,即可利用信号量进制进行实现。
  而在多线程中的消费者与生产者问题仅仅是将进程换做了线程,实现方式也由操作系统中的原语实现方式该为了Java代码方式实现。

代码实现:

1.synchronized、wait和notify方式实现(管程)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public class ProducerConsumerWithWaitNotify {
public static void main(String[] args) {
Resource resource = new Resource();
ProducerThread p1 = new ProducerThread(resource);
ProducerThread p2 = new ProducerThread(resource);
ProducerThread p3 = new ProducerThread(resource);
ConsumerThread c1 = new ConsumerThread(resource);
// ConsumerThread c2 = new ConsumerThread(resource);
// ConsumerThread c3 = new ConsumerThread(resource);
new Thread(p1).start();
new Thread(p2).start();
new Thread(p3).start();
new Thread(c1).start();
// new Thread(c2).start();
// new Thread(c3).start();
}
}

/**
* 公共资源类
* @author aaa3
*
*/
class Resource{
// 当前缓存区中的初始资源数
private int num = 0;
// 当前缓冲区中剩余空间
private int size = 10;
// synchronized加锁使得多个线程只能互斥地访问缓存区
public synchronized void remove() {
if (num > 0) {
num--;
System.out.println("消费者" + Thread.currentThread().getName()+
"消耗一件资源" + "当前缓冲区还有"+num+"资源");
// 相当于原语V操作
notifyAll();
// 相当于原语P操作
}else {
try {
wait();
System.out.println("消费者" + Thread.currentThread().getName()+"线程进入等待状态");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public synchronized void add() {
if (num < size) {
num++;
System.out.println(Thread.currentThread().getName()+"生产一个资源,当前缓冲区有"+num+"个资源");
notifyAll();
}else {
try {
wait();
System.out.println(Thread.currentThread().getName()+"线程进入等待状态");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
class ConsumerThread implements Runnable{
private Resource resource;
public ConsumerThread(Resource resource) {
this.resource = resource;
}
@Override
public void run() {
while(true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
resource.remove();
}
}
}
class ProducerThread implements Runnable{
private Resource resource;
public ProducerThread(Resource resource) {
this.resource = resource;
}
@Override
public void run() {
while(true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
resource.add();;
}
}
}

2.阻塞队列BlockingQueue实现方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerWithBlockingQueue {
public static void main(String[] args) {
Resource1 resource1 = new Resource1();
Producer p1 = new Producer(resource1);
Consumer c1 = new Consumer(resource1);
Producer p2 = new Producer(resource1);
Producer p3 = new Producer(resource1);
Producer p4 = new Producer(resource1);
Producer p5 = new Producer(resource1);
new Thread(p1).start();
new Thread(p2).start();
new Thread(p3).start();
new Thread(p4).start();
new Thread(p5).start();
new Thread(c1).start();
}
}
class Producer implements Runnable{
private Resource1 resource;
public Producer(Resource1 resource) {
this.resource = resource;
}
@Override
public void run() {
while(true) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
resource.add();
}
}
}
class Consumer implements Runnable{
private Resource1 resource;
public Consumer(Resource1 resource) {
this.resource = resource;
}
@Override
public void run() {
while(true) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
resource.take();
}
}
}
class Resource1{
private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(10);
public void add() {
try {
blockingQueue.put(1);
System.out.println("生产者"+Thread.currentThread().getName()+
"生产一件产品,当前资源池有"+blockingQueue.size()+"件产品");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void take() {
try {
blockingQueue.take();
System.out.println("消费者"+Thread.currentThread().getName()+
"消费一件产品,当前资源池有"+blockingQueue.size()+"件产品");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

参考链接

《王道操作系统考研》
https://www.cnblogs.com/fankongkong/p/7339848.html