Creating notification workers in Java
An exercise for practicing concurrency in which we use a LinkedBlockingQueue with a single worker (thread) per job type (key) to trigger work in a non-blocking way, and also try out the Semaphore. Imagine publish/subscribe, but without queuing, doing only 1 job at a time, and without any external dependencies like Redis or RabbitMQ; only Java.
To paint a better picture when we would need something like this, here are a couple of examples
Abstract view of the implementation would look something like this
We have N concurrent threads trying to send notifications, and each worker type T accepts only 1 notification at a time. A worker does not accept any new notifications until it is done with the current one.
Requirements
- The group of N threads on the left must be able to send/trigger notifications for a job, based on the job type, without being blocked (waiting for the worker to take the job)
- A worker should only execute 1 job at a time
- No queuing of jobs
- No data races between threads and no blocking
- Handle generic data
- Should be testable
New to threads?
If you are new to threads, I suggest checking this fantastic channel and its playlist on concurrency in Java; otherwise, skip to the next section.
Solution
LinkedBlockingQueue is a good and clean solution, as it helps us pass data to one (or multiple) workers without many complications, following the principle popularized by Go:
Do not communicate by sharing memory; instead, share memory by communicating.
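To make the principle concrete before building the real thing, here is a minimal sketch of a hand-off through a queue. The class `HandOffSketch` and its `handOff` method are hypothetical names used only for illustration; they are not part of the workshop code that follows.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class HandOffSketch {
    // Hypothetical helper: hands one value to a consumer thread through a queue
    // and returns what the consumer produced from it.
    public static String handOff(String value) {
        BlockingQueue<String> jobs = new LinkedBlockingQueue<>(1);
        BlockingQueue<String> results = new LinkedBlockingQueue<>(1);

        Thread consumer = new Thread(() -> {
            try {
                // take() blocks until the producer hands something over
                String job = jobs.take();
                results.put("processed " + job);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            // The data travels through the queues; no shared fields, no locks
            jobs.put(value);
            String result = results.take();
            consumer.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while handing off", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(handOff("order-1")); // prints "processed order-1"
    }
}
```

Neither thread touches a variable owned by the other; the queue is the only meeting point, which is the property the workshop relies on.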
We want a main class to house the logic and provide the method (API) that all outside threads will call concurrently; let’s call it Workshop. It will first register jobs and workers, then create the queues that link them to each other, so it will look like the following:
To give a better idea of how it will look at the end, here is the main method:
package org.example;
import org.example.workshop.EmittersSwarm;
import org.example.workshop.SlowWorker;
import org.example.workshop.ThreadQueueWorkshop;
import org.example.workshop.Workshop;
public class App {
public static Workshop<String> threadQueueWorkshop() {
return ThreadQueueWorkshop.<String>newBuilder()
.registerWorker("delivery", new SlowWorker<>("delivery-worker"))
.registerWorker("processing", new SlowWorker<>("processing-worker"))
.build();
}
public static void main(String[] args) throws Exception {
Workshop<String> workshop = threadQueueWorkshop();
EmittersSwarm swarm = EmittersSwarm.getBuilder()
.setWorkshop(workshop)
.build();
try (swarm; workshop) {
swarm.startAndAwait();
}
System.out.println("Done.");
}
}
We have a builder for the Workshop (the ThreadQueueWorkshop implementation) and a builder for the EmittersSwarm, which will start the “attack” on the workshop’s trySendJob method and wait for a set number of threads, and requests per thread, to complete. By default, it spawns 5 emitters (threads) with 10 emissions per emitter and a 1-second pause between emissions.
Ok, so let’s start with it.
Worker setup
I suggest you create a workshop package to house everything and, for starters, add a helper Color class to add a little bit of style, so that we can easily see what’s going on in the output later.
package org.example.workshop;
public class Color {
public static final String RESET = "\033[0m"; // Text Reset
public static final String BLACK = "\033[0;30m"; // BLACK
public static final String RED = "\033[0;31m"; // RED
public static final String GREEN = "\033[0;32m"; // GREEN
public static final String YELLOW = "\033[0;33m"; // YELLOW
public static final String BLUE = "\033[0;34m"; // BLUE
public static final String PURPLE = "\033[0;35m"; // PURPLE
public static final String CYAN = "\033[0;36m"; // CYAN
public static final String WHITE = "\033[0;37m"; // WHITE
}
Quick definition of a generic worker interface
package org.example.workshop;
public interface Worker<T> {
void work(T value) throws Throwable;
}
And a slow worker that sleeps for 3 seconds to simulate a slow job, which makes it easier to notice what’s going on when we start sending notifications/jobs:
package org.example.workshop;
public class SlowWorker<T> implements Worker<T> {
private final String name;
public SlowWorker(String name) {
this.name = name;
}
@Override
public void work(T value) throws InterruptedException {
System.out.println(Color.CYAN + "[Worker](" + name + ") starting to work on " + value + Color.RESET);
Thread.sleep(3_000);
}
}
Workshop
We will need a main class that will encapsulate the concurrency safety model for triggering these workers, but before that, let’s first create an interface as it will make switching workshops easier later on.
package org.example.workshop;
import java.util.Set;
public interface Workshop<T> extends AutoCloseable {
/**
* Starts consumer threads if any or any other requirement needed for operating.
*/
void run();
/**
* Names of the registered workers
*/
Set<String> getWorkerNames();
/**
* This method is thread safe
*
* @return true if sending of the job was successful
*/
boolean trySendJob(String workerName, T work);
}
This also helps to imagine the API we need.
You probably noticed that we extend AutoCloseable; that’s because we need to cleanly close the executor thread pool with try-with-resources later on.
Ok, to get back to the implementation, we will create our implementation of the Workshop, called ThreadQueueWorkshop:
public class ThreadQueueWorkshop<T> implements Workshop<T> {
private final HashMap<String, Worker<T>> workerMap;
private final HashMap<String, LinkedBlockingQueue<T>> jobQueueMap;
private final ExecutorService executor;
private ThreadQueueWorkshop(
HashMap<String, Worker<T>> workerMap,
ExecutorService executor,
HashMap<String, LinkedBlockingQueue<T>> jobQueueMap
) {
this.workerMap = workerMap;
this.jobQueueMap = jobQueueMap;
this.executor = executor;
}
@Override
public Set<String> getWorkerNames() {
return workerMap.keySet();
}
@Override
public void close() {
System.out.println("[Workshop] closing");
executor.close();
System.out.println("[Workshop] closed");
}
}
We purposely want to avoid putting a lot of initialization logic in the constructor, so we make it private and create a Builder class to fluently build and configure our ThreadQueueWorkshop.
Add the Builder class within the ThreadQueueWorkshop class.
public class ThreadQueueWorkshop<T> implements Workshop<T> {
public static class Builder<T> {
private final HashMap<String, Worker<T>> workerMap = new HashMap<>();
public Builder<T> registerWorker(String name, Worker<T> worker) {
workerMap.put(name, worker);
return this;
}
public ThreadQueueWorkshop<T> build() {
if (workerMap.isEmpty()) {
throw new IllegalArgumentException("workerMap is empty");
}
ExecutorService executor = Executors.newFixedThreadPool(workerMap.size());
HashMap<String, LinkedBlockingQueue<T>> jobQueueMap = new HashMap<>(workerMap.size());
workerMap.forEach((key, worker) -> {
// The queue will be able to hold only 1 message, adding others would be ignored until
// the queue is empty
LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(1);
jobQueueMap.put(key, queue);
});
return new ThreadQueueWorkshop<>(workerMap, executor, jobQueueMap);
}
}
public static <T> Builder<T> newBuilder() {
return new Builder<>();
}
// ... rest of the code
}
In the builder we have a method to register workers by name; build() then validates the input, sizes the executor’s thread pool to the number of workers, and creates one queue per worker.
This will allow us to build it in the following way:
public static void main(String[] args) throws Exception {
// Here we register 2 workers of same class by different names
Workshop<String> workshop = ThreadQueueWorkshop.<String>newBuilder()
.registerWorker("delivery", new SlowWorker<>("delivery-worker"))
.registerWorker("processing", new SlowWorker<>("processing-worker"))
.build();
try(workshop) {
// ... use workshop to send notifications/jobs
}
// after the scope the workshop is closed
}
Now that the setup of the ThreadQueueWorkshop is done, let’s continue with the sending of jobs.
Sending the job is relatively simple: after we fetch the queue based on the job key, we send it. The important thing to note is that we are using the offer method of the LinkedBlockingQueue. According to the documentation, offer inserts the element only if it can do so immediately without exceeding the queue’s capacity, returning true on success and false if the queue is full.
That is the complete opposite of put, which waits, if necessary, for space to become available.
…and with this we won’t be blocking the invoker of the method.
class ThreadQueueWorkshop<T> implements Workshop<T> {
// ... rest of the code
public boolean trySendJob(String workerName, T work) {
LinkedBlockingQueue<T> workerQueue = jobQueueMap.get(workerName);
if (workerQueue == null) {
throw new IllegalArgumentException("worker " + workerName + " not found");
}
return workerQueue.offer(work);
}
}
You may have noticed that there is no synchronization or lock in this method; this is because the jobQueueMap never changes. It is set once, during creation of the workshop, and only read afterwards, so we do not expect any data race to occur.
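To see the behaviour of offer in isolation, here is a small sketch against a queue of capacity 1, the same capacity the builder uses. The class name `OfferSketch` and the string payloads are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class OfferSketch {
    // Returns the results of three offer() calls against a queue of capacity 1.
    public static List<Boolean> offers() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        List<Boolean> results = new ArrayList<>();
        results.add(queue.offer("first"));   // queue is empty: accepted
        results.add(queue.offer("second"));  // queue is full: rejected without waiting
        queue.poll();                        // a consumer removes "first"
        results.add(queue.offer("third"));   // there is room again: accepted
        return results;
    }

    public static void main(String[] args) {
        System.out.println(offers()); // prints [true, false, true]
    }
}
```

With put in place of the second offer, the calling thread would wait right there until a consumer made room, which is exactly what the emitters must not do.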
Ok, let’s move to starting the workers that will consume these notifications/jobs.
class ThreadQueueWorkshop<T> implements Workshop<T> {
// ... other code
@Override
public void run() {
System.out.println("[Workshop] starting...");
workerMap.forEach((key, worker) -> {
LinkedBlockingQueue<T> queue = jobQueueMap.get(key);
if (queue == null) {
throw new RuntimeException("No queue for key: " + key);
}
executor.submit(() -> {
while (true) {
try {
T message = queue.take();
worker.work(message);
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
// Nothing can follow the loop yet: without an exit condition, a statement here would be unreachable
});
});
}
}
When the workshop is started, one task is created per worker. Each task waits, within an infinite loop, for data to appear in its queue before executing the Worker.work method on the value, and will shut down when we call the executor’s close method in Workshop.close… or will it? ⁉️
Did you catch that bug? 🦋
Now let’s see if you managed to see problems in the previous code and design.
1. Never ending consumers 🐛
Maybe it’s better to start from the end. You saw that we started infinite tasks that consume data forever, and thought they stop after calling close on the ExecutorService? Well, the service just prevents further tasks from being sent to it via executor.submit and waits for the ongoing tasks to finish, which in our case never happens!
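The hang can be reproduced in a few lines. This sketch is an illustration only: it uses shutdown() with a timed awaitTermination instead of close(), because close() would simply wait forever, and the class name `StuckConsumerSketch` and the timeouts are arbitrary choices.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class StuckConsumerSketch {
    // Returns {terminated after shutdown(), terminated after shutdownNow()}.
    public static boolean[] demo() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Runnable consumer = () -> {
            try {
                while (true) {
                    queue.take(); // blocks forever: nobody ever sends anything
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // only an interrupt gets us out
            }
        };
        executor.submit(consumer);

        executor.shutdown(); // no new tasks are accepted; running tasks are left alone
        boolean afterShutdown = await(executor, 300);

        executor.shutdownNow(); // interrupts the worker thread, so take() throws
        boolean afterShutdownNow = await(executor, 2_000);
        return new boolean[] {afterShutdown, afterShutdownNow};
    }

    private static boolean await(ExecutorService executor, long millis) {
        try {
            return executor.awaitTermination(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [false, true]
    }
}
```

Interrupting the thread is one way out, but it also cuts off a job that is in the middle of its work, so a signal sent through the queue is the gentler option.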
We need a way to let the consumer tasks break out of the while loop. Because they are stuck waiting for the queue to have data, it would be smart to use the queue itself to send a POISON_PILL message, which is basically a signal for the task to end.
Some would have the poison pill message be -1 if we were sending ints, or "DEAD_PILL" if we were sending strings, but we are sending generic data here, so how do we proceed with that in mind?
It’s best to avoid magic values (like -1 or "DEAD_PILL"), as they might cause bugs down the line: if such a value is ever received from somewhere else, it will kill the consumer task.
A better approach is to wrap the message in a private record that can carry metadata, like shouldShutdown, that is only set by and visible to our Workshop class, so let’s do that.
Create a private record that keeps the value and the shutdown flag as two separate fields, together with the poison pill message:
class ThreadQueueWorkshop<T> implements Workshop<T> {
private record Message<T>(T value, boolean shouldShutdown) {
}
private final Message<T> POISON_PILL = new Message<>(null, true);
// other code
}
Update the consumer task in while loop so that when it encounters the poison pill message and sees the signal, it will break the loop.
Message<T> message = queue.take();
if (message.shouldShutdown) {
break;
}
worker.work(message.value);
Update the trySendJob method to wrap the value in the Message record:
return workerQueue.offer(new Message<>(work, false));
After you update the type of the message from T to Message<T> everywhere (including the queue declarations), we can finally update the close method of the workshop to send the POISON_PILL message to the consumers:
@Override
public void close() {
System.out.println("[Workshop] closing");
try {
jobQueueMap.forEach((key, queue) -> {
try {
// With put, we wait if necessary to ensure we send this message
queue.put(POISON_PILL);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
} finally {
// We shut the executor down after sending the poison message so it gets to process it
executor.close();
}
System.out.println("[Workshop] closed");
}
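Pulled out of the workshop, the envelope idea can be tried on its own. The following is a sketch under assumptions: `PoisonPillSketch` and `consumeAll` are hypothetical names, and a plain Thread stands in for the executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class PoisonPillSketch {
    // Envelope that keeps the shutdown signal separate from the payload.
    private record Message<T>(T value, boolean shouldShutdown) {}

    // Sends the values followed by a poison pill; returns what the consumer saw.
    public static <T> List<T> consumeAll(List<T> values) {
        LinkedBlockingQueue<Message<T>> queue = new LinkedBlockingQueue<>(1);
        List<T> seen = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Message<T> message = queue.take();
                    if (message.shouldShutdown()) {
                        break; // the pill ends the loop, whatever T is
                    }
                    seen.add(message.value());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (T value : values) {
                queue.put(new Message<>(value, false));
            }
            queue.put(new Message<>(null, true)); // put waits until there is room
            consumer.join(); // after join, reading `seen` from this thread is safe
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        // "DEAD_PILL" is ordinary data here and does not stop the consumer
        System.out.println(consumeAll(List.of("a", "DEAD_PILL", "b"))); // prints [a, DEAD_PILL, b]
    }
}
```

Because the signal lives in its own field, no payload value can be mistaken for it, regardless of the type being sent.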
Note that close uses the put method rather than offer; that is because we want to ensure the pill gets into the queue.
2. Another safety net 🪲
It’s always good to add more safety measures, as you never know what might lead your application into a certain state.
For example, I don’t know about you, but while(true) always looked scary to me. I would also like to prevent some methods from being used once the workshop is shut down: the code that maintains the Workshop is expected to know that it should not send jobs after closing it, otherwise it has a bug on its end.
For those reasons, let’s add another field to the Workshop: isShutdown.
First, let’s set it when the shutdown happens:
@Override
public void close() {
System.out.println("[Workshop] closing");
isShutdown = true; // We set the state of the workshop to closed
try {
jobQueueMap.forEach((key, queue) -> {
try {
// Even though we set the isShutdown flag above, the consumer might not reach it if it's still
// waiting for the queue to receive an item and would be stuck. So for that scenario we need to
// send it a POISON PILL message
queue.offer(POISON_PILL);
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
} finally {
// We shut the executor down after sending the poison message so it gets to process it
executor.close();
}
System.out.println("[Workshop] closed");
}
Then we replace the while(true) with it:
while (!isShutdown){
// ... other code
}
And the final place is where we try to send the job/notification
public boolean trySendJob(String workerName, T work) {
// In the shutdown state we want to prevent sending messages so we can send our dead pill without trouble
if (isShutdown) {
throw new IllegalStateException("Workshop is shutdown, cannot send job");
}
// ... other code
}
Ok, we just have to declare the variable. But remember, because this variable is read by many threads, we need to ensure that writes to it are visible to all of them, so we need to declare it volatile!
private volatile boolean isShutdown = false;
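A stripped-down sketch of such a flag in action, outside the workshop: one thread spins on the flag and another flips it. The class `VolatileFlagSketch` and its method are hypothetical, and the 2-second join timeout is an arbitrary safety margin.

```java
public class VolatileFlagSketch {
    // volatile guarantees the looping thread sees the write made by another thread
    private volatile boolean isShutdown = false;
    private long iterations = 0; // only touched by the looping thread

    // Starts a looping thread, flips the flag, and reports whether the thread stopped.
    public static boolean stopsAfterFlag() {
        VolatileFlagSketch sketch = new VolatileFlagSketch();
        Thread looper = new Thread(() -> {
            while (!sketch.isShutdown) {
                sketch.iterations++;
            }
        });
        looper.start();
        sketch.isShutdown = true; // a single write, no compound action involved
        try {
            looper.join(2_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !looper.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(stopsAfterFlag()); // prints true
    }
}
```

Without volatile, the Java memory model gives no guarantee that the looping thread ever observes the change, so the loop could in principle keep running.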
The volatile keyword provides a lighter alternative to synchronization when full thread safety (like atomicity in compound actions) isn’t required. For instance, it’s suitable for “flag” variables that multiple threads read and write but don’t need to modify in compound actions. This is because volatile doesn’t prevent race conditions for compound actions like count++ (incrementing a shared variable); it only ensures visibility.
3. In-flight messages 🐛
Our queue holds only 1 message, but as soon as the worker takes it, another one can be put in on the next send. Now imagine that our worker is slow like a snail: it takes the first message, and right after that a second message lands in the queue. Because the worker is slow, there might be so many more messages sent after the second one that, by the time the worker gets to it, the second message has become stale and maybe irrelevant (old data).
To prevent old data from being next in line, we must purge the queue after finishing the job, so that we grab the next fresh data from it. However, because there might be cases where you care about the trigger and don’t want to lose in-flight messages, we should make this configurable with a shouldIgnoreInFlightMessage variable:
public class ThreadQueueWorkshop<T> implements Workshop<T> {
public static class Builder<T> {
private boolean shouldIgnoreInFlightMessage = false;
public Builder<T> skipInFlightMessage() {
shouldIgnoreInFlightMessage = true;
return this;
}
// ...other code
}
}
And extend the constructor of ThreadQueueWorkshop to accept the new field:
new ThreadQueueWorkshop<>(workerMap, executor, jobQueueMap, shouldIgnoreInFlightMessage);
Then let’s add the check for queue purging
executor.submit(() -> {
while (!isShutdown) {
try {
Message<T> message = queue.take();
if (message.shouldShutdown) {
break;
}
worker.work(message.value);
} catch (Throwable e) {
throw new RuntimeException(e);
} finally {
// When we don't want the stored in-flight messages in queue and want to use newer ones
// after finishing of processing
if (shouldIgnoreInFlightMessage) {
queue.clear();
}
}
}
System.out.println("Consumer closed");
});
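The effect of the purge is easier to follow without threads in the way. The sketch below replays one slow job step by step on a single thread; `InFlightSketch` and the job names are invented for the illustration, and the comments describe what the worker and emitters would be doing at each step.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class InFlightSketch {
    // Replays one slow job and returns the message the worker would pick up next.
    public static String nextAfterSlowJob(boolean shouldIgnoreInFlightMessage) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("job-1");
        queue.poll();                // the worker takes job-1 and starts working
        queue.offer("job-2");        // arrives while the worker is busy: now in flight
        queue.offer("job-3");        // rejected, the queue is full
        if (shouldIgnoreInFlightMessage) {
            queue.clear();           // the job is done: drop the stale job-2
        }
        queue.offer("job-4");        // the first emission after the job finished
        return queue.poll();         // what the worker processes next
    }

    public static void main(String[] args) {
        System.out.println(nextAfterSlowJob(false)); // prints job-2
        System.out.println(nextAfterSlowJob(true));  // prints job-4
    }
}
```

With the flag off, the worker continues with the message that waited through the whole slow job; with the flag on, it waits for the next fresh one.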
Now let’s move on to creating some threads that will send a lot of data quickly to our Workshop.
Emitters
For the emitters, I’ll paste the whole example and explain it as a whole, but before that I want to show what it looks like in use with the Workshop:
package org.example;
import org.example.workshop.EmittersSwarm;
import org.example.workshop.SlowWorker;
import org.example.workshop.ThreadQueueWorkshop;
import org.example.workshop.Workshop;
public class App {
public static Workshop<String> threadQueueWorkshop() {
return ThreadQueueWorkshop.<String>newBuilder()
.registerWorker("delivery", new SlowWorker<>("delivery-worker"))
.registerWorker("processing", new SlowWorker<>("processing-worker"))
.build();
}
public static void main(String[] args) throws Exception {
Workshop<String> workshop = threadQueueWorkshop();
EmittersSwarm swarm = EmittersSwarm.getBuilder()
.setWorkshop(workshop)
.build();
try (swarm; workshop) {
swarm.startAndAwait();
}
System.out.println("Done.");
}
}
We pass it the workshop, so the swarm will try to send jobs through trySendJob for all registered workers.
Here is the code for the EmittersSwarm. Note that we are using virtual threads, for a change from traditional threads; the class has enough configuration to support testing, and it goes through all worker keys dynamically and consistently.
package org.example.workshop;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class EmittersSwarm implements AutoCloseable {
public static class Builder {
private Workshop<String> workshop;
private int numOfEmitters = 5;
private int numOfEmissions = 10;
private int waitMls = 1_000;
public Builder() {
}
public Builder setWorkshop(Workshop<String> workshop) {
this.workshop = workshop;
return this;
}
public Builder numOfEmitters(int numOfEmitters) {
this.numOfEmitters = numOfEmitters;
return this;
}
public Builder numOfEmissions(int numOfEmissions) {
this.numOfEmissions = numOfEmissions;
return this;
}
public Builder waitOnEmission(int waitMls) {
this.waitMls = waitMls;
return this;
}
public EmittersSwarm build() {
if (workshop == null) {
throw new IllegalStateException("You must specify a workshop");
}
return new EmittersSwarm(workshop, numOfEmitters, numOfEmissions, waitMls);
}
}
public static Builder getBuilder() {
return new Builder();
}
private final Workshop<String> workshop;
private final ExecutorService executor;
private final int numOfEmitters;
private final int numOfEmissions;
private final int waitMls;
private EmittersSwarm(
Workshop<String> workshop,
int numOfEmitters,
int numOfEmissions,
int waitMls
) {
this.workshop = workshop;
this.numOfEmitters = numOfEmitters;
this.numOfEmissions = numOfEmissions;
this.waitMls = waitMls;
executor = Executors.newVirtualThreadPerTaskExecutor();
}
public void startAndAwait() {
System.out.println("[EmittersSwarm] starting...");
workshop.run();
String[] workerNames = workshop.getWorkerNames().toArray(new String[0]);
List<Callable<Void>> tasks = new ArrayList<>();
for (int i = 0; i < numOfEmitters; i++) {
final int emitterId = i;
System.out.println("[Emitter](" + emitterId + ") created");
tasks.add(() -> {
for (int j = 0; j < numOfEmissions; j++) {
try {
String workerName = workerNames[emitterId % workerNames.length];
boolean success = workshop.trySendJob(workerName, "[Work](" + emitterId + ") " + j);
String color = success ? Color.GREEN : Color.RED;
System.out.println(
"[Emitter](" + emitterId + ") --" + workerName + "--> " + j + " " + color + success + Color.RESET
);
Thread.sleep(waitMls);
} catch (InterruptedException e) {
System.out.println(
"[Emitter] (" + emitterId + ")" + e.getMessage() + " " + Arrays.toString(e.getStackTrace())
);
}
}
System.out.println("[Emitter](" + emitterId + ") closed.");
return null;
});
}
System.out.println("[EmittersSwarm] waiting to finish...");
try {
executor.invokeAll(tasks);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("[EmittersSwarm] done.");
}
@Override
public void close() {
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Now you can run the code and observe the following output
[EmittersSwarm] starting...
[Workshop] starting...
[Emitter](0) created
[Emitter](1) created
[Emitter](2) created
[Emitter](3) created
[Emitter](4) created
[EmittersSwarm] waiting to finish...
[Worker](processing-worker) starting to work on [Work](3) 0
[Worker](delivery-worker) starting to work on [Work](2) 0
[Emitter](4) --delivery--> 0 true
[Emitter](1) --processing--> 0 false
[Emitter](2) --delivery--> 0 true
[Emitter](3) --processing--> 0 true
[Emitter](0) --delivery--> 0 false
[Emitter](4) --delivery--> 1 false
[Emitter](2) --delivery--> 1 false
[Emitter](3) --processing--> 1 true
[Emitter](1) --processing--> 1 false
[Emitter](0) --delivery--> 1 false
[Emitter](3) --processing--> 2 false
[Emitter](1) --processing--> 2 false
[Emitter](2) --delivery--> 2 false
[Emitter](4) --delivery--> 2 false
[Emitter](0) --delivery--> 2 false
[Worker](processing-worker) starting to work on [Work](3) 1
[Worker](delivery-worker) starting to work on [Work](4) 0
[Emitter](2) --delivery--> 3 true
[Emitter](3) --processing--> 3 false
[Emitter](1) --processing--> 3 true
[Emitter](0) --delivery--> 3 false
[Emitter](4) --delivery--> 3 false
[Emitter](2) --delivery--> 4 false
[Emitter](0) --delivery--> 4 false
[Emitter](3) --processing--> 4 false
[Emitter](4) --delivery--> 4 false
[Emitter](1) --processing--> 4 false
[Emitter](2) --delivery--> 5 false
[Emitter](1) --processing--> 5 false
[Emitter](4) --delivery--> 5 false
[Emitter](3) --processing--> 5 false
[Emitter](0) --delivery--> 5 false
[Worker](processing-worker) starting to work on [Work](1) 3
[Worker](delivery-worker) starting to work on [Work](2) 3
[Emitter](1) --processing--> 6 true
[Emitter](0) --delivery--> 6 false
[Emitter](2) --delivery--> 6 true
[Emitter](4) --delivery--> 6 false
[Emitter](3) --processing--> 6 false
[Emitter](0) --delivery--> 7 false
[Emitter](2) --delivery--> 7 false
[Emitter](1) --processing--> 7 false
[Emitter](4) --delivery--> 7 false
[Emitter](3) --processing--> 7 false
[Emitter](0) --delivery--> 8 false
[Emitter](2) --delivery--> 8 false
[Emitter](1) --processing--> 8 false
[Emitter](4) --delivery--> 8 false
[Emitter](3) --processing--> 8 false
[Worker](processing-worker) starting to work on [Work](1) 6
[Worker](delivery-worker) starting to work on [Work](2) 6
[Emitter](0) --delivery--> 9 true
[Emitter](2) --delivery--> 9 false
[Emitter](1) --processing--> 9 true
[Emitter](4) --delivery--> 9 false
[Emitter](3) --processing--> 9 false
[Emitter](0) closed.
[Emitter](2) closed.
[Emitter](1) closed.
[Emitter](4) closed.
[Emitter](3) closed.
[EmittersSwarm] done.
[Workshop] closing
Consumer closed
Consumer closed
[Workshop] closed
Done.
The logs show the following format:
- for sending
[Emitter]({emitterId}) --{jobName}--> {iteration} {wasSentToQueue}
- for consuming
[Worker]({jobName}) starting to work on [Work]({emitterId}) {iteration}
So you can match a message like [Emitter](3) --processing--> 0 true to its [Worker](processing-worker) starting to work on [Work](3) 0 line by searching for the successfully sent ones.
Note that if you want to avoid consuming in-flight messages, you need to set the option on the ThreadQueueWorkshop builder:
Workshop<String> workshop = ThreadQueueWorkshop.<String>newBuilder()
.registerWorker("delivery", new SlowWorker<>("delivery-worker"))
.registerWorker("processing", new SlowWorker<>("processing-worker"))
.skipInFlightMessage() // sets the flag to true
.build();
Testing
Of course, we need some sort of proof that all of this works. Here we create a test with a TestWorker, which is drastically faster than the SlowWorker at only 50ms of execution time, and with pauses of 20ms between emissions.
package org.example;
import org.example.workshop.Color;
import org.example.workshop.Worker;
import java.util.concurrent.atomic.AtomicInteger;
public record TestWorker(String name, AtomicInteger counter) implements Worker<String> {
@Override
public void work(String value) throws InterruptedException {
System.out.println(Color.CYAN + "[Worker](" + name + ") consuming " + value + Color.RESET);
counter.incrementAndGet();
Thread.sleep(50);
}
}
We use an AtomicInteger to count the number of times each worker was triggered, and we run the test multiple times using @RepeatedTest(10) to ensure everything is stable.
package org.example;
import org.example.workshop.EmittersSwarm;
import org.example.workshop.ThreadQueueWorkshop;
import org.junit.jupiter.api.RepeatedTest;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.*;
class WorkshopTest {
@RepeatedTest(10)
void givenPairOfWorkers_whenThereAreManyConcurrentEmissions_thenProcessOnlyConsumedAndInFlight() {
final AtomicInteger deliveryCounter = new AtomicInteger(0);
final AtomicInteger processingCounter = new AtomicInteger(0);
ThreadQueueWorkshop<String> workshop = ThreadQueueWorkshop.<String>newBuilder()
.registerWorker("delivery", new TestWorker("delivery", deliveryCounter))
.registerWorker("processing", new TestWorker("processing", processingCounter))
.build();
EmittersSwarm swarm = EmittersSwarm.getBuilder()
.setWorkshop(workshop)
.waitOnEmission(20)
.numOfEmitters(20)
.build();
try (workshop; swarm) {
swarm.startAndAwait();
}
assertEquals(5, deliveryCounter.get());
assertEquals(5, processingCounter.get());
}
@RepeatedTest(10)
void givenPairOfWorkers_whenThereAreManyConcurrentEmissionsAndSkipInFlightIsActive_thenProcessOnlyConsumed() throws InterruptedException {
final AtomicInteger deliveryCounter = new AtomicInteger(0);
final AtomicInteger processingCounter = new AtomicInteger(0);
ThreadQueueWorkshop<String> workshop = ThreadQueueWorkshop.<String>newBuilder()
.registerWorker("delivery", new TestWorker("delivery", deliveryCounter))
.registerWorker("processing", new TestWorker("processing", processingCounter))
.skipInFlightMessage()
.build();
EmittersSwarm swarm = EmittersSwarm.getBuilder()
.setWorkshop(workshop)
.waitOnEmission(20)
.numOfEmitters(20)
.build();
try (workshop; swarm) {
swarm.startAndAwait();
}
assertEquals(4, deliveryCounter.get());
assertEquals(4, processingCounter.get());
}
}
Where we see all the tests passing! ✅
Improvements
What if there is no work? With an empty queue, all of our worker threads are waiting, doing nothing and wasting threads (especially OS threads), which is costly as we could use them somewhere else.
How do we spawn a thread when there is a job, but remove it when there are no jobs?
When I think about discarding threads, my mind tends to jump to virtual threads: we can have hundreds of thousands of them, and they are not meant to be pooled like traditional threads; basically, they are throw-away.
To allow only 1 virtual thread to spawn per worker type, we can use a Semaphore in place of our LinkedBlockingQueue. Let’s see what that could look like by creating a VirtualWorkshop.
Because we use a Semaphore (a binary semaphore, to be precise) and no queues, the majority of the functionality will be within the sending method:
public boolean trySendJob(String name, T job) {
// Each job type has its own semaphore
Semaphore semaphore = semaphoreMap.get(name);
if (semaphore == null) {
throw new IllegalStateException("Semaphore for " + name + " does not exist");
}
Worker<T> worker = workerMap.get(name);
if (worker == null) {
throw new IllegalStateException("Worker for " + name + " does not exist");
}
boolean permit = semaphore.tryAcquire();
if (!permit) return false;
// We create a new thread because we don't want this method to block
executor.submit(() -> {
try {
worker.work(job);
} catch (Throwable e) {
System.out.println(e.getMessage());
throw new RuntimeException(e);
} finally {
// We release here after the work is done, otherwise if we do it outside of this new thread
// next method invocation will be able to obtain a new lock
semaphore.release();
}
});
return true;
}
Here we check the semaphore to see whether it’s free. Because it is binary, it grants access to only 1 thread at a time, and the Semaphore method tryAcquire does not block waiting for a release; it returns false immediately when no permit is available.
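The non-blocking behaviour of tryAcquire can be checked on a single thread. In this sketch the class name `BinarySemaphoreSketch` is made up, and the release call stands in for a worker finishing its job.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

public class BinarySemaphoreSketch {
    // Returns the results of three tryAcquire() calls on a semaphore with 1 permit.
    public static boolean[] demo() {
        Semaphore semaphore = new Semaphore(1);
        boolean first = semaphore.tryAcquire();  // the permit is free: acquired
        boolean second = semaphore.tryAcquire(); // already taken: false, without waiting
        semaphore.release();                     // the "worker" finished its job
        boolean third = semaphore.tryAcquire();  // the permit is free again
        return new boolean[] {first, second, third};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [true, false, true]
    }
}
```

This is the same accept-or-reject pattern that offer gave us on the queue of capacity 1, only without a queue to hold a waiting message.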
The whole VirtualWorkshop class is quite similar to the previous ThreadQueueWorkshop and looks like this:
package org.example.workshop;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class VirtualWorkshop<T> implements Workshop<T> {
public static class Builder<T> {
private final HashMap<String, Worker<T>> workerMap = new HashMap<>();
private final HashMap<String, Semaphore> semaphoreMap = new HashMap<>();
public Builder<T> registerWorker(String name, Worker<T> worker) {
workerMap.put(name, worker);
return this;
}
public VirtualWorkshop<T> build() {
workerMap.forEach((name, worker) -> {
semaphoreMap.put(name, new Semaphore(1));
});
return new VirtualWorkshop<>(workerMap, semaphoreMap);
}
}
public static <T> Builder<T> getBuilder() {
return new Builder<>();
}
private final HashMap<String, Worker<T>> workerMap;
private final HashMap<String, Semaphore> semaphoreMap;
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
private VirtualWorkshop(
HashMap<String, Worker<T>> workerMap,
HashMap<String, Semaphore> semaphoreMap
) {
this.workerMap = workerMap;
this.semaphoreMap = semaphoreMap;
}
public Set<String> getWorkerNames() {
return workerMap.keySet();
}
@Override
public void run() {
// We have nothing to start :)
}
public boolean trySendJob(String name, T job) {
Semaphore semaphore = semaphoreMap.get(name);
if (semaphore == null) {
throw new IllegalStateException("Semaphore for " + name + " does not exist");
}
Worker<T> worker = workerMap.get(name);
if (worker == null) {
throw new IllegalStateException("Worker for " + name + " does not exist");
}
boolean permit = semaphore.tryAcquire();
if (!permit) return false;
executor.submit(() -> {
try {
worker.work(job);
} catch (Throwable e) {
System.out.println(e.getMessage());
throw new RuntimeException(e);
} finally {
semaphore.release();
}
});
return true;
}
@Override
public void close() {
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
We run it the same way, just by replacing the previous one
package org.example;
import org.example.workshop.*;
public class App {
public static Workshop<String> threadQueueWorkshop() {
return ThreadQueueWorkshop.<String>newBuilder()
.registerWorker("delivery", new SlowWorker<>("delivery-worker"))
.registerWorker("processing", new SlowWorker<>("processing-worker"))
.build();
}
public static Workshop<String> virtualWorkshop() throws Throwable {
return VirtualWorkshop.<String>getBuilder()
.registerWorker("delivery", new SlowWorker<>("delivery-worker"))
.registerWorker("processing", new SlowWorker<>("processing-worker"))
.build();
}
public static void main(String[] args) throws Throwable {
Workshop<String> workshop = virtualWorkshop();
EmittersSwarm swarm = EmittersSwarm.getBuilder()
.setWorkshop(workshop)
.build();
try (swarm; workshop) {
swarm.startAndAwait();
}
System.out.println("Done.");
}
}
And for the same test suite, we just replace the Workshop type and see that everything passes on all 10 executions! ✅
Summary
As you can see, the virtual threads solution is much simpler and more efficient, but it’s good to know and understand the alternatives, because in some cases the queue might be the better approach.
It’s generally better to use existing, complete solutions, but by always relying on them we forget how some things work, or never get a chance to build something of our own and learn from it.
Hope this exercise was fun, stimulated some thinking, and maybe answered some questions or gave you some ideas 💡. Here is the link to the repository if you want to check it out in full.