Java阻塞队列示例
今天我们将研究Java的BlockingQueue。java.util.concurrent.BlockingQueue是一种支持在检索和删除元素时等待队列变为非空,并在添加元素时等待队列中有可用空间的Java队列。
Java 阻塞队列
Java的BlockingQueue不接受空值,如果尝试将空值存储在队列中,会抛出NullPointerException异常。Java的BlockingQueue实现是线程安全的。所有的排队方法具有原子性,并且使用内部锁或其他形式的并发控制。Java的BlockingQueue接口是Java集合框架的一部分,主要用于实现生产者和消费者问题。在BlockingQueue中,我们不需要担心生产者等待空间可用或消费者等待对象可用,因为这由BlockingQueue的实现类处理。Java提供了几种BlockingQueue的实现,例如ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等。在实现生产者和消费者问题时,我们将使用ArrayBlockingQueue实现。以下是一些重要的方法,你应该了解。
- put(E e): This method is used to insert elements to the queue. If the queue is full, it waits for the space to be available.
- E take(): This method retrieves and remove the element from the head of the queue. If queue is empty it waits for the element to be available.
我们现在来使用Java的BlockingQueue来实现生产者消费者问题。
Java BlockingQueue示例-消息
只是一个普通的Java对象,它将由生产者生成并添加到队列中。你也可以称它为有效载荷或队列消息。
package com.Olivia.concurrency;
public class Message {
private String msg;
public Message(String str){
this.msg=str;
}
public String getMsg() {
return msg;
}
}
Java 阻塞队列示例-生产者
生成器类将创建消息并将其放入队列中。
package com.Olivia.concurrency;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private BlockingQueue<Message> queue;
public Producer(BlockingQueue<Message> q){
this.queue=q;
}
@Override
public void run() {
//produce messages
for(int i=0; i<100; i++){
Message msg = new Message(""+i);
try {
Thread.sleep(i);
queue.put(msg);
System.out.println("Produced "+msg.getMsg());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//adding exit message
Message msg = new Message("exit");
try {
queue.put(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Java阻塞队列示例-消费者
当接收到退出信息时,消费者类会处理队列中的消息并终止。
package com.Olivia.concurrency;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable{
private BlockingQueue<Message> queue;
public Consumer(BlockingQueue<Message> q){
this.queue=q;
}
@Override
public void run() {
try{
Message msg;
//consuming messages until exit message is received
while((msg = queue.take()).getMsg() !="exit"){
Thread.sleep(10);
System.out.println("Consumed "+msg.getMsg());
}
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}
Java 阻塞队列示例 – 服务
最后我们需要为生产者和消费者创建一个BlockingQueue服务。这个生产者消费者服务将创建一个固定大小的BlockingQueue,与生产者和消费者共享。这个服务将启动生产者和消费者线程并退出。
package com.Olivia.concurrency;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerService {
public static void main(String[] args) {
//Creating BlockingQueue of size 10
BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
//starting producer to produce messages in queue
new Thread(producer).start();
//starting consumer to consume messages from queue
new Thread(consumer).start();
System.out.println("Producer and Consumer has been started");
}
}
以下是上述Java BlockingQueue示例程序的输出。
Producer and Consumer has been started
Produced 0
Produced 1
Produced 2
Produced 3
Produced 4
Consumed 0
Produced 5
Consumed 1
Produced 6
Produced 7
Consumed 2
Produced 8
...
Java中的线程睡眠(Thread sleep)用于生产者和消费者以某种延迟生成和消费消息。