Additional HiveMQ Enterprise Extension SDK Services

The HiveMQ Enterprise Extension SDK adds several powerful services to the extension framework that specifically focus on the additional features the HiveMQ Enterprise and Professional Editions provide.

HiveMQ Enterprise Extension SDK Certification

Access to the HiveMQ Enterprise Extension SDK requires certification from the HiveMQ .

Once certified, you can use the services of the HiveMQ Enterprise Extension SDK to build and implement valid extensions for your Professional or Enterprise version of HiveMQ.

To learn more about HiveMQ Enterprise Extension SDK certification training or request scheduling and pricing information, contact HiveMQ sales.

Consumer Service

The Consumer Service allows you to register and unregister message consumers. Message consumers can be used to react quickly to incoming publishes on a specific set of topics and further process the consumed messages however your business case requires.

The Consumer Service is great fit when you want to write all or specific incoming publish messages to a third party system. For example, the HiveMQ Enterprise Extension for Kafka uses the Consumer Service to transform MQTT publish messages into Kafka records and then write the records into Kafka.

Each consumer must be registered with a unique consumer ID and defined consumer options.

The Consumer Service allows extensions to do the following:

The extension that runs the consumer must be registered on every node in the HiveMQ cluster.

Consumer Options

Consumer options describe the intended use of the consumer.
The definition of Consumer Options is mandatory for each consumer that you want to implement.

Table 1. Available Consumer Options
Option Description

Topic Filters

Sets all the configured topic filters for the selected consumer option.

Queue Limit

Sets the queue limit of the consumer per topic filter. The default value is 500000.

Example consumer options
final ConsumerOptions consumerOptions = EnterpriseBuilders.consumerOptions()
        .topicFilters("+/IN/#", "+/OUT/#")
        .queueLimit(1_000_000).build();

Create a Consumer

A message consumer is called whenever the broker receives an incoming publish messages that matches the topic filters for which the consumer is registered. When you use the Message Consumer Interface, only the method for consuming messages must be implemented. To avoid blocking the consumer, we recommend the use of an async wrapper.

This example show how to create a simple message consumer.

Example to create a message consumer
    public class MySimpleConsumer implements MessageConsumer {
        @Override
        public void consume(@NotNull ConsumerInput consumerInput, @NotNull ConsumerOutput consumerOutput) {
            final PublishPacket publishPacket = consumerInput.getPublish();
            final Async<ConsumerOutput> async = consumerOutput.async(Duration.ofSeconds(2));
            Services.extensionExecutorService().submit(() -> doSomethingWith(publishPacket, async));
        }
        private void doSomethingWith(final @NotNull PublishPacket publishPacket, final @NotNull Async<ConsumerOutput> async) {
            try {
                //your business logic
            } catch (Exception any) {
                async.getOutput().cancelConsumption();
            } finally {
                async.resume();
            }
        }
    }

ConsumerOutput marks this operation as async.

  • Always call Async.resume(), regardless of whether an operation is successful or unsuccessful.

  • If the timeout expires before Async.resume() is called, then the outcome is handled as failed.

  • Do not call this method more than once. If an async method is called multiple times an exception is thrown.

CancelConsumption() indicates a PUBLISH message was not consumed. Use this method to cancel unprocessed messages when you shut down your consumer.
The QoS of the canceled message determines the exact processing of the message:

  • Cancelled messages with QoS 0 (at most once) are discarded.

  • Cancelled messages with QoS 1 (at least once) or QoS 2 (exactly once) are offered again later to the same consumer (determined by consumer ID) on the current node.

Make sure that your business logic prevents endless retry loops for messages that fail and get cancelled.

Access the Consumer Service

EnterpriseServices.consumerService()

Register a Consumer with Options to HiveMQ

Before you work with a consumer, the consumer must be registered. Consumer registration is ideally done at the start of the extension. To register a consumer with options to HiveMQ, you need the unique ID of the consumer and the associated consumer options definition.

This example shows a consumer registration with consumer options.

EnterpriseServices.consumerService()
  .registerConsumer(consumerId, consumerOptions, new MessageConsumerProvider() {
      @Override
      public @NotNull MessageConsumer get() {
          return myConsumer;
      }
  });

The Message Consumer interface contains an init method that can be optionally overwritten.

When you register your consumer implementation, HiveMQ calls the init method with the defined consumer options as the parameters.

This example shows the init method with the consumer options that where provided when the consumer was registered, and the custom code that is needed for the specific consumer.

public void init(final @NotNull ConsumerOptions consumerOptions) {
   log.info("Initialize myConsumer");
}
Full Example Code
final ConsumerOptions consumerOptions = EnterpriseBuilders.consumerOptions().topicFilters("+/STAT").build();
final String consumerId = "myConsumerId";
final MessageConsumer myConsumer = new MySimpleConsumer();

final CompletableFuture<Void> registerFuture =
       EnterpriseServices.consumerService()
               .registerConsumer(consumerId, consumerOptions, new MessageConsumerProvider() {
                   @Override
                   public @NotNull MessageConsumer get() {
                       return myConsumer;
                   }
               });

registerFuture.whenComplete((aVoid, throwable) -> {
   if (throwable == null) {
       log.info("Consumer {} started.", consumerId);
   } else {
       log.warn("Not able to start  Consumer \"{}\", reason:", throwable.getMessage());
   }
});
The HiveMQ consumer API is built for high performance that allows asynchronous and simultaneous thread handling. To ensure consistency, the business logic you implement must be thread-safe.

Remove a Consumer from HiveMQ

It is best practice to unregister and remove consumers that you no longer plan to use.
Consumer removal is usually done when the extension stops.

This example shows how to remove a consumer with options from HiveMQ.

EnterpriseServices.consumerService().removeConsumer(consumerId);

The Message Consumer interface contains a destroy method that can be optionally overwritten.

When you remove your consumer, HiveMQ calls the destroy method with the defined consumer options. A consumer can be destroyed during runtime and additional code can be added via the destroy method.

This example shows the destroy method with the consumer options that where provided when the consumer was registered, and the custom code the specific consumer needs.

public void destroy(final @NotNull ConsumerOptions consumerOptions) {
    log.info("Destroy myConsumer");
}

