Skip to content

How to Test MQTT Message Delivery in Java Integration Tests

I was writing integration tests for an MQTT-based messaging system and kept running into flaky tests. Sometimes they passed, sometimes they failed, and sometimes they just hung forever.

The problem? MQTT is asynchronous. When you publish a message, the test keeps running before the subscriber has a chance to receive it. Classic race condition.

The Naive Approach That Doesn’t Work

My first attempt looked something like this:

NaiveTest.java
@Test
void testMessageDeliveryWrong() {
Mqtt5AsyncClient subscriber = createSubscriber();
Mqtt5BlockingClient publisher = createPublisher();
// Subscribe and publish
subscriber.subscribeWith().topicFilter("test/topic").send().join();
publisher.publishWith().topic("test/topic").payload("hello".getBytes()).send();
// Problem: This assertion runs BEFORE the message arrives!
// How do we even check what was received?
}

This doesn’t work because there’s no way to capture the received message. The subscriber’s callback runs on a different thread, and the test thread has no visibility into it.

The Solution: CountDownLatch + AtomicReference

The trick is to use two Java concurrency primitives together:

  1. CountDownLatch - Makes the test thread wait until the callback fires
  2. AtomicReference - Safely passes data from callback thread to test thread
Thread Communication Diagram
┌─────────────────┐ ┌─────────────────┐
│ Test Thread │ │ Callback Thread │
├─────────────────┤ ├─────────────────┤
│ │ │ │
│ 1. Setup │ │ │
│ latch(1) │ │ │
│ ref={null} │ │ │
│ │ │ │
│ 2. Subscribe │ │ │
│ │ │ │
│ 3. Publish ─┼───MQTT──────▶ 4. Receive msg │
│ │ │ │
│ 5. await() │ │ 6. ref.set(msg) │
│ (blocks) │◀────signal───┤ latch.count() │
│ │ │ │
│ 7. Assert │ │ │
│ ref.get() │ │ │
└─────────────────┘ └─────────────────┘

The Complete Working Test

Here’s a complete integration test that actually works:

MqttIntegrationTest.java
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.*;
class MqttIntegrationTest {
@Test
void testMessageDelivery() throws InterruptedException {
// Create subscriber client
Mqtt5AsyncClient subscriber = Mqtt5Client.builder()
.identifier("test-sub-" + UUID.randomUUID())
.serverHost("broker.hivemq.com")
.serverPort(1883)
.buildAsync();
// Create publisher client
Mqtt5BlockingClient publisher = Mqtt5Client.builder()
.identifier("test-pub-" + UUID.randomUUID())
.serverHost("broker.hivemq.com")
.serverPort(1883)
.buildBlocking();
// Connect both clients
subscriber.connect().join();
publisher.connect();
// Test data
String topic = "test/topic/" + UUID.randomUUID();
String payload = "Hello MQTT Test " + System.currentTimeMillis();
// Synchronization primitives
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> receivedMessage = new AtomicReference<>();
// Register callback BEFORE subscribing
subscriber.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> {
String message = new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8);
receivedMessage.set(message);
latch.countDown();
});
// Subscribe to topic
subscriber.subscribeWith()
.topicFilter(topic)
.send()
.join();
// Publish message
publisher.publishWith()
.topic(topic)
.payload(payload.getBytes(StandardCharsets.UTF_8))
.send();
// Wait for message with timeout
boolean received = latch.await(5, TimeUnit.SECONDS);
// Cleanup
publisher.disconnect();
subscriber.disconnect().join();
// Assertions
assertTrue(received, "Message should have been received within timeout");
assertEquals(payload, receivedMessage.get(), "Received payload should match published");
}
}

Key Points I Learned the Hard Way

1. Register Callback Before Subscribing

If you subscribe first, then register the callback, you might miss messages that arrive in between. Always register first:

Correct Order
// RIGHT: Callback first
subscriber.publishes(filter, callback);
subscriber.subscribeWith().topicFilter(topic).send().join();
// WRONG: Subscribe first
subscriber.subscribeWith().topicFilter(topic).send().join();
subscriber.publishes(filter, callback); // Might miss messages!

2. Always Use Timeout

Never use latch.await() without a timeout. If something goes wrong, your test hangs forever:

Timeout Patterns
// RIGHT: With timeout
assertTrue(latch.await(5, TimeUnit.SECONDS));
// WRONG: No timeout (test hangs forever on failure)
latch.await();

3. Use AtomicReference for Thread Safety

The callback runs on a different thread. Regular variables aren’t thread-safe:

Thread Safety
// RIGHT: Thread-safe
AtomicReference<String> received = new AtomicReference<>();
// In callback: received.set(message);
// WRONG: Not thread-safe
String received; // Regular field - visibility issues!

4. Unique Topics for Parallel Tests

If tests run in parallel, they might interfere with each other. Use unique topics:

Unique Topic Pattern
String topic = "test/topic/" + UUID.randomUUID();

Common Mistakes and How to Fix Them

MistakeSymptomFix
No timeout in await()Test hangs foreverAlways use await(timeout, unit)
Callback after subscribeLost messagesRegister callback before subscribing
Regular field for messageFlaky assertionsUse AtomicReference
Shared topic nameTests interfereUse UUID.randomUUID() in topic
Missing countDown()Timeout every timeEnsure callback calls latch.countDown()

Why This Matters for Real Projects

Integration tests verify your MQTT infrastructure actually works:

  • Network issues: Broker connectivity, firewall rules
  • Configuration: Client IDs, QoS levels, retain flags
  • Message format: Payload encoding, topic patterns
  • Clean-up: Proper disconnect handling

Unit tests can’t catch these. You need real MQTT communication.

Alternative Approaches

For more complex scenarios, consider:

Awaitility - More expressive waiting:

AwaitilityExample.java
await()
.atMost(5, TimeUnit.SECONDS)
.until(() -> receivedMessage.get() != null);

Testcontainers - Local broker for isolation:

TestcontainersExample.java
@Container
static GenericContainer<?> mqttBroker = new GenericContainer<>("hivemq/hivemq-ce:latest")
.withExposedPorts(1883);

Final Words + More Resources

My intention with this article was to help others share my knowledge and experience. If you want to contact me, you can contact by email: Email me

Here are also the most important links from this article along with some further resources that will help you in this scope:

Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!

Comments