🌈超时任务
# 超时任务
何为超时任务? 顾名思义其实就是单一任务可以通过设置超时时间决定改任务是否继续执行。配置方式很简单。如下操作:
注解配置
使用 @Task
注解中的 timeoutInMilliseconds
属性进行配置。
- timeoutInMilliseconds 固定使用毫秒数
package com.gobrs.async.test.task.timeout;
import com.gobrs.async.core.TaskSupport;
import com.gobrs.async.core.anno.Task;
import com.gobrs.async.core.task.AsyncTask;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
* The type A service.
* @program: gobrs -async-starter
* @description:
* @author: sizegang
*/
@Slf4j
@Task(timeoutInMilliseconds = 300)
public class CaseTimeoutTaskA extends AsyncTask {
/**
* The .
*/
int i = 10000;
@Override
public void prepare(Object o) {
log.info(this.getName() + " 使用线程---" + Thread.currentThread().getName());
}
@SneakyThrows
@Override
public String task(Object o, TaskSupport support) {
System.out.println("CaseTimeoutTaskA Begin");
Thread.sleep(400);
for (int i1 = 0; i1 < i; i1++) {
i1 += i1;
}
System.out.println("CaseTimeoutTaskA Finish");
return "result";
}
@Override
public boolean necessary(Object o, TaskSupport support) {
return true;
}
@Override
public void onSuccess(TaskSupport support) {
}
}
# 超时监听线程池配置
所谓的监听线程池配置即:监听任务超时的线程池的核心线程数,关于为什么要配置该参数。请查看下方的原理分析。
gobrs:
async:
config:
rules:
- name: "chain1"
content: "taskA->taskB->taskC"
timeout-core-size: 200 # 核心线程数量
如果不配置则取默认值,默认值计算方式如下:
public static Integer calculateCoreNum() {
int cpuCoreNum = Runtime.getRuntime().availableProcessors();
return new BigDecimal(cpuCoreNum).divide(new BigDecimal("0.2")).intValue();
}
# 单例测试
# 运行结果
2022-12-09 16:51:41.031 INFO 37083 --- [pool-2-thread-1] c.g.a.t.task.timeout.CaseTimeoutTaskA : caseTimeoutTaskA 使用线程---pool-2-thread-1
CaseTimeoutTaskA Begin
2022-12-09 16:51:41.355 ERROR 37083 --- [ GobrsTimer-1] com.gobrs.async.core.timer.GobrsTimer :
com.gobrs.async.core.common.exception.TimeoutException: task caseTimeoutTaskA TimeoutException
at com.gobrs.async.core.TaskLoader$1.tick(TaskLoader.java:394) ~[classes/:na]
at com.gobrs.async.core.timer.GobrsTimer.lambda$addTimerListener$0(GobrsTimer.java:80) ~[classes/:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_251]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[na:1.8.0_251]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_251]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[na:1.8.0_251]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_251]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_251]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_251]
2022-12-09 16:51:41.375 INFO 37083 --- [pool-2-thread-2] c.g.a.t.task.timeout.CaseTimeoutTaskB : caseTimeoutTaskB 使用线程---pool-2-thread-2
CaseTimeoutTaskB Begin
CaseTimeoutTaskB Finish
2022-12-09 16:51:41.377 INFO 37083 --- [pool-2-thread-2] c.g.a.t.task.timeout.CaseTimeoutTaskC : caseTimeoutTaskC 使用线程---pool-2-thread-2
CaseTimeoutTaskC Begin
CaseTimeoutTaskC Finish
2022-12-09 16:51:43.426 INFO 37083 --- [ main] com.gobrs.async.core.TaskLoader : 【ProcessTrace】Total cost: 2413ms【task】caseTimeoutTaskA cost :312ms【state】:fail【errMsg】: sleep interrupted; ->【task】caseTimeoutTaskB cost :0ms【state】:success; ->【task】caseTimeoutTaskC cost :2011ms【state】:success;
# 特别说明
- 超时任务不支持线程复用,因为需要通过控制线程超时来进行逻辑判断,如果支持线程复用,可能会出现中断正在复用线程的任务执行。
- 熔断降级原理借鉴
Hystrix
方式处理,如果对Hystrix
不熟悉可以借鉴 Hystrix 熔断降级原理解析 (opens new window) - 因线程调度的原因,超时时间可能存在10ms内的误差,可忽略!
Last Updated: 1/6/2023, 9:59:30 AM