Get All Consumers

If your extension uses multiple consumers, the ability to retrieve a map of all consumers that are registered to the extension can be very helpful. The key-value pairs that the method returns show the consumer ID and the modifiable consumer options for each consumer.

This example shows how to get a map of all consumers registered to the extension:

final @NotNull Map<String, @NotNull ModifiableConsumerOptions> myConsumers = EnterpriseServices.consumerService().getConsumers();

Get Options for a Specific Consumer

This example shows how to get the options for a specific consumer.

EnterpriseServices.consumerService().getConsumerOptions(“myConsumerId”);
Consumer options can be modified. This ability can be very helpful when a topic structure must be changed during runtime.
Example Code
final @NotNull Optional<ModifiableConsumerOptions> options = EnterpriseServices.consumerService().getConsumerOptions(consumerId);
if( options.isPresent()) {
   options.get().removeTopicFilter("+/STATUS");
   options.get().addTopicFilter("+/ERROR");
}

Session Attribute Store

This service allows an extension to manage the session attributes of clients that have the same lifecycle as the existing MQTT client session of your HiveMQ broker.

The Session Attribute Store manages the sessions of clients that have an existing session. The service cannot be used to add a client session.

The Sessions Attribute Store allows extensions to do the following:

You can also view the session attributes of a client on the client detail pages in your HiveMQ Control Center.
For more information, see Control Center Session Attributes.

Access the Session Attribute Store Interface

EnterpriseServices.sessionAttributeStore()
The earliest point that you can use the session attribute store for the client is the InitializerInput callback. This method is called when a client connects to a new or existing session. The method is also called for online clients when the extension starts.

Get All Session Attributes for a Client

This example shows how to retrieve all session attributes for a client with a specific client ID.

final @NotNull CompletableFuture<Map<String, ByteBuffer>> attributes = EnterpriseServices.sessionAttributeStore().getAll(clientId);
attributes.whenComplete( (aMap, throwable) -> {
   if (throwable == null) {
       log.info("Attributes found", aMap.keySet());
       checkAttributes(aMap);
   } else {
       log.warn("Exception with reason:", throwable.getMessage());
   }
});

Clear All Session Attributes for a Client

This example shows how to remove all session attributes from a specific client.

final String clientId = initializerInput.getClientInformation().getClientId();
EnterpriseServices.sessionAttributeStore().clear(clientId);

Add a Session Attribute to a Client

This example shows how to add a session attribute to a specific client.

EnterpriseServices.sessionAttributeStore().put(clientId, "myAttributeKey",
    ByteBuffer.wrap("myAttributeValue".getBytes(StandardCharsets.UTF_8)));
The session attribute that you set for the client must contain a key and a value. The maximum key length is 65535 characters. The maximum value size is 5 MB. Null values are not permitted.

Get a Specific Session Attribute for a Client

This example shows how to retrieve a specific session attribute of a specific client.

final @NotNull CompletableFuture<Optional<ByteBuffer>> myAttributeValue = EnterpriseServices.sessionAttributeStore().get(clientId, "myAttributeKey");
myAttributeValue.whenComplete( (aBuffer, throwable) -> {
   if (throwable == null) {
       if (aBuffer.isPresent()) {
          log.info("Attribute found", getValueAsStringFrom(aBuffer.get()));
       } else {
          log.info("Attribute not found");
       }
   } else {
       log.warn("Exception with reason:", throwable.getMessage());
   }
});
To get the desired session attribute from the Session Attribute Store, you must have the attribute key and client ID.

Remove a Specific Session Attribute from a Client

This example shows how to remove a specific session attribute of a specific client.

EnterpriseServices.sessionAttributeStore().remove(clientId,"myAttributeKey");
To remove the desired session attribute from the Session Attribute Store, you must have the attribute key and client ID.

Extension Messaging Service

The Extension Messaging Service makes it possible to send non-MQTT messages through the cluster and is intended for internal cluster traffic/communication between the extensions that run on the HiveMQ instances in your cluster.

The service is helpful when your use case requires the exchange of client information or data that is distributed over the cluster to be fully available on all nodes.

The ExtensionMessagingService allows extensions to do the following:

Define Extension Messaging Broadcast Options

The BroadcastMode of the Extension Messaging Service allows you to configure to which nodes information is sent.
The following options are available:

  • ALL: Sends information to all nodes in the cluster and includes the originating node.

  • OTHER: Sends information to the other nodes in the cluster and omits the originating node.

This example shows how to set the options to send information to all nodes in a cluster, omitting the originating node.

EnterpriseBuilders.extensionMessageOptions().mode(BroadcastMode.OTHERS).build()

Register Message Response to Receive Messages for a Specific Identifier

This example shows how to register to receive responses for a specific identifier to receive messages sent for the identifier with the Extension Messaging Service.

EnterpriseServices.extensionMessagingService()
    .register(MY_MESSAGE_ID, new MySimpleMessagingService.MySimpleRespondCallback());
When multiple extensions register a response callback for the same ID, the extension with the highest priority overrides any previous registration. Every extensionMessageCallback is removed after extension stop.

Send Messages for a Specific Identifier

This example shows how to send a message with ExtensionMessageOptions for a specific identifier with the Extension Messaging Service.
Every registered ExtensionMessageCallback with the specific identifier receives this message and can respond with a reply message.

The method returns a list of completable futures that contain the reply messages from all callbacks registered with the same identifier.

final List<CompletableFuture<ExtensionMessageResponse>> responseList =
    EnterpriseServices.extensionMessagingService.send(MY_MESSAGE_ID,
                                                        new byte[0],
                                                        extensionMessageOptions);
The completable futures of this method can fail throw an exception for the following reasons:
- A message is sent to a cluster node that runs a HiveMQ version older than 4.1.0.
- A message is sent to a cluster node that does not have a callback registered for the selected identifier.
- A message is sent to a cluster node that is currently not reachable.

Respond to Received Messages

This example shows how to complete message communication with a response.

static class MyRespondCallback implements ExtensionMessageCallback {
    ...

