
How to Subscribe to MQTT Topics in Java with HiveMQ Client

I was building an IoT application that needed to receive sensor data from MQTT brokers. The problem? My Java application wasn’t picking up any messages from subscribed topics, even though the publisher was working fine.

After some debugging, I realized I had misunderstood how the HiveMQ MQTT Client subscription model works. Here’s what I learned about properly subscribing to MQTT topics in Java.

The Problem: Missing Messages

I initially wrote code like this:

WrongSubscription.java
Mqtt5AsyncClient client = Mqtt5Client.builder()
        .identifier("my-client")
        .serverHost("broker.hivemq.com")
        .serverPort(1883)
        .buildAsync();
client.connect().join();
// Subscribe first
client.subscribeWith()
        .topicFilter("sensors/temperature")
        .send()
        .join();
// Then register handler - TOO LATE!
client.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> {
    System.out.println("Received: " + new String(publish.getPayloadAsBytes()));
});

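Any message that arrived between the subscription and the handler registration was lost: the client received it but had no callback to deliver it to. Retained messages are especially exposed to this window, because the broker sends them as soon as the subscription is acknowledged.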
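To make the failure mode concrete, here is a toy model of a client-side dispatcher. It is not HiveMQ code: the class ToyDispatcher and its methods are hypothetical, and it assumes what the behavior above suggests, namely that an incoming message with no registered handler is simply discarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy stand-in for a client's inbound dispatch. Hypothetical; not HiveMQ code. */
final class ToyDispatcher {
    private Consumer<String> handler; // null until a handler is registered
    private int dropped;

    /** Plays the role of publishes(...): installs the callback. */
    void register(Consumer<String> handler) {
        this.handler = handler;
    }

    /** Plays the role of a message arriving from the broker. */
    void onIncoming(String payload) {
        if (handler == null) {
            dropped++; // assumption: nowhere to deliver, so the message is discarded
            return;
        }
        handler.accept(payload);
    }

    int droppedCount() {
        return dropped;
    }

    /** Replays both orderings; returns how many messages each one delivered. */
    static int[] compareOrderings() {
        List<String> late = new ArrayList<>();
        ToyDispatcher wrong = new ToyDispatcher();
        wrong.onIncoming("21.5");   // arrives right after subscribing
        wrong.register(late::add);  // handler registered too late
        wrong.onIncoming("21.7");

        List<String> early = new ArrayList<>();
        ToyDispatcher right = new ToyDispatcher();
        right.register(early::add); // handler registered first
        right.onIncoming("21.5");
        right.onIncoming("21.7");

        return new int[] {late.size(), early.size()};
    }
}
```

In this model the "subscribe first" ordering delivers one of the two messages, while the "register first" ordering delivers both.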

The Solution: Register Handler First

The fix was simple but critical: register the message handler before subscribing to topics.

Correct Flow
┌─────────────────────────────────────────────────────┐
│                      TIMELINE                       │
├─────────────────────────────────────────────────────┤
│                                                     │
│  1. Connect to broker                               │
│     ↓                                               │
│  2. Register callback with publishes()              │
│     ↓                                               │
│  3. Subscribe to topics                             │
│     ↓                                               │
│  4. Messages flow through callback                  │
│                                                     │
└─────────────────────────────────────────────────────┘

Here’s the corrected code:

CorrectSubscription.java
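import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
// A random suffix keeps the client identifier unique on a shared public broker
Mqtt5AsyncClient subscriber = Mqtt5Client.builder()
        .identifier("subscriber-" + UUID.randomUUID())
        .serverHost("broker.hivemq.com")
        .serverPort(1883)
        .buildAsync();
// Connect first
subscriber.connect().join();
// Register handler BEFORE subscribing - critical!
subscriber.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> {
    String topic = publish.getTopic().toString();
    // Decode with an explicit charset instead of the platform default
    String message = new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8);
    System.out.println("Received on " + topic + ": " + message);
});
// Now subscribe
subscriber.subscribeWith()
        .topicFilter("sensors/temperature")
        .send()
        .join();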

Understanding the Callback Pattern

The publishes() method registers a callback that gets invoked whenever a message arrives on any subscribed topic. The MqttGlobalPublishFilter.SUBSCRIBED filter ensures we only handle messages from topics we’ve subscribed to.

Message Flow
┌──────────────┐     Publish     ┌──────────────┐
│  Publisher   │ ──────────────→ │ MQTT Broker  │
└──────────────┘                 └──────┬───────┘
                                        │ Forward
                                        ↓
                                ┌────────────────┐
                                │   Subscriber   │
                                │     Client     │
                                └───────┬────────┘
                                        │ invokes
                                        ↓
                                ┌────────────────┐
                                │    Callback    │
                                │    Handler     │
                                └────────────────┘

The callback receives an Mqtt5Publish object containing:

  • Topic - where the message was published
  • Payload - the message content as bytes
  • QoS - quality of service level
  • Retain flag - whether this is a retained message
  • User properties - optional metadata

Testing Message Delivery

For unit tests, I needed to verify messages were actually received. A CountDownLatch works well:

TestSubscription.java
@Test
void shouldReceivePublishedMessage() throws InterruptedException {
    // createClient() is assumed to build an Mqtt5AsyncClient with a unique identifier
    Mqtt5AsyncClient subscriber = createClient();
    subscriber.connect().join();
    CountDownLatch latch = new CountDownLatch(1);
    AtomicReference<String> receivedMessage = new AtomicReference<>();
    // Register the handler before subscribing
    subscriber.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> {
        receivedMessage.set(new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8));
        latch.countDown();
    });
    subscriber.subscribeWith()
            .topicFilter("test/topic")
            .send()
            .join();
    // Publish a test message
    Mqtt5AsyncClient publisher = createClient();
    publisher.connect().join();
    publisher.publishWith()
            .topic("test/topic")
            .payload("test-payload".getBytes(StandardCharsets.UTF_8))
            .send()
            .join();
    // Fail if nothing arrives within 5 seconds
    assertTrue(latch.await(5, TimeUnit.SECONDS));
    assertEquals("test-payload", receivedMessage.get());
}

Common Mistakes to Avoid

1. Blocking in the Callback

The callback runs on an MQTT internal thread. Long-running operations will block other message processing:

BlockingCallback.java
// WRONG - blocks MQTT thread
subscriber.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> {
    String message = new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8);
    database.save(message); // Blocking I/O!
    sendEmail(message); // More blocking!
});
// RIGHT - offload to executor
// Note: a cached thread pool is unbounded and does not preserve message order
ExecutorService executor = Executors.newCachedThreadPool();
subscriber.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> {
    executor.submit(() -> {
        String message = new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8);
        database.save(message);
        sendEmail(message);
    });
});
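If message order matters, or if a burst of messages could spawn too many threads, one option is a single worker thread with a bounded queue. The sketch below uses only the JDK; the class OffloadingHandler and its method names are hypothetical, and it works on raw payload bytes so that it stays independent of the MQTT library.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: hands slow work to one worker thread with a bounded queue. */
final class OffloadingHandler {
    private OffloadingHandler() {}

    /** One thread plus a FIFO queue, so accepted messages run in arrival order. */
    static ExecutorService newOrderedWorker(int queueCapacity) {
        return new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity)); // default policy rejects when full
    }

    /** Returns a handler that decodes the payload and queues the slow work. */
    static Consumer<byte[]> offload(
            ExecutorService worker, Consumer<String> slowWork, Consumer<String> onOverflow) {
        return payload -> {
            String message = new String(payload, StandardCharsets.UTF_8);
            try {
                worker.execute(() -> slowWork.accept(message));
            } catch (RejectedExecutionException e) {
                // Queue is full (or the worker was shut down): report instead of blocking
                onOverflow.accept(message);
            }
        };
    }
}
```

Inside the MQTT callback this would be called as handler.accept(publish.getPayloadAsBytes()). What to do on overflow (log, count, or drop) is an application decision that this sketch leaves to the caller.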

2. Forgetting to Call send()

The fluent builder pattern requires calling send() to actually execute the subscription:

MissingSend.java
// WRONG - nothing happens
subscriber.subscribeWith()
        .topicFilter("topic");
// RIGHT - actually subscribes
subscriber.subscribeWith()
        .topicFilter("topic")
        .send()
        .join();

3. Not Handling Connection Failures

Network issues happen. Always handle connection errors:

RobustConnection.java
subscriber.connect()
        .whenComplete((connAck, throwable) -> {
            if (throwable != null) {
                logger.error("Connection failed", throwable);
                // Implement retry logic
                return;
            }
            logger.info("Connected successfully");
            registerHandlerAndSubscribe();
        });
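The "retry logic" placeholder above could be filled in many ways. One minimal sketch, using only the JDK (Java 9 or later for delayedExecutor and failedFuture), retries an asynchronous operation with exponential backoff. The class Retry and the method withBackoff are hypothetical names, and the delays and attempt counts are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: retries an async operation, doubling the delay each time. */
final class Retry {
    private Retry() {}

    static <T> CompletableFuture<T> withBackoff(
            Supplier<CompletableFuture<T>> attempt, int maxAttempts, long initialDelayMillis) {
        CompletableFuture<T> result = new CompletableFuture<>();
        tryOnce(attempt, maxAttempts, initialDelayMillis, result);
        return result;
    }

    private static <T> void tryOnce(
            Supplier<CompletableFuture<T>> attempt, int attemptsLeft,
            long delayMillis, CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            future = attempt.get();
        } catch (RuntimeException e) {
            future = CompletableFuture.failedFuture(e); // treat a synchronous throw as a failure
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptsLeft <= 1) {
                result.completeExceptionally(error); // out of attempts: surface the last error
            } else {
                // Schedule the next attempt after the current delay, then double the delay
                Executor later = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
                later.execute(() -> tryOnce(attempt, attemptsLeft - 1, delayMillis * 2, result));
            }
        });
    }
}
```

With the client from this article, the call might look like Retry.withBackoff(subscriber::connect, 5, 500), assuming the client accepts another connect() call after a failed attempt. Registering the handler and subscribing would then go in a whenComplete on the returned future.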

Subscribing to Multiple Topics

You can subscribe to multiple topics with different QoS levels:

MultiTopicSubscription.java
// Subscribe to multiple topics in a single SUBSCRIBE request
subscriber.subscribeWith()
        .addSubscription()
            .topicFilter("sensors/temperature")
            .qos(MqttQos.AT_LEAST_ONCE)
            .applySubscription()
        .addSubscription()
            .topicFilter("sensors/humidity")
            .qos(MqttQos.AT_MOST_ONCE)
            .applySubscription()
        .send()
        .join();
// Or use wildcards
subscriber.subscribeWith()
        .topicFilter("sensors/#") // All topics under sensors/ (and sensors itself)
        .send()
        .join();

The single callback handles all subscribed topics:

WildcardHandler.java
subscriber.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> {
    String topic = publish.getTopic().toString();
    if (topic.startsWith("sensors/temperature")) {
        handleTemperature(publish.getPayloadAsBytes());
    } else if (topic.startsWith("sensors/humidity")) {
        handleHumidity(publish.getPayloadAsBytes());
    }
});
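Routing with startsWith is convenient but loose: a prefix check on "sensors/temperature" also accepts a topic such as "sensors/temperature2/raw". If you need routing that follows the MQTT wildcard rules ("+" for exactly one level, "#" for all remaining levels), a small matcher can be sketched with the JDK alone. The class TopicFilters is a hypothetical name, and the sketch assumes the filter is already well formed; it does not validate it.

```java
/** Hypothetical helper: matches a topic name against an MQTT topic filter. */
final class TopicFilters {
    private TopicFilters() {}

    static boolean matches(String filter, String topic) {
        // Topics starting with '$' are not matched by filters that start with a wildcard
        if (topic.startsWith("$") && (filter.startsWith("#") || filter.startsWith("+"))) {
            return false;
        }
        String[] filterLevels = filter.split("/", -1); // -1 keeps empty trailing levels
        String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // '#' is only meaningful as the last level; it matches everything
                // that remains, including the parent level itself
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false; // the filter has more levels than the topic
            }
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false; // a literal level that differs
            }
        }
        return filterLevels.length == topicLevels.length;
    }
}
```

In the callback above, a check such as TopicFilters.matches("sensors/temperature/#", topic) could then replace the prefix comparison.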

Why Async Matters

The async API with callbacks enables event-driven architecture:

Threading Model
┌─────────────────────────────────────────────────────┐
│                     MAIN THREAD                     │
│  - Creates client                                   │
│  - Registers callbacks                              │
│  - Subscribes to topics                             │
│  - Continues other work                             │
└─────────────────────────────────────────────────────┘
                           │ non-blocking
┌─────────────────────────────────────────────────────┐
│                MQTT CALLBACK THREAD                 │
│  - Receives messages                                │
│  - Invokes registered callbacks                     │
│  - Handles reconnection                             │
└─────────────────────────────────────────────────────┘

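This pattern keeps message handling off the main application thread, so it can cope with high message rates as long as the callbacks themselves return quickly. Two caveats: the join() calls in the examples above do block the calling thread until the broker responds, and the client only reconnects on its own if automatic reconnect is enabled on the builder.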

Final Words + More Resources

My intention with this article was to share my knowledge and experience in the hope that it helps others. If you want to get in touch, you can contact me by email.
