diff --git a/src/main/java/com/telpo/dipperposition/config/AsyncExecutorConfig.java b/src/main/java/com/telpo/dipperposition/config/AsyncExecutorConfig.java new file mode 100644 index 0000000..9806f31 --- /dev/null +++ b/src/main/java/com/telpo/dipperposition/config/AsyncExecutorConfig.java @@ -0,0 +1,67 @@ +package com.telpo.dipperposition.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.AsyncConfigurer; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * @program: DataPushServer + * @description: + * @author: linwl + * @create: 2020-07-11 09:17 + */ +@Configuration +@Slf4j +public class AsyncExecutorConfig implements AsyncConfigurer { + + @Value("${async.pool.corePoolSize}") + private int corePoolSize; + + @Value("${async.pool.maxPoolSize}") + private int maxPoolSize; + + @Value("${async.pool.queueCapacity}") + private int queueCapacity; + + @Bean(name = "asyncServiceExecutor") + public Executor asyncServiceExecutor() { + log.info("start asyncServiceExecutor"); + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 配置核心线程数 + executor.setCorePoolSize(corePoolSize); + // 配置最大线程数 + executor.setMaxPoolSize(maxPoolSize); + // 配置队列大小 + executor.setQueueCapacity(queueCapacity); + // 配置线程池中的线程的名称前缀 + String threadNamePrefix = "async-pool-"; + executor.setThreadNamePrefix(threadNamePrefix); + + // 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务 + // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + // 执行初始化 + executor.initialize(); + return executor; + } + + /** + * 异步任务中异常处理 + * + * @return + */ + @Override + public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { + return (arg0, arg1, arg2) -> { + log.error("==========================" + arg0.getMessage() + "=======================", arg0); + log.error("com.telpo.auth.exception method:" + arg1.getName()); + }; + } +} diff --git a/src/main/java/com/telpo/dipperposition/service/impl/DipperAstPosAsyncTaskServiceImpl.java b/src/main/java/com/telpo/dipperposition/service/impl/DipperAstPosAsyncTaskServiceImpl.java index c51c6e5..4b100eb 100644 --- a/src/main/java/com/telpo/dipperposition/service/impl/DipperAstPosAsyncTaskServiceImpl.java +++ b/src/main/java/com/telpo/dipperposition/service/impl/DipperAstPosAsyncTaskServiceImpl.java @@ -121,7 +121,8 @@ public class DipperAstPosAsyncTaskServiceImpl implements IDipperAstPosAsyncTaskS } // 将IP对应的省会保存到mongoDB - private void createIPProvince(String ipAddress, String province) { + @Async("asyncServiceExecutor") + public void createIPProvince(String ipAddress, String province) { log.debug("异步创建推送失败任务记录!"); try { IPProvinceEntity ipProvinceEntity = iPProvinceService.getIPProvince(ipAddress); @@ -130,10 +131,10 @@ public class DipperAstPosAsyncTaskServiceImpl implements IDipperAstPosAsyncTaskS ipProvinceEntity.setIp(ipAddress); ipProvinceEntity.setProvince(province); iPProvinceService.saveIPProvince(ipProvinceEntity); - } else { - - ipProvinceEntity.setProvince(province); - iPProvinceService.updateIPProvince(ipProvinceEntity, ipProvinceEntity); +// } else { +// +// ipProvinceEntity.setProvince(province); +// iPProvinceService.updateIPProvince(ipProvinceEntity, ipProvinceEntity); } } catch (Exception e) { log.error("创建推送失败记录异常:", e); @@ -145,6 +146,7 @@ public class DipperAstPosAsyncTaskServiceImpl implements IDipperAstPosAsyncTaskS * @param ipAddress */ @Override + @Async("asyncServiceExecutor") public String pushAstPos(String ipAddress, String centerProvinceFilePath, String centerProvince, diff --git a/src/main/java/com/telpo/dipperposition/service/impl/DipperAstTimeAsyncTaskServiceImpl.java b/src/main/java/com/telpo/dipperposition/service/impl/DipperAstTimeAsyncTaskServiceImpl.java index 488936d..1ae8c2d 100644 --- a/src/main/java/com/telpo/dipperposition/service/impl/DipperAstTimeAsyncTaskServiceImpl.java +++ b/src/main/java/com/telpo/dipperposition/service/impl/DipperAstTimeAsyncTaskServiceImpl.java @@ -5,6 +5,7 @@ import com.telpo.dipperposition.common.SocketClient; import com.telpo.dipperposition.service.IDipperAstTimeAsyncTaskService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.time.LocalDateTime; @@ -20,6 +21,7 @@ import java.time.LocalDateTime; public class DipperAstTimeAsyncTaskServiceImpl implements IDipperAstTimeAsyncTaskService { @Override + @Async("asyncServiceExecutor") public String pushAstTime() { // (1) 发送SDBP-AST-TIME diff --git a/src/main/java/com/telpo/dipperposition/service/impl/DipperDataAsyncTaskServiceImpl.java b/src/main/java/com/telpo/dipperposition/service/impl/DipperDataAsyncTaskServiceImpl.java index 3315aeb..0928dc5 100644 --- a/src/main/java/com/telpo/dipperposition/service/impl/DipperDataAsyncTaskServiceImpl.java +++ b/src/main/java/com/telpo/dipperposition/service/impl/DipperDataAsyncTaskServiceImpl.java @@ -8,6 +8,7 @@ import com.telpo.dipperposition.service.IDipperDataAsyncTaskService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; /** @@ -60,6 +61,7 @@ public class DipperDataAsyncTaskServiceImpl implements IDipperDataAsyncTaskServi } @Override + @Async("asyncServiceExecutor") public String getAstEPH(){ return (String)redisUtil.get(DIPPER_DATA_KEY); }