    @Override
    public void receive(@NotNull ExtensionMessage extensionMessage) {
        ...
        extensionMessage.respond(serializedResponseData);
        ...
    }
}
Full Example Code
public class MySimpleMessagingService {
   private static final Logger log = LoggerFactory.getLogger(MySimpleMessagingService.class);
   private static final @NotNull String MY_MESSAGE_ID = "MySimpleMessagingService";
   private final @NotNull ExtensionMessagingService extensionMessagingService;
   private final @NotNull ScheduledExecutorService scheduledExecutorService;

   public MySimpleMessagingService(final @NotNull ExtensionMessagingService extensionMessagingService,
                                   final @NotNull ScheduledExecutorService scheduledExecutorService) {
       this.extensionMessagingService = extensionMessagingService;
       this.scheduledExecutorService = scheduledExecutorService;
   }

   public void start() {
       extensionMessagingService.register(MY_MESSAGE_ID, new MySimpleMessagingService.MySimpleRespondCallback());
       scheduledExecutorService.scheduleAtFixedRate(this::send, 1, 1, TimeUnit.MINUTES);
   }

   public void stop() {
       extensionMessagingService.unregister(MY_MESSAGE_ID);
   }

   private void send() {
       byte[] data =  createDataToSend();
       //Send Message in the cluster and retrieve responses from other nodes
       final List<CompletableFuture<ExtensionMessageResponse>> responseList =
               extensionMessagingService.send(
                       MY_MESSAGE_ID,
                       data,
                       EnterpriseBuilders.extensionMessageOptions().mode(BroadcastMode.OTHERS).build());

       CompletableFuture.allOf(responseList.toArray(new CompletableFuture[]{}))
               .exceptionally(throwable -> null)
               .thenAccept(aVoid -> {
                   for (CompletableFuture<ExtensionMessageResponse> responseCompletableFuture : responseList) {
                       try {
                           final ExtensionMessageResponse response = responseCompletableFuture.get();
                           processDataFromResponse(response.getClusterNodeId(), response.getMessage());
                       } catch (ExecutionException | InterruptedException any) {
                           log.error(" Requesting response of data failed: ", any);
                       }
                   }
               });
   }

   private byte[] createDataToSend() {
       log.info("Create data to Send ");
       return RandomUtils.nextBytes(200);
   }

   private void processDataFromResponse(String clusterNodeId, byte[] message) {
       log.info("got response from {} ", clusterNodeId);
   }

   /**
    * The callback that receives an ExtensionMessage must respond in any case.
    */
   static class MySimpleRespondCallback implements ExtensionMessageCallback {
       boolean success = false;

       @Override
       public void receive(@NotNull ExtensionMessage message) {
           try {
               success = createRespond(message.receive());
           } finally {
               message.respond(new byte[]{(byte) (success ? 1 : 0)});
           }
       }

       private boolean createRespond(byte[] data) {
           return true;
       }
   }
}
When you implement the Extension Messaging Service, you must register a response callback that generates a response for each ExtensionMessage received. It is absolutely necessary for your response callback to respond in all cases to every ExtensionMessage.

Unregister to Stop Receiving Messages for a Specific Identifier

This example shows how to unregister a response to stop receiving messages for a specific identifier through the Extension Messaging Service.

extensionMessagingService.unregister(MY_MESSAGE_ID);

Control Center Service

Use this service to authenticate HiveMQ Control Center users, manage Control Center permissions, or add custom views and notifications to the HiveMQ Control Center.

The Control Center Service allows extensions to do the following:

Access the Control Center Service

EnterpriseServices.controlCenterService()

Add Simple Authentication to the HiveMQ Control Center

The Control Center Service allows extensions to add an authenticator for the HiveMQ Control Center users.

The HiveMQ Control Center can be configured with multiple users who each have a username and password (SHA256 and username prepended salt). For more information, see HiveMQ Control Center User Configuration.

The HiveMQ Enterprise Edition supports Role Based Access Control (RBAC) for Control Center users. RBAC gives you the ability to restrict user permissions and precisely control which users can view, access, and modify data. With RBAC, you can create fine-grained access management for your HiveMQ system.

If no custom permissions are set or you call the .clear() method of ModifiableControlCenterPermissions interface, the user is assigned HIVEMQ_SUPER_ADMIN permission.

This example shows how to add an authenticator to the HiveMQ Control Center.

//Implementation of the authenticator
public class MyControlCenterAuthenticator implements ControlCenterAuthenticator {

    public void onLogin(final @NotNull ControlCenterAuthInput controlCenterAuthInput, final @NotNull ControlCenterAuthOutput controlCenterAuthOutput) {
        final String username = controlCenterAuthInput.getUsername();
        final String password = controlCenterAuthInput.getPassword();
        if (loginAllowed(username, password)) {
            final Set<String> permissionsForUser = fetchPermissions(username);
            controlCenterAuthOutput.getUserPermissions().addAll(permissionsForUser);
            controlCenterAuthOutput.authenticateSuccessfully();
        } else {
            controlCenterAuthOutput.failAuthentication();
        }
    }
}

//usage
EnterpriseServices.controlCenterService().setAuthenticator(new MyControlCenterAuthenticator());

Add Enhanced Authentication to the HiveMQ Control Center

To enable more sophisticated authentication mechanisms, the Control Center Service allows extensions to add a ControlCenterEnhancedAuthenticator for HiveMQ Control Center users.

The onLoginLoad method provides LoginLoadInput and LoginLoadOutput parameters.
An optional onLoginFinished method provides the LoginFinishedInput parameter.
Extension input and extension output principles apply.

The LoginLoadInput parameter contains the following information:

  • The HTTP request from the user

  • The session ID of the control center session

The LoginFinishedInput parameter contains this information:

  • The session ID of the control center session

  • The outcome of the authentication process (success, fail, timeout)

  • The error message, if any error happened during the authentication

