マルチスレッドの学習を始めました。 私は複数のスレッドで5人のプロデューサーと2人のコンシューマーを持っています。基本的にこのプログラムは100個のアイテムをキューに追加します。プロデューサは、キューサイズが100になると追加を停止します。コンシューマが再度追加を開始できるように、コンシューマがキューからすべてのアイテムを削除したときにコンシューマにプロデューサに通知するようにします。現在、プロデューサは待機しますが、コンシューマから通知されることはありません。
プロデューサー:
public class Producer implements Runnable {
private BlockingQueue sharedQueue;
private final int queueSize;
private Object lock = new Object();
public Producer(BlockingQueue sharedQueue, int queueSize){
this.sharedQueue = sharedQueue;
this.queueSize = queueSize;
}
public void run() {
while(true) {
if(sharedQueue.size()== queueSize){
try {
synchronized (lock) {
sharedQueue.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
sharedQueue.put("Producer: " + sharedQueue.size());
Thread.sleep(500);
System.out.println("Producer: Queue Size " + sharedQueue.size() + " Current Thread " + Thread.currentThread());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消費者:
public class Consumer implements Runnable{
private BlockingQueue sharedQueue;
private final int queueSize;
private final int queueEmpty=0;
private Object lock = new Object();
public Consumer(BlockingQueue sharedQueue, int queueSize){
this.sharedQueue = sharedQueue;
this.queueSize = queueSize;
}
//Notify awaiting thread if the sharedQueue is empty
public void run() {
while (true) {
if(sharedQueue.size()==queueEmpty){
synchronized (lock) {
this.notifyAll();
}
}
try {
sharedQueue.take();
Thread.sleep(800);
System.out.println("Consumer: Queue Size " + sharedQueue.size() + " Current Thread" + Thread.currentThread());
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
}
メインクラス
public class App{
//A simple program to illustrate how producer and consumer pattern works with blocking queue using executor service
public static void main( String[] args )
{
final BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<String> (100);
final int queueSize =100;
final int producerNum = 5;
final int consumerNum = 2;
final ExecutorService executorProducer = Executors.newFixedThreadPool(producerNum);
final ExecutorService executorConsumer = Executors.newFixedThreadPool(consumerNum);
for(int i=0;i<producerNum;i++){
Producer producer = new Producer(sharedQueue,queueSize);
executorProducer.execute(producer);
}
for(int j=0;j<consumerNum;j++){
Consumer consumer = new Consumer(sharedQueue,queueSize);
executorConsumer.execute(consumer);
}
}
}
回答:
回答№1は2Oracleのドキュメントから ページ:
BlockingQueueの実装はスレッドセーフです。すべてのキューイング方法は、内部ロックまたは他の形式の並行性制御を使用して自動的にその効果を達成します
あなたはすでに使っているので BlockingQueues
、あなたは取り除くことができます wait()
そして notify()
API。
複数のプロデューサとコンシューマを使用するコード例 BlockingQueue
:
import java.util.concurrent.*;
public class ProducerConsumerDemo {
public static void main(String args[]){
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
Thread prodThread1 = new Thread(new Producer(sharedQueue,1));
Thread prodThread2 = new Thread(new Producer(sharedQueue,2));
Thread consThread1 = new Thread(new Consumer(sharedQueue,1));
Thread consThread2 = new Thread(new Consumer(sharedQueue,2));
prodThread1.start();
prodThread2.start();
consThread1.start();
consThread2.start();
}
}
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for(int i=1; i<= 5; i++){
try {
int number = i+(10*threadNo);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
どのように機能するのですか?
- 生産者スレッド1は11から15の整数に及ぶ整数を置きます
BlockingQueue
- 生産者スレッド2は21 - 25の範囲の整数を
BlockingQueue
- コンシューマスレッドのいずれか - スレッド1またはスレッド2はから値を読み取ります。
BlockingQueue
(この例では整数)
サンプル出力:
Produced:21:by thread:2
Produced:11:by thread:1
Produced:12:by thread:1
Produced:13:by thread:1
Produced:14:by thread:1
Produced:22:by thread:2
Produced:23:by thread:2
Produced:24:by thread:2
Produced:25:by thread:2
Consumed: 21:by thread:1
Consumed: 12:by thread:1
Consumed: 13:by thread:1
Consumed: 14:by thread:1
Consumed: 22:by thread:1
Consumed: 23:by thread:1
Consumed: 24:by thread:1
Consumed: 25:by thread:1
Produced:15:by thread:1
Consumed: 11:by thread:2
Consumed: 15:by thread:1