生产者与消费者模式

生产者与消费者模式是一个十分经典的多线程并发协作的模式。如图21.7所示,所谓生产者与消费者模式,指的是两类线程:一类是用于生产数据的生产者线程,另一类是用于消费数据的消费者线程。在程序开发设计过程中,通常会采用数据共享的方式,解耦生产者和消费者的关系。

image 2024 03 06 13 12 15 373
Figure 1. 图21.7 生产者与消费者模式

生产者生产数据后,只需将其放置在数据共享的区域中,并不需要关心消费者的行为。消费者只需从数据共享的区域中获取数据,并不需要关心生产者的行为。

在实现生产者与消费者模式时,可以采用如下3种方式:

  • 使用 Object 的 wait() 方法和 notify() 方法实现消息通知机制。

  • 使用 Lock 的 Condition 的 await() 方法和 signal() 方法实现消息通知机制。

  • 使用 BlockingQueue 实现消息通知机制。

下面将分别使用 Object 的 wait() 方法和 notify() 方法、Condition 的 await() 方法和 signal() 方法实现生产者与消费者模式。

wait()方法和notify()方法

在线程中调用 Object 的 wait() 方法时,将阻塞当前线程,并且释放锁,直至等到其他线程调用了 notify() 方法或者 notifyAll() 方法后,当前线程才能被唤醒,继续执行下面的操作。

Object 的 notify() 方法用于唤醒正在处于等待状态的线程,这使得该线程从等待队列中移入同步队列中,等待下一次能够获取到对象监视器锁的机会。

notifyAll() 方法用于唤醒全部正在处于等待状态的线程,与 notify() 方法的作用大致相同。

【例21.5】使用 Object 的相关方法实现生产者与消费者模式(实例位置:资源包\TM\sl\21\5)

创建一个工厂类 ProductFactory,该类包含两个方法:produce() 生产方法和 consume() 消费方法。对于 produce() 方法,当没有库存或者库存达到5时,停止生产,为了更便于观察结果,每生产一件产品,让当前线程休眠1000毫秒;对于 consume() 方法,只要有库存就进行消费,为了更便于观察结果,每消费一件产品,让当前线程休眠1000毫秒。库存使用 LinkedList 进行实现,此时 LinkedList 即共享数据内存。

创建一个 Producer 生产者类,用于调用 ProductFactory 的 produce() 方法,生产过程中,要对每个产品从 0 开始进行编号;创建一个 Consumer 消费者类,用于调用 ProductFactory 的 consume() 方法;创建一个 Demo 类,在 main() 函数中创建1个生产者和2个消费者,运行程序并观察结果。代码如下:

import java.util.LinkedList;

class ProductFactory {           // 工厂类
    private LinkedList<String> products;   // 定义存储已经生成的产品的集合
    private int stockNums = 5;             // 定义最大库存 5 个
    public ProductFactory() {
        products = new LinkedList<String>();
    }

