深入理解Java阻塞队列:实用示例与最佳实践

这是文章《Java阻塞队列示例》的第1部分(共1部分)。

内容片段: 今天我们将研究Java的阻塞队列(BlockingQueue)。java.util.concurrent.BlockingQueue是一种支持在检索和删除元素时等待队列变为非空,并在添加元素时等待队列中有可用空间的Java队列。

Java 阻塞队列

Java的阻塞队列不接受空值,如果尝试将空值存储在队列中,会抛出NullPointerException异常。Java的阻塞队列实现是线程安全的。所有的排队方法具有原子性,并且使用内部锁或其他形式的并发控制。Java的阻塞队列接口是Java集合框架的一部分,主要用于实现生产者和消费者问题。在阻塞队列中,我们不需要担心生产者等待空间可用或消费者等待对象可用,因为这由阻塞队列的实现类处理。Java提供了几种阻塞队列的实现,例如ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等。在实现生产者和消费者问题时,我们将使用ArrayBlockingQueue实现。以下是一些重要的方法,你应该了解。

  • put(E e): 此方法用于向队列中插入元素。如果队列已满,它会等待空间变为可用。
  • E take(): 此方法从队列头部检索并移除元素。如果队列为空,它会等待元素变为可用。

我们现在来使用Java的阻塞队列来实现生产者消费者问题。

Java 阻塞队列示例-消息

只是一个普通的Java对象,它将由生产者生成并添加到队列中。你也可以称它为有效载荷或队列消息。

package com.Olivia.concurrency;

public class Message {
    private String msg;
    
    public Message(String str){
        this.msg=str;
    }

    public String getMsg() {
        return msg;
    }

}

Java 阻塞队列示例-生产者

生产者类将创建消息并将其放入队列中。

package com.Olivia.concurrency;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private BlockingQueue<Message> queue;
    
    public Producer(BlockingQueue<Message> q){
        this.queue=q;
    }
    @Override
    public void run() {
        //生产消息
        for(int i=0; i<100; i++){
            Message msg = new Message(""+i);
            try {
                Thread.sleep(i);
                queue.put(msg);
                System.out.println("生产了 "+msg.getMsg());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //添加退出消息
        Message msg = new Message("exit");
        try {
            queue.put(msg);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

Java阻塞队列示例-消费者

当接收到退出信息时,消费者类会处理队列中的消息并终止。

package com.Olivia.concurrency;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable{

private BlockingQueue<Message> queue;
    
    public Consumer(BlockingQueue<Message> q){
        this.queue=q;
    }

    @Override
    public void run() {
        try{
            Message msg;
            //消费消息,直到接收到退出消息
            while((msg = queue.take()).getMsg() !="exit"){
            Thread.sleep(10);
            System.out.println("消费了 "+msg.getMsg());
            }
        }catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Java 阻塞队列示例 – 服务

最后我们需要为生产者和消费者创建一个阻塞队列服务。这个生产者消费者服务将创建一个固定大小的阻塞队列,与生产者和消费者共享。这个服务将启动生产者和消费者线程并退出。

package com.Olivia.concurrency;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerService {

    public static void main(String[] args) {
        //创建大小为10的阻塞队列
        BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        //启动生产者,在队列中生产消息
        new Thread(producer).start();
        //启动消费者,从队列中消费消息
        new Thread(consumer).start();
        System.out.println("生产者和消费者已启动");
    }

}

以下是上述Java阻塞队列示例程序的输出。

生产者和消费者已启动
生产了 0
生产了 1
生产了 2
生产了 3
生产了 4
消费了 0
生产了 5
消费了 1
生产了 6
生产了 7
消费了 2
生产了 8
...

Java中的线程睡眠(Thread sleep)用于生产者和消费者以某种延迟生成和消费消息。

bannerAds