Deprecated Services

The following services are deprecated since 3.1. Most of these services are replaced by a Blocking/Async version of the original service. Shared subscriptions can now be created with the Subscription Store, due to the changes to the shared subscription concept in 3.1. See the HiveMQ documentation for more information on shared subscriptions.

Table 1. Deprecated HiveMQ Services
Name Available since Description

Retained Message Store Service

1.5

Gives access to retained messages and lets plugins read, create, delete and modify retained messages

Subscription Store Service

1.5

Gives access to subscriptions and lets plugins read, create, delete and modify subscriptions

Client Service

1.5

Gives access to information about connected clients and clients with a persistent session

Metric Service

3.0

Allows to inspect and add custom HiveMQ metrics at runtime.

Shared Subscription Service

3.0

Allows to add and remove shared subscriptions at runtime.

Retained Message Store Service

This is the deprecated version of the retained message store service. It is recommended to use the updated version which can be found in Services.

The Retained Message Store enables HiveMQ plugins to directly add and remove retained messages from HiveMQs retained message store. It’s also possible to retrieve a certain retained message or the amount of messages that are currently stored by HiveMQ.

API

Retained Message Store API
           /**
            * @return all retained messages which are currently stored
            */
           public Set<RetainedMessage> getRetainedMessages();

           /**
            * @param topic a topic
            * @return retained message for the specific topic, otherwise an Optional
            *         instance with an empty reference
            */
           public Optional<RetainedMessage> getRetainedMessage(String topic);

           /**
            * Removes the retained message from given topic. null values are ignored.
            * If there isn't any retained message on the topic yet, nothing will happen.
            *
            * @param topic from which the message should be removed
            */
           public void remove(String topic);

           /**
            * Removes a given retained message. null values are ignored.
            * If the given retained message doesn't exist, nothing will happen.
            *
            * @param retainedMessage which should be removed
            */
           public void remove(RetainedMessage retainedMessage);

           /**
            * Removes all retained messages from the message store.
            */
           public void clear();

           /**
            * This method adds or replaces a retained message
            *
            * @param retainedMessage which should be added or replaced
            */
           public void addOrReplace(RetainedMessage retainedMessage);

           /**
            * Checks if the retained message is present in the retained message store.
            * Only the topic is of a retained message is considered for this check, QoS and message are ignored.
            *
            * @param retainedMessage to check if it's already in the message store
            * @return true if there's already a message on the topic of the given retained message
            */
           public boolean contains(RetainedMessage retainedMessage);

           /**
            * @return the number of all retained messages
            */
           public int size();

Example Usage

This section discusses common examples on how to use the Retained Message Store in your own plugins.

Inject the Message Store
import com.hivemq.spi.services.RetainedMessageStore;

import javax.inject.Inject;

public class MyPlugin {

    private final RetainedMessageStore retainedMessageStore;

    @Inject
    public MyPlugin(final RetainedMessageStore retainedMessageStore) { (1)
        this.retainedMessageStore = retainedMessageStore;
    }
}
1 The message store implementation is injected by the HiveMQ core.


Programmatically add a new Retained Message
    public void addRetainedMessage(String topic, String message){

        if(!retainedMessageStore.contains(topic)) (1)
            retainedMessageStore.addOrReplace(new RetainedMessage
                                 (topic, message.getBytes(), QoS.valueOf(1)));(2)
    }
1 Check if there is currently a retained message for a certain topic.
2 Add a retained message to the topic if there is none.


Clear the whole Retained Message Store
    public void cleanUp(){

        if(retainedMessageStore.size() >= TOO_MANY) (1)
            retainedMessageStore.clear(); (2)
    }
1 Get the amount of retained messages that are currently stored and check if there are too many retained messages.
2 Delete all retained messages.


Read and remove specific retained messages.
    public void logAndDeleteRetainedMessage(String topic){
        Optional<RetainedMessage> rm = retainedMessageStore.getRetainedMessage(topic); (1)

        if(rm.isPresent()){
            logger.info(new String(rm.get().getMessage()));
            retainedMessageStore.remove(rm.get()); (2)
        }
    }
1 Get the retained message for a certain topic if there is any.
2 Remove the message.

Gotchas

  • When calling the getRetainedMessage method with a topic for which no retained message is stored, you will receive a Optional.absent() object instead of null. See the com.google.common.base.Optional documentation for further information about the Optional class.

Subscription Store Service

This is the deprecated version of the subscription store service. It is recommended to use the updated version which can be found in Services.

The Subscription Store is designed to programmatically interact with MQTT client subscriptions. The Subscription Store service provides methods to get, add or remove subscriptions and allows to find out which topics a certain subscriber has subscribed to. It’s also possible with the Subscription Store to find out which clients subscribed to a certain topic.