The LoginLoadOutput parameter provides four important methods to decide authentication:

  • authenticateSuccessfully(username) finishes the authentication process for the user successfully. The username is determined by the extension logic. The username that is provided here appears as the user who is logged into the HiveMQ Control Center.
    Authenticators of extensions with a lower priority are not called.

  • failAuthentication() finishes the authentication process for the user and prevents the user from using the control center.
    Authenticators of extensions with a lower priority are not called.

  • nextExtensionOrDefault() does not decide the authenticity of the client.
    In this case, authenticators of extensions with a lower priority are called.
    If no further authenticators are present, the authentication fails by default.

  • redirectUser(url, callbackPath, callback) redirects the user to a specified URL where the user can complete further actions such as logging in to the site. Afterward, the user is typically redirected back to the callbackPath and the LoginHttpCallback callback is called.
    For more information, see Use an Enhanced Authenticator to Redirect Control Center Users and LoginHttpCallback.

The following methods set the final result of the authentication process:

  • redirectUser

  • authenticateSuccessfully

  • failAuthentication

After one of these three methods is called, another call to any one of the three methods automatically throws an UnsupportedOperationException.

The LoginLoadOutput parameter also allows access to user permissions and the ability to add or remove permissions: loginLoadOutput.getUserPermissions().add("HIVEMQ_SUPER_ADMIN");

If no custom permissions are set or you call the .clear() method of ModifiableControlCenterPermissions interface, the user is assigned HIVEMQ_SUPER_ADMIN permission.

Additionally, the showLoginComponents() method allows display of custom user interface components on the HiveMQ Control Center login page. This method takes components that are defined in the LoginComponent list and displays them instead of the default GUI components of the control center login page.
For more information, see Custom Graphical User Interface for Enhanced Authentication.

public static class MyEnhancedAuthenticator implements ControlCenterEnhancedAuthenticator {
    @Override
    public void onLoginLoad(@NotNull LoginLoadInput loginLoadInput, @NotNull LoginLoadOutput loginLoadOutput) {
brickhof marked this conversation as resolved.
        // session id of the control center session
        final String sessionId = loginLoadInput.getSessionId();
        // the http request of the login in the control center
        final HttpRequest request = loginLoadInput.getRequest();
        // actions on the output object
        // show ui components on the login page, see the example configuration for more information
        loginLoadOutput.showLoginComponents(List.of());
        // redirect user, see the example configuration for more information
        loginLoadOutput.redirectUser(url, "/callbackPath", (LoginHttpCallback) (input, output) -> {});
        // successfully complete the authentication process
        loginLoadOutput.authenticateSuccessfully("example user");
        // fail authentication process
        loginLoadOutput.failAuthentication();
        // add user permissions for this user
        loginLoadOutput.getUserPermissions().add("SUPER_PERMISSION");
        // skip this extension for the authentication and delegate it to another extension or the default behavior
        loginLoadOutput.nextExtensionOrDefault();
    }
    // this is an optional method that gets called after the authentication finishes
    @Override
    public void onLoginFinished(@NotNull LoginFinishedInput loginFinishedInput) {
        // the session id of the control center user
        final String sessionId = loginFinishedInput.getSessionId();
        System.out.println("User logged in with session id: " + sessionId);
        // check whether there are any error messages, and print error messages that are  present
        if(loginFinishedInput.getErrorMessage().isPresent()){
            System.out.println(loginFinishedInput.getErrorMessage().get());
        }
        // check and react to the outcome of the authentication process
        final LoginFinishedInput.LoginOutcome outcome = loginFinishedInput.getOutcome();
        switch (outcome) {
            case SUCCESS:
                System.out.println("Control center authentication succeeded for session id " + sessionId);
                break;
            case FAIL:
                System.out.println("Control center authentication failed for session id " + sessionId);
                break;
            case TIMEOUT:
                System.out.println("Control center authentication timed out for session id " + sessionId);
                break;
        }
    }
}

Use an Enhanced Authenticator to Redirect Control Center Users

The following example shows how to add an enhanced authenticator to the HiveMQ Control Center and use the authenticator to redirect the user to a different URL:

//implementation of the enhanced authenticator
public static class MyControlCenterEnhancedAuthenticator implements ControlCenterEnhancedAuthenticator {
    @Override
    public void onLoginLoad(@NotNull LoginLoadInput loginLoadInput, @NotNull LoginLoadOutput loginLoadOutput) {
        loginLoadOutput.redirectUser(new URL("http://identityprovider.com/auth?callback_uri=http://control-center.com/myCallback", "/myCallback", new LoginHttpCallback() {
            @Override
            public void onRequest(@NotNull LoginHttpCallbackInput input, @NotNull LoginHttpCallbackOutput output) {
                // this method is called after successful redirection and after the target site subsequently redirects the user to the control center on the specified callback path
                output.authenticateSuccessfully("user");
            }
        });
    }
}
//usage
EnterpriseServices.controlCenterService().setEnhancedAuthenticator(new MyControlCenterEnhancedAuthenticator());

After the redirection to another site is finished and the other site has subsequently redirected the user back to the HiveMQ Control Center on the callbackPath specified in the redirectUser method, the ControlCenterService calls the LoginHttpCallback method.

The LoginHttpCallback method must implement the onRequest(LoginHttpCallbackInput, LoginHttpCallbackOutput) method.

The LoginLoadInput parameter provides the following information:

  • The HTTP request from the user

  • The session ID of the control center session

The LoginHttpCallbackOutput offers the same options as the LoginLoadOutput:

  • authenticateSuccessfully(username) finishes the authentication process for the user successfully. The username is determined by the extension logic. The username that is provided here appears as the user who is logged into the HiveMQ Control Center.
    Authenticators of extensions with a lower priority are not called.

  • failAuthentication() finishes the authentication process for the user and prevents the user from using the control center.
    Authenticators of extensions with a lower priority are not called.

  • nextExtensionOrDefault() does not decide the authenticity of the client.
    In this case, authenticators of extensions with a lower priority are called.
    If no further authenticators are present, the authentication fails by default.

