Popular HiveMQ Extension Use Cases
HiveMQ extensions can customize many aspects of broker behavior. The following typical use cases help you dive deeper into the HiveMQ Extension SDK and its associated APIs:
Authentication
Authentication is the mechanism that verifies the identity of your communication partner.
Air travel provides a classic example of authentication. Before you are permitted to board a plane, airport security checks your passport. Presenting a passport that identifies you as the person whose name is on the ticket allows you to proceed.
In the context of MQTT, authentication is used to check if an MQTT client is allowed to connect to a broker and if it is allowed to use a particular client identifier.
To apply authentication in a HiveMQ extension, you implement a SimpleAuthenticator and register an AuthenticatorProvider with HiveMQ.
The registration is done in the extensionStart method of the main class, where the custom authenticator provider is registered with the SecurityRegistry.
public void extensionStart(ExtensionStartInput extensionStartInput, ExtensionStartOutput extensionStartOutput) {
//register the provider with the SecurityRegistry
Services.securityRegistry().setAuthenticatorProvider(new MyAuthenticatorProvider());
}
In this example, the Authenticator Provider returns a new Authenticator for every new MQTT connection. It is also possible to share an instance of an authenticator between multiple MQTT connections, see this example.
public class MyAuthenticatorProvider implements AuthenticatorProvider {
@Override
public Authenticator getAuthenticator(AuthenticatorProviderInput authenticatorProviderInput) {
//return an instance of an Authenticator
return new MyAuthenticator();
}
}
A SimpleAuthenticator that implements username/password based authentication for MQTT clients uses the information from the input object to determine whether a client is allowed to connect.
In this example, only clients with the username "admin" and password "hivemq" are allowed to connect. Clients that lack the username or password are not allowed to connect.
public class MyAuthenticator implements SimpleAuthenticator {
@Override
public void onConnect(SimpleAuthInput simpleAuthInput, SimpleAuthOutput simpleAuthOutput) {
...
//check if the user is "admin" and the password is "hivemq"
if (username.equals("admin") && password.equals("hivemq")) {
simpleAuthOutput.authenticateSuccessfully();
} else {
simpleAuthOutput.failAuthentication();
}
}
}
public class MyAuthenticator implements SimpleAuthenticator {
@Override
public void onConnect(SimpleAuthInput simpleAuthInput, SimpleAuthOutput simpleAuthOutput) {
//get the contents of the MQTT connect packet from the input object
ConnectPacket connect = simpleAuthInput.getConnectPacket();
//check if the client set username and password
if (!connect.getUserName().isPresent() || !connect.getPassword().isPresent()) {
simpleAuthOutput.failAuthentication();
//stop here, otherwise the calls to get() below would throw an exception
return;
}
//get username and password from the connect packet
String username = connect.getUserName().get();
String password = Charset.forName("UTF-8").decode(connect.getPassword().get()).toString();
//check if the user is "admin" and the password is "hivemq"
if (username.equals("admin") && password.equals("hivemq")) {
simpleAuthOutput.authenticateSuccessfully();
} else {
simpleAuthOutput.failAuthentication();
}
}
}
In the authenticator, the extension developer has access to all client information that can be used for authentication. This information includes all of the information from the CONNECT packet and the TLS certificate of the client. Access to all available authentication information makes it possible to implement even the most complex authentication mechanisms.
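The examples above decode the password into a String and compare it with String.equals. As a minimal sketch, assuming you prefer to compare the raw bytes, the following plain-JDK helper compares the password ByteBuffer against an expected value with MessageDigest.isEqual, whose running time does not depend on the position of the first differing byte. The class CredentialCheck and its methods are hypothetical names for illustration and are not part of the HiveMQ Extension SDK.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Hypothetical helper, not part of the HiveMQ Extension SDK.
final class CredentialCheck {

    // Copies the remaining bytes of the buffer without moving the caller's position.
    static byte[] toBytes(ByteBuffer buffer) {
        ByteBuffer copy = buffer.asReadOnlyBuffer();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return bytes;
    }

    // Compares the supplied password bytes with the UTF-8 bytes of the expected password.
    static boolean passwordMatches(ByteBuffer supplied, String expected) {
        byte[] expectedBytes = expected.getBytes(StandardCharsets.UTF_8);
        return MessageDigest.isEqual(toBytes(supplied), expectedBytes);
    }
}
```

In an authenticator, a call such as CredentialCheck.passwordMatches(connect.getPassword().get(), "hivemq") could take the place of the decoded String comparison. A production setup would normally check against stored password hashes rather than a hard-coded value.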
Authorization
Authorization is the mechanism that controls access based on certain identities.
Let’s continue the air travel example that we began in the authentication section. After your ticket and passport authenticate your identity,
a boarding pass authorizes your access to a specific plane, row, and seat.
In the context of an MQTT client, 'access' refers to the ability to publish and subscribe to certain topics.
HiveMQ extensions support two authorization mechanisms for the MQTT publish and subscribe actions:
- Default Permissions: This mechanism provides access control on a per-topic basis.
- Authorizers: This mechanism implements authorization on a per-packet basis.
Default Permissions are the simplest and most scalable of the mechanisms. The permissions for a client are represented by a list of topics and attributes to which a client can or cannot publish or subscribe. Default permissions are independent for each client and can be set in an authenticator or in a ClientInitializer.
In this example, each client is allowed to publish and subscribe only to topics that start with its own client identifier. A few lines added to the existing authenticator set these default permissions for the client.
...
//create a topic permission for all topics starting with the client identifier
final TopicPermission permission = Builders.topicPermission() // (1)
.topicFilter(simpleAuthInput.getConnectPacket().getClientId() + "/#") // (2)
.type(TopicPermission.PermissionType.ALLOW) // (3)
.build();
//add this permission to the client's default permissions
simpleAuthOutput.getDefaultPermissions().add(permission);
...
(1) The extension SDK provides builders for classes that need to be constructed by an extension developer. The class Builders has static methods to access the specific builders.
(2) The topic filter for this permission. The MQTT wildcards # and + can be used to create topic filters that match multiple topics.
(3) This example uses a whitelist approach that only allows access to specific topics. A blacklist approach is also possible.
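To illustrate how a topic filter such as the one in the example selects topics, here is a minimal plain-Java sketch of MQTT wildcard matching: + matches exactly one topic level and # matches the parent level and all remaining levels. TopicFilterMatcher is a hypothetical name, and this is not HiveMQ's implementation. The sketch assumes a well-formed filter and ignores the special handling of topics that begin with $.

```java
// Hypothetical illustration of MQTT topic filter matching, not HiveMQ code.
final class TopicFilterMatcher {

    static boolean matches(String filter, String topic) {
        // A limit of -1 keeps empty trailing levels, so "a/" has two levels.
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // The multi-level wildcard is only valid as the last level.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false; // the topic has fewer levels than the filter
            }
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false; // a literal level differs
            }
        }
        // Without a trailing '#', both must have the same number of levels.
        return filterLevels.length == topicLevels.length;
    }
}
```

With the filter client1/#, the topics client1/status and client1/a/b match, while client10/status does not, because levels are compared as whole strings. This is why the example appends "/#" to the client identifier instead of using a plain prefix.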
public class MyAuthenticator implements SimpleAuthenticator {
@Override
public void onConnect(SimpleAuthInput simpleAuthInput, SimpleAuthOutput simpleAuthOutput) {
//get the contents of the MQTT connect packet from the input object
ConnectPacket connect = simpleAuthInput.getConnectPacket();
//check if the client set username and password
if (!connect.getUserName().isPresent() || !connect.getPassword().isPresent()) {
simpleAuthOutput.failAuthentication();
//stop here, otherwise the calls to get() below would throw an exception
return;
}
//get username and password from the connect packet
String username = connect.getUserName().get();
String password = Charset.forName("UTF-8").decode(connect.getPassword().get()).toString();
//check if the user is "admin" and the password is "hivemq"
if (username.equals("admin") && password.equals("hivemq")) {
//create a topic permission for all topics starting with the client identifier
TopicPermission permission = Builders.topicPermission()
.topicFilter(simpleAuthInput.getConnectPacket().getClientId() + "/#")
.build();
//add this permission to the client's default permissions
simpleAuthOutput.getDefaultPermissions().add(permission);
simpleAuthOutput.authenticateSuccessfully();
} else {
simpleAuthOutput.failAuthentication();
}
}
}
The permissions for an MQTT client can include more than the topic, for example the QoS level or the retained-message flag. For a list of all available permission options, see the Topic Permission chapter.
Authorizers are the other authorization mechanism. They implement authorization on a per-packet basis.
To learn more about Authorizers, see Publish Authorizer and Subscription Authorizer.
Lifecycle Events
To get notified when certain events in the lifecycle of an MQTT client occur, an extension can listen to lifecycle events. These events include a client establishing a connection, a client disconnecting, successful authentication of a client, and more.
To create a lifecycle-event listener, register a ClientLifecycleEventListenerProvider with the EventRegistry.
Services.eventRegistry().setClientLifecycleEventListener(new MyClientLifecycleEventListenerProvider());
The provider returns a new instance of a ClientLifecycleEventListener for each client. If desired, you can share the same instance of a listener between multiple clients. For more information, see this example.
public class MyClientLifecycleEventListenerProvider implements ClientLifecycleEventListenerProvider {
@Override
public ClientLifecycleEventListener getClientLifecycleEventListener(ClientLifecycleEventListenerProviderInput input) {
return new MyClientLifecyleEventListener();
}
}
In this example, a log statement is issued when a client connects, when authentication is successful, and when a client disconnects.
public class MyClientLifecyleEventListener implements ClientLifecycleEventListener {
private static final Logger log = LoggerFactory.getLogger(MyClientLifecyleEventListener.class);
@Override
public void onMqttConnectionStart(ConnectionStartInput connectionStartInput) {
log.info("Client {} connects.", connectionStartInput.getConnectPacket().getClientId());
}
@Override
public void onAuthenticationSuccessful(AuthenticationSuccessfulInput authenticationSuccessfulInput) {
log.info("Client {} authenticated successfully.", authenticationSuccessfulInput.getClientInformation().getClientId());
}
@Override
public void onDisconnect(DisconnectEventInput disconnectEventInput) {
log.info("Client {} disconnected.", disconnectEventInput.getClientInformation().getClientId());
}
}
You can listen to more detailed events by overriding other methods of the ClientLifecycleEventListener interface.
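When one listener instance is shared between multiple clients, any state it keeps may be touched from several threads at once. As a minimal sketch under that assumption, the following plain-Java class tracks connected client identifiers in a thread-safe set. ConnectedClientTracker and its methods are hypothetical names and are not part of the SDK. A shared listener could delegate to them from onMqttConnectionStart and onDisconnect.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper that a shared lifecycle listener could delegate to.
final class ConnectedClientTracker {

    // A concurrent set, so callbacks for different clients can update it safely.
    private final Set<String> connected = ConcurrentHashMap.newKeySet();

    // Returns true if the client was not already tracked.
    boolean clientConnected(String clientId) {
        return connected.add(clientId);
    }

    // Returns true if the client was tracked and has now been removed.
    boolean clientDisconnected(String clientId) {
        return connected.remove(clientId);
    }

    int connectedCount() {
        return connected.size();
    }
}
```

The boolean return values make it easy to log only real state changes, for example when a disconnect event arrives for a client that was never recorded as connected.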
Intercept & Manipulate MQTT Messages
To manipulate the content of an MQTT message before the broker processes the message, a HiveMQ extension can intercept the message.
The developer registers an Interceptor callback that HiveMQ calls each time an MQTT message is received.
In this example, a PublishInterceptor that manipulates the topic of incoming publishes is registered. An Interceptor is always registered in a ClientInitializer. The initializer is a callback that HiveMQ calls each time an MQTT client session is initialized.
Services.initializerRegistry().setClientInitializer(new MyClientInitializer());
An Interceptor is then added to the clientContext of the Initializer.
public class MyClientInitializer implements ClientInitializer {
@Override
public void initialize(InitializerInput initializerInput, ClientContext clientContext) {
clientContext.addPublishInboundInterceptor(new PublishInboundInterceptor() { // (1)
@Override
public void onInboundPublish(PublishInboundInput publishInboundInput, PublishInboundOutput publishInboundOutput) {
//set the topic to "new/topic"
publishInboundOutput.getPublishPacket().setTopic("new/topic"); // (2)
}
});
}
}
(1) A new Interceptor is created and added to the context of the client.
(2) The changed topic. Most of the attributes of an MQTT PUBLISH packet can be manipulated.
The Publish Interceptor can also be used to prevent onward delivery of incoming messages. For more information, see this example.
Publish MQTT Messages
You can use an extension to manipulate the messages clients send or to create and publish new messages.
The Publish Service allows you to publish messages to all subscribers of a topic or to a specific client. For more information, see this example.
Publish message = Builders.publish()
.topic("the/topic")
.qos(Qos.EXACTLY_ONCE)
.payload(Charset.forName("UTF-8").encode("payload"))
.build();
Services.publishService().publish(message);
The PUBLISH messages that the Publish Service sends are processed in the same way as a PUBLISH message that a client sends.
Modify Subscriptions
The Subscription Store allows extensions to manipulate the subscriptions of a client. The Subscription Store contains methods to add, remove, or get subscriptions for a client.
In this example, a subscription for the topic new/topic with QoS 1 is added for the client with the identifier client-ID.
TopicSubscription subscription = Builders.topicSubscription()
.topicFilter("new/topic")
.qos(Qos.AT_LEAST_ONCE)
.build();
Services.subscriptionStore().addSubscription("client-ID", subscription);
When a subscription for a client is added via the extension system or the HiveMQ Control Center, the retained messages for the topics that are included in the subscription are not published to the client.
Modify Retained Messages
In the extension SDK, the RetainedMessageStore can set, remove, and fetch retained messages.
For this example, a retained message is added (or replaced) for the topic retain/topic with QoS 0 and the payload content.
RetainedPublish publish = Builders.retainedPublish()
.topic("retain/topic")
.payload(Charset.forName("UTF-8").encode("content"))
.qos(Qos.AT_MOST_ONCE)
.build();
Services.retainedMessageStore().addOrReplace(publish);
Unlike the retained messages that clients send, retained messages that are added with the Retained Message Store are not published to active subscribers.
Cluster Discovery
The HiveMQ cluster discovery feature can utilize the custom discovery mechanisms that an extension provides. This functionality enables deep integration of a HiveMQ cluster into an existing infrastructure.
Services.clusterService().addDiscoveryCallback(new MyClusterDiscovery());
public class MyClusterDiscovery implements ClusterDiscoveryCallback {
//this is a placeholder for an implementation of your custom external service
MyExternalService externalService = new MyExternalService(); // (1)
@Override
public void init(ClusterDiscoveryInput clusterDiscoveryInput, ClusterDiscoveryOutput clusterDiscoveryOutput) {
externalService.registerNode(clusterDiscoveryInput.getOwnAddress()); // (2)
clusterDiscoveryOutput.provideCurrentNodes(externalService.getAllClusterNodes());
}
@Override
public void reload(ClusterDiscoveryInput clusterDiscoveryInput, ClusterDiscoveryOutput clusterDiscoveryOutput) {
clusterDiscoveryOutput.provideCurrentNodes(externalService.getAllClusterNodes()); // (3)
}
@Override
public void destroy(ClusterDiscoveryInput clusterDiscoveryInput) {
externalService.removeNode(clusterDiscoveryInput.getOwnAddress()); // (4)
}
}
(1) This class is a placeholder for a custom external service.
(2) When this cluster node starts, the node registers with the external service.
(3) The reload method is called regularly and a list of addresses for all cluster nodes is provided to the output object. You can configure the reload interval.
(4) When this cluster node stops, the node is unregistered from the external service.
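The example leaves MyExternalService as a placeholder. To make the contract it has to fulfil concrete, here is a minimal in-memory sketch with the same three operations: register, remove, and list. InMemoryNodeRegistry is a hypothetical name, and the type parameter A stands in for the address type that the SDK passes to the callback. A real discovery service would need to be backed by storage that all cluster nodes share, such as a database or an object store, because an in-memory set is only visible to a single node.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the external service in the example.
// A is the node address type; a real service would persist entries externally.
final class InMemoryNodeRegistry<A> {

    // A concurrent set so that init, reload, and destroy can run on different threads.
    private final Set<A> nodes = ConcurrentHashMap.newKeySet();

    // Registering the same address twice has no additional effect.
    void registerNode(A address) {
        nodes.add(address);
    }

    void removeNode(A address) {
        nodes.remove(address);
    }

    // Returns a snapshot, so later changes do not affect the returned list.
    List<A> getAllClusterNodes() {
        return new ArrayList<>(nodes);
    }
}
```

Returning a snapshot copy keeps the list handed to the output object stable while other nodes register or unregister.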
If you want to use cluster discovery from an extension, the cluster configuration of HiveMQ must be set to extension.
For more information, see configuration example.
Metrics
HiveMQ extensions can provide their own metrics and access HiveMQ metrics.
Metrics in HiveMQ extensions are handled by the widely used Dropwizard Metrics library.
Counter currentConnectionsCounter = Services.metricRegistry().counter("com.hivemq.networking.connections.current");
log.info("currently connected {}", currentConnectionsCounter.getCount());
final Counter customCounter = Services.metricRegistry().counter("demo.helloworld.started");
customCounter.inc();
Many types of metrics are available, including gauges, counters, meters, histograms, and timers.