The Subscription Store allows to query for subscription information on all cluster nodes or just the local HiveMQ node.

There are separate "local" methods available in case the broker instance might be connected to a cluster and you want to gather information about this instance only. The return value of the local methods us not wrapped in a ListenableFuture. Local and cluster methods will return the same result, if the broker instance is not part of a cluster.

API

    /**
     * This method returns all subscriptions on this HiveMQ Node as a {@link com.google.common.collect.Multimap} of client identifiers and topics.
     * You won't receive subscriptions of connected
     * clients from other HiveMQ nodes if HiveMQ runs in a cluster.
     * <p/>
     * Please be aware that calling this method on HiveMQ instances with many subscriptions could have
     * negative performance effects.
     * <p/>
     * The returned Multimap is read-only and must not be modified.
     *
     * @return a {@link com.google.common.collect.Multimap} of client identifiers and their topic subscriptions
     */
    @ReadOnly
    Multimap<String, Topic> getLocalSubscriptions();

    /**
     * Returns all MQTT client subscriber identifiers for a given topic, for this HiveMQ instance.
     * MQTT Wildcards are allowed.
     * <p/>
     * Don't pass <code>null</code> as topic. This method is lenient, so
     * it will just return an empty Set.
     * <p/>
     * The returned Set is read-only and must not be modified.
     *
     * @param topic the topic
     * @return client identifiers of all subscribers that subscribed to the topic
     */
    @ReadOnly
    Set<String> getLocalSubscribers(@NotNull String topic);

    /**
     * Returns all topics a client is subscribed to, on this HiveMQ instance.
     * <p/>
     * If the client does not exist, an empty Set is returned.
     * <p/>
     * Don't pass <code>null</code> as clientId. This method is lenient, so
     * it will just return an empty Set.
     * <p/>
     * The returned Set is read-only and must not be modified.
     *
     * @param clientID of the client
     * @return all topics the client subscribed to
     */
    @ReadOnly
    Set<Topic> getLocalTopics(@NotNull String clientID);

    /**
     * This method adds a subscription for a certain client to a certain topic.
     * If HiveMQ is connected to a cluster, the subscription will be broadcast to all other Cluster Nodes.
     * <p/>
     * This method is lenient, so if the clientId or the topic
     * is <code>null</code>, nothing will happen.
     *
     * @param clientID client, which should be subscribed
     * @param topic    topic to which the client should be subscribed
     * @return A {@link com.google.common.util.concurrent.ListenableFuture} object that will succeed,
     * as soon es the subscription was added by all Cluster Nodes.
     */
    ListenableFuture<Void> addSubscription(@NotNull String clientID, @NotNull Topic topic);

    /**
     * This method removes a subscription for a certain client and a certain topic.
     * If HiveMQ is connected to a cluster, the subscription will be removed by other Cluster Nodes as well.
     *
     * @param clientID client, which should get unsubscribed
     * @param topic    topic from which the client should get unsubscribed
     * @return A {@link com.google.common.util.concurrent.ListenableFuture} object that will succeed,
     * as soon es the subscription was removed by all Cluster Nodes.
     */
    ListenableFuture<Void> removeSubscription(@NotNull String clientID, @NotNull String topic);

    /**
     * This method returns all subscriptions this HiveMQ instance and all other nodes in a HiveMQ cluster,
     * as a {@link com.google.common.collect.Multimap} of client identifiers and topics.
     * <p/>
     * Please be aware that calling this method on HiveMQ instances with many subscriptions could have
     * negative performance effects.
     * <p/>
     * The returned Multimap is read-only and must not be modified.
     *
     * @return a {@link com.google.common.collect.Multimap} of client identifiers and their topic subscriptions
     */
    @ReadOnly
    ListenableFuture<Multimap<String, Topic>> getSubscriptions();

    /**
     * Returns all MQTT client subscriber identifiers for a given topic, this HiveMQ instance and all other nodes in a HiveMQ cluster.
     * MQTT Wildcards are allowed.
     * <p/>
     * Don't pass <code>null</code> as topic. This method is lenient, so
     * it will just return an empty Set.
     * <p/>
     * The returned Set is read-only and must not be modified.
     *
     * @param topic the topic
     * @return client identifiers of all subscribers that subscribed to the topic
     */
    @ReadOnly
    ListenableFuture<Set<String>> getSubscribers(@NotNull String topic);

    /**
     * Returns all topics a client is subscribed to, on this HiveMQ instance and all other nodes in a HiveMQ cluster.
     * <p/>
     * If the client does not exist, an empty Set is returned.
     * <p/>
     * Don't pass <code>null</code> as clientId. This method is lenient, so
     * it will just return an empty Set.
     * <p/>
     * The returned Set is read-only and must not be modified.
     *
     * @param clientID of the client
     * @return all topics the client subscribed to
     */
    @ReadOnly
    ListenableFuture<Set<Topic>> getTopics(@NotNull String clientID);
