Entries tagged [concurrency]

Wednesday June 15, 2022

GPars meets Virtual Threads

gpars-rgb.pngAn exciting preview feature coming in JDK19 is Virtual Threads (JEP 425). In my experiments so far, virtual threads work well with my favourite Groovy parallel and concurrency library GPars. GPars has been around a while (since Java 5 and Groovy 1.8 days) but still has many useful features. Let's have a look at a few examples.

If you want to try these out, make sure you have a recent JDK19 (currently EA) and enable preview features with your Groovy tooling.

Parallel Collections

First a refresher, to use the GPars parallel collections feature with normal threads, use the GParsPool.withPool method as follows:

withPool {
assert [1, 2, 3].collectParallel{ it ** 2 } == [1, 4, 9] }

For any Java readers, don't get confused with the collectParallel method name. Groovy's collect method (naming inspired by Smalltalk) is the equivalent of Java's map method. So, the equivalent Groovy code using the Java streams API would be something like:

assert [1, 2, 3].parallelStream().map(n -> n ** 2).collect(Collectors.toList()) == [1, 4, 9]

Now, let's bring virtual threads into the picture. Luckily, GPars parallel collection facilities provide a hook for using an existing custom executor service. This makes using virtual threads for such code easy:

withExistingPool(Executors.newVirtualThreadPerTaskExecutor()) {
assert [1, 2, 3].collectParallel{ it ** 2 } == [1, 4, 9]
}

Nice! But let's move onto some areas examples which might be less familiar to Java developers.

GPars has additional features for providing custom thread pools and the remaining examples rely on those features. The current version of GPars doesn't have a DefaultPool constructor that takes a vanilla executor service, so, we'll write our own class:

@AutoImplement
class VirtualPool implements Pool {
private final ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor()
int getPoolSize() { pool.poolSize }
void execute(Runnable task) { pool.execute(task) }
ExecutorService getExecutorService() { pool }
}

It is essentially a delegate from the GPars Pool interface to the virtual threads executor service.

We'll use this in the remaining examples.

Agents

Agents provide a thread-safe non-blocking wrapper around an otherwise potentially mutable shared state object. They are inspired by agents in Clojure.

In our case we'll use an agent to "protect" a plain ArrayList. For this simple case, we could have used some synchronized list, but in general, agents eliminate the need to find thread-safe implementation classes or indeed care at all about the thread safety of the underlying wrapped object.

def mutableState = []     // a non-synchronized mutable list
def agent = new Agent(mutableState)

agent.attachToThreadPool(new VirtualPool()) // omit line for normal threads

agent { it << 'Dave' } // one thread updates list
agent { it << 'Joe' } // another thread also updating
assert agent.val.size() == 2

Actors

Actors allow for a message passing-based concurrency model. The actor model ensures that at most one thread processes the actor's body at any time. The GPars API and DSLs for actors are quite rich supporting many features. We'll look at a simple example here.

GPars manages actor thread pools in groups. Let's create one backed by virtual threads:

def vgroup = new DefaultPGroup(new VirtualPool())

Now we can write an encrypting and decrypting actor pair as follows:

def decryptor = vgroup.actor {
loop {
react { String message ->
reply message.reverse()
}
}
}

def console = vgroup.actor {
decryptor << 'lellarap si yvoorG'
react {
println 'Decrypted message: ' + it
}
}

console.join() // output: Decrypted message: Groovy is parallel

Dataflow

Dataflow offers an inherently safe and robust declarative concurrency model. Dataflows are also managed via thread groups, so we'll use vgroup which we created earlier.

We have three logical tasks which can run in parallel and perform their work. The tasks need to exchange data and they do so using dataflow variables. Think of dataflow variables as one-shot channels safely and reliably transferring data from producers to their consumers.

def df = new Dataflows()

vgroup.task {
df.z = df.x + df.y
}

vgroup.task {
df.x = 10
}

vgroup.task {
df.y = 5
}

assert df.z == 15

The dataflow framework works out how to schedule the individual tasks and ensures that a task's input variables are ready when needed.

Conclusion

We have had a quick glimpse at using virtual threads with Groovy and GPars. It is very early days, so expect much more to emerge in this space once virtual threads are released in preview in production versions of JDK19 and eventually beyond a preview feature.

Calendar

Search

Hot Blogs (today's hits)

Tag Cloud

Categories

Feeds

Links

Navigation