Handling Virtual Threads

Virtual threads were released as a preview feature in September 2022 as a part of Java 19. They are lightweight versions of platform threads. Unlike legacy platform threads, the memory footprint of a virtual thread is minute. For an in-depth introduction to virtual threads, check out the following article.

Virtual threads support the creation of a thread per unit of work model, no matter how many tasks we have to process. Instead of reusing legacy platform threads via thread pools, we can have a virtual thread for each task. Following the programming flow for each task on a per-thread basis simplifies the coding and eases the maintenance. In particular, virtual threads shine with I/O requests to support concurrent high-throughput I/O programming.

CPU-bound calculations get optimal CPU utilization with platform threads and thread pools. We can perform CPU-intensive tasks and utilize the thread pool to support parallelization, allowing us to use the CPU multi-core architecture to maximize the speed and resource utilization.

So we have at our disposal virtual threads to support I/O bound requests and thread pools with their associated platform threads to work with CPU-bound tasks. In this article, we will provide different strategies that we can follow when we have requests for units of work that require both I/O tasks and resource-expensive CPU-intensive calculations. We will explore some of the options at our disposal and evaluate the tradeoffs for the different programming choices.

Problem Overview

Let's consider a simple example of a mission-critical application that monitors different machinery in power plants and quickly determines if they are functioning correctly. We assume that the machines have sensors connected to the network that provide reading on temperature, pressure, and so on. The gist of the problem is that we have to connect to these devices, get their data readings, and quickly do a computationally intense data analysis to report back on the system's state.

Suppose we have a batch process that is continuously running to check the state of the different machines. To simplify the problem, let's assume that the process reads data continuously from a file where it gets the locations and the time intervals it has to monitor. A sample of the data can look like this:

url id startTime endTime
https://pw1/section/sensors 756a9c 12/21/2022 11:00 12/21/2022 12:00
https://pw1/section2/sensors 647d5m 12/21/2022 11:00 12/21/2022 12:00
https://pw2/section/sensors 948k4l 12/21/2022 8:00 12/21/2022 12:00
https://pw2/section2/sensors 938r8c 12/21/2022 9:00 12/21/2022 12:00

This example showcases a dramatically simplified generic type of problem, but without much effort, it can be easily generalized. Our task is to show how we would write this batch process using virtual threads and showcase the new ways that we will programmatically tackle this problem and its advantages over the legacy methodology.

Programming With Virtual Threads

To begin our batch process, we must read the entries of the table and place them in a list of entries (List<InputEntry>), where InputEntry is defined as a java record:

Java
 
record InputEntry(String url, String id, String startTime, String endTime) {}

Next, we have to go over the list and analyze the sensor reading for each machine:

Java
 
public static void processData(List<InputEntry> inputEntries) {
	System.out.println("processSensorReadings()");
	ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
	CompletionService<String> cService = new ExecutorCompletionService<>(executor);
	for (InputEntry inputEntry : inputEntries) {
		cService.submit(() -> processSensorData(inputEntry));
	}

	int processed = 0;
	while (processed < inputEntries.size()) {
		processed++;
		try {
			Future<String> resultFuture = cService.take();
			System.out.println("Handle status:" + resultFuture.get());
		} catch (ExecutionException | InterruptedException e) {
			System.out.println("Failed to process:" + e.getMessage());
		}
	}
}

In line 3, we use a new executor that starts a virtual thread for each task. 

In line 4, we wrap this executor in a CompletionService, which allows us to take the tasks run by the virtual threads in the order that they complete as can be seen in lines 10-18. 

In line 6, we submit the processSensorData() to the CompletionService which, as we will see, is going to do I/O request plus possibly data analysis.

Since virtual threads are a preview feature, we need to run Java 19 with the --enable-preview VM argument.

At this stage, we notice that for each InputEntry, we are creating a virtual thread. This process contrasts with how we would have programmed with legacy threads, using thread pools to reuse this expensive resource. The advantage of this approach is evident, with a virtual thread per unit of work. It facilitates the code logic, plus it's much easier to maintain or troubleshoot problems — for example, through thread dumps or stack traces.

Fetching and Processing the Data

We need to get the sensor data to analyze it, so we do the following:

Java
 
public static String processSensorData(InputEntry inputEntry) throws IOException, InterruptedException {

	DoubleStream data = fetchSensorData(inputEntry);

	return "ID: " + inputEntry.id() + ": " + analyzeSensorData(data);
}

private static DoubleStream fetchSensorData(InputEntry inputEntry) throws MalformedURLException, InterruptedException {
	URL pwUrl = new URL(inputEntry.url() + "/startTime/endTime");
	// In a real application open a secure url stream and fetch the data
	// For this example we return some random data and simulate network latencies
	Thread.sleep((long) (Math.random() * 100));
	DoubleStream data = DoubleStream.generate(() -> new Random().nextDouble()).limit(100);
	return data;
}

