思路:
一个代码示例(简单测试,没有仔细调参,只是为了展示测试思路),首先编写 TestContainer 基类,用于复用(这里并不是所有的都用的上,我放出来只是供大家以后测试各种场景):
import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.io.IOException;
@Testcontainers
public class CommonMicroServiceTest {
private static final Network network = Network.newNetwork();
private static final String HTTPBIN = "httpbin";
public static final int HTTPBIN_PORT = 80;
public static final GenericContainer<?> HTTPBIN_CONTAINER
= new GenericContainer<>("kennethreitz/httpbin:latest")
.withExposedPorts(HTTPBIN_PORT)
.withNetwork(network)
.withNetworkAliases(HTTPBIN);
/**
* <a href="https://java.testcontainers.org/modules/toxiproxy/">toxiproxy</a>
* 使用 toxiproxy 封装 httpbin
* 可以使用 toxiproxy 模拟网络故障等情况
* 可以用的 port 范围是 8666~8697
*/
private static final ToxiproxyContainer TOXIPROXY_CONTAINER = new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0")
.withNetwork(network);
private static final int GOOD_HTTPBIN_PROXY_PORT = 8666;
private static final int READ_TIMEOUT_HTTPBIN_PROXY_PORT = 8667;
private static final int RESET_PEER_HTTPBIN_PROXY_PORT = 8668;
public static final String GOOD_HOST;
public static final int GOOD_PORT;
/**
* 以下代表请求已经发出到服务端,但是响应超时,或者不能响应(比如服务器重启)
*/
public static final String READ_TIMEOUT_HOST;
public static final int READ_TIMEOUT_PORT;
public static final String RESET_PEER_HOST;
public static final int RESET_PEER_PORT;
/**
* 以下代表请求都没有发出去,TCP 链接都没有建立
*/
public static final String CONNECT_TIMEOUT_HOST = "localhost";
/**
* 端口 1 一定连不上的
*/
public static final int CONNECT_TIMEOUT_PORT = 1;
static {
//不使用 @Container 注解管理容器声明周期,因为我们需要在静态块生成代理,必须在这之前启动容器
//不用担心容器不会被关闭,因为 testcontainers 会启动一个 ryuk 容器,用于监控并关闭所有容器
HTTPBIN_CONTAINER.start();
TOXIPROXY_CONTAINER.start();
final ToxiproxyClient toxiproxyClient = new ToxiproxyClient(TOXIPROXY_CONTAINER.getHost(), TOXIPROXY_CONTAINER.getControlPort());
try {
Proxy proxy = toxiproxyClient.createProxy("good", "0.0.0.0:" + GOOD_HTTPBIN_PROXY_PORT, HTTPBIN + ":" + HTTPBIN_PORT);
//关闭流量,会 READ TIME OUT
proxy = toxiproxyClient.createProxy("read_timeout", "0.0.0.0:" + READ_TIMEOUT_HTTPBIN_PROXY_PORT, HTTPBIN + ":" + HTTPBIN_PORT);
proxy.toxics().bandwidth("UP_DISABLE", ToxicDirection.UPSTREAM, 0);
proxy.toxics().bandwidth("DOWN_DISABLE", ToxicDirection.DOWNSTREAM, 0);
proxy = toxiproxyClient.createProxy("connect_timeout", "0.0.0.0:" + RESET_PEER_HTTPBIN_PROXY_PORT, HTTPBIN + ":" + HTTPBIN_PORT);
proxy.toxics().resetPeer("UP_SLOW_CLOSE", ToxicDirection.UPSTREAM, 1);
proxy.toxics().resetPeer("DOWN_SLOW_CLOSE", ToxicDirection.DOWNSTREAM, 1);
} catch (IOException e) {
throw new RuntimeException(e);
}
GOOD_HOST = TOXIPROXY_CONTAINER.getHost();
GOOD_PORT = TOXIPROXY_CONTAINER.getMappedPort(GOOD_HTTPBIN_PROXY_PORT);
READ_TIMEOUT_HOST = TOXIPROXY_CONTAINER.getHost();
READ_TIMEOUT_PORT = TOXIPROXY_CONTAINER.getMappedPort(READ_TIMEOUT_HTTPBIN_PROXY_PORT);
RESET_PEER_HOST = TOXIPROXY_CONTAINER.getHost();
RESET_PEER_PORT = TOXIPROXY_CONTAINER.getMappedPort(RESET_PEER_HTTPBIN_PROXY_PORT);
}
}
测试代码:
@Test
public void test() {
// 创建一个自定义的连接提供者
ConnectionProvider provider = ConnectionProvider.builder("customConnectionProvider")
.maxConnections(100) // 增加最大连接数,这个不能太大,否则会被 cloudflare 等 cdn 认为是类 ddos 攻击
.pendingAcquireMaxCount(10000) // 增加等待队列的大小
.build();
HttpClient httpClient = HttpClient.create(provider)
.responseTimeout(Duration.ofMillis(100000)); // 响应超时时间
WebClient build = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build();
List<Mono<String>> monos = Lists.newArrayList();
for (int i = 0; i < 10000; i++) {
//我这里使用了 TestContainer 的端口模拟 0.1s 延迟
Mono<String> stringMono = build.get().uri("http://localhost:" +
CommonMicroServiceTest.HTTPBIN_CONTAINER.getMappedPort(HTTPBIN_PORT) + "/delay/0.1")
.retrieve().bodyToMono(String.class);
monos.add(stringMono);
}
long start = System.currentTimeMillis();
String block = Mono.zip(monos, objects -> {
log.info("{}", objects);
return "ok";
}).block();
log.info("block: {} in {}ms", block, System.currentTimeMillis() - start);
}
测试输出:block: ok in 10362ms
基本符合预期:
另外,我一般用 toxicproxy 模拟服务端断开链接,请求发到服务端但是服务端无法响应,请求发不到服务端,发一半到服务端之后的发不到等等等等,在编写微服务基础框架的时候非常好用~
个人简介:个人业余研究了 AI LLM 微调与 RAG,目前成果是微调了三个模型: