General Extension Concepts
A HiveMQ extension is essentially a folder that holds a Java archive and an XML descriptor file that you place in the extensions folder of your HiveMQ installation.
Each HiveMQ extension has a particular extension lifecycle that allows the extension to be installed, enabled, or disabled at runtime.
Each HiveMQ extension callback has one or two parameters, called input and output. For more information, see Extension Input / Output Principles.
Extension Lifecycle
HiveMQ extensions offer hot-reload functionality that allows you to install, enable, and disable extensions at runtime. Hot-reloading of extensions supports the high-availability strategy of HiveMQ by providing a way to change or update extensions with no downtime.
Each installed HiveMQ extension can be in one of two states: Enabled or Disabled.
You can use an administrative action to switch the state of an extension during runtime.
Before you remove an extension at runtime, disable the extension by creating a DISABLED marker in the extension home folder.
The following diagram shows the typical lifecycle of HiveMQ extensions at HiveMQ start.
In the diagram, 'Extension 1' is already installed when HiveMQ starts.
When you stop HiveMQ, all currently-enabled extensions stop automatically. The following diagram shows the lifecycle of 'Extension 1' when HiveMQ stops.
HiveMQ allows extensions to be enabled or disabled during runtime. The following diagram shows the lifecycle of an extension called Extension 2 that is enabled at runtime.
When you add a disabled marker to the folder of an extension to disable the extension during runtime, the following lifecycle can be expected:
To install a HiveMQ extension, move the folder that contains the extension to the <hivemq-home>/extensions/ folder of your HiveMQ instance.
By default, HiveMQ enables each extension that you install. To prevent HiveMQ from automatically starting up an extension, you need to add a disabled marker to the folder of the extension.
To mark an installed extension as disabled, place a file with the name DISABLED in the home folder of the extension.
To enable a previously-disabled extension, remove the DISABLED file from the extension folder.
HiveMQ automatically registers the changes you make to the extension, and enables or disables the extension accordingly.
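Because enabling and disabling depend only on whether the DISABLED file is present, the switch can also be scripted. The following is a minimal sketch that uses only the Java standard library. The ExtensionMarker class and its method names are hypothetical helpers for illustration and are not part of HiveMQ; the caller is assumed to pass the home folder of the extension.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper that toggles the DISABLED marker inside an extension home folder. */
final class ExtensionMarker {

    private static final String MARKER_NAME = "DISABLED";

    /** Creates the DISABLED marker; does nothing if the marker already exists. */
    static void disable(final Path extensionFolder) {
        try {
            Files.createFile(extensionFolder.resolve(MARKER_NAME));
        } catch (final FileAlreadyExistsException alreadyDisabled) {
            // the marker is already present, so there is nothing to do
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Removes the DISABLED marker; does nothing if the marker is absent. */
    static void enable(final Path extensionFolder) {
        try {
            Files.deleteIfExists(extensionFolder.resolve(MARKER_NAME));
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns true if the folder currently contains a DISABLED marker. */
    static boolean isDisabled(final Path extensionFolder) {
        return Files.exists(extensionFolder.resolve(MARKER_NAME));
    }
}
```

Calling ExtensionMarker.disable with the path of an extension folder creates the marker file, and ExtensionMarker.enable removes it again. The helper only changes the file system; reacting to the change remains the job of HiveMQ.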
The following example shows the folder content of two HiveMQ extensions:
- hivemq/
|- bin/
|- conf/
...
|- extensions/
| |- extension-1/
| | |- extension.jar
| | |- hivemq-extension.xml
| |- extension-2/
| | |- extension.jar
| | |- hivemq-extension.xml
| | |- DISABLED
In the example, Extension-2 has a DISABLED marker.
HiveMQ does not enable extensions that have a disabled marker in their extension folder during broker startup.
The disabled extension can be enabled at runtime by removing the disabled marker.
To update an installed extension to a newer version, disable the running extension with a DISABLED marker, make the necessary changes, and then remove the DISABLED marker to re-enable the extension.
When an extension starts at HiveMQ startup or during runtime, HiveMQ calls the extensionStart method that the ExtensionMain class of the extension implements.
ClientInitializer, Authenticators, Authorizers, and LifecycleCallbacks need to be registered in the extensionStart method.
You can also use extensionStart to initialize and set up resources for an extension. For example, open a connection to a database.
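import com.hivemq.extension.sdk.api.ExtensionMain;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.parameter.ExtensionStartInput;
import com.hivemq.extension.sdk.api.parameter.ExtensionStartOutput;
import com.hivemq.extension.sdk.api.parameter.ExtensionStopInput;
import com.hivemq.extension.sdk.api.parameter.ExtensionStopOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HelloWorldMain implements ExtensionMain {

    private static final @NotNull Logger log = LoggerFactory.getLogger(HelloWorldMain.class);

    @Override
    public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {
        try {
            //Open resources here.
            log.info("Started " + extensionStartInput.getExtensionInformation().getName() + ":" + extensionStartInput.getExtensionInformation().getVersion());
        } catch (Exception e) {
            log.error("Exception thrown at extension start: ", e);
        }
    }

    @Override
    public void extensionStop(final @NotNull ExtensionStopInput extensionStopInput, final @NotNull ExtensionStopOutput extensionStopOutput) {
        //Close resources here. Note that this callback reads from the stop input, not the start input.
        log.info("Stopped " + extensionStopInput.getExtensionInformation().getName() + ":" + extensionStopInput.getExtensionInformation().getVersion());
    }
}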
When an extension stops at HiveMQ shutdown or during runtime, HiveMQ calls the extensionStop method that the ExtensionMain class of the extension implements.
The extensionStop method can be used to clean up extension resources and stop tasks that the extension runs. For example, close a database connection.
To prevent further starts, an extension can call the preventExtensionStartup method on the output parameter of the extensionStart method.
When an extension prevents its own startup, HiveMQ automatically disables the extension. Additionally, the extension must provide a reason why extension startup has been prevented. Detailed reason information makes it easier for HiveMQ operators to debug the prevented extension startup.
If an exception is thrown and not caught within the implementation of extensionStart, HiveMQ interprets the startup of the extension as unsuccessful and disables the extension.
Extensions are responsible for handling all exceptions their methods throw as well as any exceptions that are thrown as a result of code the extension calls.
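The startup rules above can be sketched without the SDK on the classpath. In this minimal sketch, StartOutputStandIn is a stand-in interface that models only the preventExtensionStartup method mentioned above; it is not the SDK type. The StartupGuard class, the db.url setting, and the openResources parameter are likewise hypothetical and serve only to show where a reason is reported and where exceptions are caught.

```java
import java.util.Map;

/** Stand-in for the output parameter of extensionStart; only the method discussed above is modeled. */
interface StartOutputStandIn {
    void preventExtensionStartup(String reason);
}

/** Hypothetical start logic that reports a detailed reason instead of letting an exception escape. */
final class StartupGuard {

    private static final String REQUIRED_SETTING = "db.url"; // assumed setting name

    static void start(final Map<String, String> settings,
                      final Runnable openResources,
                      final StartOutputStandIn output) {
        // Refuse to start when a required setting is missing, and say exactly which one.
        if (!settings.containsKey(REQUIRED_SETTING)) {
            output.preventExtensionStartup("Required setting '" + REQUIRED_SETTING + "' is missing");
            return;
        }
        try {
            // For example, open a database connection.
            openResources.run();
        } catch (final RuntimeException e) {
            // Handle the exception inside the callback and turn it into a readable reason.
            output.preventExtensionStartup("Could not open resources: " + e.getMessage());
        }
    }
}
```

In a real extension, the same pattern would sit inside extensionStart and call preventExtensionStartup on the SDK output object. The point of the sketch is that every failure path ends with a specific reason string rather than an uncaught exception.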
If the extensionStop method of an extension throws an exception, HiveMQ stops and disables the extension. An extension cannot prevent its own stop.
Extension Input / Output Principles
In general, callback methods in the HiveMQ Extension SDK API contain one or two parameters:
...
@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {
}
...
Extension Input
An input object is an informational object. This object provides callback context and global information.
Input Examples
The following examples show several extension inputs, the callback in which each input occurs, and the information that each input holds.
Example Extension Start Input
This example illustrates the content of an ExtensionStartInput from the extension start callback:
- Previous extension version (for updated extensions)
- Enabled extensions
- Current extension version
- Extension name
- Extension home folder
- Extension ID
- Extension author (if author information is provided in the hivemq-extension.xml file)
...
@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {
    final Optional<String> previousVersion = extensionStartInput.getPreviousVersion();
    final Map<String, ExtensionInformation> enabledExtensions = extensionStartInput.getEnabledExtensions();
    final ExtensionInformation extensionInformation = extensionStartInput.getExtensionInformation();
    final String version = extensionInformation.getVersion();
    final String name = extensionInformation.getName();
    final File extensionFolder = extensionInformation.getExtensionHomeFolder();
    final String id = extensionInformation.getId();
    final Optional<String> author = extensionInformation.getAuthor();
}
...
Example Publish Inbound Input
This example illustrates the content of a PublishInboundInput from the inbound publish callback of a Publish Inbound Interceptor:
- MQTT PUBLISH packet
- Client ID
- MQTT version
- IP address
- Listener with port, bind address, and type
- TLS
- Proxy information
- Connection attribute store
...
new PublishInboundInterceptor() {
    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {
        final PublishPacket publishPacket = publishInboundInput.getPublishPacket();
        final ClientInformation clientInformation = publishInboundInput.getClientInformation();
        final ConnectionInformation connectionInformation = publishInboundInput.getConnectionInformation();
    }
};
...
Extension Output
Output objects give you the ability to extend the default behavior of HiveMQ to accommodate your specific business case. Depending on the callback, you can, for example, modify, drop, or authorize MQTT packets, or authenticate or disconnect connecting clients.
Output objects are blocking by default and can easily be made asynchronous.
Blocking Output
The default execution of every output object is blocking.
In a blocking execution, the next callback for the same client cannot be called until the previous callback is completely executed.
Blocking execution is recommended for simple callback code that is not time-consuming.
There is no difference in ordering when using blocking or asynchronous callbacks. Execution of the callbacks is always ordered.
Be sure that your blocking code returns, otherwise the extension task execution for a client cannot complete.
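The ordering guarantee and the warning above can be pictured with a simplified model: one single-threaded queue per client ID. This is only an analogy written with the Java standard library and makes no claim about how HiveMQ schedules callbacks internally. The PerClientCallbackQueue class and its methods are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Simplified model: callbacks of the same client run one after another, in submission order. */
final class PerClientCallbackQueue {

    private final Map<String, ExecutorService> queues = new ConcurrentHashMap<>();

    /** Enqueues a callback; it starts only after all earlier callbacks of that client have returned. */
    void submit(final String clientId, final Runnable callback) {
        queues.computeIfAbsent(clientId, id -> Executors.newSingleThreadExecutor()).execute(callback);
    }

    /** Stops accepting callbacks and waits for the queued ones; returns false on timeout. */
    boolean drain(final long timeoutMillis) {
        queues.values().forEach(ExecutorService::shutdown);
        boolean finished = true;
        for (final ExecutorService queue : queues.values()) {
            try {
                finished &= queue.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return finished;
    }

    /** Demo: a slow first callback delays the later callbacks of the same client but never reorders them. */
    static List<String> demo() {
        final List<String> order = new CopyOnWriteArrayList<>();
        final PerClientCallbackQueue queue = new PerClientCallbackQueue();
        queue.submit("client-a", () -> {
            pause(100);
            order.add("first");
        });
        queue.submit("client-a", () -> order.add("second"));
        queue.submit("client-a", () -> order.add("third"));
        queue.drain(5_000);
        return order;
    }

    private static void pause(final long millis) {
        try {
            Thread.sleep(millis);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this model, a callback that never returns would keep every later callback of the same client waiting, which is the situation the warning above describes.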
Blocking Examples
The following examples highlight use cases that benefit from a blocking output.
Example Publish Interceptor Output
This example shows a very simple Publish Inbound Interceptor.
The interceptor checks the client ID of the publishing client and prevents delivery if the ID contains the string "prevent".
public class SimplePreventInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {
        final String clientId = publishInboundInput.getClientInformation().getClientId();
        if (clientId.contains("prevent")) {
            //prevent publish delivery on the output object
            publishInboundOutput.preventPublishDelivery();
        }
    }
}
Example Subscription Authorizer Output
This example shows how to implement a simple Subscription Authorizer.
The authorizer denies any shared subscription.
public class DenySharedAuthorizer implements SubscriptionAuthorizer {

    @Override
    public void authorizeSubscribe(@NotNull final SubscriptionAuthorizerInput input, @NotNull final SubscriptionAuthorizerOutput output) {
        //disallow a shared subscription (MQTT shared subscription filters start with "$share/")
        if (input.getSubscription().getTopicFilter().startsWith("$share/")) {
            output.failAuthorization();
            return;
        }
        //otherwise let the other extensions or default permissions decide
        output.nextExtensionOrDefault();
    }
}
Asynchronous Output
If your use case requires external service calls such as HTTP requests or database reads/writes, we recommend the use of asynchronous (async) output.
To make an output object asynchronous, call one of the following methods:
- output.async(Duration timeout); (fallback is FAILURE)
- output.async(Duration timeout, TimeoutFallback fallback);
For more information, see HiveMQ Extension SDK JavaDoc.
Some output objects provide additional async() implementations. For example, PublishInboundOutput. For more information, see Publish Inbound Interceptor.
The use of multiple async() calls is not allowed and throws an exception.
When resume() is called on the Async object that async() returns, or when the timeout duration is exceeded, HiveMQ calls the next callback for the same context.
Blocking and asynchronous callbacks do not affect callback ordering. Execution of callbacks is always ordered.
We recommend this type of execution for complex or time-consuming callback code. For more information, see async examples.
Always verify that your async callback execution is non-blocking.
Duration (Timeout) Parameter
The mandatory timeout parameter defines how long HiveMQ must wait for the result of an output.
TimeoutFallback Parameter
The default fallback is FAILURE. A FAILURE response usually causes HiveMQ to end any further action. The actual behavior is defined in the specific implementation.
SUCCESS usually means that HiveMQ either sees the outcome as successful or asks the next extension.
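The interplay of the timeout and the fallback can be illustrated with a plain CompletableFuture. This is a minimal sketch of the idea, not the SDK mechanism: the TimeoutFallbackSketch class and its Outcome enum are hypothetical, and Outcome only mirrors the two fallback values named above.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Illustrates "wait at most this long, then apply the fallback" with standard library types. */
final class TimeoutFallbackSketch {

    /** Mirrors the two fallback values named above; this is not the SDK enum. */
    enum Outcome { SUCCESS, FAILURE }

    /**
     * Waits for the asynchronous decision, but no longer than the timeout.
     * If no decision arrives in time, the fallback becomes the outcome.
     */
    static Outcome await(final CompletableFuture<Outcome> decision,
                         final Duration timeout,
                         final Outcome fallback) {
        return decision
                .completeOnTimeout(fallback, timeout.toMillis(), TimeUnit.MILLISECONDS)
                .join();
    }
}
```

A decision that is already available wins over the fallback, while a decision that never arrives resolves to the fallback once the timeout elapses. A short timeout with a FAILURE fallback therefore errs on the side of rejecting work when the external service is slow.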
Asynchronous Output Examples
The following examples highlight use cases that benefit from asynchronous output.
Example Async Publish Interceptor Output
This example shows how to implement a PublishInboundInterceptor that asynchronously modifies an inbound PUBLISH message in an external task with the use of the Managed Extension Executor Service.
Since the timeout strategy is FAILURE, HiveMQ prevents onward delivery of the PUBLISH if the duration of the task exceeds 2 seconds.
public class ModifyingAsyncPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {
        //make output object async with a duration of 2 seconds and timeout fallback failure
        final Async<PublishInboundOutput> asyncOutput = publishInboundOutput.async(Duration.ofSeconds(2), TimeoutFallback.FAILURE);

        //submit external task to extension executor service
        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {
                //get the modifiable publish object from the output
                final ModifiablePublishPacket publish = publishInboundOutput.getPublishPacket();
                //call external publish modification (method not provided)
                callExternalTask(publish);
            }
        });

        //react to the completion of the task without blocking the callback
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object object, final @Nullable Throwable throwable) {
                if (throwable != null) {
                    //please use more sophisticated logging
                    throwable.printStackTrace();
                }
                //Always resume the async output, otherwise it will time out
                asyncOutput.resume();
            }
        });
    }
}
Example Asynchronous Subscription Authorizer Output
This example shows how to use the SubscriptionAuthorizer with an async output to call an external service.
public class MyAsyncSubscriptionAuthorizer implements SubscriptionAuthorizer {

    @Override
    public void authorizeSubscribe(@NotNull final SubscriptionAuthorizerInput input, @NotNull final SubscriptionAuthorizerOutput output) {
        //get the managed extension executor service
        final ManagedExtensionExecutorService extensionExecutorService = Services.extensionExecutorService();

        //make the output async with a timeout of 2 seconds
        final Async<SubscriptionAuthorizerOutput> async = output.async(Duration.ofSeconds(2));

        //submit a task to the extension executor
        extensionExecutorService.submit(new Runnable() {
            @Override
            public void run() {
                //call an external service to decide the outcome (method not provided)
                final boolean result = callExternalTask();
                if (result) {
                    output.authorizeSuccessfully();
                } else {
                    output.failAuthorization();
                }
                //Always resume the async output, otherwise it will time out
                async.resume();
            }
        });
    }
}
Initializing Objects
The HiveMQ Extension SDK provides the static Services and Builders classes to enable the creation of your desired Java objects inside extension classes.
final EventRegistry eventRegistry = Services.eventRegistry();

final TopicPermission permission = Builders.topicPermission()
        .topicFilter("allowed/topic")
        .qos(TopicPermission.Qos.ALL)
        .activity(TopicPermission.MqttActivity.ALL)
        .type(TopicPermission.PermissionType.ALLOW)
        .retain(TopicPermission.Retain.ALL)
        .build();
Builders validate the given values. The HiveMQ Extension SDK JavaDoc lists the exceptions that can be thrown when an invalid value is passed.
Never Block
The single most important rule for all extensions is: Never block an output.
You can use the ManagedExtensionExecutorService for every action that has the potential to block in the callback of an output.
It is a best practice to use a connection pool such as HikariCP when you work with databases.
The following example shows how the ManagedExtensionExecutorService can be used to accomplish non-blocking callback behavior.
This is a good example of an asynchronous Subscription Authorizer that calls an external server.
Since calls to an external service always carry the risk of taking a long time or timing out, callbacks such as this can block an entire flow.
To prevent potential blocks, the SubscriptionAuthorizerOutput is made asynchronous and the external call runs on the ManagedExtensionExecutorService.
public class MyAsyncSubscriptionAuthorizer implements SubscriptionAuthorizer {

    @Override
    public void authorizeSubscribe(@NotNull final SubscriptionAuthorizerInput input, @NotNull final SubscriptionAuthorizerOutput output) {
        //get the managed extension executor service
        final ManagedExtensionExecutorService extensionExecutorService = Services.extensionExecutorService();

        //make the output async with a timeout of 2 seconds
        final Async<SubscriptionAuthorizerOutput> async = output.async(Duration.ofSeconds(2));

        //submit a task to the extension executor
        extensionExecutorService.submit(new Runnable() {
            @Override
            public void run() {
                //call an external service to decide the outcome (method not provided)
                final boolean result = callExternalTask();
                if (result) {
                    output.authorizeSuccessfully();
                } else {
                    output.failAuthorization();
                }
                //Always resume the async output, otherwise it will time out
                async.resume();
            }
        });
    }
}
Extension Isolation
The HiveMQ Extension SDK uses extension isolation:
-
Each extension has its own class loader
-
Extension resources and classes are not shared between extensions
Extension isolation adds security for your extension and avoids hard-to-debug failures that can occur when extensions share libraries.
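The effect of one class loader per extension can be reproduced in plain Java. The following sketch is an illustration of class loader isolation in general and is not HiveMQ code: BundledLibrary stands in for a library that two extensions both ship, and IsolatedLoader and IsolationDemo are hypothetical names. The sketch assumes that the compiled class file of BundledLibrary is available as a resource on the classpath.

```java
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;

/** Stands in for a library class that two extensions bundle independently. */
final class BundledLibrary {
    static int counter = 0;
}

/** Hypothetical loader that defines its own copy of BundledLibrary instead of sharing one. */
final class IsolatedLoader extends ClassLoader {

    private static final String TARGET = BundledLibrary.class.getName();

    IsolatedLoader() {
        // Parent is the platform loader: JDK classes are shared, application classes are not.
        super(ClassLoader.getPlatformClassLoader());
    }

    @Override
    protected Class<?> findClass(final String name) throws ClassNotFoundException {
        if (!TARGET.equals(name)) {
            throw new ClassNotFoundException(name);
        }
        final String resource = name.replace('.', '/') + ".class";
        try (InputStream in = BundledLibrary.class.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                throw new ClassNotFoundException(name);
            }
            final byte[] bytes = in.readAllBytes();
            return defineClass(name, bytes, 0, bytes.length);
        } catch (final IOException e) {
            throw new ClassNotFoundException(name, e);
        }
    }
}

final class IsolationDemo {

    /** Loads a fresh copy of BundledLibrary and sets the static counter of that copy. */
    static Class<?> loadCopy(final int counterValue) {
        try {
            final Class<?> copy = new IsolatedLoader().loadClass(BundledLibrary.class.getName());
            counterField(copy).setInt(null, counterValue);
            return copy;
        } catch (final ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Reads the static counter of the given copy. */
    static int counterOf(final Class<?> copy) {
        try {
            return counterField(copy).getInt(null);
        } catch (final ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    private static Field counterField(final Class<?> copy) throws NoSuchFieldException {
        final Field field = copy.getDeclaredField("counter");
        field.setAccessible(true);
        return field;
    }
}
```

Two calls to IsolationDemo.loadCopy yield two distinct Class objects with the same name, and each copy keeps its own static state. This is why a library loaded by one extension cannot see or overwrite the state of the same library loaded by another extension.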
Undesirable side effects can result when you use libraries like Jersey that tend to interfere with other class loaders. HiveMQ implements workarounds for such libraries internally. If you use a library that HiveMQ does not yet cover, notify contact@hivemq.com to request an appropriate workaround.