线程池

线程池实质上就是一种多线程处理形式,处理过程中可以先将任务添加到队列中,在创建线程后再自动启动这些任务。

使用线程池最重要的原因就是可以根据系统的需求和硬件环境灵活地控制线程的数量,并且可以对所有线程进行统一的管理和控制,从而提高系统的运行效率,降低系统的运行压力。

使用线程池具有以下优势:

  • 使线程和任务分离,提升线程重用性。

  • 控制线程并发数量,降低服务器压力,统一管理所有线程。

  • 提升系统响应速度。如果创建线程的时间为 T1,执行任务的时间为 T2,销毁线程的时间为 T3,那么使用线程池则免去了时间 T1 和时间 T3。

创建线程池有两种方式:一种是使用 Executors 的默认方法,另一种是通过 ThreadPoolExecutor 进行自定义的方法。不推荐前者是因为前者的配置很多,而且都是取 Integer 的最大值,很容易造成内存溢出。

ThreadPoolExecutor 自定义的语法格式如下:

ThreadPoolExecutor tpe = new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

参数说明。

  • int corePoolSize:核心线程数。

  • int maximumPoolSize:最大线程数。

  • long keepAliveTime:非核心线程的闲置超时时间。

  • TimeUnit unit:释放时间的单位,如m(分钟)、h(小时)、d(天)等。

  • BlockingQueue<Runnable> workQueue:阻塞消息队列。

  • ThreadFactory threadFactory:线程工厂。

  • RejectedExecutionHandler handler:拒绝策略。

下面对上述 7 个参数中的“核心线程数”、“最大线程数”、“阻塞消息队列”、“线程工厂”、“拒绝策略” 进行详解。

核心线程数

当线程是 IO 密集型时,主要消耗磁盘的读写性能,设置为 2*n,n 为当前服务器核数(如 8 核 16G 的服务器被设置为 16,由 Runtime.getRuntime().availableProcessors() 进行获取)。

当线程是 CPU 密集型时,主要消耗 CPU 的性能,设置为 n+1。

最大线程数

当核心线程数和消息队列都满了后,才会创建最大线程,直到达到最大线程数。之后的线程就会执行拒绝策略。

阻塞消息队列

  • ArrayBlockingQueue:基于数组的先进先出队列。此队列创建时必须指定大小,读写用一把锁,性能较差。

  • LinkedBlockingQueue:基于链表的先进先出队列。如果创建时没有指定此队列大小,则默认为 Integer.MAX_VALUE。写和读用两把锁进行操作,因此性能较好。

  • synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是直接新建一个线程来执行新的任务。

需要注意的是:当核心线程数满了后,新线程会先被存储在消息队列中;当消息队列也满了后,才会创建最大线程;直到达到最大线程数,之后的线程就会执行拒绝策略。

线程工厂

创建线程的类,既可以使用默认工厂,也可以自定义线程工厂实现 ThreadFactory 接口,重写 newThread() 方法。自定义工厂的优势在于可以设置线程名或者定义辅助线程。

拒绝策略

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常。

  • ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。

  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。

  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务。

当自定义拒绝策略时,需要实现 RejectedExecutionHandler 接口,重写 rejectedExecution(Runnable r,ThreadPoolExecutor executor) 方法。此方法中可以通过类型转换确定线程具体的类型,从而获取线程的相关信息。需要注意的是,只能转换线程池中通过 Runnable 接口实现的线程。如果线程池执行的线程是通过实现 Callable 实现的,在执行前会把线程封装成 FutureTask,这样相当于转换再转换,就没法转换成原来的对象了。

execute 只能提交 Runnable 线程,submit 可以提交所有的 Runnable、Callable、Thread 线程。

下面将演示如何通过 ThreadPoolExecutor 自定义一个线程池。代码如下:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorTest {
    public static class ThreadOne implements Runnable {
        private Integer number;

        public ThreadOne(Integer temp) {
            this.number = temp;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ":ThreadOne--" + number);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class ThreadTwo extends Thread {
        @Override
        public void run() {

        }
    }

    public static class ThreadThree implements Callable<Integer> {
        Integer number;

        public ThreadThree(Integer temp) {
            this.number = temp;
        }

        public Integer call() throws Exception {
            System.out.println(Thread.currentThread().getName() + ":ThreadThree--" + number);
            Thread.sleep(3000);
            return number;
        }
    }

    public static void main(String[] args) {
        ExecutorService es = new ThreadPoolExecutor(3, 10, 10, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(5), new Factory(), new Handler());
        try {
            for (int i = 0; i < 50; i++) {
                es.execute(new ThreadOne(i));
            }
        } finally {
            es.shutdown();
        }
    }

    private static class Factory implements ThreadFactory {
        private AtomicInteger count = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread thd = new Thread(r);
            String threadName = "Factory--" + count.addAndGet(1);
            thd.setName(threadName);
            System.out.println("线程" + threadName + "创建完成");
            return thd;
        }
    }

    private static class Handler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            ThreadOne thdOne = (ThreadOne) r;
            System.out.println(thdOne.number + "被阻塞了");
        }
    }
}

运行结果如下:(运行结果不唯一)

     线程Factory--1创建完成
     线程Factory--2创建完成
     Factory--1:ThreadOne--0
     线程Factory--3创建完成
     线程Factory--4创建完成
     Factory--2:ThreadOne--1
     线程Factory--5创建完成
     线程Factory--6创建完成
     线程Factory--7创建完成
     线程Factory--8创建完成
     Factory--3:ThreadOne--2
     线程Factory--9创建完成
     线程Factory--10创建完成
     15被阻塞了
     16被阻塞了
     17被阻塞了
     18被阻塞了
     19被阻塞了
     20被阻塞了
     21被阻塞了
     22被阻塞了
     23被阻塞了
     24被阻塞了
     25被阻塞了
     26被阻塞了
     27被阻塞了
     28被阻塞了
     29被阻塞了
     30被阻塞了
     31被阻塞了
     32被阻塞了
     Factory--4:ThreadOne--8
     33被阻塞了
     34被阻塞了
     35被阻塞了
     36被阻塞了
     37被阻塞了
     38被阻塞了
     39被阻塞了
     40被阻塞了
     41被阻塞了
     42被阻塞了
     43被阻塞了
     44被阻塞了
     45被阻塞了
     46被阻塞了
     47被阻塞了
     48被阻塞了
     49被阻塞了
     Factory--5:ThreadOne--9
     Factory--6:ThreadOne--10
     Factory--7:ThreadOne--11
     Factory--8:ThreadOne--12
     Factory--9:ThreadOne--13
     Factory--10:ThreadOne--14
     Factory--1:ThreadOne--3
     Factory--2:ThreadOne--4
     Factory--3:ThreadOne--5
     Factory--4:ThreadOne--6
     Factory--10:ThreadOne--7