Interceptors
HiveMQ Interceptors provide a convenient way for extensions to intercept and modify MQTT messages.
Based on the type of interceptor, you can register a ClientInitializer or use the GlobalInterceptorRegistry to add the interceptor that you want.
The following table lists all available HiveMQ Interceptors:
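| Interceptor | Description |
|---|---|
| Connect Inbound Interceptor | Enables extensions to modify incoming CONNECT messages. |
| Connack Outbound Interceptor | Enables extensions to intercept and modify outbound CONNACK messages. |
| Publish Inbound Interceptor | Enables extensions to intercept inbound PUBLISH messages. |
| Publish Outbound Interceptor | Enables extensions to intercept outgoing PUBLISH messages. |
| Puback Inbound Interceptor | Enables extensions to intercept and modify inbound PUBACK messages. |
| Puback Outbound Interceptor | Enables extensions to intercept and modify outbound PUBACK messages. |
| Pubrec Inbound Interceptor | Enables extensions to intercept and modify inbound PUBREC messages. |
| Pubrec Outbound Interceptor | Enables extensions to intercept and modify outbound PUBREC messages. |
| Pubrel Inbound Interceptor | Enables extensions to intercept and modify inbound PUBREL messages. |
| Pubrel Outbound Interceptor | Enables extensions to intercept and modify outbound PUBREL messages. |
| Pubcomp Inbound Interceptor | Enables extensions to intercept and modify inbound PUBCOMP messages. |
| Pubcomp Outbound Interceptor | Enables extensions to intercept and modify outbound PUBCOMP messages. |
| Subscribe Inbound Interceptor | Enables extensions to intercept and modify inbound SUBSCRIBE messages. |
| Suback Outbound Interceptor | Enables extensions to intercept and modify outbound SUBACK messages. |
| Unsubscribe Inbound Interceptor | Enables extensions to intercept and modify inbound UNSUBSCRIBE messages. |
| Unsuback Outbound Interceptor | Enables extensions to intercept and modify outbound UNSUBACK messages. |
| Disconnect Inbound Interceptor | Enables extensions to intercept and modify inbound DISCONNECT messages. |
| Disconnect Outbound Interceptor | Enables extensions to intercept and modify outbound DISCONNECT messages. |
| Pingreq Inbound Interceptor | Enables extensions to intercept inbound PINGREQ messages. |
| Pingresp Outbound Interceptor | Enables extensions to intercept outbound PINGRESP messages. |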
Connect Inbound Interceptor
With the ConnectInboundInterceptor interface, you can modify or analyse the data that is contained in inbound MQTT CONNECT messages.
For this task, HiveMQ provides ConnectInboundInput and ConnectInboundOutput parameters.
Extension input and output principles apply.
The ConnectInboundInterceptor is added through a ConnectInboundInterceptorProvider that is set on the GlobalInterceptorRegistry.
When an asynchronous ConnectInboundOutput parameter with a TimeoutFallback.FAILURE fallback strategy times out,
a CONNACK with the UNSPECIFIED_ERROR (128) reason code is sent to the client and the client is disconnected.
Multiple interceptors are called sequentially. For example, when several extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The ConnectInboundInput and ConnectInboundOutput parameters are updated after each interception.
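The sequential, priority-ordered calling described above can be illustrated with a small plain-Java model. This is only a sketch and not the HiveMQ API: the `Extension` class, its `priority` field, and the `intercept` method are hypothetical names, and the CONNECT packet is reduced to a client ID string. The result of each interception is passed to the next interceptor, which mirrors how the input and output parameters are updated after each interception.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified model, NOT the HiveMQ API: all names here are hypothetical.
final class PriorityChainSketch {

    // A stand-in for an extension that provides one interceptor.
    static final class Extension {
        private final String name;
        private final int priority;
        private final UnaryOperator<String> interceptor;

        Extension(final String name, final int priority, final UnaryOperator<String> interceptor) {
            this.name = name;
            this.priority = priority;
            this.interceptor = interceptor;
        }

        int getPriority() {
            return priority;
        }
    }

    // Calls the interceptors one after another, highest priority first,
    // and feeds the result of each interception into the next one.
    static String intercept(final List<Extension> extensions, final String clientId) {
        final List<Extension> ordered = new ArrayList<>(extensions);
        ordered.sort(Comparator.comparingInt(Extension::getPriority).reversed());
        String current = clientId;
        for (final Extension extension : ordered) {
            current = extension.interceptor.apply(current);
        }
        return current;
    }

    public static void main(final String[] args) {
        final List<Extension> extensions = new ArrayList<>();
        extensions.add(new Extension("low", 10, id -> id + "-2"));
        extensions.add(new Extension("high", 100, id -> id + "-1"));
        System.out.println(intercept(extensions, "client"));
    }
}
```

In this model, the interceptor of the extension with priority 100 always sees the original client ID, and the interceptor with priority 10 sees the already modified value.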
The JavaDoc for the ConnectInboundInterceptor can be found here.
It is important to note that changes made to the CONNECT packet are usually not expected by the client implementation.
For example, if a client connects with a modified client ID and expects a session to be present, the client may disconnect because the broker holds no session for the modified client ID.
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Client ID | The unique ID of a client. |
| Clean Start | A flag that shows whether the client wants to start a new "clean" session. |
| Session Expiry Interval | The number of seconds that the broker stores session information for the MQTT client. The time is calculated from the moment that the client disconnects. |
| Keep Alive | The time interval that is allowed between keep-alive messages. When set to 0, the keep-alive feature is disabled. |
| Receive Maximum | Limits the number of QoS 1 and QoS 2 publications that the connected client can process concurrently. |
| Maximum Packet Size | The maximum packet size that the connected client accepts. |
| Topic Alias Maximum | The maximum number of topic aliases that the client can hold. |
| Request Response Information | A flag that requests the server to return response information in the corresponding CONNACK packet. |
| Request Problem Information | A flag that requests the server to return problem information (for example, reason string and user properties). |
| Authentication Method | A UTF-8 value that shows the authentication method that is used for the connect. |
| Authentication Data | The data that is used for authentication. If no authentication method is given, this parameter is not used. |
| Password | The password that the client uses to connect. |
| Will Publish | The Will publish message of the CONNECT packet. |
| User Properties | Additional user-defined information (optional). |
When an interceptor makes changes to the CONNECT message of a client, the client is not notified.
To provide feedback to the client, you can use the ConnackOutboundInterceptor to add user properties to the CONNACK packet.
Set Up Connect Inbound Interceptor
The following example shows how to set up a ConnectInboundInterceptor.
...
@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {
try {
final ConnectInboundInterceptorA interceptorA = new ConnectInboundInterceptorA();
final ConnectInboundInterceptorB interceptorB = new ConnectInboundInterceptorB();
Services.interceptorRegistry().setConnectInboundInterceptorProvider(input -> {
final String clientId = input.getClientInformation().getClientId();
if (clientId.startsWith("a")) {
return interceptorA;
} else {
return interceptorB;
}
});
} catch (Exception e) {
log.error("Exception thrown at extension start: ", e);
}
}
...
Modify a CONNECT Message
The following example ensures that a client connects with a clean start and a session expiry of zero when its CONNECT packet contains a user property named "clean".
public class ModifyConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
        final ModifiableConnectPacket connectPacket = output.getConnectPacket();
        final ModifiableUserProperties userProperties = connectPacket.getUserProperties();
        // only force a clean session when the client sent a "clean" user property
        if (userProperties.getFirst("clean").isPresent()) {
            connectPacket.setSessionExpiryInterval(0);
            connectPacket.setCleanStart(true);
        }
    }
}
The following example requests a new ID for a connecting client and sets the new ID asynchronously.
public class ModifyConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
        // make the output async with a timeout of 10 seconds and the timeout fallback failure
        final Async<ConnectInboundOutput> async = output.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);
        final ModifiableConnectPacket connectPacket = output.getConnectPacket();
        // request the new client ID (method not provided)
        final CompletableFuture<String> modifiedClientIdFuture = modifyClientId(connectPacket.getClientId());
        modifiedClientIdFuture.whenComplete((modifiedClientId, throwable) -> {
            // if no new ID is returned, the output is not resumed here
            // and the TimeoutFallback.FAILURE strategy applies after 10 seconds
            if (modifiedClientId != null) {
                connectPacket.setClientId(modifiedClientId);
                async.resume();
            }
        });
    }
}
Modify a Last Will Message
The Will message of a CONNECT packet can be modified or removed. If the CONNECT does not include a Will message, a new Will can be created via the WillPublishBuilder and added to the CONNECT.
public class ModifyWillConnectInboundInterceptor implements ConnectInboundInterceptor {
@Override
public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
final ModifiableConnectPacket connectPacket = output.getConnectPacket();
if (connectPacket.getModifiableWillPublish().isEmpty()) {
final String clientId = connectPacket.getClientId();
final ByteBuffer payload = StandardCharsets.UTF_8.encode(clientId + " disconnected");
final WillPublishBuilder builder = Builders.willPublish();
final WillPublishPacket will = builder.willDelay(0)
.topic(clientId + "will")
.payload(payload)
.build();
connectPacket.setWillPublish(will);
}
}
}
To remove a last-will message, you can set the will-message to 'null' as shown in the following example:
public class ModifyWillConnectInboundInterceptor implements ConnectInboundInterceptor {
@Override
public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
final ModifiableConnectPacket connectPacket = output.getConnectPacket();
if (connectPacket.getModifiableWillPublish().isPresent()) {
connectPacket.setWillPublish(null);
}
}
}
The following example shows how to modify an existing will message:
public class ModifyWillConnectInboundInterceptor implements ConnectInboundInterceptor {
@Override
public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
final ModifiableConnectPacket connectPacket = output.getConnectPacket();
if (connectPacket.getModifiableWillPublish().isPresent()) {
final ModifiableWillPublish modifiableWill = connectPacket.getModifiableWillPublish().get();
modifiableWill.setTopic(connectPacket.getClientId() + "/will");
}
}
}
Connack Outbound Interceptor
With the ConnackOutboundInterceptor interface, you can intercept outbound MQTT CONNACK messages and modify or analyse the data that they contain.
For this task, HiveMQ provides ConnackOutboundInput and ConnackOutboundOutput parameters.
Extension input and output principles apply.
The ConnackOutboundOutput parameter can easily be used for asynchronous processing.
The interceptor is added through a ConnackOutboundInterceptorProvider that is set on the GlobalInterceptorRegistry.
When an asynchronous ConnackOutboundOutput parameter with a TimeoutFallback.FAILURE fallback strategy times out,
the connection of the client is closed without sending a CONNACK.
Multiple interceptors are called sequentially. For example, when several extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The ConnackOutboundOutput and ConnackOutboundInput parameters are updated after each interception.
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Reason Code | Specifies whether the CONNECT was successful and, if not, holds the reason why it was unsuccessful. |
| Reason String | A UTF-8 encoded string that describes the reason. |
| User Properties | Additional diagnostic or other information (optional). |
| Server Reference | Passes another server for the client to connect to. |
| Response Information | A UTF-8 encoded string that holds the response information. |
Interception of a CONNACK message has the following limitations:
- You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
- It is not possible to send response information to a client that did not request response information.
- If the reason code is "SUCCESS", you cannot set a reason string.
- It is not possible to prevent sending a CONNACK message to a client.
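The first three limitations can be expressed as simple checks. The following plain-Java sketch is not the HiveMQ API and does not show how HiveMQ itself reports a violation. The `Connack` class and the `violations` method are hypothetical names that only model the rules listed above.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, NOT the HiveMQ API: all names here are hypothetical.
final class ConnackRulesSketch {

    // A reduced CONNACK: only the fields that the limitations refer to.
    static final class Connack {
        final boolean success;            // true when the reason code is SUCCESS
        final String reasonString;        // null when absent
        final String responseInformation; // null when absent

        Connack(final boolean success, final String reasonString, final String responseInformation) {
            this.success = success;
            this.reasonString = reasonString;
            this.responseInformation = responseInformation;
        }
    }

    // Returns a description of every limitation that the modified CONNACK breaks.
    static List<String> violations(final Connack original,
                                   final Connack modified,
                                   final boolean responseInformationRequested) {
        final List<String> result = new ArrayList<>();
        if (original.success != modified.success) {
            result.add("reason code changed between successful and unsuccessful");
        }
        if (modified.success && modified.reasonString != null) {
            result.add("reason string set although the reason code is SUCCESS");
        }
        if (!responseInformationRequested && modified.responseInformation != null) {
            result.add("response information set although the client did not request it");
        }
        return result;
    }

    public static void main(final String[] args) {
        final Connack original = new Connack(true, null, null);
        final Connack modified = new Connack(true, "not allowed", null);
        System.out.println(violations(original, modified, false));
    }
}
```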
If response information is requested, you can use the ConnectInboundInterceptor to get the information
as shown in this example.
Furthermore, Reason Code, Reason String, and MQTT User Properties are MQTT 5 features.
In MQTT 3 environments, the reason code is always set to SUCCESS, the reason string is set to null, and the CONNACK does not contain any user properties.
Connack Outbound Input
The input parameter contains the following unmodifiable information:
- MQTT CONNACK packet
- MQTT Version
- Client ID
- IP Address
- Listener with port, bind address and type
- ConnectionAttributeStore
- TLS
The JavaDoc can be found here.
Connack Outbound Output
The output parameter contains a modifiable CONNACK packet.
You can use the CONNACK packet to manipulate the parameters that are listed in the table above.
For more information on ways to manipulate outbound CONNACK packets, see here.
Example Usage
This section provides examples of how to implement different ConnackOutboundInterceptors
and add them through the GlobalInterceptorRegistry.
Set Up In GlobalInterceptorRegistry
This example shows how to set up a ConnackOutboundInterceptor by a ConnackOutboundInterceptorProvider with the GlobalInterceptorRegistry:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput extensionStartInput, @NotNull final ExtensionStartOutput extensionStartOutput) {
final ConnackOutboundInterceptorProvider interceptorProvider = new ConnackOutboundInterceptorProvider() {
@Override
public @NotNull ConnackOutboundInterceptor getConnackOutboundInterceptor(final @NotNull ConnackOutboundProviderInput connackOutboundProviderInput) {
return new ConnackOutboundInterceptor() {
@Override
public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {
//do something with the connack eg. logging
System.out.println("Connack intercepted for client: " + connackOutboundInput.getClientInformation().getClientId());
}
};
}
};
Services.interceptorRegistry().setConnackOutboundInterceptorProvider(interceptorProvider);
}
...
Modify a CONNACK Message
This example shows how to implement a ConnackOutboundInterceptor that modifies parameters of an outbound CONNACK message:
public class ModifyingConnackOutboundInterceptor implements ConnackOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingConnackOutboundInterceptor.class);
@Override
public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {
//get the modifiable connack object from the output
final ModifiableConnackPacket connackPacket = connackOutboundOutput.getConnackPacket();
// modify / overwrite parameters of a connack packet.
try {
if (connackPacket.getReasonCode() != ConnackReasonCode.SUCCESS) {
//Set reason string and reason code if reason code is not success.
connackPacket.setReasonString("The reason for the not successful connack");
connackPacket.setReasonCode(ConnackReasonCode.UNSPECIFIED_ERROR);
}
connackPacket.setServerReference("server reference");
connackPacket.getUserProperties().addUserProperty("my-prop", "some value");
connackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (Exception e){
log.debug("Connack outbound interception failed:", e);
}
}
}
Modify a CONNACK Message Asynchronously
This example shows how to implement a ConnackOutboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an outbound CONNACK message.
The default timeout strategy is failure. If the task takes more than 10 seconds, the connection of the client is closed without sending a CONNACK.
public class AsyncModifyingConnackOutboundInterceptor implements ConnackOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingConnackOutboundInterceptor.class);
@Override
public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<ConnackOutboundOutput> async = connackOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable connack object from the output
final ModifiableConnackPacket connackPacket = connackOutboundOutput.getConnackPacket();
// modify / overwrite parameters of a connack packet.
try {
if (connackPacket.getReasonCode() != ConnackReasonCode.SUCCESS) {
//Set reason string and reason code if reason code is not success.
connackPacket.setReasonString("The reason for the not successful connack");
connackPacket.setReasonCode(ConnackReasonCode.UNSPECIFIED_ERROR);
}
connackPacket.setServerReference("server reference");
connackPacket.getUserProperties().addUserProperty("my-prop", "some value");
connackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (Exception e){
log.debug("Connack outbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if(throwable != null){
log.debug("Connack outbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
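The timeout behaviour of the asynchronous example can be modelled in plain Java. This sketch is not the HiveMQ API: `Outcome` and `process` are hypothetical names, and `CompletableFuture.completeOnTimeout` merely stands in for the failure fallback. As in the example above, a task that completes in time leads to a CONNACK even when the task itself failed, while a task that exceeds the timeout leads to a closed connection.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Simplified model, NOT the HiveMQ API: all names here are hypothetical.
final class TimeoutFallbackSketch {

    enum Outcome { CONNACK_SENT, CONNECTION_CLOSED }

    // Completes with CONNACK_SENT when the task finishes (normally or exceptionally)
    // before the timeout, and with CONNECTION_CLOSED when the timeout expires first.
    static CompletableFuture<Outcome> process(final CompletableFuture<Void> task, final long timeoutMillis) {
        return task
                .handle((ignored, throwable) -> Outcome.CONNACK_SENT)
                .completeOnTimeout(Outcome.CONNECTION_CLOSED, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(final String[] args) {
        // a task that is already done resumes immediately
        final CompletableFuture<Void> fastTask = CompletableFuture.completedFuture(null);
        System.out.println(process(fastTask, 10_000).join());

        // a task that never completes runs into the timeout
        final CompletableFuture<Void> stuckTask = new CompletableFuture<>();
        System.out.println(process(stuckTask, 100).join());
    }
}
```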
Modify the Response Information
This example shows how to set the response information in a ConnackOutboundInterceptor. A ConnectInboundInterceptor stores whether each client requested response information, and the ConnackOutboundInterceptor sets the response information only for clients that requested it:
public class ModifyResponseInformationMain implements ExtensionMain {
private static final Logger log = LoggerFactory.getLogger(ModifyResponseInformationMain.class);
// the interceptors can be called from different threads, so a thread-safe map is used
private final Map<String, Boolean> clientIdResponseInformationRequestedMap = new ConcurrentHashMap<>();
@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {
final ConnectInboundInterceptor connectInboundInterceptor = new ConnectInboundInterceptor() {
@Override
public void onConnect(final @NotNull ConnectInboundInput connectInboundInput, final @NotNull ConnectInboundOutput connectInboundOutput) {
try {
//get connect packet from input
final ConnectPacket connectPacket = connectInboundInput.getConnectPacket();
//store client id if response information is requested.
clientIdResponseInformationRequestedMap.put(connectPacket.getClientId(), connectPacket.getRequestResponseInformation());
} catch (final Exception e){
log.debug("Connect inbound interception failed: ", e);
}
}
};
final ConnackOutboundInterceptor connackOutboundInterceptor = new ConnackOutboundInterceptor() {
@Override
public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {
try {
//get client id from input
final String clientId = connackOutboundInput.getClientInformation().getClientId();
//check if response information is requested
final Boolean responseInformationRequested = clientIdResponseInformationRequestedMap.get(clientId);
if (responseInformationRequested != null && responseInformationRequested) {
connackOutboundOutput.getConnackPacket().setResponseInformation("Some response information");
}
//remove from map since it is not needed anymore
clientIdResponseInformationRequestedMap.remove(clientId);
} catch (final Exception e){
log.debug("Connack outbound interception failed: ", e);
}
}
};
// set connect inbound interceptor provider with interceptor
Services.interceptorRegistry().setConnectInboundInterceptorProvider(new ConnectInboundInterceptorProvider() {
@Override
public ConnectInboundInterceptor getConnectInboundInterceptor(final @NotNull ConnectInboundProviderInput input) {
return connectInboundInterceptor;
}
});
// set connack outbound interceptor provider with interceptor
Services.interceptorRegistry().setConnackOutboundInterceptorProvider(new ConnackOutboundInterceptorProvider() {
@Override
public ConnackOutboundInterceptor getConnackOutboundInterceptor(final @NotNull ConnackOutboundProviderInput input) {
return connackOutboundInterceptor;
}
});
}
@Override
public void extensionStop(final @NotNull ExtensionStopInput extensionStopInput, final @NotNull ExtensionStopOutput extensionStopOutput) {
}
}
Publish Inbound Interceptor
With the PublishInboundInterceptor interface, you can intercept inbound MQTT PUBLISH packets.
You can use the PublishInboundInterceptor to modify, analyse, or prevent the delivery of the data that is contained in inbound MQTT PUBLISH messages.
For this task, HiveMQ provides PublishInboundInput and PublishInboundOutput parameters.
Extension input and output principles apply.
The PublishInboundOutput parameter is blocking, but you can easily use it to create an asynchronous PublishInboundOutput object.
The interceptor can be set up in the ClientContext. For more information, see Initializer Registry.
When the delivery of a PUBLISH packet is prevented, the message is dropped.
In the HiveMQ Control Center, you can see the number of dropped messages from every PublishInboundInterceptor.
When delivery is prevented with an AckReasonCode other than SUCCESS, MQTT 3 clients are disconnected.
However, an MQTT 5 client receives the PUBACK or PUBREC with the provided reason code and optional reason string.
When the AckReasonCode is SUCCESS, an acknowledgement (PUBACK or PUBREC) is sent according to the quality of service (QoS) level of the message.
When an asynchronous PublishInboundOutput parameter with a TimeoutFallback.FAILURE fallback strategy times out, delivery is prevented and the process is the same.
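The rules above for a prevented PUBLISH can be summarised as a small decision function. The following plain-Java sketch is not the HiveMQ API: `Action` and `onPreventedDelivery` are hypothetical names. It assumes that QoS 0 messages are never acknowledged, because MQTT defines no acknowledgement packet for QoS 0, and it follows the text literally in disconnecting MQTT 3 clients for any reason code other than SUCCESS.

```java
// Simplified model of the behaviour described above, NOT the HiveMQ API.
final class PreventedPublishAckSketch {

    enum Action { NO_ACK, SEND_PUBACK, SEND_PUBREC, DISCONNECT }

    // Decides what the client sees after the delivery of its PUBLISH is prevented.
    static Action onPreventedDelivery(final boolean mqtt5, final int qos, final boolean successReasonCode) {
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException("QoS must be 0, 1, or 2");
        }
        // MQTT 3 cannot carry a reason code in PUBACK or PUBREC,
        // so a non-SUCCESS reason code leads to a disconnect
        if (!successReasonCode && !mqtt5) {
            return Action.DISCONNECT;
        }
        // otherwise the acknowledgement depends on the QoS level; for MQTT 5 clients
        // it carries the provided reason code and optional reason string
        switch (qos) {
            case 1:
                return Action.SEND_PUBACK;
            case 2:
                return Action.SEND_PUBREC;
            default:
                return Action.NO_ACK; // assumption: QoS 0 has no acknowledgement
        }
    }

    public static void main(final String[] args) {
        System.out.println(onPreventedDelivery(true, 1, false));
        System.out.println(onPreventedDelivery(false, 2, false));
    }
}
```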
Multiple interceptors are called sequentially. For example, when several extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The PublishInboundOutput and the PublishInboundInput are updated after each interception.
The following parameters can be modified:
| Parameter | Description |
|---|---|
| QoS | The quality of service level for the message. Possible values are 0, 1, and 2. |
| Topic | The information channel to which the data in the payload of the message is published. |
| Payload Format Indicator | A flag that represents the encoding of the payload. Possible values are UNSPECIFIED and UTF_8. |
| Message Expiry Interval | The number of seconds until the message expires. If the server cannot begin message delivery to a specific subscriber within the message-expiry interval that is defined for the message, the copy of the message for that subscriber is deleted. |
| Response Topic | An optional UTF-8 string that can be used as a topic for response messages. |
| Correlation Data | Optional binary data that follows the response topic and enables the original sender of the request to handle asynchronous responses that can possibly be sent from multiple receivers. |
| Content Type | String parameter that describes the content of the publish message. |
| Payload | The payload of the publish message that contains the data that is published. |
If any interceptor prevents delivery of a PUBLISH packet, no additional interceptors are called.
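The short-circuit behaviour can be illustrated with a plain-Java model. This is only a sketch and not the HiveMQ API: each hypothetical interceptor is reduced to a `Predicate` that returns `true` when it prevents delivery, and `calledInterceptors` is a made-up helper that reports which interceptors were actually invoked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simplified model, NOT the HiveMQ API: all names here are hypothetical.
final class PreventDeliverySketch {

    // Calls the interceptors in order and stops after the first one that prevents delivery.
    // Returns the indexes of the interceptors that were called.
    static List<Integer> calledInterceptors(final List<Predicate<String>> interceptors, final String topic) {
        final List<Integer> called = new ArrayList<>();
        for (int i = 0; i < interceptors.size(); i++) {
            called.add(i);
            if (interceptors.get(i).test(topic)) {
                break; // delivery prevented: no additional interceptors are called
            }
        }
        return called;
    }

    public static void main(final String[] args) {
        final Predicate<String> allowAll = topic -> false;
        final Predicate<String> blockPrefix = topic -> topic.startsWith("blocked/");

        final List<Predicate<String>> chain = new ArrayList<>();
        chain.add(allowAll);
        chain.add(blockPrefix);
        chain.add(allowAll);

        System.out.println(calledInterceptors(chain, "blocked/topic"));
        System.out.println(calledInterceptors(chain, "open/topic"));
    }
}
```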
Publish Inbound Input
The input parameter contains the following unmodifiable information:
- MQTT PUBLISH packet
- MQTT Version
- Client ID
- IP Address
- Listener with port, bind address and type
- ConnectionAttributeStore
- TLS
The JavaDoc can be found here.
Publish Inbound Output
The output parameter contains a modifiable PUBLISH packet.
You can use the PUBLISH packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values, such as null or non-UTF-8 characters, cause exceptions to be thrown.
For more information on ways to manipulate inbound PUBLISH packets, see here.
Example Usage
This section provides examples of how to implement different PublishInboundInterceptors
and add them to a ClientInitializer via the InitializerRegistry.
Set Up a PublishInboundInterceptor in the ClientInitializer
This example shows how to set up a PublishInboundInterceptor with the ClientInitializer.
...
@Override
public void extensionStart(final @NotNull ExtensionStartInput input, final @NotNull ExtensionStartOutput output) {
// create a new publish inbound interceptor
final PublishInboundInterceptor publishInboundInterceptor = new PublishInboundInterceptor() {
@Override
public void onInboundPublish(
final @NotNull PublishInboundInput publishInboundInput,
final @NotNull PublishInboundOutput publishInboundOutput) {
// do something with the publish, for example, logging
System.out.println("Inbound publish message intercepted from client: "
+ publishInboundInput.getClientInformation().getClientId());
}
};
// create a new client initializer
final ClientInitializer clientInitializer = new ClientInitializer() {
@Override
public void initialize(
final @NotNull InitializerInput initializerInput,
final @NotNull ClientContext clientContext) {
// add the interceptor to the context of the connecting client
clientContext.addPublishInboundInterceptor(publishInboundInterceptor);
}
};
//register the client initializer
Services.initializerRegistry().setClientInitializer(clientInitializer);
}
...
Modify a PUBLISH Packet
This example shows how to implement a PublishInboundInterceptor that modifies all parameters of an inbound PUBLISH message.
public class ModifyingPublishInboundInterceptor implements PublishInboundInterceptor {
@Override
public void onInboundPublish(
final @NotNull PublishInboundInput publishInboundInput,
final @NotNull PublishInboundOutput publishInboundOutput) {
// get the modifiable publish packet from the output
final ModifiablePublishPacket publish = publishInboundOutput.getPublishPacket();
// modify / overwrite any parameter of the publish packet.
publish.setContentType("modified contentType");
publish.setCorrelationData(ByteBuffer.wrap("modified correlation data".getBytes(StandardCharsets.UTF_8)));
publish.setMessageExpiryInterval(120L);
publish.setPayload(ByteBuffer.wrap("modified payload".getBytes(StandardCharsets.UTF_8)));
publish.setPayloadFormatIndicator(PayloadFormatIndicator.UNSPECIFIED);
publish.setQos(Qos.AT_MOST_ONCE);
publish.setTopic("modified topic");
publish.setResponseTopic("modified response topic");
publish.setRetain(false);
publish.getUserProperties().clear();
publish.getUserProperties().addUserProperty("mod-key", "mod-val");
}
}
Modify a PUBLISH Packet Asynchronously
This example shows how to implement a PublishInboundInterceptor that modifies an inbound PUBLISH message asynchronously
with the Managed Extension Executor Service.
The timeout strategy is failure. Onward delivery of the PUBLISH message
will be prevented if the task takes longer than 10 seconds.
public class ModifyingAsyncPublishInboundInterceptor implements PublishInboundInterceptor {
@Override
public void onInboundPublish(
final @NotNull PublishInboundInput publishInboundInput,
final @NotNull PublishInboundOutput publishInboundOutput) {
// make the output object async with a timeout duration of 10 seconds and the timeout fallback failure
final Async<PublishInboundOutput> asyncOutput =
publishInboundOutput.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);
//submit external task to extension executor service
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
// get the modifiable publish packet from the output
final ModifiablePublishPacket publish = publishInboundOutput.getPublishPacket();
// call external task that modifies the publish packet (method not provided)
callExternalTask(publish);
}
});
// add a callback for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object object, final @Nullable Throwable throwable) {
if (throwable != null) {
throwable.printStackTrace(); // please use more sophisticated logging
}
// resume output to tell HiveMQ that asynchronous processing is done
asyncOutput.resume();
}
});
}
}
Prevent Delivery of a PUBLISH Packet
This example shows how to implement a PublishInboundInterceptor
that prevents the delivery of PUBLISH packets that come from clients with IDs that contain "prevent"
and are published to the topic "topic/to/prevent".
public class PreventingPublishInboundInterceptor implements PublishInboundInterceptor {
@Override
public void onInboundPublish(
final @NotNull PublishInboundInput publishInboundInput,
final @NotNull PublishInboundOutput publishInboundOutput) {
// check some parameters if publish delivery must be prevented, for example, client id and topic.
final String clientId = publishInboundInput.getClientInformation().getClientId();
final String topic = publishInboundInput.getPublishPacket().getTopic();
if (clientId.contains("prevent") && topic.equals("topic/to/prevent")) {
// prevent publish delivery on the output object
publishInboundOutput.preventPublishDelivery();
}
}
}
Prevent Delivery of a PUBLISH Packet with Reason Code
This example shows how to implement a PublishInboundInterceptor
that prevents the delivery of PUBLISH packets that come from clients with IDs that contain "prevent"
and are published to the topic "topic/to/prevent".
When the quality of service level of the PUBLISH is greater than zero and the client is an MQTT 5 client,
the client receives a PUBACK or PUBREC with reason code "TOPIC_NAME_INVALID".
Reason codes for PUBACK or PUBREC are an MQTT 5 feature.
If the reason code is not "SUCCESS", MQTT 3 clients are disconnected ungracefully.
public class PreventAndAckPublishInboundInterceptor implements PublishInboundInterceptor {
@Override
public void onInboundPublish(
final @NotNull PublishInboundInput publishInboundInput,
final @NotNull PublishInboundOutput publishInboundOutput) {
// check some parameters if publish delivery must be prevented, for example client id and topic.
final String clientId = publishInboundInput.getClientInformation().getClientId();
final String topic = publishInboundInput.getPublishPacket().getTopic();
if (clientId.contains("prevent") && topic.equals("topic/to/prevent")) {
// prevent publish delivery with a reason code on the output object
publishInboundOutput.preventPublishDelivery(AckReasonCode.TOPIC_NAME_INVALID);
}
}
}
Prevent Delivery of a PUBLISH Packet with Reason String
This example shows how to implement a PublishInboundInterceptor
that prevents the delivery of PUBLISH packets that come from clients with IDs that contain "prevent"
and are published to the topic "topic/to/prevent".
When the quality of service level of the PUBLISH is greater than zero and the client is an MQTT 5 client,
the client receives a PUBACK or PUBREC with the reason code "TOPIC_NAME_INVALID"
and the reason string "It is not allowed to publish to topic: topic/to/prevent".
Reason codes for PUBACK or PUBREC are an MQTT 5 feature.
If the reason code is not "SUCCESS", MQTT 3 clients are disconnected ungracefully.
public class PreventWithReasonPublishInboundInterceptor implements PublishInboundInterceptor {
@Override
public void onInboundPublish(
final @NotNull PublishInboundInput publishInboundInput,
final @NotNull PublishInboundOutput publishInboundOutput) {
// check some parameters if publish delivery must be prevented, for example client id and topic.
final String clientId = publishInboundInput.getClientInformation().getClientId();
final String topic = publishInboundInput.getPublishPacket().getTopic();
if (clientId.contains("prevent") && topic.equals("topic/to/prevent")) {
// prevent publish delivery with a reason code and reason string on the output object
publishInboundOutput.preventPublishDelivery(AckReasonCode.TOPIC_NAME_INVALID, "It is not allowed to publish to topic: " + topic);
}
}
}
Publish Outbound Interceptor
With the PublishOutboundInterceptor interface,
you can modify an MQTT PUBLISH at the moment that the packet is sent to a subscribed client or prevent the delivery of the publish completely.
For this task, HiveMQ provides PublishOutboundInput and PublishOutboundOutput parameters.
Extension input and extension output principles apply.
The interceptor can be set up in the ClientInitializer.
For more information, see Initializer Registry.
When an asynchronous PublishOutboundOutput parameter with a TimeoutFallback.FAILURE fallback strategy times out, delivery is prevented.
Multiple interceptors are called sequentially. For example, when several extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The PublishOutboundOutput and PublishOutboundInput parameters are updated after each interception.
The following parameters can be modified:
| Parameter | Description |
|---|---|
| QoS | The quality of service level for the message. Possible values are 0, 1, and 2. |
| Topic | The information channel to which the data in the payload of the message is published. |
| Payload Format Indicator | A flag that indicates the encoding of the payload. |
| Message Expiry Interval | The number of seconds until the message expires. If the server cannot begin message delivery to a specific subscriber within the message-expiry interval that is defined for the message, the copy of the message for that subscriber is deleted. |
| Response Topic | An optional UTF-8 string that can be used as a topic for response messages. |
| Correlation Data | Optional binary data that follows the response topic and enables the original sender of the request to handle asynchronous responses that can possibly be sent from multiple receivers. |
| Content Type | String parameter that describes the content of the publish message. |
| Payload | The payload of the publish message that contains the data that is published. |
The JavaDoc for the PublishOutboundInterceptor can be found here.
Example Usage
This section provides examples of how to implement different PublishOutboundInterceptor implementations
and add them to a ClientInitializer via the InitializerRegistry.
Examples:
Set up a PublishOutboundInterceptor in the ClientInitializer
...
@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput,
final @NotNull ExtensionStartOutput extensionStartOutput) {
try {
Services.initializerRegistry().setClientInitializer(new ClientInitializer() {
@Override
public void initialize(@NotNull InitializerInput initializerInput, @NotNull ClientContext clientContext) {
clientContext.addPublishOutboundInterceptor(new MyPublishOutboundInterceptor());
}
});
} catch (Exception e) {
log.error("Exception thrown at extension start: ", e);
}
}
...
Prevent Delivery of a PUBLISH Message to a Client
You can use the PublishOutboundOutput parameter to prevent the delivery of an outgoing PUBLISH.
Calling preventPublishDelivery() causes the message to be dropped. The message is not sent to the subscriber.
Preventing delivery in this way has no effect on other subscribers of the same message.
The com.hivemq.messages.dropped.extension-prevented.count metric tracks the number of messages that are dropped because the extension system prevented delivery.
public class PreventPublishOutboundInterceptor implements PublishOutboundInterceptor {
@Override
public void onOutboundPublish(final @NotNull PublishOutboundInput input, final @NotNull PublishOutboundOutput output) {
final String clientId = input.getClientInformation().getClientId();
final String topic = input.getPublishPacket().getTopic();
if (!topic.startsWith(clientId)) {
output.preventPublishDelivery();
}
}
}
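Note that the plain `startsWith` test in the example also accepts topics that belong to a client whose ID merely begins with the same characters (for example, `client10/status` for the client `client1`). A stricter comparison could look like the following stand-alone sketch. `TopicOwnershipSketch` and `isOwnTopic` are hypothetical names, and the convention that a client's topics are the client ID followed by `/` is an assumption, not something the SDK prescribes.

```java
// Stand-alone helper; not part of the HiveMQ SDK.
final class TopicOwnershipSketch {

    // Returns true when the topic is exactly the client ID or lies
    // below it in the topic hierarchy ("<clientId>/...").
    static boolean isOwnTopic(final String clientId, final String topic) {
        return topic.equals(clientId) || topic.startsWith(clientId + "/");
    }

    public static void main(final String[] args) {
        System.out.println(isOwnTopic("client1", "client1/status"));  // prints true
        System.out.println(isOwnTopic("client1", "client10/status")); // prints false
    }
}
```

Inside an interceptor, such a check would replace the `topic.startsWith(clientId)` condition before delivery is prevented.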
Modify Outgoing PUBLISH Message
Nearly all aspects of a PUBLISH packet can be modified in the interceptor, except the packet ID and the duplicate-delivery flag.
It is important to note that these modifications only affect the way the subscriber receives the PUBLISH. There is no impact on the way that HiveMQ processes the message.
This affects the following properties:
| Property | Description |
|---|---|
| Topic | Only the subscribers that have matching subscriptions for the original topic receive the PUBLISH. |
| Retain | The original state of the retain flag determines whether or not the message is retained. |
| QoS | Changes to the QoS level do not change the delivery guarantees. |
| Message Expiry Interval | The original interval is still used to decide if a message is expired. |
Modify Topic Example
The following example modifies the topic of an outgoing PUBLISH message, adds a response topic, and stores the original topic as a user property:
public class ModifyPublishOutboundInterceptor implements PublishOutboundInterceptor {
@Override
public void onOutboundPublish(final @NotNull PublishOutboundInput input, final @NotNull PublishOutboundOutput output) {
final String clientId = input.getClientInformation().getClientId();
final ModifiableOutboundPublish publish = output.getPublishPacket();
final String originalTopic = publish.getTopic();
publish.setTopic(originalTopic + "/" + clientId);
publish.getUserProperties().addUserProperty("original-topic", originalTopic);
publish.setResponseTopic("response/"+clientId);
}
}
Modify Payload Example
The following example decodes the payload of an outgoing PUBLISH, alters the string, and sets the altered string as the new payload asynchronously:
public class ModifyPublishOutboundInterceptor implements PublishOutboundInterceptor {
@Override
public void onOutboundPublish(final @NotNull PublishOutboundInput input, final @NotNull PublishOutboundOutput output) {
final Async<PublishOutboundOutput> async = output.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);
Services.extensionExecutorService().submit(() -> {
final ModifiableOutboundPublish publish = output.getPublishPacket();
final String payloadString = StandardCharsets.UTF_8.decode(publish.getPayload().get()).toString();
final String newPayload = payloadString.toLowerCase();
publish.setPayload(StandardCharsets.UTF_8.encode(newPayload));
async.resume();
});
}
}
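The example above calls `get()` on the payload, which fails when a PUBLISH carries no payload (assuming `getPayload()` returns a `java.util.Optional`, as the call suggests). A more defensive transformation might look like the following stand-alone sketch. `PayloadSketch` and `toLowerCase` are hypothetical names, only JDK classes are used, and the payload is assumed to be UTF-8 text.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Optional;

// Stand-alone helper; not part of the HiveMQ SDK.
final class PayloadSketch {

    // Lower-cases a UTF-8 payload. An absent payload stays absent.
    static Optional<ByteBuffer> toLowerCase(final Optional<ByteBuffer> payload) {
        return payload.map(buffer -> {
            // duplicate() keeps position and limit of the original buffer untouched
            final String text = StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
            return StandardCharsets.UTF_8.encode(text.toLowerCase(Locale.ROOT));
        });
    }

    public static void main(final String[] args) {
        final ByteBuffer in = StandardCharsets.UTF_8.encode("Hello MQTT");
        final ByteBuffer out = toLowerCase(Optional.of(in)).orElseThrow();
        System.out.println(StandardCharsets.UTF_8.decode(out)); // prints hello mqtt
    }
}
```

In an interceptor, the result would only be passed to `setPayload` when it is present.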
Puback Inbound Interceptor
With the PubackInboundInterceptor interface, you can intercept inbound MQTT PUBACK messages
at the moment HiveMQ receives the acknowledgement of a QoS 1 publish message from any client.
You can use the PubackInboundInterceptor to modify or analyse the data that is contained in inbound MQTT PUBACK messages.
For this task, HiveMQ provides PubackInboundInput and PubackInboundOutput parameters.
Extension input and output principles apply.
The PubackInboundOutput parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext.
Multiple interceptors can be called sequentially.
For example, when various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The PubackInboundOutput and PubackInboundInput parameters are updated after each interception.
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Reason Code | The reason code which determines the status of the PUBACK. |
| Reason String | A UTF-8 encoded string that shows the status of the PUBACK. |
| User Properties | Additional diagnostic or other information. |
Interception of an inbound PUBACK message has the following limitations:
- You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
- It is not possible to prevent sending a PUBACK message to the server.
If an interceptor throws an exception or causes a timeout with the async method,
HiveMQ ignores the changes that the interceptor made to the modifiable PUBACK packet and calls the next interceptor.
If no interceptor is left to call, HiveMQ processes the PUBACK.
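The first limitation can be checked before a reason code is changed. The following stand-alone sketch models that check. The `Code` enum is a stand-in with only a few values and is not the SDK's `AckReasonCode`; `ReasonCodeGuardSketch` and `changeAllowed` are hypothetical names. The sketch relies on the MQTT 5 convention that reason codes below 0x80 indicate success.

```java
// Stand-alone model; the enum is a stand-in, not the SDK's AckReasonCode.
final class ReasonCodeGuardSketch {

    enum Code {
        SUCCESS(0x00),
        NO_MATCHING_SUBSCRIBERS(0x10),
        UNSPECIFIED_ERROR(0x80),
        NOT_AUTHORIZED(0x87);

        private final int value;

        Code(final int value) {
            this.value = value;
        }

        // In MQTT 5, reason codes below 0x80 indicate success.
        boolean isSuccessful() {
            return value < 0x80;
        }
    }

    // A change is allowed only when both codes are on the same side:
    // successful to successful, or unsuccessful to unsuccessful.
    static boolean changeAllowed(final Code from, final Code to) {
        return from.isSuccessful() == to.isSuccessful();
    }

    public static void main(final String[] args) {
        System.out.println(changeAllowed(Code.NOT_AUTHORIZED, Code.UNSPECIFIED_ERROR)); // prints true
        System.out.println(changeAllowed(Code.SUCCESS, Code.UNSPECIFIED_ERROR));        // prints false
    }
}
```

The same rule is what the condition in the modification examples expresses when it excludes `SUCCESS` and `NO_MATCHING_SUBSCRIBERS` before setting an error code.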
Puback Inbound Input
The input parameter contains the following unmodifiable information:
- MQTT PUBACK packet
- Connection information
- Client information
PUBACK reason code, PUBACK reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBACK packets are represented by the same PUBACK packet, but always have a SUCCESS reason code.
The reason string of an MQTT 3 PUBACK packet is set to null and the packet does not contain any user properties.
The JavaDoc can be found here.
Puback Inbound Output
The output parameter contains the modifiable PUBACK packet.
You can use the PUBACK packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate the values of the packet.
Some values, such as null or non-UTF-8 characters, cause exceptions to be thrown.
For more information on ways to manipulate inbound PUBACK packets, see here.
Example Usage
This section provides examples of how to implement different PubackInboundInterceptor implementations and add them with the ClientContext.
Examples:
Set Up In ClientContext
This example shows how to set up a PubackInboundInterceptor with the ClientContext and the ClientInitializer:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubackInboundInterceptor interceptor = (pubackInboundInput, pubackInboundOutput) -> {
log.debug("intercepted a PUBACK packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubackInboundInterceptor(interceptor);
});
}
...
Modify Inbound PUBACK
This example shows how to implement a PubackInboundInterceptor that modifies parameters of an inbound PUBACK message:
public class ModifyingPubackInboundInterceptor implements PubackInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubackInboundInterceptor.class);
@Override
public void onInboundPuback(final @NotNull PubackInboundInput pubackInboundInput, final @NotNull PubackInboundOutput pubackInboundOutput) {
//get the modifiable puback object from the output
final ModifiablePubackPacket pubackPacket = pubackInboundOutput.getPubackPacket();
// modify / overwrite parameters of a puback packet.
try {
if (pubackPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubackPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubackPacket.setReasonString("The reason for the unsuccessful puback");
pubackPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Puback inbound interception failed:", e);
}
}
}
Modify Inbound PUBACK Asynchronously
This example shows how to implement a PubackInboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an inbound PUBACK message.
If the task takes more than 10 seconds, the unmodified PUBACK is forwarded to the next interceptor or finally processed by the broker.
public class AsyncModifyingPubackInboundInterceptor implements PubackInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubackInboundInterceptor.class);
@Override
public void onInboundPuback(final @NotNull PubackInboundInput pubackInboundInput, final @NotNull PubackInboundOutput pubackInboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubackInboundOutput> async = pubackInboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable puback object from the output
final ModifiablePubackPacket pubackPacket = pubackInboundOutput.getPubackPacket();
// modify / overwrite parameters of a puback packet.
try {
if (pubackPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubackPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubackPacket.setReasonString("The reason for the unsuccessful puback");
pubackPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Puback inbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Puback inbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
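The fallback behavior described for this example (the unmodified packet is used when the task is too slow or fails) can be modeled without the SDK. The sketch below is only an illustration of that behavior and not how HiveMQ implements it. `TimeoutFallbackSketch` and `modifyOrKeep` are hypothetical names, and `completeOnTimeout` requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Stand-alone model of "keep the unmodified value on timeout or failure".
final class TimeoutFallbackSketch {

    static String modifyOrKeep(final String original,
                               final Supplier<String> modification,
                               final long timeoutMillis) {
        return CompletableFuture.supplyAsync(modification)
                // Fall back to the unmodified value when the task is too slow.
                .completeOnTimeout(original, timeoutMillis, TimeUnit.MILLISECONDS)
                // Fall back to the unmodified value when the task fails.
                .exceptionally(failure -> original)
                .join();
    }

    public static void main(final String[] args) {
        // The task finishes well within the timeout, so its result is used.
        System.out.println(modifyOrKeep("unmodified", () -> "modified", 1_000));
    }
}
```

A task that sleeps longer than the timeout, or that throws, yields `"unmodified"` in this model, which mirrors what the next interceptor or the broker would see.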
Puback Outbound Interceptor
On the PubackOutboundInterceptor interface, you can intercept outbound MQTT PUBACK messages.
You can use the PubackOutboundInterceptor to modify or analyse the data that is contained in outbound MQTT PUBACK messages.
For this task, HiveMQ provides PubackOutboundInput and PubackOutboundOutput parameters.
Extension input and output principles apply.
The PubackOutboundOutput parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext.
Multiple interceptors can be called sequentially.
For example, when various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The PubackOutboundInput and PubackOutboundOutput parameters are updated after each interception.
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Reason Code | The reason code which determines the status of the PUBACK. |
| Reason String | A UTF-8 encoded string that shows the status of the PUBACK. |
| User Properties | Additional diagnostic or other information. |
Interception of an outbound PUBACK message has the following limitations:
- You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
- It is not possible to prevent sending a PUBACK message to a client.
If an interceptor throws an exception or causes a timeout with the async method,
HiveMQ ignores the changes that the interceptor made to the modifiable PUBACK packet and calls the next interceptor.
If no interceptor is left to call, HiveMQ sends the PUBACK to the client.
Puback Outbound Input
The input parameter contains the following unmodifiable information:
- MQTT PUBACK packet
- Connection information
- Client information
PUBACK reason code, PUBACK reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBACK packets are represented by the same PUBACK packet, but always have a SUCCESS reason code.
The reason string of an MQTT 3 PUBACK packet is set to null and the packet does not contain any user properties.
The JavaDoc can be found here.
Puback Outbound Output
The output parameter contains a modifiable PUBACK packet.
You can use the PUBACK packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate the values of the packet.
Some values, such as null or non-UTF-8 characters, cause exceptions to be thrown.
For more information on ways to manipulate outbound PUBACK packets, see here.
Example Usage
This section provides examples of how to implement different PubackOutboundInterceptor implementations and add them with the ClientContext.
Examples:
Set Up In ClientContext
This example shows how to set up a PubackOutboundInterceptor with the ClientContext and the ClientInitializer:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubackOutboundInterceptor interceptor = (pubackOutboundInput, pubackOutboundOutput) -> {
log.debug("intercepted a PUBACK packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubackOutboundInterceptor(interceptor);
});
}
...
Modify Outbound PUBACK
This example shows how to implement a PubackOutboundInterceptor that modifies parameters of an outbound PUBACK message:
public class ModifyingPubackOutboundInterceptor implements PubackOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubackOutboundInterceptor.class);
@Override
public void onOutboundPuback(final @NotNull PubackOutboundInput pubackOutboundInput, final @NotNull PubackOutboundOutput pubackOutboundOutput) {
//get the modifiable puback object from the output
final ModifiablePubackPacket pubackPacket = pubackOutboundOutput.getPubackPacket();
// modify / overwrite parameters of a puback packet.
try {
if (pubackPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubackPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubackPacket.setReasonString("The reason for the unsuccessful puback");
pubackPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Puback outbound interception failed:", e);
}
}
}
Modify Outbound PUBACK Asynchronously
This example shows how to implement a PubackOutboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an outbound PUBACK message.
If the task takes more than 10 seconds, the unmodified PUBACK is forwarded to the next interceptor or finally sent to the client.
public class AsyncModifyingPubackOutboundInterceptor implements PubackOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubackOutboundInterceptor.class);
@Override
public void onOutboundPuback(final @NotNull PubackOutboundInput pubackOutboundInput, final @NotNull PubackOutboundOutput pubackOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubackOutboundOutput> async = pubackOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable puback object from the output
final ModifiablePubackPacket pubackPacket = pubackOutboundOutput.getPubackPacket();
// modify / overwrite parameters of a puback packet.
try {
if (pubackPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubackPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubackPacket.setReasonString("The reason for the unsuccessful puback");
pubackPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Puback outbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Puback outbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
Pubrec Inbound Interceptor
With the PubrecInboundInterceptor interface, you can intercept inbound MQTT PUBREC messages
at the moment HiveMQ receives the acknowledgement of a QoS 2 publish message from any client.
You can use the PubrecInboundInterceptor to modify or analyse the data that is contained in inbound MQTT PUBREC messages.
For this task, HiveMQ provides PubrecInboundInput and PubrecInboundOutput parameters.
Extension input and output principles apply.
The PubrecInboundOutput parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext.
Multiple interceptors can be called sequentially.
When various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The PubrecInboundOutput and PubrecInboundInput parameters are updated after each interception.
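For orientation, PUBREC is the first acknowledgement in the MQTT QoS 2 flow, which runs PUBLISH, PUBREC, PUBREL, PUBCOMP. The following stand-alone sketch models that order for the successful case only. `Qos2FlowSketch`, `Packet`, and `answerTo` are hypothetical names and not part of the SDK.

```java
import java.util.Optional;

// Stand-alone model of the QoS 2 packet order; not HiveMQ SDK code.
final class Qos2FlowSketch {

    enum Packet { PUBLISH, PUBREC, PUBREL, PUBCOMP }

    // Returns the packet that the receiver of `received` answers with,
    // or an empty Optional when the flow is complete.
    static Optional<Packet> answerTo(final Packet received) {
        switch (received) {
            case PUBLISH:
                return Optional.of(Packet.PUBREC);
            case PUBREC:
                return Optional.of(Packet.PUBREL);
            case PUBREL:
                return Optional.of(Packet.PUBCOMP);
            default:
                return Optional.empty();
        }
    }

    public static void main(final String[] args) {
        System.out.println(answerTo(Packet.PUBLISH)); // prints Optional[PUBREC]
    }
}
```

An inbound PUBREC interceptor therefore sees the client's answer to a QoS 2 PUBLISH that HiveMQ delivered, before HiveMQ replies with PUBREL.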
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Reason Code | The reason code which determines the status of the PUBREC. |
| Reason String | A UTF-8 encoded string that shows the status of the PUBREC. |
| User Properties | Additional diagnostic or other information. |
Interception of an inbound PUBREC message has the following limitations:
- You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
- It is not possible to prevent sending a PUBREC message to the server.
If an interceptor throws an exception or causes a timeout with the async method,
HiveMQ ignores the changes that the interceptor made to the modifiable PUBREC packet and calls the next interceptor.
If no interceptor is left to call, HiveMQ processes the PUBREC.
Pubrec Inbound Input
The input parameter contains the following unmodifiable information:
- MQTT PUBREC packet
- Connection information
- Client information
PUBREC reason code, PUBREC reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBREC packets are represented by the same PUBREC packet, but always have a SUCCESS reason code.
The reason string of an MQTT 3 PUBREC packet is set to null and the packet does not contain any user properties.
The JavaDoc can be found here.
Pubrec Inbound Output
The output parameter contains the modifiable PUBREC packet.
You can use the PUBREC packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values, such as null or non-UTF-8 characters, cause exceptions to be thrown.
For more information on ways to manipulate inbound PUBREC packets, see here.
Example Usage
This section provides examples of how to implement different PubrecInboundInterceptor implementations
and add them with the ClientContext.
Examples:
Set Up In ClientContext
This example shows how to set up a PubrecInboundInterceptor with the ClientContext and the ClientInitializer:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubrecInboundInterceptor interceptor = (pubrecInboundInput, pubrecInboundOutput) -> {
log.debug("intercepted a PUBREC packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubrecInboundInterceptor(interceptor);
});
}
...
Modify Inbound PUBREC
This example shows how to implement a PubrecInboundInterceptor that modifies parameters of an inbound PUBREC message:
public class ModifyingPubrecInboundInterceptor implements PubrecInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubrecInboundInterceptor.class);
@Override
public void onInboundPubrec(final @NotNull PubrecInboundInput pubrecInboundInput, final @NotNull PubrecInboundOutput pubrecInboundOutput) {
//get the modifiable pubrec object from the output
final ModifiablePubrecPacket pubrecPacket = pubrecInboundOutput.getPubrecPacket();
// modify / overwrite parameters of a pubrec packet.
try {
if (pubrecPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubrecPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubrecPacket.setReasonString("The reason for the unsuccessful pubrec");
pubrecPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubrecPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrecPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrec inbound interception failed:", e);
}
}
}
Modify Inbound PUBREC Asynchronously
This example shows how to implement a PubrecInboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an inbound PUBREC message.
If the task takes more than 10 seconds, the unmodified PUBREC is forwarded to the next interceptor or finally processed by the broker.
public class AsyncModifyingPubrecInboundInterceptor implements PubrecInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubrecInboundInterceptor.class);
@Override
public void onInboundPubrec(final @NotNull PubrecInboundInput pubrecInboundInput, final @NotNull PubrecInboundOutput pubrecInboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubrecInboundOutput> async = pubrecInboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable pubrec object from the output
final ModifiablePubrecPacket pubrecPacket = pubrecInboundOutput.getPubrecPacket();
// modify / overwrite parameters of a pubrec packet.
try {
if (pubrecPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubrecPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubrecPacket.setReasonString("The reason for the unsuccessful pubrec");
pubrecPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubrecPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrecPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrec inbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Pubrec inbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
Pubrec Outbound Interceptor
On the PubrecOutboundInterceptor interface, you can intercept outbound MQTT PUBREC messages.
You can use the PubrecOutboundInterceptor to modify or analyse the data that is contained in outbound MQTT PUBREC messages.
For this task, HiveMQ provides PubrecOutboundInput and PubrecOutboundOutput parameters.
Extension input and output principles apply.
The PubrecOutboundOutput parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext.
Multiple interceptors are called sequentially.
The interceptor with the highest priority is called first.
The PubrecOutboundOutput and PubrecOutboundInput parameters are updated after each interception.
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Reason Code | The reason code which determines the status of the PUBREC. |
| Reason String | A UTF-8 encoded string that shows the status of the PUBREC. |
| User Properties | Additional diagnostic or other information. |
Interception of an outbound PUBREC message has the following limitations:
- You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
- It is not possible to prevent sending a PUBREC message to a client.
If an interceptor throws an exception or causes a timeout with the async method,
HiveMQ ignores the changes that the interceptor made to the modifiable PUBREC packet and calls the next interceptor.
If no interceptor is left to call, HiveMQ sends the PUBREC to the client.
Pubrec Outbound Input
The input parameter contains the following unmodifiable information:
- MQTT PUBREC packet
- Connection information
- Client information
PUBREC reason code, PUBREC reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBREC packets are represented by the same PUBREC packet, but always have a SUCCESS reason code.
The reason string of an MQTT 3 PUBREC packet is set to null and the packet does not contain any user properties.
The JavaDoc can be found here.
Pubrec Outbound Output
The output parameter contains a modifiable PUBREC packet.
You can use the PUBREC packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate the values of the packet.
Some values, such as null or non-UTF-8 characters, cause exceptions to be thrown.
For more information on ways to manipulate outbound PUBREC packets, see here.
Example Usage
This section provides examples of how to implement different PubrecOutboundInterceptor implementations and add them with the ClientContext.
Examples:
Set Up In ClientContext
This example shows how to set up a PubrecOutboundInterceptor with the ClientContext and the ClientInitializer:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubrecOutboundInterceptor interceptor = (pubrecOutboundInput, pubrecOutboundOutput) -> {
log.debug("intercepted a PUBREC packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubrecOutboundInterceptor(interceptor);
});
}
...
Modify Outbound PUBREC
This example shows how to implement a PubrecOutboundInterceptor that modifies parameters of an outbound PUBREC message:
public class ModifyingPubrecOutboundInterceptor implements PubrecOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubrecOutboundInterceptor.class);
@Override
public void onOutboundPubrec(final @NotNull PubrecOutboundInput pubrecOutboundInput, final @NotNull PubrecOutboundOutput pubrecOutboundOutput) {
//get the modifiable pubrec object from the output
final ModifiablePubrecPacket pubrecPacket = pubrecOutboundOutput.getPubrecPacket();
// modify / overwrite parameters of a pubrec packet.
try {
if (pubrecPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubrecPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubrecPacket.setReasonString("The reason for the unsuccessful pubrec");
pubrecPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubrecPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrecPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrec outbound interception failed:", e);
}
}
}
Modify Outbound PUBREC Asynchronously
This example shows how to implement a PubrecOutboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an outbound PUBREC message.
If the task takes more than 10 seconds, the unmodified PUBREC is forwarded to the next interceptor or finally sent to the client.
public class AsyncModifyingPubrecOutboundInterceptor implements PubrecOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubrecOutboundInterceptor.class);
@Override
public void onOutboundPubrec(final @NotNull PubrecOutboundInput pubrecOutboundInput, final @NotNull PubrecOutboundOutput pubrecOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubrecOutboundOutput> async = pubrecOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable pubrec object from the output
final ModifiablePubrecPacket pubrecPacket = pubrecOutboundOutput.getPubrecPacket();
// modify / overwrite parameters of a pubrec packet.
try {
if (pubrecPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubrecPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubrecPacket.setReasonString("The reason for the unsuccessful pubrec");
pubrecPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubrecPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrecPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrec outbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Pubrec outbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
Pubrel Inbound Interceptor
On the PubrelInboundInterceptor interface, you can intercept inbound MQTT PUBREL messages.
You can use the PubrelInboundInterceptor to modify or analyse the data that is contained in inbound MQTT PUBREL messages.
For this task, HiveMQ provides PubrelInboundInput and PubrelInboundOutput parameters.
Extension input and output principles apply.
The PubrelInboundOutput parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext.
Multiple interceptors can be called sequentially.
For example, when various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The PubrelInboundOutput and PubrelInboundInput parameters are updated after each interception.
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Reason String | A UTF-8 encoded string that shows the status of the PUBREL. |
| User Properties | Additional diagnostic or other information. |
Interception of an inbound PUBREL message has the following limitations:
- It is not possible to prevent sending a PUBREL message to the server.
If an interceptor throws an exception or causes a timeout with the async method,
HiveMQ ignores the changes that the interceptor made to the modifiable PUBREL packet and calls the next interceptor.
If no interceptor is left to call, HiveMQ processes the PUBREL.
Pubrel Inbound Input
The input parameter contains the following unmodifiable information:
- MQTT PUBREL packet
- Connection information
- Client information
PUBREL reason code, PUBREL reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBREL packets are represented by the same PUBREL packet, but always have a SUCCESS reason code.
The reason string of an MQTT 3 PUBREL packet is set to null, and the packet does not contain any user properties.
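The MQTT 3 defaults can be illustrated with a plain Java model. This is a sketch only: `PubrelView` and `fromMqtt3` are hypothetical names and not part of the HiveMQ SDK. The two reason codes in the enum are the ones that MQTT 5 defines for PUBREL.

```java
import java.util.List;

final class PubrelViewSketch {

    // The two reason codes that MQTT 5 defines for PUBREL.
    enum ReasonCode { SUCCESS, PACKET_IDENTIFIER_NOT_FOUND }

    // Hypothetical read-only view of a PUBREL packet (not the HiveMQ type).
    static final class PubrelView {
        final int packetIdentifier;
        final ReasonCode reasonCode;
        final String reasonString; // null when the packet has no reason string
        final List<String> userProperties;

        PubrelView(final int packetIdentifier, final ReasonCode reasonCode,
                   final String reasonString, final List<String> userProperties) {
            this.packetIdentifier = packetIdentifier;
            this.reasonCode = reasonCode;
            this.reasonString = reasonString;
            this.userProperties = userProperties;
        }
    }

    // An MQTT 3 PUBREL carries only the packet identifier,
    // so the MQTT 5 fields receive fixed defaults.
    static PubrelView fromMqtt3(final int packetIdentifier) {
        return new PubrelView(packetIdentifier, ReasonCode.SUCCESS, null, List.of());
    }

    public static void main(final String[] args) {
        final PubrelView view = fromMqtt3(7);
        System.out.println(view.reasonCode + " " + view.reasonString + " " + view.userProperties);
    }
}
```

Code that reads the reason string should therefore be prepared for an absent value when MQTT 3 clients are connected.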
The JavaDoc can be found here.
Pubrel Inbound Output
The output parameter contains the modifiable PUBREL packet.
You can use the PUBREL packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values, such as null or strings with non-UTF-8 characters, cause exceptions to be thrown.
More information on ways to manipulate inbound PUBREL packets can be found
here.
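One way to avoid such exceptions is to check a value before you pass it to the packet. The following sketch uses only the Java standard library. The class and method names are hypothetical, and the check covers just the two cases named above (null and text that cannot be encoded as UTF-8). HiveMQ may reject further values, so the JavaDoc remains the reference.

```java
import java.nio.charset.StandardCharsets;

final class Utf8PreCheckSketch {

    // Returns true when the value is non-null and can be encoded as UTF-8.
    static boolean isEncodableUtf8(final String value) {
        if (value == null) {
            return false;
        }
        // a fresh encoder per call, because CharsetEncoder instances are not thread-safe
        return StandardCharsets.UTF_8.newEncoder().canEncode(value);
    }

    public static void main(final String[] args) {
        System.out.println(isEncodableUtf8("some value"));
        System.out.println(isEncodableUtf8(null));
        // an unpaired surrogate has no UTF-8 representation
        System.out.println(isEncodableUtf8("broken \uD800 surrogate"));
    }
}
```

An interceptor could call such a helper before `addUserProperty` and skip or replace values that fail the check.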
Example Usage
This section provides examples of how to implement different PubrelInboundInterceptor variants
and add them with the ClientContext.
Set Up In ClientContext
This example shows how to set up a PubrelInboundInterceptor with the ClientContext and the ClientInitializer:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubrelInboundInterceptor interceptor = (pubrelInboundInput, pubrelInboundOutput) -> {
log.debug("intercepted a PUBREL packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubrelInboundInterceptor(interceptor);
});
}
...
Modify Inbound PUBREL
This example shows how to implement a PubrelInboundInterceptor that modifies parameters of an inbound PUBREL message:
public class ModifyingPubrelInboundInterceptor implements PubrelInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubrelInboundInterceptor.class);
@Override
public void onInboundPubrel(final @NotNull PubrelInboundInput pubrelInboundInput, final @NotNull PubrelInboundOutput pubrelInboundOutput) {
//get the modifiable pubrel object from the output
final ModifiablePubrelPacket pubrelPacket = pubrelInboundOutput.getPubrelPacket();
// modify / overwrite parameters of a pubrel packet.
try {
if (pubrelPacket.getReasonCode() != PubrelReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubrelPacket.setReasonString("The reason for the unsuccessful pubrel");
}
pubrelPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrelPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrel inbound interception failed:", e);
}
}
}
Modify Inbound PUBREL Asynchronously
This example shows how to implement a PubrelInboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an inbound PUBREL message.
If the task takes more than 10 seconds, the unmodified PUBREL is forwarded to the next interceptor or finally processed by the broker.
public class AsyncModifyingPubrelInboundInterceptor implements PubrelInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubrelInboundInterceptor.class);
@Override
public void onInboundPubrel(final @NotNull PubrelInboundInput pubrelInboundInput, final @NotNull PubrelInboundOutput pubrelInboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubrelInboundOutput> async = pubrelInboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable pubrel object from the output
final ModifiablePubrelPacket pubrelPacket = pubrelInboundOutput.getPubrelPacket();
// modify / overwrite parameters of a pubrel packet.
try {
if (pubrelPacket.getReasonCode() != PubrelReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubrelPacket.setReasonString("The reason for the unsuccessful pubrel");
}
pubrelPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrelPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrel inbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Pubrel inbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
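The timeout rule of the asynchronous example can be tried out without a broker. The sketch below is a plain Java analogy and not the HiveMQ mechanism: `applyWithTimeout` and `slowModification` are hypothetical names, a `String` stands in for the packet, and `CompletableFuture.completeOnTimeout` (Java 9 or later) plays the role of the timeout handling that HiveMQ performs when `async.resume()` is not called in time.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

final class AsyncTimeoutSketch {

    static String applyWithTimeout(final String original,
                                   final UnaryOperator<String> modification,
                                   final Duration timeout) {
        return CompletableFuture
                .supplyAsync(() -> modification.apply(original))
                // a failed task falls back to the unmodified value
                .exceptionally(throwable -> original)
                // a task that is too slow also falls back to the unmodified value
                .completeOnTimeout(original, timeout.toMillis(), TimeUnit.MILLISECONDS)
                .join();
    }

    static String slowModification(final String value) {
        try {
            Thread.sleep(2_000);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value + " (modified)";
    }

    public static void main(final String[] args) {
        // fast task: the modification is applied
        System.out.println(applyWithTimeout("reason", value -> value + " (modified)", Duration.ofSeconds(5)));
        // slow task: the timeout wins and the original value is kept
        System.out.println(applyWithTimeout("reason", AsyncTimeoutSketch::slowModification, Duration.ofMillis(100)));
    }
}
```

As in the interceptor, the slow task is not cancelled by the timeout. Its result is simply no longer used.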
Pubrel Outbound Interceptor
With the PubrelOutboundInterceptor interface, you can intercept outbound MQTT PUBREL messages.
You can use the PubrelOutboundInterceptor to modify or analyse the data that is contained in outbound MQTT PUBREL messages.
For this task, HiveMQ provides PubrelOutboundInput and PubrelOutboundOutput parameters.
Extension input and output principles apply.
The PubrelOutboundOutput parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext.
Multiple interceptors are called sequentially. The interceptor with the highest priority is called first.
The PubrelOutboundOutput and PubrelOutboundInput parameters are updated after each interception.
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Reason String | A UTF-8 encoded string that shows the status of the PUBREL. |
| User Properties | Additional diagnostic or other information. |
Interception of an outbound PUBREL message has the following limitations:
- It is not possible to prevent sending a PUBREL message to a client.
If an interceptor throws an exception or causes a timeout using the async method, HiveMQ ignores the changes that the interceptor made to the modifiable PUBREL packet and calls the next interceptor.
If no interceptor is left to call, HiveMQ sends the PUBREL to the client.
Pubrel Outbound Input
The input parameter contains the following unmodifiable information:
- MQTT PUBREL packet
- Connection information
- Client information
PUBREL reason code, PUBREL reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBREL packets are represented by the same PUBREL packet, but always have a SUCCESS reason code.
The reason string of an MQTT 3 PUBREL packet is set to null, and the packet does not contain any user properties.
The JavaDoc can be found here.
Pubrel Outbound Output
The output parameter contains a modifiable PUBREL packet.
You can use the PUBREL packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values, such as null or strings with non-UTF-8 characters, cause exceptions to be thrown.
More information on ways to manipulate outbound PUBREL packets can be found
here.
Example Usage
This section provides examples of how to implement different PubrelOutboundInterceptor variants and add them with the ClientContext.
Set Up In ClientContext
This example shows how to set up a PubrelOutboundInterceptor with the ClientContext and the ClientInitializer:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubrelOutboundInterceptor interceptor = (pubrelOutboundInput, pubrelOutboundOutput) -> {
log.debug("intercepted a PUBREL packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubrelOutboundInterceptor(interceptor);
});
}
...
Modify Outbound PUBREL
This example shows how to implement a PubrelOutboundInterceptor that modifies parameters of an outbound PUBREL message:
public class ModifyingPubrelOutboundInterceptor implements PubrelOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubrelOutboundInterceptor.class);
@Override
public void onOutboundPubrel(final @NotNull PubrelOutboundInput pubrelOutboundInput, final @NotNull PubrelOutboundOutput pubrelOutboundOutput) {
//get the modifiable pubrel object from the output
final ModifiablePubrelPacket pubrelPacket = pubrelOutboundOutput.getPubrelPacket();
// modify / overwrite parameters of a pubrel packet.
try {
if (pubrelPacket.getReasonCode() != PubrelReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubrelPacket.setReasonString("The reason for the unsuccessful pubrel");
}
pubrelPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrelPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrel outbound interception failed:", e);
}
}
}
Modify Outbound PUBREL Asynchronously
This example shows how to implement a PubrelOutboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an outbound PUBREL message.
If the task takes more than 10 seconds, the unmodified PUBREL is forwarded to the next interceptor or finally sent to the client.
public class AsyncModifyingPubrelOutboundInterceptor implements PubrelOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubrelOutboundInterceptor.class);
@Override
public void onOutboundPubrel(final @NotNull PubrelOutboundInput pubrelOutboundInput, final @NotNull PubrelOutboundOutput pubrelOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubrelOutboundOutput> async = pubrelOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable pubrel object from the output
final ModifiablePubrelPacket pubrelPacket = pubrelOutboundOutput.getPubrelPacket();
// modify / overwrite parameters of a pubrel packet.
try {
if (pubrelPacket.getReasonCode() != PubrelReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubrelPacket.setReasonString("The reason for the unsuccessful pubrel");
}
pubrelPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrelPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrel outbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Pubrel outbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
Pubcomp Inbound Interceptor
With the PubcompInboundInterceptor interface, you can intercept inbound MQTT PUBCOMP messages
at the moment HiveMQ receives the final acknowledgement of a QoS 2 publish message from any client.
You can use the PubcompInboundInterceptor to modify or analyse the data that is contained in inbound MQTT PUBCOMP messages.
For this task, HiveMQ provides PubcompInboundInput and PubcompInboundOutput parameters.
Extension input and output principles apply.
The PubcompInboundOutput parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext.
Multiple interceptors can be called sequentially.
When various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The PubcompInboundOutput and PubcompInboundInput parameters are updated after each interception.
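PUBCOMP is the last packet of the MQTT QoS 2 exchange (PUBLISH, PUBREC, PUBREL, PUBCOMP). The following sketch models the sender side of that exchange as a small state machine to show where an inbound PUBCOMP fits. The class, enum, and method names are hypothetical and not part of the HiveMQ SDK.

```java
final class Qos2SenderFlowSketch {

    // States of the party that sent the QoS 2 PUBLISH.
    enum State { AWAITING_PUBREC, AWAITING_PUBCOMP, COMPLETE }

    // Acknowledgement packets that the sender receives.
    enum Ack { PUBREC, PUBCOMP }

    static State onAck(final State state, final Ack ack) {
        if (state == State.AWAITING_PUBREC && ack == Ack.PUBREC) {
            return State.AWAITING_PUBCOMP; // the sender answers PUBREC with PUBREL
        }
        if (state == State.AWAITING_PUBCOMP && ack == Ack.PUBCOMP) {
            return State.COMPLETE; // PUBCOMP ends the exchange
        }
        throw new IllegalStateException("Unexpected " + ack + " in state " + state);
    }

    public static void main(final String[] args) {
        State state = State.AWAITING_PUBREC;
        state = onAck(state, Ack.PUBREC);
        state = onAck(state, Ack.PUBCOMP);
        System.out.println(state); // prints: COMPLETE
    }
}
```

When the broker delivers a QoS 2 message to a client, the broker is the sender in this model, and the PUBCOMP that arrives from the client is the packet that this interceptor sees.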
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Reason String | A UTF-8 encoded string that shows the status of the PUBCOMP. |
| User Properties | Additional diagnostic or other information. |
Interception of an inbound PUBCOMP message has the following limitations:
- It is not possible to prevent sending a PUBCOMP message to the server.
If an interceptor throws an exception or causes a timeout using the async method, HiveMQ ignores the changes that the interceptor made to the modifiable PUBCOMP packet and calls the next interceptor.
If no interceptor is left to call, HiveMQ processes the PUBCOMP.
Pubcomp Inbound Input
The input parameter contains the following unmodifiable information:
- PUBCOMP packet identifier
- PUBCOMP reason string
- MQTT user properties
PUBCOMP reason code, PUBCOMP reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBCOMP packets are represented by the same PUBCOMP packet, but always have a SUCCESS reason code.
The reason string of an MQTT 3 PUBCOMP packet is set to null, and the packet does not contain any user properties.
The JavaDoc can be found here.
Pubcomp Inbound Output
The output parameter contains the modifiable PUBCOMP packet.
You can use the PUBCOMP packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values, such as null or strings with non-UTF-8 characters, cause exceptions to be thrown.
More information on ways to manipulate inbound PUBCOMP packets can be found
here.
Example Usage
This section provides examples of how to implement different PubcompInboundInterceptor variants and add them with the ClientContext.
Set Up In ClientContext
This example shows how to set up a PubcompInboundInterceptor with the ClientContext and the ClientInitializer:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubcompInboundInterceptor interceptor = (pubcompInboundInput, pubcompInboundOutput) -> {
log.debug("intercepted a PUBCOMP packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubcompInboundInterceptor(interceptor);
});
}
...
Modify Inbound PUBCOMP
This example shows how to implement a PubcompInboundInterceptor that modifies parameters of an inbound PUBCOMP message:
public class ModifyingPubcompInboundInterceptor implements PubcompInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubcompInboundInterceptor.class);
@Override
public void onInboundPubcomp(final @NotNull PubcompInboundInput pubcompInboundInput, final @NotNull PubcompInboundOutput pubcompInboundOutput) {
//get the modifiable pubcomp object from the output
final ModifiablePubcompPacket pubcompPacket = pubcompInboundOutput.getPubcompPacket();
// modify / overwrite parameters of a pubcomp packet.
try {
if (pubcompPacket.getReasonCode() != PubcompReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubcompPacket.setReasonString("The reason for the unsuccessful pubcomp");
}
pubcompPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubcompPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubcomp inbound interception failed:", e);
}
}
}
Modify Inbound PUBCOMP Asynchronously
This example shows how to implement a PubcompInboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an inbound PUBCOMP message.
If the task takes more than 10 seconds, the unmodified PUBCOMP is forwarded to the next interceptor or finally processed by the broker.
public class AsyncModifyingPubcompInboundInterceptor implements PubcompInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubcompInboundInterceptor.class);
@Override
public void onInboundPubcomp(final @NotNull PubcompInboundInput pubcompInboundInput, final @NotNull PubcompInboundOutput pubcompInboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubcompInboundOutput> async = pubcompInboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable pubcomp object from the output
final ModifiablePubcompPacket pubcompPacket = pubcompInboundOutput.getPubcompPacket();
// modify / overwrite parameters of a pubcomp packet.
try {
if (pubcompPacket.getReasonCode() != PubcompReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubcompPacket.setReasonString("The reason for the unsuccessful pubcomp");
}
pubcompPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubcompPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubcomp inbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Pubcomp inbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
Pubcomp Outbound Interceptor
With the PubcompOutboundInterceptor interface, you can intercept outbound MQTT PUBCOMP messages.
You can use the PubcompOutboundInterceptor to modify or analyse the data that is contained in outbound MQTT PUBCOMP messages.
For this task, HiveMQ provides PubcompOutboundInput and PubcompOutboundOutput parameters.
Extension input and output principles apply.
The PubcompOutboundOutput parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext.
Multiple interceptors are called sequentially.
The interceptor with the highest priority is called first.
The PubcompOutboundOutput and PubcompOutboundInput parameters are updated after each interception.
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Reason String | A UTF-8 encoded string that shows the status of the PUBCOMP. |
| User Properties | Additional diagnostic or other information. |
Interception of an outbound PUBCOMP message has the following limitations:
- It is not possible to prevent sending a PUBCOMP message to a client.
If an interceptor throws an exception or causes a timeout using the async method, HiveMQ ignores the changes that the interceptor made to the modifiable PUBCOMP packet and calls the next interceptor.
If no interceptor is left to call, HiveMQ sends the PUBCOMP to the client.
Pubcomp Outbound Input
The input parameter contains the following unmodifiable information:
- PUBCOMP packet
- Connection information
- Client information
PUBCOMP reason code, PUBCOMP reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBCOMP packets are represented by the same PUBCOMP packet, but always have a SUCCESS reason code.
The reason string of an MQTT 3 PUBCOMP packet is set to null, and the packet does not contain any user properties.
The JavaDoc can be found here.
Pubcomp Outbound Output
The output parameter contains the modifiable PUBCOMP packet.
You can use the PUBCOMP packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values, such as null or strings with non-UTF-8 characters, cause exceptions to be thrown.
More information on ways to manipulate outbound PUBCOMP packets can be found
here.
Example Usage
This section provides examples of how to implement different PubcompOutboundInterceptor variants and add them with the ClientContext.
Set Up In ClientContext
This example shows how to set up a PubcompOutboundInterceptor with the ClientContext and the ClientInitializer:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubcompOutboundInterceptor interceptor = (pubcompOutboundInput, pubcompOutboundOutput) -> {
log.debug("intercepted a PUBCOMP packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubcompOutboundInterceptor(interceptor);
});
}
...
Modify Outbound PUBCOMP
This example shows how to implement a PubcompOutboundInterceptor that modifies parameters of an outbound PUBCOMP message:
public class ModifyingPubcompOutboundInterceptor implements PubcompOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubcompOutboundInterceptor.class);
@Override
public void onOutboundPubcomp(final @NotNull PubcompOutboundInput pubcompOutboundInput, final @NotNull PubcompOutboundOutput pubcompOutboundOutput) {
//get the modifiable pubcomp object from the output
final ModifiablePubcompPacket pubcompPacket = pubcompOutboundOutput.getPubcompPacket();
// modify / overwrite parameters of a pubcomp packet.
try {
if (pubcompPacket.getReasonCode() != PubcompReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubcompPacket.setReasonString("The reason for the unsuccessful pubcomp");
}
pubcompPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubcompPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubcomp outbound interception failed:", e);
}
}
}
Modify Outbound PUBCOMP Asynchronously
This example shows how to implement a PubcompOutboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an outbound PUBCOMP message.
If the task takes more than 10 seconds, the unmodified PUBCOMP is forwarded to the next interceptor or finally sent to the client.
public class AsyncModifyingPubcompOutboundInterceptor implements PubcompOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubcompOutboundInterceptor.class);
@Override
public void onOutboundPubcomp(final @NotNull PubcompOutboundInput pubcompOutboundInput, final @NotNull PubcompOutboundOutput pubcompOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubcompOutboundOutput> async = pubcompOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable pubcomp object from the output
final ModifiablePubcompPacket pubcompPacket = pubcompOutboundOutput.getPubcompPacket();
// modify / overwrite parameters of a pubcomp packet.
try {
if (pubcompPacket.getReasonCode() != PubcompReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubcompPacket.setReasonString("The reason for the unsuccessful pubcomp");
}
pubcompPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubcompPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubcomp outbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Pubcomp outbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
Subscribe Inbound Interceptor
With the SubscribeInboundInterceptor interface, you can intercept inbound MQTT SUBSCRIBE messages.
You can use the SubscribeInboundInterceptor to modify or analyse the data that is contained in inbound MQTT SUBSCRIBE messages.
For this task, HiveMQ provides SubscribeInboundInput and SubscribeInboundOutput parameters.
Extension input and output principles apply.
The interceptor can be set up in the ClientContext.
The SubscribeInboundOutput parameter can easily be used for asynchronous processing.
By default, when the asynchronous processing times out, the SUBSCRIBE is rejected and a SUBACK message with the
UNSPECIFIED_ERROR reason code is sent to the client.
Multiple interceptors can be called sequentially. For example, when various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
Changes from the SubscribeInboundOutput parameter are reflected in the SubscribeInboundInput parameter of the next interceptor.
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Subscriptions | The list of topic filters and the additional options that are associated with each subscription. |
| Subscription Identifier | The subscription identifier for the SUBSCRIBE. |
| User Properties | Additional diagnostic or other information (optional). |
Subscribe Inbound Input
The SubscribeInboundInput parameter contains the following unmodifiable information.
- MQTT SUBSCRIBE packet
- Connection information
- Client information
The JavaDoc can be found here.
Subscribe Inbound Output
The SubscribeInboundOutput parameter contains the modifiable SUBSCRIBE packet.
You can use the SUBSCRIBE packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in packets.
Some values cause exceptions to be thrown.
For example, null or non-UTF8 characters.
For more information on ways to manipulate inbound subscribe packets, see the JavaDoc
here.
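A topic filter is a typical value that is worth checking before you write it to a subscription. The following sketch tests the wildcard rules of the MQTT specification: `#` must be the last level and stand alone in it, and `+` must fill a complete level. The class and method names are hypothetical. The check does not cover every rule (for example, shared subscriptions or the null character), and HiveMQ may apply additional restrictions.

```java
final class TopicFilterCheckSketch {

    // Returns true when the wildcards in the filter follow the MQTT rules.
    static boolean hasValidWildcards(final String filter) {
        if (filter == null || filter.isEmpty()) {
            return false;
        }
        // limit -1 keeps empty trailing levels such as in "a/"
        final String[] levels = filter.split("/", -1);
        for (int i = 0; i < levels.length; i++) {
            final String level = levels[i];
            final boolean lastLevel = i == levels.length - 1;
            // '#' is only allowed alone in the last level
            if (level.contains("#") && !(level.equals("#") && lastLevel)) {
                return false;
            }
            // '+' is only allowed when it fills the whole level
            if (level.contains("+") && !level.equals("+")) {
                return false;
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        System.out.println(hasValidWildcards("sensors/+/temperature")); // prints: true
        System.out.println(hasValidWildcards("sensors/#"));             // prints: true
        System.out.println(hasValidWildcards("sensors/#/temperature")); // prints: false
        System.out.println(hasValidWildcards("sensors+/temperature"));  // prints: false
    }
}
```

An interceptor could run such a check before it calls `setTopicFilter` and keep the original filter when the check fails.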
Example Usage
Set Up a SubscribeInboundInterceptor in a ClientInitializer
...
@Override
public void extensionStart(final @NotNull ExtensionStartInput input, final @NotNull ExtensionStartOutput output) {
// create a new subscribe inbound interceptor
final SubscribeInboundInterceptor subscribeInboundInterceptor = new SubscribeInboundInterceptor() {
@Override
public void onInboundSubscribe(
final @NotNull SubscribeInboundInput subscribeInboundInput,
final @NotNull SubscribeInboundOutput subscribeInboundOutput) {
// do something with the subscribe, for example logging
log.debug("Inbound subscribe message intercepted from client: "
+ subscribeInboundInput.getClientInformation().getClientId());
}
};
// create a new client initializer
final ClientInitializer clientInitializer = new ClientInitializer() {
@Override
public void initialize(
final @NotNull InitializerInput initializerInput,
final @NotNull ClientContext clientContext) {
// add the interceptor to the context of the connecting client
clientContext.addSubscribeInboundInterceptor(subscribeInboundInterceptor);
}
};
// register the client initializer
Services.initializerRegistry().setClientInitializer(clientInitializer);
}
...
Modify a SUBSCRIBE Message
This example shows how to implement a SubscribeInboundInterceptor that modifies all parameters of an inbound
SUBSCRIBE message:
public class ModifyingSubscribeInboundInterceptor implements SubscribeInboundInterceptor {
@Override
public void onInboundSubscribe(
final @NotNull SubscribeInboundInput subscribeInboundInput,
final @NotNull SubscribeInboundOutput subscribeInboundOutput) {
// get the modifiable subscribe packet from the output
final ModifiableSubscribePacket subscribe = subscribeInboundOutput.getSubscribePacket();
// modify / overwrite any parameter of the subscribe packet
final ModifiableSubscription subscription = subscribe.getSubscriptions().get(0);
subscription.setTopicFilter("modified-topic");
subscription.setQos(Qos.AT_LEAST_ONCE);
subscription.setRetainHandling(RetainHandling.DO_NOT_SEND);
subscription.setRetainAsPublished(true);
subscription.setNoLocal(true);
subscribe.getUserProperties().addUserProperty("additional", "value");
}
}
Modify a SUBSCRIBE Message Asynchronously
This example shows how to implement a SubscribeInboundInterceptor that asynchronously modifies an inbound SUBSCRIBE
message with the Managed Extension Executor Service.
public class ModifyingAsyncSubscribeInboundInterceptor implements SubscribeInboundInterceptor {
@Override
public void onInboundSubscribe(
final @NotNull SubscribeInboundInput subscribeInboundInput,
final @NotNull SubscribeInboundOutput subscribeInboundOutput) {
// make output asynchronous with a timeout duration of 10 seconds and the timeout fallback failure
final Async<SubscribeInboundOutput> asyncOutput =
subscribeInboundOutput.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);
// submit external task to the extension executor service
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(() -> {
// get the modifiable subscribe packet from the output
final ModifiableSubscribePacket subscribe = subscribeInboundOutput.getSubscribePacket();
// call external task that modifies the subscribe packet (method not provided)
callExternalTask(subscribe);
});
// add a callback for completion of the task
taskFuture.whenComplete((ignored, throwable) -> {
if (throwable != null) {
throwable.printStackTrace(); // please use more sophisticated logging
}
// resume output to tell HiveMQ that asynchronous processing is done
asyncOutput.resume();
});
}
}
Suback Outbound Interceptor
With the SubackOutboundInterceptor interface, you can intercept outbound MQTT SUBACK messages.
You can use the SubackOutboundInterceptor to modify or analyse the data that is contained in outbound MQTT SUBACK messages.
For this task, HiveMQ provides SubackOutboundInput and SubackOutboundOutput parameters.
Extension input and output principles apply.
The SubackOutboundOutput parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext.
When an asynchronous SubackOutboundOutput parameter times out, all pending changes to the SUBACK are discarded and the unmodified SUBACK is sent to the client.
Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The SubackOutboundInput and SubackOutboundOutput parameters are updated after each interception.
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Reason Codes | The reason codes that show the result of each subscription in the corresponding SUBSCRIBE packet. Clients can use this information to determine whether a subscription was granted. |
| Reason String | A UTF-8 encoded string that shows the status of the SUBACK. This Reason String is human readable and designed for diagnostics. |
| User Properties | Additional diagnostic or other information. |
Interception of an outbound SUBACK message has the following limitations:
- You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
- It is not possible to prevent sending a SUBACK message to a client.
When an interceptor throws an exception or causes a timeout using the async method, HiveMQ ignores the changes that the interceptor made to the modifiable SUBACK packet and calls the next interceptor.
If there are no further interceptors, the SUBACK is sent to the client.
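The first limitation can be expressed as a simple check. The sketch below is a stand-alone model: the `Code` enum is a hypothetical subset of SUBACK reason codes and not the HiveMQ enum, and the rule that the number of reason codes must stay the same is an assumption based on the MQTT SUBACK format, which carries one reason code per subscription.

```java
import java.util.List;

final class SubackReasonCodeCheckSketch {

    // Hypothetical subset of SUBACK reason codes, tagged as success or failure.
    enum Code {
        GRANTED_QOS_0(true),
        GRANTED_QOS_1(true),
        GRANTED_QOS_2(true),
        UNSPECIFIED_ERROR(false),
        NOT_AUTHORIZED(false);

        final boolean success;

        Code(final boolean success) {
            this.success = success;
        }
    }

    // Returns true when every replacement code keeps the success or failure
    // status of the code at the same position.
    static boolean isAllowedChange(final List<Code> current, final List<Code> replacement) {
        if (current.size() != replacement.size()) {
            return false; // assumption: one code per subscription must remain
        }
        for (int i = 0; i < current.size(); i++) {
            if (current.get(i).success != replacement.get(i).success) {
                return false;
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        final List<Code> current = List.of(Code.GRANTED_QOS_1, Code.NOT_AUTHORIZED);
        // prints: true (success stays success, failure stays failure)
        System.out.println(isAllowedChange(current, List.of(Code.GRANTED_QOS_0, Code.UNSPECIFIED_ERROR)));
        // prints: false (the first code would switch from success to failure)
        System.out.println(isAllowedChange(current, List.of(Code.UNSPECIFIED_ERROR, Code.NOT_AUTHORIZED)));
    }
}
```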
Suback Outbound Input
The input parameter contains the following unmodifiable information:
- MQTT SUBACK packet
- Connection information
- Client information
The JavaDoc can be found here.
Suback Outbound Output
The output parameter contains the modifiable SUBACK packet.
You can use the SUBACK packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values cause exceptions to be thrown.
For example, null or non-UTF-8 characters.
For more information on ways to manipulate outbound SUBACK packets, see
here.
Example Usage
This section provides examples of how to implement different SubackOutboundInterceptor variants and add them with the ClientContext.
Set Up Interceptor in ClientContext
This example shows how to set up a SubackOutboundInterceptor with the help of a ClientContext and the ClientInitializer:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final SubackOutboundInterceptor interceptor = (subackOutboundInput, subackOutboundOutput) -> {
log.debug("intercepted a SUBACK packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addSubackOutboundInterceptor(interceptor);
});
}
...
Modify Outbound SUBACK
This example shows how to implement a SubackOutboundInterceptor that modifies the parameters of an outbound SUBACK message:
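public class ModifyingSubackOutboundInterceptor implements SubackOutboundInterceptor {
    @Override
    public void onOutboundSuback(final @NotNull SubackOutboundInput subackOutboundInput, final @NotNull SubackOutboundOutput subackOutboundOutput) {
        //get the modifiable suback object from the output
        final ModifiableSubackPacket subackPacket = subackOutboundOutput.getSubackPacket();
        subackPacket.setReasonString("The reason for the unsuccessful suback");
        //a SUBACK holds one reason code per subscription, so the complete list is replaced.
        //successful codes are kept, because success cannot be changed to failure (or vice versa).
        final List<SubackReasonCode> reasonCodes = new ArrayList<>();
        for (final SubackReasonCode reasonCode : subackPacket.getReasonCodes()) {
            final boolean granted = reasonCode == SubackReasonCode.GRANTED_QOS_0
                    || reasonCode == SubackReasonCode.GRANTED_QOS_1
                    || reasonCode == SubackReasonCode.GRANTED_QOS_2;
            reasonCodes.add(granted ? reasonCode : SubackReasonCode.UNSPECIFIED_ERROR);
        }
        subackPacket.setReasonCodes(reasonCodes);
        subackPacket.getUserProperties().addUserProperty("my-prop", "some value");
        subackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
    }
}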
Modify Outbound SUBACK Asynchronously
This example shows how to implement a SubackOutboundInterceptor that uses the Managed Extension Executor Service
to asynchronously modify an outbound SUBACK message.
If the task takes more than 10 seconds, the unmodified SUBACK is forwarded to the next interceptor or, if no interceptor is left, sent to the client.
public class AsyncModifyingSubackOutboundInterceptor implements SubackOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingSubackOutboundInterceptor.class);
@Override
public void onOutboundSuback(final @NotNull SubackOutboundInput subackOutboundInput, final @NotNull SubackOutboundOutput subackOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<SubackOutboundOutput> async = subackOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable suback object from the output
final ModifiableSubackPacket subackPacket = subackOutboundOutput.getSubackPacket();
// modify / overwrite parameters of a suback packet.
try {
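//Set reason string and reason codes (assumes a SUBACK that contains exactly one unsuccessful reason code).
subackPacket.setReasonString("The reason for the unsuccessful Suback");
final List<SubackReasonCode> reasonCodes = new ArrayList<>();
reasonCodes.add(SubackReasonCode.UNSPECIFIED_ERROR);
subackPacket.setReasonCodes(reasonCodes);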
subackPacket.getUserProperties().addUserProperty("my-prop", "some value");
subackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Suback outbound interception failed:", e);
}
async.resume();
}
});
}
}
Unsubscribe Inbound Interceptor
With the UnsubscribeInboundInterceptor interface, you can intercept inbound MQTT UNSUBSCRIBE messages.
You can use the UnsubscribeInboundInterceptor to modify or analyse the data that is contained in inbound MQTT UNSUBSCRIBE messages.
For this task, HiveMQ provides UnsubscribeInboundInput and UnsubscribeInboundOutput parameters.
Extension input and output principles apply.
The UnsubscribeInboundOutput parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext.
Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The UnsubscribeInboundInput and UnsubscribeInboundOutput parameters are updated after each interception.
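The ordering rule can be modeled outside of HiveMQ. The following plain-Java sketch is not HiveMQ code: the class `InterceptorChainSketch`, its `Entry` type, and the use of a list of topic filters as a stand-in for the packet are assumptions made for illustration. It only shows that the interceptor with the highest priority runs first and that each interceptor sees the changes of the previous one.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of a prioritized interceptor chain; not part of the HiveMQ SDK.
final class InterceptorChainSketch {

    // One registered interceptor: the priority of its extension and the change it applies.
    static final class Entry {
        final int priority;
        final UnaryOperator<List<String>> interceptor;

        Entry(final int priority, final UnaryOperator<List<String>> interceptor) {
            this.priority = priority;
            this.interceptor = interceptor;
        }
    }

    // Calls the interceptors in descending priority order. Each interceptor
    // receives the topic filters as the previous interceptor left them.
    static List<String> run(final List<Entry> entries, final List<String> topicFilters) {
        final List<Entry> ordered = new ArrayList<>(entries);
        ordered.sort(Comparator.comparingInt((Entry e) -> e.priority).reversed());
        List<String> current = new ArrayList<>(topicFilters);
        for (final Entry entry : ordered) {
            current = entry.interceptor.apply(current);
        }
        return current;
    }
}
```

In this model, an entry with priority 10 is applied before an entry with priority 1, regardless of the order in which the entries were registered.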
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Topic Filters | Lists the topic filters from which to unsubscribe the client. |
| User Properties | Additional diagnostic or other information. |
When an interceptor throws an exception or causes a timeout using the async method,
HiveMQ prevents the unsubscribe and returns an UNSUBACK packet to the client.
For an MQTT 5 client, the UNSUBACK contains the reason code Unspecified error for each topic filter.
Unsubscribe Inbound Input
The input parameter contains the following unmodifiable information:
- MQTT UNSUBSCRIBE packet
- Connection information
- Client information
The JavaDoc can be found here.
Unsubscribe Inbound Output
The output parameter contains the modifiable UNSUBSCRIBE packet.
You can use the UNSUBSCRIBE packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values cause exceptions to be thrown. For example, null or non-UTF8 characters.
For more information on ways to manipulate inbound unsubscribe packets, see
here.
Example Usage
This section provides examples of how to implement an UnsubscribeInboundInterceptor in different ways and add it with the ClientContext.
Examples:
Set Up Interceptor in Client Context
This example shows how to set up an UnsubscribeInboundInterceptor with the help of a ClientContext and the ClientInitializer:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final UnsubscribeInboundInterceptor interceptor = (unsubscribeInboundInput, unsubscribeInboundOutput) -> {
log.info("intercepted an UNSUBSCRIBE packet for client {}.", unsubscribeInboundInput.getClientInformation().getClientId());
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addUnsubscribeInboundInterceptor(interceptor);
});
}
...
Modify Inbound UNSUBSCRIBE
This example shows how to implement an UnsubscribeInboundInterceptor that modifies the parameters of an inbound UNSUBSCRIBE message:
public class ModifyingUnsubscribeInboundInterceptor implements UnsubscribeInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingUnsubscribeInboundInterceptor.class);
@Override
public void onInboundUnsubscribe(final @NotNull UnsubscribeInboundInput unsubscribeInboundInput, final @NotNull UnsubscribeInboundOutput unsubscribeInboundOutput) {
//get the modifiable unsubscribe object from the output
final ModifiableUnsubscribePacket unsubscribePacket = unsubscribeInboundOutput.getUnsubscribePacket();
// modify / overwrite parameters of the unsubscribe packet.
try {
unsubscribePacket.getUserProperties().addUserProperty("my-prop", "some value");
unsubscribePacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Unsubscribe inbound interception failed:", e);
}
}
}
Modify Inbound UNSUBSCRIBE Asynchronously
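This example shows how to implement an UnsubscribeInboundInterceptor that uses the Managed Extension Executor Service
to asynchronously modify an inbound UNSUBSCRIBE message.
If the task takes more than 10 seconds, HiveMQ prevents the unsubscribe and returns an UNSUBACK packet to the client.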
public class AsyncModifyingUnsubscribeInboundInterceptor implements UnsubscribeInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingUnsubscribeInboundInterceptor.class);
@Override
public void onInboundUnsubscribe(final @NotNull UnsubscribeInboundInput unsubscribeInboundInput, final @NotNull UnsubscribeInboundOutput unsubscribeInboundOutput) {
//make output object async with a duration of 10 seconds
final Async<UnsubscribeInboundOutput> async = unsubscribeInboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable unsubscribe object from the output
final ModifiableUnsubscribePacket unsubscribePacket = unsubscribeInboundOutput.getUnsubscribePacket();
// modify / overwrite parameters of an unsubscribe packet.
try {
//an UNSUBSCRIBE packet has no reason string or reason code, so only user properties are added here
unsubscribePacket.getUserProperties().addUserProperty("my-prop", "some value");
unsubscribePacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Unsubscribe inbound interception failed:", e);
}
async.resume();
}
});
}
}
Unsuback Outbound Interceptor
All changeable properties in the UNSUBACK packet are MQTT 5 properties and are not sent to MQTT 3 clients.
Therefore, to optimize resource usage, do not modify the UNSUBACK packet for MQTT 3 clients.
With the UnsubackOutboundInterceptor interface, you can intercept outbound MQTT UNSUBACK messages.
You can use the UnsubackOutboundInterceptor to modify or analyse the data that is contained in outbound MQTT UNSUBACK messages.
For this task, HiveMQ provides UnsubackOutboundInput and UnsubackOutboundOutput parameters.
Extension input and output principles apply.
The UnsubackOutboundOutput parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext.
When an asynchronous UnsubackOutboundOutput parameter times out, all pending changes to the UNSUBACK are discarded
and the unmodified UNSUBACK is sent to the client.
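The discard-on-timeout behavior can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not the HiveMQ implementation: `AsyncTimeoutSketch` and `modifyOrKeep` are hypothetical names, a `String` stands in for the packet, and the method blocks the calling thread while it waits, which the HiveMQ async mechanism is designed to avoid.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical model of "discard pending changes on timeout"; not the HiveMQ implementation.
final class AsyncTimeoutSketch {

    // Runs the modification on another thread and waits at most timeoutMillis for it.
    // Returns the modified value if the task finishes in time, otherwise the original value.
    static String modifyOrKeep(final String original,
                               final Supplier<String> modification,
                               final long timeoutMillis) {
        final CompletableFuture<String> task = CompletableFuture.supplyAsync(modification);
        try {
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (final TimeoutException | ExecutionException e) {
            // The task timed out or threw an exception: mark the future as cancelled
            // so that a late result is ignored, and keep the unmodified value.
            task.cancel(true);
            return original;
        } catch (final InterruptedException e) {
            // Restore the interrupt flag and keep the unmodified value.
            Thread.currentThread().interrupt();
            return original;
        }
    }
}
```

A modification that completes within the timeout replaces the original value. A modification that is too slow or that throws an exception leaves the original value untouched.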
Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The UnsubackOutboundInput and UnsubackOutboundOutput parameters are updated after each interception.
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Reason Codes | The list of reason codes that correspond to the different topic filters that are used in the UNSUBSCRIBE packet. |
| Reason String | A UTF-8 encoded string that shows the cause of the UNSUBACK. |
| User Properties | Additional diagnostic or other information (optional). |
Interception of an outbound UNSUBACK message has the following limitations:
- You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
- It is not possible to prevent sending an UNSUBACK message to a client.
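The first limitation can be checked before a list of reason codes is set. The following sketch is a plain-Java illustration with hypothetical names: `ReasonCodeGuardSketch` and its `Code` enum are stand-ins and are not SDK types. It also assumes that the replacement list must have the same size as the original list.

```java
import java.util.List;

// Hypothetical guard for reason code changes; not part of the HiveMQ SDK.
final class ReasonCodeGuardSketch {

    // Stand-in reason codes. Each code records whether it signals success.
    enum Code {
        SUCCESS(true),
        NO_SUBSCRIPTIONS_EXISTED(true),
        UNSPECIFIED_ERROR(false),
        NOT_AUTHORIZED(false);

        final boolean successful;

        Code(final boolean successful) {
            this.successful = successful;
        }
    }

    // Returns true only if both lists have the same size and every replacement
    // keeps the success or failure status of the original code at the same position.
    static boolean isAllowed(final List<Code> original, final List<Code> replacement) {
        if (original.size() != replacement.size()) {
            return false;
        }
        for (int i = 0; i < original.size(); i++) {
            if (original.get(i).successful != replacement.get(i).successful) {
                return false;
            }
        }
        return true;
    }
}
```

With this guard, a change from `SUCCESS` to `NO_SUBSCRIPTIONS_EXISTED` is allowed because both codes are successful, while a change from `SUCCESS` to `UNSPECIFIED_ERROR` is rejected.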
When an interceptor throws an exception or causes a timeout using the async method,
HiveMQ ignores the changes that the interceptor made to the modifiable UNSUBACK packet and calls the next interceptor.
If there are no further interceptors, the UNSUBACK is sent to the client.
Unsuback Outbound Input
The input parameter contains the following unmodifiable information:
- MQTT UNSUBACK packet
- Connection information
- Client information
The JavaDoc can be found here.
Unsuback Outbound Output
The output parameter contains the modifiable UNSUBACK packet.
You can use the UNSUBACK packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values cause exceptions to be thrown. For example, null or non-UTF8 characters.
For more information on ways to manipulate outbound unsuback packets, see
here.
Example Usage
This section provides examples of how to implement an UnsubackOutboundInterceptor in different ways and add it with the ClientContext.
Examples:
Set Up Interceptor in Client Context
This example shows how to set up an UnsubackOutboundInterceptor with the help of a ClientContext and the ClientInitializer:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final UnsubackOutboundInterceptor interceptor = (unsubackOutboundInput, unsubackOutboundOutput) -> {
log.debug("intercepted an UNSUBACK packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addUnsubackOutboundInterceptor(interceptor);
});
}
...
Modify Outbound UNSUBACK
This example shows how to implement an UnsubackOutboundInterceptor that modifies the parameters of an outbound UNSUBACK message:
public class ModifyingUnsubackOutboundInterceptor implements UnsubackOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingUnsubackOutboundInterceptor.class);
@Override
public void onOutboundUnsuback(final @NotNull UnsubackOutboundInput unsubackOutboundInput, final @NotNull UnsubackOutboundOutput unsubackOutboundOutput) {
//get the modifiable unsuback object from the output
final ModifiableUnsubackPacket unsubackPacket = unsubackOutboundOutput.getUnsubackPacket();
// modify / overwrite parameters of the unsuback packet.
try {
//Set reason string and reason codes.
unsubackPacket.setReasonString("The reason for the unsuccessful unsuback");
List<UnsubackReasonCode> reasonCodes = new ArrayList<>();
reasonCodes.add(UnsubackReasonCode.SUCCESS);
unsubackPacket.setReasonCodes(reasonCodes);
unsubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
unsubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Unsuback outbound interception failed:", e);
}
}
}
Modify Outbound UNSUBACK Asynchronously
This example shows how to implement an UnsubackOutboundInterceptor that uses the Managed Extension Executor Service
to asynchronously modify an outbound UNSUBACK message.
If the task takes more than 10 seconds, the unmodified UNSUBACK is sent to the client.
public class AsyncModifyingUnsubackOutboundInterceptor implements UnsubackOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingUnsubackOutboundInterceptor.class);
@Override
public void onOutboundUnsuback(final @NotNull UnsubackOutboundInput unsubackOutboundInput, final @NotNull UnsubackOutboundOutput unsubackOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<UnsubackOutboundOutput> async = unsubackOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable unsuback object from the output
final ModifiableUnsubackPacket unsubackPacket = unsubackOutboundOutput.getUnsubackPacket();
// modify / overwrite parameters of an unsuback packet.
try {
//Set reason string and reason codes.
unsubackPacket.setReasonString("The reason for the unsuccessful unsuback");
List<UnsubackReasonCode> reasonCodes = new ArrayList<>();
reasonCodes.add(UnsubackReasonCode.SUCCESS);
unsubackPacket.setReasonCodes(reasonCodes);
unsubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
unsubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Unsuback outbound interception failed:", e);
}
async.resume();
}
});
}
}
Disconnect Inbound Interceptor
With the DisconnectInboundInterceptor interface, you can intercept inbound MQTT DISCONNECT messages.
You can use the DisconnectInboundInterceptor to modify or analyse the data that is contained in inbound MQTT DISCONNECT messages.
For this task, HiveMQ provides DisconnectInboundInput and DisconnectInboundOutput parameters.
Extension input and output principles apply.
The DisconnectInboundOutput parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext.
When an asynchronous DisconnectInboundOutput parameter times out, all pending changes to the DISCONNECT are discarded and the unmodified DISCONNECT is sent to the broker.
Multiple interceptors can be called sequentially. For example, when various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The DisconnectInboundInput and DisconnectInboundOutput parameters are updated after each interception.
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Reason Code | The cause of the disconnection. Clients can use this information to determine whether to retry the connection, and how long to wait before a reconnection attempt. |
| Reason String | A UTF-8 encoded string that shows the cause of the disconnect. This Reason String is human readable and designed for diagnostics. |
| User Properties | Additional diagnostic or other information. |
| Session Expiry Interval | The amount of time (in seconds) after the client disconnects that the session data of the client is still valid on the HiveMQ broker. The time is counted from the moment that the client disconnects. If the Session Expiry Interval of the corresponding CONNECT packet is set to zero, the Session Expiry Interval of the DISCONNECT must also be set to zero. |
Interception of an inbound DISCONNECT message has the following limitations:
- You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
- It is not possible to prevent sending a DISCONNECT message to the server.
When an interceptor throws an exception or causes a timeout using the async method, HiveMQ ignores the changes that the interceptor made to the modifiable DISCONNECT packet and calls the next interceptor. If no interceptor is left to call, HiveMQ processes the disconnect request of the client.
Disconnect Inbound Input
The input parameter contains the following unmodifiable information:
- MQTT DISCONNECT packet
- Connection information
- Client information
The JavaDoc can be found here.
Disconnect Inbound Output
The output parameter contains the modifiable DISCONNECT packet.
You can use the DISCONNECT packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values cause exceptions to be thrown.
For example, null or non-UTF8 characters.
For more information on ways to manipulate inbound disconnect packets, see
here.
Example Usage
This section provides examples of how to implement a DisconnectInboundInterceptor in different ways and add it with the ClientContext.
Examples:
Set Up Interceptor in ClientContext
This example shows how to set up a DisconnectInboundInterceptor with the help of a ClientContext and the ClientInitializer:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final DisconnectInboundInterceptor interceptor = (disconnectInboundInput, disconnectInboundOutput) -> {
log.debug("intercepted a DISCONNECT packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addDisconnectInboundInterceptor(interceptor);
});
}
...
Modify Inbound DISCONNECT
This example shows how to implement a DisconnectInboundInterceptor that modifies the parameters of an inbound DISCONNECT message:
public class ModifyingDisconnectInboundInterceptor implements DisconnectInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingDisconnectInboundInterceptor.class);
@Override
public void onInboundDisconnect(final @NotNull DisconnectInboundInput disconnectInboundInput, final @NotNull DisconnectInboundOutput disconnectInboundOutput) {
//get the modifiable disconnect object from the output
final ModifiableInboundDisconnectPacket disconnectPacket = disconnectInboundOutput.getDisconnectPacket();
// modify / overwrite parameters of the disconnect packet.
try {
if (disconnectPacket.getReasonCode() != DisconnectReasonCode.NORMAL_DISCONNECTION) {
//Set reason string and reason code if reason code is not success.
disconnectPacket.setReasonString("The reason for the unsuccessful disconnect");
disconnectPacket.setReasonCode(DisconnectReasonCode.MALFORMED_PACKET);
}
disconnectPacket.getUserProperties().addUserProperty("my-prop", "some value");
disconnectPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Disconnect inbound interception failed:", e);
}
}
}
Modify Inbound DISCONNECT Asynchronously
This example shows how to implement a DisconnectInboundInterceptor that uses the Managed Extension Executor Service
to asynchronously modify an inbound DISCONNECT message.
If the task takes more than 10 seconds, the unmodified DISCONNECT is sent to the broker.
public class AsyncModifyingDisconnectInboundInterceptor implements DisconnectInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingDisconnectInboundInterceptor.class);
@Override
public void onInboundDisconnect(final @NotNull DisconnectInboundInput disconnectInboundInput, final @NotNull DisconnectInboundOutput disconnectInboundOutput) {
//make output object async with a duration of 10 seconds
final Async<DisconnectInboundOutput> async = disconnectInboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable disconnect object from the output
final ModifiableInboundDisconnectPacket disconnectPacket = disconnectInboundOutput.getDisconnectPacket();
// modify / overwrite parameters of a disconnect packet.
try {
//Set reason string and reason code.
disconnectPacket.setReasonString("The reason for the unsuccessful disconnect");
disconnectPacket.setReasonCode(DisconnectReasonCode.UNSPECIFIED_ERROR);
disconnectPacket.getUserProperties().addUserProperty("my-prop", "some value");
disconnectPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Disconnect inbound interception failed:", e);
}
async.resume();
}
});
}
}
Modify Session Expiry Interval
This example shows the precautions to take when you modify the session expiry interval. It is not possible to change the session expiry interval of clients that connect with a session expiry value of 0.
public class SessionExpiryDisconnectInboundInterceptor implements DisconnectInboundInterceptor {
@Override
public void onInboundDisconnect(final @NotNull DisconnectInboundInput disconnectInboundInput, final @NotNull DisconnectInboundOutput disconnectInboundOutput) {
if (disconnectInboundInput.getDisconnectPacket().getSessionExpiryInterval().isPresent()) {
if (disconnectInboundInput.getDisconnectPacket().getSessionExpiryInterval().get() != 0) {
disconnectInboundOutput.getDisconnectPacket().setSessionExpiryInterval(10L);
}
}
}
}
Disconnect Outbound Interceptor
Server-sent DISCONNECT packets exist only in MQTT 5. This interceptor is not called for MQTT 3 clients.
With the DisconnectOutboundInterceptor interface, you can intercept outbound MQTT DISCONNECT messages.
You can use the DisconnectOutboundInterceptor to modify or analyse the data that is contained in outbound MQTT DISCONNECT messages.
For this task, HiveMQ provides DisconnectOutboundInput and DisconnectOutboundOutput parameters.
Extension input and output principles apply.
The DisconnectOutboundOutput parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext.
When an asynchronous DisconnectOutboundOutput parameter times out, all pending changes to the DISCONNECT are discarded and the unmodified DISCONNECT is sent to the client.
Multiple interceptors can be called sequentially. For example, when various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The DisconnectOutboundInput and DisconnectOutboundOutput parameters are updated after each interception.
The following parameters can be modified:
| Parameter | Description |
|---|---|
| Reason Code | The cause of the disconnection. Clients can use this information to determine whether to retry the connection, and how long to wait before a reconnection attempt. |
| Reason String | A UTF-8 encoded string that shows the cause of the disconnect. This Reason String is human readable and designed for diagnostics. |
| User Properties | Additional diagnostic or other information. |
| Server Reference | Specifies an alternative broker to which the client redirects on disconnect. |
Interception of an outbound DISCONNECT message has the following limitations:
- You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
- It is not possible to prevent sending a DISCONNECT message to a client.
When an interceptor throws an exception or causes a timeout using the async method, HiveMQ ignores the changes that the interceptor made to the modifiable DISCONNECT packet and calls the next interceptor. If no interceptor is left to call, HiveMQ disconnects the client.
Disconnect Outbound Input
The input parameter contains the following unmodifiable information:
- MQTT DISCONNECT packet
- Connection information
- Client information
The JavaDoc can be found here.
Disconnect Outbound Output
The output parameter contains the modifiable DISCONNECT packet.
You can use the DISCONNECT packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values cause exceptions to be thrown.
For example, null or non-UTF8 characters.
For more information on ways to manipulate outbound disconnect packets, see
here.
Example Usage
This section provides examples of how to implement a DisconnectOutboundInterceptor in different ways and add it with the ClientContext.
Examples:
Set Up Interceptor in ClientContext
This example shows how to set up a DisconnectOutboundInterceptor with the help of a ClientContext and the ClientInitializer:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final DisconnectOutboundInterceptor interceptor = (disconnectOutboundInput, disconnectOutboundOutput) -> {
log.debug("intercepted a DISCONNECT packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addDisconnectOutboundInterceptor(interceptor);
});
}
...
Modify Outbound DISCONNECT
This example shows how to implement a DisconnectOutboundInterceptor that modifies the parameters of an outbound DISCONNECT message:
public class ModifyingDisconnectOutboundInterceptor implements DisconnectOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingDisconnectOutboundInterceptor.class);
@Override
public void onOutboundDisconnect(final @NotNull DisconnectOutboundInput disconnectOutboundInput, final @NotNull DisconnectOutboundOutput disconnectOutboundOutput) {
//get the modifiable disconnect object from the output
final ModifiableOutboundDisconnectPacket disconnectPacket = disconnectOutboundOutput.getDisconnectPacket();
// modify / overwrite parameters of the disconnect packet.
try {
if (disconnectPacket.getReasonCode() != DisconnectReasonCode.NORMAL_DISCONNECTION) {
//Set reason string and reason code if reason code is not success.
disconnectPacket.setReasonString("The reason for the unsuccessful disconnect");
disconnectPacket.setReasonCode(DisconnectReasonCode.MALFORMED_PACKET);
}
disconnectPacket.getUserProperties().addUserProperty("my-prop", "some value");
disconnectPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Disconnect outbound interception failed:", e);
}
}
}
Modify Outbound DISCONNECT Asynchronously
This example shows how to implement a DisconnectOutboundInterceptor that uses the Managed Extension Executor Service
to asynchronously modify an outbound DISCONNECT message.
If the task takes more than 10 seconds, the unmodified DISCONNECT is sent to the client.
public class AsyncModifyingDisconnectOutboundInterceptor implements DisconnectOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingDisconnectOutboundInterceptor.class);
@Override
public void onOutboundDisconnect(final @NotNull DisconnectOutboundInput disconnectOutboundInput, final @NotNull DisconnectOutboundOutput disconnectOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<DisconnectOutboundOutput> async = disconnectOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable disconnect object from the output
final ModifiableOutboundDisconnectPacket disconnectPacket = disconnectOutboundOutput.getDisconnectPacket();
// modify / overwrite parameters of a disconnect packet.
try {
//Set reason string and reason code.
disconnectPacket.setReasonString("The reason for the unsuccessful disconnect");
disconnectPacket.setReasonCode(DisconnectReasonCode.UNSPECIFIED_ERROR);
disconnectPacket.getUserProperties().addUserProperty("my-prop", "some value");
disconnectPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Disconnect outbound interception failed:", e);
}
async.resume();
}
});
}
}
Pingreq Inbound Interceptor
With the PingReqInboundInterceptor interface, you can intercept inbound MQTT PINGREQ messages.
You can use the PingReqInboundInterceptor to monitor received PINGREQ packets.
For this task, HiveMQ provides PingReqInboundInput and PingReqInboundOutput parameters.
Extension input and output principles apply.
The interceptor can be set up in the ClientContext.
Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
It is not possible to prevent sending an inbound PINGREQ.
Pingreq Inbound Input
The input parameter contains the following unmodifiable information:
-
Connection information
-
Client Information
The JavaDoc can be found here.
Pingreq Inbound Output
Because PINGREQ messages are static by nature, none of their values can be modified.
Example Usage
This section provides examples of how to implement a PingReqInboundInterceptor and add it with the ClientContext.
Examples:
Set Up Interceptor in ClientContext
This example shows how to set up a PingReqInboundInterceptor with the help of a ClientContext and the ClientInitializer:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PingReqInboundInterceptor interceptor = (pingReqInboundInput, pingReqInboundOutput) -> {
final String clientId = pingReqInboundInput.getClientInformation().getClientId();
log.info("intercepted a PINGREQ packet of client '{}'.", clientId);
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPingReqInboundInterceptor(interceptor);
});
}
...
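Because the interceptor can only observe PINGREQ packets, a typical use is monitoring. The following sketch shows one possible thread-safe counter that an interceptor such as the one above could call with the client ID. `PingCounterSketch`, `record`, and `count` are hypothetical names and are not part of the HiveMQ SDK.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical monitoring helper; not part of the HiveMQ SDK.
final class PingCounterSketch {

    // One counter per client ID. LongAdder keeps increments cheap under contention.
    private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

    // Records one PINGREQ for the given client ID. Safe to call from multiple threads.
    void record(final String clientId) {
        counts.computeIfAbsent(clientId, id -> new LongAdder()).increment();
    }

    // Returns the number of recorded PINGREQ packets for the client, or 0 if none were recorded.
    long count(final String clientId) {
        final LongAdder adder = counts.get(clientId);
        return adder == null ? 0L : adder.sum();
    }
}
```

Inside the interceptor lambda, a single shared instance would be updated with `record(clientId)`. The counter keeps an entry for every client ID it has seen, so a long-running extension would also need a way to remove entries for clients that are gone.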
Pingresp Outbound Interceptor
With the PingRespOutboundInterceptor interface, you can intercept outbound MQTT PINGRESP messages.
You can use the PingRespOutboundInterceptor to monitor sent PINGRESP packets.
For this task, HiveMQ provides PingRespOutboundInput and PingRespOutboundOutput parameters.
Extension input and output principles apply.
The interceptor can be set up in the ClientContext.
Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
It is not possible to prevent sending an outbound PINGRESP.
Pingresp Outbound Input
The input parameter contains the following unmodifiable information:
-
Connection information
-
Client Information
The JavaDoc can be found here.
Pingresp Outbound Output
Because PINGRESP messages are static by nature, none of their values can be modified.
Example Usage
This section provides examples of how to implement a PingRespOutboundInterceptor and add it with the ClientContext.
Examples:
Set Up Interceptor in ClientContext
This example shows how to set up a PingRespOutboundInterceptor with the help of a ClientContext and the ClientInitializer:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PingRespOutboundInterceptor interceptor = (pingRespOutboundInput, pingRespOutboundOutput) -> {
final String clientId = pingRespOutboundInput.getClientInformation().getClientId();
log.info("intercepted a PINGRESP packet of client '{}'.", clientId);
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPingRespOutboundInterceptor(interceptor);
});
}
...