All ListenableFuture methods will return immediately. Please use callbacks or use the get() method to wait for the result.

Example Usage

This section discusses common examples on how to use the Subscription Store in your own plugins.

Inject the Subscription Store
import com.hivemq.spi.services.SubscriptionStore;

import javax.inject.Inject;

public class MyPlugin {

    private final SubscriptionStore subscriptionStore;

    @Inject
    public MyPlugin(final SubscriptionStore subscriptionStore) { (1)
        this.subscriptionStore = subscriptionStore;
    }
}
1 The subscription store implementation is injected by the HiveMQ core.


Remove unwanted subscriptions
public void removeTopicSubscriptions(final Topic topicToRemove) {

    Futures.addCallback(subscriptionStore.getSubscriptions(), new FutureCallback<Multimap<String, Topic>>() { (1)

        @Override
        public void onSuccess(Multimap<String, Topic> subscriptions) { (2)(3)

            for (String clientId : subscriptions.keySet()) { (4)

                if (subscriptions.get(clientId).contains(topicToRemove)) { (5)
                    subscriptionStore.removeSubscription(clientId, topicToRemove.getTopic());
                }
            }
        }

        @Override
        public void onFailure(Throwable t) {
            t.printStackTrace();
        }
    });
}
1 The getSubscriptions method returns a ListenableFuture.
2 The callback succeeds, as soon as the subscriptions hab been collected.
3 Get a Multimap with client identifiers as keys and topic subscriptions as value.
4 Iterate all client identifiers.
5 Find all clients that subscribed to a certain topic and remove the subscriptions.


Setup a default Subscription
public class ConnackSend implements OnConnackSend {

    private final static String DEFAULT_TOPIC = "default/topic";
    private final SubscriptionStore subscriptionStore;

    @Inject
    public ConnackSend(SubscriptionStore subscriptionStore, ClientService clientService) {
        this.subscriptionStore = subscriptionStore;
    }

    @Override
    public void onConnackSend(CONNACK connack, ClientData clientData) { (1)
        subscriptionStore.addSubscription(clientData.getClientId(), new Topic(DEFAULT_TOPIC, QoS.valueOf(1))); (2)
    }
}
1 Add a subscription as soon as a client connected successfully.
2 As soon as a subscription is added, the MQTT client will receive all messages published to the topic.


List Subscribers for a certain topic
private void logSubscribers(String topic) {

    Set<String> subscribers = subscriptionStore.getLocalSubscribers(topic); (1)
    for (String clientId : subscribers) {
        logger.info(clientId);
    }
}
1 Get all subscribers for a certain topic, on this broker instance only.

Gotchas

  • The Multimap class is part of the com.google.common.collect API. See the documentation for further information.

  • Topic class objects are equal if their topics are equal, independent of their quality of service (QoS) levels.

  • Subscriptions added in the OnConnectCallback for the connecting clients are removed after the callback execution, if the client is requesting a clean session. Consider using the OnConnackSend callback instead.


Client Service

This is the deprecated version of the client service. It is recommended to use the updated version which can be found in Service.

The Client Service allows plugins to gather information about clients, like whether a client is currently connected or not, its username, client identifier and SSL/TLS certificate, if the client is anonymous or if the client is authenticated.

The Client Service allows to query for client information on all cluster nodes or just the local HiveMQ node.

There are separate "local" methods available in case the broker instance might be connected to a cluster and you want to gather information about this instance only. The return value of the local methods are not wrapped in a ListenableFuture. In case the HiveMQ instance is not connected to a cluster, both methods will return the same results.