  • redirectUser(url, callbackPath, callback) redirects the user to the given URL where the user can complete further actions such as logging in to the site. Afterward, the user is typically redirected back to the callbackPath and the LoginHttpCallback callback is called.
    For more information, see Use an Enhanced Authenticator to Redirect Control Center Users and LoginHttpCallback.

The LoginHttpCallbackOutput parameter also allows access to user permissions and the ability to add or remove permissions: loginLoadOutput.getUserPermissions().add("HIVEMQ_SUPER_ADMIN");

Example implementation of the LoginHttpCallback interface:
public static class MyLoginHttpCallback implements LoginHttpCallback {
    @Override
    public void onRequest(@NotNull LoginHttpCallbackInput input, @NotNull LoginHttpCallbackOutput output) {
        // for simplicity in this example we assume the response request contains a token in the request body that we can authenticate locally
        // if authentication is validated the user will always be given admin rights (we skip role-based access for simplicity)
        final HttpRequest request = input.getRequest();
        final Token token = new Token(request.getBody());
        if (token.isValid()) {
            output.getUserPermissions().add("HIVEMQ_SUPER_ADMIN");
            output.authenticateSuccessfully(token.getUserName());
        } else {
            output.failAuthentication();
        }
        // alternatively you could again redirect to a new url if your authentication has multiple steps
        // output.redirectUser(url, callbackPath, anotherLoginHttpCallback);
    }
}

Custom Graphical User Interface for Enhanced Authentication

If desired, you can expose custom user interface components on your HiveMQ Control Center login page to authenticate or redirect your users.

The GUI components are implemented via the LoginComponent interface, which has two methods. The individual components are defined via the getComponent() method. Cascading Style Sheet (CSS) information for the components can be set with the getCss() method.

The LoginComponentInput parameter provides the session ID by getSessionID().

The LoginLoadOutput parameter provides the same methods to decide authentication as the LoginLoadOutput and LoginHttpCallbackOutput.

public class MyLoginComponent implements LoginComponent {
    @Override
    public @NotNull Component getComponent(@NotNull LoginComponentInput input, @NotNull LoginComponentOutput output) {
        final VerticalLayout loginLayout = new VerticalLayout();
        loginLayout.addStyleName("my-layout");
        final TextField tokenField = new TextField("Insert code2 here");
        tokenField.setIcon(FontAwesome.USER_SECRET);
        tokenField.setWidth(100, Sizeable.Unit.PERCENTAGE);
        final Button button = new Button("Login!");
        button.addClickListener((Button.ClickListener) event -> {
            if (tokenField.getValue().equals("code2")) {
                output.authenticateSuccessfully("example user");
            } else {
                output.failAuthentication();
            }
        });
        loginLayout.addComponent(tokenField);
        loginLayout.addComponent(button);
        return loginLayout;
    }
    @Override
    public @Nullable String getCss() {
        return null;
    }
}
public class ComponentsCCEnhancedAuthenticator implements ControlCenterEnhancedAuthenticator {
    private final @NotNull MyLoginComponent myLoginComponent;
    ComponentsCCEnhancedAuthenticator() {
        this.myLoginComponent = new MyLoginComponent();
    }
    @Override
    public void onLoginLoad(@NotNull LoginLoadInput loginLoadInput, @NotNull LoginLoadOutput loginLoadOutput) {
        loginLoadInput.getRequest();
        loginLoadOutput.showLoginComponents(List.of(myLoginComponent));
    }
}
View details must be implemented with the use of Vaadin libraries and CSS. Vaadin is an open-source platform for web application development.

View and Add Control Center Permissions

This example shows how to view and add HiveMQ Control Center permissions.

final @NotNull ControlCenterPermission permission =
EnterpriseBuilders.controlCenterPermission()
.id(MY_DASHBOARD_VIEW)
.displayName("View My Dashboard")
.description("View My Dashboard Permission")
.group("CUSTOM").build();

EnterpriseServices.controlCenterService().addPermission(permission);

//Use permission by setting in ControlCenterAuthenticator Output for the logged-in user

controlCenterAuthOutput.getUserPermissions().add(MY_DASHBOARD_VIEW);

Add Custom Control Center Views

The Control Center Service allows extensions to add single extension views or views with subviews to the HiveMQ Control Center.

This example adds a new view to the HiveMQ Control Center.

EnterpriseServices.controlCenterService().addView(new MyExtensionView(myViewDataProvider);
View details must be implemented with use of Vaadin libraries and CSS. Vaadin is an open-source platform for web application development.

Create a Custom Extension View for the HiveMQ Control Center

Implementation of your custom extension view must include the following:

  • Your extension icon. The default is a plug icon.

  • The title of your extension view

  • The URL that appears for this view in the browser

  • A permission ID to return the needed Permission for this view

  • The view itself provided as a Vaadin View

Based on your custom extension view, these implementation elements are optional:

  • A selected URL that is suitable if your subview does not have a URL

  • A menu title

  • The associated CSS

The view must be created each time, because it will be shown currently on the specific website request. If not it could happen, that the view is not actual and user X is getting the data of the view that User Y has requested.

public class MyDashboardView implements ExtensionView {

    public MyDashboardView() { … }
    @Override
    public @NotNull String getTitle() { return "My Dashboard"; }
    @Override
    public @NotNull String getUrl() { return "MyDashboard"; }
    @Override
    public @Nullable String getCss() { return "VAADIN/myExtension.css";}
    @Override
    public @NotNull View getView() { return new DemoView(); }

    @Nullable
    public String getPermissionId() {  return "MY_DASHBOARD_VIEW"; }

    private class DemoView implements View { … }
}

Add or Remove HiveMQ Control Center Notifications

This example shows how to add or remove notifications from the HiveMQ Control Center.

@NotNull Notification myNotification;
myNotification = new Notification() {
@Override
public @NotNull String getMessage() { return "Hello from myExtension"; }
@Override
public @NotNull NotificationLevel getLevel() { return NotificationLevel.INFO; }
};

EnterpriseServices.controlCenterService().addNotification(myNotification);
EnterpriseServices.controlCenterService().removeNotification(myNotification);

REST Service

The HiveMQ REST Service allows extensions to create accessible HTTP APIs directly within HiveMQ.
Use this service to serve HTTP content directly from HiveMQ, authenticate HiveMQ REST API users with username and password, or set specific permissions per REST API endpoint and method to create fine-grained authentication.

The REST Service allows extensions to do the following:

The JAX-RS resources can be used to interact with HiveMQ by using other services. Interaction with HiveMQ is not necessary, you can also use the internal HTTP server of HiveMQ to avoid setting up an external HTTP server for your existing JAX-RS resources.

Access the HiveMQ REST API Service

EnterpriseServices.restService()

Register a Custom REST API Application

This example shows how to register a custom REST API application with the HiveMQ REST API service.

When you register a custom REST API application with the HiveMQ REST API Service, The base path for all resources is automatically determined by the extension ID: /api/v1/extensions/{extension-id}/.
At most one REST application can be set.

//simple example for rest service usage
Resource r = new Resource("backend");
try {
   EnterpriseServices.restService().setRestApplication(() -> List.of(r));
} catch (FeatureDisabledException disabledException) {
   //ignore
   log.error("REST-API is not enabled in config.xml");
}
The REST API must be enabled in the configuration file of your HiveMQ instance (config.xml).
For more information, see HiveMQ REST API.

Remove a Custom REST API Application

This example shows how to stop and remove a custom REST API application from the HiveMQ REST API service:

EnterpriseServices.restService().removeRestApplication();

Register a Custom REST Authenticator

This example shows how to register a custom REST authenticator with the HiveMQ REST API service:

EnterpriseServices.restService().setAuthenticator(new MyRestAuthenticator());
To add authentication to your REST API application, the HiveMQ REST API and the auth tag in the rest-api section of your HiveMQ configuration (config.xml) must be enabled. For more information, see HiveMQ REST API.

Implement Authentication for the HiveMQ REST API

This example shows the basic steps to implement user authentication for the HiveMQ REST API.

The first step is to implement the RestAuthenticator interface. In the onRequest method, the result of the authentication can be set via the RestAuthOutput object.
In the following simple example, every request is authenticated successfully and the username "myuser" is assigned for future use.

Example very simple REST API authenticator
public class DemoAuthenticator implements RestAuthenticator {
    @Override
    public void onRequest(final @NotNull RestAuthInput restAuthInput, final @NotNull RestAuthOutput restAuthOutput) {
            restAuthOutput.authenticateSuccessfully("myuser");
    }
}

The authenticator must be set via EnterpriseServices.restService().setAuthenticator(). Only one authenticator can be set.

Example to set a custom authenticator for the REST API
public class HelloWorldEnterpriseMain implements ExtensionMain {

    @Override
    public void extensionStart(
            final @NotNull ExtensionStartInput extensionStartInput,
            final @NotNull ExtensionStartOutput extensionStartOutput) {
        //Register the custom REST authenticator
       EnterpriseServices.restService().setAuthenticator(new DemoAuthenticator());
     }
}

The authentication can return one of the following three results:

  • authenticateSuccessfully(username): Indicates successful completion of the user authentication and grants the user access to the REST API.

  • failAuthentication() : Indicates unsuccessful completion of the user authentication and denies the user access to the REST API.

  • nextExtensionOrDefault(): Indicates that the authenticity of the user could not be decided. Authentication is delegated to other extensions with lower priority. If no further extension is available, the default behavior is applied. (not authenticated)

Example authentication results:
public class DemoAuthenticator implements RestAuthenticator {
    @Override
    public void onRequest(final @NotNull RestAuthInput restAuthInput, final @NotNull RestAuthOutput restAuthOutput) {
            restAuthOutput.authenticateSuccessfully("user");
    }
}

public class DemoAuthenticator implements RestAuthenticator {
    @Override
    public void onRequest(final @NotNull RestAuthInput restAuthInput, final @NotNull RestAuthOutput restAuthOutput) {
            restAuthOutput.failAuthentication();
    }
}

public class DemoAuthenticator implements RestAuthenticator {
    @Override
    public void onRequest(final @NotNull RestAuthInput restAuthInput, final @NotNull RestAuthOutput restAuthOutput) {
            restAuthOutput.nextExtensionOrDefault();
    }
}

This example shows the available REST API authenticator inputs:

Example inputs for the REST API authenticator
public class DemoAuthenticator implements RestAuthenticator {

    @Override
    public void onRequest(@NotNull RestAuthInput restAuthInput, @NotNull RestAuthOutput restAuthOutput) {
        // print all inputs:
        log.info("Input: {url:" + restAuthInput.getRequestUrl().toExternalForm() +
                ", http-method: " + restAuthInput.getHttpMethod() +
                ", listener-name:" + restAuthInput.getListener().getName() +
                ", remote-addr:" + restAuthInput.getRemoteAddress() +
                ", remote-port:" + restAuthInput.getRemotePort() +
                ", headers:" + restAuthInput.getHttpHeaders().toString() +
                (restAuthInput.getTlsInformation().isPresent() ? ", tls-info: [protocol:" + restAuthInput.getTlsInformation().get().getProtocol() +
                        ", cipher:" + restAuthInput.getTlsInformation().get().getCipherSuite() + "]" : "none"));


        restAuthOutput.authenticateSuccessfully(username);
    }
}

Asynchronous Authentication for HiveMQ REST API

Authentication and authorization use cases often need to contact databases or other services. Utilizing asynchronous output mechanisms ensures that requests in the extension thread do not block other tasks from being processed. To protect performance, use of a non-blocking async API is highly recommended.

Example asynchronous authentication for the HiveMQ REST API
public class DemoAuthenticator implements RestAuthenticator {

    // only single threaded executor for a simple example
    private final @NotNull Executor executor = Executors.newScheduledThreadPool(1);

    @Override
    public void onRequest(@NotNull RestAuthInput restAuthInput, @NotNull RestAuthOutput restAuthOutput) {
           final Async<RestAuthOutput> async = restAuthOutput.async(Duration.ofSeconds(10));
            executor.execute(() -> {
                // do some heavy lifting, in this case sleep
                Thread.sleep(5000);
                restAuthOutput.authenticateSuccessfully(username)
                // important call resume() to signal the extension system, that you are done
                async.resume();
            });
       }
}

Implement Authorization for a Custom REST API Application

REST API authorization is available for the HiveMQ Enterprise edition, only. To add authorization to your REST API application, the rest-api and auth tags in your HiveMQ configuration (config.xml) must be enabled. When the auth tag is set to true. the default behavior of the HiveMQ REST API requires authentication but does not require authorization/permissions. For more information, see HiveMQ REST API.
Example resource with default REST API behavior
@Path("/demo")
public class DemoResource {

	// resource with default behavior that requires authentication but does not require authorization/permissions
    @Path("/example-default-behavior")
    @GET
    public String noauth() {
        return "This resource requires authentication but does not require authorization";
    }
}

This example shows how to implement authorization with assigned permissions using the RestAuthenticator.

If no permissions are added in the onRequest() method, the default behavior of the REST API is to require all REST API permissions (super user). When you explitily define one or more permissions, only the configured permissions apply for the user/request.
Example very simple authenticator that sets the permissions for all HiveMQ REST API resources
public class DemoAuthenticator implements RestAuthenticator {
    @Override
    public void onRequest(final @NotNull RestAuthInput restAuthInput, final @NotNull RestAuthOutput restAuthOutput) {
                restAuthOutput.getUserPermissions().add("HIVEMQ_MANAGEMENT_BACKUPS_GET");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MANAGEMENT_BACKUPS_BACKUPID_POST");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MANAGEMENT_BACKUPS_BACKUPID_GET");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MANAGEMENT_BACKUPS_BACKUPID_POST");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MANAGEMENT_TRACE_RECORDINGS_GET");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MANAGEMENT_TRACE_RECORDINGS_POST");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MANAGEMENT_TRACE_RECORDINGS_TRACERECORDINGID_PATCH");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MANAGEMENT_TRACE_RECORDINGS_TRACERECORDINGID_DELETE");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MANAGEMENT_FILES_TRACE_RECORDINGS_TRACERECORDINGID_GET");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MANAGEMENT_FILES_BACKUPS_BACKUPID_GET");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MQTT_CLIENTS_CLIENTID_SUBSCRIPTIONS_GET");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MQTT_CLIENTS_CLIENTID_CONNECTION_DELETE");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MQTT_CLIENTS_GET");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MQTT_CLIENTS_CLIENTID_GET");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MQTT_CLIENTS_CLIENTID_DELETE");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MQTT_CLIENTS_CLIENTID_CONNECTION_GET");
                restAuthOutput.getUserPermissions().add("HIVEMQ_MQTT_CLIENTS_CLIENTID_CONNECTION_DELETE");
                restAuthOutput.getUserPermissions().add("CUSTOM_PERMISSION");
                restAuthOutput.authenticateSuccessfully("user");
    }
}

