HiveMQ Extension SDK Services
HiveMQ services provide a convenient way for extensions to interact with the HiveMQ core. You can access HiveMQ Extension SDK services through the Services class.
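For example, the following snippet shows how an extension obtains the service instances that are used throughout the examples on this page:
// access individual services through the static Services class
final ClientService clientService = Services.clientService();
final SubscriptionStore subscriptionStore = Services.subscriptionStore();
final RetainedMessageStore retainedMessageStore = Services.retainedMessageStore();
final PublishService publishService = Services.publishService();
final ManagedExtensionExecutorService executorService = Services.extensionExecutorService();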
The HiveMQ Community Extension SDK provides the following services:
Service | Description |
---|---|
Client Service | Allows extensions to get client session information, disconnect clients, and remove client sessions. |
Subscription Store | Allows extensions to get, remove, and add subscriptions for specific clients. |
Retained Message Store | Allows extensions to get, remove, and add retained messages for specific topics, or delete all retained messages at once. |
Connection Attribute Store | Allows extensions to manage client connection attributes that have the same lifetime as the connection. |
Publish Service | Allows extensions to send PUBLISH messages. |
Managed Extension Executor Service | Allows extensions to use a HiveMQ-managed executor service for non-blocking operations. |
Admin Service | Allows extensions to get information about the broker instance. |
Cluster Service | Allows extensions to dynamically discover HiveMQ cluster nodes. |
The HiveMQ Enterprise Extension SDK offers additional services that allow extensions to do the following:
- Consume messages from a set of specified topics and map the messages to other topics.
- Get, remove, and add session attributes.
- Send and receive non-MQTT messages for internal cluster communication.
- Add custom views with different layouts to the HiveMQ Control Center.
- Register a custom REST API application with the HiveMQ REST API.
- Interact with the events of specific clients over a defined timeframe.
HiveMQ Community Extension SDK Services
Client Service
The Client Service allows extensions to gather information about clients:
- Online status
- Client identifier
- Session expiry interval
Extensions can use the Client Service to do the following tasks:
- Query the connection status of a client.
- Get session information of a client.
- Disconnect a specific client, with the option to send or not send the last-will message.
- Invalidate the session of a client, regardless of whether the client is currently connected or disconnected. For connected clients, invalidation disconnects the client and sends the last-will message.
- Iterate over all clients and associated client session information.
For more information, see Client Services JavaDoc.
Query Client Connection
This example shows how to get information about the online status of a client:
The isClientConnected method returns true for online clients and false for offline clients.
CompletableFuture<Boolean> connectedFuture = clientService.isClientConnected("my-client-id");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String clientId = "client-123";
final ClientService clientService = Services.clientService();
CompletableFuture<Boolean> connectedFuture = clientService.isClientConnected(clientId);
connectedFuture.whenComplete(new BiConsumer<Boolean, Throwable>() {
@Override
public void accept(Boolean connected, Throwable throwable) {
if(throwable == null) {
System.out.println("Client with id {" + clientId + "} is connected: " + connected);
} else {
//please use more sophisticated logging
throwable.printStackTrace();
}
}
});
}
...
Get Session Information
This example shows how to get all session information for a client with a specific client ID.
The getSession method returns an Optional of a SessionInformation object. If no session is found, the Optional is empty. Otherwise, it contains the following information:
- The online connection status of the client
- The session expiry interval of the client
- The client identifier that owns the session
CompletableFuture<Optional<SessionInformation>> sessionFuture = clientService.getSession("my-client-id");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String clientId = "my-client-id";
final ClientService clientService = Services.clientService();
CompletableFuture<Optional<SessionInformation>> sessionFuture = clientService.getSession(clientId);
sessionFuture.whenComplete(new BiConsumer<Optional<SessionInformation>, Throwable>() {
@Override
public void accept(Optional<SessionInformation> sessionInformationOptional, Throwable throwable) {
if(throwable == null) {
if(sessionInformationOptional.isPresent()){
SessionInformation information = sessionInformationOptional.get();
System.out.println("Session Found");
System.out.println("ID: " + information.getClienIdentifier());
System.out.println("Connected: " + information.isConnected());
System.out.println("Session Expiry Interval " + information.getSessionExpiryInterval());
} else {
System.out.println("No session found for client id: " + clientId);
}
} else {
//please use more sophisticated logging
throwable.printStackTrace();
}
}
});
}
...
Disconnect Client
This example shows how to forcibly disconnect a client with a specific client ID. You can also select not to send the optional last-will message of the client on this disconnect.
The disconnectClient method returns true when an online client is disconnected. Otherwise, the method returns false.
The following examples show you how to disconnect the client with and without sending the last-will message and with disconnect reason information:
clientService.disconnectClient("my-client-id");
clientService.disconnectClient("my-client-id", true);
clientService.disconnectClient("my-client-id", true, DisconnectReasonCode.NORMAL_DISCONNECTION, "my-reason-string");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String clientId = "client-123";
final ClientService clientService = Services.clientService();
CompletableFuture<Boolean> disconnectFuture = clientService.disconnectClient(clientId, true);
disconnectFuture.whenComplete(new BiConsumer<Boolean, Throwable>() {
@Override
public void accept(Boolean disconnected, Throwable throwable) {
if(throwable == null) {
if(disconnected){
System.out.println("Client was successfully disconnected and no Will message was sent");
} else {
System.out.println("Client not found");
}
} else {
//please use more sophisticated logging
throwable.printStackTrace();
}
}
});
}
...
Use of a deprecated DisconnectReasonCode throws an exception. Deprecated codes include: CLIENT_IDENTIFIER_NOT_VALID, DISCONNECT_WITH_WILL_MESSAGE, and BAD_AUTHENTICATION_METHOD. |
Invalidate Client Session
This example shows how to invalidate a client session.
Invalidation of a client session forcibly disconnects an online client and sends the optional last-will message of the client. The session information of the client is removed and cannot be restored.
The invalidateSession method returns true when an online client is disconnected. Otherwise, the method returns false.
clientService.invalidateSession("my-client-id");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String clientId = "client-123";
final ClientService clientService = Services.clientService();
CompletableFuture<Boolean> invalidateSessionFuture = clientService.invalidateSession(clientId);
invalidateSessionFuture.whenComplete(new BiConsumer<Boolean, Throwable>() {
@Override
public void accept(Boolean disconnected, Throwable throwable) {
if(throwable == null) {
if(disconnected){
System.out.println("Client was disconnected");
System.out.println("Will message was sent");
System.out.println("Client session was removed");
} else {
System.out.println("Client was offline");
System.out.println("Client session was removed");
}
} else {
if(throwable instanceof NoSuchClientIdException){
System.out.println("Client not found");
}
//please use more sophisticated logging
throwable.printStackTrace();
}
}
});
}
...
Iterate All Clients
You can use the Client Service to iterate the session information of all clients. This iteration includes all currently connected clients and all disconnected clients with sessions that are not yet expired.
To use iteration over all clients, every node in the HiveMQ cluster must run HiveMQ version 4.2.0 or higher. |
The callback passed to the iterateAllClients method is called once for each client. By default, the Managed Extension Executor Service executes the callback. However, you can also pass your own executor. Session information is not provided to the callback in any particular order.
clientService.iterateAllClients(new IterationCallback<SessionInformation>() {
@Override
public void iterate(IterationContext context, SessionInformation sessionInformation) {
// this callback is called for every client with its session information
}
});
In large-scale deployments, iteration over all clients can be a very expensive operation. Do not call the method in short time intervals. |
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final ClientService clientService = Services.clientService();
// this is the default executor but used as executor argument for demonstration purposes
final Executor executor = Services.extensionExecutorService();
final Pattern pattern = Pattern.compile("client-[1-9]+");
CompletableFuture<Void> iterationFuture = clientService.iterateAllClients(
new IterationCallback<SessionInformation>() {
@Override
public void iterate(IterationContext context, SessionInformation sessionInformation) {
final String clientIdentifier = sessionInformation.getClientIdentifier();
if (pattern.matcher(clientIdentifier).matches()) {
System.out.println("Found client for pattern " + clientIdentifier);
// abort the iteration if you are not interested in the remaining information as this saves resources
context.abortIteration();
}
}
}, executor);
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Iterated all clients");
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final ClientService clientService = Services.clientService();
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = clientService.iterateAllClients(
new IterationCallback<SessionInformation>() {
@Override
public void iterate(IterationContext context, SessionInformation sessionInformation) {
if (sessionInformation.isConnected()) {
counter.incrementAndGet();
}
}
});
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Connected clients: " + counter.get());
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
If the topology of the cluster changes during the iteration, the iteration is canceled. For example, if a network splits or a node leaves or joins the cluster. |
The following example shows how topology changes can be handled:
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
iterate(0);
}
public void iterate(final int attempts) {
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = Services.clientService().iterateAllClients(
new IterationCallback<SessionInformation>() {
@Override
public void iterate(IterationContext context, SessionInformation sessionInformation) {
if (sessionInformation.isConnected()) {
counter.incrementAndGet();
}
}
});
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Connected clients: " + counter.get());
// in case the cluster topology changes during iteration, an IterationFailedException is thrown
} else if (throwable instanceof IterationFailedException) {
// only retry 3 times
if (attempts < 3) {
final int newAttemptCount = attempts + 1;
Services.extensionExecutorService().schedule(() ->
iterate(newAttemptCount), newAttemptCount * 10, TimeUnit.SECONDS); // schedule retry with delay in case topology change is not over, else we would get another IterationFailedException
} else {
System.out.println("Could not fully iterate all clients.");
}
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
Subscription Store
The Subscription Store allows extensions to do the following:
- Get all subscriptions from a client
- Add a subscription for a client
- Remove a subscription from a client
- Add multiple subscriptions for a client
- Remove multiple subscriptions from a client
- Iterate all subscribers with subscriptions to a specified topic filter
- Iterate all subscribers with subscriptions that match a specified topic
For more information, see Subscription Store JavaDoc.
Add Subscription
This example shows how to add a subscription to a specific client with the Subscription Store.
TopicSubscription subscription = Builders.topicSubscription()
.topicFilter(topic)
.qos(Qos.AT_MOST_ONCE)
.build();
Services.subscriptionStore().addSubscription("test-client", subscription);
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String topic = "topic";
final String clientId = "test-client";
TopicSubscriptionBuilder subscriptionBuilder = Builders.topicSubscription()
.topicFilter(topic)
.noLocal(false)
.retainAsPublished(true)
.qos(Qos.AT_MOST_ONCE)
.subscriptionIdentifier(1);
CompletableFuture<Void> addFuture = Services.subscriptionStore().addSubscription(clientId, subscriptionBuilder.build());
addFuture.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
System.out.println("Successfully added subscription for topic: " + topic + " | client: " + clientId);
}
});
}
...
When you use the HiveMQ extension system or the HiveMQ Control Center to add a subscription for a client, the retained messages on the topics in the subscription are not published to the client. |
Add Multiple Subscriptions
This example shows how to add multiple subscriptions to a specific client with the Subscription Store.
final Set<TopicSubscription> topicSet = new HashSet<>();
topicSet.add(Builders.topicSubscription().topicFilter("$share/group/topic1").build());
topicSet.add(Builders.topicSubscription().topicFilter("topic2").build());
topicSet.add(Builders.topicSubscription().topicFilter("topic3").build());
Services.subscriptionStore().addSubscriptions("test-client", topicSet);
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final SubscriptionStore subscriptionStore = Services.subscriptionStore();
final String clientID = "test-client";
final Set<TopicSubscription> topicSet = new HashSet<>();
topicSet.add(Builders.topicSubscription().topicFilter("$share/group/topic1").build());
topicSet.add(Builders.topicSubscription().topicFilter("topic2").build());
topicSet.add(Builders.topicSubscription().topicFilter("topic3").build());
final CompletableFuture<Void> addFuture = subscriptionStore.addSubscriptions(clientID, topicSet);
addFuture.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(final Void result, final Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
System.out.println("Successfully added subscriptions to client: " + clientID);
}
});
}
...
Remove Subscription
This example shows how to remove a subscription from a specific client with the Subscription Store.
Services.subscriptionStore().removeSubscription("test-client", "topic/to/remove");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String topic = "topic";
final String clientId = "test-client";
CompletableFuture<Void> removeFuture = Services.subscriptionStore().removeSubscription(clientId, topic);
removeFuture.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
System.out.println("Successfully removed subscription for topic: " + topic + " | client: " + clientId);
}
});
}
...
Remove Multiple Subscriptions
This example shows how to remove multiple subscriptions from a specific client with the Subscription Store.
final Set<String> topicSet = new HashSet<>();
topicSet.add("topic1");
topicSet.add("topic2");
Services.subscriptionStore().removeSubscriptions("test-client", topicSet);
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final SubscriptionStore subscriptionStore = Services.subscriptionStore();
final String clientID = "test-client";
final Set<String> topicSet = new HashSet<>();
topicSet.add("$share/group/topic1");
topicSet.add("topic2");
topicSet.add("topic3");
final CompletableFuture<Void> removeFuture = subscriptionStore.removeSubscriptions(clientID, topicSet);
removeFuture.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(final Void result, final Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
System.out.println("Successfully removed subscriptions for topics: " + topicSet + " | from client: " + clientID);
}
});
}
...
Get Subscriptions
This example shows how to get the subscriptions from a specific client with the Subscription Store.
CompletableFuture<Set<TopicSubscription>> future = Services.subscriptionStore().getSubscriptions("test-client");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String clientId = "test-client";
CompletableFuture<Set<TopicSubscription>> getFuture = Services.subscriptionStore().getSubscriptions(clientId);
getFuture.whenComplete(new BiConsumer<Set<TopicSubscription>, Throwable>() {
@Override
public void accept(Set<TopicSubscription> topicSubscriptions, Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
if(topicSubscriptions.isEmpty()){
System.out.println("Found no subscriptions for client: " + clientId);
return;
}
System.out.println("Found subscriptions for client: " + clientId);
for (TopicSubscription topicSubscription : topicSubscriptions) {
System.out.println("---------------------");
System.out.println("Topic :" + topicSubscription.getTopicFilter());
System.out.println("Qos :" + topicSubscription.getQos().getQosNumber());
System.out.println("No local :" + topicSubscription.getNoLocal());
System.out.println("Retain as published :" + topicSubscription.getRetainAsPublished());
System.out.println("Subscription identifier :" + topicSubscription.getSubscriptionIdentifier());
}
}
});
}
...
Iterate All Subscriptions
You can use the Subscription Store to iterate all subscriptions of all clients. This iteration includes the subscriptions of all currently connected clients and all disconnected clients with sessions that are not yet expired.
The callback passed to the iterateAllSubscriptions method is called once for each client. All subscriptions of the respective client are provided per method call. By default, the Managed Extension Executor Service executes the callback. However, you can also pass your own executor. Subscriptions are not provided to the callback in any particular order.
To use iteration over all clients, every node in the HiveMQ cluster must run HiveMQ version 4.2.0 or higher. |
subscriptionStore.iterateAllSubscriptions(new IterationCallback<SubscriptionsForClientResult>() {
@Override
public void iterate(IterationContext context, SubscriptionsForClientResult subscriptionsForClient) {
// this callback is called for every client with its subscriptions
final String clientId = subscriptionsForClient.getClientId();
final Set<TopicSubscription> subscriptions = subscriptionsForClient.getSubscriptions();
}
});
In large-scale deployments, iteration over all clients can be a very expensive operation. Do not call the method in short time intervals. |
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final SubscriptionStore subscriptionStore = Services.subscriptionStore();
// this is the default executor but used as executor argument for demonstration purposes
final Executor executor = Services.extensionExecutorService();
final Pattern pattern = Pattern.compile("client-[1-9]+");
CompletableFuture<Void> iterationFuture = subscriptionStore.iterateAllSubscriptions(
new IterationCallback<SubscriptionsForClientResult>() {
@Override
public void iterate(IterationContext context, SubscriptionsForClientResult subscriptionsForClient) {
final String clientIdentifier = subscriptionsForClient.getClientId();
final Set<TopicSubscription> subscriptions = subscriptionsForClient.getSubscriptions();
if (pattern.matcher(clientIdentifier).matches()) {
System.out.println("Found client with subscriptions " + subscriptions);
// abort the iteration if you are not interested in the remaining information as this saves resources
context.abortIteration();
}
}
},
executor);
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Iterated all subscriptions");
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final SubscriptionStore subscriptionStore = Services.subscriptionStore();
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = subscriptionStore.iterateAllSubscriptions(
new IterationCallback<SubscriptionsForClientResult>() {
@Override
public void iterate(IterationContext context, SubscriptionsForClientResult subscriptionsForClient) {
counter.addAndGet(subscriptionsForClient.getSubscriptions().size());
}
});
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Number of subscriptions: " + counter.get());
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
If the topology of the cluster changes during the iteration, the iteration is canceled. For example, if a network splits or a node leaves or joins the cluster. |
The following example shows how topology changes during the iteration can be handled:
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
iterate(0);
}
public void iterate(final int attempts) {
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = Services.subscriptionStore().iterateAllSubscriptions(
new IterationCallback<SubscriptionsForClientResult>() {
@Override
public void iterate(IterationContext context, SubscriptionsForClientResult subscriptionsForClient) {
counter.addAndGet(subscriptionsForClient.getSubscriptions().size());
}
});
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Number of subscriptions: " + counter.get());
// in case the cluster topology changes during iteration, an IterationFailedException is thrown
} else if (throwable instanceof IterationFailedException) {
// only retry 3 times
if (attempts < 3) {
final int newAttemptCount = attempts + 1;
Services.extensionExecutorService().schedule(() ->
iterate(newAttemptCount), newAttemptCount * 10, TimeUnit.SECONDS); // schedule retry with delay in case topology change is not over, else we would get another IterationFailedException
} else {
System.out.println("Could not fully iterate all clients.");
}
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
Iterate All Subscribers with Subscriptions to a Specified Topic Filter
You can use the Subscription Store to iterate all subscribers that have subscriptions with a specified topic filter. Filtered iteration is best practice when you are only interested in subscribers that have subscriptions with a specific topic filter. This method is more resource-efficient than iterating all subscriptions.
To filter subscriptions even more precisely, you can limit the iteration to shared or non-shared (individual) subscriptions.
The callback passed to the iterateAllSubscribersWithTopicFilter method is called one time for each client that has a subscription with the specified topic filter. By default, the Managed Extension Executor Service executes the callback. However, you can also pass your own executor. Subscribers are not provided to the callback in any particular order.
To use iteration over all clients, every node in the HiveMQ cluster must run HiveMQ version 4.2.0 or higher. |
Example:
For the topic filter example/#, the iteration covers all clients that have a subscription with the exact same topic filter: example/#.
The specified iteration does not cover subscriptions with the following topic filters:
- example/topic
- example/+
- +/#
- and other wildcard matches
The iterateAllSubscribersWithTopicFilter method only provides subscribers that have subscribed with the exact same topic filter as specified. To query subscriptions with the usual topic filter matching algorithm, use iterateAllSubscribersForTopic. |
subscriptionStore.iterateAllSubscribersWithTopicFilter("example/topic",
new IterationCallback<SubscriberWithFilterResult>() {
@Override
public void iterate(IterationContext context, SubscriberWithFilterResult subscriberWithFilter) {
// this callback is called for every client that has a subscription with the specified topic filter
final String clientId = subscriberWithFilter.getClientId();
}
});
In large-scale deployments, iteration over all clients can be a very expensive operation. Do not call the method in short time intervals. |
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final SubscriptionStore subscriptionStore = Services.subscriptionStore();
// this is the default executor but used as executor argument for demonstration purposes
final Executor executor = Services.extensionExecutorService();
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = subscriptionStore.iterateAllSubscribersWithTopicFilter(
"example/topic",
SubscriptionType.INDIVIDUAL,
new IterationCallback<SubscriberWithFilterResult>() {
@Override
public void iterate(IterationContext context, SubscriberWithFilterResult subscriberWithFilter) {
counter.incrementAndGet();
}
},
executor);
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Number of subscribers with specified topic filter: " + counter.get());
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
Iterate All Subscribers with Subscriptions that Match a Specified Topic
You can use the Subscription Store to iterate all subscribers with subscriptions that match a specified topic. When you are only interested in subscribers with subscriptions that match a specific topic, filtered iteration is best practice. This method is more resource-efficient than iterating all subscriptions.
To filter subscriptions even more precisely, you can limit the iteration to shared or non-shared (individual) subscriptions.
The callback passed to the iterateAllSubscribersForTopic method is called one time for each client that has a subscription that matches the specified topic. By default, the Managed Extension Executor Service executes the callback. However, you can also pass your own executor. Subscribers are not provided to the callback in any particular order.
To use iteration over all clients, every node in the HiveMQ cluster must run HiveMQ version 4.2.0 or higher. |
Example:
For the topic example/topic, the iteration covers all clients that have a subscription with one of the following topic filters:
- example/topic
- example/#
- example/+
- +/#
- and other wildcard matches
subscriptionStore.iterateAllSubscribersForTopic("example/topic",
new IterationCallback<SubscriberWithFilterResult>() {
@Override
public void iterate(IterationContext context, SubscriberWithFilterResult subscriberWithFilter) {
// this callback is called for every client that has a subscription matching the specified topic
final String clientId = subscriberWithFilter.getClientId();
}
});
In large-scale deployments, iteration over all clients can be a very expensive operation. Do not call the method in short time intervals. |
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final SubscriptionStore subscriptionStore = Services.subscriptionStore();
// this is the default executor but used as executor argument for demonstration purposes
final Executor executor = Services.extensionExecutorService();
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = subscriptionStore.iterateAllSubscribersForTopic(
"example/topic",
SubscriptionType.INDIVIDUAL,
new IterationCallback<SubscriberWithFilterResult>() {
@Override
public void iterate(IterationContext context, SubscriberWithFilterResult subscriberWithFilter) {
counter.incrementAndGet();
}
},
executor);
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Number of subscribers for specified topic: " + counter.get());
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
Retained Message Store
The Retained Message Store enables extensions to interact with retained messages in the following ways:
- Get the retained message for a specific topic
- Add or replace the retained message for a topic
- Remove the retained message for a topic
- Clear all retained messages from the HiveMQ cluster
- Iterate over all retained messages that are stored in the HiveMQ cluster
The RetainedMessageStore can be accessed through the Services class.
The retained messages that the Retained Message Store adds are processed differently than retained messages that clients send. Clients that are currently subscribed to the topic where the retained message is added do not receive the retained message from the Retained Message Store as a publish. The newly added retained message is only available to clients that subscribe or resubscribe to the topic after the Retained Message Store added the message. |
For more information, see Retained Message Store JavaDoc.
Access Retained Message Store
final RetainedMessageStore store = Services.retainedMessageStore();
To avoid errors such as an IterationFailedException, verify that your HiveMQ instance has started successfully before you call methods in your extension start. For more information, see Admin Service. |
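The following is a minimal sketch of such a check. It assumes the Admin Service exposes the getCurrentStage() method and the LifecycleStage.STARTED_SUCCESSFULLY constant; the one-second retry delay is only an example:
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
    iterateWhenStarted();
}

private void iterateWhenStarted() {
    // assumption: the Admin Service reports the current lifecycle stage of the broker
    if (Services.adminService().getCurrentStage() == LifecycleStage.STARTED_SUCCESSFULLY) {
        // the broker is fully started, so it is safe to use the Retained Message Store
        Services.retainedMessageStore().iterateAllRetainedMessages((context, retainedPublish) ->
                System.out.println("Retained message on topic: " + retainedPublish.getTopic()));
    } else {
        // re-check after a short delay until the broker has started successfully
        Services.extensionExecutorService().schedule(this::iterateWhenStarted, 1, TimeUnit.SECONDS);
    }
}
...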
Add Retained Message
This example shows how to add a retained message to a specific topic with the Retained Message Store.
When you add a retained message to a specific topic with the Retained Message Store, the newly-added message overwrites any existing retained message on the selected topic. |
RetainedPublish retainedMessage = Builders.retainedPublish()
.topic("add/message")
.payload(ByteBuffer.wrap("test".getBytes()))
.qos(Qos.AT_LEAST_ONCE)
.build();
Services.retainedMessageStore().addOrReplace(retainedMessage);
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
// 1. build the retained message via the RetainedPublishBuilder
final RetainedPublishBuilder retainedPublishBuilder = Builders.retainedPublish();
final RetainedPublish retainedMessage = retainedPublishBuilder
.topic("add/message")
.payload(ByteBuffer.wrap("test".getBytes()))
.userProperty("reason","message-update")
.qos(Qos.AT_LEAST_ONCE)
.build();
// 2. add the retained message (if a retained message already exists for the topic, it will be overwritten)
final CompletableFuture<Void> addFuture = Services.retainedMessageStore().addOrReplace(retainedMessage);
addFuture.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
// 3. log when the message was successfully added/replaced
System.out.println("Successfully added retained message for topic: " + topic);
}
});
}
...
Remove Retained Message
This example shows how to remove a retained message from a specific topic with the Retained Message Store.
Services.retainedMessageStore().remove("topic/to/remove");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String topic = "topic";
// 1. remove the retained message from the given topic
final CompletableFuture<Void> removeFuture = Services.retainedMessageStore().remove(topic);
removeFuture.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
// 2. log when the message was successfully removed (this also happens when no retained message exists for that topic)
System.out.println("Successfully removed retained message for topic: " + topic);
}
});
}
...
Get Retained Message
This example shows how to get the retained message from a specific topic with the Retained Message Store.
CompletableFuture<Optional<RetainedPublish>> future = Services.retainedMessageStore().getRetainedMessage("topic/to/get");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String topic = "topic";
// 1. request retained message for topic
final CompletableFuture<Optional<RetainedPublish>> getFuture = Services.retainedMessageStore().getRetainedMessage(topic);
getFuture.whenComplete(new BiConsumer<Optional<RetainedPublish>, Throwable>() {
@Override
public void accept(Optional<RetainedPublish> retainedPublishOptional, Throwable throwable) {
if (throwable != null) {
throwable.printStackTrace();
return;
}
// 2. check if a retained message exists for that topic
if (!retainedPublishOptional.isPresent()) {
System.out.println("Found no retained message for topic: " + topic);
return;
}
// 3. log some information about the retained message
final RetainedPublish retainedPublish = retainedPublishOptional.get();
System.out.println("Found retained message for topic: " + topic);
System.out.println("---------------------");
System.out.println("Topic :" + retainedPublish.getTopic());
System.out.println("Qos :" + retainedPublish.getQos().getQosNumber());
}
});
}
...
Clear All Retained Messages
This example shows how to remove all retained messages from a HiveMQ cluster with the Retained Message Store.
Services.retainedMessageStore().clear();
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
// 1. request to delete all retained messages from the HiveMQ cluster
final CompletableFuture<Void> clearFuture = Services.retainedMessageStore().clear();
clearFuture.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
// 2. log when all retained messages were removed
System.out.println("Successfully removed all retained messages");
}
});
}
...
Iterate All Retained Messages
You can use the Retained Message Store to iterate over all stored retained messages in HiveMQ.
The callback passed to the iterateAllRetainedMessages method is called one time for each retained message. Each call contains all information for the retained message in the form of a RetainedPublish and an IterationContext that can be used to cancel the iteration prematurely if desired. By default, the Managed Extension Executor Service executes the callback. However, you can also pass your own executor. Retained message information is not provided to the callback in any particular order.
To iterate over all retained messages, all nodes in the HiveMQ cluster must run HiveMQ version 4.4.0 or higher. |
Services.retainedMessageStore().iterateAllRetainedMessages(new IterationCallback<RetainedPublish>() {
@Override
public void iterate(final @NotNull IterationContext context, final @NotNull RetainedPublish retainedPublish) {
// this callback is called for every stored retained message
}
});
If you have large numbers of retained messages, iteration over all retained messages can be resource intensive. Avoid calling this method in short time intervals. |
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final RetainedMessageStore retainedMessageStore = Services.retainedMessageStore();
// this is the default executor but used as executor argument for demonstration purposes
final Executor executor = Services.extensionExecutorService();
final Pattern pattern = Pattern.compile("sensor/id-[1-9]+");
CompletableFuture<Void> iterationFuture = retainedMessageStore.iterateAllRetainedMessages(
new IterationCallback<RetainedPublish>() {
@Override
public void iterate(final @NotNull IterationContext context, final @NotNull RetainedPublish retainedPublish) {
final String retainedPublishTopic = retainedPublish.getTopic();
if (pattern.matcher(retainedPublishTopic).matches()) {
System.out.println("Found retained messages with topic matching pattern " + retainedPublishTopic);
// abort the iteration if you are not interested in the remaining retained messages as this saves resources
context.abortIteration();
}
}
}, executor);
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Iteration over retained messages complete"); // this will also be called if iteration is aborted manually
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final RetainedMessageStore retainedMessageStore = Services.retainedMessageStore();
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = retainedMessageStore.iterateAllRetainedMessages(
new IterationCallback<RetainedPublish>() {
@Override
public void iterate(final @NotNull IterationContext context, final @NotNull RetainedPublish retainedPublish) {
if (retainedPublish.getQos() == Qos.EXACTLY_ONCE) {
counter.incrementAndGet();
}
}
});
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Retained messages with QoS level 2: " + counter.get());
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
If the topology of the cluster changes during the iteration, the iteration is canceled. For example, when a network splits or a node leaves or joins the cluster. |
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
iterate(0);
}
public void iterate(final int attempts) {
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = Services.retainedMessageStore().iterateAllRetainedMessages(
new IterationCallback<RetainedPublish>() {
@Override
public void iterate(final @NotNull IterationContext context, final @NotNull RetainedPublish retainedPublish) {
if (retainedPublish.getQos() == Qos.EXACTLY_ONCE) {
counter.incrementAndGet();
}
}
});
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Retained messages with QoS level 2: " + counter.get());
// in case the cluster topology changes during iteration, an IterationFailedException is thrown
} else if (throwable instanceof IterationFailedException) {
// only retry 3 times
if (attempts < 3) {
final int newAttemptCount = attempts + 1;
Services.extensionExecutorService().schedule(() ->
iterate(newAttemptCount), newAttemptCount * 10, TimeUnit.SECONDS); // schedule retry with delay in case topology change is not over, else we would get another IterationFailedException
} else {
System.out.println("Could not fully iterate all retained messages.");
}
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
Connection Attribute Store
The Connection Attribute Store is a key-value store that preserves data as additional information in the MQTT client connection. All data is stored in memory. The maximum size of a single key-value pair is 10 kilobytes.
The Connection Attribute Store allows extensions to do the following:
- Set a connection attribute for a connected client
- Set a connection attribute as a UTF-8 string for the connected client
- Set a connection attribute with a defined character set for the connected client
- Get the value of a connection attribute for the connected client with a given key
- Get the value of a connection attribute as a UTF-8 string for the connected client with a given key
- Get the value of a connection attribute in a defined character set for the connected client with a given key
- Get all connection attributes for a connected client
- Remove a connection attribute with the given key for a connected client
- Clear all connection attributes for a connected client
The Connection Attribute Store is useful for storing temporary data for a connected MQTT client or data that you want to clean up automatically after the client disconnects. The Connection Attribute Store is also useful for storing temporary information that you want to share across callbacks.
Through the Connection Attribute Store an extension can manage client connection attributes with the same lifetime as the connection.
The Connection Attribute Store is thread safe.
For more information, see Connection Attributes Store JavaDoc.
Access Connection Attribute Store
You access the Connection Attribute Store through the client connection on a particular node, for example, through lifecycle events or interceptors, and not through the generic Services class. The following code examples show some ways to access the Connection Attribute Store: |
// Examples to access the Connection Attribute Store via a client lifecycle event method
Services.eventRegistry().setClientLifecycleEventListener(input1 -> new ClientLifecycleEventListener() {
@Override
public void onMqttConnectionStart(final @NotNull ConnectionStartInput connectionStartInput) {
// access the Connection Attribute Store via the connection information from the ConnectionStartInput interface
final ConnectionAttributeStore connectionAttributeStore = connectionStartInput.getConnectionInformation().getConnectionAttributeStore();
// use the ConnectionAttributeStore
connectionAttributeStore.putAsString("my key", "my value");
}
@Override
public void onAuthenticationSuccessful(final @NotNull AuthenticationSuccessfulInput authenticationSuccessfulInput) {
// access the Connection Attribute Store via the connection information from the AuthenticationSuccessfulInput interface
final ConnectionAttributeStore connectionAttributeStore = authenticationSuccessfulInput.getConnectionInformation().getConnectionAttributeStore();
// use the ConnectionAttributeStore
connectionAttributeStore.putAsString("my key2", "my value2");
}
@Override
public void onDisconnect(final @NotNull DisconnectEventInput disconnectEventInput) {
// access the Connection Attribute Store via the connection information from the DisconnectEventInput interface
final ConnectionAttributeStore connectionAttributeStore = disconnectEventInput.getConnectionInformation().getConnectionAttributeStore();
// use the ConnectionAttributeStore
connectionAttributeStore.putAsString("my key3", "my value3");
}
});
// Example to access the Connection Attribute Store in an interceptor. You can access the Connection Attribute Store from any interceptor.
Services.initializerRegistry().setClientInitializer(new ClientInitializer() {
@Override
public void initialize(final @NotNull InitializerInput initializerInput, final @NotNull ClientContext clientContext) {
clientContext.addPublishInboundInterceptor(new PublishInboundInterceptor() {
@Override
public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {
// get the Connection Attribute Store via the connection information in the input object
final ConnectionAttributeStore connectionAttributeStore = publishInboundInput.getConnectionInformation().getConnectionAttributeStore();
}
});
}
});
Set Connection Attribute
This example shows how to set the given connection attribute for the connected client.
By default, connection attributes store information as binary data. For your convenience, methods such as putAsString(String, String) are available in case string representations need to be stored. For more information, see Set Connection Attribute as UTF-8 String and Set Connection Attribute with Defined Character Set. If you need to store complex objects in the Connection Attribute Store, you must implement manual serialization and deserialization in your extension.
|
// put a ByteBuffer as the value
// this example uses an empty byte array for the binary data
byte[] binaryData = new byte[1000];
// wrap the byte array inside a ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.wrap(binaryData);
// set the created ByteBuffer as the value for the key "my data"
connectionAttributeStore.put("my data", byteBuffer);
Services.eventRegistry().setClientLifecycleEventListener(input1 -> new ClientLifecycleEventListener() {
@Override
public void onMqttConnectionStart(final @NotNull ConnectionStartInput connectionStartInput) {
// access the ConnectionAttributeStore via the ConnectionInformation from the ConnectionStartInput
final ConnectionAttributeStore connectionAttributeStore = connectionStartInput.getConnectionInformation().getConnectionAttributeStore();
// this example uses an empty byte array for the binary data
byte[] binaryData = new byte[1000];
// wrap the byte array inside a ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.wrap(binaryData);
// set the created ByteBuffer as the value for the key "my data"
connectionAttributeStore.put("my data", byteBuffer);
}
});
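As the note above mentions, complex objects must be serialized manually before they are stored. The following is a minimal sketch that uses standard Java serialization; the ConnectionInfo class, the "connection-info" key, and the helper methods are illustrative assumptions and not part of the SDK:
// hypothetical serializable value object, for illustration only
public static class ConnectionInfo implements java.io.Serializable {
    final String gatewayId;
    final int protocolVersion;

    ConnectionInfo(final String gatewayId, final int protocolVersion) {
        this.gatewayId = gatewayId;
        this.protocolVersion = protocolVersion;
    }
}

private static void storeConnectionInfo(final ConnectionAttributeStore connectionAttributeStore, final ConnectionInfo info) {
    try (final java.io.ByteArrayOutputStream bytes = new java.io.ByteArrayOutputStream();
         final java.io.ObjectOutputStream out = new java.io.ObjectOutputStream(bytes)) {
        out.writeObject(info);
        out.flush();
        // store the serialized object as binary data under an illustrative key
        connectionAttributeStore.put("connection-info", ByteBuffer.wrap(bytes.toByteArray()));
    } catch (final java.io.IOException e) {
        e.printStackTrace(); // please use more sophisticated logging
    }
}

private static Optional<ConnectionInfo> readConnectionInfo(final ConnectionAttributeStore connectionAttributeStore) {
    return connectionAttributeStore.get("connection-info").map(buffer -> {
        // the returned ByteBuffer is read-only, so copy it into a byte array first
        final ByteBuffer rewind = buffer.asReadOnlyBuffer().rewind();
        final byte[] array = new byte[rewind.remaining()];
        rewind.get(array);
        try (final java.io.ObjectInputStream in = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(array))) {
            return (ConnectionInfo) in.readObject();
        } catch (final java.io.IOException | ClassNotFoundException e) {
            e.printStackTrace(); // please use more sophisticated logging
            return null;
        }
    });
}
Any serialization format works here, for example JSON; the key point is that only raw bytes or strings go into the store.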
Set Connection Attribute as UTF-8 String
This example shows how to set the given connection attribute as a UTF-8 string representation for the connected client.
// use the putAsString convenience method
connectionAttributeStore.putAsString("my data", "my value");
Services.eventRegistry().setClientLifecycleEventListener(input1 -> new ClientLifecycleEventListener() {
@Override
public void onMqttConnectionStart(final @NotNull ConnectionStartInput connectionStartInput) {
// access the Connection Attribute Store via the connection information from the ConnectionStartInput interface
final ConnectionAttributeStore connectionAttributeStore = connectionStartInput.getConnectionInformation().getConnectionAttributeStore();
// use the putAsString convenience method
connectionAttributeStore.putAsString("my data", "my value");
}
});
Set Connection Attribute with Defined Character Set
This example shows how to set the given connection attribute as a string representation for the connected client with the defined charset.
// use a defined charset in conjunction with the putAsString convenience method
connectionAttributeStore.putAsString("my key", "my value", StandardCharsets.ISO_8859_1);
Services.eventRegistry().setClientLifecycleEventListener(input1 -> new ClientLifecycleEventListener() {
@Override
public void onMqttConnectionStart(final @NotNull ConnectionStartInput connectionStartInput) {
// access the Connection Attribute Store via the connection information from the ConnectionStartInput interface
final ConnectionAttributeStore connectionAttributeStore = connectionStartInput.getConnectionInformation().getConnectionAttributeStore();
// use a special charset in conjunction with the putAsString convenience method
connectionAttributeStore.putAsString("my key", "my value", StandardCharsets.ISO_8859_1);
}
});
Get Connection Attribute Value
This example shows how to retrieve the value of a connection attribute with the given key for the connected client.
final Optional<ByteBuffer> bufferOptional = connectionAttributeStore.get("exampleKey");
Services.eventRegistry().setClientLifecycleEventListener(input1 -> new ClientLifecycleEventListener() {
@Override
public void onMqttConnectionStart(final @NotNull ConnectionStartInput connectionStartInput) {
// access the Connection Attribute Store via the connection information from the ConnectionStartInput interface
final ConnectionAttributeStore connectionAttributeStore = connectionStartInput.getConnectionInformation().getConnectionAttributeStore();
final Optional<ByteBuffer> bufferOptional = connectionAttributeStore.get("exampleKey");
// verify that a value is set for the given key
if(bufferOptional.isEmpty()){
// If no value is present, return to handle the missing value. Another option is to set the value.
return;
}
// Retrieve the ByteBuffer from the optional. This operation is safe due to the previous verification that the value is present.
final ByteBuffer byteBuffer = bufferOptional.get();
// CAUTION: Because the ByteBuffer is read-only, you must copy the buffer to a new byte array:
final ByteBuffer rewind = byteBuffer.asReadOnlyBuffer().rewind();
final byte[] array = new byte[rewind.remaining()];
rewind.get(array);
// use the array
System.out.println(new String(array, StandardCharsets.UTF_8));
}
});
Get All Connection Attributes
This example shows how to retrieve all connection attributes for the connected client.
final Optional<Map<String, ByteBuffer>> optionalConnectionAttributes = connectionAttributeStore.getAll();
Services.eventRegistry().setClientLifecycleEventListener(input1 -> new ClientLifecycleEventListener() {
@Override
public void onMqttConnectionStart(final @NotNull ConnectionStartInput connectionStartInput) {
// access the Connection Attribute Store via the connection information from the ConnectionStartInput interface
final ConnectionAttributeStore connectionAttributeStore = connectionStartInput.getConnectionInformation().getConnectionAttributeStore();
final Optional<Map<String, ByteBuffer>> optionalConnectionAttributes = connectionAttributeStore.getAll();
// verify that connection attributes are present:
if(optionalConnectionAttributes.isEmpty()){
// If no value is present, return to handle the missing value. Another option is to set the value.
return;
}
// this operation is safe due to the previous verification that the value is present
final Map<String, ByteBuffer> allConnectionAttributes = optionalConnectionAttributes.get();
// iterate the entries for the given client
for (Map.Entry<String, ByteBuffer> entry : allConnectionAttributes.entrySet()) {
// CAUTION: Because the ByteBuffer is read-only, you must copy the buffer to a new byte array:
final ByteBuffer rewind = entry.getValue().asReadOnlyBuffer().rewind();
final byte[] array = new byte[rewind.remaining()];
rewind.get(array);
System.out.println(entry.getKey() + ":" + new String(array));
}
}
});
Get Connection Attribute Value as UTF-8 String
This example shows how to retrieve the value of the connection attribute with the given key for the connected client as a UTF-8 string.
final Optional<String> optionalValue = connectionAttributeStore.getAsString("exampleKey");
Services.eventRegistry().setClientLifecycleEventListener(input1 -> new ClientLifecycleEventListener() {
@Override
public void onMqttConnectionStart(final @NotNull ConnectionStartInput connectionStartInput) {
// access the Connection Attribute Store via the connection information from the ConnectionStartInput interface
final ConnectionAttributeStore connectionAttributeStore = connectionStartInput.getConnectionInformation().getConnectionAttributeStore();
final Optional<String> optionalValue = connectionAttributeStore.getAsString("exampleKey");
// verify that a value is set for the given key
if (optionalValue.isEmpty()) {
// If no value is present, return to handle the missing value. Another option is to set the value.
return;
}
// Retrieve the String from the Optional. This operation is safe due to the previous verification that the value is present
final String value = optionalValue.get();
// use the value, for example, to print
System.out.println(value);
}
});
Get Connection Attribute Value as Defined Character Set
This example shows how to retrieve the value of the connection attribute with the given key for the connected client as a string with the defined charset.
final Optional<String> optionalValue = connectionAttributeStore.getAsString("exampleKey", StandardCharsets.ISO_8859_1);
Services.eventRegistry().setClientLifecycleEventListener(input1 -> new ClientLifecycleEventListener() {
@Override
public void onMqttConnectionStart(final @NotNull ConnectionStartInput connectionStartInput) {
// access the Connection Attribute Store via the connection information from the ConnectionStartInput interface
final ConnectionAttributeStore connectionAttributeStore = connectionStartInput.getConnectionInformation().getConnectionAttributeStore();
final Optional<String> optionalValue = connectionAttributeStore.getAsString("exampleKey", StandardCharsets.ISO_8859_1);
// verify that a value is set for the given key
if (optionalValue.isEmpty()) {
// If no value is present, return to handle the missing value. Another option is to set the value.
return;
}
// Retrieve the String from the Optional. This operation is safe due to the previous verification that the value is present
final String value = optionalValue.get();
// use the value, for example, to print
System.out.println(value);
}
});
Remove Connection Attribute
This example shows how to remove a connection attribute with the given key from the connected client.
final Optional<ByteBuffer> optionalPreviousValue = connectionAttributeStore.remove("exampleKey");
Services.eventRegistry().setClientLifecycleEventListener(input1 -> new ClientLifecycleEventListener() {
@Override
public void onMqttConnectionStart(final @NotNull ConnectionStartInput connectionStartInput) {
// access the Connection Attribute Store via the connection information from the ConnectionStartInput interface
final ConnectionAttributeStore connectionAttributeStore = connectionStartInput.getConnectionInformation().getConnectionAttributeStore();
// Remove the value that is associated with the key. If a value is present, the value is returned within an Optional.
final Optional<ByteBuffer> optionalPreviousValue = connectionAttributeStore.remove("exampleKey");
// test whether there is a previous value and print the value for reference
if (optionalPreviousValue.isPresent()) {
final ByteBuffer byteBuffer = optionalPreviousValue.get();
// CAUTION: Because the ByteBuffer is read-only, you must copy the buffer to a new byte array:
final ByteBuffer rewind = byteBuffer.asReadOnlyBuffer().rewind();
final byte[] array = new byte[rewind.remaining()];
rewind.get(array);
System.out.println("Previous value:" + new String(array));
}
}
});
Clear All Connection Attributes
This example shows how to clear all connection attributes for the connected client.
connectionAttributeStore.clear();
Services.eventRegistry().setClientLifecycleEventListener(input1 -> new ClientLifecycleEventListener() {
@Override
public void onMqttConnectionStart(final @NotNull ConnectionStartInput connectionStartInput) {
// access the Connection Attribute Store via the connection information from the ConnectionStartInput interface
final ConnectionAttributeStore connectionAttributeStore = connectionStartInput.getConnectionInformation().getConnectionAttributeStore();
// remove all attributes
connectionAttributeStore.clear();
}
});
Publish Service
The Publish Service enables extensions to send PUBLISH messages. These messages can also be sent to a specific client only.
PUBLISH messages that are sent through the Publish Service are processed in the same way as the PUBLISH messages that a client sends. All MQTT 3 and MQTT 5 features for PUBLISH messages are supported. The limits that are configured in the config.xml as part of the MQTT entity are also validated for the PUBLISH messages sent through the Publish Service.
PUBLISH messages that are sent to a specific client have some unique behavior and requirements.
-
The topic of the PUBLISH must match at least one subscription of the client, or the PUBLISH is not forwarded to the client.
-
If the specified client has a shared subscription that matches the topic of the PUBLISH, the message is sent to the client but not sent to other clients with the same shared subscription.
For more information, see Publish Service JavaDoc.
Publish
This example shows how to send a regular PUBLISH message.
Publish message = Builders.publish()
.topic("topic")
.qos(Qos.AT_LEAST_ONCE)
.payload(payload)
.build();
Services.publishService().publish(message);
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
// Create a publish builder
final PublishBuilder publishBuilder = Builders.publish();
final ByteBuffer payload = ByteBuffer.wrap("message".getBytes());
// Build the publish
publishBuilder.topic("topic").qos(Qos.AT_LEAST_ONCE).payload(payload);
// Access the Publish Service
final PublishService publishService = Services.publishService();
// Asynchronously send the PUBLISH
final CompletableFuture<Void> future = publishService.publish(publishBuilder.build());
future.whenComplete((aVoid, throwable) -> {
if(throwable == null) {
System.out.println("Publish sent successfully");
} else {
//please use more sophisticated logging
throwable.printStackTrace();
}
});
}
...
Publish to Client
This example shows how to send a PUBLISH to a client with a specific client ID.
Publish message = Builders.publish()
.topic("topic")
.qos(Qos.AT_LEAST_ONCE)
.payload(payload)
.build();
Services.publishService().publishToClient(message, "test-client");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
// Create a publish builder
final PublishBuilder publishBuilder = Builders.publish();
final ByteBuffer payload = ByteBuffer.wrap("message".getBytes());
// Build the publish
publishBuilder.topic("topic").qos(Qos.AT_LEAST_ONCE).payload(payload);
// Access the Publish Service
final PublishService publishService = Services.publishService();
// Asynchronously send the PUBLISH to the specified client
final String clientId = "client";
final CompletableFuture<PublishToClientResult> future = publishService.publishToClient(publishBuilder.build(), clientId);
future.whenComplete((result, throwable) -> {
if (throwable == null) {
if (result == PublishToClientResult.NOT_SUBSCRIBED) {
System.out.println("Publish was not sent to client ("+clientId+
") because it is not subscribed to a matching topic");
} else {
System.out.println("Publish sent successfully");
}
} else {
//please use more sophisticated logging
throwable.printStackTrace();
}
});
}
...
Managed Extension Executor Service
Many MQTT integrations depend on operations that are potentially expensive in terms of CPU time:
-
Calling webservices
-
Persisting or querying data from a database
-
Writing data to disk
-
Other blocking operations
A central paradigm of HiveMQ extension development is to never block.
If your business logic requires blocking operations, you can use the ManagedExtensionExecutorService to call these operations asynchronously.
The HiveMQ managed executor service is shared between all HiveMQ extensions and can be monitored with the standard HiveMQ monitoring system.
The ManagedExtensionExecutorService can be used as a ScheduledExecutorService. This capability allows callback-based future handling for true non-blocking behavior. The extension executor service can also schedule tasks periodically.
Never create your own thread pools. Thread pools can significantly decrease the performance of HiveMQ. This is especially true for Java cached thread pools, because these pools create new Java threads without any limit and can make your system unresponsive. If you use a library with thread pools, such as the Jersey Client library, limit the number of threads. |
The size of the thread pool of this executor service depends on the number of cores available to the JVM in which HiveMQ runs.
Since HiveMQ cancels all schedulers when extensionStop()
executes, no additional or new tasks can be submitted.
After shutdown, HiveMQ continues to execute previously submitted tasks for a three-minute grace period. After three minutes, the executor service shuts down ungracefully and any remaining tasks are not executed.
Access the Managed Extension Executor Service
final ManagedExtensionExecutorService executorService = Services.extensionExecutorService();
Log Incoming Publishes
This example shows how to log the incoming publish rate once per minute with the ManagedExtensionExecutorService.
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final ManagedExtensionExecutorService executorService = Services.extensionExecutorService();
final MetricRegistry metricRegistry = Services.metricRegistry();
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// the HiveMQ metric "com.hivemq.messages.incoming.publish.rate" is registered as a Meter in the metric registry
final Meter incomingPublishRate = metricRegistry.meter("com.hivemq.messages.incoming.publish.rate");
System.out.println("Incoming publish one-minute rate = " + incomingPublishRate.getOneMinuteRate());
}
}, 1, 1, TimeUnit.MINUTES);
}
Add a Callback
This example shows how to add a callback to the submitted task to receive the return value when the future completes.
private void methodWithCompletableFuture() {
final ManagedExtensionExecutorService extensionExecutorService = Services.extensionExecutorService();
final CompletableFuture<String> result = extensionExecutorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "Test";
}
});
result.whenComplete(new BiConsumer<String, Throwable>() {
@Override
public void accept(final String resultString, final Throwable throwable) {
if(throwable != null){
//please use more sophisticated logging
throwable.printStackTrace();
return;
}
if(resultString != null){
System.out.println(resultString);
}
}
});
}
Admin Service
At runtime, you often need to get information about the broker instance without a triggering event such as a client connect. For this purpose, the extension SDK offers an Admin Service that provides the following:
-
The current lifecycle stage of the broker
-
The license the broker uses
-
Information about the broker node through the ServerInformation object
Access the Admin Service
Extensions can access the AdminService
object through Services.adminService()
.
final @NotNull AdminService adminService = Services.adminService();
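For example, once the extension has access to the AdminService, it can check which license edition the broker runs with. The following is a minimal sketch that assumes the license is exposed through getLicenseInformation():
// minimal sketch (assumption): read the license information from the Admin Service
final LicenseInformation licenseInformation = adminService.getLicenseInformation();
System.out.println("HiveMQ license edition: " + licenseInformation.getEdition());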
Lifecycle Stage
The Admin Service provides information on the current status of the broker lifecycle. Your HiveMQ broker can be in one of two states:
-
STARTING
: The broker is in this state from the moment the JVM starts until the start procedure of the broker concludes. -
STARTED_SUCCESSFULLY
: The broker is in this state after the start procedure concludes. This means that the extension system is started, the persistence is running, the listeners are up, a cluster is joined, and the HiveMQ Control Center is accessible.
Before you announce that a HiveMQ instance is ready, we recommend that you verify the lifecycle state of the broker to ensure that HiveMQ has successfully completed startup. |
private final @NotNull AdminService adminService;
private final @NotNull ManagedExtensionExecutorService executorService;
public void schedulePublishing() {
executorService.schedule(() -> {
// check if broker is ready
if (adminService.getCurrentStage() == LifecycleStage.STARTED_SUCCESSFULLY) {
// do action
publishListenerInformation();
} else {
// schedule next check
schedulePublishing();
}
}, 10, TimeUnit.SECONDS);
}
Server Information
In the Admin Service and other parts of the Extension SDK, HiveMQ provides a ServerInformation object that contains runtime information about the broker node to which the extension is attached.
The ServerInformation object provides the following information:
-
The HiveMQ version
-
The folder structure:
-
The home folder. For example, set by the
HIVEMQ_HOME
environment variable. -
The data folder. For example, set by the
HIVEMQ_DATA_FOLDER
environment variable. -
The log folder. For example, set by the
HIVEMQ_LOG_FOLDER
environment variable. -
The extensions folder. For example, set by the
HIVEMQ_EXTENSION_FOLDER
environment variable.
-
-
The active MQTT listeners
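For example, an extension can log the broker version and folder locations at startup. The following is a minimal sketch; the folder getter names are assumptions based on the information listed above:
// minimal sketch: log the HiveMQ version and the configured folders (getter names are assumptions)
final ServerInformation serverInformation = Services.adminService().getServerInformation();
System.out.println("HiveMQ version: " + serverInformation.getVersion());
System.out.println("Home folder: " + serverInformation.getHomeFolder());
System.out.println("Data folder: " + serverInformation.getDataFolder());
System.out.println("Log folder: " + serverInformation.getLogFolder());
System.out.println("Extensions folder: " + serverInformation.getExtensionsFolder());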
This example shows how to publish the available listeners to an external directory service.
private void publishListenerInformation() {
final ServerInformation serverInformation = adminService.getServerInformation();
for (final Listener listener : serverInformation.getListener()) {
// publishes listeners to a registry
publishListener(listener);
}
}
Cluster Service
The Cluster Service enables extensions to dynamically discover HiveMQ cluster nodes.
Extensions can access the ClusterService
object through Services.clusterService()
.
ClusterService
public class MyExtensionMain implements ExtensionMain {
private final MyClusterDiscoveryCallback myCallback;
public MyExtensionMain() {
myCallback = new MyClusterDiscoveryCallback();
}
@Override
public void extensionStart(
final @NotNull ExtensionStartInput input, final @NotNull ExtensionStartOutput output) {
Services.clusterService().addDiscoveryCallback(myCallback);
}
@Override
public void extensionStop(
final @NotNull ExtensionStopInput input, final @NotNull ExtensionStopOutput output) {
Services.clusterService().removeDiscoveryCallback(myCallback);
}
}
Cluster Discovery
To realize discovery of HiveMQ cluster nodes, an extension can implement a ClusterDiscoveryCallback
and add it through the ClusterService
.
To use cluster discovery in your HiveMQ extension, the <extension> discovery mechanism must be configured in the <discovery> element of the <cluster> section of your HiveMQ configuration file.
|
<?xml version="1.0"?>
<hivemq xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
...
<cluster>
...
<discovery>
<extension></extension>
</discovery>
...
</cluster>
...
</hivemq>
The lifecycle of a ClusterDiscoveryCallback
consists of three methods:
-
init
: HiveMQ calls the init
method one time when the callback is added. You can use this method to register the HiveMQ instance with a central registry. For example, to save address information of the HiveMQ instance to a database or server. -
reload
: HiveMQ calls the reload
method regularly to discover all currently available HiveMQ cluster nodes. The default interval between calls to this method is 60 seconds and can be overridden by an individual ClusterDiscoveryCallback
. -
destroy
: HiveMQ calls the destroy
method one time in the following cases:-
The callback is removed
-
The extension that added the callback is stopped
-
The HiveMQ instance is shut down
-
The destroy
method can be used to unregister the HiveMQ instance on which the extension runs from a central registry.
If an exception is thrown inside one of these methods, HiveMQ ignores the provided output.
ClusterDiscoveryCallback
public class MyClusterDiscoveryCallback implements ClusterDiscoveryCallback {
private final MyClusterNodesService myService = ...
@Override
public void init(
final @NotNull ClusterDiscoveryInput clusterDiscoveryInput,
final @NotNull ClusterDiscoveryOutput clusterDiscoveryOutput) {
myService.registerHiveMQNode(clusterDiscoveryInput.getOwnAddress());
final List<ClusterNodeAddress> hiveMQNodes = myService.getHiveMQNodes();
clusterDiscoveryOutput.provideCurrentNodes(hiveMQNodes);
final int nextReloadInterval = myService.getNextReloadInterval();
clusterDiscoveryOutput.setReloadInterval(nextReloadInterval);
}
@Override
public void reload(
final @NotNull ClusterDiscoveryInput clusterDiscoveryInput,
final @NotNull ClusterDiscoveryOutput clusterDiscoveryOutput) {
final List<ClusterNodeAddress> hiveMQNodes = myService.getHiveMQNodes();
clusterDiscoveryOutput.provideCurrentNodes(hiveMQNodes);
final int nextReloadInterval = myService.getNextReloadInterval();
clusterDiscoveryOutput.setReloadInterval(nextReloadInterval);
}
@Override
public void destroy(final @NotNull ClusterDiscoveryInput clusterDiscoveryInput) {
myService.unregisterHiveMQNode(clusterDiscoveryInput.getOwnAddress());
}
}
Additional HiveMQ Enterprise Extension SDK Services
The HiveMQ Enterprise Extension SDK adds several powerful services to the extension framework that specifically focus on the additional features the HiveMQ Enterprise and Professional Editions provide.
Access to the HiveMQ Enterprise Extension SDK requires certification.
Once certified, you can use the services of the HiveMQ Enterprise Extension SDK to build and implement valid extensions for your Professional or Enterprise version of HiveMQ.
To learn more about HiveMQ Enterprise Extension SDK certification training or request scheduling and pricing information, contact HiveMQ sales.
Consumer Service
The Consumer Service allows you to register and unregister message consumers. Message consumers can be used to react quickly to incoming publishes on a specific set of topics and further process the consumed messages however your business case requires.
The Consumer Service is a great fit when you want to write all or specific incoming PUBLISH messages to a third-party system. For example, the HiveMQ Enterprise Extension for Kafka uses the Consumer Service to transform MQTT PUBLISH messages into Kafka records and then write the records to Kafka.
Each consumer must be registered with a unique consumer ID and defined consumer options.
The Consumer Service allows extensions to do the following:
-
Register a message consumer with consumer options
-
Remove a registered consumer
-
Get all registered consumers
-
Get and modify the options of a specific consumer
The extension that runs the consumer must be registered on every node in the HiveMQ cluster. |
Consumer Options
Consumer options describe the intended use of the consumer.
The definition of Consumer Options is mandatory for each consumer that you want to implement.
Option | Description |
---|---|
Topic Filters |
Sets the configured topic filters for the consumer. |
Queue Limit |
Sets the queue limit of the consumer per topic filter. The default value is |
final ConsumerOptions consumerOptions = EnterpriseBuilders.consumerOptions()
.topicFilters("+/IN/#", "+/OUT/#")
.queueLimit(1_000_000).build();
Create a Consumer
A message consumer is called whenever the broker receives an incoming PUBLISH message that matches the topic filters for which the consumer is registered. When you use the Message Consumer interface, only the method for consuming messages must be implemented. To avoid blocking the consumer, we recommend the use of an async wrapper.
This example shows how to create a simple message consumer.
public class MySimpleConsumer implements MessageConsumer {
@Override
public void consume(@NotNull ConsumerInput consumerInput, @NotNull ConsumerOutput consumerOutput) {
final PublishPacket publishPacket = consumerInput.getPublish();
final Async<ConsumerOutput> async = consumerOutput.async(Duration.ofSeconds(2));
Services.extensionExecutorService().submit(() -> doSomethingWith(publishPacket, async));
}
private void doSomethingWith(final @NotNull PublishPacket publishPacket, final @NotNull Async<ConsumerOutput> async) {
try {
//your business logic
} catch (Exception any) {
async.getOutput().cancelConsumption();
} finally {
async.resume();
}
}
}
The async method of the ConsumerOutput marks this operation as async.
-
Always call
Async.resume()
, regardless of whether an operation is successful or unsuccessful. -
If the timeout expires before
Async.resume()
is called, then the outcome is handled as failed. -
Do not call this method more than once. If async is called multiple times, an exception is thrown.
The cancelConsumption() method indicates that a PUBLISH message was not consumed. Use this method to cancel unprocessed messages when you shut down your consumer.
The QoS of the canceled message determines the exact processing of the message:
-
Canceled messages with QoS 0 (at most once) are discarded.
-
Canceled messages with QoS 1 (at least once) or QoS 2 (exactly once) are offered again later to the same consumer (determined by consumer ID) on the current node.
Make sure that your business logic prevents endless retry loops for messages that fail and are canceled. |
Register a Consumer with Options to HiveMQ
Before you work with a consumer, the consumer must be registered. Consumer registration is ideally done at the start of the extension. To register a consumer with options to HiveMQ, you need the unique ID of the consumer and the associated consumer options definition.
This example shows a consumer registration with consumer options.
EnterpriseServices.consumerService()
.registerConsumer(consumerId, consumerOptions, new MessageConsumerProvider() {
@Override
public @NotNull MessageConsumer get() {
return myConsumer;
}
});
The Message Consumer interface contains an init method that can optionally be overridden.
When you register your consumer implementation, HiveMQ calls the init method with the defined consumer options as the parameter.
This example shows the init method with the consumer options that were provided when the consumer was registered, and the custom code that the specific consumer needs.
public void init(final @NotNull ConsumerOptions consumerOptions) {
log.info("Initialize myConsumer");
}
final ConsumerOptions consumerOptions = EnterpriseBuilders.consumerOptions().topicFilters("+/STAT").build();
final String consumerId = "myConsumerId";
final MessageConsumer myConsumer = new MySimpleConsumer();
final CompletableFuture<Void> registerFuture =
EnterpriseServices.consumerService()
.registerConsumer(consumerId, consumerOptions, new MessageConsumerProvider() {
@Override
public @NotNull MessageConsumer get() {
return myConsumer;
}
});
registerFuture.whenComplete((aVoid, throwable) -> {
if (throwable == null) {
log.info("Consumer {} started.", consumerId);
} else {
log.warn("Not able to start Consumer \"{}\", reason:", throwable.getMessage());
}
});
The HiveMQ consumer API is built for high performance and supports asynchronous, concurrent processing across multiple threads. To ensure consistency, the business logic you implement must be thread-safe. |
Remove a Consumer from HiveMQ
It is best practice to unregister and remove consumers that you no longer plan to use.
Consumer removal is usually done when the extension stops.
This example shows how to remove a consumer with options from HiveMQ.
EnterpriseServices.consumerService().removeConsumer(consumerId);
The Message Consumer interface contains a destroy method that can optionally be overridden.
When you remove your consumer, HiveMQ calls the destroy method with the defined consumer options. A consumer can be destroyed during runtime, and additional code can be added via the destroy method.
This example shows the destroy method with the consumer options that were provided when the consumer was registered, and the custom code that the specific consumer needs.
public void destroy(final @NotNull ConsumerOptions consumerOptions) {
log.info("Destroy myConsumer");
}
Get All Consumers
If your extension uses multiple consumers, the ability to retrieve a map of all consumers that are registered to the extension can be very helpful. The key-value pairs that the method returns show the consumer ID and the modifiable consumer options for each consumer.
This example shows how to get a map of all consumers registered to the extension:
final @NotNull Map<String, @NotNull ModifiableConsumerOptions> myConsumers = EnterpriseServices.consumerService().getConsumers();
Get Options for a Specific Consumer
This example shows how to get the options for a specific consumer.
EnterpriseServices.consumerService().getConsumerOptions("myConsumerId");
Consumer options can be modified. This ability can be very helpful when a topic structure must be changed during runtime. |
final @NotNull Optional<ModifiableConsumerOptions> options = EnterpriseServices.consumerService().getConsumerOptions(consumerId);
if (options.isPresent()) {
options.get().removeTopicFilter("+/STATUS");
options.get().addTopicFilter("+/ERROR");
}
Session Attribute Store
This service allows an extension to manage client session attributes that have the same lifetime as the existing MQTT client session on your HiveMQ broker.
The Session Attribute Store manages the sessions of clients that have an existing session. The service cannot be used to add a client session. |
The Session Attribute Store allows extensions to do the following:
-
Get all session attributes of a client
-
Clear all session attributes of a client
-
Add a session attribute to a client
-
Get a specific session attribute of a client
-
Remove a specific session attribute from a client
You can also view the session attributes of a client on the client detail pages in your HiveMQ Control Center. For more information, see Control Center Session Attributes. |
Access the Session Attribute Store Interface
EnterpriseServices.sessionAttributeStore()
The earliest point at which you can use the Session Attribute Store for a client is the client initializer callback that receives the InitializerInput. The initializer is called when a client connects with a new or existing session and is also called for online clients when the extension starts. |
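For example, a client initializer can store an initial session attribute as soon as the client session is initialized. The following is a minimal sketch; the attribute key and value are illustrative:
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
    final String clientId = initializerInput.getClientInformation().getClientId();
    // illustrative attribute: store the connect timestamp for the client session
    EnterpriseServices.sessionAttributeStore().put(clientId, "connectedAt",
            ByteBuffer.wrap(Long.toString(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8)));
});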
Get All Session Attributes for a Client
This example shows how to retrieve all session attributes for a client with a specific client ID.
final @NotNull CompletableFuture<Map<String, ByteBuffer>> attributes = EnterpriseServices.sessionAttributeStore().getAll(clientId);
attributes.whenComplete( (aMap, throwable) -> {
if (throwable == null) {
log.info("Attributes found", aMap.keySet());
checkAttributes(aMap);
} else {
log.warn("Exception with reason:", throwable.getMessage());
}
});
Clear All Session Attributes for a Client
This example shows how to remove all session attributes from a specific client.
final String clientId = initializerInput.getClientInformation().getClientId();
EnterpriseServices.sessionAttributeStore().clear(clientId);
Add a Session Attribute to a Client
This example shows how to add a session attribute to a specific client.
EnterpriseServices.sessionAttributeStore().put(clientId, "myAttributeKey",
ByteBuffer.wrap("myAttributeValue".getBytes(StandardCharsets.UTF_8)));
The session attribute that you set for the client must contain a key and a value. The maximum key length is 65535 characters. The maximum value size is 5 MB. Null values are not permitted. |
Get a Session Attribute for a Client
This example shows how to retrieve a specific session attribute of a specific client.
final @NotNull CompletableFuture<Optional<ByteBuffer>> myAttributeValue = EnterpriseServices.sessionAttributeStore().get(clientId, "myAttributeKey");
myAttributeValue.whenComplete( (aBuffer, throwable) -> {
if (throwable == null) {
if (aBuffer.isPresent()) {
log.info("Attribute found", getValueAsStringFrom(aBuffer.get()));
} else {
log.info("Attribute not found");
}
} else {
log.warn("Exception with reason:", throwable.getMessage());
}
});
To get the desired session attribute from the Session Attribute Store, you must have the attribute key and client ID. |
Remove a Session Attribute from a Client
This example shows how to remove a specific session attribute of a specific client.
EnterpriseServices.sessionAttributeStore().remove(clientId,"myAttributeKey");
To remove the desired session attribute from the Session Attribute Store, you must have the attribute key and client ID. |
Extension Messaging Service
The Extension Messaging Service makes it possible to send non-MQTT messages through the cluster and is intended for internal cluster communication between the extensions that run on the HiveMQ instances in your cluster.
The service is helpful when your use case requires the exchange of client information or data that is distributed over the cluster to be fully available on all nodes.
The Extension Messaging Service allows extensions to do the following:
-
Register a callback to receive messages for a specific identifier
-
Send messages for a specific identifier to other cluster nodes
-
Respond to received messages
-
Unregister a callback for a specific identifier
Define Extension Messaging Broadcast Options
The BroadcastMode
of the Extension Messaging Service allows you to configure to which nodes information is sent.
The following options are available:
-
ALL
: Sends information to all nodes in the cluster and includes the originating node. -
OTHERS
: Sends information to the other nodes in the cluster and omits the originating node.
This example shows how to set the options to send information to the other nodes in a cluster, omitting the originating node.
EnterpriseBuilders.extensionMessageOptions().mode(BroadcastMode.OTHERS).build()
Register Message Response to Receive Messages for a Specific Identifier
This example shows how to register a callback for a specific identifier to receive messages that are sent for that identifier with the Extension Messaging Service.
EnterpriseServices.extensionMessagingService()
.register(MY_MESSAGE_ID, new MySimpleMessagingService.MySimpleRespondCallback());
When multiple extensions register a response callback for the same ID, the extension with the highest priority overrides any previous registration.
Every ExtensionMessageCallback is removed when the extension stops.
|
Send Messages for a Specific Identifier
This example shows how to send a message with ExtensionMessageOptions
for a specific identifier with the Extension Messaging Service.
Every registered ExtensionMessageCallback
with the specific identifier receives this message and can respond with a reply message.
The method returns a list of completable futures that contain the reply messages from all callbacks registered with the same identifier.
final List<CompletableFuture<ExtensionMessageResponse>> responseList =
EnterpriseServices.extensionMessagingService().send(MY_MESSAGE_ID,
new byte[0],
extensionMessageOptions);
The completable futures that this method returns can fail with an exception for the following reasons: - A message is sent to a cluster node that runs a HiveMQ version older than 4.1.0. - A message is sent to a cluster node that does not have a callback registered for the selected identifier. - A message is sent to a cluster node that is currently not reachable. |
Respond to Received Messages
This example shows how to complete message communication with a response.
static class MyRespondCallback implements ExtensionMessageCallback {
...
@Override
public void receive(@NotNull ExtensionMessage extensionMessage) {
...
extensionMessage.respond(serializedResponseData);
...
}
}
public class MySimpleMessagingService {
private static final Logger log = LoggerFactory.getLogger(MySimpleMessagingService.class);
private static final @NotNull String MY_MESSAGE_ID = "MySimpleMessagingService";
private final @NotNull ExtensionMessagingService extensionMessagingService;
private final @NotNull ScheduledExecutorService scheduledExecutorService;
public MySimpleMessagingService(final @NotNull ExtensionMessagingService extensionMessagingService,
final @NotNull ScheduledExecutorService scheduledExecutorService) {
this.extensionMessagingService = extensionMessagingService;
this.scheduledExecutorService = scheduledExecutorService;
}
public void start() {
extensionMessagingService.register(MY_MESSAGE_ID, new MySimpleMessagingService.MySimpleRespondCallback());
scheduledExecutorService.scheduleAtFixedRate(this::send, 1, 1, TimeUnit.MINUTES);
}
public void stop() {
extensionMessagingService.unregister(MY_MESSAGE_ID);
}
private void send() {
byte[] data = createDataToSend();
//Send Message in the cluster and retrieve responses from other nodes
final List<CompletableFuture<ExtensionMessageResponse>> responseList =
extensionMessagingService.send(
MY_MESSAGE_ID,
data,
EnterpriseBuilders.extensionMessageOptions().mode(BroadcastMode.OTHERS).build());
CompletableFuture.allOf(responseList.toArray(new CompletableFuture[]{}))
.exceptionally(throwable -> null)
.thenAccept(aVoid -> {
for (CompletableFuture<ExtensionMessageResponse> responseCompletableFuture : responseList) {
try {
final ExtensionMessageResponse response = responseCompletableFuture.get();
processDataFromResponse(response.getClusterNodeId(), response.getMessage());
} catch (ExecutionException | InterruptedException any) {
log.error(" Requesting response of data failed: ", any);
}
}
});
}
private byte[] createDataToSend() {
log.info("Create data to Send ");
return RandomUtils.nextBytes(200);
}
private void processDataFromResponse(String clusterNodeId, byte[] message) {
log.info("got response from {} ", clusterNodeId);
}
/**
* The callback that receives an ExtensionMessage must respond in any case.
*/
static class MySimpleRespondCallback implements ExtensionMessageCallback {
boolean success = false;
@Override
public void receive(@NotNull ExtensionMessage message) {
try {
success = createRespond(message.receive());
} finally {
message.respond(new byte[]{(byte) (success ? 1 : 0)});
}
}
private boolean createRespond(byte[] data) {
return true;
}
}
}
When you implement the Extension Messaging Service, you must register a response callback that generates a response for each ExtensionMessage received. Your response callback must respond to every ExtensionMessage in all cases.
|
Control Center Service
Use this service to add custom views or notifications to the HiveMQ Control Center.
The Control Center Service allows extensions to do the following:
-
Get all available control center permissions from HiveMQ and other extensions
-
Verify whether a user has a specific permission
Add Authentication to the HiveMQ Control Center
The Control Center Service allows extensions to add an authenticator for the HiveMQ Control Center users.
The HiveMQ Control Center can be configured with multiple users who each have a username and password (stored as a SHA-256 hash with the username prepended as salt). For more information, see HiveMQ Control Center User Configuration.
The HiveMQ Enterprise Edition supports Role Based Access Control (RBAC) for Control Center users. RBAC gives you the ability to restrict user permissions and precisely control which users can view, access, and modify data. With RBAC, you can create fine-grained access management for your HiveMQ system.
This example shows how to add an authenticator to the HiveMQ Control Center.
//Implementation of the authenticator
public class MyControlCenterAuthenticator implements ControlCenterAuthenticator {
private static final String MY_DASHBOARD_VIEW = "MY_DASHBOARD_VIEW";
@Override
public void onLogin(@NotNull ControlCenterAuthInput controlCenterAuthInput, @NotNull ControlCenterAuthOutput controlCenterAuthOutput) { ... }
}
//usage
EnterpriseServices.controlCenterService().setAuthenticator(new MyControlCenterAuthenticator());
View and Add Control Center Permissions
This example shows how to view and add HiveMQ Control Center permissions.
final @NotNull ControlCenterPermission permission =
EnterpriseBuilders.controlCenterPermission()
.id(MY_DASHBOARD_VIEW)
.displayName("View My Dashboard")
.description("View My Dashboard Permission")
.group("CUSTOM").build();
EnterpriseServices.controlCenterService().addPermission(permission);
//Use permission by setting in ControlCenterAuthenticator Output for the logged-in user
//See full list of permissions from
// https://www.hivemq.com/docs/ese/4.5/enterprise-security-extension/ese.html#control-center-access-control
controlCenterAuthOutput.getUserPermissions().add(MY_DASHBOARD_VIEW);
Add Custom Control Center Views
The Control Center Service allows extensions to add single extension views or views with subviews to the HiveMQ Control Center.
This example adds a new view to the HiveMQ Control Center.
EnterpriseServices.controlCenterService().addView(new MyExtensionView(myViewDataProvider));
View details must be implemented with the Vaadin libraries and CSS. Vaadin is an open-source platform for web application development. |
Create a Custom Extension View for the HiveMQ Control Center
Implementation of your custom extension view must include the following:
-
Your extension icon. The default is a plug icon.
-
The title of your extension view
-
The URL that appears for this view in the browser
-
A permission ID to return the needed Permission for this view
-
The view itself provided as a Vaadin View
Based on your custom extension view, these implementation elements are optional:
-
A selected URL that is suitable if your subview does not have a URL
-
A menu title
-
The associated CSS
A new view instance must be created for each request, because the view is rendered for the specific website request. Otherwise, the view could be outdated and user X could receive the data of the view that user Y requested.
public class MyDashboardView implements ExtensionView {
public MyDashboardView() { … }
@Override
public @NotNull String getTitle() { return "My Dashboard"; }
@Override
public @NotNull String getUrl() { return "MyDashboard"; }
@Override
public @Nullable String getCss() { return "VAADIN/myExtension.css";}
@Override
public @NotNull View getView() { return new DemoView(); }
@Nullable
public String getPermissionId() { return "MY_DASHBOARD_VIEW"; }
private class DemoView implements View { … }
}
Add or Remove HiveMQ Control Center Notifications
This example shows how to add or remove notifications from the HiveMQ Control Center.
@NotNull Notification myNotification;
myNotification = new Notification() {
@Override
public @NotNull String getMessage() { return "Hello from myExtension"; }
@Override
public @NotNull NotificationLevel getLevel() { return NotificationLevel.INFO; }
};
EnterpriseServices.controlCenterService().addNotification(myNotification);
EnterpriseServices.controlCenterService().removeNotification(myNotification);
REST Service
The HiveMQ REST Service allows extensions to create accessible HTTP APIs directly within HiveMQ. Any HTTP content can be served directly from HiveMQ.
The REST Service allows extensions to do the following:
-
Register a custom REST API application with the HiveMQ REST API service
-
Remove a custom REST API application from the HiveMQ REST API service
-
Define multiple listeners to multiple HTTP Endpoints
-
Create JAX-RS based HTTP/REST APIs
The JAX-RS resources can use other HiveMQ services to interact with HiveMQ. Interaction with HiveMQ is not required; you can also use the internal HTTP server of HiveMQ to avoid setting up an external HTTP server for your existing JAX-RS resources. |
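For example, a JAX-RS resource that an extension could serve through the REST Service might look like the following. This is a minimal sketch that assumes the javax.ws.rs JAX-RS API is available on the classpath; the path and response are illustrative:
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

// minimal sketch of an illustrative JAX-RS resource
@Path("/status")
public class MyStatusResource {

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String status() {
        return "my-extension is running";
    }
}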
Register a Custom REST API Application
This example shows how to register a custom REST API application with the HiveMQ REST API service.
When you register a custom REST API application with the HiveMQ REST API service, the base path for all resources is automatically determined by the extension ID: /api/v1/extensions/{extension-id}/.
At most one REST application can be set.
//simple example for rest service usage
Resource r = new Resource("backend");
try {
EnterpriseServices.restService().setRestApplication(() -> List.of(r));
} catch (FeatureDisabledException disabledException) {
// the REST API is disabled in the HiveMQ configuration
log.error("REST-API is not enabled in config.xml");
}
The REST API must be enabled in the configuration file of your HiveMQ instance (config.xml).
|
Remove a Custom REST API Application
This example shows how to stop and remove a custom REST API application from the HiveMQ REST API service:
EnterpriseServices.restService().removeRestApplication();
Client Event Service
The Client Event Service allows extensions to do the following:
-
Iterate the events of a specific client in a defined time frame
Before you use the Client Event Service, make sure that the Client Event History feature is enabled in the config.xml file of your HiveMQ instance.
|
<client-event-history>
<enabled>true</enabled>
<lifetime>604800</lifetime> <!-- 7 days -->
</client-event-history>
Access the Client Event Service
EnterpriseServices.clientEventService()
Based on the time frame you define, this operation can be expensive in large-scale deployments. For example, do not call this method with long time frames (multiple days) in a loop for multiple clients. |
Iterate Events for Client
This example shows how to iterate the events of a specified client in a defined time frame.
EnterpriseServices.clientEventService().iterateEventsForClient(clientId, from, to, (context, event) -> {
switch (event.getType()) {
case OVERLOAD_PROTECTION_ON: {
resource.getClientStates().add( OVERLOAD_PROTECTION_ON.toString());
break;
}
case OVERLOAD_PROTECTION_OFF: {
resource.getClientStates().add( OVERLOAD_PROTECTION_OFF.toString());
break;
}
case DISCONNECT_BY_CLIENT_GRACEFUL:
case DISCONNECT_BY_CLIENT_UNGRACEFUL:
case DISCONNECT_BY_SERVER:
{
resource.getClientStates().add(event.toString());
context.abortIteration();
break;
}
}
});
The Client Event Service can identify the following types of events:
-
CONNECT_SUCCEEDED
-
CONNECT_FAILED
-
DISCONNECT_BY_CLIENT_GRACEFUL
-
DISCONNECT_BY_CLIENT_UNGRACEFUL
-
DISCONNECT_BY_SERVER
-
SESSION_REMOVED
-
OVERLOAD_PROTECTION_ON
-
OVERLOAD_PROTECTION_OFF
Next Steps
To learn more about the possibilities HiveMQ extensions offer and view code examples for several frequently implemented HiveMQ extension use cases, see Popular HiveMQ Extension Use Cases.