API

    /**
     * Returns all identifiers of connected clients of this HiveMQ Node. You won't receive client identifiers of connected
     * clients from other HiveMQ nodes if HiveMQ runs in a cluster.
     *

     * If you have many client connections, please not that calling this method frequently could have negative performance
     * effects.
     *
     * @return client identifiers of all connected clients
     */
    Set<string> getLocalConnectedClients();

    /**
     * Returns all disconnected clients which have a persistent MQTT session on this instance of HiveMQ (MQTT clean session=false).
     * </string>

     * Disconnected MQTT clients which don't have a persistent session won't be returned by this method
     *
     * @return all disconnected clients with a persistent MQTT session
     */
    Set<string> getLocalDisconnectedClients();

    /**
     * Check if a client with a given identifier is currently connected to this HiveMQ instance.
     *
     * @param clientId client, which should be checked
     * @return true, if a certain client is currently connected and false otherwise
     */
    boolean isClientConnectedLocal(String clientId);

    /**
     * Returns client information for clients that are connected to this broker instance.
     * </string>

     * If the client isn't connected, you will receive an {@link Optional} with absent data.
     *
     * @param clientId the client identifier of the client
     * @return {@link ClientData} for a specific client.
     */
    Optional<clientdata> getLocalClientDataForClientId(String clientId);

    /**
     * Returns all identifiers of connected clients of this HiveMQ instance and all other nodes in a HiveMQ cluster
     * </clientdata>

     * Calling this method frequently in a clustered environment could have negative performance effects.
     *
     * @return client identifiers of all connected clients
     */
    ListenableFuture<set<string>> getConnectedClients();

    /**
     * Returns all disconnected clients which have a persistent MQTT session on this broker or any other cluster node.
     * </set<string>

     * Disconnected MQTT clients which don't have a persistent session won't be returned by this method
     *
     * @return all disconnected clients with a persistent MQTT session
     */
    ListenableFuture<set<string>> getDisconnectedClients();

    /**
     * Check if a client with a given identifier is currently connected to this HiveMQ broker instance or any other instance in the cluster.
     *
     * @param clientId client, which should be checked
     * @return true, if a certain client is currently connected and false otherwise
     */
    ListenableFuture<boolean> isClientConnected(final String clientId);

    /**
     * Returns additional client information about a given client with a given client identifier.
     * </boolean></set<string>

     * This method will also get client information from other cluster nodes if needed.
     *

     * If the client isn't connected, you will receive an {@link Optional} with absent data.
     *
     * @param clientId the client identifier of the client
     * @return {@link ClientData} for a specific client.
     */
    ListenableFuture<optional<clientdata>> getClientDataForClientId(final String clientId);

Example Usage

This section discusses common examples on how to use the Client Service in your own plugins.

Inject the Client Server
import com.hivemq.spi.services.ClientService;

import javax.inject.Inject;

public class MyPlugin {

    private final ClientService clientService;

    @Inject
    public MyPlugin(final ClientService clientService) { (1)
        this.clientService = clientService;
    }
}
1 The client service implementation is injected by the HiveMQ core.


Log Client Information
private void logClientData(String clientId) {

    Futures.addCallback(clientService.getClientDataForClientId(clientId), new FutureCallback<optional<clientdata>>() {

        @Override
        public void onSuccess(Optional<clientdata> result) {

            if (result.isPresent()) {
                ClientData clientData = result.get();
                logger.info(clientData.getUsername() + " " + clientData.getClientId() + " " + clientData.isAuthenticated());
            }
        }

        @Override
        public void onFailure(Throwable t) {
            t.printStackTrace();
        }
    });
}
List Connected Clients
private void logConnectedClients() {
    Futures.addCallback(clientService.getConnectedClients(), new FutureCallback<set<string>>() {
        @Override
        public void onSuccess(Set<string> connectedClients) {
            for (String connectedClient : connectedClients) {
                logger.info("client {} is currently connected.", connectedClient);
            }
        }

        @Override
        public void onFailure(Throwable t) {
            t.printStackTrace();
        }
    });
}

Gotchas

  • If a MQTT client is connected with cleanSession=false, all information about the client will be lost as soon as the client disconnects. This means the client will not be considered as "disconnected client" for the getDisconnectedClients method.

Metric Service

This is the deprecated version of the metric service. It is recommended to use the updated version which can be found in Services.

HiveMQ 3 introduced a new, holistic monitoring model. While HiveMQ 2 recalculated metrics at specific points of time, HiveMQ 3 metrics are always current and accurate.

HiveMQ uses the de-facto standard Java metrics library Dropwizard Metrics, so plugin developers can benefit from the whole ecosystem about that library and writing custom integrations is a breeze.

The MetricService of HiveMQ can be used to gather information about pre-defined HiveMQ metrics or can be used to hook in custom metrics defined by your plugin. There’s no artificial wrapper, so you can use the feature-rich Dropwizard Metrics library directly.

For gathering pre-defined HiveMQ metrics, the constant class com.hivemq.spi.metrics.HiveMQMetrics can be used.