    public synchronized void produce(String product) {     // 创建生产方法
        while (stockNums == products.size()) {          // 如果库存达到 5 个,则停止生产
            try {
                System.out.println("警告:线程(" + Thread.currentThread().getName() + ")准备生产产品,但产品池已满");
                wait();       // 库存达到 5 个,生产线程进入等待状态
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        products.add(product);        // 如果库存没有到达 5 个,则添加产品
        try {
            Thread.sleep(1000);      // 每生产一件产品,让生产者线程休眠 1000 毫秒
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("线程(" + Thread.currentThread().getName() + ")生产了一件产品;当前剩余商品" + products.size() + "个");
        // 生产了产品,把消费者线程从等待状态中唤醒以进行消费
        notify();
    }

    public synchronized String consume() {    // 创建消费方法
        while (products.size() == 0) {        // 根据需求:没有库存消费者进入等待状态
            try {
                System.out.println("警告:线程(" + Thread.currentThread().getName() + ")准备消费产品,但当前没有产品");
                // 库存为0,无法消费,消费线程进入等待状态,等待生产者线程唤醒
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        String product = products.remove(0);    // 如果有库存,则消费,并移除消费掉的产品
        try {
            Thread.sleep(1000);        // 每消费一件产品,让消费者线程休眠 1000 毫秒
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("线程(" + Thread.currentThread().getName() + ")消费了一件产品;当前剩余商品" + products.size() + "个");
        notify();
        return product;
    }

    class Producer implements Runnable {        // 生产者线程类
        private ProductFactory productFactory;   // 关联工厂类
        public Producer(ProductFactory productFactory) {
            this.productFactory = productFactory;
        }
        public void run() {
            int i = 0;                // 对产品进行编号
            while (true) {
                productFactory.produce(String.valueOf(i));   // 调用 productFactory 的 produce() 方法
                i++;
            }
        }
    }

    class Consumer implements Runnable {        // 消费者线程类
        private ProductFactory productFactory;   // 关联工厂类
        public Consumer(ProductFactory productFactory) {
            this.productFactory = productFactory;
        }
        public void run() {
            while (true) {
                productFactory.consume();    // 调用 productFactory 的 consume() 方法
            }
        }
    }
}

public class Demo {
    public static void main(String[] args) {
        ProductFactory productFactory = new ProductFactory();
        new Thread(new ProductFactory.Producer(productFactory), "生产者").start();
        new Thread(new ProductFactory.Consumer(productFactory), "消费者_1").start();
        new Thread(new ProductFactory.Consumer(productFactory), "消费者_2").start();
    }
}

运行结果如下:

     线程(生产者)生产了一件产品;当前剩余商品1个
     线程(生产者)生产了一件产品;当前剩余商品2个
     线程(消费者_2)消费了一件产品;当前剩余商品1个
     线程(消费者_2)消费了一件产品;当前剩余商品0个
     警告:线程(消费者_2)准备消费产品,但当前没有产品
     警告:线程(消费者_1)准备消费产品,但当前没有产品
     线程(生产者)生产了一件产品;当前剩余商品1个
     线程(消费者_2)消费了一件产品;当前剩余商品0个
     警告:线程(消费者_2)准备消费产品,但当前没有产品
     警告:线程(消费者_1)准备消费产品,但当前没有产品
     线程(生产者)生产了一件产品;当前剩余商品1个
     线程(生产者)生产了一件产品;当前剩余商品2个
     线程(消费者_2)消费了一件产品;当前剩余商品1个
     线程(消费者_2)消费了一件产品;当前剩余商品0个
     警告:线程(消费者_2)准备消费产品,但当前没有产品
     线程(生产者)生产了一件产品;当前剩余商品1个
     线程(生产者)生产了一件产品;当前剩余商品2个
     线程(生产者)生产了一件产品;当前剩余商品3个
     线程(生产者)生产了一件产品;当前剩余商品4个
     线程(生产者)生产了一件产品;当前剩余商品5个
     警告:线程(生产者)准备生产产品,但产品池已满
     线程(消费者_1)消费了一件产品;当前剩余商品4个
     ……

从运行结果来看,生产者线程和消费者线程合作无间。当仓库没有产品时,消费者线程进入等待;当仓库产品达到最大库存(5个)时,生产者线程进入等待。这就是经典的生产者与消费者模式。

await()方法和signal()方法

Condition 接口提供了类似 Object 的监视器方法,与 Lock 配合可以实现消息通知机制。在 Condition 接口中,能够找到 Obejct 类的 wait()、notify()、notifyAll() 方法的替代方法。Condition 接口的语法格式如下:

import java.util.concurrent.TimeUnit;public interface Condition {
    void await() throws InterruptedException;
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

方法说明:

  • void await() throws InterruptedException:当前线程进入等待状态,直到被其他线程唤醒或者被中断。

  • long awaitNanos(long nanosTimeout) throws InterruptedException:当前线程进入等待状态,直到被其他线程唤醒或者被中断,抑或是处于等待状态一段时间后结束。nanosTimeout 为超时时间,返回值是超时时间减去实际消耗时间的结果。

  • boolean await(long time, TimeUnit unit) throws InterruptedException:当前线程进入等待状态,直到被其他线程唤醒或者被中断,抑或是处于等待状态一段时间后结束。与上个方法的区别在于其可以设置时间,未超时被唤醒返回 true,超时则返回 false。

  • boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态,直到被其他线程唤醒或者被中断,抑或是处于等待状态一段时间后结束。如果当前线程在截止时间结束前被唤醒,则返回 true,否则返回 false。

  • void signal():唤醒一个线程。

  • void signalAll():唤醒所有线程。

Condition 对象是由 Lock 对象调用 newCondition() 方法创建的(即 Lock.newCondition() )。也就是说,Condition 是依赖 Lock 对象的。创建 Condition 对象的代码如下:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

【例21.6】使用 Condition 的相关方法实现生产者与消费者模式(实例位置:资源包\TM\sl\21\6)

编写一个程序,在保证例21.5的前提条件不发生变化的情况下,使用 Condition 的相关方法替换例21.5中 Object 的 wait() 方法和 notify() 方法,实现生产者与消费者模式。代码如下:

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ProductFactory {           // 工厂类
    private LinkedList<String> products;   // 定义存储已经生成的产品的集合
    private int stockNums = 5;             // 定义最大库存 5 个

    private Lock lock = new ReentrantLock(false);
    private Condition p = lock.newCondition();  // 与生产者线程对应的 Condition 对象
    private Condition c = lock.newCondition();  // 与消费者线程对应的 Condition 对象

    public ProductFactory() {
        products = new LinkedList<String>();
    }

    public void produce(String product) {     // 创建生产方法

        try {
            lock.lock();
            while (stockNums == products.size()) {          // 如果库存达到 5 个,则停止生产
                try {
                    System.out.println("警告:线程(" + Thread.currentThread().getName() + ")准备生产产品,但产品池已满");
                    p.wait();       // 库存达到 5 个,生产线程进入等待状态
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            products.add(product);        // 如果库存没有到达 5 个,则添加产品
            System.out.println("线程(" + Thread.currentThread().getName() + ")生产了一件产品;当前剩余商品" + products.size() + "个");
            // 生产了产品,把消费者线程从等待状态中唤醒以进行消费
            c.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public String consume() {    // 创建消费方法

        try {
            lock.lock();
            while (products.size() == 0) {        // 根据需求:没有库存消费者进入等待状态
                try {
                    System.out.println("警告:线程(" + Thread.currentThread().getName() + ")准备消费产品,但当前没有产品");
                    // 库存为0,无法消费,消费线程进入等待状态,等待生产者线程唤醒
                    c.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            String product = products.remove(0);    // 如果有库存,则消费,并移除消费掉的产品
            System.out.println("线程(" + Thread.currentThread().getName() + ")消费了一件产品;当前剩余商品" + products.size() + "个");
            p.signalAll();
            return product;
        } finally {
            lock.unlock();
        }
    }

    class Producer implements Runnable {        // 生产者线程类
        private ProductFactory productFactory;   // 关联工厂类
        public Producer(ProductFactory productFactory) {
            this.productFactory = productFactory;
        }
        public void run() {
            int i = 0;                // 对产品进行编号
            while (true) {
                productFactory.produce(String.valueOf(i));   // 调用 productFactory 的 produce() 方法
                try {
                    Thread.sleep(1000);  // 每生产一件产品,让生产者线程休眠 1000 毫秒
                } catch (Exception e) {
                    e.printStackTrace();
                }
                i++;
            }
        }
    }

    class Consumer implements Runnable {        // 消费者线程类
        private ProductFactory productFactory;   // 关联工厂类
        public Consumer(ProductFactory productFactory) {
            this.productFactory = productFactory;
        }
        public void run() {
            while (true) {
                productFactory.consume();    // 调用 productFactory 的 consume() 方法
                try {
                    Thread.sleep(1000);  // 每消费一件产品,让消费者线程休眠 1000 毫秒
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

public class Demo {
    public static void main(String[] args) {
        ProductFactory productFactory = new ProductFactory();
        new Thread(new ProductFactory.Producer(productFactory), "生产者").start();
        new Thread(new ProductFactory.Consumer(productFactory), "消费者_1").start();
        new Thread(new ProductFactory.Consumer(productFactory), "消费者_2").start();
    }
}

运行结果如下:

     线程(生产者)生产了一件产品;当前剩余商品1个
     线程(消费者_1)消费了一件产品;当前剩余商品0个
     警告:线程(消费者_2)准备消费产品,但当前没有产品
     线程(生产者)生产了一件产品;当前剩余商品1个
     线程(消费者_1)消费了一件产品;当前剩余商品0个
     警告:线程(消费者_2)准备消费产品,但当前没有产品
     警告:线程(消费者_1)准备消费产品,但当前没有产品
     线程(生产者)生产了一件产品;当前剩余商品1个
     线程(消费者_2)消费了一件产品;当前剩余商品0个
     警告:线程(消费者_1)准备消费产品,但当前没有产品
     线程(生产者)生产了一件产品;当前剩余商品1个
     线程(消费者_1)消费了一件产品;当前剩余商品0个
     警告:线程(消费者_2)准备消费产品,但当前没有产品
     警告:线程(消费者_1)准备消费产品,但当前没有产品
     线程(生产者)生产了一件产品;当前剩余商品1个
     线程(消费者_2)消费了一件产品;当前剩余商品0个
     警告:线程(消费者_1)准备消费产品,但当前没有产品
     线程(生产者)生产了一件产品;当前剩余商品1个
     线程(消费者_1)消费了一件产品;当前剩余商品0个
     警告:线程(消费者_2)准备消费产品,但当前没有产品
     线程(生产者)生产了一件产品;当前剩余商品1个
     ……