响应式编程遵循观察者设计模式,该模式可以定义为:当一个事物发生状态变化时,其他事物将被相应地通知和更新。 因此,观察者不需要轮询事件的变化,而是异步等待事件变化的通知,所以观察者收到通知后就可以处理该事件。 在这个过程中,观察者是发生事件变化时执行的函数,而数据流是可以被观察到的实际可观测数据,也就是被观察者或者称作主题。
几乎所有的语言和框架都在其生态系统中采用了这种响应式编程方法,Java也紧跟时代步伐, 在Java8中引入了响应式编程。现在响应式编程已经开始渗透到Java 8和Java EE 8版本的各个部分。 Java8中引入了诸如CompletionStage及其实现,CompletableFuture等概念,这些类和概念在诸如JAX-RS的Reactive Client API等规范中使用。
JAX-RS客户端API
接下来我们来看看如何在Java EE 8应用程序中使用响应式编程。 在开始本例之前,您需要熟悉基本的Java EE API。 JAX-RS 2.1引入了创建REST客户端的新方法,支持响应式编程。 JAX-RS提供的默认调用者实现是同步的,这意味着创建的客户端将阻塞对服务器端的调用。 这个实现的例子如例一所示。
例一
Response response =ClientBuilder.newClient()
.target("http://localhost:8080/service-url")
.request()
.get();
从JAX-RS 2.0版本开始,JAX-RS为客户端提供了异步的支持,通过调用async()方法为客户端API创建异步调用器,如例二所示。
例二
Future<Response> response =ClientBuilder.newClient()
.target("http://localhost:8080/service-url")
.request()
.async()
.get();
在客户端上使用异步调用器时将返回Future类型的实例,泛型类型为javax.ws.rs .core.Response。 这种方式可以通过调用future.get()来轮询响应,或者通过注册一个回调函数,当HTTP响应可用时将回调该方法。 这两种实现方式都适用于异步编程,但是当你想嵌套回调函数或者在这些异步执行点添加控制条件时会使程序变得复杂。
JAX-RS 2.1提供了一种响应式的编程方式来解决这些问题。当用新的JAX-RS响应式客户端API来构建客户端时, 只需要调用rx()方法就可以完成响应式调用。 在例三中,rx()方法返回存在于客户端运行时的响应式调用者,并且客户端返回类型为CompletionStage.rx()的响应,通过此简单调用就可以实现从同步调用器切换到异步调用器。
例三
CompletionStage<Response> response =ClientBuilder.newClient()
.target("http://localhost:8080/service-url")
.request()
.rx()
.get();
CompletionStage是Java 8中引入的一个新接口,它的名称意味着它可以作为大规模计算中的一个阶段的计算。当我们得到响应实例后,可以调用thenAcceptAsync()方法,在该方法中我们可以提供自己的业务逻辑代码,当响应变为可用时,这些业务逻辑代码片段将异步执行,如例四所示。
例四
response.thenAcceptAsync(res -> {
Temperature t = res.readEntity(Temperature.class);
//do stuff with t
});
响应式编程在服务端的应用
响应式方法不仅局限于JAX-RS中的客户端; 也可以在服务器端利用它。 为了演示这一点,我们将首先模拟一个简单的场景,即我们可以从一个服务器端查询位置列表。 对于每个位置,我们将用该位置数据再次调用另一个服务器端点以获取温度值。 端点的交互如图1所示。
图1 端点交互图
首先,我们定义域模型,然后定义每个域模型的服务。 例五定义了Forecast类,它包装了Temperature和Location类。
例五
public class Temperature {
private Double temperature;
private String scale;
// getters & setters
}
public class Location {
String name;
public Location() {}
public Location(String name) {
this.name = name;
}
// getters & setters
}
public class Forecast {
private Location location;
private Temperature temperature;
public Forecast(Location location) {
this.location = location;
}
public Forecast setTemperature(
final Temperature temperature) {
this.temperature = temperature;
return this;
}
// getters }
例六中实现了ServiceResponse类,该类封装了温度预测列表。
例六
public class ServiceResponse {
private long processingTime;
private List<Forecast> forecasts = new ArrayList<>();
public void setProcessingTime(long processingTime) {
this.processingTime = processingTime;
}
public ServiceResponse forecasts(
List<Forecast> forecasts) {
this.forecasts = forecasts;
return this;
}
// getters
}
例七中显示的LocationResource定义了三个返回的样本位置,请求URL是:/location。
例七
@Path("/location")
public class LocationResource {
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getLocations() {
List<Location> locations = new ArrayList<>();
locations.add(new Location("London"));
locations.add(new Location("Istanbul"));
locations.add(new Location("Prague"));
return Response.ok(
new GenericEntity<List<Location>>(locations){})
.build();
} }
如例八所示,TemperatureResource返回给定位置的随机生成的温度值,温度值介于30到50之间。 在实现中添加500 ms的延迟以模拟传感器获取数据。
例八
@Path("/temperature")
public class TemperatureResource {
@GET
@Path("/{city}")
@Produces(MediaType.APPLICATION_JSON)
public Response getAverageTemperature(
@PathParam("city") String cityName) {
Temperature temperature = new Temperature();
temperature.setTemperature(
(double) (new Random().nextInt(20)+30));
temperature.setScale("Celsius");
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {}
return Response.ok(temperature).build();
}
}
这里首先显示ForecastResource的同步实现方式(如例九所示),它首先获取所有位置。 然后,对于每个位置,它再调用温度服务来检索该位置的温度值。
例九
@Path("/forecast")
public class ForecastResource {
@Uri("location")
private WebTarget locationTarget;
@Uri("temperature/{city}")
private WebTarget temperatureTarget;
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getLocationsWithTemperature() {
long startTime = System.currentTimeMillis();
ServiceResponse response = new ServiceResponse();
List<Location> locations = locationTarget.request()
.get(new GenericType<List<Location>>() {});
locations.forEach(location -> {
Temperature temperature = temperatureTarget
.resolveTemplate("city", location.getName())
.request()
.get(Temperature.class);
response.getForecasts().add(
new Forecast(location)
.setTemperature(temperature));
});
long endTime = System.currentTimeMillis();
response.setProcessingTime(endTime - startTime);
return Response.ok(response).build();
}
}
当请求为URL /forecast时,您应该看到类似于例十的输出结果。请注意,请求的处理时间花费了1,533ms,这很有意义,因为同时为三个不同位置请求温度值的累积请求时间理论上应该为1,500ms(500ms*3)。
例十
{
"forecasts": [
{
"location": {
"name": "London"
},
"temperature": {
"scale": "Celsius",
"temperature": 33
} },
{
"location": {
"name": "Istanbul"
},
"temperature": {
"scale": "Celsius",
"temperature": 38
} },
{
"location": {
"name": "Prague"
},
"temperature": {
"scale": "Celsius",
"temperature": 46
} }
],
}
现在是时候在服务器端引入响应式编程了,在获得所有位置之后,可以并行地完成每个位置的温度服务调用。 这可以绝对增强前面显示的同步调用效率低下的问题。 在例十一中,定义了此温度预测服务的响应式编程版本。
例十一
@Path("/reactiveForecast")
public class ForecastReactiveResource {
@Uri("location")
private WebTarget locationTarget;
@Uri("temperature/{city}")
private WebTarget temperatureTarget;
@GET
@Produces(MediaType.APPLICATION_JSON)
public void getLocationsWithTemperature(
@Suspended final AsyncResponse async) {
long startTime = System.currentTimeMillis();
// Create a stage on retrieving locations
CompletionStage<List<Location>> locationCS =
locationTarget.request()
.rx()
.get(new GenericType<List<Location>>() {});
// By composing another stage on the location stage
// created above, collect the list of forecasts
// as in one big completion stage
final CompletionStage<List<Forecast>> forecastCS =
locationCS.thenCompose(locations -> {
// Create a stage for retrieving forecasts
// as a list of completion stages
List<CompletionStage<Forecast>> forecastList =
// Stream locations and process each
// location individually
locations.stream().map(location -> {
// Create a stage for fetching the
// temperature value just for one city
// given by its name
final CompletionStage<Temperature> tempCS =
temperatureTarget
.resolveTemplate("city",
location.getName())
.request()
.rx()
.get(Temperature.class);
// Then create a completable future that
// contains an instance of forecast
// with location and temperature values
return CompletableFuture.completedFuture(
new Forecast(location))
.thenCombine(tempCS,
});
Forecast::setTemperature);
}).collect(Collectors.toList());
// Return a final completable future instance
// when all provided completable futures are
// completed
return CompletableFuture.allOf(
forecastList.toArray(
new CompletableFuture[forecastList.size()]))
.thenApply(v -> forecastList.stream()
.map(CompletionStage::toCompletableFuture)
.map(CompletableFuture::join)
.collect(Collectors.toList()));
// Create an instance of ServiceResponse,
// which contains the whole list of forecasts
// along with the processing time.
// Create a completed future of it and combine to
// forecastCS in order to retrieve the forecasts
// and set into service response
CompletableFuture.completedFuture(
new ServiceResponse())
.thenCombine(forecastCS,
ServiceResponse::forecasts)
.whenCompleteAsync((response, throwable) -> {
response.setProcessingTime(
System.currentTimeMillis() - startTime);
async.resume(response);
});
} }
响应式编程可能第一眼看起来很复杂,但仔细研究后,你会发现它相当简单。在ForecastReactiveResource中,我们首先借助JAX-RS响应式客户端API创建一个客户端调用位置服务。正如我前面提到的,这是对Java EE 8的补充,它可以通过简单地调用rx()方法创建响应式客户端调用者。
响应式编程不仅仅增强了从同步到异步的实现,它也可以通过嵌套阶段等概念简化开发。现在我们根据位置组成另一个阶段来收集温度预测列表。它们将温度预测列表存储在一个名为forecastCS的大完成阶段,作为预测列表。我最终只会使用forecastCS创建服务调用的响应。接下来,我们将每个位置的温度预测阶段存储在forecastList变量中。为了创建每个位置的预测的完成阶段,我在这些位置上进行流式处理,然后再次使用JAX-RS反应客户端API创建tempCS变量,该API将调用指定城市名称的温度服务。在这里我们用resolveTemplate()方法来构建一个客户端,并使我能够将该城市的名称作为参数传递给构建器。
在locations流式输出位置的最后一步,我通过创建一个新的Forecast实例作为参数来调用CompletableFuture.completedFuture()方法。我将这个Future与tempCS阶段结合起来,以便迭代获取每个位置的温度值。
例十一中的CompletableFuture.allOf()方法将完成阶段列表转换为forecastCS。当所有提供的可完成Future完成时,执行此步骤会返回一个完成的Future实例。温度预测服务的响应是ServiceResponse类的一个实例,因此我为此创建了一个完整的Future,然后将forecastCS完成阶段与预测列表组合在一起,并计算服务的响应时间。
当然,这种响应式编程只会使服务器端异步执行;客户端将被阻塞,直到服务器将响应发送回请求者。为了解决这个问题,Server Sent Events(SSE)也可以用来部分发送响应,以便对于每个位置,温度值可以逐一推送到客户端。 ForecastReactiveResource的输出将与例十二类似。如输出所示,处理时间为515ms,这是用于检索一个位置的温度值的理想执行时间。
例十二
{
"forecasts": [
{
"location": {
"name": "London"
},
"temperature": {
"scale": "Celsius",
"temperature": 49
} },
{
"location": {
"name": "Istanbul"
},
"temperature": {
"scale": "Celsius",
"temperature": 32
} },
{
"location": {
"name": "Prague"
},
"temperature": {
"scale": "Celsius",
"temperature": 45
}
} ],
"processingTime": 515
}
在本文的所有例子中,我们首先展示了以同步方式来检索温度预测信息示例。 接着我们采用响应式编程方法,以便在服务调用之间进行异步处理。 当利用Java EE 8的JAX-RS响应式客户端API以及CompletionStage和CompletableFuture等Java 8的类时,异步处理的威力在响应式编程的帮助下释放出来。响应式编程不仅仅是增强从同步模型到异步模型的实现; 它也可以通过嵌套阶段等概念简化开发。 采用的越多,在并行编程中处理复杂场景就越容易。