API

    /**
     * Returns a specific HiveMQ metric. If the metric does not exist, this method will return
     * <code>null</code>.
     * <p/>
     * For a list of all available metrics, refer to the {@link com.hivemq.spi.metrics.HiveMQMetrics} constant class.
     *
     * @param metric the metric
     * @param <T>    the metric type
     * @return the metric (if available) or <code>null</code>
     */
    @Nullable
    <T extends Metric> T getHiveMQMetric(HiveMQMetric<T> metric);

    /**
     * Returns the metric registry of HiveMQ.
     *
     * @return the metric registry
     */
    MetricRegistry getMetricRegistry();

Example Usage

Monitor specific metrics
private void logConnectionsHistogram() {
    Histogram connections = metricService.getHiveMQMetric(HiveMQMetrics.CONNECTIONS_OVERALL_MEAN);

    final Snapshot snapshot = connections.getSnapshot();

    logger.info("Mean: {}ms", snapshot.getMean() / 1000 / 1000);
    logger.info("StdDev: {}ms", snapshot.getStdDev() / 1000 / 1000);
    logger.info("Min: {}ms", snapshot.getMin() / 1000 / 1000);
    logger.info("Max: {}ms", snapshot.getMax() / 1000 / 1000);
    logger.info("Median: {}ms", snapshot.getMedian() / 1000 / 1000);
    logger.info("75th percentile: {}ms", snapshot.get75thPercentile() / 1000 / 1000);
    logger.info("95th percentile: {}ms", snapshot.get95thPercentile() / 1000 / 1000);
    logger.info("98th percentile: {}ms", snapshot.get98thPercentile() / 1000 / 1000);
    logger.info("99th percentile: {}ms", snapshot.get99thPercentile() / 1000 / 1000);
    logger.info("999th percentile: {}ms", snapshot.get999thPercentile() / 1000 / 1000);
}
Add custom metrics
private void addCustomMetrics() {

    //Adding different metric types
    final Meter meter = metricService.getMetricRegistry().meter("my-meter");
    final Timer timer = metricService.getMetricRegistry().timer("my-timer");
    final Counter counter = metricService.getMetricRegistry().counter("my-counter");
    final Histogram histogram = metricService.getMetricRegistry().histogram("my-histogram");

    //Remove a metric
    metricService.getMetricRegistry().remove("my-meter");
}

Shared Subscription Service

This service is marked as deprecated.

HiveMQ 3 brings a new concept for load balancing client subscriptions to multiple clients with Shared Subscriptions. The concept is explained in the Official User Guide.

These Shared Subscriptions can be added and removed at runtime.

API

@ThreadSafe
public interface SharedSubscriptionService {

    /**
     * Adds shared subscriptions to the SharedSubscriptionService.
     *

     * Although the contract of this method is to disallow null values, this method is lenient.
     * If ``null`` values are passed, these values are ignored.
     *
     * @param sharedSubscriptions the shared subscriptions to add
     */
    void addSharedSubscriptions(@NotNull final String... sharedSubscriptions);

    /**
     * Returns ``true`` if shared subscriptions are available, ``false`` otherwise
     *
     * @return ``true`` if shared subscriptions are available, ``false`` otherwise
     */
    boolean sharedSubscriptionsAvailable();

    /**
     * Removes a shared subscription from the SharedSubscriptionService.
     *

     * Although the contract of this method is to disallow null values, this method is lenient.
     * If a ``null`` value is passed, the value is ignored.
     *
     * @param sharedSubscription the shared subscription to remove
     */
    void removeSharedSubscription(@NotNull final String sharedSubscription);

    /**
     * Returns all shared subscriptions. The returned List is read-only representation
     * of all shared subscriptions
     *
     * @return a read-only List of all shared subscriptions
     */
    @ReadOnly
    List<string> getSharedSubscriptions();

    /**
     * Returns the number of all shared subscriptions.
     *
     * @return the number of all shared subscriptions
     */
    long getSharedSubscriptionsSize();
}

Example Usage

Inspect all registered Shared Subscriptions
private final SharedSubscriptionService sharedSubscriptionService;

@Inject
public MyCallback(SharedSubscriptionService sharedSubscriptionService){
    this.sharedSubscriptionService = sharedSubscriptionService
}

@PostConstruct
public void init(){
    listSharedSubscriptions();
}

public void listSharedSubscriptions(){
    final List<string> sharedSubscriptions = sharedSubscriptionService.getSharedSubscriptions();
    log.info("The following shared subscriptions were registered:", sharedSubscriptions.toString());
}
Adding Shared Subscriptions
public void addSharedSubscriptions(){
   sharedSubscriptionService.addSharedSubscriptions("first/shared", "second/shared");
}
Removing Shared Subscriptions
public void removeSharedSubscription(){
    sharedSubscriptionService.removeSharedSubscription("my/shared/subscription");
}