This example defines a simple resource that does not require authentication or authorization.
To achieve the behavior, use @NoAuthenticationRequired annotation.

Example annotation for resources that do not require authentication or authorization
@Path("/demo")
public class DemoResource {

	// resource that does not require authentication and allows access to everyone:
    @Path("/example-no-authentication")
    @GET
    @NoAuthenticationRequired
    public String noauth() {
        return "This resource does not need any authentication or authorization";
    }
}
If you want to implement your own APIs for the REST API, you can customize the authentication behavior and the permissions that are required.

If desired, you can define a custom permission that is needed to access the resource:

Example annotation for resources that require a custom permission
@Path("/demo")
public class DemoResource {

	// resource that requires the  permission "/perm1"
    @Path("/perm1")
    @GET
    @RequiresPermissions("PERMISSION1")
    public String perm1() {
        return "this resource requires successful authentication with the permission \"PERMISSION1\"";
    }
}

For use cases that need additional information to accompany a request (for example, the username), it is possible to inject the SecurityContext as a method argument.

Example annotation for injecting the security context
@Path("/demo")
public class DemoResource {
	// a security context can be injected to use information such as  usernname, http/https, roles and permissions
    @Path("/context")
    @GET
    @Produces("application/json")
    public Map<String, String> secure(@Context SecurityContext securityContext) {
        if (securityContext == null) {
            return null;
        }

        return Map.of(
                "username", securityContext.getUserPrincipal() != null ? securityContext.getUserPrincipal().getName() : "null",
                "secure", securityContext.isSecure() ? "true" : "false",
                "super-admin", securityContext.isUserInRole("HIVEMQ_SUPER_ADMIN") ? "true" : "false",
                "perm1", securityContext.isUserInRole("PERMISSION1") ? "true" : "false",
                "perm2", securityContext.isUserInRole("PERMISSION2") ? "true" : "false"
        );
    }


}

