Creating notification workers in Java

An exercise for practicing concurrency: we will use a LinkedBlockingQueue with a single worker (thread) per job type (key) to trigger work in a non-blocking way, and then try out the Semaphore as well. Imagine publish/subscribe, but with no backlog of queued jobs, only 1 job running at a time, and no external dependencies like Redis or RabbitMQ, only Java.

To paint a better picture of when we would need something like this, here are a few examples:

Throttling example
We can use this functionality to throttle the amount of calls we want to make towards a service, 1 trigger at a time.

Unique job example
We want to process only 1 job per type at a time to avoid conflicts.

Monitoring for latest values
Imagine that we want to collect heat data in real time and average it. It’s OK for us to lose a bit of data, as we always value the latest incoming data more.

An abstract view of the implementation looks something like this:

/posts/building-notification-workers-in-java/notifications_abstraction.webp

We have N concurrent threads trying to send notifications, and each worker of type T accepts only 1 notification at a time. A worker does not accept any new notifications until it’s done with the current one.

Requirements

  1. The group of N threads on the left sends/triggers notifications for a job based on the job type without being blocked (waiting for the worker to take the job)
  2. A worker should only execute 1 job at a time
  3. No queuing of jobs
  4. No data races between threads and no blocking
  5. Handle generic data
  6. Should be testable

New to threads?

If you are new to threads, I suggest checking this fantastic channel and its playlist on concurrency in Java; otherwise, skip to the next section.

Solution

LinkedBlockingQueue is a good and clean solution, as it lets us pass data to one (or multiple) workers without many complications, following the principle popularized by Go:

Do not communicate by sharing memory; instead, share memory by communicating.

The meaning
Threads changing each other’s shared memory can lead to problems and make debugging harder. If there is a channel of sorts for them to send notifications, they can instead tell the thread that owns the memory what needs to change and pass the data along.
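To make the principle concrete, here is a minimal sketch (not part of the workshop we are building). The names RunningTotalDemo and sumViaQueue are made up for illustration. Several sender threads never touch the running total; they only put numbers on a queue, and a single owner thread is the only one that updates it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class RunningTotalDemo {

    // Hypothetical helper: every sender pushes 1..valuesPerProducer onto the queue
    static long sumViaQueue(int producers, int valuesPerProducer) {
        BlockingQueue<Integer> channel = new LinkedBlockingQueue<>();
        int expectedMessages = producers * valuesPerProducer;
        long[] total = {0}; // only the owner thread writes to this

        Thread owner = new Thread(() -> {
            try {
                for (int received = 0; received < expectedMessages; received++) {
                    total[0] += channel.take(); // blocks until a value arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        owner.start();

        Thread[] senders = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            senders[p] = new Thread(() -> {
                for (int value = 1; value <= valuesPerProducer; value++) {
                    channel.add(value); // unbounded queue, so add never fails here
                }
            });
            senders[p].start();
        }

        try {
            for (Thread sender : senders) {
                sender.join();
            }
            owner.join(); // after join, reading total[0] from this thread is safe
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return total[0];
    }

    public static void main(String[] args) {
        System.out.println(sumViaQueue(4, 1_000)); // 2002000
    }
}
```

No lock or synchronized block is needed around the total, because only one thread ever modifies it. The queue is the only shared object, and it handles its own thread safety.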

We want to have a main class to house the logic and provide the method (API) for interacting by all other outside threads concurrently, let’s call it Workshop. It will first register jobs and workers, then create the queues that link each other, so it will look like the following:

/posts/building-notification-workers-in-java/notifications_workshop.webp
Notifications workshop

To give a better image of how it’s going to look at the end in the main method:

package org.example;

import org.example.workshop.EmittersSwarm;
import org.example.workshop.SlowWorker;
import org.example.workshop.ThreadQueueWorkshop;
import org.example.workshop.Workshop;

public class App {

    public static Workshop<String> threadQueueWorkshop() {
        return ThreadQueueWorkshop.<String>newBuilder()
                .registerWorker("delivery", new SlowWorker<>("delivery-worker"))
                .registerWorker("processing", new SlowWorker<>("processing-worker"))
                .build();
    }

    public static void main(String[] args) {
        Workshop<String> workshop = threadQueueWorkshop();

        EmittersSwarm swarm = EmittersSwarm.getBuilder()
                .setWorkshop(workshop)
                .build();

        try (swarm; workshop) {
            swarm.startAndAwait();
        }

        System.out.println("Done.");
    }
}

We have a builder for the Workshop (ThreadQueueWorkshop implementation) and a builder for the EmittersSwarm that will start the “attack” on the workshop’s method trySendJob and wait for a set number of threads and requests per thread to complete. By default, it will spawn 5 emitters (threads) with 10 emissions per emitter and a 1-second pause between each.

Ok, so let’s start with it.

Worker setup

I suggest creating a workshop package to house everything. For starters, add a helper Color class for a little bit of style, so that later we can easily see what’s going on in the output.

package org.example.workshop;

public class Color {
    public static final String RESET = "\033[0m";  // Text Reset

    public static final String BLACK = "\033[0;30m";   // BLACK
    public static final String RED = "\033[0;31m";     // RED
    public static final String GREEN = "\033[0;32m";   // GREEN
    public static final String YELLOW = "\033[0;33m";  // YELLOW
    public static final String BLUE = "\033[0;34m";    // BLUE
    public static final String PURPLE = "\033[0;35m";  // PURPLE
    public static final String CYAN = "\033[0;36m";    // CYAN
    public static final String WHITE = "\033[0;37m";   // WHITE
}

Quick definition of a generic worker interface

package org.example.workshop;

public interface Worker<T> {
    void work(T value) throws Throwable;
}

And a slow worker that sleeps for 3 seconds to simulate a slow job, which makes it easier to notice what’s going on when we start sending notifications/jobs:

package org.example.workshop;

public class SlowWorker<T> implements Worker<T> {

    private final String name;

    public SlowWorker(String name) {
        this.name = name;
    }

    @Override
    public void work(T value) throws InterruptedException {
        System.out.println(Color.CYAN + "[Worker](" + name + ") starting to work on " + value + Color.RESET);
        Thread.sleep(3_000);
    }
}

Workshop

We will need a main class that will encapsulate the concurrency safety model for triggering these workers, but before that, let’s first create an interface as it will make switching workshops easier later on.

package org.example.workshop;

import java.util.Set;

public interface Workshop<T> extends AutoCloseable {
    /**
     * Starts consumer threads if any or any other requirement needed for operating.
     */
    void run();

    /**
     * Names of the registered workers
     */
    Set<String> getWorkerNames();

    /**
     * This method is thread safe
     *
     * @return true if sending of the job was successful
     */
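    boolean trySendJob(String workerName, T work);

    /**
     * Narrowed to not throw checked exceptions, so try-with-resources works without a catch block
     */
    @Override
    void close();
}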

This also helps to imagine the interface we need.

You probably noticed that the interface extends AutoCloseable. That’s because we need to cleanly close the executor thread pool, which we will later do with try-with-resources.
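As a quick aside, here is a small sketch of how try-with-resources treats more than one resource. The Named record is a made-up stand-in for our swarm and workshop, and the only thing it does is log when it gets closed.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseOrderDemo {

    // Hypothetical resource that records the moment it is closed
    record Named(String name, List<String> log) implements AutoCloseable {
        @Override
        public void close() { // no "throws Exception", so callers need no catch block
            log.add("closed " + name);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Named swarm = new Named("swarm", log);
        Named workshop = new Named("workshop", log);

        // Since Java 9, effectively final variables can be listed as resources
        try (swarm; workshop) {
            log.add("working");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [working, closed workshop, closed swarm]
    }
}
```

Resources are closed in the reverse order of how they are listed, so with try (swarm; workshop) the workshop is closed first and the swarm second.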

Ok, to get back to the implementation, we will create our implementation of the Workshop called ThreadQueueWorkshop

package org.example.workshop;

import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadQueueWorkshop<T> implements Workshop<T> {
    private final HashMap<String, Worker<T>> workerMap;
    private final HashMap<String, LinkedBlockingQueue<T>> jobQueueMap;
    private final ExecutorService executor;

    public ThreadQueueWorkshop(
            HashMap<String, Worker<T>> workerMap,
            ExecutorService executor,
            HashMap<String, LinkedBlockingQueue<T>> jobQueueMap
    ) {
        this.workerMap = workerMap;
        this.jobQueueMap = jobQueueMap;
        this.executor = executor;
    }

    @Override
    public Set<String> getWorkerNames() {
        return workerMap.keySet();
    }

    @Override
    public void close() {
        System.out.println("[Workshop] closing");
        // ExecutorService.close() is available from Java 19 onwards
        executor.close();
        System.out.println("[Workshop] closed");
    }
}

We purposely want to avoid putting a lot of initialization logic in the constructor, so we will mark it as private and create a Builder class to fluently build and configure our ThreadQueueWorkshop.

Add within the ThreadQueueWorkshop class the Builder class.

public class ThreadQueueWorkshop<T> implements Workshop<T> {
    public static class Builder<T> {
        private final HashMap<String, Worker<T>> workerMap = new HashMap<>();

        public Builder<T> registerWorker(String name, Worker<T> worker) {
            workerMap.put(name, worker);
            return this;
        }

        public ThreadQueueWorkshop<T> build() {
            if (workerMap.isEmpty()) {
                throw new IllegalArgumentException("workerMap is empty");
            }

            ExecutorService executor = Executors.newFixedThreadPool(workerMap.size());
            HashMap<String, LinkedBlockingQueue<T>> jobQueueMap = new HashMap<>(workerMap.size());

            workerMap.forEach((key, worker) -> {
                // The queue will be able to hold only 1 message, adding others would be ignored until
                // the queue is empty
                LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(1);
                jobQueueMap.put(key, queue);
            });

            return new ThreadQueueWorkshop<>(workerMap, executor, jobQueueMap);
        }
    }

    public static <T> Builder<T> newBuilder() {
        return new Builder<>();
    }
    
    // ... rest of the code
}

In the builder we have a method to register workers by name. The build method performs some validation, creates the executor service with the correct number of threads, creates one queue per worker, and links them all together.

This will allow us to build it in the following way:

public static void main(String[] args) {
    // Here we register 2 workers of same class by different names
    Workshop<String> workshop = ThreadQueueWorkshop.<String>newBuilder()
            .registerWorker("delivery", new SlowWorker<>("delivery-worker"))
            .registerWorker("processing", new SlowWorker<>("processing-worker"))
            .build();

    try(workshop) {
        // ... use workshop to send notifications/jobs
    }
    // after the scope the workshop is closed
}

Now that setup of the ThreadQueueWorkshop is done, let’s continue with the sending of jobs.

Sending the job is relatively simple: after we fetch the queue based on the job key, we send the job to it. The important thing to note is that we are using the offer method of the LinkedBlockingQueue for that. The documentation says the following:

offer
Inserts the specified element at the tail of this queue if it is possible to do so immediately without exceeding the queue’s capacity, returning true upon success and false if this queue is full

Which is the complete opposite of using put:

put
Inserts the specified element at the tail of this queue, waiting if necessary for space to become available.

…and with this we won’t be blocking the invoker of the method.

public class ThreadQueueWorkshop<T> implements Workshop<T> {

    // ... rest of the code

    @Override
    public boolean trySendJob(String workerName, T work) {
        LinkedBlockingQueue<T> workerQueue = jobQueueMap.get(workerName);
        if (workerQueue == null) {
            throw new IllegalArgumentException("worker " + workerName + " not found");
        }
        return workerQueue.offer(work);
    }
}

You may have noticed that there is no synchronization or lock for this method. This is because the jobQueueMap won’t change: it’s filled once during creation of the Workshop and only read afterwards, so we don’t expect any data race to occur.

LinkedBlockingQueue
LinkedBlockingQueue uses locks internally, so it already has built-in protection against data races. Still, you should always check whether the method you are calling is documented as safe for concurrent usage.
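The difference between a full and a free slot is easy to see in isolation. The following is a minimal sketch on a queue with capacity 1, run from a single thread so the result is predictable. The class and method names are made up for this illustration.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class OfferDemo {

    static boolean[] offerThreeTimes() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        boolean first = queue.offer("job-1");  // queue was empty, accepted
        boolean second = queue.offer("job-2"); // queue is full, rejected right away
        queue.poll();                          // a consumer picks up job-1
        boolean third = queue.offer("job-3");  // the slot is free again, accepted
        return new boolean[]{first, second, third};
    }

    static boolean offerWithTimeout() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("job-1");
        try {
            // Timed variant: waits up to 50 ms for space and then gives up
            return queue.offer("job-2", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(offerThreeTimes())); // [true, false, true]
        System.out.println(offerWithTimeout());                 // false
    }
}
```

The timed offer is a middle ground between offer and put, in case a sender is willing to wait a little but not forever. We don’t need it for the workshop, where senders should never wait.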

Ok, let’s move to starting the workers that will consume these notifications/jobs.

public class ThreadQueueWorkshop<T> implements Workshop<T> {

    // ... other code

    @Override
    public void run() {
        System.out.println("[Workshop] starting...");

        workerMap.forEach((key, worker) -> {
            LinkedBlockingQueue<T> queue = jobQueueMap.get(key);
            if (queue == null) {
                throw new RuntimeException("No queue for key: " + key);
            }

            executor.submit(() -> {
                while (true) {
                    try {
                        // Blocks until a job shows up in the queue
                        T message = queue.take();
                        worker.work(message);
                    } catch (Throwable e) {
                        throw new RuntimeException(e);
                    }
                }
                // A "Consumer closed" log cannot go here yet: the compiler rejects any
                // statement after a while (true) without a break as unreachable
            });
        });
    }
}

When the workshop is started, a new task is created per worker. Each task waits for data to appear in its queue and then executes the Worker.work method on the value, all within an infinite loop. It will shut down when we call the executor’s close method in Workshop.close... or will it? ⁉️

Did you catch that bug? 🦋

Now let’s see if you managed to see problems in the previous code and design.

1. Never ending consumers 🐛

Maybe it’s better to start from the end. You saw that we started infinite tasks that consume data forever, and maybe thought they stop after calling close on the ExecutorService? Well, the service just stops accepting further tasks via executor.submit and waits for the ongoing tasks to finish, which in our case never happens!

We need a way to let the consumer tasks break the while loop. Because they are stuck waiting for the queue to have data, it would be smart to use the queue to send a poison pill message, which is basically a signal for the consumer to end.

Some would have the poison pill message be -1 if we were sending ints, or "POISON_PILL" if we were sending strings, but we are sending generic data here, so how do we proceed with that in mind?

Strange poison pill values
Using strange values (like -1 or "POISON_PILL") in the previous examples would still be unsafe: if the same value ever arrives from somewhere else as real data, it kills the consumer task and causes bugs down the line.

A better approach is to wrap the message in a private record that can carry metadata like shouldShutdown, which only our Workshop class can set and see, so let’s do that.

Create a private record inside the workshop that holds the two fields, along with the poison pill message:

class ThreadQueueWorkshop<T> implements Workshop<T> {
    private record Message<T>(T value, boolean shouldShutdown) {
    }

    private final Message<T> POISON_PILL = new Message<>(null, true);
    // other code
}

Update the consumer task in while loop so that when it encounters the poison pill message and sees the signal, it will break the loop.

Message<T> message = queue.take();
if (message.shouldShutdown) {
    break;
}
worker.work(message.value);

Update the trySendJob method to envelop the value in the Message record

workerQueue.offer(new Message<>(work, false));

After you update all the types that changed from T to Message<T> (such as the queue element type), we can finally update the close method of the workshop to send the POISON_PILL message to the consumers

@Override
public void close() {
    System.out.println("[Workshop] closing");
    try {
        jobQueueMap.forEach((key, queue) -> {
            try {
                // With put, we wait if necessary to ensure we send this message
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    } finally {
        // We shut the executor down after sending the poison message so it gets to process it
        executor.close();
    }
    System.out.println("[Workshop] closed");
}

Ensure the poison pill is added
Notice that we used the put method. That is because we want to make sure the pill ends up in the queue, even if we have to wait for space.
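To see the whole poison pill round trip in one place, here is a self-contained sketch that is separate from our workshop. PoisonPillDemo and consumeUntilPill are made-up names, and the queue is left unbounded on purpose so that every job is accepted and the result stays predictable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class PoisonPillDemo {

    // Envelope: the payload plus a flag that only this class ever sets to true
    private record Message<T>(T value, boolean shouldShutdown) {
    }

    static List<String> consumeUntilPill(List<String> jobs) {
        LinkedBlockingQueue<Message<String>> queue = new LinkedBlockingQueue<>();
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newSingleThreadExecutor();

        executor.submit(() -> {
            try {
                while (true) {
                    Message<String> message = queue.take();
                    if (message.shouldShutdown()) {
                        break; // the pill arrived, leave the loop and end the task
                    }
                    processed.add(message.value());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        for (String job : jobs) {
            queue.add(new Message<>(job, false));
        }
        queue.add(new Message<>(null, true)); // the pill goes in last

        executor.shutdown(); // no new tasks, the running consumer keeps going
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumer did not stop");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(consumeUntilPill(List.of("a", "b", "c"))); // [a, b, c]
    }
}
```

Because the pill travels through the same queue as the jobs, the consumer handles everything that was queued before it and only then stops.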

2. Another safety net 🪲

It’s always good to add more safety measures, as you never know what might lead your application into a certain condition. For example, I don’t know about you, but while(true) always looked scary to me. I would also like to block some methods once the application is shut down. The code that maintains the Workshop entity is expected to know that it should not send jobs after closing it (shutting it down), so if it does, there is most likely a bug on its end.

For those reasons, let’s add another field to the Workshop: isShutdown

First, let’s set it when the shutdown happens:

@Override
public void close() {
    System.out.println("[Workshop] closing");
    isShutdown = true; // We set the state of the workshop to closed
    try {
        jobQueueMap.forEach((key, queue) -> {
            // Even though we set the isShutdown flag above, the consumer might not reach it if it's still
            // waiting for the queue to receive an item and would be stuck. So for that scenario we need to
            // send it a POISON PILL message
            queue.offer(POISON_PILL);
        });
    } finally {
        // We shut the executor down after sending the poison message so it gets to process it
        executor.close();
    }
    System.out.println("[Workshop] closed");
}

Change from put to offer
We also need to change how we pass the poison pill. Since the flag can now end the loop on its own, a consumer may already be gone while its queue is still full, and put would then block forever. With offer we never block.

Then we replace the while(true) with it

while (!isShutdown){
    // ... other code
}

And the final place is where we try to send the job/notification

public boolean trySendJob(String workerName, T work) {
    // In the shutdown state we want to prevent sending messages so we can send our poison pill without trouble
    if (isShutdown) {
        throw new IllegalStateException("Workshop is shutdown, cannot send job");
    }
    // ... other code
}

Ok, we just have to declare the variable. But remember: because this variable is written by one thread and read by many others, we need to ensure every thread sees the latest value, thus we need to declare it as volatile!

private volatile boolean isShutdown = false;

Volatile
The volatile keyword provides a lighter alternative to synchronization when full thread safety (like atomicity in compound actions) isn’t required. For instance, it’s suitable for “flag” variables that multiple threads read and write but don’t need to modify in compound actions. This is because volatile doesn’t prevent race conditions for compound actions like count++ (incrementing a shared variable) — it only ensures visibility.
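Here is a short sketch of both halves of that statement, with made-up names. A volatile flag is enough to stop a spinning thread, because it is a single write that must become visible. A counter that several threads increment needs something atomic instead, such as AtomicInteger.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class VolatileFlagDemo {

    // One writer flips it, other threads only read it: visibility is all we need
    private static volatile boolean stopRequested = false;

    static boolean stopsAfterFlagIsSet() {
        stopRequested = false;
        Thread spinner = new Thread(() -> {
            while (!stopRequested) {
                Thread.onSpinWait(); // hint to the runtime that we are busy-waiting
            }
        });
        spinner.start();
        stopRequested = true;
        try {
            spinner.join(2_000); // give the spinner up to 2 seconds to notice
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !spinner.isAlive();
    }

    // count++ is read-modify-write, so we use an atomic class rather than volatile
    static int countWithAtomic(int threads, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.incrementAndGet();
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(stopsAfterFlagIsSet());       // true
        System.out.println(countWithAtomic(4, 10_000));  // 40000
    }
}
```

If the counter were a plain volatile int incremented with ++, some increments could be lost, since two threads can read the same old value before either of them writes back.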

3. In-flight messages 🐛

Our queue holds only 1 message, but as soon as the worker consumes it, the next send puts another one right in. Now imagine that our worker is slow like a snail: it takes the first message, and the second message lands in the queue immediately after. Because the worker is slow, so many more messages might arrive after the second one that the second becomes stale and maybe irrelevant (old data) by the time it is processed.

To prevent old data from being next in line, we must purge the queue after finishing the job so that the next thing we grab from it is fresh data. Though, because there might be cases when you care about every trigger and do not want to lose in-flight messages, we should create a configurable variable shouldIgnoreInFlightMessage

public class ThreadQueueWorkshop<T> implements Workshop<T> {

    public static class Builder<T> {
        private boolean shouldIgnoreInFlightMessage = false;
        
        public Builder<T> skipInFlightMessage() {
            shouldIgnoreInFlightMessage = true;
            return this;
        }
        // ...other code
    }
}

And extend the constructor of ThreadQueueWorkshop to accept the new field

new ThreadQueueWorkshop<>(workerMap, executor, jobQueueMap, shouldIgnoreInFlightMessage);

Then let’s add the check for queue purging

executor.submit(() -> {
    while (!isShutdown) {
        try {
            Message<T> message = queue.take();
            if (message.shouldShutdown) {
                break;
            }
            worker.work(message.value);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        } finally {
            // When we don't want the stored in-flight messages in queue and want to use newer ones
            // after finishing of processing
            if (shouldIgnoreInFlightMessage) {
                queue.clear();
            }
        }
    }
    System.out.println("Consumer closed");
});
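The effect of the purge can be replayed step by step without any threads. This is a simplified sketch with made-up names, where each line stands for an event that would normally happen concurrently: the worker takes a reading, two more arrive while it is busy, and a fresh one arrives after it is done.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

final class InFlightDemo {

    static List<String> simulate(boolean skipInFlight) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        List<String> processed = new ArrayList<>();

        queue.offer("reading-1");
        processed.add(queue.poll()); // the worker picks up reading-1 and starts working

        queue.offer("reading-2");    // arrives during the work and waits in the queue
        queue.offer("reading-3");    // rejected, the single slot is already taken

        if (skipInFlight) {
            queue.clear();           // work is done: drop the stale reading-2
        }

        queue.offer("reading-4");    // fresh value, accepted only if the slot is free
        String next = queue.poll();
        if (next != null) {
            processed.add(next);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(simulate(false)); // [reading-1, reading-2]
        System.out.println(simulate(true));  // [reading-1, reading-4]
    }
}
```

Without the purge, the stale reading-2 blocks the slot and the fresh reading-4 is rejected. With the purge, the worker continues with reading-4.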

Now let’s move onto creating some threads that will send a lot of data quickly to our Workshop.

Emitters

For the emitters, I’ll paste the whole example and explain it as a whole, but before that I want to show what it looks like in usage with the Workshop:

package org.example;

import org.example.workshop.EmittersSwarm;
import org.example.workshop.SlowWorker;
import org.example.workshop.ThreadQueueWorkshop;
import org.example.workshop.Workshop;

public class App {

    public static Workshop<String> threadQueueWorkshop() {
        return ThreadQueueWorkshop.<String>newBuilder()
                .registerWorker("delivery", new SlowWorker<>("delivery-worker"))
                .registerWorker("processing", new SlowWorker<>("processing-worker"))
                .build();
    }

    public static void main(String[] args) {
        Workshop<String> workshop = threadQueueWorkshop();

        EmittersSwarm swarm = EmittersSwarm.getBuilder()
                .setWorkshop(workshop)
                .build();

        try (swarm; workshop) {
            swarm.startAndAwait();
        }

        System.out.println("Done.");
    }
}

We pass it the workshop, so the swarm will try to send jobs through the trySendJob method for all registered workers.

Here is the code for the EmittersSwarm. Note that we are using virtual threads here, as a small change from traditional threads. The swarm is configurable enough to support testing, and each emitter consistently targets the same worker, picked from the registered worker names by its id.

package org.example.workshop;

import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EmittersSwarm implements AutoCloseable {

    public static class Builder {
        private Workshop<String> workshop;
        private int numOfEmitters = 5;
        private int numOfEmissions = 10;
        private int waitMls = 1_000;

        public Builder() {
        }

        public Builder setWorkshop(Workshop<String> workshop) {
            this.workshop = workshop;
            return this;
        }

        public Builder numOfEmitters(int numOfEmitters) {
            this.numOfEmitters = numOfEmitters;
            return this;
        }

        public Builder numOfEmissions(int numOfEmissions) {
            this.numOfEmissions = numOfEmissions;
            return this;
        }

        public Builder waitOnEmission(int waitMls) {
            this.waitMls = waitMls;
            return this;
        }

        public EmittersSwarm build() {
            if (workshop == null) {
                throw new IllegalStateException("You must specify a workshop");
            }

            return new EmittersSwarm(workshop, numOfEmitters, numOfEmissions, waitMls);
        }
    }

    public static Builder getBuilder() {
        return new Builder();
    }

    private final Workshop<String> workshop;
    private final ExecutorService executor;
    private final int numOfEmitters;
    private final int numOfEmissions;
    private final int waitMls;

    private EmittersSwarm(
            Workshop<String> workshop,
            int numOfEmitters,
            int numOfEmissions,
            int waitMls
    ) {
        this.workshop = workshop;
        this.numOfEmitters = numOfEmitters;
        this.numOfEmissions = numOfEmissions;
        this.waitMls = waitMls;
        executor = Executors.newVirtualThreadPerTaskExecutor();
    }

    public void startAndAwait() {
        System.out.println("[EmittersSwarm] starting...");
        workshop.run();

        String[] workerNames = workshop.getWorkerNames().toArray(new String[0]);
        List<Callable<Void>> tasks = new ArrayList<>();

        for (int i = 0; i < numOfEmitters; i++) {
            final int emitterId = i;
            System.out.println("[Emitter](" + emitterId + ") created");

            tasks.add(() -> {
                for (int j = 0; j < numOfEmissions; j++) {
                    try {
                        String workerName = workerNames[emitterId % workerNames.length];

                        boolean success = workshop.trySendJob(workerName, "[Work](" + emitterId + ") " + j);
                        String color = success ? Color.GREEN : Color.RED;
                        System.out.println(
                                "[Emitter](" + emitterId + ") --" + workerName + "--> " + j + " " + color + success + Color.RESET
                        );
                        Thread.sleep(waitMls);
                    } catch (InterruptedException e) {
                        System.out.println(
                                "[Emitter] (" + emitterId + ")" + e.getMessage() + " " + Arrays.toString(e.getStackTrace())
                        );
                    }
                }
                System.out.println("[Emitter](" + emitterId + ") closed.");

                return null;
            });
        }

        System.out.println("[EmittersSwarm] waiting to finish...");
        try {
            executor.invokeAll(tasks);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("[EmittersSwarm] done.");
    }

    @Override
    public void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

Now you can run the code and observe the following output

[EmittersSwarm] starting...
[Workshop] starting...
[Emitter](0) created
[Emitter](1) created
[Emitter](2) created
[Emitter](3) created
[Emitter](4) created
[EmittersSwarm] waiting to finish...
[Worker](processing-worker) starting to work on [Work](3) 0
[Worker](delivery-worker) starting to work on [Work](2) 0
[Emitter](4) --delivery--> 0 true
[Emitter](1) --processing--> 0 false
[Emitter](2) --delivery--> 0 true
[Emitter](3) --processing--> 0 true
[Emitter](0) --delivery--> 0 false
[Emitter](4) --delivery--> 1 false
[Emitter](2) --delivery--> 1 false
[Emitter](3) --processing--> 1 true
[Emitter](1) --processing--> 1 false
[Emitter](0) --delivery--> 1 false
[Emitter](3) --processing--> 2 false
[Emitter](1) --processing--> 2 false
[Emitter](2) --delivery--> 2 false
[Emitter](4) --delivery--> 2 false
[Emitter](0) --delivery--> 2 false
[Worker](processing-worker) starting to work on [Work](3) 1
[Worker](delivery-worker) starting to work on [Work](4) 0
[Emitter](2) --delivery--> 3 true
[Emitter](3) --processing--> 3 false
[Emitter](1) --processing--> 3 true
[Emitter](0) --delivery--> 3 false
[Emitter](4) --delivery--> 3 false
[Emitter](2) --delivery--> 4 false
[Emitter](0) --delivery--> 4 false
[Emitter](3) --processing--> 4 false
[Emitter](4) --delivery--> 4 false
[Emitter](1) --processing--> 4 false
[Emitter](2) --delivery--> 5 false
[Emitter](1) --processing--> 5 false
[Emitter](4) --delivery--> 5 false
[Emitter](3) --processing--> 5 false
[Emitter](0) --delivery--> 5 false
[Worker](processing-worker) starting to work on [Work](1) 3
[Worker](delivery-worker) starting to work on [Work](2) 3
[Emitter](1) --processing--> 6 true
[Emitter](0) --delivery--> 6 false
[Emitter](2) --delivery--> 6 true
[Emitter](4) --delivery--> 6 false
[Emitter](3) --processing--> 6 false
[Emitter](0) --delivery--> 7 false
[Emitter](2) --delivery--> 7 false
[Emitter](1) --processing--> 7 false
[Emitter](4) --delivery--> 7 false
[Emitter](3) --processing--> 7 false
[Emitter](0) --delivery--> 8 false
[Emitter](2) --delivery--> 8 false
[Emitter](1) --processing--> 8 false
[Emitter](4) --delivery--> 8 false
[Emitter](3) --processing--> 8 false
[Worker](processing-worker) starting to work on [Work](1) 6
[Worker](delivery-worker) starting to work on [Work](2) 6
[Emitter](0) --delivery--> 9 true
[Emitter](2) --delivery--> 9 false
[Emitter](1) --processing--> 9 true
[Emitter](4) --delivery--> 9 false
[Emitter](3) --processing--> 9 false
[Emitter](0) closed.
[Emitter](2) closed.
[Emitter](1) closed.
[Emitter](4) closed.
[Emitter](3) closed.
[EmittersSwarm] done.
[Workshop] closing
Consumer closed
Consumer closed
[Workshop] closed
Done.

The logs show the following format:

  • for sending [Emitter]({emitterId}) --{jobName}--> {iteration} {wasSentToQueue}
  • for consuming [Worker]({jobName}) starting to work on [Work]({emitterId}) {iteration}

So you can match a message like [Emitter](3) --processing--> 0 true to its counterpart [Worker](processing-worker) starting to work on [Work](3) 0 by searching for the emissions that were sent successfully (true).

In-flight setting

Note that if you want to avoid the in-flight message consumption, you need to set the option on the ThreadQueueWorkshop:

Workshop<String> workshop = ThreadQueueWorkshop.<String>newBuilder()
        .registerWorker("delivery", new SlowWorker<>("delivery-worker"))
        .registerWorker("processing", new SlowWorker<>("processing-worker"))
        .skipInFlightMessage() // sets the flag to true
        .build();

Order is not guaranteed!
Threads run concurrently and there is no guarantee of the order of the messages.

Testing

Of course, we need some sort of proof that all of this works. Here we create a test with a TestWorker that is drastically faster than the SlowWorker, at only 50ms of execution time, and with pauses of 20ms between emissions.

package org.example;

import org.example.workshop.Color;
import org.example.workshop.Worker;

import java.util.concurrent.atomic.AtomicInteger;

public record TestWorker(String name, AtomicInteger counter) implements Worker<String> {
    @Override
    public void work(String value) throws InterruptedException {
        System.out.println(Color.CYAN + "[Worker](" + name + ") consuming " + value + Color.RESET);
        counter.incrementAndGet();
        Thread.sleep(50);
    }
}

We use an AtomicInteger to count the number of times each worker was triggered, and we run the test multiple times using @RepeatedTest(10) to ensure everything is stable.

package org.example;

import org.example.workshop.EmittersSwarm;
import org.example.workshop.ThreadQueueWorkshop;
import org.junit.jupiter.api.RepeatedTest;

import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.*;

class WorkshopTest {

    @RepeatedTest(10)
    void givenPairOfWorkers_whenThereAreManyConcurrentEmissions_thenProcessOnlyConsumedAndInFlight() {
        final AtomicInteger deliveryCounter = new AtomicInteger(0);
        final AtomicInteger processingCounter = new AtomicInteger(0);

        ThreadQueueWorkshop<String> workshop = ThreadQueueWorkshop.<String>newBuilder()
                .registerWorker("delivery", new TestWorker("delivery", deliveryCounter))
                .registerWorker("processing", new TestWorker("processing", processingCounter))
                .build();

        EmittersSwarm swarm = EmittersSwarm.getBuilder()
                .setWorkshop(workshop)
                .waitOnEmission(20)
                .numOfEmitters(20)
                .build();
        try (workshop; swarm) {
            swarm.startAndAwait();
        }

        assertEquals(5, deliveryCounter.get());
        assertEquals(5, processingCounter.get());
    }

    @RepeatedTest(10)
    void givenPairOfWorkers_whenThereAreManyConcurrentEmissionsAndSkipInFlightIsActive_thenProcessOnlyConsumed() throws InterruptedException {
        final AtomicInteger deliveryCounter = new AtomicInteger(0);
        final AtomicInteger processingCounter = new AtomicInteger(0);

        ThreadQueueWorkshop<String> workshop = ThreadQueueWorkshop.<String>newBuilder()
                .registerWorker("delivery", new TestWorker("delivery", deliveryCounter))
                .registerWorker("processing", new TestWorker("processing", processingCounter))
                .skipInFlightMessage()
                .build();

        EmittersSwarm swarm = EmittersSwarm.getBuilder()
                .setWorkshop(workshop)
                .waitOnEmission(20)
                .numOfEmitters(20)
                .build();
        try (workshop; swarm) {
            swarm.startAndAwait();
        }

        assertEquals(4, deliveryCounter.get());
        assertEquals(4, processingCounter.get());
    }
}

And we see all the tests passing! ✅
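As a side note on the counters: the tests use AtomicInteger because the workers increment the counters on their own threads while the test thread reads them at the end. The sketch below is not part of the project (AtomicCounterDemo and countConcurrently are hypothetical names); it only illustrates that incrementAndGet() loses no updates when several threads count at once.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo {

    // Starts `threads` threads that each increment a shared counter `perThread` times,
    // waits for all of them and returns the final value.
    static int countConcurrently(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger(0);
        Thread[] pool = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            pool[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.incrementAndGet(); // atomic read-modify-write
                }
            });
            pool[i].start();
        }
        for (Thread t : pool) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(8, 10_000)); // prints 80000
    }
}
```

With a plain int field and counter++ the same run could report fewer increments, because ++ is a separate read and write.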

Improvements

What if there is no work? With an empty queue, all of our worker threads sit waiting and doing nothing. That wastes threads (especially OS threads), which is costly as we could use them somewhere else.

How do we spawn a thread when there is a job, but remove it when there are no jobs?

/posts/building-notification-workers-in-java/waiting_queue.webp
Waiting queue

When I think about discarding threads, my mind tends to jump to virtual threads: we can have hundreds of thousands of them, and they are not meant to be pooled like traditional threads. Basically, they are throw-away.

Another thing we can use, to allow only 1 virtual thread to spawn per worker type, is a Semaphore in place of our LinkedBlockingQueue. Let’s see what that could look like by creating a VirtualWorkshop.

Because we use a Semaphore (a binary semaphore, to be precise) and no queues, the majority of the functionality will be within the sending method:

public boolean trySendJob(String name, T job) {
    // Each job type has its own semaphore
    Semaphore semaphore = semaphoreMap.get(name);
    if (semaphore == null) {
        throw new IllegalStateException("Semaphore for " + name + " does not exist");
    }
    Worker<T> worker = workerMap.get(name);
    if (worker == null) {
        throw new IllegalStateException("Worker for " + name + " does not exist");
    }
    
    boolean permit = semaphore.tryAcquire();
    if (!permit) return false;

    // Run the job on a new thread so that this method returns immediately instead of blocking
    executor.submit(() -> {
        try {
            worker.work(job);
        } catch (Throwable e) {
            System.out.println(e.getMessage());
            throw new RuntimeException(e);
        } finally {
            // Release only after the work is done. If we released outside of this task,
            // the permit would be free right away and the next invocation could acquire it
            // while this job is still running
            semaphore.release();
        }
    });

    return true;
}

Here we check the semaphore to see if it’s free or not. Because it is a binary semaphore, it has a single permit, so only 1 job per type can run at a time. The Semaphore method tryAcquire does not block waiting for a release; it returns false immediately if the permit is taken.
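To make the permit behaviour concrete, here is a minimal single-threaded sketch. BinarySemaphoreDemo and acquireTwiceThenRelease are hypothetical names used only for illustration; the Semaphore calls are the same ones used in trySendJob.

```java
import java.util.concurrent.Semaphore;

class BinarySemaphoreDemo {

    // Returns the results of three tryAcquire() calls, with one release() before the last call.
    static boolean[] acquireTwiceThenRelease() {
        Semaphore semaphore = new Semaphore(1);   // binary semaphore: a single permit

        boolean first = semaphore.tryAcquire();   // takes the only permit
        boolean second = semaphore.tryAcquire();  // no permit left: returns false without waiting
        semaphore.release();                      // the "worker" is done and hands the permit back
        boolean third = semaphore.tryAcquire();   // the permit is available again

        return new boolean[]{first, second, third};
    }

    public static void main(String[] args) {
        boolean[] r = acquireTwiceThenRelease();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "true false true"
    }
}
```

The second call is the case where trySendJob returns false: the job is dropped rather than queued.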

The whole VirtualWorkshop class is quite similar to the previous ThreadQueueWorkshop and looks like this:

package org.example.workshop;

import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class VirtualWorkshop<T> implements Workshop<T> {

    public static class Builder<T> {
        private final HashMap<String, Worker<T>> workerMap = new HashMap<>();
        private final HashMap<String, Semaphore> semaphoreMap = new HashMap<>();

        public Builder<T> registerWorker(String name, Worker<T> worker) {
            workerMap.put(name, worker);
            return this;
        }

        public VirtualWorkshop<T> build() {
            workerMap.forEach((name, worker) -> {
                semaphoreMap.put(name, new Semaphore(1));
            });
            return new VirtualWorkshop<>(workerMap, semaphoreMap);
        }
    }

    public static <T> Builder<T> getBuilder() {
        return new Builder<>();
    }

    private final HashMap<String, Worker<T>> workerMap;
    private final HashMap<String, Semaphore> semaphoreMap;
    private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

    private VirtualWorkshop(
            HashMap<String, Worker<T>> workerMap,
            HashMap<String, Semaphore> semaphoreMap
    ) {
        this.workerMap = workerMap;
        this.semaphoreMap = semaphoreMap;
    }

    public Set<String> getWorkerNames() {
        return workerMap.keySet();
    }

    @Override
    public void run() {
        // We have nothing to start :)
    }

    public boolean trySendJob(String name, T job) {
        Semaphore semaphore = semaphoreMap.get(name);
        if (semaphore == null) {
            throw new IllegalStateException("Semaphore for " + name + " does not exist");
        }
        Worker<T> worker = workerMap.get(name);
        if (worker == null) {
            throw new IllegalStateException("Worker for " + name + " does not exist");
        }
        boolean permit = semaphore.tryAcquire();
        if (!permit) return false;

        executor.submit(() -> {
            try {
                worker.work(job);
            } catch (Throwable e) {
                System.out.println(e.getMessage());
                throw new RuntimeException(e);
            } finally {
                semaphore.release();
            }
        });

        return true;
    }

    @Override
    public void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
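        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);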
        }
    }
}

We run it the same way, just by replacing the previous one:

package org.example;

import org.example.workshop.*;

public class App {

    public static Workshop<String> threadQueueWorkshop() {
        return ThreadQueueWorkshop.<String>newBuilder()
                .registerWorker("delivery", new SlowWorker<>("delivery-worker"))
                .registerWorker("processing", new SlowWorker<>("processing-worker"))
                .build();
    }

    public static Workshop<String> virtualWorkshop() throws Throwable {
        return VirtualWorkshop.<String>getBuilder()
                .registerWorker("delivery", new SlowWorker<>("delivery-worker"))
                .registerWorker("processing", new SlowWorker<>("processing-worker"))
                .build();
    }

    public static void main(String[] args) throws Throwable {
        Workshop<String> workshop = virtualWorkshop();

        EmittersSwarm swarm = EmittersSwarm.getBuilder()
                .setWorkshop(workshop)
                .build();

        try (swarm; workshop) {
            swarm.startAndAwait();
        }

        System.out.println("Done.");
    }
}

We use the same test suite, just replacing the Workshop type, and see that everything passes on all 10 executions! ✅

No In-Flight messages
Did you notice that we don’t need to handle in-flight messages either? Because of the semaphore, there are no queues, and there is no way to start a new job in this method before the worker has finished the previous one.
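A small sketch of that behaviour, under some assumptions: NoInFlightDemo and acceptedWhileBusy are hypothetical names, the job is simulated with a CountDownLatch, and a cached thread pool stands in for the executor so the snippet also runs on older JDKs (on Java 21+ it could be Executors.newVirtualThreadPerTaskExecutor() as in VirtualWorkshop).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class NoInFlightDemo {

    // Tries to send `attempts` jobs while the first accepted job is still running
    // and returns how many of them were accepted.
    static int acceptedWhileBusy(int attempts) {
        Semaphore semaphore = new Semaphore(1);
        CountDownLatch finishJob = new CountDownLatch(1);
        // Stand-in executor (assumption); VirtualWorkshop uses virtual threads instead
        ExecutorService executor = Executors.newCachedThreadPool();
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                if (!semaphore.tryAcquire()) {
                    continue; // worker is busy: the job is dropped, nothing is buffered
                }
                accepted++;
                executor.submit(() -> {
                    try {
                        finishJob.await(); // simulate a job that has not finished yet
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        semaphore.release();
                    }
                });
            }
        } finally {
            finishJob.countDown(); // let the running job complete
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedWhileBusy(20)); // prints 1
    }
}
```

Only the first send gets the permit. The other 19 are rejected on the spot, so nothing is left waiting to be picked up once the worker finishes.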

Summary

As you can see, the virtual threads solution is much simpler and more efficient, but it’s good to know and understand the alternatives, because in some cases the queue might be a better approach.

It’s generally better to use existing, ready-made solutions, but by always relying on them we forget how some things work, or never get the chance to build something of our own and learn from it.

Hope this exercise was fun, stimulated some thinking, and maybe answered some questions or gave you some ideas 💡. Here is the link to the repository if you want to check it out in full.