Structured Concurrency is available as an incubator API in JDK 19. It is part of Project Loom, probably one of the most anticipated features of JDK 19.
There are two things we need to remember about virtual threads. First, they are cheap to create, and much cheaper than the regular platform threads we've been using in the JDK for many years, in fact, since the beginning of Java. Second, they are cheap to block.
Blocking a platform thread is expensive, and you should avoid doing it; this is not the case for virtual threads. By the way, this is why asynchronous programming based on callbacks and futures was developed: it lets a thread switch to another task when the current task would block, so that the thread itself never blocks. Why? Because blocking a platform thread is expensive.
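To make the callback style concrete, here is a minimal sketch using CompletableFuture. The fetchUser() and greeting() methods are hypothetical stand-ins invented for this illustration; they are not part of the weather example used later.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example: fetchUser() and greeting() are illustrative stand-ins.
final class CallbackStyleDemo {

    // Pretends to call a remote service; runs on the common ForkJoinPool.
    static CompletableFuture<String> fetchUser(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // Each step is a callback chained on the previous one.
    // No thread is blocked while waiting for the previous step.
    static CompletableFuture<String> greeting(int id) {
        return fetchUser(id)
                .thenApply(String::toUpperCase)
                .thenApply(name -> "Hello, " + name);
    }

    public static void main(String[] args) {
        // join() blocks only here, at the very end, to print the result.
        System.out.println(greeting(42).join());
    }
}
```

The business logic is spread over a chain of lambdas instead of reading top to bottom, which is the cost discussed next.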
What is the price of asynchronous code? Well, there are three things that are not that great:
So choosing to write asynchronous code based on callbacks is not a decision that we should make lightly, because it will have an impact on our whole application. And lastly, it is next to impossible to profile an asynchronous application.
On the other hand, blocking a virtual thread is cheap. There is no need to avoid blocking a virtual thread: just block it and that's it. We can write our code in a blocking, synchronous way; as long as we run it on top of virtual threads, it's perfectly fine.
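As a rough illustration of "just block it", the sketch below starts many virtual threads that each block in Thread.sleep(). It assumes JDK 21 or later, where Executors.newVirtualThreadPerTaskExecutor() is a final API; the class and method names are made up for this example.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Assumes JDK 21+ (virtual threads are final there). Names are illustrative.
final class BlockingVirtualThreadsDemo {

    // Starts `count` virtual threads that each block for 100 ms,
    // and returns how many of them ran to completion.
    static int runBlockingTasks(int count) {
        AtomicInteger finished = new AtomicInteger();
        // One new virtual thread per submitted task; close() waits for all of them.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < count; i++) {
                executor.submit(() -> {
                    Thread.sleep(Duration.ofMillis(100)); // blocking call
                    return finished.incrementAndGet();
                });
            }
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runBlockingTasks(10_000) + " tasks finished");
    }
}
```

Doing the same with one platform thread per task would be far more costly, which is why platform threads are normally pooled.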
That being said, do virtual threads solve all our problems? The answer is no. There are still problems that need to be addressed. One of them is that, because they are so cheap, we can quickly end up with millions of them in our application. And that is a problem.
How can we find our way in so many threads? Will your IDE even be able to display all these threads in this little thread panel that you are used to? And if it can do that, how are you going to find the one thread we need to debug, if there are millions? The real question is: how are you going to interact with virtual threads?
Since JDK 5, we are not supposed to interact with threads directly. The right pattern is to submit a task, as a Runnable or as a Callable, to an ExecutorService, or an Executor, and work with the Future we get in return.
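For reference, the classic submit-and-get pattern looks roughly like this. It is a minimal sketch; the class name and the trivial task are chosen for illustration only.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch of the pre-Loom pattern: submit a task, work with the Future.
final class SubmitTaskDemo {

    static int computeWithExecutor() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> task = () -> 6 * 7;
            Future<Integer> future = executor.submit(task); // returns immediately
            return future.get();                            // blocks until the task is done
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("result = " + computeWithExecutor());
    }
}
```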
In fact, Loom keeps this model, and adds nice features to it. The first object I'm going to talk about is this scope object. The exact type is StructuredTaskScope, that's the name of the class. But we are going to call it scope, just because it's simpler.
What is this scope object about? Well, we can see this object as a virtual thread launcher. We submit tasks to it, in the form of Callables, we get a future in return, and each Callable is executed in a virtual thread, created for us by the scope.
Plain and simple. We may be thinking that it really looks like an executor, and it does. But there also are big differences between executors and scopes.
Suppose we want to query a weather forecast server. Here is the code that is going to query it:
public static void main(String[] args) throws IOException, InterruptedException {
    Instant begin = Instant.now();
    Weather weather = Weather.readWeather();
    Instant end = Instant.now();
    System.out.println("weather = " + weather);
    System.out.println("Time is = " + Duration.between(begin, end).toMillis() + "ms");
}
Before we run this code, we need a Weather class. You can replace SERVER_ADDRESS with your own; just make sure the server returns the weather information that we need.
public record Weather(String server, String weather) {
private static final String SERVER_ADDRESS = "http://localhost:8080";
private static Weather fromJson(String json) {
JSONObject object = JSON.parseObject(json);
String server = object.getString("server");
String weather = object.getString("weather");
return new Weather(server, weather);
}
public static Weather readWeather() throws IOException, InterruptedException {
return readWeatherFromA();
}
private static Weather readWeatherFromA() throws IOException, InterruptedException {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(SERVER_ADDRESS + "/01/weather"))
.GET()
.build();
HttpResponse<String> response = client.send(request,
HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
String json = response.body();
return fromJson(json);
} else {
throw new RuntimeException("Server unavailable");
}
}
}
Here we can start a Spring WebFlux server as a demo and configure its controller.
@RestController
@RequestMapping("/01/weather")
public class WeatherController {
@GetMapping
public Mono<Weather> getWeather() {
return Mono.just(new Weather("Server A", "Sunny"));
}
}
After running this code, we can see that we could query this server in roughly 384 ms.
So now, let us make this code asynchronous. For that we need a scope object, so we're going to create it. It's a StructuredTaskScope instance. It has a type parameter, which is going to be Weather. And because this StructuredTaskScope instance is in fact AutoCloseable, we're going to surround it with a try-with-resources pattern.
The first step of this scope pattern is to fork a task, which is going to be a Callable. This Callable is simply going to be Weather::readWeatherFromA. The fork() method returns a Future object; we're going to call it futureA, because it reads the weather from A. Getting this Future object is non-blocking: we get it immediately.
Then, once we've done that, we need to call the join() method of this scope object. join() is a blocking call that blocks the current thread until all the tasks that have been submitted to this StructuredTaskScope are complete. So when join() returns, we know that futureA is complete, and we can just call resultNow() on this future.
resultNow() will throw an exception if we call it and the future is not complete, so we really need to do that after the call to join(). And this is going to be our weather, let's call it weatherA. And now we can return weatherA, just like that.
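public static Weather readWeather() throws InterruptedException {
    try (var scope = new StructuredTaskScope<Weather>()) {
        // fork() returns immediately; the task runs in a new virtual thread
        Future<Weather> futureA = scope.fork(Weather::readWeatherFromA);
        // join() blocks until every forked task is complete
        scope.join();
        // safe to call only after join()
        Weather weatherA = futureA.resultNow();
        return weatherA;
    }
}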
Before running this code, remember to pass the VM options --enable-preview --add-modules jdk.incubator.concurrent. And now, if we run this code again, of course, the result will be the same. And that's it. This is how we can use a scope.
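The ordering constraint on resultNow() can be checked in isolation, without any scope. The sketch below assumes JDK 19 or later, where Future.resultNow() exists, and uses CompletableFuture only as a convenient way to obtain a pending and a completed future; the class and method names are illustrative.

```java
import java.util.concurrent.CompletableFuture;

// Assumes JDK 19+ for Future.resultNow(). Names are illustrative.
final class ResultNowDemo {

    // Returns true if resultNow() rejects a future that has not completed yet.
    static boolean throwsWhenIncomplete() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        try {
            pending.resultNow();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // resultNow() returns the value directly once the future is complete.
    static String readWhenComplete() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("Sunny");
        return done.resultNow();
    }

    public static void main(String[] args) {
        System.out.println("incomplete future rejected: " + throwsWhenIncomplete());
        System.out.println("completed future value: " + readWhenComplete());
    }
}
```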
At this point you may be thinking: all this mess just for that? Bear with me, there is more to see on this scope object. First, what are the differences between a scope object and an ExecutorService?
Well, there are two main differences:
ExecutorService has the same life cycle as your application. This is how you should be using executor services, because executor services hold platform threads, and platform threads are expensive to create. So you want to pool them. They are precious.
Let's go one step further. Suppose you want to query several weather forecast servers instead of just one. This could speed up your process, because all your results are supposed to be equivalent. So once you get the first one, you can cancel all the others.
It turns out that there is a special scope that does exactly that. It is an extension of the basic StructuredTaskScope class, and is called StructuredTaskScope.ShutdownOnSuccess. And yes, there is also a ShutdownOnFailure class. So what does this ShutdownOnSuccess scope do? Well, let us take a look at it.
The pattern to use this ShutdownOnSuccess scope is exactly the same as the other one. We are going to open this ShutdownOnSuccess scope, but first of all, let us add some methods to our Weather record, like below:
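public record Weather(String server, String weather) {
// Source of the simulated latency used below (needs java.util.Random)
private static final Random RANDOM = new Random();
private static Weather readWeatherFromServer(String url) throws IOException, InterruptedException {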
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(SERVER_ADDRESS + url))
.GET()
.build();
HttpResponse<String> response = client.send(request,
HttpResponse.BodyHandlers.ofString());
Thread.sleep(Duration.of(RANDOM.nextInt(10, 1200), ChronoUnit.MILLIS));
if (response.statusCode() == 200) {
String json = response.body();
return fromJson(json);
} else {
throw new RuntimeException("Server unavailable");
}
}
}
Also add some endpoints to our Spring controller:
@RestController
public class WeatherController {
@GetMapping("/01/weather")
public Mono<Weather> getWeatherA() {
return Mono.just(new Weather("Server A", "Sunny"));
}
@GetMapping("/02/weather")
public Mono<Weather> getWeatherB() {
return Mono.just(new Weather("Server B", "Rain"));
}
@GetMapping("/03/weather")
public Mono<Weather> getWeatherC() {
return Mono.just(new Weather("Server C", "Snowy"));
}
}
Now let's use ShutdownOnSuccess in our try-with-resources pattern. We can now get three futures: futureA, futureB, and futureC. The way ShutdownOnSuccess works is that it takes the first future to provide an answer, and cancels all the others.
So instead of calling futureA.resultNow(), we're now going to call scope.result(). This method throws an ExecutionException. Then we print the state of the different futures: futureA.state(), and the same for B and C.
public static Weather readWeather() throws InterruptedException, ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()) {
        // Fork one task per server and keep the futures so we can inspect them later
        List<Future<Weather>> futures = Stream.of("/01/weather", "/02/weather", "/03/weather")
                .<Callable<Weather>>map(url -> () -> readWeatherFromServer(url))
                .<Future<Weather>>map(scope::fork)
                .toList();
        scope.join();
        futures.forEach(f -> System.out.println(f.state()));
        return scope.result();
    }
}
Now we run this code again. We can see that futureC is the winner, because it was the first to provide an answer, and this ShutdownOnSuccess scope automatically cancelled the two other tasks, futureA and futureB. These two are in a FAILED state, meaning that they have been interrupted by the scope itself.
The pattern to use this ShutdownOnSuccess scope is exactly the same as the other one: you open it, you fork your tasks, you join, and you get the result. The way it works is a little different. Here we have all the future objects, and when join() returns, there is one future that is done, and the others have been cancelled.
This is very handy: the interruptions are handled by the scope itself. By the way, we do not need to keep these futures; we can just call the result() method of this scope object to get our result. No need to handle the future objects there. Our business code becomes much cleaner, without any technical objects in our way.
Futures are technical objects. We just fork our tasks, call join(), call result(), and that's it. Why is it possible to do so? Well, precisely because a scope object is in fact specialized: it is business focused. It processes one business need, instead of blindly processing all the asynchronous tasks of our application.
What happens if a task fails with an exception? Well, it depends on the scope. For the ShutdownOnSuccess scope, this task will not be selected to produce a result. But if all our tasks fail, then we will get an ExecutionException, with the exception from the first future that completed as a root cause.
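For comparison only, the long-standing ExecutorService.invokeAny() has a similar "first success wins" contract on platform threads: it returns the result of one task that completed successfully, cancels the rest, and throws an ExecutionException if no task succeeds. The sketch below is not the ShutdownOnSuccess implementation; the server() and failing() helpers are hypothetical stand-ins for remote calls.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Comparison sketch with a stable API; helper names are illustrative.
final class InvokeAnyDemo {

    // A task that sleeps, then returns its name (stand-in for a remote call).
    static Callable<String> server(String name, long delayMillis) {
        return () -> {
            Thread.sleep(delayMillis);
            return name;
        };
    }

    // A task that always fails.
    static Callable<String> failing(String name) {
        return () -> {
            throw new IllegalStateException(name + " unavailable");
        };
    }

    // Returns the result of a task that succeeded, or a message if every task failed.
    static String firstSuccess(List<Callable<String>> tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
        try {
            return executor.invokeAny(tasks); // remaining tasks are cancelled on return
        } catch (ExecutionException e) {
            // Thrown only when no task completed successfully.
            return "all failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstSuccess(List.of(server("A", 2000), server("C", 10))));
        System.out.println(firstSuccess(List.of(failing("A"), failing("B"))));
    }
}
```

Unlike a scope, the executor here is created and torn down by hand, and nothing ties the lifetime of the tasks to a block of code.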
Could we go one step further and create our own business scope? Well, we cannot extend ShutdownOnSuccess, because it's a final class, but we can still wrap it or compose it, if this is what we want.
But we can certainly extend StructuredTaskScope. So let's do that. Suppose that, instead of weather forecasts, we are now going to query quotations for a travel agency. And, as we did for the weather forecast, we need to query several quotation servers to get the best price possible.
The code we want to write is the following: fork the queries on the quotation servers, call join(), because this is how scopes work, and then just call bestQuotation(), just as we called result() on StructuredTaskScope.ShutdownOnSuccess.
Now of course, there is no bestQuotation() method on the StructuredTaskScope class, so what we really need to do is create our own class: a QuotationScope.
Before we define our quotation class, we need to have a new controller for our test.
@RestController
public class QuotationController {
@GetMapping("/01/quotation")
public Mono<Quotation> getQuotationA() {
return Mono.just(new Quotation("Server A", 25));
}
@GetMapping("/02/quotation")
public Mono<Quotation> getQuotationB() {
return Mono.just(new Quotation("Server B", 13));
}
@GetMapping("/03/quotation")
public Mono<Quotation> getQuotationC() {
return Mono.just(new Quotation("Server C", 68));
}
@GetMapping("/04/quotation")
public Mono<Quotation> getQuotationD() {
return Mono.just(new Quotation("Server D", 36));
}
@GetMapping("/05/quotation")
public Mono<Quotation> getQuotationE() {
return Mono.just(new Quotation("Server E", 71));
}
}
Now, suppose we have to call these APIs and get the lowest-priced Quotation. We have to extend the StructuredTaskScope class and override some of its methods, like below:
public record Quotation(String agency, int price) {
private static final String SERVER_ADDRESS = "http://localhost:8080";
private static Quotation fromJson(String json) {
JSONObject object = JSON.parseObject(json);
String agency = object.getString("agency");
int price = object.getIntValue("price");
return new Quotation(agency, price);
}
public static class QuotationException extends RuntimeException {
}
public static class QuotationScope extends StructuredTaskScope<Quotation> {
private final Collection<Quotation> quotations = new ConcurrentLinkedDeque<>();
private final Collection<Throwable> exceptions = new ConcurrentLinkedDeque<>();
@Override
protected void handleComplete(Future<Quotation> future) {
switch (future.state()) {
case RUNNING -> throw new IllegalStateException("Future is still running...");
case SUCCESS -> this.quotations.add(future.resultNow());
case FAILED -> this.exceptions.add(future.exceptionNow());
case CANCELLED -> { }
}
}
public QuotationException exceptions() {
QuotationException exception = new QuotationException();
this.exceptions.forEach(exception::addSuppressed);
return exception;
}
public Quotation bestQuotation() {
return quotations.stream()
.min(Comparator.comparing(Quotation::price))
.orElseThrow(this::exceptions);
}
}
public static Quotation readQuotation() throws InterruptedException {
try (var scope = new QuotationScope()) {
scope.fork(Quotation::readQuotationFromA);
scope.fork(Quotation::readQuotationFromB);
scope.fork(Quotation::readQuotationFromC);
scope.fork(Quotation::readQuotationFromD);
scope.fork(Quotation::readQuotationFromE);
scope.join();
return scope.bestQuotation();
}
}
private static Quotation readQuotationFromA() throws IOException, InterruptedException {
return readQuotationFromServer("/01/quotation");
}
private static Quotation readQuotationFromB() throws IOException, InterruptedException {
return readQuotationFromServer("/02/quotation");
}
private static Quotation readQuotationFromC() throws IOException, InterruptedException {
return readQuotationFromServer("/03/quotation");
}
private static Quotation readQuotationFromD() throws IOException, InterruptedException {
return readQuotationFromServer("/04/quotation");
}
private static Quotation readQuotationFromE() throws IOException, InterruptedException {
return readQuotationFromServer("/05/quotation");
}
private static Quotation readQuotationFromServer(String url) throws IOException, InterruptedException {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(SERVER_ADDRESS + url))
.GET()
.build();
HttpResponse<String> response = client.send(request,
HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
String json = response.body();
return fromJson(json);
} else {
throw new RuntimeException("Server unavailable");
}
}
public static void main(String[] args) throws InterruptedException {
Instant begin = Instant.now();
Quotation quotation = Quotation.readQuotation();
Instant end = Instant.now();
System.out.println("quotation = " + quotation);
System.out.println("Time is = " + Duration.between(begin, end).toMillis() + "ms");
}
}
In this record, we define a nested class QuotationScope, which extends StructuredTaskScope, and we override its handleComplete() method so that when a task finishes successfully, its result is added to our ConcurrentLinkedDeque.
From the calling code, we can use the bestQuotation() method, instead of result(), to get the Quotation with the minimum price. Now let's run this code and check whether the result is what we expect.
We can see that this time, Server B answered with a price of 13. Compared to classical asynchronous code with callbacks, this code is itself fully asynchronous: each quotation is fetched in its own thread, but with a pattern that is completely synchronous.
The nice thing with this scope object is that we can write our code in a synchronous way, following very simple patterns, while it is executed in an asynchronous way, based on virtual threads.
So that's the final code. As we can see, our business code is super simple: fork our tasks, call join(), and then call our own business methods that will produce the result we need. The technical part of our code is also simple: all we need to do is write a callback to handle our future objects, one future at a time, and then our business code that decides how to reduce our partial results.
Writing your unit tests is also super simple: you can create a completed future with a result or an exception directly with the API. Don't create mocks for that!
You can create a completed Future with CompletableFuture.completedFuture() and pass the value you need, or if you need a failing future, you can use CompletableFuture.failedFuture() and pass the exception you want to throw.
Future<String> completableFuture = CompletableFuture.completedFuture("Complete");
Exception exception = ...;
Future<String> failedFuture = CompletableFuture.failedFuture(exception);
So really, writing unit tests for this class is super easy.
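Here is a minimal sketch of what such a test could look like. Because the incubator scope class may not be available on every JDK, the per-future logic is copied into a hypothetical QuoteCollector class with a handle() method playing the role of handleComplete(); Quote is likewise a stand-in for the Quotation record. It assumes JDK 19 or later for Future.state(), resultNow() and exceptionNow().

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Future;

// Assumes JDK 19+. Quote and QuoteCollector are hypothetical stand-ins.
final class CompletedFutureTestDemo {

    record Quote(String agency, int price) { }

    // Same per-future logic as a handleComplete() override, without the scope.
    static final class QuoteCollector {
        private final Collection<Quote> quotes = new ConcurrentLinkedDeque<>();
        private final Collection<Throwable> failures = new ConcurrentLinkedDeque<>();

        void handle(Future<Quote> future) {
            switch (future.state()) {
                case RUNNING -> throw new IllegalStateException("Future is still running");
                case SUCCESS -> quotes.add(future.resultNow());
                case FAILED -> failures.add(future.exceptionNow());
                case CANCELLED -> { }
            }
        }

        // Cheapest quote, or an exception carrying every failure as suppressed.
        Quote best() {
            return quotes.stream()
                    .min(Comparator.comparingInt(Quote::price))
                    .orElseThrow(() -> {
                        IllegalStateException e = new IllegalStateException("No quotation");
                        failures.forEach(e::addSuppressed);
                        return e;
                    });
        }
    }

    // Feeds already-completed futures to the collector: no threads, no mocks.
    static Quote bestOfCompletedFutures() {
        QuoteCollector collector = new QuoteCollector();
        collector.handle(CompletableFuture.completedFuture(new Quote("A", 25)));
        collector.handle(CompletableFuture.completedFuture(new Quote("B", 13)));
        collector.handle(CompletableFuture.failedFuture(new RuntimeException("Server unavailable")));
        return collector.best();
    }

    // When every future failed, best() throws and reports each failure.
    static int suppressedCountWhenAllFail() {
        QuoteCollector collector = new QuoteCollector();
        collector.handle(CompletableFuture.failedFuture(new RuntimeException("A down")));
        collector.handle(CompletableFuture.failedFuture(new RuntimeException("B down")));
        try {
            collector.best();
            return -1;
        } catch (IllegalStateException e) {
            return e.getSuppressed().length;
        }
    }

    public static void main(String[] args) {
        System.out.println(bestOfCompletedFutures());
        System.out.println(suppressedCountWhenAllFail());
    }
}
```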
OK, how can we assemble our quotation and weather forecast in a nice travel page? What about creating a TravelPage record and putting a quotation and a weather forecast in it? Here we have a readTravelPage() factory method, and like before we should have a TravelPageScope.
We also need a common type so that both Weather and Quotation can be handled by our TravelPageScope. Let's define an interface PageComponent and make Weather and Quotation implement it.
public sealed interface PageComponent
permits Weather, Quotation {
}
Now we can finish our TravelPage:
public record TravelPage(Quotation quotation, Weather weather) {
public static TravelPage readTravelPage()
throws InterruptedException {
try(var scope = new TravelPageScope()) {
scope.fork(Weather::readWeather);
scope.fork(Quotation::readQuotation);
scope.join();
return scope.travelPage();
}
}
public static void main(String[] args) throws InterruptedException {
Instant begin = Instant.now();
TravelPage travelPage = TravelPage.readTravelPage();
Instant end = Instant.now();
System.out.println("TravelPage = " + travelPage);
System.out.println("Time is = " + Duration.between(begin, end).toMillis() + "ms");
}
private static class TravelPageScope extends StructuredTaskScope<PageComponent> {
private volatile Weather weather;
private volatile Quotation quotation;
private volatile Quotation.QuotationException quotationException;
private volatile Throwable exception;
@Override
protected void handleComplete(Future<PageComponent> future) {
switch (future.state()) {
case RUNNING -> throw new IllegalStateException("Future is still running...");
case SUCCESS -> {
switch (future.resultNow()) {
case Weather weather -> this.weather = weather;
case Quotation quotation -> this.quotation = quotation;
}
}
case FAILED -> {
Throwable exception = future.exceptionNow();
switch (exception) {
case Quotation.QuotationException quotationException ->
this.quotationException = quotationException;
default -> this.exception = exception;
}
}
case CANCELLED -> { }
}
}
public TravelPage travelPage() {
if (this.quotation == null) {
if (this.quotationException != null) {
throw new RuntimeException(this.quotationException);
} else {
throw new RuntimeException(this.exception);
}
} else {
return new TravelPage(
this.quotation,
Objects.requireNonNullElse(
this.weather,
new Weather("Unknown", "Mostly sunny")
)
);
}
}
}
}
So let's run this code, and we can see that we can have our travel page, with the information like below:
There are two other things that we need to pay attention to, let's go on.
First, we had better add a timeout on this weather forecast, because we wouldn't want our visitors to wait for 10 years just because we cannot get the weather forecast quickly enough. It turns out that there is a nifty method called joinUntil() that does exactly that. So instead of calling join(), let's call joinUntil().
scope.joinUntil(Instant.now().plusMillis(1_00));
Let's run the code again:
Now we can see that the weather forecast we have is from an unknown server, and the weather is mostly sunny. By the way, if you want to handle this exception separately, you can also add a branch in the switch statement.
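The same "wait a bounded time, then fall back to a default" idea can be sketched with stable APIs, using Future.get() with a timeout. This is an analogy, not how joinUntil() is implemented; the forecastOrDefault() method and its simulated delay are assumptions made for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Analogy with a stable API; names and delays are illustrative.
final class TimeoutFallbackDemo {

    // Waits at most timeoutMillis for the forecast, otherwise returns a default value.
    static String forecastOrDefault(long serverDelayMillis, long timeoutMillis) {
        CompletableFuture<String> forecast = CompletableFuture.supplyAsync(() -> {
            sleep(serverDelayMillis); // simulated slow server
            return "Sunny";
        });
        try {
            return forecast.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            forecast.cancel(true);   // we no longer need the late answer
            return "Mostly sunny";   // same fallback idea as in travelPage()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(forecastOrDefault(10, 2000));   // fast server
        System.out.println(forecastOrDefault(1000, 100));  // slow server, default used
    }
}
```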
ScopedValue (formerly called ExtentLocal)
The second thing is how we can handle ThreadLocal in this case. We remember ThreadLocal, this old stuff from JDK 1.2. Loom's virtual threads fully support ThreadLocal, so if we want to stick with them, we can do that.
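As a reminder of how ThreadLocal behaves, here is a minimal sketch. It uses a plain platform thread so that it runs on older JDKs too; the USER variable and the method names are made up for this example.

```java
// Reminder sketch of ThreadLocal semantics; names are illustrative.
final class ThreadLocalDemo {

    private static final ThreadLocal<String> USER = ThreadLocal.withInitial(() -> "anonymous");

    // Sets the value in the calling thread, then reads it from a second thread.
    static String valueSeenByOtherThread() {
        USER.set("alice"); // visible to the calling thread only
        try {
            String[] seen = new String[1];
            Thread other = new Thread(() -> seen[0] = USER.get());
            other.start();
            other.join(); // join() makes the write to seen[0] visible here
            return seen[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            USER.remove(); // ThreadLocal values must be cleaned up by hand
        }
    }

    // The calling thread sees the value it set itself.
    static String valueSeenByCallingThread() {
        USER.set("alice");
        try {
            return USER.get();
        } finally {
            USER.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("calling thread sees: " + valueSeenByCallingThread());
        System.out.println("other thread sees: " + valueSeenByOtherThread());
    }
}
```

The value is mutable, lives as long as the thread unless it is removed, and is not handed over to other threads by default.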
But we can also do much better. Loom adds a new concept called ScopedValue. ScopedValue allows you to give a value to a variable and run a task within the context of this value. Let's take a look at how this works.
First, we need to create a ScopedValue variable with a given type, and for that we have a factory method: ScopedValue.newInstance(). Then we can create a Runnable; let's call it task. It will do the following: if KEY is bound, it prints the value of this key, which is basically KEY.get(). And if that's not the case, it just prints "Not bound".
public static void main(String[] args) {
    ScopedValue<String> KEY = ScopedValue.newInstance();
    Runnable task = () -> System.out.println(KEY.isBound() ? KEY.get() : "Not bound");
    task.run();
}
And of course, if we run this code, it will tell us that the key is not bound.
And now, what we are going to do is call ScopedValue.where() to give KEY the value "A", and within this context we're going to run this Runnable. By the way, we could also execute a Callable. So let's just run this task in that context, and do the same with the value "B".
public static void main(String[] args) {
    ScopedValue<String> KEY = ScopedValue.newInstance();
    Runnable task = () -> System.out.println(KEY.isBound() ? KEY.get() : "Not bound");
    ScopedValue.where(KEY, "A").run(task); // KEY is bound to "A" while task runs
    ScopedValue.where(KEY, "B").run(task); // KEY is bound to "B" while task runs
    task.run();                            // KEY is not bound here
}
So let's run this code.
And now we can see that in the first run, KEY was bound to the value "A", and this is what our task saw. In the second run, KEY was bound to the value "B", and this is also what our task saw. So this is a very powerful mechanism to share variables among different tasks, and among different threads.
In this case, we're not running in a multi-threaded environment. We didn't create any new thread, so really everything took place in the main thread. But of course, we can make it work in scopes and threads, so let's do that.
We just add some new code to the Quotation record. First, we create LICENCE, which is a ScopedValue variable of type String. Then we add a validation rule to the compact constructor of this Quotation record. Basically, if LICENCE has not been bound, or if its value is not "Licence A", then no Quotation record can be created, because an IllegalStateException is thrown.
public static ScopedValue<String> LICENCE = ScopedValue.newInstance();
public Quotation {
    if (!(LICENCE.isBound() && LICENCE.get().equals("Licence A"))) {
        throw new IllegalStateException("No licence found");
    }
}
Let's go back to the TravelPage example and run it. Now we see the suppressed exceptions mechanism that we set up with the QuotationException. The QuotationException has been thrown, that's one exception. And it carries a bunch of IllegalStateException: No licence found, because the record could not be created.
So let's bind this ScopedValue to a value. We can do it this way: with ScopedValue.where(), Quotation.LICENCE gets the value "Licence A". And then we call this Callable: TravelPage.readTravelPage().
public static void main(String[] args) throws Exception {
    Instant begin = Instant.now();
    TravelPage travelPage = ScopedValue.where(Quotation.LICENCE, "Licence A")
            .call(TravelPage::readTravelPage);
    Instant end = Instant.now();
    System.out.println("TravelPage = " + travelPage);
    System.out.println("Time is = " + Duration.between(begin, end).toMillis() + "ms");
}
Let's run this code again. And now everything is fine, the licence has been found.
By the way, if we put a wrong value, we will see this IllegalStateException again. So this licence was made available at the TravelPage level, and transmitted to all the scopes created within this TravelPage. The TravelPage is executed in its own scope, but it created another scope for the Quotation, and another scope for the Weather. And in fact, this ScopedValue is available in all the scopes created by this TravelPageScope.
We can see that using these scope objects makes our code much simpler than having to write callbacks within callbacks within callbacks within callbacks. Our code is synchronous, it is blocking, but it is fine because it is running on top of virtual threads, and it is much easier to read.
Creating scopes is really easy. All we need to do is override this handleComplete() method, which handles one future at a time in a synchronous way, so it's super easy to write. And then we can handle our exceptions as we need, including timeouts.
And, with the partial results and our exceptions, we can add our business code, following our business rules; this is basically what we did in the examples that we just saw. We can also easily write unit tests, whether for our scope objects or for our regular classes.
So in the end, our application is fully asynchronous, but it does not rely on nested callbacks, only on code written in a synchronous way. And that's a huge step forward!
1 Java. 2022. Java Asynchronous Programming Full Tutorial with Loom and Structured Concurrency - JEP Café #13. Retrieved Aug 24, 2023, from https://www.youtube.com/watch?v=2nOj8MKHvmw
2 Alan Bateman, Ron Pressler. 2021. JEP 428: Structured Concurrency (Incubator). Retrieved Aug 23, 2023, from https://openjdk.org/jeps/428
3 Ron Pressler, Alan Bateman. 2021. JEP 425: Virtual Threads (Preview). Retrieved Aug 23, 2023, from https://openjdk.org/jeps/425
4 Andrew Haley, Andrew Dinn. 2021. JEP 429: Scoped Values (Incubator). Retrieved Aug 24, 2023, from https://openjdk.org/jeps/429
5 Dioxide CN. 2023. ThreadLocal与ScopedValue. Retrieved Aug 24, 2023, from https://cloud.tencent.com/developer/article/2348213