Client Event Service

The Client Event Service allows extensions to do the following:

  • Iterate the events of a specific client in a defined time frame

Before you use the Client Event Service, make sure that the Client Event History feature is enabled in the config.xml file of your HiveMQ instance.
    <client-event-history>
        <enabled>true</enabled>
        <lifetime>604800</lifetime> <!-- 7 days -->
    </client-event-history>

Access the Client Event Service

EnterpriseServices.clientEventService()
Based on the time frame you define, the operation of this method can be expensive in large scale deployments. For example, do not call this method with long time frames (multiple days) in a loop for multiple clients.

Iterate Events for Client

This example shows how to iterate the events of a specified client in a defined time frame.

EnterpriseServices.clientEventService().iterateEventsForClient(clientId, from, to, (context, event) -> {
   switch (event.getType()) {
       case OVERLOAD_PROTECTION_ON: {
           resource.getClientStates().add( OVERLOAD_PROTECTION_ON.toString());
           break;
       }
       case OVERLOAD_PROTECTION_OFF: {
           resource.getClientStates().add( OVERLOAD_PROTECTION_OFF.toString());
           break;
       }
       case DISCONNECT_BY_CLIENT_GRACEFUL:
       case DISCONNECT_BY_CLIENT_UNGRACEFUL:
       case DISCONNECT_BY_SERVER:
       {
           resource.getClientStates().add(event.toString());
           context.abortIteration();
           break;
       }
   }
});

The Client Event Service can identify the following types of events:

  • CONNECT_SUCCEEDED

  • CONNECT_FAILED

  • DISCONNECT_BY_CLIENT_GRACEFUL

  • DISCONNECT_BY_CLIENT_UNGRACEFUL

  • DISCONNECT_BY_SERVER

  • SESSION_REMOVED

  • OVERLOAD_PROTECTION_ON

  • OVERLOAD_PROTECTION_OFF

Publish Attributes

Publish Attributes are key-value pairs that can be attached to a PUBLISH message and are visible throughout the HiveMQ Enterprise Extension SDK. You can use Publish Attributes to store specific metadata per PUBLISH message internally in HiveMQ.

