How to Subscribe to MQTT Topics in Java with HiveMQ Client
I was building an IoT application that needed to receive sensor data from MQTT brokers. The problem? My Java application wasn’t picking up any messages from subscribed topics, even though the publisher was working fine.
After some debugging, I realized I had misunderstood how the HiveMQ MQTT Client subscription model works. Here’s what I learned about properly subscribing to MQTT topics in Java.
The Problem: Missing Messages
I initially wrote code like this:
Mqtt5AsyncClient client = Mqtt5Client.builder()
    .identifier("my-client")
    .serverHost("broker.hivemq.com")
    .serverPort(1883)
    .buildAsync();

client.connect().join();

// Subscribe first
client.subscribeWith()
    .topicFilter("sensors/temperature")
    .send()
    .join();

// Then register handler - TOO LATE!
client.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> {
    System.out.println("Received: " + new String(publish.getPayloadAsBytes()));
});

Messages published between the subscription and handler registration were lost. The client received them but had nowhere to deliver them.
The Solution: Register Handler First
The fix was simple but critical: register the message handler before subscribing to topics.
┌─────────────────────────────────────────────────────┐
│                      TIMELINE                       │
├─────────────────────────────────────────────────────┤
│                                                     │
│  1. Connect to broker                               │
│         ↓                                           │
│  2. Register callback with publishes()              │
│         ↓                                           │
│  3. Subscribe to topics                             │
│         ↓                                           │
│  4. Messages flow through callback                  │
│                                                     │
└─────────────────────────────────────────────────────┘

Here’s the corrected code:
Mqtt5AsyncClient subscriber = Mqtt5Client.builder()
    .identifier("subscriber-" + UUID.randomUUID())
    .serverHost("broker.hivemq.com")
    .serverPort(1883)
    .buildAsync();

// Connect first
subscriber.connect().join();

// Register handler BEFORE subscribing - critical!
subscriber.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> {
    String topic = publish.getTopic().toString();
    String message = new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8);
    System.out.println("Received on " + topic + ": " + message);
});

// Now subscribe
subscriber.subscribeWith()
    .topicFilter("sensors/temperature")
    .send()
    .join();

Understanding the Callback Pattern
The publishes() method registers a callback that gets invoked whenever a message arrives on any subscribed topic. The MqttGlobalPublishFilter.SUBSCRIBED filter ensures we only handle messages from topics we’ve subscribed to.
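To make the ordering issue concrete, here is a minimal sketch in plain Java with no broker and no HiveMQ classes involved. ToyClient, registerHandler and deliver are hypothetical names invented for this illustration, and dropping a message when no handler is registered is a simplification of whatever the real client does internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: NOT the HiveMQ API. All names here are hypothetical.
final class ToyClient {
    private volatile Consumer<String> handler; // null until a handler is registered
    private int dropped;

    // Stands in for publishes(...): remembers the callback.
    void registerHandler(Consumer<String> callback) {
        this.handler = callback;
    }

    // Stands in for a message arriving from the broker.
    void deliver(String payload) {
        Consumer<String> current = handler;
        if (current == null) {
            dropped++; // nowhere to deliver the message
            return;
        }
        current.accept(payload);
    }

    int droppedCount() {
        return dropped;
    }

    public static void main(String[] args) {
        // Wrong order: a message arrives before the handler exists.
        List<String> lateReceived = new ArrayList<>();
        ToyClient late = new ToyClient();
        late.deliver("21.5");
        late.registerHandler(lateReceived::add);
        System.out.println("late handler: received=" + lateReceived.size()
                + ", dropped=" + late.droppedCount());

        // Right order: handler first, then messages.
        List<String> earlyReceived = new ArrayList<>();
        ToyClient early = new ToyClient();
        early.registerHandler(earlyReceived::add);
        early.deliver("21.5");
        System.out.println("early handler: received=" + earlyReceived.size()
                + ", dropped=" + early.droppedCount());
    }
}
```

In the first case the message is counted as dropped and the list stays empty. In the second case the same message reaches the callback.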
┌──────────────┐     Publish     ┌──────────────┐
│  Publisher   │ ──────────────→ │ MQTT Broker  │
└──────────────┘                 └──────┬───────┘
                                        │
                                        │ Forward
                                        ↓
                                ┌────────────────┐
                                │   Subscriber   │
                                │     Client     │
                                └───────┬────────┘
                                        │
                                        │ invokes
                                        ↓
                                ┌────────────────┐
                                │    Callback    │
                                │    Handler     │
                                └────────────────┘

The callback receives a Mqtt5Publish object containing:
- Topic - where the message was published
- Payload - the message content as bytes
- QoS - quality of service level
- Retain flag - whether this is a retained message
- User properties - optional metadata
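Because the payload arrives as raw bytes, decoding it is the application's job. The sketch below assumes the sensor publishes its reading as UTF-8 decimal text such as 21.5, which this article does not actually specify. PayloadDecoder and parseReading are hypothetical names, not part of the HiveMQ client.

```java
import java.nio.charset.StandardCharsets;
import java.util.OptionalDouble;

// Hypothetical helper: assumes payloads are UTF-8 decimal text like "21.5".
final class PayloadDecoder {

    // Returns the parsed reading, or empty if the payload is not a finite number.
    static OptionalDouble parseReading(byte[] payload) {
        // Name the charset explicitly; older JDKs fall back to a platform default.
        String text = new String(payload, StandardCharsets.UTF_8).trim();
        try {
            double value = Double.parseDouble(text);
            return Double.isFinite(value) ? OptionalDouble.of(value) : OptionalDouble.empty();
        } catch (NumberFormatException e) {
            return OptionalDouble.empty(); // malformed payload: let the caller decide
        }
    }

    public static void main(String[] args) {
        byte[] good = "21.5".getBytes(StandardCharsets.UTF_8);
        byte[] bad = "n/a".getBytes(StandardCharsets.UTF_8);
        System.out.println("good payload present: " + parseReading(good).isPresent());
        System.out.println("bad payload present: " + parseReading(bad).isPresent());
    }
}
```

Returning an OptionalDouble rather than throwing keeps a single malformed message from breaking the callback.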
Testing Message Delivery
For unit tests, I needed to verify messages were actually received. A CountDownLatch works well:
@Test
void shouldReceivePublishedMessage() throws InterruptedException {
    // createClient() is a helper not shown here; it should give each client its own identifier
    Mqtt5AsyncClient subscriber = createClient();
    subscriber.connect().join();

    CountDownLatch latch = new CountDownLatch(1);
    AtomicReference<String> receivedMessage = new AtomicReference<>();

    subscriber.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> {
        receivedMessage.set(new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8));
        latch.countDown();
    });

    subscriber.subscribeWith()
        .topicFilter("test/topic")
        .send()
        .join();

    // Publish a test message
    Mqtt5AsyncClient publisher = createClient();
    publisher.connect().join();
    publisher.publishWith()
        .topic("test/topic")
        .payload("test-payload".getBytes(StandardCharsets.UTF_8))
        .send()
        .join();

    // Wait up to five seconds for the callback to fire
    assertTrue(latch.await(5, TimeUnit.SECONDS));
    assertEquals("test-payload", receivedMessage.get());
}

Common Mistakes to Avoid
1. Blocking in the Callback
The callback runs on an MQTT internal thread. Long-running operations will block other message processing:
// WRONG - blocks MQTT thread
subscriber.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> {
    String message = new String(publish.getPayloadAsBytes());
    database.save(message);  // Blocking I/O!
    sendEmail(message);      // More blocking!
});

// RIGHT - offload to executor
ExecutorService executor = Executors.newCachedThreadPool();
subscriber.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> {
    executor.submit(() -> {
        String message = new String(publish.getPayloadAsBytes());
        database.save(message);
        sendEmail(message);
    });
});

2. Forgetting to Call send()
The fluent builder pattern requires calling send() to actually execute the subscription:
// WRONG - nothing happens
subscriber.subscribeWith()
    .topicFilter("topic");

// RIGHT - actually subscribes
subscriber.subscribeWith()
    .topicFilter("topic")
    .send()
    .join();

3. Not Handling Connection Failures
Network issues happen. Always handle connection errors:
subscriber.connect()
    .whenComplete((connAck, throwable) -> {
        if (throwable != null) {
            logger.error("Connection failed", throwable);
            // Implement retry logic
            return;
        }
        logger.info("Connected successfully");
        registerHandlerAndSubscribe();
    });

Subscribing to Multiple Topics
You can subscribe to multiple topics with different QoS levels:
// Subscribe to multiple topics, each with its own QoS
subscriber.subscribeWith()
    .addSubscription()
        .topicFilter("sensors/temperature")
        .qos(MqttQos.AT_LEAST_ONCE)
        .applySubscription()
    .addSubscription()
        .topicFilter("sensors/humidity")
        .qos(MqttQos.AT_MOST_ONCE)
        .applySubscription()
    .send()
    .join();
// Or use wildcards
subscriber.subscribeWith()
    .topicFilter("sensors/#")  // All sensors topics
    .send()
    .join();

The single callback handles all subscribed topics:
subscriber.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> {
    String topic = publish.getTopic().toString();

    if (topic.startsWith("sensors/temperature")) {
        handleTemperature(publish.getPayloadAsBytes());
    } else if (topic.startsWith("sensors/humidity")) {
        handleHumidity(publish.getPayloadAsBytes());
    }
});

Why Async Matters
The async API with callbacks enables event-driven architecture:
┌─────────────────────────────────────────────────────┐
│                     MAIN THREAD                     │
│  - Creates client                                   │
│  - Registers callbacks                              │
│  - Subscribes to topics                             │
│  - Continues other work                             │
└─────────────────────────────────────────────────────┘
                          │
                          │ non-blocking
                          ↓
┌─────────────────────────────────────────────────────┐
│                MQTT CALLBACK THREAD                 │
│  - Receives messages                                │
│  - Invokes registered callbacks                     │
│  - Handles reconnection                             │
└─────────────────────────────────────────────────────┘

This pattern scales well for applications handling thousands of messages per second without blocking the main application thread.
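The hand-off between the callback thread and the rest of the application can be sketched with the JDK alone. This is an illustration under my own assumptions, not something taken from the HiveMQ client: BoundedHandoff is a hypothetical class, and the two workers and the queue size of 100 are arbitrary. Unlike a cached thread pool, a bounded queue makes overload visible instead of letting the number of threads grow without limit.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a bounded worker pool that a message callback could hand work to.
final class BoundedHandoff {
    private final ExecutorService workers = new ThreadPoolExecutor(
            2, 2,                                   // fixed pool of two workers (arbitrary)
            0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(100),          // at most 100 queued messages (arbitrary)
            new ThreadPoolExecutor.AbortPolicy());  // reject rather than block the caller
    private final AtomicInteger processed = new AtomicInteger();
    private final AtomicInteger rejected = new AtomicInteger();

    // Meant to be called from the callback thread; returns immediately either way.
    void onMessage(String payload) {
        try {
            workers.execute(() -> process(payload));
        } catch (RejectedExecutionException e) {
            rejected.incrementAndGet(); // queue full: count it instead of blocking
        }
    }

    private void process(String payload) {
        // Placeholder for slow work such as a database write.
        processed.incrementAndGet();
    }

    // Stops accepting work and waits for queued messages to finish.
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        workers.shutdown();
        try {
            return workers.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int processedCount() { return processed.get(); }
    int rejectedCount() { return rejected.get(); }

    public static void main(String[] args) {
        BoundedHandoff handoff = new BoundedHandoff();
        for (int i = 0; i < 50; i++) {
            handoff.onMessage("reading-" + i);
        }
        handoff.shutdownAndWait(5, TimeUnit.SECONDS);
        System.out.println("processed=" + handoff.processedCount()
                + ", rejected=" + handoff.rejectedCount());
    }
}
```

Whether to drop, retry or log rejected messages depends on the application. The point of the sketch is only that the calling thread never waits on slow work.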
Final Words + More Resources
My intention with this article was to share my knowledge and experience so that others can benefit from it. If you want to get in touch, you can reach me by email.