优雅关闭(线程池)和动态线程池

/ java / 9 条评论 / 2000浏览

优雅关闭线程池

关闭的时候先将自己需要处理的内容处理完了,之后才关闭服务

server:
  port: 8080
  # 优雅停机
  shutdown: graceful
    @PostConstruct
    public void init() {
        for (String groupId : groupIds) {
            // 得到一个线程池
            ExecutorService threadPool = ThreadPoolConfig.getThreadPool(coreSize, maxSize, queueSize);
            // 注册线程池(方便优雅的关闭线程池)
            threadPoolExecutorShutdownDefinition.registryExecutor(threadPool);
            // 将线程池存储
            taskPendingHolder.put(groupId, threadPool);
        }
    }
/**
 * 优雅的关闭线程池
 * @author wj
 * @create 2022-07-20 10:51
 */
@Component
@Slf4j
public class ThreadPoolExecutorShutdownDefinition implements ApplicationListener<ContextClosedEvent> {

    private final List<ExecutorService> POOLS = Collections.synchronizedList(new ArrayList<>(12));

    /**
     * 线程中的任务在接收到应用关闭信号量后最多等待多久就强制终止,其实就是给剩余任务预留的时间, 到时间后线程池必须销毁
     */
    private final long AWAIT_TERMINATION = 60;
    /**
     * awaitTermination的单位
     */
    private final TimeUnit TIME_UNIT = TimeUnit.SECONDS;

    /**
     * 注册线程池
     * @param threadPoolTaskExecutor
     */
    public void registryExecutor(ThreadPoolTaskExecutor threadPoolTaskExecutor) {
        POOLS.add(threadPoolTaskExecutor.getThreadPoolExecutor());
    }

    /**
     * 注册线程池
     * @param threadPoolTaskExecutor
     */
    public void registryExecutor(ThreadPoolTaskScheduler threadPoolTaskExecutor) {
        POOLS.add(threadPoolTaskExecutor.getScheduledThreadPoolExecutor());
    }

    /**
     * 注册线程池
     * @param executor
     */
    public void registryExecutor(ExecutorService executor) {
        POOLS.add(executor);
    }

    /**
     * 参考{@link org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#shutdown()}
     *
     * @param event the event to respond to
     */
    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        log.info("容器关闭前处理线程池优雅关闭开始, 当前要处理的线程池数量为: {} >>>>>>>>>>>>>>>>", POOLS.size());
        if (CollectionUtils.isEmpty(POOLS)) {
            return;
        }
        for (ExecutorService pool : POOLS) {
            pool.shutdown();
            try {
                if (!pool.awaitTermination(AWAIT_TERMINATION, TIME_UNIT)) {
                    if (log.isWarnEnabled()) {
                        log.warn("Timed out while waiting for executor [{}] to terminate", pool);
                    }
                }
            } catch (InterruptedException ex) {
                if (log.isWarnEnabled()) {
                    log.warn("Timed out while waiting for executor [{}] to terminate", pool);
                }
                Thread.currentThread().interrupt();
            }
        }
    }
}

动态线程池(项目用DynamicTp)

DynamicTp

            <dependency>
                <groupId>io.github.lyh200</groupId>
                <artifactId>dynamic-tp-spring-boot-starter-apollo</artifactId>
                <version>1.0.1</version>
            </dependency>
@Slf4j
@RestController
public class ThreadPoolTest {

    @GetMapping("/tp")
    public void tp() {
        DtpExecutor dtpExecutor2 = DtpRegistry.getExecutor("execute-xxl-thread-pool");
        DtpExecutor dtpExecutor1 = DtpRegistry.getExecutor("imwj.im.notice");

        for(int i=0; i<10; i++){
            log.info("dtpExecutor1:{}   {}",dtpExecutor1.getCorePoolSize(), dtpExecutor1.getMaximumPoolSize());
            log.info("dtpExecutor2:{}   {}",dtpExecutor2.getCorePoolSize(), dtpExecutor2.getMaximumPoolSize());

            dtpExecutor1.execute(() -> log.info("test1"));
            dtpExecutor2.execute(() -> log.info("test2"));
            ThreadUtil.sleep(1000 * 10);
        }
    }
}