多线程在现在工作中出现越来越频繁、需要我们熟记并且能熟练地使用之、对相关线程池的一些配置需要我们非常熟悉。
1. 参数详解
1.1 corePoolSize
corePoolSize 核心线程数
– 核心线程会一直存活,即使没有任务需要执行- 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理- 设置allowCoreThreadTimeOut=true(默认false)时,核心线程会超时关闭
1.2 queueCapacity
queueCapacity 任务队列容量(阻塞队列)
- 当核心线程数达到最大时,新任务会放在队列中排队等待
1.3 maxPoolSize
maxPoolSize 最大线程数
– 当线程数 >= corePoolSize 且任务队列已满时,线程池会创建新线程来处理任务- 当线程数 = maxPoolSize且任务队列已满时,线程池会拒绝处理任务而抛出异常
1.4 keepAliveTime
keepAliveTime 线程空闲时间
– 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=coolPoolSize- 如果allowCoreThreadTimeOut=true时,则会直到线程数量=0
1.5 allowCoreThreadTimeOut
allowCoreThreadTimeOut 允许核心线程超时
1.6 rejectedExecutionHandler
rejectedExecutionHandler 任务拒绝处理器
- 两种情况会拒绝处理任务
- 当线程数已经达到maxPoolSize且任务队列已满,就会拒绝新的任务
- 当线程池被调运shutdown() 然后,会等待线程池里的任务执行完毕在shutdown。如果在调运shutdown() 和线程池真正shutdown之间提交的任务,会拒绝新的任务
- 线程池会调运rejectedExecutionHandler来处理这个任务,如果没有设置默认是AbortPolicy,会抛出异常
– ThreadPoolExecutor类有几个内部实现类来处理这类情况 1. **AbortPolicy** 丢弃任务、抛运行时异常 1. **CallerRunsPolicy** 执行任务 1. **DisCardPolicy** 忽视,什么都不会发生 1. **DisCardOldestPolicy** 从队列中提出最先 进入队列(最后一个执行)的任务
- 实现RejectedExecutionHandler接口,可自定义处理器
2. ThreadPoolExecutor 执行顺序
线程池按照以下行为执行任务
1. 当线程数 = 核心线程数,且任务队列未满时,将任务放入任务队列。3. 当线程数 >= 核心线程数,且任务队列已满- 若线程数 < 最大线程数,创建线程- 若线程数 = 最大线程数,抛出异常,拒绝任务
3. 如何设置参数
3.1 默认值
corePoolSize = 1queueCapacity = Integer.MAX_VALUEmaxPoolSize = Integer.MAX_VALUEkeepAliveTime = 60sallowCoreThreadTimeOut = falserejectedExecutionHandler = AbortPolicy
3.2 如何来设置
- 需要根据几个值来决定
tasks : 每秒的任务数,假设为 500 – 1000taskcost : 每个任务花费时间,假设为0.1sresponsetime : 系统允许容忍的最大响应时间,假设为1s
- 计算方式
corePoolSize = 每秒需要多少个线程处理 * threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50 * 根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可queueCapacity = (coreSizePool/taskcost)*responsetime * 计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行 * 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost) * 计算可得 maxPoolSize = (1000-80)/10 = 92 * (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理keepAliveTime和allowCoreThreadTimeout采用默认通常能满足
4. 使用案例
@Componentpublic class KafkaReceiver { @Autowired private BroadbandService broadbandService; private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat(“deal-pool-%d”) .build(); private ExecutorService threadPool = new ThreadPoolExecutor(10, 20, 5, TimeUnit.SECONDS, new LinkedBlockingQueue(10), namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); /** * 监听kafka消息受理业务 */ @KafkaListener(topics = “${kafka.topics[1]}”) public void listenerKafkaProductCommit(ConsumerRecord record) { threadPool.execute(() -> { try { BusinessOrder businessOrder = JsonUtil.jsonToObject(record.value(), BusinessOrder.class); log.info(“消息队列正在工作,订单号: {}”, businessOrder.getOrderNo()); //尝试多线程处理 broadbandService.asynSchoolBroadbandRemoveService(businessOrder); } catch (Exception e) { log.error(“消息队列正在工作异常”, e); } }); } }