Java的并行世界-3.0 线程池与拒绝策略

新SingleThreadExecutor创建一个只有一个线程的线程池。新ScheduledThreadPool创建一个支持定时任务的线程池。ForkJoinPool是一个用于执行分而治之任务的线程池,特别适用于递归分解的问题,例如并行归并排序、并行求和等。ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinPool在java.util.concurrent包...
Java的并行世界-3.0 线程池与拒绝策略
Jdk并发相关核心类:java.util.concurrent

java.util.concurrent.Executors提供了一些静态工厂方法,用于创建不同类型的线程池,例如:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);

可以通过new ThreadPoolExecutor()方法手动创建线程池,该方法需要传入四个参数,分别是核心线程数、最大线程数、线程保活时间和任务队列。其中,核心线程数和最大线程数是必填参数,线程保活时间和任务队列是可选参数。

Java中的Executors共有四种创建方式,这些方式包括使用newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor和newScheduledThreadPool。在使用这些方法时,可以根据实际需求选择最适合的方式来创建线程池。无论哪种方式,线程池都可以有效地管理和控制线程,提高程序的执行效率。

新FixedThreadPool创建一个固定大小的线程池。

以下是一个Java中创建newFixedThreadPool的代码例子:

新CachedThreadPool创建一个根据需要自动扩展的线程池,线程数根据任务数量动态调整。

以下是一个newCachedThreadPool的Java代码示例:

新SingleThreadExecutor创建一个只有一个线程的线程池。

新ScheduledThreadPool创建一个支持定时任务的线程池。

ForkJoinPool是一个用于执行分而治之任务的线程池,特别适用于递归分解的问题,例如并行归并排序、并行求和等。

ForkJoinPool forkJoinPool = new ForkJoinPool();

ForkJoinPool在java.util.concurrent包下,从Java 7开始引入,专门用于处理需要递归分解的任务。它利用工作窃取(Work-Stealing)算法来实现高效的任务调度和线程利用,能够充分利用多核处理器的优势。

ForkJoinPool的主要特点包括:

下面是一个简单的使用ForkJoinPool的示例,计算斐波那契数列的值:

虽然上面5个线程池看上去功能特点不同,但是其内部实现原理都调用了JDK的ThreadPoolExecutor线程池类。

ThreadPoolExecutor的构造函数有多个参数,允许根据实际需求配置线程池的行为,主要包括:

corePoolSize:线程池的核心线程数,即线程池中保持的最小线程数量。即使线程处于空闲状态,核心线程也不会被销毁。

maximumPoolSize:线程池的最大线程数,即线程池中允许的最大线程数量。当任务数量超过核心线程数时,线程池会根据实际情况动态地创建新的线程,但不会超过最大线程数。

keepAliveTime:非核心线程的空闲时间超过这个时间后,会被销毁,从而控制线程池的大小。时间单位可以通过指定的TimeUnit来定义。

workQueue:任务队列,用于存放等待执行的任务。ThreadPoolExecutor支持多种类型的任务队列,如ArrayBlockingQueue、LinkedBlockingQueue等。

threadFactory:用于创建线程的工厂,可以自定义线程的创建方式。

handler:拒绝策略,当线程池和队列都满了,无法继续接受新的任务时,会触发拒绝策略来处理新的任务。常见的拒绝策略包括AbortPolicy(默认策略,直接抛出异常)、CallerRunsPolicy(由调用线程来执行被拒绝的任务)、DiscardPolicy(丢弃被拒绝的任务)、DiscardOldestPolicy(丢弃队列中最老的任务)。

ThreadPoolExecutor提供了submit()和execute()等方法来向线程池提交任务,其中:

submit()方法可以接受Callable和Runnable类型的任务,并返回一个Future对象,通过这个对象可以获取任务的执行结果或者取消任务。

execute()方法只能接受Runnable类型的任务,无法获取任务的返回结果。

ThreadPoolExecutor还提供了一些方法来管理和监控线程池的状态,如getActiveCount()、getCompletedTaskCount()、getTaskCount()等。

workQueue:任务队列,其中的任务是被提交但尚未执行的任务。其类型是:BlockingQueue接口,只能用于存放Runable对象。

有界队列可以通过ArrayBlockingQueue实现,当有新任务到来时,如果线程池的实际线程数量小于corePoolSize,则会优先创建新的线程;如果大于corePoolSize,则会将新任务加入等待队列;若队列已满,无法加入,则在总线程数量不大于maximumPoolSize的前提下,创建新的线程执行任务;若大于maximumPoolSize,则执行拒绝策略。

无界队列可以通过LinkedBlockingQueue实现,与有界队列相比,除非系统资源耗尽,不然当有新的任务到来,并且线程数量小于corePoolSize时,线程池就会创建新的线程执行任务。但当线程数量大于corePoolSize后,就不会继续创建了。若还有任务进来,系统CPU没那么忙,还有线程资源,则任务直接进入队列等待。

优先任务队列可以通过PriorityBlockingQueue实现,可以根据任务的优先级执行任务,这是一种特殊的无界队列。

有界队列和无界队列需要做demo测试。新CachedThreadPool内部使用的是SynchronousQueue队列,这是一个直接提交队列,系统会增加新的线程执行任务,当任务执行完毕后,线程会被回收;如果开启大量任务提交,每个任务执行慢,系统会开启等量的线程处理,直到系统资源耗尽。

ThreadPoolExecutor的核心调度代码包括workerCountOf(c)来获取当前工作线程池的总数,addWorker(command)用于提交任务或创建线程池,以及reject(command)来执行拒绝策略。

Handler是RejectedExecutionHandler接口类型,代表了不同的拒绝策略。常见的拒绝策略包括:

CallerRunsPolicy:如果线程池未关闭,会直接在调用者当前线程执行等待队列放不下的任务。

AbortPolicy:会直接抛出异常,阻止系统正常运行。

DiscardPolicy:直接丢失无法放入等待队列的任务,不做异常抛出。

DiscardOldestPolicy:丢弃最老的一个请求,然后尝试把当前请求任务加入到等待队列。

注意:在创建ThreadPoolExecutor时需要指定拒绝策略,如果以上拒绝策略无法满足,可以继承RejectedExecutionHandler接口来实现自定义的拒绝策略。

最常用的是升级DiscardPolicy策略,但需要在放弃前记录请求;示例如下:

以下是RejectedExecutionHandler接口代码,可以重新实现该方法以满足自定义需求。

熟悉拒绝策略后,在线程池中还有重要参数ThreadFactory,用于控制线程的创建。通过ThreadFactory,可以实现以下功能:

命名线程:通过为线程指定有意义的名称,便于跟踪日志和调试信息。

设置线程属性:根据需要设置线程的优先级、守护状态、异常处理器等。

定制化线程创建逻辑:添加自定义逻辑来创建线程,如记录线程的创建次数、设置线程组等。

以下是简单的ThreadFactory示例:2024-08-20
mengvlog 阅读 8 次 更新于 2025-07-19 13:22:48 我来答关注问题0
檬味博客在线解答立即免费咨询

编程相关话题

Copyright © 2023 WWW.MENGVLOG.COM - 檬味博客
返回顶部