多线程批量调用下游接口,设置总超时时间是一种常见的需求,特别是在需要保证程序在预定时间内必须返回,否则超时设置不合理,导致接口变慢。
设置场景:多线程批量执行三个接口,耗时分别为10s、15s、20s(一般不会设置这么大的超时时间,此值为了模拟),总超时时间为15s。
错误做法
package com.renzhikeji.demo;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @author 认知科技技术团队
* 微信公众号:认知科技技术团队
*/
public class JdkDemo {
private static final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 100, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), Thread::new,
new ThreadPoolExecutor.AbortPolicy() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
System.out.println("rejectedExecution");
super.rejectedExecution(r, e);
}
});
public static void main(String[] args) {
List<Future<Integer>> futures = new ArrayList<>(10);
Future<Integer> future1 = poolExecutor.submit(() -> {
System.out.println(Thread.currentThread());
TimeUnit.SECONDS.sleep(10);
return 1;
});
futures.add(future1);
Future<Integer> future2 = poolExecutor.submit(() -> {
System.out.println(Thread.currentThread());
TimeUnit.SECONDS.sleep(15);
return 1;
});
futures.add(future2);
Future<Integer> future3 = poolExecutor.submit(() -> {
System.out.println(Thread.currentThread());
TimeUnit.SECONDS.sleep(20);
return 1;
});
futures.add(future3);
long start = System.currentTimeMillis();
for (Future<Integer> integerFuture : futures) {
try {
integerFuture.get(15, TimeUnit.SECONDS);
} catch (Throwable e) {
e.printStackTrace();
}
}
long d = System.currentTimeMillis() - start;
System.out.println(d / 1000);
}
}
执行结果:总超时时间为20s,大于预设置的15s。
上述错误做法:线程池提交任务后,每个任务的超时时间都设置为一个固定值,从而总任务超时超时延长。
java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)方法是对每个任务的超时时间设置,而不是对总任务设置超时时间。
注意:必须保证所有的任务同时执行,核心线程数必须大于等于3,否则会进入队列等待,超时时间会更长。
线程池实现原理解析 崔认知,公众号:认知科技技术团队【八股文Java】图解Java线程池实现原理(ThreadPoolExecutor)
正确做法:使用线程池invokeAll方法
package com.renzhikeji.demo;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
/**
* @author 认知科技技术团队
* 微信公众号:认知科技技术团队
*/
public class JdkDemo {
private static final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 100, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Thread::new, new ThreadPoolExecutor.AbortPolicy() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
System.out.println("rejectedExecution");
super.rejectedExecution(r, e);
}
});
public static void main(String[] args) {
List<Future<Integer>> futures = new ArrayList<>(10);
Callable<Integer> callable1 = () -> {
System.out.println(Thread.currentThread());
TimeUnit.SECONDS.sleep(10);
return 1;
};
Callable<Integer> callable2 = () -> {
System.out.println(Thread.currentThread());
TimeUnit.SECONDS.sleep(15);
return 1;
};
Callable<Integer> callable3 = () -> {
System.out.println(Thread.currentThread());
TimeUnit.SECONDS.sleep(20);
return 1;
};
long start = System.currentTimeMillis();
try {
List<Future<Integer>> invoked = poolExecutor.invokeAll(Arrays.asList(callable1, callable2, callable3),
15L, TimeUnit.SECONDS);
for (Future<Integer> future : invoked) {
try {
Integer a = future.get();
} catch (Throwable e) {
e.printStackTrace();
}
}
} catch (Throwable e) {
System.out.println("12");
e.printStackTrace();
}
long d = System.currentTimeMillis() - start;
System.out.println(d / 1000);
}
}
运行结果:总超时时间为预设值的15s。
线程池invokeAll的原理其实是动态改动了java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)设置的超时时间,每次都会设置为:任务截止时间减去当前时间,如下图源码所示:
正确做法:使用CompletableFuture
使用CompletableFuture.allOf(......).get(15L, TimeUnit.SECONDS),也能设置总任务超时时间。
package com.renzhikeji.demo;
import java.util.concurrent.*;
import java.util.function.Supplier;
/**
* @author 认知科技技术团队
* 微信公众号:认知科技技术团队
*/
public class JdkDemo {
private static final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 100, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Thread::new, new ThreadPoolExecutor.AbortPolicy() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
System.out.println("rejectedExecution");
super.rejectedExecution(r, e);
}
});
public static void main(String[] args) {
Supplier<Integer> callable1 = () -> {
System.out.println(Thread.currentThread());
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
};
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(callable1, poolExecutor);
Supplier<Integer> callable2 = () -> {
System.out.println(Thread.currentThread());
try {
TimeUnit.SECONDS.sleep(15);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
};
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(callable2, poolExecutor);
Supplier<Integer> callable3 = () -> {
System.out.println(Thread.currentThread());
try {
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
};
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(callable3, poolExecutor);
long start = System.currentTimeMillis();
try {
CompletableFuture.allOf(future2, future2, future3).get(15L, TimeUnit.SECONDS);
} catch (Throwable e) {
e.printStackTrace();
}
long d = System.currentTimeMillis() - start;
System.out.println(d / 1000);
try {
Integer integer = future1.get();
System.out.println("future1");
} catch (Throwable e) {
e.printStackTrace();
}
try {
Integer integer = future2.get();
System.out.println("future2");
} catch (Throwable e) {
e.printStackTrace();
}
try {
Integer integer = future3.get();
System.out.println("future3");
} catch (Throwable e) {
e.printStackTrace();
}
}
}
执行结果:任务1、任务2执行完了,任务3超时了。