In this simple example, we only fetch data from one sensor, but in a more realistic application, we might need to process several sensors with multiple I/O operations on each virtual thread.

At this point, the process is clear — our virtual threads are fetching the data and running in parallel. The question we have next is how we are going to analyze the data. Let's look at the different strategies we can follow.

Analyzing the Data

We assume that the data analysis we have to perform is CPU-intensive. Ideally, we would like to perform this calculation in the same thread to maintain the thread per unit of work paradigm. But, if the analysis is time sensitive, there might be better approaches. We know that for a CPU-bound workload, it is best to use thread pools to get optimal performance.

In this case, it might be best that the virtual thread delegates the work to a thread pool:

Java
 
public static int analyzeSensorData(DoubleStream data) {

	double resultMean = 0;
	// Uses ForkJoinPool to improve performance
	// In real application do proper data analysiys
	resultMean = data.parallel().average().getAsDouble();
	return determineStatusCode(resultMean);
}

private static int determineStatusCode(double result) {
	// In a real application determine code based on result parameters
	if (result > 0.49) {
		return 719; // Made up error code
	} else {
		return 0;
	}
}

For this example, we use Java Streams, which, under the covers, utilizes the ForkJoinPool to do the calculation. 

In line 6, we signal the Stream framework to parallelize the calculation. We find the average, but the analysis will be much more involved in an actual application. 

We use the common ForkJoinPool provided by Java, but we will define our own in a more realistic scenario. We will also have to determine the pool size to maximize CPU utilization. For the common pool, you can set the system property:

Java
 
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");

where we assume that 10 is the number of concurrent threads that optimizes our system.

We can see that, following this path, we optimized the data analysis processing by invoking the thread pool from the virtual thread. As the virtual threads complete their I/O tasks, they will utilize the Java Stream platform to perform (or queue) the calculations in the ForkJoinPool.

This is an example; we could have used other thread pools to do the calculations. In the provided sample code, we show further examples.

Alternatively, if we need to throttle the utilization of a scarce resource by the virtual threads, we can use semaphores, as we show in the following code:

Java
 
public static Semaphore semaphoreService = new Semaphore(NUM_PERMITS); 

public static String performScarceResourceRequest() throws InterruptedException, IOException {
	
	semaphoreService.acquire();
	//perform expensive request
	// For this example we return some random data and simulate network latencies
	Thread.sleep((long) Math.random());
	semaphoreService.release();
	return "Requested data";
}

We fine-tune the number of NUM_PERMITS that we set in the semaphore in line 1 to optimize the access to the resource. The virtual thread has to acquire a permit from the semaphore to be able to run.

Finally, we cover the cases where the virtual thread performs a task that requires other sub-tasks to be completed. The Java 19 release also has a framework that supports structured concurrency. This project is at an incubator stage. It provides powerful support to run related tasks with virtual threads. We will cover a simple case where we have to perform some validations before we analyze the data from the sensors:

Java
 
public static int validateAndAnalyzeSensorData(DoubleStream data) throws IOException, InterruptedException, ExecutionException {
	try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
		Future<String> validatedData = scope.fork(() -> validateData(data));
		Future<String> checkedEnvironment = scope.fork(() -> checkEnvironment(data));
		scope.join();
		scope.throwIfFailed(e -> new IOException(e));
		if (validatedData.get().equals("ok") && checkedEnvironment.get().equals("ok"))
			return analyzeSensorData(data);
		else 
			return -1;
	}
}

public static String validateData(DoubleStream data) throws IOException, InterruptedException {
	// In a real application open a secure url stream and fetch the data
	Thread.sleep((long) Math.random());
	return "ok";
}

public static String checkEnvironment(DoubleStream data) throws IOException, InterruptedException {
	// In a real application open a secure url stream and fetch the data
	Thread.sleep((long) Math.random());
	return "ok";
}

In line 2, we use StructuredTaskScope to coordinate running the tasks concurrently. StructuredTaskScope supports try-with-resources statement to ensure that the resource closes at the end of the code's try block.

We run the validation in lines 3 and 4 and check the environment invoking scope.fork() that takes as a parameter a Callable, spawns a new thread, and returns a Future. The parent task scope's ThreadFactory creates new threads, so we'll have virtual threads.

In line 4, we join the two tasks, and line 6 invokes scope.throwIfFailed() to propagate any errors. Finally, in line 7, we know that the two tasks have successfully been completed, so we can proceed.

Since structure concurrency is in incubator mode, we need to run the VM with the argument:--add-modules=jdk.incubator.concurrent.

The alternative approach would be to do the two sub-tasks serially in the parent's virtual thread. This might be desirable if tasks are intimately related and we want to keep a thread per unit of work paradigm. However, this will not be as performant as the previous approach.

Conclusions

Virtual threads promise to revolutionize how we accomplish concurrent I/O programming. This tutorial has covered different approaches to tackling thread per unit of work when we have tasks that need to support I/O and CPU-bound workloads.

You can find the source code used in this article on GitHub.

 

 

 

 

Top