How to Test MQTT Message Delivery in Java Integration Tests
I was writing integration tests for an MQTT-based messaging system and kept running into flaky tests. Sometimes they passed, sometimes they failed, and sometimes they just hung forever.
The problem? MQTT is asynchronous. When you publish a message, the test keeps running before the subscriber has a chance to receive it. Classic race condition.
The Naive Approach That Doesn’t Work
My first attempt looked something like this:
@Testvoid testMessageDeliveryWrong() { Mqtt5AsyncClient subscriber = createSubscriber(); Mqtt5BlockingClient publisher = createPublisher();
// Subscribe and publish subscriber.subscribeWith().topicFilter("test/topic").send().join(); publisher.publishWith().topic("test/topic").payload("hello".getBytes()).send();
// Problem: This assertion runs BEFORE the message arrives! // How do we even check what was received?}This doesn’t work because there’s no way to capture the received message. The subscriber’s callback runs on a different thread, and the test thread has no visibility into it.
The Solution: CountDownLatch + AtomicReference
The trick is to use two Java concurrency primitives together:
- CountDownLatch - Makes the test thread wait until the callback fires
- AtomicReference - Safely passes data from callback thread to test thread
┌─────────────────┐ ┌─────────────────┐│ Test Thread │ │ Callback Thread │├─────────────────┤ ├─────────────────┤│ │ │ ││ 1. Setup │ │ ││ latch(1) │ │ ││ ref={null} │ │ ││ │ │ ││ 2. Subscribe │ │ ││ │ │ ││ 3. Publish ─┼───MQTT──────▶ 4. Receive msg ││ │ │ ││ 5. await() │ │ 6. ref.set(msg) ││ (blocks) │◀────signal───┤ latch.count() ││ │ │ ││ 7. Assert │ │ ││ ref.get() │ │ │└─────────────────┘ └─────────────────┘The Complete Working Test
Here’s a complete integration test that actually works:
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;import java.util.UUID;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.*;
class MqttIntegrationTest {
@Test void testMessageDelivery() throws InterruptedException { // Create subscriber client Mqtt5AsyncClient subscriber = Mqtt5Client.builder() .identifier("test-sub-" + UUID.randomUUID()) .serverHost("broker.hivemq.com") .serverPort(1883) .buildAsync();
// Create publisher client Mqtt5BlockingClient publisher = Mqtt5Client.builder() .identifier("test-pub-" + UUID.randomUUID()) .serverHost("broker.hivemq.com") .serverPort(1883) .buildBlocking();
// Connect both clients subscriber.connect().join(); publisher.connect();
// Test data String topic = "test/topic/" + UUID.randomUUID(); String payload = "Hello MQTT Test " + System.currentTimeMillis();
// Synchronization primitives CountDownLatch latch = new CountDownLatch(1); AtomicReference<String> receivedMessage = new AtomicReference<>();
// Register callback BEFORE subscribing subscriber.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> { String message = new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8); receivedMessage.set(message); latch.countDown(); });
// Subscribe to topic subscriber.subscribeWith() .topicFilter(topic) .send() .join();
// Publish message publisher.publishWith() .topic(topic) .payload(payload.getBytes(StandardCharsets.UTF_8)) .send();
// Wait for message with timeout boolean received = latch.await(5, TimeUnit.SECONDS);
// Cleanup publisher.disconnect(); subscriber.disconnect().join();
// Assertions assertTrue(received, "Message should have been received within timeout"); assertEquals(payload, receivedMessage.get(), "Received payload should match published"); }}Key Points I Learned the Hard Way
1. Register Callback Before Subscribing
If you subscribe first, then register the callback, you might miss messages that arrive in between. Always register first:
// RIGHT: Callback firstsubscriber.publishes(filter, callback);subscriber.subscribeWith().topicFilter(topic).send().join();
// WRONG: Subscribe firstsubscriber.subscribeWith().topicFilter(topic).send().join();subscriber.publishes(filter, callback); // Might miss messages!2. Always Use Timeout
Never use latch.await() without a timeout. If something goes wrong, your test hangs forever:
// RIGHT: With timeoutassertTrue(latch.await(5, TimeUnit.SECONDS));
// WRONG: No timeout (test hangs forever on failure)latch.await();3. Use AtomicReference for Thread Safety
The callback runs on a different thread. Regular variables aren’t thread-safe:
// RIGHT: Thread-safeAtomicReference<String> received = new AtomicReference<>();// In callback: received.set(message);
// WRONG: Not thread-safeString received; // Regular field - visibility issues!4. Unique Topics for Parallel Tests
If tests run in parallel, they might interfere with each other. Use unique topics:
String topic = "test/topic/" + UUID.randomUUID();Common Mistakes and How to Fix Them
| Mistake | Symptom | Fix |
|---|---|---|
| No timeout in await() | Test hangs forever | Always use await(timeout, unit) |
| Callback after subscribe | Lost messages | Register callback before subscribing |
| Regular field for message | Flaky assertions | Use AtomicReference |
| Shared topic name | Tests interfere | Use UUID.randomUUID() in topic |
| Missing countDown() | Timeout every time | Ensure callback calls latch.countDown() |
Why This Matters for Real Projects
Integration tests verify your MQTT infrastructure actually works:
- Network issues: Broker connectivity, firewall rules
- Configuration: Client IDs, QoS levels, retain flags
- Message format: Payload encoding, topic patterns
- Clean-up: Proper disconnect handling
Unit tests can’t catch these. You need real MQTT communication.
Alternative Approaches
For more complex scenarios, consider:
Awaitility - More expressive waiting:
await() .atMost(5, TimeUnit.SECONDS) .until(() -> receivedMessage.get() != null);Testcontainers - Local broker for isolation:
@Containerstatic GenericContainer<?> mqttBroker = new GenericContainer<>("hivemq/hivemq-ce:latest") .withExposedPorts(1883);Final Words + More Resources
My intention with this article was to help others share my knowledge and experience. If you want to contact me, you can contact by email: Email me
Here are also the most important links from this article along with some further resources that will help you in this scope:
Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!
Comments