@@ -0,0 +1,67 @@ | |||||
package com.telpo.dipperposition.config; | |||||
import lombok.extern.slf4j.Slf4j; | |||||
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; | |||||
import org.springframework.beans.factory.annotation.Value; | |||||
import org.springframework.context.annotation.Bean; | |||||
import org.springframework.context.annotation.Configuration; | |||||
import org.springframework.scheduling.annotation.AsyncConfigurer; | |||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; | |||||
import java.util.concurrent.Executor; | |||||
import java.util.concurrent.ThreadPoolExecutor; | |||||
/** | |||||
* @program: DataPushServer | |||||
* @description: | |||||
* @author: linwl | |||||
* @create: 2020-07-11 09:17 | |||||
*/ | |||||
@Configuration | |||||
@Slf4j | |||||
public class AsyncExecutorConfig implements AsyncConfigurer { | |||||
@Value("${async.pool.corePoolSize}") | |||||
private int corePoolSize; | |||||
@Value("${async.pool.maxPoolSize}") | |||||
private int maxPoolSize; | |||||
@Value("${async.pool.queueCapacity}") | |||||
private int queueCapacity; | |||||
@Bean(name = "asyncServiceExecutor") | |||||
public Executor asyncServiceExecutor() { | |||||
log.info("start asyncServiceExecutor"); | |||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); | |||||
// 配置核心线程数 | |||||
executor.setCorePoolSize(corePoolSize); | |||||
// 配置最大线程数 | |||||
executor.setMaxPoolSize(maxPoolSize); | |||||
// 配置队列大小 | |||||
executor.setQueueCapacity(queueCapacity); | |||||
// 配置线程池中的线程的名称前缀 | |||||
String threadNamePrefix = "async-pool-"; | |||||
executor.setThreadNamePrefix(threadNamePrefix); | |||||
// 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务 | |||||
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 | |||||
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); | |||||
// 执行初始化 | |||||
executor.initialize(); | |||||
return executor; | |||||
} | |||||
/** | |||||
* 异步任务中异常处理 | |||||
* | |||||
* @return | |||||
*/ | |||||
@Override | |||||
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { | |||||
return (arg0, arg1, arg2) -> { | |||||
log.error("==========================" + arg0.getMessage() + "=======================", arg0); | |||||
log.error("com.telpo.auth.exception method:" + arg1.getName()); | |||||
}; | |||||
} | |||||
} |
@@ -121,7 +121,8 @@ public class DipperAstPosAsyncTaskServiceImpl implements IDipperAstPosAsyncTaskS | |||||
} | } | ||||
// 将IP对应的省会保存到mongoDB | // 将IP对应的省会保存到mongoDB | ||||
private void createIPProvince(String ipAddress, String province) { | |||||
@Async("asyncServiceExecutor") | |||||
public void createIPProvince(String ipAddress, String province) { | |||||
log.debug("异步创建推送失败任务记录!"); | log.debug("异步创建推送失败任务记录!"); | ||||
try { | try { | ||||
IPProvinceEntity ipProvinceEntity = iPProvinceService.getIPProvince(ipAddress); | IPProvinceEntity ipProvinceEntity = iPProvinceService.getIPProvince(ipAddress); | ||||
@@ -130,10 +131,10 @@ public class DipperAstPosAsyncTaskServiceImpl implements IDipperAstPosAsyncTaskS | |||||
ipProvinceEntity.setIp(ipAddress); | ipProvinceEntity.setIp(ipAddress); | ||||
ipProvinceEntity.setProvince(province); | ipProvinceEntity.setProvince(province); | ||||
iPProvinceService.saveIPProvince(ipProvinceEntity); | iPProvinceService.saveIPProvince(ipProvinceEntity); | ||||
} else { | |||||
ipProvinceEntity.setProvince(province); | |||||
iPProvinceService.updateIPProvince(ipProvinceEntity, ipProvinceEntity); | |||||
// } else { | |||||
// | |||||
// ipProvinceEntity.setProvince(province); | |||||
// iPProvinceService.updateIPProvince(ipProvinceEntity, ipProvinceEntity); | |||||
} | } | ||||
} catch (Exception e) { | } catch (Exception e) { | ||||
log.error("创建推送失败记录异常:", e); | log.error("创建推送失败记录异常:", e); | ||||
@@ -145,6 +146,7 @@ public class DipperAstPosAsyncTaskServiceImpl implements IDipperAstPosAsyncTaskS | |||||
* @param ipAddress | * @param ipAddress | ||||
*/ | */ | ||||
@Override | @Override | ||||
@Async("asyncServiceExecutor") | |||||
public String pushAstPos(String ipAddress, | public String pushAstPos(String ipAddress, | ||||
String centerProvinceFilePath, | String centerProvinceFilePath, | ||||
String centerProvince, | String centerProvince, | ||||
@@ -5,6 +5,7 @@ import com.telpo.dipperposition.common.SocketClient; | |||||
import com.telpo.dipperposition.service.IDipperAstTimeAsyncTaskService; | import com.telpo.dipperposition.service.IDipperAstTimeAsyncTaskService; | ||||
import lombok.extern.slf4j.Slf4j; | import lombok.extern.slf4j.Slf4j; | ||||
import org.springframework.beans.factory.annotation.Value; | import org.springframework.beans.factory.annotation.Value; | ||||
import org.springframework.scheduling.annotation.Async; | |||||
import org.springframework.stereotype.Service; | import org.springframework.stereotype.Service; | ||||
import java.time.LocalDateTime; | import java.time.LocalDateTime; | ||||
@@ -20,6 +21,7 @@ import java.time.LocalDateTime; | |||||
public class DipperAstTimeAsyncTaskServiceImpl implements IDipperAstTimeAsyncTaskService { | public class DipperAstTimeAsyncTaskServiceImpl implements IDipperAstTimeAsyncTaskService { | ||||
@Override | @Override | ||||
@Async("asyncServiceExecutor") | |||||
public String pushAstTime() { | public String pushAstTime() { | ||||
// (1) 发送SDBP-AST-TIME | // (1) 发送SDBP-AST-TIME | ||||
@@ -8,6 +8,7 @@ import com.telpo.dipperposition.service.IDipperDataAsyncTaskService; | |||||
import lombok.extern.slf4j.Slf4j; | import lombok.extern.slf4j.Slf4j; | ||||
import org.springframework.beans.factory.annotation.Autowired; | import org.springframework.beans.factory.annotation.Autowired; | ||||
import org.springframework.beans.factory.annotation.Value; | import org.springframework.beans.factory.annotation.Value; | ||||
import org.springframework.scheduling.annotation.Async; | |||||
import org.springframework.stereotype.Service; | import org.springframework.stereotype.Service; | ||||
/** | /** | ||||
@@ -60,6 +61,7 @@ public class DipperDataAsyncTaskServiceImpl implements IDipperDataAsyncTaskServi | |||||
} | } | ||||
@Override | @Override | ||||
@Async("asyncServiceExecutor") | |||||
public String getAstEPH(){ | public String getAstEPH(){ | ||||
return (String)redisUtil.get(DIPPER_DATA_KEY); | return (String)redisUtil.get(DIPPER_DATA_KEY); | ||||
} | } | ||||