Each Publish Attribute that you set for a PUBLISH message must contain a key and a value. The maximum key length is 1024 characters. The maximum value size is 10 KiB. The total size of all Publish Attributes you add to a single PUBLISH message cannot exceed 1 MB. If the combined size of the Publish Attributes for a message exceeds the 1 MB limit, HiveMQ logs an IllegalArgumentException.

Publish Attributes are helpful if you want to gain insight into the journey of a PUBLISH message through your HiveMQ cluster. For example, add a timestamp to a PUBLISH message.

The information stored in each Publish Attribute is only accessible on your HiveMQ broker. Publish Attributes are not sent to MQTT clients in any form.
To attach information to a PUBLISH message that is visible to MQTT clients, use MQTT 5 User Properties instead.

You can use Publish Attributes in the following ways:

To add or remove a Publish Attribute, you must use the ModifiablePublishAttributes class that is part of the EnterpriseModifiablePublishPacket, EnterpriseModifiableOutboundPublish, and EnterpriseModifiableWillPublish class types.

Access the PUBLISH Attributes Interface

Publish Attributes are part of the PUBLISH packet object that can be accessed from the input and output parameters of various methods. To access the Publish Attributes you must cast the PUBLISH package to the enterprise version of the PUBLISH packet class.

The syntax for the name of the enterprise version of a class is quite simple. To cast to the enterprise version of a class, add Enterprise before the name of the original class.
For example, the enterprise version of ModifiablePublishPacket is EnterpriseModifiablePublishPacket.

This example shows how to access the enterprise version of the ModifiablePublishPacket in the PublishInboundInterceptor:

Services.initializerRegistry().setClientInitializer(((initializerInput, clientContext) -> {
    clientContext.addPublishInboundInterceptor((publishInboundInput, publishInboundOutput) -> {

        // original publish packet class name is "ModifiablePublishPacket"
        final ModifiablePublishPacket originalPacket = publishInboundOutput.getPublishPacket();

        // add "Enterprise" to "ModifiablePublishPacket" to get the class name of the enterprise version
        final EnterpriseModifiablePublishPacket enterprisePacket = (EnterpriseModifiablePublishPacket) originalPacket;

        final ModifiablePublishAttributes publishAttributes = enterprisePacket.getPublishAttributes();
        publishAttributes.put("key", StandardCharsets.UTF_8.encode("value"));
    });
}));

Add a Publish Attribute to a PUBLISH Message

This example shows how to add a Publish Attribute to a PUBLISh message.

Services.initializerRegistry().setClientInitializer(((initializerInput, clientContext) -> {
    clientContext.addPublishInboundInterceptor((publishInboundInput, publishInboundOutput) -> {

        final String clientId = publishInboundInput.getClientInformation().getClientId();
        final EnterpriseModifiablePublishPacket publishPacket = (EnterpriseModifiablePublishPacket) publishInboundOutput.getPublishPacket();

        final ModifiablePublishAttributes publishAttributes = publishPacket.getPublishAttributes();
        publishAttributes.put("inbound-interceptor-attribute", StandardCharsets.UTF_8.encode(clientId));
        publishAttributes.put("inbound-interceptor-attribute-second", StandardCharsets.UTF_8.encode(clientId));
    });
}));
You can also use the EnterprisePublishBuilder, EnterpriseRetainedPublishBuilder, or EnterpriseWillPublishBuilder to add PublishAttributes to a PUBLISH message.

This example shows how to use a HiveMQ Enterprise Extension SDK builder to add a Publish Attribute to a PUBLISH message.

final EnterprisePublish enterprisePublish = EnterpriseBuilders.publish()
        .topic("test/topic")
        .payload(ByteBuffer.wrap("messageWithAttributes".getBytes(StandardCharsets.UTF_8)))
        .publishAttribute("key", ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8)))
        .publishAttribute("key1", ByteBuffer.wrap("value1".getBytes(StandardCharsets.UTF_8)))
        .build();

Remove a Specific Publish Attribute from a PUBLISH Message

This example shows how to remove a specific key and value pair from the Publish Attribute of a PUBLISH message.

final ModifiablePublishAttributes publishAttributes = publishPacket.getPublishAttributes();
final String key = "key-to-remove";
final Optional<@Immutable ByteBuffer> removedValue = publishAttributes.remove(key);

if (removedValue.isPresent()) {
    logger.info("Remove key {} had value {}", key, StandardCharsets.UTF_8.decode(removedValue.get()));
} else {
    logger.info("Removed key {} had no value attached", key);
}

Remove all Publish Attributes from a PUBLISH Message

This example shows how to delete all Publish Attributes that are currently attached to the selected PUBLISH message.

final ModifiablePublishAttributes publishAttributes = publishPacket.getPublishAttributes();

publishAttributes.clear();

Get All Publish Attributes from a PUBLISH Message

This example shows how to retrieve the key value pairs contained in all Publish Attributes of a specific PUBLISH message.

final PublishAttributes publishAttributes = publishPacket.getPublishAttributes();

final Map<String, ByteBuffer> stringByteBufferMap = publishAttributes.asMap();

for (final Map.Entry<String, ByteBuffer> entry : stringByteBufferMap.entrySet()) {
    final String key = entry.getKey();
    final String value = StandardCharsets.UTF_8.decode(entry.getValue()).toString();

    logger.info("The value key {} is {}", key, value);
}

Get the Value for a Specific Key in a Publish Attribute

This example shows how to retrieve the value of a specific key in a selected Publish Attribute.

final String key = "key-to-get-value";
final PublishAttributes publishAttributes = publishPacket.getPublishAttributes();

final Optional<@Immutable ByteBuffer> fetchedValue = publishAttributes.get(key);

if (fetchedValue.isPresent()) {
    logger.info("The value for key {} is {}", key, StandardCharsets.UTF_8.decode(fetchedValue.get()));
} else {
    logger.info("Key {} has no value attached", key);
}

Next Steps

To learn more about the possibilities HiveMQ extensions offer and view code examples for several frequently implemented HiveMQ extension use cases, see Popular HiveMQ Extension Use Cases