Callbacks
A core concept of HiveMQ plugin development is using the callbacks HiveMQ provides to execute custom business logic on events when they occur.
To hook your callback implementations into HiveMQ, you can use the Callback Registry.
import com.hivemq.spi.PluginEntryPoint;
import com.hivemq.spi.callback.lowlevel.OnPingCallback;
import com.hivemq.spi.security.ClientData;
import javax.annotation.PostConstruct;
public class TestPlugin extends PluginEntryPoint {
@PostConstruct
public void postConstruct() {
getCallbackRegistry().addCallback(new OnPingCallback() { (1)
@Override
public void onPingReceived(ClientData clientData) {
System.out.println("Ping received");
}
});
}
}
1 | Registering the anonymous inner callback class |
Same priorities on callbacks
If callbacks have the same priority, the execution order of these callbacks is not predictable.
Callback Types
Every callback interface that can be implemented is either an Asynchronous or a Synchronous callback.
The only difference that matters for plugin implementors is that Synchronous Callbacks have priorities. Priorities define the order of callback execution when there is more than one implementation of a callback. Use the constants from the com.hivemq.spi.callback.CallbackPriority class or define your own numerical return value. Lower values mean higher priority.
Synchronous callbacks are executed in order because they typically allow you to interfere with the message processing, authentication, or authorization mechanisms of HiveMQ. It is therefore very important to avoid blocking in Synchronous Callbacks.
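The ordering rule for synchronous callbacks can be sketched in plain Java. This is only an illustration of "lower value runs first", not HiveMQ's internal implementation: PrioritizedCallback, PriorityOrderSketch, and the HIGH/MEDIUM/LOW values are hypothetical stand-ins and not part of the HiveMQ SPI.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for a synchronous callback; not the HiveMQ SPI type.
interface PrioritizedCallback {
    int priority();
    String name();
}

class PriorityOrderSketch {

    // Hypothetical priority values chosen for this sketch only.
    static final int HIGH = 100;
    static final int MEDIUM = 1000;
    static final int LOW = 10000;

    static PrioritizedCallback callback(final String name, final int priority) {
        return new PrioritizedCallback() {
            @Override
            public int priority() {
                return priority;
            }

            @Override
            public String name() {
                return name;
            }
        };
    }

    // Returns the callback names in execution order: lowest priority value first.
    static List<String> executionOrder(final List<PrioritizedCallback> registered) {
        final List<PrioritizedCallback> sorted = new ArrayList<>(registered);
        sorted.sort(Comparator.comparingInt(PrioritizedCallback::priority));
        final List<String> names = new ArrayList<>();
        for (final PrioritizedCallback callback : sorted) {
            names.add(callback.name());
        }
        return names;
    }

    public static void main(final String[] args) {
        final List<PrioritizedCallback> registered = new ArrayList<>();
        registered.add(callback("logging", LOW));
        registered.add(callback("validation", HIGH));
        registered.add(callback("enrichment", MEDIUM));
        System.out.println(executionOrder(registered)); // [validation, enrichment, logging]
    }
}
```

The sort used here happens to keep registration order for equal priorities. HiveMQ makes no such promise, so do not rely on any order between callbacks that share a priority.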
Asynchronous Callbacks are executed in parallel. HiveMQ synchronizes after the parallel execution. This means that a slow callback can block HiveMQ even though all other callbacks have already finished.
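The effect of synchronizing after a parallel execution can be illustrated with a standard ExecutorService. This is a sketch under the assumption that "synchronizes" means waiting for every callback to finish; it says nothing about how HiveMQ schedules callbacks internally. ParallelCallbackSketch and sleepingCallback are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelCallbackSketch {

    // Builds a hypothetical callback that simply sleeps for the given time.
    static Callable<Void> sleepingCallback(final long millis) {
        return new Callable<Void>() {
            @Override
            public Void call() throws InterruptedException {
                Thread.sleep(millis);
                return null;
            }
        };
    }

    // Runs all callbacks in parallel, waits for all of them, and returns the elapsed milliseconds.
    static long runAllAndWait(final List<Callable<Void>> callbacks) {
        final ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, callbacks.size()));
        try {
            final long start = System.nanoTime();
            executor.invokeAll(callbacks); // returns only when every task has completed
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for callbacks", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(final String[] args) {
        final List<Callable<Void>> callbacks = new ArrayList<>();
        callbacks.add(sleepingCallback(10));
        callbacks.add(sleepingCallback(10));
        callbacks.add(sleepingCallback(300)); // the slow callback dominates the total time
        System.out.println("All callbacks finished after " + runAllAndWait(callbacks) + " ms");
    }
}
```

Although two of the three callbacks finish almost immediately, the caller is held up for roughly as long as the slowest one takes.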
Overview of all Callbacks
Name | Type | Description |
---|---|---|
Broker Event Callbacks | | |
OnBrokerStart | Synchronous | Called when the broker starts up. This happens before bootstrapping functionality like binding network interfaces. |
OnBrokerReady | Synchronous | Called when the broker is ready. This is after all the bootstrapping is done and after a possible cluster join is tried. |
OnBrokerStop | Synchronous | Called when the broker stops. |
MQTT Message Callbacks | | |
OnConnectCallback | Synchronous | Called when a CONNECT message arrives. |
OnDisconnectCallback | Asynchronous | Called when a DISCONNECT message arrives or a TCP connection loss occurs. |
OnPublishReceivedCallback | Synchronous | Called when a PUBLISH MQTT message arrives. |
OnPublishSend | Asynchronous | Called when the broker is sending a PUBLISH MQTT message. |
OnSubscribeCallback | Synchronous | Called when a MQTT SUBSCRIBE message arrives. |
OnTopicSubscriptionCallback | Synchronous | Called for every topic a client wants to subscribe to. |
OnUnsubscribeCallback | Asynchronous | Called when a MQTT UNSUBSCRIBE message arrives. |
OnConnackSend | Asynchronous | Called when the broker is sending a MQTT CONNACK message. |
OnPingCallback | Asynchronous | Called when a MQTT PINGREQ message arrives. |
OnPubackReceived | Asynchronous | Called when a MQTT PUBACK message arrives. |
OnPubackSend | Asynchronous | Called when the broker sent a MQTT PUBACK message. |
OnPubcompReceived | Asynchronous | Called when a MQTT PUBCOMP message arrives. |
OnPubcompSend | Asynchronous | Called when the broker sent a MQTT PUBCOMP message. |
OnPubrecReceived | Asynchronous | Called when a MQTT PUBREC message arrives. |
OnPubrecSend | Asynchronous | Called when the broker sent a MQTT PUBREC message. |
OnPubrelReceived | Asynchronous | Called when a MQTT PUBREL message arrives. |
OnPubrelSend | Asynchronous | Called when the broker sent a MQTT PUBREL message. |
OnSessionReadyCallback | Synchronous | Called when a MQTT Session for the connecting client is available. |
OnSubackSend | Asynchronous | Called when the broker sent a MQTT SUBACK message. |
OnUnsubackSend | Asynchronous | Called when the broker sent a MQTT UNSUBACK message. |
BeforePublishSendCallback | Synchronous | Called before the broker sends a MQTT PUBLISH message. |
Security Callbacks | | |
AfterLoginCallback | Asynchronous | Called when a client made a successful or unsuccessful login attempt. |
OnAuthenticationCallback | Synchronous | Called after a CONNECT message arrived to check the credentials. |
OnAuthorizationCallback | Synchronous | Returns MqttTopicPermissions when a client publishes or subscribes. |
OnInsufficientPermissionDisconnect | Asynchronous | Gets called when a client was disconnected due to insufficient permissions when publishing or subscribing. |
Other Callbacks | | |
ClusterDiscoveryCallback | Asynchronous | Gets called periodically by HiveMQ to discover other HiveMQ cluster nodes. |
ScheduledCallback | Asynchronous | Gets executed periodically based on a quartz-style cron configuration. |
WebUIAuthenticationCallback | Synchronous | Called when a Web UI login occurs. |
OnBrokerStart
Type
Synchronous
Purpose
The com.hivemq.spi.callback.events.broker.OnBrokerStart
callback is useful for implementing custom startup logic and verifications.
A common use case is for example verifying that a database connection can be obtained or that expected files are available and valid.
Interfering with HiveMQ
It is possible to throw a com.hivemq.spi.callback.exception.BrokerUnableToStartException
.
When the onBrokerStart()
method of the OnBrokerStart
callback implementation throws this exception, HiveMQ refuses to start.
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.broker.OnBrokerStart;
import com.hivemq.spi.callback.exception.BrokerUnableToStartException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HiveMQStart implements OnBrokerStart {
private static final Logger log = LoggerFactory.getLogger(HiveMQStart.class);
@Override
public void onBrokerStart() throws BrokerUnableToStartException {
log.info("HiveMQ is starting");
if (checkStartupSuccessful()) {
log.info("Startup check was successful");
} else {
throw new BrokerUnableToStartException("Could not start HiveMQ :("); (1)
}
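}
// Hypothetical placeholder: replace with your own startup verification, for example a database or file check.
private boolean checkStartupSuccessful() {
return true;
}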
@Override
public int priority() {
return CallbackPriority.MEDIUM;
}
}
1 | Throwing the BrokerUnableToStartException prevents HiveMQ from starting. |
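A startup check such as the checkStartupSuccessful() call above could, for instance, verify that expected files are available. The following is a minimal sketch using only java.nio.file. StartupCheckSketch and requiredFilesReadable are hypothetical names, and which files a plugin needs is an assumption left to the caller.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

class StartupCheckSketch {

    // Returns true only if every path points to an existing, readable regular file.
    static boolean requiredFilesReadable(final List<Path> requiredFiles) {
        for (final Path file : requiredFiles) {
            if (!Files.isRegularFile(file) || !Files.isReadable(file)) {
                return false;
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        // Hypothetical file name, used for illustration only.
        final List<Path> required = Arrays.asList(Paths.get("my-plugin.properties"));
        System.out.println("Startup check passed: " + requiredFilesReadable(required));
    }
}
```

If such a check returns false, the callback can throw the BrokerUnableToStartException as shown above.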
OnBrokerReady
Type
Synchronous
Purpose
The com.hivemq.spi.callback.events.broker.OnBrokerReady callback is useful for all tasks for which OnBrokerStart is too early.
A common use case is, for example, verifying that the cluster view meets expectations or adding custom subscriptions.
Interfering with HiveMQ
It is possible to throw a com.hivemq.spi.callback.exception.IllegalBrokerStateException
.
When the onBrokerReady()
method of the OnBrokerReady
callback implementation throws this exception, HiveMQ performs
an immediate shutdown. All incoming client connections are suspended until the last registered
com.hivemq.spi.callback.events.broker.OnBrokerReady
callback is successfully executed.
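import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.broker.OnBrokerReady;
import com.hivemq.spi.callback.exception.IllegalBrokerStateException;
public class BrokerReadyCallback implements OnBrokerReady {
@Override
public void onBrokerReady() throws IllegalBrokerStateException {
// checkRunningConditions() and addNecessarySubscriptions() are placeholders for your own logic.
if (checkRunningConditions()) {
addNecessarySubscriptions();
} else {
throw new IllegalBrokerStateException("Cluster is too small"); (1)
}
}
@Override
public int priority() {
return CallbackPriority.MEDIUM;
}
}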
1 | Throwing the IllegalBrokerStateException causes HiveMQ to stop. |
OnBrokerStop
Type
Synchronous
Purpose
The com.hivemq.spi.callback.events.broker.OnBrokerStop
callback is useful for implementing custom shutdown logic.
A common use case is for example shutting down a database connection.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.broker.OnBrokerStop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.io.FileOutputStream;
import java.io.IOException;
public class HiveMQStop implements OnBrokerStop {
final Logger log = LoggerFactory.getLogger(HiveMQStop.class);
final FileOutputStream fileOutputStream;
@Inject
public HiveMQStop(final FileOutputStream fileOutputStream) {
this.fileOutputStream = fileOutputStream;
}
@Override
public void onBrokerStop() {
try {
fileOutputStream.close();
} catch (IOException e) {
log.warn("Output stream could not be closed on broker shutdown.", e);
}
}
@Override
public int priority() {
return CallbackPriority.MEDIUM;
}
}
OnConnectCallback
Type
Synchronous
Purpose
The com.hivemq.spi.callback.events.OnConnectCallback
is useful for performing custom logic when a MQTT CONNECT message arrives.
A common use case is for example logging when a client connects.
Useful edge cases for this callback are situations where you need to verify parts of the CONNECT message, such as the LWT part.
Interfering with HiveMQ
It is possible to throw a com.hivemq.spi.callback.exception.RefusedConnectionException
to disconnect a client with
a specific return code for the CONNACK
message. A CONNECT
message object and a ClientData
object — which contains
information about the credentials and the optional client authentication certificate — are passed as parameters to the onConnect
method.
Although this is possible in scenarios with only one authentication resource, this callback is not designed to perform authentication. Please use the OnAuthenticationCallback for this purpose.
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.OnConnectCallback;
import com.hivemq.spi.callback.exception.RefusedConnectionException;
import com.hivemq.spi.message.CONNECT;
import com.hivemq.spi.message.ReturnCode;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientConnect implements OnConnectCallback {
Logger log = LoggerFactory.getLogger(ClientConnect.class);
@Override
public void onConnect(CONNECT connect, ClientData clientData) throws RefusedConnectionException {
if (clientData.getClientId().equals("bad id")) {
throw new RefusedConnectionException(ReturnCode.REFUSED_IDENTIFIER_REJECTED);
}
log.info("Client {} is connecting", clientData.getClientId());
}
@Override
public int priority() {
return CallbackPriority.MEDIUM;
}
}
OnDisconnectCallback
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.events.OnDisconnectCallback
is useful for performing custom logic when a client disconnects
due to a MQTT DISCONNECT
message or a TCP connection loss.
The getDisconnectTimestamp()
method can be used to determine the exact time of the disconnect.
If the client disconnected due to a connection loss, the parameter abruptDisconnect
is true
.
If the client disconnected gracefully, the parameter is false
.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
The abruptDisconnect parameter indicates whether there was a TCP connection loss (abruptDisconnect = true) or a graceful disconnect with a MQTT DISCONNECT message.
import com.hivemq.spi.callback.events.OnDisconnectCallback;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientDisconnect implements OnDisconnectCallback {
final Logger log = LoggerFactory.getLogger(ClientDisconnect.class);
@Override
public void onDisconnect(ClientData clientData, boolean abruptDisconnect) {
if (abruptDisconnect) {
log.warn("Connection to client {}, with ip {}, was lost abruptly.", clientData.getClientId(), clientData.getInetAddress());
} else {
log.info("Client {} is disconnected.", clientData.getClientId());
}
}
}
OnPublishReceivedCallback
Type
Synchronous
Purpose
The com.hivemq.spi.callback.events.OnPublishReceivedCallback
gets called when a PUBLISH
MQTT message arrives.
This callback is useful for validating PUBLISH messages, e.g. verifying that the message has a specific payload format.
Don’t use this callback for topic-based permissions.
This callback gets called very often. Please make sure you use proper caching and that you don’t block.
Interfering with HiveMQ
It’s possible to throw a com.hivemq.spi.callback.exception.OnPublishReceivedException
when the published message is not
valid from a business logic point of view. It’s possible to disconnect the publishing client by passing true
as
parameter to the OnPublishReceivedException
.
This callback is not meant to be used for topic restrictions.
Please see the Client Authorization chapter for more details.
Also, if more than one plugin throws an OnPublishReceivedException for the same callback, only the first one is handled.
Therefore, the first exception decides whether the client gets disconnected or not.
import com.google.common.base.Charsets;
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.OnPublishReceivedCallback;
import com.hivemq.spi.callback.exception.OnPublishReceivedException;
import com.hivemq.spi.message.PUBLISH;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PublishReceived implements OnPublishReceivedCallback {
final Logger logger = LoggerFactory.getLogger(PublishReceived.class);
@Override
public void onPublishReceived(final PUBLISH publish, final ClientData clientData) throws OnPublishReceivedException {
final String clientID = clientData.getClientId();
final String topic = publish.getTopic();
final String message = new String(publish.getPayload(), Charsets.UTF_8);
logger.debug("Client {} sent a message to topic {}: {}", clientID, topic, message);
if (topic.equals("database/import") && message.contains("DROP TABLE")) {
throw new OnPublishReceivedException(true);
}
}
@Override
public int priority() {
return CallbackPriority.LOW;
}
}
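As one concrete case of payload format validation, a callback could check that the payload is well-formed UTF-8 before treating it as text. The sketch below uses only the JDK's CharsetDecoder. PayloadFormatSketch and isValidUtf8 are hypothetical names, and requiring UTF-8 text is an assumption about your payload format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

class PayloadFormatSketch {

    // Returns true if the bytes are well-formed UTF-8, false otherwise.
    static boolean isValidUtf8(final byte[] payload) {
        // A decoder is not thread-safe, so a fresh one is created per call.
        final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT);
        try {
            decoder.decode(ByteBuffer.wrap(payload));
            return true;
        } catch (final CharacterCodingException e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        System.out.println(isValidUtf8("temperature=21.5".getBytes(StandardCharsets.UTF_8))); // true
        System.out.println(isValidUtf8(new byte[] {(byte) 0xC3, (byte) 0x28})); // false
    }
}
```

Unlike new String(bytes, charset), which silently replaces malformed input, the decoder configured this way reports it, so the callback can reject the message.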
OnPublishSend
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.events.OnPublishSend
callback gets called when an outgoing MQTT PUBLISH
is going to be sent to a subscribing client.
This callback gets called very often. Please make sure you use proper caching and that you don’t block.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
import com.google.common.base.Charsets;
import com.hivemq.spi.callback.events.OnPublishSend;
import com.hivemq.spi.message.PUBLISH;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PublishSend implements OnPublishSend {
final Logger logger = LoggerFactory.getLogger(PublishSend.class);
@Override
public void onPublishSend(final PUBLISH publish, final ClientData clientData) {
if (logger.isDebugEnabled()) {
final String clientID = clientData.getClientId();
final String topic = publish.getTopic();
final String message = new String(publish.getPayload(), Charsets.UTF_8);
logger.debug("Client {} received a message on topic {}: {}", clientID, topic, message);
}
}
}
OnSubscribeCallback
Type
Synchronous
Purpose
The com.hivemq.spi.callback.events.OnSubscribeCallback
callback gets called when a MQTT SUBSCRIBE
message arrives.
This callback is useful for validating SUBSCRIBE messages or logging subscriptions. It is not designed to be used for authorization.
Please see the Authorization chapter for more details.
Interfering with HiveMQ
It’s possible to throw a com.hivemq.spi.callback.exception.InvalidSubscriptionException
when the SUBSCRIBE message is invalid.
The client gets disconnected when the exception is thrown.
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.OnSubscribeCallback;
import com.hivemq.spi.callback.exception.InvalidSubscriptionException;
import com.hivemq.spi.message.SUBSCRIBE;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Subscribe implements OnSubscribeCallback {
final Logger logger = LoggerFactory.getLogger(Subscribe.class);
@Override
public void onSubscribe(final SUBSCRIBE message, final ClientData clientData) throws InvalidSubscriptionException {
logger.debug("Client {} is now subscribed to the topics: {}.", clientData.getClientId(), message.getTopics());
}
@Override
public int priority() {
return CallbackPriority.MEDIUM;
}
}
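What "validating a SUBSCRIBE message" might involve can be sketched with the wildcard rules of the MQTT specification: a topic filter must not be empty, '#' may only appear as the last level, and '+' must occupy a whole level. The sketch works on plain strings. TopicFilterSketch and isValidTopicFilter are hypothetical names, and the broker itself may already enforce these rules before a plugin sees the message.

```java
class TopicFilterSketch {

    // Checks the MQTT wildcard rules for a topic filter given as a plain string.
    static boolean isValidTopicFilter(final String filter) {
        if (filter == null || filter.isEmpty() || filter.indexOf('\u0000') >= 0) {
            return false;
        }
        final String[] levels = filter.split("/", -1); // -1 keeps empty trailing levels
        for (int i = 0; i < levels.length; i++) {
            final String level = levels[i];
            if (level.equals("#")) {
                // The multi-level wildcard is only allowed as the last level.
                if (i != levels.length - 1) {
                    return false;
                }
            } else if (!level.equals("+") && (level.indexOf('#') >= 0 || level.indexOf('+') >= 0)) {
                // Wildcards must occupy an entire level.
                return false;
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        System.out.println(isValidTopicFilter("sensors/+/temperature")); // true
        System.out.println(isValidTopicFilter("sensors/#/temperature")); // false
    }
}
```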
OnTopicSubscriptionCallback
Type
Synchronous
Purpose
The com.hivemq.spi.callback.events.OnTopicSubscriptionCallback
callback gets called for every topic in an arrived
MQTT SUBSCRIBE
message. It is used to manually set the return codes in the following MQTT SUBACK
message.
Breaking the MQTT protocol
It is possible to break the MQTT specification if you return codes in the SUBACK message that are not permitted by the specification, in particular if you return a higher quality of service than the client requested.
public class DenyQoS1 implements OnTopicSubscriptionCallback {
@Override
public SubackReturnCode getSubackReturnCodeForClient(final Topic topic, final SubackReturnCode authorizationResult, final ClientData clientData) {
if (SubackReturnCode.SUCCESS_QOS_1.equals(authorizationResult)) {
return SubackReturnCode.SUCCESS_QOS_0;
}
return authorizationResult;
}
@Override
public int priority() {
return CallbackPriority.MEDIUM;
}
}
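A more general way to stay within the specification is to cap the granted quality of service at a maximum and never raise it above what was requested. The following sketch shows that rule on a hypothetical GrantedQos enum that stands in for the real return codes. QosCapSketch and cap are illustrative names, not SPI methods.

```java
class QosCapSketch {

    // Hypothetical stand-in for the success return codes of a SUBACK, ordered by QoS level.
    enum GrantedQos { QOS_0, QOS_1, QOS_2 }

    // Grants the requested QoS unless it exceeds the maximum; never grants more than requested.
    static GrantedQos cap(final GrantedQos requested, final GrantedQos maximum) {
        return requested.ordinal() <= maximum.ordinal() ? requested : maximum;
    }

    public static void main(final String[] args) {
        System.out.println(cap(GrantedQos.QOS_2, GrantedQos.QOS_1)); // QOS_1
        System.out.println(cap(GrantedQos.QOS_0, GrantedQos.QOS_1)); // QOS_0
    }
}
```

Because the result is always the lower of the two values, a client that asked for QoS 0 is never upgraded, which is the case the warning above is about.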
OnUnsubscribeCallback
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.events.OnUnsubscribeCallback
callback gets called when a MQTT UNSUBSCRIBE
message arrives.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
import com.hivemq.spi.callback.events.OnUnsubscribeCallback;
import com.hivemq.spi.message.UNSUBSCRIBE;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UnSubscribe implements OnUnsubscribeCallback {
final Logger logger = LoggerFactory.getLogger(UnSubscribe.class);
@Override
public void onUnsubscribe(final UNSUBSCRIBE message, final ClientData clientData) {
logger.debug("Client {} is no longer subscribed to the topics: {}.", clientData.getClientId(), message.getTopics());
}
}
OnConnackSend
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.lowlevel.OnConnackSend
callback gets called when the broker is sending a MQTT CONNACK
message.
Useful to check if a client was actually connected successfully or if the connection failed for some reason.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
import com.hivemq.spi.callback.lowlevel.OnConnackSend;
import com.hivemq.spi.message.CONNACK;
import com.hivemq.spi.message.ReturnCode;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConnackSent implements OnConnackSend {
final Logger logger = LoggerFactory.getLogger(ConnackSent.class);
@Override
public void onConnackSend(CONNACK connack, ClientData clientData) {
if (connack.getReturnCode() == ReturnCode.ACCEPTED) {
logger.debug("Client {} connected successfully.", clientData.getClientId());
} else {
logger.warn("Client {} failed to connect. Return code = {}", clientData.getClientId(), connack.getReturnCode().name());
}
}
}
OnPingCallback
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.lowlevel.OnPingCallback
callback gets called when a MQTT PINGREQ message arrives.
The callback is useful for checking whether a client is still active. Note that clients only send PINGREQ messages if no
other messages are sent during the keep-alive period.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
import com.hivemq.spi.callback.lowlevel.OnPingCallback;
import com.hivemq.spi.security.ClientData;
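import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class PingReceived implements OnPingCallback {
// Asynchronous callbacks run in parallel, so the map must be thread-safe.
private final Map<String, Date> clientActivity = new ConcurrentHashMap<>();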
@Override
public void onPingReceived(final ClientData clientData) {
clientActivity.put(clientData.getClientId(), new Date());
}
}
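Recorded ping times only become useful once something evaluates them. The sketch below shows one way to find clients that have been silent for too long. It assumes a thread-safe map, because asynchronous callbacks run in parallel, and takes timestamps as parameters so it can be tested. ClientActivitySketch, recordPing, and inactiveClients are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ClientActivitySketch {

    // Last time (in epoch milliseconds) a ping was seen per client identifier.
    private final Map<String, Long> lastSeenMillis = new ConcurrentHashMap<>();

    // Would be called from the ping callback.
    void recordPing(final String clientId, final long nowMillis) {
        lastSeenMillis.put(clientId, nowMillis);
    }

    // Returns, in sorted order, all clients whose last ping is older than maxIdleMillis.
    List<String> inactiveClients(final long nowMillis, final long maxIdleMillis) {
        final List<String> inactive = new ArrayList<>();
        for (final Map.Entry<String, Long> entry : lastSeenMillis.entrySet()) {
            if (nowMillis - entry.getValue() > maxIdleMillis) {
                inactive.add(entry.getKey());
            }
        }
        Collections.sort(inactive);
        return inactive;
    }

    public static void main(final String[] args) {
        final ClientActivitySketch tracker = new ClientActivitySketch();
        tracker.recordPing("client-a", 0L);
        tracker.recordPing("client-b", 5_000L);
        System.out.println(tracker.inactiveClients(10_000L, 6_000L)); // [client-a]
    }
}
```

Since a client that publishes regularly may never send a PINGREQ, a tracker like this would also need to be fed from other callbacks to reflect real activity.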
OnPubackReceived
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.lowlevel.OnPubackReceived
callback gets called when a MQTT PUBACK message arrives.
The callback can be used to implement business logic that should not be processed until a message was received by the client at least once.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
import com.hivemq.spi.callback.lowlevel.OnPubackReceived;
import com.hivemq.spi.message.PUBACK;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PubackReceived implements OnPubackReceived {
final Logger logger = LoggerFactory.getLogger(PubackReceived.class);
@Override
public void onPubackReceived(PUBACK puback, ClientData clientData) {
logger.debug("Client {} received a PUBLISH message with id {}", clientData.getClientId(), puback.getMessageId());
}
}
OnPubackSend
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.lowlevel.OnPubackSend
callback gets called when the broker is sending a MQTT PUBACK
message.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
import com.hivemq.spi.callback.lowlevel.OnPubackSend;
import com.hivemq.spi.message.PUBACK;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PubackSend implements OnPubackSend {
final Logger logger = LoggerFactory.getLogger(PubackSend.class);
@Override
public void onPubackSend(PUBACK puback, ClientData clientData) {
logger.debug("A PUBACK message with id {} was sent to publisher {}.", puback.getMessageId(), clientData.getClientId());
}
}
OnPubcompReceived
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.lowlevel.OnPubcompReceived
callback gets called when a MQTT PUBCOMP
message arrives.
The callback is executed as soon as the transfer of a PUBLISH message with QoS 2 to a subscriber is completed.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
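import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.hivemq.spi.callback.lowlevel.OnPubcompReceived;
import com.hivemq.spi.message.PUBCOMP;
import com.hivemq.spi.security.ClientData;
public class PubcompReceived implements OnPubcompReceived {
// Asynchronous callbacks run in parallel, so the multimap is wrapped in a synchronized view.
final Multimap<String, Integer> incompleteMessageTransfers = Multimaps.synchronizedSetMultimap(HashMultimap.<String, Integer>create());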
@Override
public void onPubcompReceived(final PUBCOMP pubcomp, final ClientData clientData) {
incompleteMessageTransfers.remove(clientData.getClientId(), pubcomp.getMessageId());
}
}
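The example above only removes entries, so something else has to add them when a QoS 2 transfer starts. A self-contained sketch of both halves of that bookkeeping might look as follows. Qos2TransferSketch and its methods are hypothetical names, and where publishSent would be called from (for example a send-side callback) is an assumption.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

class Qos2TransferSketch {

    // Message ids of QoS 2 transfers that have started but not yet completed, per client.
    private final Map<String, Set<Integer>> pending = new ConcurrentHashMap<>();

    // Would be called when a QoS 2 PUBLISH is sent to the client.
    void publishSent(final String clientId, final int messageId) {
        pending.computeIfAbsent(clientId, id -> ConcurrentHashMap.<Integer>newKeySet()).add(messageId);
    }

    // Would be called from the PUBCOMP callback: the transfer is complete.
    void pubcompReceived(final String clientId, final int messageId) {
        final Set<Integer> ids = pending.get(clientId);
        if (ids != null) {
            ids.remove(messageId);
        }
    }

    // Returns a sorted snapshot of the message ids still in flight for the client.
    Set<Integer> pendingFor(final String clientId) {
        final Set<Integer> ids = pending.get(clientId);
        if (ids == null) {
            return Collections.emptySet();
        }
        return new TreeSet<>(ids);
    }

    public static void main(final String[] args) {
        final Qos2TransferSketch transfers = new Qos2TransferSketch();
        transfers.publishSent("client-a", 1);
        transfers.publishSent("client-a", 2);
        transfers.pubcompReceived("client-a", 1);
        System.out.println(transfers.pendingFor("client-a")); // [2]
    }
}
```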
OnPubcompSend
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.lowlevel.OnPubcompSend
callback gets called when the broker is sending a MQTT PUBCOMP
message.
The callback is executed as soon as the transfer of a PUBLISH message with QoS 2 to the broker is completed.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
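import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.hivemq.spi.callback.lowlevel.OnPubcompSend;
import com.hivemq.spi.message.PUBCOMP;
import com.hivemq.spi.security.ClientData;
public class PubcompSend implements OnPubcompSend {
// Asynchronous callbacks run in parallel, so the multimap is wrapped in a synchronized view.
final Multimap<String, Integer> incompleteMessageTransfers = Multimaps.synchronizedSetMultimap(HashMultimap.<String, Integer>create());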
@Override
public void onPubcompSend(final PUBCOMP pubcomp, final ClientData clientData) {
incompleteMessageTransfers.remove(clientData.getClientId(), pubcomp.getMessageId());
}
}
OnPubrecReceived
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.lowlevel.OnPubrecReceived
callback gets called when a MQTT PUBREC
message arrives.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
import com.hivemq.spi.callback.lowlevel.OnPubrecReceived;
import com.hivemq.spi.message.PUBREC;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PubrecReceived implements OnPubrecReceived {
final Logger logger = LoggerFactory.getLogger(PubrecReceived.class);
@Override
public void onPubrecReceived(final PUBREC pubrec, final ClientData clientData) {
logger.debug("Client {} received a PUBLISH message with id {}", clientData.getClientId(), pubrec.getMessageId());
}
}
OnPubrecSend
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.lowlevel.OnPubrecSend
callback gets called when the broker is sending a MQTT PUBREC
message.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
import com.hivemq.spi.callback.lowlevel.OnPubrecSend;
import com.hivemq.spi.message.PUBREC;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PubrecSend implements OnPubrecSend {
final Logger logger = LoggerFactory.getLogger(PubrecSend.class);
@Override
public void onPubrecSend(final PUBREC pubrec, final ClientData clientData) {
logger.debug("A PUBLISH message with id {} sent by publisher {} was received.", pubrec.getMessageId(), clientData.getClientId());
}
}
OnPubrelReceived
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.lowlevel.OnPubrelReceived
callback gets called when a MQTT PUBREL
message arrives.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
import com.hivemq.spi.callback.lowlevel.OnPubrelReceived;
import com.hivemq.spi.message.PUBREL;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PubrelReceived implements OnPubrelReceived {
Logger logger = LoggerFactory.getLogger(PubrelReceived.class);
@Override
public void onPubrelReceived(final PUBREL pubrel, final ClientData clientData) {
logger.debug("A PUBREL message with id {} was sent by publisher {}.", pubrel.getMessageId(), clientData.getClientId());
}
}
OnPubrelSend
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.lowlevel.OnPubrelSend
callback gets called when the broker is sending a MQTT PUBREL
message.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
import com.hivemq.spi.callback.lowlevel.OnPubrelSend;
import com.hivemq.spi.message.PUBREL;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PubrelSend implements OnPubrelSend {
Logger logger = LoggerFactory.getLogger(PubrelSend.class);
@Override
public void onPubrelSend(final PUBREL pubrel, final ClientData clientData) {
logger.debug("A PUBREL message with id {} was sent to subscriber {}.", pubrel.getMessageId(), clientData.getClientId());
}
}
OnSessionReadyCallback
Type
Synchronous
Purpose
The com.hivemq.spi.callback.events.OnSessionReadyCallback
callback gets called when the node where the client is connected
is sure that a MQTT Session
is available for the client. It is useful for performing session-scoped actions, such as using
the Session Attribute Store or the Client Group Service.
Interfering with HiveMQ
HiveMQ will wait for the last registered OnSessionReady callback to return before sending a MQTT CONNACK
message to the connecting client.
public class SessionReady implements OnSessionReadyCallback {
private static final Logger log = LoggerFactory.getLogger(SessionReady.class);
private static final String TOKEN = "token";
private final BlockingSessionAttributeStore attributeStore;
@Inject
SessionReady(final BlockingSessionAttributeStore attributeStore) { (1)
this.attributeStore = attributeStore;
}
@Override
public void onSessionReady(final ClientData clientData) {
final Optional<String> tokenValue = clientData.getConnectionAttributeStore().getAsString(TOKEN); (2)
if (tokenValue.isPresent()) {
final String clientId = clientData.getClientId();
try {
attributeStore.putAsString(clientId, TOKEN, tokenValue.get());
log.info("Successfully set token for client {}.", clientId); (3)
} catch (final Exception exception) {
log.error("Failed to set token for client {}.", clientId, exception); (3)
}
}
}
@Override
public int priority() {
return CallbackPriority.MEDIUM;
}
}
1 | Inject necessary Dependencies. |
2 | Perform Session scoped actions. |
3 | Log error or success. |
OnSubackSend
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.lowlevel.OnSubackSend
callback gets called when the broker is sending a MQTT SUBACK
message.
Useful to check if a client was subscribed successfully.
Note that a client which sent an invalid SUBSCRIBE message is usually disconnected immediately.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
import com.hivemq.spi.callback.lowlevel.OnSubackSend;
import com.hivemq.spi.message.SUBACK;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SubackSend implements OnSubackSend {
final Logger logger = LoggerFactory.getLogger(SubackSend.class);
@Override
public void onSubackSend(final SUBACK suback, final ClientData clientData) {
logger.debug("The subscription of client {} with id {} was successful.", clientData.getClientId(), suback.getMessageId());
}
}
OnUnsubackSend
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.lowlevel.OnUnsubackSend
callback gets called when the broker is sending a MQTT UNSUBACK
message.
Useful to check if a client was unsubscribed successfully.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
import com.hivemq.spi.callback.lowlevel.OnUnsubackSend;
import com.hivemq.spi.message.UNSUBACK;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UnsubackSend implements OnUnsubackSend {
final Logger logger = LoggerFactory.getLogger(UnsubackSend.class);
@Override
public void onUnsubackSend(final UNSUBACK unsuback, final ClientData clientData) {
logger.debug("The unsubscribe of client {} with id {} was successful.", clientData.getClientId(), unsuback.getMessageId());
}
}
BeforePublishSendCallback
Type
Synchronous
Purpose
The com.hivemq.spi.callback.events.BeforePublishSendCallback
gets called just before a MQTT PUBLISH
message is sent to a client.
Some properties of the outgoing message, such as topic and payload, can be changed.
This will alter the message in question, but won’t have any effect on the internal logic of the broker.
This means that a client subscribed to topic1
will receive a message published to topic1
even if you change the topic
in the BeforePublishSendCallback
to topic2
.
Interfering with HiveMQ
In addition to changing the content of the PUBLISH message, this callback can be used to tell HiveMQ to drop this message for the intended client.
This can be achieved by throwing a com.hivemq.spi.callback.exception.BeforePublishSendException
.
public class PublishSend implements BeforePublishSendCallback {
@Override
public void beforePublishSend(final ModifiablePUBLISH publish, final ClientData clientData) throws BeforePublishSendException {
final String clientId = clientData.getClientId();
if ("client1".equals(clientId)) {
final String newPayload = new String(publish.getPayload(), java.nio.charset.StandardCharsets.UTF_8) + "<important Information>"; (1)
publish.setPayload(newPayload.getBytes(java.nio.charset.StandardCharsets.UTF_8));
}
if ("client2".equals(clientId)) {
throw BeforePublishSendException.getInstance(); (2)
}
}
@Override
public int priority() {
return CallbackPriority.MEDIUM;
}
}
1 | Change all the payloads to client1 to contain some important information. |
2 | Drop all PUBLISH messages that would have gone to client2 . |
AfterLoginCallback
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.security.AfterLoginCallback
gets called when a client made a successful or unsuccessful login attempt.
This happens when the client sent a CONNECT
message and authentication took place.
The callback offers the method afterSuccessfulLogin
for successful logins and a method afterFailedLogin
for unsuccessful login attempts.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
import com.hivemq.spi.callback.exception.AuthenticationException;
import com.hivemq.spi.callback.security.AfterLoginCallback;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AfterLogin implements AfterLoginCallback {
final Logger logger = LoggerFactory.getLogger(AfterLogin.class);
@Override
public void afterSuccessfulLogin(final ClientData clientData) {
logger.debug("Client {} logged in successfully.", clientData.getUsername().or(clientData.getClientId()));
}
@Override
public void afterFailedLogin(final AuthenticationException exception, final ClientData clientData) {
logger.warn("Client {} failed to log in. Return code: {}",
clientData.getUsername().or(clientData.getClientId()), exception.getReturnCode().name());
}
}
OnAuthenticationCallback
Type
Synchronous
Purpose
The com.hivemq.spi.callback.security.OnAuthenticationCallback gets called after a CONNECT message arrives and handles the authentication of the client.
The username and password from the MQTT CONNECT message can be used to authenticate the client.
When client certificate authentication is used on the transport layer, the client certificate can also be used to authenticate the client on the application layer.
If your scenario allows this, you should use Caching. |
Interfering with HiveMQ
The checkCredentials method must return either true or false, depending on whether the authentication was successful.
When the authentication wasn’t successful and you want to control the return code of the CONNACK MQTT message, you can throw an AuthenticationException.
This has the side effect that all other plugins which implement authentication are ignored once the exception is thrown.
OnAuthorizationCallback
Type
Synchronous
Purpose
The com.hivemq.spi.callback.security.OnAuthorizationCallback is responsible for returning a list of com.hivemq.spi.topic.MqttTopicPermission objects.
Every time a client performs a specific action on a topic, such as publishing or subscribing, the callback is executed to check whether the client is allowed to do so.
This callback gets called very often, so make sure you use proper Caching. |
The OnAuthorizationCallback is also called when a client connects with a CONNECT message containing a last will.
In this case a call to clientData.isAuthenticated() will return false, because the client is only marked as authenticated after the permissions for the LWT have been checked.
See also the Authentication and Permissions chapter.
|
Interfering with HiveMQ
HiveMQ uses the returned MqttTopicPermissions to check if a client is allowed to do a specific action.
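Because this callback runs on every publish and subscribe, permission lookups are a natural candidate for caching. The following is a minimal sketch of a time-bounded cache using only the JDK (Java 8 or later is assumed). ExpiringCache is a hypothetical class for illustration and is not the caching mechanism HiveMQ itself provides.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical helper (not part of the HiveMQ SPI): caches loaded values
// per key for a fixed time to live.
public class ExpiringCache<K, V> {

    private static final class Entry<V> {
        final V value;
        final long expiresAtNanos;

        Entry(final V value, final long expiresAtNanos) {
            this.value = value;
            this.expiresAtNanos = expiresAtNanos;
        }
    }

    private final ConcurrentMap<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlNanos;
    private final Function<K, V> loader;

    public ExpiringCache(final long ttlMillis, final Function<K, V> loader) {
        this.ttlNanos = ttlMillis * 1_000_000L;
        this.loader = loader;
    }

    // Returns the cached value while it is fresh, otherwise calls the loader.
    // Concurrent callers may load the same key more than once; that is
    // accepted here to keep the sketch free of locking.
    public V get(final K key) {
        final long now = System.nanoTime();
        final Entry<V> cached = entries.get(key);
        if (cached != null && now - cached.expiresAtNanos < 0) {
            return cached.value;
        }
        final V loaded = loader.apply(key);
        entries.put(key, new Entry<>(loaded, now + ttlNanos));
        return loaded;
    }

    // Drops a cached value, e.g. after the permissions of a client changed.
    public void invalidate(final K key) {
        entries.remove(key);
    }
}
```

A callback implementation could key such a cache by clientData.getClientId() and let the loader perform the expensive lookup, for example against a database.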
OnInsufficientPermissionsCallback
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.security.OnInsufficientPermissionDisconnect callback gets called when a client has been disconnected due to insufficient permissions when publishing or subscribing.
By the time the callback is executed, the client has already been disconnected, so this is a purely informational callback.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
RestrictionsAfterLoginCallback
Type
Synchronous
Purpose
The com.hivemq.spi.callback.security.RestrictionsAfterLoginCallback
gets executed after a CONNECT message arrives and
the client was authenticated successfully. This callback provides restrictions which affect only a specific client,
e.g. throttling or maximum allowed MQTT message sizes. HiveMQ version 3.3.0 and later allows configuring offline client
queue capacity and discard strategy individually for each client using the MAX_QUEUED_MESSAGES and DISCARD_STRATEGY restriction types.
Interfering with HiveMQ
The callback returns a set of restrictions which are applied to the specific client.
import com.hivemq.spi.callback.security.RestrictionsAfterLoginCallback;
import com.hivemq.spi.security.ClientData;
import com.hivemq.spi.security.Restriction;
import com.hivemq.spi.security.RestrictionType;
import java.util.HashSet;
import java.util.Set;
public class RestrictionsAfterLogin implements RestrictionsAfterLoginCallback {
@Override
public Set<Restriction> getRestrictions(final ClientData clientData) {
final Set<Restriction> restrictions = new HashSet<>();
restrictions.add(new Restriction(RestrictionType.MAX_PUBLISH_MESSAGE_SIZE, 128L)); (1)
restrictions.add(new Restriction(RestrictionType.MAX_OUTGOING_BYTES_SEC, 128L)); (2)
restrictions.add(new Restriction(RestrictionType.MAX_INCOMING_BYTES, 128L)); (3)
restrictions.add(new Restriction(RestrictionType.WRITE_BUFFER_HIGH_THRESHOLD, 128L)); (4)
restrictions.add(new Restriction(RestrictionType.WRITE_BUFFER_LOW_THRESHOLD, 128L)); (5)
restrictions.add(new Restriction(RestrictionType.MAX_QUEUED_MESSAGES, 128L)); (6)
restrictions.add(new Restriction(RestrictionType.DISCARD_STRATEGY, QueuedMessageStrategy.DISCARD)); (7)
restrictions.add(new Restriction(RestrictionType.CLIENT_SESSION_TTL, 300L)); (8)
restrictions.add(new Restriction(RestrictionType.INFLIGHT_QUEUE_SIZE, 1000L)); (9)
return restrictions;
}
}
1 | Limit the maximum size of a message the client can send. |
2 | Add throttling for sending messages out to the client. |
3 | Add throttling for receiving messages from the client. |
4 | Set the high fill state of the write buffer for this client. |
5 | Set the low fill state of the write buffer for this client. |
6 | Set the maximum number of messages that are queued for the client if clean session is false. |
7 | Set the strategy for discarding messages when the queue limit is reached: DISCARD or DISCARD_OLDEST. |
8 | Set the Time To Live or TTL (in seconds) for the session of a client. If the client is disconnected for longer than the time defined in CLIENT_SESSION_TTL, the session is removed. Only affects clients that connect with clean session set to false. |
9 | The maximum size of the in-flight message queue for this client. Only affects online clients. |
ClusterDiscoveryCallback
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.cluster.ClusterDiscoveryCallback is used to discover other nodes to form a cluster with.
Nodes are discovered by calling getNodeAddresses, which is called periodically by HiveMQ.
The interval at which getNodeAddresses is called can be configured in HiveMQ’s configuration file.
The callback also provides the means to register and unregister this HiveMQ node with the methods init and destroy.
For example, the ClusterDiscoveryCallback can be used to implement discovery via a database, via shared files or via an external registry like etcd or Consul.
Like any other callback, you have to manually add a concrete ClusterDiscoveryCallback to the Callback Registry at plugin startup time or at runtime.
Interfering with HiveMQ
The callback returns a list of ClusterNodeAddress objects, which HiveMQ uses to form a cluster.
A ClusterNodeAddress essentially consists of a hostname/IP and port.
Every available node for this cluster must be represented by one ClusterNodeAddress in the returned list.
If more than one ClusterDiscoveryCallback is registered at the same time, the results of all of them are used to form the cluster.
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.hivemq.spi.callback.cluster.ClusterDiscoveryCallback;
import com.hivemq.spi.callback.cluster.ClusterNodeAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class ClusterDiscovery implements ClusterDiscoveryCallback {
private static final Logger log = LoggerFactory.getLogger(ClusterDiscovery.class);
private final ExternalRegistry externalRegistry; (1)
private String clusterId;
@Inject
public ClusterDiscovery(final ExternalRegistry externalRegistry) {
this.externalRegistry = externalRegistry;
}
@Override
public void init(final String clusterId, final ClusterNodeAddress ownAddress) { (2)
this.clusterId = clusterId;
log.info("Registering HiveMQ with clusterId {} with central registry", clusterId);
externalRegistry.put(clusterId, ownAddress.getHost(), ownAddress.getPort());
}
@Override
public ListenableFuture<List<ClusterNodeAddress>> getNodeAddresses() { (3)
final ListenableFuture<List<ClusterNodeAddress>> allClusterNodesFuture = externalRegistry.getAllNodes();
return allClusterNodesFuture;
}
@Override
public void destroy() { (4)
log.info("Removing HiveMQ with clusterId {} from central registry", clusterId);
externalRegistry.remove(clusterId);
}
}
1 | Some kind of central registry which holds the information for all your cluster nodes |
2 | Gets called one time when HiveMQ starts its cluster |
3 | Gets called periodically to discover new cluster nodes at runtime |
4 | Gets called one time when HiveMQ shuts down |
ScheduledCallback
Type
Asynchronous
Purpose
The com.hivemq.spi.callback.schedule.ScheduledCallback gets executed periodically.
See the Scheduled Callback Execution chapter for more information.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
When the callback is added to the Callback Registry, a validation of
the quartz-style cron expression will take place. If the expression is invalid, the callback won’t be executed.
WebUIAuthenticationCallback
The WebUIAuthenticationCallback can be added to the CallbackRegistry to authenticate Web UI users.
When a callback is added, the users and passwords configured in config.xml are ignored and the authentication is delegated to the callback.
The user configuration in config.xml is explained in detail in the access control section of the HiveMQ Web UI documentation.
When multiple callbacks are added, only the one with the highest priority is used.
If the credential check could run endlessly, e.g. because of endless retries while the authentication endpoint is unreachable, implement a timeout mechanism that results in a login failure after the timeout expires. |
Type
Synchronous
Purpose
The com.hivemq.spi.callback.webui.WebUIAuthenticationCallback
is responsible for authenticating Web UI users
and gets called every time a Web UI login attempt occurs.
Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.
This example shows how to implement a simple authentication callback, which checks whether the username is "admin" and the password is "hivemq".
public class WebUIAuthentication implements WebUIAuthenticationCallback {
@NotNull
@Override
public AuthenticationState checkCredentials(@NotNull final String username, @NotNull final String password) {
final boolean valid = validate(username, password);
if(valid){
return AuthenticationState.SUCCESS;
} else {
return AuthenticationState.FAILED;
}
}
@Override
public int priority() {
return 0;
}
private boolean validate(@NotNull final String username, @NotNull final String password){
// Hard-coded credentials, for demonstration purposes only
return "admin".equals(username) && "hivemq".equals(password);
}
}
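The timeout recommendation from the note earlier in this section could be sketched as follows. TimeLimitedCheck is a hypothetical helper that uses only the JDK (Java 8 or later is assumed). It runs a possibly slow credential check on a separate thread and treats a timeout or an error as a failed login.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not part of the HiveMQ SPI): bounds the time a
// credential check may take.
public class TimeLimitedCheck {

    // Daemon threads, so a hanging check never keeps the JVM alive.
    private final ExecutorService executor = Executors.newCachedThreadPool(runnable -> {
        final Thread thread = new Thread(runnable, "credential-check");
        thread.setDaemon(true);
        return thread;
    });

    // Returns true only if the check returns true within the timeout.
    public boolean check(final Callable<Boolean> remoteCheck, final long timeoutMillis) {
        final Future<Boolean> future = executor.submit(remoteCheck);
        try {
            return Boolean.TRUE.equals(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (final TimeoutException e) {
            future.cancel(true); // interrupt the hanging check
            return false;
        } catch (final ExecutionException e) {
            return false; // the check itself threw an exception
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            future.cancel(true);
            return false;
        }
    }
}
```

In checkCredentials, the result could be mapped to AuthenticationState.SUCCESS when check returns true and to AuthenticationState.FAILED otherwise.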
This example shows how to add a WebUIAuthenticationCallback to the CallbackRegistry.
public class MyPlugin extends PluginEntryPoint {
@PostConstruct
public void postConstruct() {
final WebUIAuthentication webUIAuthentication = new WebUIAuthentication(); (1)
CallbackRegistry callbackRegistry = getCallbackRegistry(); (2)
callbackRegistry.addCallback(webUIAuthentication); (3)
}
}
1 | Instantiate Web UI auth callback. |
2 | Get callback registry. |
3 | Add callback to registry. |
Scheduled Callback Execution
While most of the callbacks for HiveMQ plugins are used for reacting to certain events, it is sometimes desirable to run tasks on a periodic schedule.
With ScheduledCallback it’s easy to implement plugin behaviour that occurs periodically, based on quartz-style cron expressions. This lets your plugins run at regular intervals such as every minute or every day at midnight. Even non-trivial schedules like "every last Friday of every month during the years 2002 to 2017 at 15:15 and 20:30" can be represented. See the Quartz-Style Cron Expressions chapter for more information.
The scheduled callback executions are especially useful for tasks like:
- Maintenance tasks
- Watchdog services
- Reconfiguration services
- Integration with other systems (especially in conjunction with the Publish Service)
- Backup services
Usage
A ScheduledCallback is a callback which gets executed asynchronously. Like any other callback, you have to manually add a concrete ScheduledCallback to the Callback Registry at plugin startup time or at runtime.
A ScheduledCallback consists of two methods:
- cronExpression(): This method returns the quartz-style cron expression used to schedule the callback.
- execute(): This method contains the code that runs on every interval.
import com.hivemq.spi.PluginEntryPoint;
import com.hivemq.spi.callback.registry.CallbackRegistry;
import com.hivemq.spi.callback.schedule.ScheduledCallback;
import javax.annotation.PostConstruct;
class CallbackSchedulerMainClass extends PluginEntryPoint {
@PostConstruct
public void postConstruct() {
final CallbackRegistry callbackRegistry = getCallbackRegistry();
callbackRegistry.addCallback(new ScheduledCallback() {
@Override
public void execute() {
System.out.println("Executing");
}
@Override
public String cronExpression() {
// Every second
return "* * * * * ?";
}
});
}
}
The cronExpression() method is called only once, when the callback is added to the Callback Registry.
If you need to change the expression at runtime or if it is created dynamically, you have to reload it at runtime.
|
The cron expression will be evaluated when adding it to the Callback Registry and all callbacks with invalid expressions will be rejected.
Quartz-Style Cron Expressions
To allow maximum flexibility for scheduling callbacks, HiveMQ recognizes quartz-style cron expressions. This allows scheduling based on time intervals and dates.
Expression | Meaning |
---|---|
0/5 * * * * ? | every 5 seconds |
0 0 0/2 * * ? | every 2 hours |
0 0 0 * * ? | at midnight, every day |
0 0/5 15 * * ? | every 5 minutes starting at 3:00 PM and ending at 3:55 PM, every day |
0 15 11 ? * MON-FRI | every Monday, Tuesday, Wednesday, Thursday and Friday at 11:15 AM |
0 15 10 ? * 6L | on the last Friday of every month at 10:15 AM |
Learn more about Quartz Cron Expressions here. |
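The simpler patterns from the table can also be assembled programmatically. The following is a minimal sketch of such a builder. CronExpressions is a hypothetical class for illustration and should not be confused with the utilities HiveMQ ships.

```java
// Hypothetical helper (not part of the HiveMQ SPI): builds a few common
// quartz-style cron expressions.
public class CronExpressions {

    // e.g. everySeconds(5) yields "0/5 * * * * ?"
    public static String everySeconds(final int seconds) {
        requireRange(seconds, 1, 59);
        return "0/" + seconds + " * * * * ?";
    }

    // e.g. everyMinutes(5) yields "0 0/5 * * * ?"
    public static String everyMinutes(final int minutes) {
        requireRange(minutes, 1, 59);
        return "0 0/" + minutes + " * * * ?";
    }

    // e.g. everyHours(2) yields "0 0 0/2 * * ?"
    public static String everyHours(final int hours) {
        requireRange(hours, 1, 23);
        return "0 0 0/" + hours + " * * ?";
    }

    // e.g. dailyAt(0, 0) yields "0 0 0 * * ?" (midnight, every day)
    public static String dailyAt(final int hour, final int minute) {
        requireRange(hour, 0, 23);
        requireRange(minute, 0, 59);
        return "0 " + minute + " " + hour + " * * ?";
    }

    private static void requireRange(final int value, final int min, final int max) {
        if (value < min || value > max) {
            throw new IllegalArgumentException(
                    "Value " + value + " is outside of " + min + ".." + max);
        }
    }
}
```

Note that an increment such as 0/7 restarts at the beginning of every minute (or hour, or day), so only values that divide the enclosing unit evenly produce truly uniform intervals.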
Cron expression changes at runtime
The return value of the cronExpression() method of a ScheduledCallback is only evaluated once after it was added to the Callback Registry.
However, sometimes static cron expressions are not enough for your scheduled callbacks.
Sometimes it’s desirable to (re-)load cron expressions from databases, configuration files, web services or other sources on demand.
A reload of the cron expression of a ScheduledCallback can be triggered manually with the reloadScheduledCallbackExpression(ScheduledCallback scheduledCallback) method of the CallbackRegistry.
import com.hivemq.spi.PluginEntryPoint;
import com.hivemq.spi.callback.registry.CallbackRegistry;
import com.hivemq.spi.callback.schedule.ScheduledCallback;
import javax.annotation.PostConstruct;
class CallbackSchedulerMainClass extends PluginEntryPoint {
@PostConstruct
public void postConstruct() {
final CallbackRegistry callbackRegistry = getCallbackRegistry();
callbackRegistry.addCallback(new ScheduledCallback() {
@Override
public void execute() {
System.out.println("Business Logic Stuff");
callbackRegistry.reloadScheduledCallbackExpression(this); (1)
}
@Override
public String cronExpression() {
String expression = dynamicExpressionFromDatabase(); (2)
return expression;
}
});
}
}
1 | Trigger a manual reload of the expression, which calls the cronExpression() method after executing this callback |
2 | Loads a dynamic expression |
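The dynamicExpressionFromDatabase() call in the example above is left undefined. As one possible shape for such a source, the following sketch reads the expression from a properties-style configuration and falls back to a default if nothing usable is found. CronExpressionSource and the property name schedule.cron are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;

// Hypothetical helper (not part of the HiveMQ SPI): loads a cron expression
// from properties-formatted text.
public class CronExpressionSource {

    // Assumed property name, chosen for this sketch only.
    private static final String KEY = "schedule.cron";

    private final String fallback;

    public CronExpressionSource(final String fallback) {
        this.fallback = fallback;
    }

    // Returns the configured expression, or the fallback if the property
    // is missing, blank, or the source cannot be read.
    public String load(final Reader reader) {
        final Properties properties = new Properties();
        try {
            properties.load(reader);
        } catch (final IOException e) {
            return fallback;
        }
        final String value = properties.getProperty(KEY);
        if (value == null || value.trim().isEmpty()) {
            return fallback;
        }
        return value.trim();
    }
}
```

A cronExpression() implementation could delegate to load with a reader for the plugin's configuration file, so that each call to reloadScheduledCallbackExpression picks up the current value.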
Utils
For convenience, HiveMQ offers some utilities for working with quartz-style cron expressions. The following utility classes are worth a look if you are working with cron expressions:
- com.hivemq.spi.callback.schedule.ScheduleExpressions: A utility class which offers constants for common cron expressions and some utility methods for calculating expressions.