Interceptors
HiveMQ Interceptors provide a convenient way for extensions to intercept and modify MQTT messages.
Based on the type of interceptor, you can register a ClientInitializer or use the GlobalInterceptorRegistry to add the interceptor that you want.
The following table lists all available HiveMQ Interceptors:
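Interceptor | Description |
---|---|
Connect Inbound Interceptor | Enables extensions to modify incoming CONNECT messages. |
Connack Outbound Interceptor | Enables extensions to intercept and modify outbound CONNACK messages. |
Publish Inbound Interceptor | Enables extensions to intercept inbound PUBLISH messages. |
Publish Outbound Interceptor | Enables extensions to intercept outgoing PUBLISH messages. |
Puback Inbound Interceptor | Enables extensions to intercept and modify inbound PUBACK messages. |
Puback Outbound Interceptor | Enables extensions to intercept and modify outbound PUBACK messages. |
Pubrec Inbound Interceptor | Enables extensions to intercept and modify inbound PUBREC messages. |
Pubrec Outbound Interceptor | Enables extensions to intercept and modify outbound PUBREC messages. |
Pubrel Inbound Interceptor | Enables extensions to intercept and modify inbound PUBREL messages. |
Pubrel Outbound Interceptor | Enables extensions to intercept and modify outbound PUBREL messages. |
Pubcomp Inbound Interceptor | Enables extensions to intercept and modify inbound PUBCOMP messages. |
Pubcomp Outbound Interceptor | Enables extensions to intercept and modify outbound PUBCOMP messages. |
Subscribe Inbound Interceptor | Enables extensions to intercept and modify inbound SUBSCRIBE messages. |
Suback Outbound Interceptor | Enables extensions to intercept and modify outbound SUBACK messages. |
Unsubscribe Inbound Interceptor | Enables extensions to intercept and modify inbound UNSUBSCRIBE messages. |
Unsuback Outbound Interceptor | Enables extensions to intercept and modify outbound UNSUBACK messages. |
Disconnect Inbound Interceptor | Enables extensions to intercept and modify inbound DISCONNECT messages. |
Disconnect Outbound Interceptor | Enables extensions to intercept and modify outbound DISCONNECT messages. |
Pingreq Inbound Interceptor | Enables extensions to intercept inbound PINGREQ messages. |
Pingresp Outbound Interceptor | Enables extensions to intercept outbound PINGRESP messages. |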
Connect Inbound Interceptor
With the ConnectInboundInterceptor interface, you can modify or analyse the data that is contained in inbound MQTT CONNECT messages.
For this task, HiveMQ provides ConnectInboundInput and ConnectInboundOutput parameters.
Extension input and output principles apply.
A ConnectInboundInterceptorProvider that is set in the GlobalInterceptorRegistry can be used to add the ConnectInboundInterceptor.
When an asynchronous ConnectInboundOutput parameter with a TimeoutFallback.FAILURE fallback strategy times out, a CONNACK with the UNSPECIFIED_ERROR (128) reason code is sent to the client and the client is disconnected.
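The timeout behaviour can be pictured with a small model built only on the JDK. This is a sketch, not HiveMQ code: the class AsyncTimeoutSketch, the Fallback enum, and the awaitInterception method are hypothetical names, and treating a failed task like an error is an assumption of the sketch. Only the reason code 128 for UNSPECIFIED_ERROR is taken from the text above.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of an asynchronous interception with a timeout fallback.
class AsyncTimeoutSketch {

    static final int SUCCESS = 0;
    static final int UNSPECIFIED_ERROR = 128;

    // Hypothetical stand-in for a timeout fallback strategy.
    enum Fallback { FAILURE, SUCCESS }

    // Waits for the asynchronous task and returns the reason code that the model would send.
    static int awaitInterception(CompletableFuture<Void> task, Duration timeout, Fallback fallback) {
        try {
            task.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return SUCCESS;
        } catch (TimeoutException e) {
            // The task did not finish in time: the fallback strategy decides the outcome.
            return fallback == Fallback.FAILURE ? UNSPECIFIED_ERROR : SUCCESS;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return UNSPECIFIED_ERROR;
        } catch (ExecutionException e) {
            // Assumption of this sketch: a failed task is treated like an error.
            return UNSPECIFIED_ERROR;
        }
    }

    public static void main(String[] args) {
        // A task that never completes runs into the timeout.
        CompletableFuture<Void> neverDone = new CompletableFuture<>();
        System.out.println(awaitInterception(neverDone, Duration.ofMillis(50), Fallback.FAILURE));
    }
}
```

In the model, a task that is never resumed ends in the error reason code once the timeout elapses, which mirrors why an interceptor should resume its asynchronous output on every code path that is meant to succeed.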
Multiple interceptors are called sequentially. For example, when various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The ConnectInboundInput and ConnectInboundOutput parameters are updated after each interception.
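The calling order can be illustrated with a small, self-contained model. The sketch below is not HiveMQ code: the InterceptorChainSketch class, the PrioritizedInterceptor type, and the reduction of the packet to a plain client ID string are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of sequential interception, not part of the HiveMQ Extension SDK.
class InterceptorChainSketch {

    // An interceptor is modelled as a priority plus a function that rewrites a client ID.
    static final class PrioritizedInterceptor {
        private final int priority;
        private final UnaryOperator<String> rewrite;

        PrioritizedInterceptor(int priority, UnaryOperator<String> rewrite) {
            this.priority = priority;
            this.rewrite = rewrite;
        }

        int priority() {
            return priority;
        }
    }

    // Calls the interceptors one after another, highest priority first.
    // Each interceptor receives the value that the previous interceptor produced.
    static String intercept(String clientId, List<PrioritizedInterceptor> interceptors) {
        List<PrioritizedInterceptor> ordered = new ArrayList<>(interceptors);
        Comparator<PrioritizedInterceptor> byPriority =
                Comparator.comparingInt(PrioritizedInterceptor::priority);
        ordered.sort(byPriority.reversed());
        String current = clientId;
        for (PrioritizedInterceptor interceptor : ordered) {
            current = interceptor.rewrite.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<PrioritizedInterceptor> interceptors = List.of(
                new PrioritizedInterceptor(10, id -> id + "-low"),
                new PrioritizedInterceptor(100, id -> id + "-high"));
        System.out.println(intercept("client", interceptors)); // prints "client-high-low"
    }
}
```

The interceptor with priority 100 runs first, and the interceptor with priority 10 then sees the already modified value.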
The JavaDoc for the ConnectInboundInterceptor can be found here.
It is important to note that changes made to the CONNECT packet are usually not expected by the client implementation.
For example, if a client whose client ID was modified connects and expects a session to be present, the client may disconnect because the broker has no session for the modified client ID and therefore does not set the session-present flag.
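A minimal model shows why the session lookup misses after the client ID is rewritten. The SessionPresentSketch class and its methods are hypothetical and only mimic a broker-side session store that is keyed by client ID. The client IDs used are made-up examples.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a session store keyed by client ID, not HiveMQ code.
class SessionPresentSketch {

    private final Set<String> storedSessions = new HashSet<>();

    // Remembers that a session exists for the given client ID.
    void storeSession(String clientId) {
        storedSessions.add(clientId);
    }

    // The session-present flag is set only if the client did not ask for a clean start
    // and a session is stored under the client ID that the broker actually uses.
    boolean sessionPresent(String effectiveClientId, boolean cleanStart) {
        return !cleanStart && storedSessions.contains(effectiveClientId);
    }

    public static void main(String[] args) {
        SessionPresentSketch broker = new SessionPresentSketch();
        broker.storeSession("sensor-1");
        // The client reconnects as "sensor-1", but an interceptor rewrote the ID.
        System.out.println(broker.sessionPresent("tenant-a/sensor-1", false)); // prints "false"
    }
}
```

The client still believes that it is "sensor-1" and expects its session, while the lookup under the rewritten ID finds nothing.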
The following parameters can be modified:
Parameter | Description |
---|---|
Client ID | The unique ID of a client. |
Clean Start | A flag that shows whether the client wants to start a new "clean" session (true) or resume an existing session (false). |
Session Expiry Interval | The number of seconds that the broker stores session information for the MQTT client. The time is calculated from the moment that the client disconnects. |
Keep Alive | The time interval that is allowed between keep alive messages. When set to 0, the keep-alive feature is disabled. |
Receive Maximum | Limits the number of QoS 1 and QoS 2 publications that the connected client can process concurrently. |
Maximum Packet Size | The maximum packet size the connected client accepts. |
Topic Alias Maximum | The maximum number of topic aliases that the client can hold. |
Request Response Information | A flag that requests the server to return response information in the corresponding CONNACK packet. |
Request Problem Information | A flag that requests the server to return problem information (for example reason string and user properties). |
Authentication Method | A UTF-8 value that shows the authentication method that is used for the connect. |
Authentication Data | The data that is used for authentication. If no authentication method is given, this parameter is not used. |
Password | The password the client uses to connect. |
Will Publish | The Will publish that is set in the CONNECT packet. |
User Properties | Additional user defined information (optional). |
When an interceptor makes changes to the CONNECT message of a client, the client is not notified.
To provide feedback to the client, you can use the ConnectInboundInterceptor to add user properties to the CONNECT packet.
Set Up Connect Inbound Interceptor
The following example shows how to set up a ConnectInboundInterceptor.
...
@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {
    try {
        final ConnectInboundInterceptorA interceptorA = new ConnectInboundInterceptorA();
        final ConnectInboundInterceptorB interceptorB = new ConnectInboundInterceptorB();
        // choose an interceptor for each connecting client based on its client ID
        Services.interceptorRegistry().setConnectInboundInterceptorProvider(input -> {
            final String clientId = input.getClientInformation().getClientId();
            if (clientId.startsWith("a")) {
                return interceptorA;
            } else {
                return interceptorB;
            }
        });
    } catch (Exception e) {
        log.error("Exception thrown at extension start: ", e);
    }
}
...
Modify a CONNECT Message
The following example ensures that a client connects with a clean start and a session expiry of zero when the CONNECT packet contains a user property named "clean".
public class ModifyConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
        final ModifiableConnectPacket connectPacket = output.getConnectPacket();
        final ModifiableUserProperties userProperties = connectPacket.getUserProperties();
        // only force a clean session for clients that sent the "clean" user property
        if (userProperties.getFirst("clean").isPresent()) {
            connectPacket.setSessionExpiryInterval(0);
            connectPacket.setCleanStart(true);
        }
    }
}
The following example requests a new ID for a connecting client and sets the new ID asynchronously.
public class ModifyConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
        // make the output asynchronous with a timeout of 10 seconds and the failure fallback
        final Async<ConnectInboundOutput> async = output.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);
        final ModifiableConnectPacket connectPacket = output.getConnectPacket();
        // request the new client ID from an external task (method not provided)
        final CompletableFuture<String> modifiedClientIdFuture = modifyClientId(connectPacket.getClientId());
        modifiedClientIdFuture.whenComplete((modifiedClientId, throwable) -> {
            if (modifiedClientId != null) {
                connectPacket.setClientId(modifiedClientId);
                // resume output to tell HiveMQ that asynchronous processing is done
                async.resume();
            }
            // if no new ID is returned, the output is not resumed and the failure fallback applies after the timeout
        });
    }
}
Modify a Last Will Message
The Will message of a CONNECT packet can be modified or removed. If the CONNECT does not include a Will message, a new Will can be created via the WillPublishBuilder and added to the CONNECT.
public class ModifyWillConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
        final ModifiableConnectPacket connectPacket = output.getConnectPacket();
        // only add a Will if the client did not send one
        if (connectPacket.getModifiableWillPublish().isEmpty()) {
            final String clientId = connectPacket.getClientId();
            final ByteBuffer payload = StandardCharsets.UTF_8.encode(clientId + " disconnected");
            final WillPublishBuilder builder = Builders.willPublish();
            final WillPublishPacket will = builder.willDelay(0)
                    .topic(clientId + "/will")
                    .payload(payload)
                    .build();
            connectPacket.setWillPublish(will);
        }
    }
}
To remove a last-will message, you can set the will-message to 'null' as shown in the following example:
public class ModifyWillConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
        final ModifiableConnectPacket connectPacket = output.getConnectPacket();
        if (connectPacket.getModifiableWillPublish().isPresent()) {
            connectPacket.setWillPublish(null);
        }
    }
}
The following example shows how to modify an existing will message:
public class ModifyWillConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
        final ModifiableConnectPacket connectPacket = output.getConnectPacket();
        if (connectPacket.getModifiableWillPublish().isPresent()) {
            final ModifiableWillPublish modifiableWill = connectPacket.getModifiableWillPublish().get();
            modifiableWill.setTopic(connectPacket.getClientId() + "/will");
        }
    }
}
Connack Outbound Interceptor
With the ConnackOutboundInterceptor interface, you can intercept outbound MQTT CONNACK messages and modify or analyse the data that they contain.
For this task, HiveMQ provides ConnackOutboundInput and ConnackOutboundOutput parameters.
Extension input and output principles apply.
The ConnackOutboundOutput parameter can easily be used for asynchronous processing.
The interceptor is set up with a ConnackOutboundInterceptorProvider in the GlobalInterceptorRegistry.
When an asynchronous ConnackOutboundOutput parameter with a TimeoutFallback.FAILURE fallback strategy times out, the connection of the client is closed without sending a CONNACK.
Multiple interceptors are called sequentially. For example, when various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The ConnackOutboundOutput and ConnackOutboundInput parameters are updated after each interception.
The following parameters can be modified:
Parameter | Description |
---|---|
Reason Code | Specifies whether the CONNECT was successful and, if not, holds the reason why it was unsuccessful. |
Reason String | A UTF-8 encoded string that describes the reason. |
User Properties | Additional diagnostic or other information (optional). |
Server Reference | Passes another server for the client to connect to. |
Response Information | A UTF-8 encoded string that holds the response information. |
Assigned Client ID | The assigned client ID that is sent in the CONNACK packet. |
Interception of a CONNACK message has the following limitations:
- You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
- It is not possible to send response information to a client that did not request response information.
- If the reason code is "SUCCESS", you cannot set a reason string.
- It is not possible to prevent sending a CONNACK message to a client.
- This interceptor can only modify the assigned client ID that is sent in the CONNACK packet. To modify the client ID that HiveMQ uses, see Connect Inbound Interceptor.
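The first three limitations can be written down as a simple check. The sketch below only models the rules in the list above. The ConnackRulesSketch class and its violation method are hypothetical, and the way HiveMQ itself reports a violation is not shown here. The numeric reason codes (0 for success, 128 for unspecified error, 135 for not authorized) come from the MQTT 5 specification.

```java
import java.util.Optional;

// Hypothetical model of the CONNACK modification rules listed above, not HiveMQ code.
class ConnackRulesSketch {

    static final int SUCCESS = 0;

    // Returns a description of the violated rule, or an empty Optional if the change is allowed.
    static Optional<String> violation(int originalReasonCode,
                                      int newReasonCode,
                                      String newReasonString,
                                      boolean responseInformationRequested,
                                      String newResponseInformation) {
        boolean wasSuccess = originalReasonCode == SUCCESS;
        boolean isSuccess = newReasonCode == SUCCESS;
        if (wasSuccess != isSuccess) {
            return Optional.of("reason code must not switch between success and failure");
        }
        if (isSuccess && newReasonString != null) {
            return Optional.of("reason string must not be set for SUCCESS");
        }
        if (!responseInformationRequested && newResponseInformation != null) {
            return Optional.of("response information was not requested by the client");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Changing SUCCESS (0) to UNSPECIFIED_ERROR (128) breaks the first rule.
        System.out.println(violation(0, 128, null, false, null).isPresent()); // prints "true"
    }
}
```

Changing one unsuccessful reason code to another, for example 135 to 128 with a reason string, passes the check.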
If response information is requested, you can use the ConnectInboundInterceptor to get the information as shown in this example.
Furthermore, reason code, reason string, and MQTT user properties are MQTT 5 features.
In MQTT 3 environments, the reason code is always set to SUCCESS, the reason string is set to null, and the packet does not contain any user properties.
Connack Outbound Input
The input parameter contains the following unmodifiable information:
- MQTT CONNACK packet
- MQTT Version
- Client ID
- IP Address
- Listener with port, bind address and type
- ConnectionAttributeStore
- TLS
The JavaDoc can be found here.
Connack Outbound Output
The output parameter contains a modifiable CONNACK packet.
You can use the CONNACK packet to manipulate the parameters that are listed in the table above.
For more information on ways to manipulate outbound CONNACK packets, see here.
Example Usage
This section provides examples of different ConnackOutboundInterceptor implementations and shows how to add them through the GlobalInterceptorRegistry.
Examples:
Set Up In GlobalInterceptorRegistry
This example shows how to set up a ConnackOutboundInterceptor by a ConnackOutboundInterceptorProvider with the GlobalInterceptorRegistry:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput extensionStartInput, @NotNull final ExtensionStartOutput extensionStartOutput) {
    final ConnackOutboundInterceptorProvider interceptorProvider = new ConnackOutboundInterceptorProvider() {
        @Override
        public @NotNull ConnackOutboundInterceptor getConnackOutboundInterceptor(final @NotNull ConnackOutboundProviderInput connackOutboundProviderInput) {
            return new ConnackOutboundInterceptor() {
                @Override
                public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {
                    // do something with the connack, e.g. logging
                    System.out.println("Connack intercepted for client: " + connackOutboundInput.getClientInformation().getClientId());
                }
            };
        }
    };
    Services.interceptorRegistry().setConnackOutboundInterceptorProvider(interceptorProvider);
}
...
Modify a CONNACK Message
This example shows how to implement a ConnackOutboundInterceptor that modifies parameters of an outbound CONNACK message:
public class ModifyingConnackOutboundInterceptor implements ConnackOutboundInterceptor {
    private static final Logger log = LoggerFactory.getLogger(ModifyingConnackOutboundInterceptor.class);
    @Override
    public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {
        // get the modifiable connack object from the output
        final ModifiableConnackPacket connackPacket = connackOutboundOutput.getConnackPacket();
        // modify / overwrite parameters of a connack packet.
        try {
            if (connackPacket.getReasonCode() != ConnackReasonCode.SUCCESS) {
                // set reason string and reason code if reason code is not success.
                connackPacket.setReasonString("The reason for the not successful connack");
                connackPacket.setReasonCode(ConnackReasonCode.UNSPECIFIED_ERROR);
            }
            connackPacket.setServerReference("server reference");
            connackPacket.getUserProperties().addUserProperty("my-prop", "some value");
            connackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
        } catch (Exception e) {
            log.debug("Connack outbound interception failed:", e);
        }
    }
}
Modify a CONNACK Message Asynchronously
This example shows how to implement a ConnackOutboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an outbound CONNACK message.
The default timeout strategy is failure. If the task takes more than 10 seconds, the connection of the client is closed without sending a CONNACK.
public class AsyncModifyingConnackOutboundInterceptor implements ConnackOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingConnackOutboundInterceptor.class);
@Override
public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<ConnackOutboundOutput> async = connackOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable connack object from the output
final ModifiableConnackPacket connackPacket = connackOutboundOutput.getConnackPacket();
// modify / overwrite parameters of a connack packet.
try {
if (connackPacket.getReasonCode() != ConnackReasonCode.SUCCESS) {
//Set reason string and reason code if reason code is not success.
connackPacket.setReasonString("The reason for the not successful connack");
connackPacket.setReasonCode(ConnackReasonCode.UNSPECIFIED_ERROR);
}
connackPacket.setServerReference("server reference");
connackPacket.getUserProperties().addUserProperty("my-prop", "some value");
connackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (Exception e){
log.debug("Connack outbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if(throwable != null){
log.debug("Connack outbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
Modify the Response Information
This example shows how to implement a ConnackOutboundInterceptor that sets the response information in the CONNACK. A ConnectInboundInterceptor first stores whether the client requested response information, and the ConnackOutboundInterceptor then reads the stored value:
public class ModifyResponseInformationMain implements ExtensionMain {
private static final Logger log = LoggerFactory.getLogger(ModifyResponseInformationMain.class);
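// interceptors can be called from multiple threads, so a thread-safe map (java.util.concurrent.ConcurrentHashMap) is used
private final Map<String, Boolean> clientIdResponseInformationRequestedMap = new ConcurrentHashMap<>();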
@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {
final ConnectInboundInterceptor connectInboundInterceptor = new ConnectInboundInterceptor() {
@Override
public void onConnect(final @NotNull ConnectInboundInput connectInboundInput, final @NotNull ConnectInboundOutput connectInboundOutput) {
try {
//get connect packet from input
final ConnectPacket connectPacket = connectInboundInput.getConnectPacket();
//store client id if response information is requested.
clientIdResponseInformationRequestedMap.put(connectPacket.getClientId(), connectPacket.getRequestResponseInformation());
} catch (final Exception e){
log.debug("Connect inbound interception failed: ", e);
}
}
};
final ConnackOutboundInterceptor connackOutboundInterceptor = new ConnackOutboundInterceptor() {
@Override
public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {
try {
//get client id from input
final String clientId = connackOutboundInput.getClientInformation().getClientId();
//check if response information is requested
final Boolean responseInformationRequested = clientIdResponseInformationRequestedMap.get(clientId);
if (responseInformationRequested != null && responseInformationRequested) {
connackOutboundOutput.getConnackPacket().setResponseInformation("Some response information");
}
//remove from map since it is not needed anymore
clientIdResponseInformationRequestedMap.remove(clientId);
} catch (final Exception e){
log.debug("Connack outbound interception failed: ", e);
}
}
};
// set connect inbound interceptor provider with interceptor
Services.interceptorRegistry().setConnectInboundInterceptorProvider(new ConnectInboundInterceptorProvider() {
@Override
public ConnectInboundInterceptor getConnectInboundInterceptor(final @NotNull ConnectInboundProviderInput input) {
return connectInboundInterceptor;
}
});
// set connack outbound interceptor provider with interceptor
Services.interceptorRegistry().setConnackOutboundInterceptorProvider(new ConnackOutboundInterceptorProvider() {
@Override
public ConnackOutboundInterceptor getConnackOutboundInterceptor(final @NotNull ConnackOutboundProviderInput input) {
return connackOutboundInterceptor;
}
});
}
@Override
public void extensionStop(final @NotNull ExtensionStopInput extensionStopInput, final @NotNull ExtensionStopOutput extensionStopOutput) {
}
}
Publish Inbound Interceptor
With the PublishInboundInterceptor interface, you can intercept inbound MQTT PUBLISH packets to modify or analyse the data that they contain, or to prevent their delivery.
For this task, HiveMQ provides PublishInboundInput and PublishInboundOutput parameters.
Extension input and output principles apply.
The PublishInboundOutput is blocking, but it can easily be used to create an asynchronous PublishInboundOutput object.
The interceptor can be set up in the ClientContext. For more information, see Initializer Registry.
When the delivery of a PUBLISH packet is prevented, the message is dropped.
In the HiveMQ Control Center, you can see the number of dropped messages from every PublishInboundInterceptor.
When delivery is prevented with an AckReasonCode other than SUCCESS, MQTT 3 clients are disconnected.
However, an MQTT 5 client receives the PUBACK or PUBREC with the provided reason code and optional reason string.
When the AckReasonCode is SUCCESS, an acknowledgement (PUBACK or PUBREC) is sent based on the quality of service level (QoS).
When an async PublishInboundOutput times out with a TimeoutFallback.FAILURE fallback strategy, the process is the same.
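The rules for a prevented PUBLISH can be summarized as a small decision function. This is a sketch of the text above, not HiveMQ code: the PreventedPublishSketch class, the Outcome enum, and the outcome method are hypothetical names. The QoS 0 branch is an assumption of the sketch, based only on the fact that MQTT defines no acknowledgement packet for QoS 0.

```java
// Hypothetical model of what a client receives after its PUBLISH was prevented.
class PreventedPublishSketch {

    enum Outcome { NONE, PUBACK, PUBREC, DISCONNECT }

    // mqttVersion: 3 or 5, qos: 0, 1, or 2, success: whether the AckReasonCode is SUCCESS
    static Outcome outcome(int mqttVersion, int qos, boolean success) {
        if (qos == 0) {
            // Assumption: QoS 0 has no acknowledgement packet, so the model sends nothing.
            return Outcome.NONE;
        }
        if (mqttVersion == 3 && !success) {
            // MQTT 3 cannot carry a reason code in the acknowledgement, so the client is disconnected.
            return Outcome.DISCONNECT;
        }
        // QoS 1 is acknowledged with PUBACK, QoS 2 with PUBREC.
        return qos == 1 ? Outcome.PUBACK : Outcome.PUBREC;
    }

    public static void main(String[] args) {
        System.out.println(outcome(5, 2, false)); // prints "PUBREC"
    }
}
```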
Multiple interceptors are called sequentially. For example, when various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The PublishInboundOutput and the PublishInboundInput are updated after each interception.
The following parameters can be modified:
Parameter | Description |
---|---|
QoS | The quality of service level for the message. Possible values are 0, 1, and 2. |
Topic | The information channel to which the data in the payload of the message is published. |
Payload Format Indicator | A flag that represents the encoding of the payload. Possible values are UNSPECIFIED and UTF_8. |
Message Expiry Interval | The number of seconds until the messages expire. If the server cannot begin message delivery to a specific subscriber within the message-expiry interval that is defined for the message, the copy of the message for that subscriber is deleted. |
Response Topic | An optional UTF-8 string that can be used as a topic for response messages. |
Correlation Data | Optional binary data that follows the response topic and enables the original sender of the request to handle asynchronous responses that can possibly be sent from multiple receivers. |
Content Type | String parameter that describes the content of the publish message. |
Payload | The payload of the publish message that contains the data that is published. |
If any interceptor prevents delivery of a PUBLISH packet, no additional interceptors are called.
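The short-circuit behaviour can be modelled in a few lines. The PreventingChainSketch class below is hypothetical and represents each interceptor as a predicate that returns true when it prevents delivery. It assumes that the list is already sorted by priority.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of an interceptor chain that stops once delivery is prevented.
class PreventingChainSketch {

    // Calls the interceptors in order and returns how many of them were called.
    // Each predicate returns true if it prevents delivery of the given topic.
    static int callUntilPrevented(String topic, List<Predicate<String>> interceptors) {
        int called = 0;
        for (Predicate<String> preventsDelivery : interceptors) {
            called++;
            if (preventsDelivery.test(topic)) {
                // Delivery is prevented: the remaining interceptors are skipped.
                break;
            }
        }
        return called;
    }

    public static void main(String[] args) {
        List<Predicate<String>> chain = List.of(
                topic -> false,
                topic -> topic.equals("topic/to/prevent"),
                topic -> false);
        System.out.println(callUntilPrevented("topic/to/prevent", chain)); // prints "2"
    }
}
```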
Publish Inbound Input
The input parameter contains the following unmodifiable information:
- MQTT PUBLISH packet
- MQTT Version
- Client ID
- IP Address
- Listener with port, bind address and type
- ConnectionAttributeStore
- TLS
The JavaDoc can be found here.
Publish Inbound Output
The output parameter contains a modifiable PUBLISH packet.
You can use the PUBLISH packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values, for example null or non-UTF-8 characters, cause exceptions to be thrown.
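One way to avoid such exceptions is to validate a value before it is written to the packet. The helper below is a sketch that uses only the JDK. The Utf8CheckSketch class and its method are hypothetical, and the exact checks that HiveMQ performs on each setter are not shown here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that checks a value before it is set on a packet.
class Utf8CheckSketch {

    // Returns true if the buffer is non-null and contains well-formed UTF-8.
    static boolean isWellFormedUtf8(ByteBuffer bytes) {
        if (bytes == null) {
            return false;
        }
        try {
            // REPORT makes the decoder throw instead of silently replacing bad input.
            StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(bytes.asReadOnlyBuffer()); // decode a view so the caller's position is untouched
            return true;
        } catch (CharacterCodingException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ByteBuffer valid = ByteBuffer.wrap("modified topic".getBytes(StandardCharsets.UTF_8));
        ByteBuffer invalid = ByteBuffer.wrap(new byte[] {(byte) 0xC3, (byte) 0x28});
        System.out.println(isWellFormedUtf8(valid));   // prints "true"
        System.out.println(isWellFormedUtf8(invalid)); // prints "false"
    }
}
```

An interceptor could run such a check on externally supplied data and skip the modification, or prevent delivery, when the check fails.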
For more information on ways to manipulate inbound PUBLISH packets, see here.
Example Usage
This section provides examples of different PublishInboundInterceptor implementations and shows how to add them to a ClientInitializer via the InitializerRegistry.
Examples:
Set Up a PublishInboundInterceptor in the ClientInitializer
This example shows how to set up a PublishInboundInterceptor
with the ClientInitializer
.
...
@Override
public void extensionStart(final @NotNull ExtensionStartInput input, final @NotNull ExtensionStartOutput output) {
// create a new publish inbound interceptor
final PublishInboundInterceptor publishInboundInterceptor = new PublishInboundInterceptor() {
@Override
public void onInboundPublish(
final @NotNull PublishInboundInput publishInboundInput,
final @NotNull PublishInboundOutput publishInboundOutput) {
// do something with the publish, for example, logging
System.out.println("Inbound publish message intercepted from client: "
+ publishInboundInput.getClientInformation().getClientId());
}
};
// create a new client initializer
final ClientInitializer clientInitializer = new ClientInitializer() {
@Override
public void initialize(
final @NotNull InitializerInput initializerInput,
final @NotNull ClientContext clientContext) {
// add the interceptor to the context of the connecting client
clientContext.addPublishInboundInterceptor(publishInboundInterceptor);
}
};
//register the client initializer
Services.initializerRegistry().setClientInitializer(clientInitializer);
}
...
Modify a PUBLISH Packet
This example shows how to implement a PublishInboundInterceptor that modifies all parameters of an inbound PUBLISH message.
public class ModifyingPublishInboundInterceptor implements PublishInboundInterceptor {
    @Override
    public void onInboundPublish(
            final @NotNull PublishInboundInput publishInboundInput,
            final @NotNull PublishInboundOutput publishInboundOutput) {
        // get the modifiable publish packet from the output
        final ModifiablePublishPacket publish = publishInboundOutput.getPublishPacket();
        // modify / overwrite any parameter of the publish packet.
        publish.setContentType("modified contentType");
        // encode with an explicit charset instead of the platform default
        publish.setCorrelationData(ByteBuffer.wrap("modified correlation data".getBytes(StandardCharsets.UTF_8)));
        publish.setMessageExpiryInterval(120L);
        publish.setPayload(ByteBuffer.wrap("modified payload".getBytes(StandardCharsets.UTF_8)));
        publish.setPayloadFormatIndicator(PayloadFormatIndicator.UNSPECIFIED);
        publish.setQos(Qos.AT_MOST_ONCE);
        publish.setTopic("modified topic");
        publish.setResponseTopic("modified response topic");
        publish.setRetain(false);
        publish.getUserProperties().clear();
        publish.getUserProperties().addUserProperty("mod-key", "mod-val");
    }
}
Modify a PUBLISH Packet Asynchronously
This example shows how to implement a PublishInboundInterceptor that modifies an inbound PUBLISH message asynchronously with the Managed Extension Executor Service.
The timeout strategy is failure. Onward delivery of the PUBLISH message will be prevented if the task takes longer than 10 seconds.
public class ModifyingAsyncPublishInboundInterceptor implements PublishInboundInterceptor {
@Override
public void onInboundPublish(
final @NotNull PublishInboundInput publishInboundInput,
final @NotNull PublishInboundOutput publishInboundOutput) {
// make the output object async with a timeout duration of 10 seconds and the timeout fallback failure
final Async<PublishInboundOutput> asyncOutput =
publishInboundOutput.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);
//submit external task to extension executor service
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
// get the modifiable publish packet from the output
final ModifiablePublishPacket publish = publishInboundOutput.getPublishPacket();
// call external task that modifies the publish packet (method not provided)
callExternalTask(publish);
}
});
// add a callback for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object object, final @Nullable Throwable throwable) {
if (throwable != null) {
throwable.printStackTrace(); // please use more sophisticated logging
}
// resume output to tell HiveMQ that asynchronous processing is done
asyncOutput.resume();
}
});
}
}
Prevent Delivery of a PUBLISH Packet
This example shows how to implement a PublishInboundInterceptor that prevents the delivery of PUBLISH packets that come from clients with IDs that contain "prevent" and that are published to the topic "topic/to/prevent".
public class PreventingPublishInboundInterceptor implements PublishInboundInterceptor {
@Override
public void onInboundPublish(
final @NotNull PublishInboundInput publishInboundInput,
final @NotNull PublishInboundOutput publishInboundOutput) {
// check some parameters if publish delivery must be prevented, for example, client id and topic.
final String clientId = publishInboundInput.getClientInformation().getClientId();
final String topic = publishInboundInput.getPublishPacket().getTopic();
if (clientId.contains("prevent") && topic.equals("topic/to/prevent")) {
// prevent publish delivery on the output object
publishInboundOutput.preventPublishDelivery();
}
}
}
Prevent Delivery of a PUBLISH Packet with Reason Code
This example shows how to implement a PublishInboundInterceptor that prevents the delivery of PUBLISH packets that come from clients with IDs that contain "prevent" and that are published to the topic "topic/to/prevent".
When the quality of service level of the PUBLISH
is greater than zero and the client is an MQTT 5 client,
the client receives a PUBACK
or PUBREC
with reason code "TOPIC_NAME_INVALID".
Reason codes for PUBACK
or PUBREC
are an MQTT 5 feature.
If the reason code is not "SUCCESS", MQTT 3 clients are disconnected ungracefully.
public class PreventAndAckPublishInboundInterceptor implements PublishInboundInterceptor {
@Override
public void onInboundPublish(
final @NotNull PublishInboundInput publishInboundInput,
final @NotNull PublishInboundOutput publishInboundOutput) {
// check some parameters if publish delivery must be prevented, for example client id and topic.
final String clientId = publishInboundInput.getClientInformation().getClientId();
final String topic = publishInboundInput.getPublishPacket().getTopic();
if (clientId.contains("prevent") && topic.equals("topic/to/prevent")) {
// prevent publish delivery with a reason code on the output object
publishInboundOutput.preventPublishDelivery(AckReasonCode.TOPIC_NAME_INVALID);
}
}
}
Prevent Delivery of a PUBLISH Packet with Reason String
This example shows how to implement a PublishInboundInterceptor that prevents the delivery of PUBLISH packets that come from clients with IDs that contain "prevent" and that are published to the topic "topic/to/prevent".
When the quality of service level of the PUBLISH
is greater than zero and the client is an MQTT 5 client,
the client receives a PUBACK
or PUBREC
with the reason code "TOPIC_NAME_INVALID"
and the reason string "It is not allowed to publish to topic: topic/to/prevent".
Reason codes for PUBACK
or PUBREC
are an MQTT 5 feature.
If the reason code is not "SUCCESS", MQTT 3 clients are disconnected ungracefully.
public class PreventWithReasonPublishInboundInterceptor implements PublishInboundInterceptor {
@Override
public void onInboundPublish(
final @NotNull PublishInboundInput publishInboundInput,
final @NotNull PublishInboundOutput publishInboundOutput) {
// check some parameters if publish delivery must be prevented, for example client id and topic.
final String clientId = publishInboundInput.getClientInformation().getClientId();
final String topic = publishInboundInput.getPublishPacket().getTopic();
if (clientId.contains("prevent") && topic.equals("topic/to/prevent")) {
// prevent publish delivery with a reason code and reason string on the output object
publishInboundOutput.preventPublishDelivery(AckReasonCode.TOPIC_NAME_INVALID, "It is not allowed to publish to topic: " + topic);
}
}
}
Publish Outbound Interceptor
With the PublishOutboundInterceptor interface, you can modify an MQTT PUBLISH at the moment that the packet is sent to a subscribed client or prevent the delivery of the publish completely.
For this task, HiveMQ provides PublishOutboundInput and PublishOutboundOutput parameters.
Extension input and extension output principles apply.
The interceptor can be set up in the ClientInitializer
.
For more information, see Initializer Registry.
When an asynchronous PublishOutboundOutput parameter with a TimeoutFallback.FAILURE fallback strategy times out, delivery is prevented.
Multiple interceptors are called sequentially. For example, when various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The PublishOutboundOutput and PublishOutboundInput parameters are updated after each interception.
The following parameters can be modified:
Parameter | Description |
---|---|
QoS | The quality of service level for the message. Possible values are 0, 1, and 2. |
Topic | The information channel to which the data in the payload of the message is published. |
Payload Format Indicator | A flag that indicates the encoding of the payload. |
Message Expiry Interval | The number of seconds until the messages expire. If the server cannot begin message delivery to a specific subscriber within the message-expiry interval that is defined for the message, the copy of the message for that subscriber is deleted. |
Response Topic | An optional UTF-8 string that can be used as a topic for response messages. |
Correlation Data | Optional binary data that follows the response topic and enables the original sender of the request to handle asynchronous responses that can possibly be sent from multiple receivers. |
Content Type | String parameter that describes the content of the publish message. |
Payload | The payload of the publish message that contains the data that is published. |
The JavaDoc for the PublishOutboundInterceptor
can be found
here.
Example Usage
This section provides examples of how to implement different PublishOutboundInterceptor
and add them to a ClientInitializer
via the InitializerRegistry
.
Examples:
Set up a PublishOutboundInterceptor in the ClientInitializer
...
@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput,
final @NotNull ExtensionStartOutput extensionStartOutput) {
try {
Services.initializerRegistry().setClientInitializer(new ClientInitializer() {
@Override
public void initialize(@NotNull InitializerInput initializerInput, @NotNull ClientContext clientContext) {
clientContext.addPublishOutboundInterceptor(new MyPublishOutboundInterceptor());
}
});
} catch (Exception e) {
log.error("Exception thrown at extension start: ", e);
}
}
...
Prevent Delivery of a PUBLISH Message to a Client
You can use the PublishOutboundOutput
parameter to prevent an outgoing publish.
This method causes the message to be dropped. The message is not sent to the subscriber.
Use of the parameter in this way has no effect on other subscribers for the same message.
The com.hivemq.messages.dropped.extension-prevented.count
metric tracks the number of messages that are dropped because the extension system prevented delivery.
public class PreventPublishOutboundInterceptor implements PublishOutboundInterceptor {
@Override
public void onOutboundPublish(final @NotNull PublishOutboundInput input, final @NotNull PublishOutboundOutput output) {
final String clientId = input.getClientInformation().getClientId();
final String topic = input.getPublishPacket().getTopic();
if (!topic.startsWith(clientId)) {
output.preventPublishDelivery();
}
}
}
Modify Outgoing PUBLISH Message
Nearly all aspects of a PUBLISH
packet can be modified in the interceptor, except the packet ID and the duplicate-delivery flag.
It is important to note that these modifications only affect the way the subscriber receives the PUBLISH
. There is no impact on the way that HiveMQ processes the message.
This affects the following properties:
Property | Description |
---|---|
|
Only the subscribers that have matching subscriptions for the original topic receive the |
|
The original state of the retain flag determines whether or not the |
|
Changes to the QoS level do not change the delivery guarantees. |
|
The original interval is still used to decide if a message is expired. |
Modify Topic Example
The following example modifies the topic of an outgoing PUBLISH
message, adds a response topic, and stores the original topic as a user property:
public class ModifyPublishOutboundInterceptor implements PublishOutboundInterceptor {
@Override
public void onOutboundPublish(final @NotNull PublishOutboundInput input, final @NotNull PublishOutboundOutput output) {
final String clientId = input.getClientInformation().getClientId();
final ModifiableOutboundPublish publish = output.getPublishPacket();
final String originalTopic = publish.getTopic();
publish.setTopic(originalTopic + "/" + clientId);
publish.getUserProperties().addUserProperty("original-topic", originalTopic);
publish.setResponseTopic("response/"+clientId);
}
}
Modify Payload Example
The following example decodes the payload of an outgoing PUBLISH
, alters the string, and sets the altered string as the new payload asynchronously:
public class ModifyPublishOutboundInterceptor implements PublishOutboundInterceptor {
@Override
public void onOutboundPublish(final @NotNull PublishOutboundInput input, final @NotNull PublishOutboundOutput output) {
final Async<PublishOutboundOutput> async = output.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);
Services.extensionExecutorService().submit(() -> {
final ModifiableOutboundPublish publish = output.getPublishPacket();
final String payloadString = StandardCharsets.UTF_8.decode(publish.getPayload().get()).toString();
final String newPayload = payloadString.toLowerCase();
publish.setPayload(StandardCharsets.UTF_8.encode(newPayload));
async.resume();
});
}
}
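The payload handling in the example above relies on plain `java.nio` calls, which can be tried outside of an extension. The following sketch assumes nothing about the HiveMQ SDK; the class and method names are hypothetical. It also illustrates one detail of the JDK API: `Charset.decode` advances the position of the buffer it reads, so the sketch decodes a read-only view to leave the source buffer untouched.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

public class PayloadTransformSketch {

    /** Decodes a UTF-8 payload, lower-cases the text, and returns a new buffer with the result. */
    static ByteBuffer toLowerCasePayload(final ByteBuffer payload) {
        // decode() consumes the buffer it is given, so read from an independent read-only view
        final String text = StandardCharsets.UTF_8.decode(payload.asReadOnlyBuffer()).toString();
        return StandardCharsets.UTF_8.encode(text.toLowerCase(Locale.ROOT));
    }

    public static void main(final String[] args) {
        final ByteBuffer original = StandardCharsets.UTF_8.encode("Temperature: 21C");
        final ByteBuffer modified = toLowerCasePayload(original);
        System.out.println(StandardCharsets.UTF_8.decode(modified)); // prints temperature: 21c
        System.out.println(original.hasRemaining());                  // prints true
    }
}
```

Using `Locale.ROOT` keeps the case conversion independent of the default locale of the JVM.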
Puback Inbound Interceptor
On the PubackInboundInterceptor
interface, you can intercept inbound MQTT PUBACK
messages
at the moment HiveMQ receives the acknowledgement of a QoS 1 publish message from any client.
You can use the PubackInboundInterceptor
to modify or analyse the data that is contained in inbound MQTT PUBACK
messages.
For this task, HiveMQ provides PubackInboundInput
and PubackInboundOutput
parameters.
Extension input and output principles apply.
The PubackInboundOutput
parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext
.
Multiple interceptors can be called sequentially.
For example, when various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The PubackInboundOutput
and PubackInboundInput
parameters are updated after each interception.
The following parameters can be modified:
Parameter | Description |
---|---|
Reason Code | The reason code which determines the status of the PUBACK. |
Reason String | A UTF-8 encoded string that shows the status of the PUBACK. |
User Properties | Additional diagnostic or other information. |
Interception of an inbound PUBACK
message has the following limitations:
-
You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
-
It is not possible to prevent sending a
PUBACK
message to the server.
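The first limitation can be checked before a new reason code is set. The sketch below is a stand-alone illustration: the `ReasonCode` enum is a local stand-in (not the SDK's `AckReasonCode`) whose numeric values follow the MQTT 5 specification, where values below 0x80 indicate success. The helper `isAllowedChange` is a hypothetical name.

```java
public class ReasonCodeGuardSketch {

    /** Local stand-in for a few PUBACK reason codes; the values follow the MQTT 5 specification. */
    enum ReasonCode {
        SUCCESS(0x00),
        NO_MATCHING_SUBSCRIBERS(0x10),
        UNSPECIFIED_ERROR(0x80),
        NOT_AUTHORIZED(0x87),
        TOPIC_NAME_INVALID(0x90);

        final int value;

        ReasonCode(final int value) {
            this.value = value;
        }

        /** MQTT 5 reason codes below 0x80 indicate success. */
        boolean isSuccessful() {
            return value < 0x80;
        }
    }

    /** Returns true when the replacement keeps the success or failure outcome of the current code. */
    static boolean isAllowedChange(final ReasonCode current, final ReasonCode replacement) {
        return current.isSuccessful() == replacement.isSuccessful();
    }

    public static void main(final String[] args) {
        // success to success and failure to failure keep the outcome
        System.out.println(isAllowedChange(ReasonCode.SUCCESS, ReasonCode.NO_MATCHING_SUBSCRIBERS)); // prints true
        System.out.println(isAllowedChange(ReasonCode.NOT_AUTHORIZED, ReasonCode.UNSPECIFIED_ERROR)); // prints true
        // success to failure flips the outcome, which the limitation rules out
        System.out.println(isAllowedChange(ReasonCode.SUCCESS, ReasonCode.UNSPECIFIED_ERROR)); // prints false
    }
}
```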
If an interceptor throws an exception or causes a timeout through the async method,
HiveMQ ignores the changes that the interceptor made to the modifiable PUBACK packet and calls the next interceptor.
If no interceptor is left to call, HiveMQ processes the PUBACK.
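The fallback behaviour described above can be modelled in plain Java: each interceptor works on a copy, and the copy is kept only if the interceptor finishes without an exception. This is a conceptual sketch under that assumption and says nothing about how HiveMQ implements the rule internally. `runChain` and the list of strings that stands in for user properties are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DiscardOnFailureSketch {

    /** Applies each interceptor to a working copy and keeps the copy only if the interceptor succeeds. */
    static List<String> runChain(final List<Consumer<List<String>>> interceptors,
                                 final List<String> userProperties) {
        List<String> accepted = new ArrayList<>(userProperties);
        for (final Consumer<List<String>> interceptor : interceptors) {
            final List<String> workingCopy = new ArrayList<>(accepted);
            try {
                interceptor.accept(workingCopy);
                accepted = workingCopy; // commit the changes of a successful interceptor
            } catch (final RuntimeException e) {
                // the changes of the failing interceptor are dropped; continue with the next one
            }
        }
        return accepted;
    }

    public static void main(final String[] args) {
        final List<Consumer<List<String>>> interceptors = List.of(
                props -> props.add("first"),
                props -> {
                    props.add("broken");
                    throw new IllegalStateException("interceptor failed");
                },
                props -> props.add("third"));
        System.out.println(runChain(interceptors, List.of())); // prints [first, third]
    }
}
```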
Puback Inbound Input
The input parameter contains the following unmodifiable information:
-
MQTT
PUBACK
packet -
Connection information
-
Client Information
PUBACK reason code, PUBACK reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBACK packets are represented by the same PUBACK packet, but always have a SUCCESS Reason Code.
The reason string of the MQTT 3 PUBACK packet is set to 'null' and the packet does not contain any user properties.
The JavaDoc can be found here.
Puback Inbound Output
The output parameter contains the modifiable PUBACK
packet.
You can use the PUBACK
packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate the packet's values.
Some values, such as null or non-UTF-8 characters, cause exceptions to be thrown.
For more information on ways to manipulate inbound PUBACK packets, see
here.
Example Usage
This section provides examples on how to implement different PubackInboundInterceptor
and add them with the ClientContext
.
Examples:
Set Up In ClientContext
This example shows how to set up a PubackInboundInterceptor
with the ClientContext
and the ClientInitializer
:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubackInboundInterceptor interceptor = (pubackInboundInput, pubackInboundOutput) -> {
log.debug("intercepted a PUBACK packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubackInboundInterceptor(interceptor);
});
}
...
Modify Inbound PUBACK
This example shows how to implement a PubackInboundInterceptor
that modifies parameters of an inbound PUBACK
message:
public class ModifyingPubackInboundInterceptor implements PubackInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubackInboundInterceptor.class);
@Override
public void onInboundPuback(final @NotNull PubackInboundInput pubackInboundInput, final @NotNull PubackInboundOutput pubackInboundOutput) {
//get the modifiable puback object from the output
final ModifiablePubackPacket pubackPacket = pubackInboundOutput.getPubackPacket();
// modify / overwrite parameters of a puback packet.
try {
if (pubackPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubackPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubackPacket.setReasonString("The reason for the unsuccessful puback");
pubackPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Puback inbound interception failed:", e);
}
}
}
Modify Inbound PUBACK Asynchronously
This example shows how to implement a PubackInboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an inbound PUBACK
message.
If the task takes more than 10 seconds, the unmodified PUBACK is forwarded to the next interceptor or finally processed by the broker.
public class AsyncModifyingPubackInboundInterceptor implements PubackInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubackInboundInterceptor.class);
@Override
public void onInboundPuback(final @NotNull PubackInboundInput pubackInboundInput, final @NotNull PubackInboundOutput pubackInboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubackInboundOutput> async = pubackInboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable puback object from the output
final ModifiablePubackPacket pubackPacket = pubackInboundOutput.getPubackPacket();
// modify / overwrite parameters of a puback packet.
try {
if (pubackPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubackPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubackPacket.setReasonString("The reason for the unsuccessful puback");
pubackPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Puback inbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Puback inbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
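The timeout rule of the asynchronous example above (the unmodified packet is used when the task does not finish in time) can be tried out with the JDK alone. The sketch below models it with `CompletableFuture.completeOnTimeout` (Java 9 or later). `awaitOrKeepOriginal` is a hypothetical helper, and the short millisecond timeouts are chosen only to keep the demo fast.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncTimeoutSketch {

    /**
     * Waits for an asynchronous modification. If it does not complete within the timeout,
     * the unmodified value is used instead.
     */
    static String awaitOrKeepOriginal(final CompletableFuture<String> modification,
                                      final String original,
                                      final long timeoutMillis) {
        return modification
                .completeOnTimeout(original, timeoutMillis, TimeUnit.MILLISECONDS)
                .join();
    }

    public static void main(final String[] args) {
        // the modification is already finished, so its result is used
        final CompletableFuture<String> finished = CompletableFuture.completedFuture("modified");
        System.out.println(awaitOrKeepOriginal(finished, "original", 100)); // prints modified

        // this future is never completed by a task, so the timeout supplies the original value
        final CompletableFuture<String> stuck = new CompletableFuture<String>();
        System.out.println(awaitOrKeepOriginal(stuck, "original", 100)); // prints original
    }
}
```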
Puback Outbound Interceptor
On the PubackOutboundInterceptor
interface, you can intercept outbound MQTT PUBACK
messages.
You can use the PubackOutboundInterceptor
to modify or analyse the data that is contained in outbound MQTT PUBACK
messages.
For this task, HiveMQ provides PubackOutboundInput
and PubackOutboundOutput
parameters.
Extension input and output principles apply.
The PubackOutboundOutput
parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext
.
Multiple interceptors can be called sequentially.
For example, when various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The PubackOutboundInput
and PubackOutboundOutput
parameters are updated after each interception.
The following parameters can be modified:
Parameter | Description |
---|---|
Reason Code | The reason code which determines the status of the PUBACK. |
Reason String | A UTF-8 encoded string that shows the status of the PUBACK. |
User Properties | Additional diagnostic or other information. |
Interception of an outbound PUBACK
message has the following limitations:
-
You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
-
It is not possible to prevent sending a
PUBACK
message to a client.
If an interceptor throws an exception or causes a timeout through the async method,
HiveMQ ignores the changes that the interceptor made to the modifiable PUBACK packet and calls the next interceptor.
If no interceptor is left to call, HiveMQ sends the PUBACK to the client.
Puback Outbound Input
The input parameter contains the following unmodifiable information:
-
MQTT
PUBACK
packet -
Connection information
-
Client Information
PUBACK reason code, PUBACK reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBACK packets are represented by the same PUBACK packet, but always have a SUCCESS Reason Code.
The reason string of the MQTT 3 PUBACK packet is set to 'null' and the packet does not contain any user properties.
The JavaDoc can be found here.
Puback Outbound Output
The output parameter contains a modifiable PUBACK
packet.
You can use the PUBACK
packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate the packet's values.
Some values, such as null or non-UTF-8 characters, cause exceptions to be thrown.
For more information on ways to manipulate outbound PUBACK
packets, see
here.
Example Usage
This section provides examples on how to implement different PubackOutboundInterceptor
and add them with the ClientContext
.
Examples:
Set Up In ClientContext
This example shows how to set up a PubackOutboundInterceptor
with the ClientContext
and the ClientInitializer
:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubackOutboundInterceptor interceptor = (pubackOutboundInput, pubackOutboundOutput) -> {
log.debug("intercepted a PUBACK packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubackOutboundInterceptor(interceptor);
});
}
...
Modify Outbound PUBACK
This example shows how to implement a PubackOutboundInterceptor
that modifies parameters of an outbound PUBACK
message:
public class ModifyingPubackOutboundInterceptor implements PubackOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubackOutboundInterceptor.class);
@Override
public void onOutboundPuback(final @NotNull PubackOutboundInput pubackOutboundInput, final @NotNull PubackOutboundOutput pubackOutboundOutput) {
//get the modifiable puback object from the output
final ModifiablePubackPacket pubackPacket = pubackOutboundOutput.getPubackPacket();
// modify / overwrite parameters of a puback packet.
try {
if (pubackPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubackPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubackPacket.setReasonString("The reason for the unsuccessful puback");
pubackPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Puback outbound interception failed:", e);
}
}
}
Modify Outbound PUBACK Asynchronously
This example shows how to implement a PubackOutboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an outbound PUBACK message.
If the task takes more than 10 seconds, the unmodified PUBACK is forwarded to the next interceptor or finally sent to the client.
public class AsyncModifyingPubackOutboundInterceptor implements PubackOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubackOutboundInterceptor.class);
@Override
public void onOutboundPuback(final @NotNull PubackOutboundInput pubackOutboundInput, final @NotNull PubackOutboundOutput pubackOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubackOutboundOutput> async = pubackOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable puback object from the output
final ModifiablePubackPacket pubackPacket = pubackOutboundOutput.getPubackPacket();
// modify / overwrite parameters of a puback packet.
try {
if (pubackPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubackPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubackPacket.setReasonString("The reason for the unsuccessful puback");
pubackPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Puback outbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Puback outbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
Pubrec Inbound Interceptor
On the PubrecInboundInterceptor
interface, you can intercept inbound MQTT PUBREC
messages
at the moment HiveMQ receives the acknowledgement of a QoS 2 publish message from any client.
You can use the PubrecInboundInterceptor
to modify or analyse the data that is contained in inbound MQTT PUBREC
messages.
For this task, HiveMQ provides PubrecInboundInput
and PubrecInboundOutput
parameters.
Extension input and output principles apply.
The PubrecInboundOutput
parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext
.
Multiple interceptors can be called sequentially.
When various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The PubrecInboundOutput
and PubrecInboundInput
parameters are updated after each interception.
The following parameters can be modified:
Parameter | Description |
---|---|
Reason Code | The reason code which determines the status of the PUBREC. |
Reason String | A UTF-8 encoded string that shows the status of the PUBREC. |
User Properties | Additional diagnostic or other information. |
Interception of an inbound PUBREC
message has the following limitations:
-
You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
-
It is not possible to prevent sending a
PUBREC
message to the server.
If an interceptor throws an exception or causes a timeout through the async method,
HiveMQ ignores the changes that the interceptor made to the modifiable PUBREC packet and calls the next interceptor.
If no interceptor is left to call, HiveMQ processes the PUBREC.
Pubrec Inbound Input
The input parameter contains the following unmodifiable information:
-
MQTT
PUBREC
packet -
Connection information
-
Client information
PUBREC reason code, PUBREC reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBREC packets are represented by the same PUBREC packet, but always have a SUCCESS Reason Code.
The reason string of the MQTT 3 PUBREC packet is set to 'null' and the packet does not contain any user properties.
The JavaDoc can be found here.
Pubrec Inbound Output
The output parameter contains the modifiable PUBREC
packet.
You can use the PUBREC
packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values, such as null or non-UTF-8 characters, cause exceptions to be thrown.
More information on ways to manipulate inbound PUBREC
packets can be found
here.
Example Usage
This section provides examples on how to implement different PubrecInboundInterceptor
and add them with the ClientContext
.
Examples:
Set Up In ClientContext
This example shows how to set up a PubrecInboundInterceptor
with the ClientContext
and the ClientInitializer
:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubrecInboundInterceptor interceptor = (pubrecInboundInput, pubrecInboundOutput) -> {
log.debug("intercepted a PUBREC packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubrecInboundInterceptor(interceptor);
});
}
...
Modify Inbound PUBREC
This example shows how to implement a PubrecInboundInterceptor
that modifies parameters of an inbound PUBREC message:
public class ModifyingPubrecInboundInterceptor implements PubrecInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubrecInboundInterceptor.class);
@Override
public void onInboundPubrec(final @NotNull PubrecInboundInput pubrecInboundInput, final @NotNull PubrecInboundOutput pubrecInboundOutput) {
//get the modifiable pubrec object from the output
final ModifiablePubrecPacket pubrecPacket = pubrecInboundOutput.getPubrecPacket();
// modify / overwrite parameters of a pubrec packet.
try {
if (pubrecPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubrecPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubrecPacket.setReasonString("The reason for the unsuccessful pubrec");
pubrecPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubrecPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrecPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrec inbound interception failed:", e);
}
}
}
Modify Inbound PUBREC Asynchronously
This example shows how to implement a PubrecInboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an inbound PUBREC
message.
If the task takes more than 10 seconds, the unmodified PUBREC
is forwarded to the next interceptor or finally processed by the broker.
public class AsyncModifyingPubrecInboundInterceptor implements PubrecInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubrecInboundInterceptor.class);
@Override
public void onInboundPubrec(final @NotNull PubrecInboundInput pubrecInboundInput, final @NotNull PubrecInboundOutput pubrecInboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubrecInboundOutput> async = pubrecInboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable pubrec object from the output
final ModifiablePubrecPacket pubrecPacket = pubrecInboundOutput.getPubrecPacket();
// modify / overwrite parameters of a pubrec packet.
try {
if (pubrecPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubrecPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubrecPacket.setReasonString("The reason for the unsuccessful pubrec");
pubrecPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubrecPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrecPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrec inbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Pubrec inbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
Pubrec Outbound Interceptor
On the PubrecOutboundInterceptor
interface, you can intercept outbound MQTT PUBREC
messages.
You can use the PubrecOutboundInterceptor
to modify or analyse the data that is contained in outbound MQTT PUBREC
messages.
For this task, HiveMQ provides PubrecOutboundInput
and PubrecOutboundOutput
parameters.
Extension input and output principles apply.
The PubrecOutboundOutput
parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext
.
Multiple interceptors are called sequentially.
The interceptor with the highest priority is called first.
The PubrecOutboundOutput
and PubrecOutboundInput
parameters are updated after each interception.
The following parameters can be modified:
Parameter | Description |
---|---|
Reason Code | The reason code which determines the status of the PUBREC. |
Reason String | A UTF-8 encoded string that shows the status of the PUBREC. |
User Properties | Additional diagnostic or other information. |
Interception of an outbound PUBREC
message has the following limitations:
-
You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
-
It is not possible to prevent sending a
PUBREC
message to a client.
If an interceptor throws an exception or causes a timeout through the async method,
HiveMQ ignores the changes that the interceptor made to the modifiable PUBREC packet and calls the next interceptor.
If no interceptor is left to call, HiveMQ sends the PUBREC to the client.
Pubrec Outbound Input
The input parameter contains the following unmodifiable information:
-
MQTT
PUBREC
packet -
Connection information
-
Client Information
PUBREC reason code, PUBREC reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBREC packets are represented by the same PUBREC packet, but always have a SUCCESS Reason Code.
The reason string of the MQTT 3 PUBREC packet is set to 'null' and the packet does not contain any user properties.
The JavaDoc can be found here.
Pubrec Outbound Output
The output parameter contains a modifiable PUBREC
packet.
You can use the PUBREC
packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate the packet's values.
Some values, such as null or non-UTF-8 characters, cause exceptions to be thrown.
More information on ways to manipulate outbound PUBREC
packets can be found
here.
Example Usage
This section provides examples on how to implement different PubrecOutboundInterceptor
and add them with the ClientContext
.
Examples:
Set Up In ClientContext
This example shows how to set up a PubrecOutboundInterceptor
with the ClientContext
and the ClientInitializer
:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubrecOutboundInterceptor interceptor = (pubrecOutboundInput, pubrecOutboundOutput) -> {
log.debug("intercepted a PUBREC packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubrecOutboundInterceptor(interceptor);
});
}
...
Modify Outbound PUBREC
This example shows how to implement a PubrecOutboundInterceptor
that modifies parameters of an outbound PUBREC
message:
public class ModifyingPubrecOutboundInterceptor implements PubrecOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubrecOutboundInterceptor.class);
@Override
public void onOutboundPubrec(final @NotNull PubrecOutboundInput pubrecOutboundInput, final @NotNull PubrecOutboundOutput pubrecOutboundOutput) {
//get the modifiable pubrec object from the output
final ModifiablePubrecPacket pubrecPacket = pubrecOutboundOutput.getPubrecPacket();
// modify / overwrite parameters of a pubrec packet.
try {
if (pubrecPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubrecPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubrecPacket.setReasonString("The reason for the unsuccessful pubrec");
pubrecPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubrecPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrecPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrec outbound interception failed:", e);
}
}
}
Modify Outbound PUBREC Asynchronously
This example shows how to implement a PubrecOutboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an outbound PUBREC
message.
If the task takes more than 10 seconds, the unmodified PUBREC
is forwarded to the next interceptor or finally sent to the client.
public class AsyncModifyingPubrecOutboundInterceptor implements PubrecOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubrecOutboundInterceptor.class);
@Override
public void onOutboundPubrec(final @NotNull PubrecOutboundInput pubrecOutboundInput, final @NotNull PubrecOutboundOutput pubrecOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubrecOutboundOutput> async = pubrecOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable pubrec object from the output
final ModifiablePubrecPacket pubrecPacket = pubrecOutboundOutput.getPubrecPacket();
// modify / overwrite parameters of a pubrec packet.
try {
if (pubrecPacket.getReasonCode() != AckReasonCode.SUCCESS &&
pubrecPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {
//Set reason string and reason code if reason code is not success.
pubrecPacket.setReasonString("The reason for the unsuccessful pubrec");
pubrecPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
}
pubrecPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrecPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrec outbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Pubrec outbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
Pubrel Inbound Interceptor
On the PubrelInboundInterceptor
interface, you can intercept inbound MQTT PUBREL
messages.
You can use the PubrelInboundInterceptor
to modify or analyse the data that is contained in inbound MQTT PUBREL
messages.
For this task, HiveMQ provides PubrelInboundInput
and PubrelInboundOutput
parameters.
Extension input and output principles apply.
The PubrelInboundOutput
parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext
.
Multiple interceptors can be called sequentially.
For example, when various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The PubrelInboundOutput
and PubrelInboundInput
parameters are updated after each interception.
The following parameters can be modified:
Parameter | Description |
---|---|
Reason String | A UTF-8 encoded string that shows the status of the PUBREL. |
User Properties | Additional diagnostic or other information. |
Interception of an inbound PUBREL
message has the following limitations:
-
It is not possible to prevent sending a
PUBREL
message to the server.
If an interceptor throws an exception or causes a timeout through the async method,
HiveMQ ignores the changes that the interceptor made to the modifiable PUBREL packet and calls the next interceptor.
If no interceptor is left to call, HiveMQ processes the PUBREL.
Pubrel Inbound Input
The input parameter contains the following unmodifiable information:
- MQTT PUBREL packet
- Connection information
- Client information
PUBREL reason code, PUBREL reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBREL packets are represented by the same PUBREL packet, but always have a SUCCESS reason code.
The reason string of an MQTT 3 PUBREL packet is set to 'null' and the packet does not contain any user properties.
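Because MQTT 3 and MQTT 5 packets share one representation, code that reads the packet should tolerate a missing reason string and an empty property list. The following is a minimal sketch with a hypothetical `PubrelView` model; it does not use the HiveMQ SDK types, whose accessors may differ.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical read-only model of a PUBREL; these names are not HiveMQ SDK types.
final class PubrelViewSketch {

    static final class PubrelView {
        final boolean success;
        final String reasonString;         // null when the client uses MQTT 3
        final List<String> userProperties; // empty when the client uses MQTT 3

        PubrelView(final boolean success, final String reasonString, final List<String> userProperties) {
            this.success = success;
            this.reasonString = reasonString;
            this.userProperties = userProperties;
        }

        /** An MQTT 3 PUBREL: always successful, no reason string, no user properties. */
        static PubrelView mqtt3() {
            return new PubrelView(true, null, List.of());
        }
    }

    /** Builds a log-friendly description without assuming that MQTT 5 fields are present. */
    static String describe(final PubrelView pubrel) {
        final String reason = Optional.ofNullable(pubrel.reasonString).orElse("<none>");
        return (pubrel.success ? "SUCCESS" : "FAILURE")
                + ", reason string: " + reason
                + ", user properties: " + pubrel.userProperties.size();
    }

    public static void main(final String[] args) {
        System.out.println(describe(PubrelView.mqtt3()));
        System.out.println(describe(new PubrelView(false, "packet identifier not found", List.of("k=v"))));
    }
}
```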
The JavaDoc can be found here.
Pubrel Inbound Output
The output parameter contains the modifiable PUBREL
packet.
You can use the PUBREL
packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values cause exceptions to be thrown. For example, null or non-UTF8 characters.
More information on ways to manipulate inbound PUBREL packets can be found
here.
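One way to avoid such exceptions is to check a value before passing it to a setter. The sketch below is an assumption about what a reasonable pre-check looks like: it follows the MQTT rule that UTF-8 strings must not contain the null character or unpaired surrogates. The helper name `isSafeMqttString` is hypothetical, and the exact checks that HiveMQ applies may be stricter.

```java
// Hypothetical helper; not part of the HiveMQ SDK.
final class MqttStringCheckSketch {

    /**
     * Returns true when the value is non-null, contains no U+0000 character,
     * and contains no unpaired surrogate (which cannot be encoded as valid UTF-8).
     */
    static boolean isSafeMqttString(final String value) {
        if (value == null) {
            return false;
        }
        for (int i = 0; i < value.length(); i++) {
            final char c = value.charAt(i);
            if (c == 0) {
                return false; // the null character is not allowed in MQTT strings
            }
            if (Character.isHighSurrogate(c)) {
                // a high surrogate must be followed directly by a low surrogate
                if (i + 1 >= value.length() || !Character.isLowSurrogate(value.charAt(i + 1))) {
                    return false;
                }
                i++; // skip the low surrogate of the valid pair
            } else if (Character.isLowSurrogate(c)) {
                return false; // a low surrogate without a preceding high surrogate
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        System.out.println(isSafeMqttString("some value"));
        System.out.println(isSafeMqttString(null));
        System.out.println(isSafeMqttString("bad" + (char) 0xD800));
    }
}
```

An interceptor could call such a check before `setReasonString` or `addUserProperty` and skip the modification when it fails, instead of relying on the surrounding try/catch.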
Example Usage
This section provides examples on how to implement different PubrelInboundInterceptor
and add them with the ClientContext
.
Examples:
Set Up In ClientContext
This example shows how to set up a PubrelInboundInterceptor
with the ClientContext
and the ClientInitializer
:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubrelInboundInterceptor interceptor = (pubrelInboundInput, pubrelInboundOutput) -> {
log.debug("intercepted a PUBREL packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubrelInboundInterceptor(interceptor);
});
}
...
Modify Inbound PUBREL
This example shows how to implement a PubrelInboundInterceptor
that modifies parameters of an inbound PUBREL
message:
public class ModifyingPubrelInboundInterceptor implements PubrelInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubrelInboundInterceptor.class);
@Override
public void onInboundPubrel(final @NotNull PubrelInboundInput pubrelInboundInput, final @NotNull PubrelInboundOutput pubrelInboundOutput) {
//get the modifiable pubrel object from the output
final ModifiablePubrelPacket pubrelPacket = pubrelInboundOutput.getPubrelPacket();
// modify / overwrite parameters of a pubrel packet.
try {
if (pubrelPacket.getReasonCode() != PubrelReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubrelPacket.setReasonString("The reason for the unsuccessful pubrel");
}
pubrelPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrelPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrel inbound interception failed:", e);
}
}
}
Modify Inbound PUBREL Asynchronously
This example shows how to implement a PubrelInboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an inbound PUBREL
message.
If the task takes more than 10 seconds, the unmodified PUBREL
is forwarded to the next interceptor or finally processed by the broker.
public class AsyncModifyingPubrelInboundInterceptor implements PubrelInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubrelInboundInterceptor.class);
@Override
public void onInboundPubrel(final @NotNull PubrelInboundInput pubrelInboundInput, final @NotNull PubrelInboundOutput pubrelInboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubrelInboundOutput> async = pubrelInboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable pubrel object from the output
final ModifiablePubrelPacket pubrelPacket = pubrelInboundOutput.getPubrelPacket();
// modify / overwrite parameters of a pubrel packet.
try {
if (pubrelPacket.getReasonCode() != PubrelReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubrelPacket.setReasonString("The reason for the unsuccessful pubrel");
}
pubrelPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrelPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrel inbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Pubrel inbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
Pubrel Outbound Interceptor
On the PubrelOutboundInterceptor
interface, you can intercept outbound MQTT PUBREL
messages.
You can use the PubrelOutboundInterceptor
to modify or analyse the data that is contained in outbound MQTT PUBREL
messages.
For this task, HiveMQ provides PubrelOutboundInput
and PubrelOutboundOutput
parameters.
Extension input and output principles apply.
The PubrelOutboundOutput
parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext
.
Multiple interceptors are called sequentially. The interceptor with the highest priority is called first.
The PubrelOutboundOutput
and PubrelOutboundInput
parameters are updated after each interceptor.
The following parameters can be modified:
Parameter | Description |
---|---|
Reason String | A UTF-8 encoded string that shows the status of the PUBREL. |
User Properties | Additional diagnostic or other information. |
Interception of an outbound PUBREL
message has the following limitations:
- It is not possible to prevent sending a PUBREL message to a client.
If an interceptor throws an exception or causes a timeout using the async method, HiveMQ ignores the changes that the interceptor made to the modifiable PUBREL packet and calls the next interceptor. If no interceptor is left to call, HiveMQ sends the PUBREL to the client.
Pubrel Outbound Input
The input parameter contains the following unmodifiable information:
- MQTT PUBREL packet
- Connection information
- Client information
PUBREL reason code, PUBREL reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBREL packets are represented by the same PUBREL packet, but always have a SUCCESS reason code.
The reason string of an MQTT 3 PUBREL packet is set to 'null' and the packet does not contain any user properties.
The JavaDoc can be found here.
Pubrel Outbound Output
The output parameter contains a modifiable PUBREL
packet.
You can use the PUBREL
packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values cause exceptions to be thrown. For example, null or non-UTF8 characters.
More information on ways to manipulate outbound PUBREL
packets can be found
here.
Example Usage
This section provides examples on how to implement different PubrelOutboundInterceptor
and add them with the ClientContext
.
Examples:
Set Up In ClientContext
This example shows how to set up a PubrelOutboundInterceptor
with the ClientContext
and the ClientInitializer
:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubrelOutboundInterceptor interceptor = (pubrelOutboundInput, pubrelOutboundOutput) -> {
log.debug("intercepted a PUBREL packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubrelOutboundInterceptor(interceptor);
});
}
...
Modify Outbound PUBREL
This example shows how to implement a PubrelOutboundInterceptor
that modifies parameters of an outbound PUBREL
message:
public class ModifyingPubrelOutboundInterceptor implements PubrelOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubrelOutboundInterceptor.class);
@Override
public void onOutboundPubrel(final @NotNull PubrelOutboundInput pubrelOutboundInput, final @NotNull PubrelOutboundOutput pubrelOutboundOutput) {
//get the modifiable pubrel object from the output
final ModifiablePubrelPacket pubrelPacket = pubrelOutboundOutput.getPubrelPacket();
// modify / overwrite parameters of a pubrel packet.
try {
if (pubrelPacket.getReasonCode() != PubrelReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubrelPacket.setReasonString("The reason for the unsuccessful pubrel");
}
pubrelPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrelPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrel outbound interception failed:", e);
}
}
}
Modify Outbound PUBREL Asynchronously
This example shows how to implement a PubrelOutboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an outbound PUBREL
message.
If the task takes more than 10 seconds, the unmodified PUBREL
is forwarded to the next interceptor or finally sent to the client.
public class AsyncModifyingPubrelOutboundInterceptor implements PubrelOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubrelOutboundInterceptor.class);
@Override
public void onOutboundPubrel(final @NotNull PubrelOutboundInput pubrelOutboundInput, final @NotNull PubrelOutboundOutput pubrelOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubrelOutboundOutput> async = pubrelOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable pubrel object from the output
final ModifiablePubrelPacket pubrelPacket = pubrelOutboundOutput.getPubrelPacket();
// modify / overwrite parameters of a pubrel packet.
try {
if (pubrelPacket.getReasonCode() != PubrelReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubrelPacket.setReasonString("The reason for the unsuccessful pubrel");
}
pubrelPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubrelPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubrel outbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Pubrel outbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
Pubcomp Inbound Interceptor
On the PubcompInboundInterceptor
interface, you can intercept inbound MQTT PUBCOMP
messages
at the moment HiveMQ receives the acknowledgement of a QoS 2 publish message from any client.
You can use the PubcompInboundInterceptor
to modify or analyse the data that is contained in inbound MQTT PUBCOMP
messages.
For this task, HiveMQ provides PubcompInboundInput
and PubcompInboundOutput
parameters.
Extension input and output principles apply.
You can use a PubcompInboundInterceptor
to modify inbound PUBCOMP
messages.
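For orientation, PUBCOMP is the last packet of the MQTT QoS 2 exchange (PUBLISH, PUBREC, PUBREL, PUBCOMP). The small sketch below models only that ordering; the enum and method names are hypothetical and unrelated to the HiveMQ SDK.

```java
// Hypothetical model of the QoS 2 packet order; not part of the HiveMQ SDK.
final class Qos2FlowSketch {

    enum Packet { PUBLISH, PUBREC, PUBREL, PUBCOMP }

    /**
     * Returns the packet that answers the given packet in a QoS 2 exchange,
     * or null when the exchange is complete.
     */
    static Packet answerTo(final Packet received) {
        switch (received) {
            case PUBLISH:
                return Packet.PUBREC;  // receiver confirms that it got the message
            case PUBREC:
                return Packet.PUBREL;  // sender releases the message
            case PUBREL:
                return Packet.PUBCOMP; // receiver completes the exchange
            default:
                return null;           // nothing follows a PUBCOMP
        }
    }

    public static void main(final String[] args) {
        Packet current = Packet.PUBLISH;
        while (current != null) {
            System.out.println(current);
            current = answerTo(current);
        }
    }
}
```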
The PubcompInboundOutput
parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext
.
Multiple interceptors can be called sequentially.
When various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The PubcompInboundOutput
and PubcompInboundInput parameters are updated after each interception.
The following parameters can be modified:
Parameter | Description |
---|---|
Reason String | A UTF-8 encoded string that shows the status of the PUBCOMP. |
User Properties | Additional diagnostic or other information. |
Interception of an inbound PUBCOMP
message has the following limitations:
- It is not possible to prevent sending a PUBCOMP message to the server.
If an interceptor throws an exception or causes a timeout using the async method, HiveMQ ignores the changes that the interceptor made to the modifiable PUBCOMP packet and calls the next interceptor. If no interceptor is left to call, HiveMQ processes the PUBCOMP.
Pubcomp Inbound Input
The input parameter contains the following unmodifiable information:
- PUBCOMP packet identifier
- PUBCOMP reason string
- MQTT user properties
PUBCOMP reason code, PUBCOMP reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBCOMP packets are represented by the same PUBCOMP packet, but always have a SUCCESS reason code.
The reason string of an MQTT 3 PUBCOMP packet is set to 'null' and the packet does not contain any user properties.
The JavaDoc can be found here.
Pubcomp Inbound Output
The output parameter contains the modifiable PUBCOMP
packet.
You can use the PUBCOMP
packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packets.
Some values cause exceptions to be thrown. For example, null or non-UTF8 characters.
More information on ways to manipulate inbound pubcomp packets can be found
here.
Example Usage
This section provides examples on how to implement different PubcompInboundInterceptor
and add them with the ClientContext
.
Examples:
Set Up In ClientContext
This example shows how to set up a PubcompInboundInterceptor
with the ClientContext
and the ClientInitializer
:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubcompInboundInterceptor interceptor = (pubcompInboundInput, pubcompInboundOutput) -> {
log.debug("intercepted a PUBCOMP packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubcompInboundInterceptor(interceptor);
});
}
...
Modify Inbound PUBCOMP
This example shows how to implement a PubcompInboundInterceptor
that modifies parameters of an inbound PUBCOMP
message:
public class ModifyingPubcompInboundInterceptor implements PubcompInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubcompInboundInterceptor.class);
@Override
public void onInboundPubcomp(final @NotNull PubcompInboundInput pubcompInboundInput, final @NotNull PubcompInboundOutput pubcompInboundOutput) {
//get the modifiable pubcomp object from the output
final ModifiablePubcompPacket pubcompPacket = pubcompInboundOutput.getPubcompPacket();
// modify / overwrite parameters of a pubcomp packet.
try {
if (pubcompPacket.getReasonCode() != PubcompReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubcompPacket.setReasonString("The reason for the unsuccessful pubcomp");
}
pubcompPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubcompPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubcomp inbound interception failed:", e);
}
}
}
Modify Inbound PUBCOMP Asynchronously
This example shows how to implement a PubcompInboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an inbound PUBCOMP
message.
If the task takes more than 10 seconds, the unmodified PUBCOMP
is forwarded to the next interceptor or finally processed by the broker.
public class AsyncModifyingPubcompInboundInterceptor implements PubcompInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubcompInboundInterceptor.class);
@Override
public void onInboundPubcomp(final @NotNull PubcompInboundInput pubcompInboundInput, final @NotNull PubcompInboundOutput pubcompInboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubcompInboundOutput> async = pubcompInboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable pubcomp object from the output
final ModifiablePubcompPacket pubcompPacket = pubcompInboundOutput.getPubcompPacket();
// modify / overwrite parameters of a pubcomp packet.
try {
if (pubcompPacket.getReasonCode() != PubcompReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubcompPacket.setReasonString("The reason for the unsuccessful pubcomp");
}
pubcompPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubcompPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubcomp inbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Pubcomp inbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
Pubcomp Outbound Interceptor
On the PubcompOutboundInterceptor
interface, you can intercept outbound MQTT PUBCOMP
messages.
You can use the PubcompOutboundInterceptor
to modify or analyse the data that is contained in outbound MQTT PUBCOMP
messages.
For this task, HiveMQ provides PubcompOutboundInput
and PubcompOutboundOutput
parameters.
Extension input and output principles apply.
The PubcompOutboundOutput
parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext
.
Multiple interceptors are called sequentially.
The interceptor with the highest priority is called first.
The PubcompOutboundOutput
and PubcompOutboundInput
parameters are updated after each interceptor.
The following parameters can be modified:
Parameter | Description |
---|---|
Reason String | A UTF-8 encoded string that shows the status of the PUBCOMP. |
User Properties | Additional diagnostic or other information. |
Interception of an outbound PUBCOMP
message has the following limitations:
- It is not possible to prevent sending a PUBCOMP message to a client.
If an interceptor throws an exception or causes a timeout using the async method, HiveMQ ignores the changes that the interceptor made to the modifiable PUBCOMP packet and calls the next interceptor. If no interceptor is left to call, HiveMQ sends the PUBCOMP to the client.
Pubcomp Outbound Input
The input parameter contains the following unmodifiable information:
- PUBCOMP packet
- Connection information
- Client information
PUBCOMP reason code, PUBCOMP reason string, and MQTT user properties are MQTT 5 features.
MQTT 3 PUBCOMP packets are represented by the same PUBCOMP packet, but always have a SUCCESS reason code.
The reason string of an MQTT 3 PUBCOMP packet is set to 'null' and the packet does not contain any user properties.
The JavaDoc can be found here.
Pubcomp Outbound Output
The output parameter contains the modifiable PUBCOMP
packet.
You can use the PUBCOMP
packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packets.
Some values cause exceptions to be thrown. For example, null or non-UTF8 characters.
More information on ways to manipulate outbound PUBCOMP packets can be found
here.
Example Usage
This section provides examples on how to implement different PubcompOutboundInterceptor
and add them with the ClientContext
.
Examples:
Set Up In ClientContext
This example shows how to set up a PubcompOutboundInterceptor
with the ClientContext
and the ClientInitializer
:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PubcompOutboundInterceptor interceptor = (pubcompOutboundInput, pubcompOutboundOutput) -> {
log.debug("intercepted a PUBCOMP packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPubcompOutboundInterceptor(interceptor);
});
}
...
Modify Outbound PUBCOMP
This example shows how to implement a PubcompOutboundInterceptor
that modifies parameters of an outbound PUBCOMP
message:
public class ModifyingPubcompOutboundInterceptor implements PubcompOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingPubcompOutboundInterceptor.class);
@Override
public void onOutboundPubcomp(final @NotNull PubcompOutboundInput pubcompOutboundInput, final @NotNull PubcompOutboundOutput pubcompOutboundOutput) {
//get the modifiable pubcomp object from the output
final ModifiablePubcompPacket pubcompPacket = pubcompOutboundOutput.getPubcompPacket();
// modify / overwrite parameters of a pubcomp packet.
try {
if (pubcompPacket.getReasonCode() != PubcompReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubcompPacket.setReasonString("The reason for the unsuccessful pubcomp");
}
pubcompPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubcompPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubcomp outbound interception failed:", e);
}
}
}
Modify Outbound PUBCOMP Asynchronously
This example shows how to implement a PubcompOutboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an outbound PUBCOMP
message.
If the task takes more than 10 seconds, the unmodified PUBCOMP
is forwarded to the next interceptor or finally sent to the client.
public class AsyncModifyingPubcompOutboundInterceptor implements PubcompOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubcompOutboundInterceptor.class);
@Override
public void onOutboundPubcomp(final @NotNull PubcompOutboundInput pubcompOutboundInput, final @NotNull PubcompOutboundOutput pubcompOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<PubcompOutboundOutput> async = pubcompOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable pubcomp object from the output
final ModifiablePubcompPacket pubcompPacket = pubcompOutboundOutput.getPubcompPacket();
// modify / overwrite parameters of a pubcomp packet.
try {
if (pubcompPacket.getReasonCode() != PubcompReasonCode.SUCCESS) {
//Set reason string if reason code is not success.
pubcompPacket.setReasonString("The reason for the unsuccessful pubcomp");
}
pubcompPacket.getUserProperties().addUserProperty("my-prop", "some value");
pubcompPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Pubcomp outbound interception failed:", e);
}
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
if (throwable != null){
log.debug("Pubcomp outbound interception failed:", throwable);
}
//resume output to tell HiveMQ it's done.
async.resume();
}
});
}
}
Subscribe Inbound Interceptor
With the SubscribeInboundInterceptor
interface, you can intercept inbound MQTT SUBSCRIBE
messages.
You can use the SubscribeInboundInterceptor
to modify or analyse the data that is contained in inbound MQTT SUBSCRIBE
messages.
For this task, HiveMQ provides SubscribeInboundInput
and SubscribeInboundOutput
parameters.
Extension input and output principles apply.
The interceptor can be set up in the ClientContext
.
The SubscribeInboundOutput
parameter can easily be used for asynchronous processing.
By default, when the asynchronous processing times out, the SUBSCRIBE
is rejected and a SUBACK
message with the
UNSPECIFIED_ERROR
reason code is sent to the client.
Multiple interceptors can be called sequentially. For example, when various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
Changes from the SubscribeInboundOutput
parameter are reflected in the SubscribeInboundInput
parameter of the next interceptor.
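The timeout behaviour described above can be modelled in plain Java. This is a sketch under assumptions: `Fallback`, `Outcome`, and `await` are hypothetical names, and the `SUCCESS` branch (continue with the unmodified packet) is an assumed counterpart to the rejecting behaviour that the text describes for a timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of an asynchronous output with a timeout fallback; not HiveMQ code.
final class TimeoutFallbackSketch {

    enum Fallback { SUCCESS, FAILURE }

    enum Outcome { MODIFIED, UNMODIFIED, REJECTED }

    /** Waits for the task and decides what happens to the SUBSCRIBE. */
    static Outcome await(final CompletableFuture<?> task, final long timeoutMillis, final Fallback fallback) {
        try {
            task.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return Outcome.MODIFIED; // the task finished in time, its changes are kept
        } catch (final TimeoutException e) {
            // FAILURE rejects the SUBSCRIBE; SUCCESS (assumed) continues without the changes
            return fallback == Fallback.FAILURE ? Outcome.REJECTED : Outcome.UNMODIFIED;
        } catch (final ExecutionException e) {
            // a failing task is outside the scope of this sketch
            throw new IllegalStateException(e.getCause());
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(final String[] args) {
        final CompletableFuture<Void> neverCompletes = new CompletableFuture<>();
        System.out.println(await(CompletableFuture.completedFuture(null), 100, Fallback.FAILURE));
        System.out.println(await(neverCompletes, 100, Fallback.FAILURE));
    }
}
```

In the real API the broker owns the timer and the extension only calls `resume()`; the blocking `get` here merely keeps the model short.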
The following parameters can be modified:
Parameter | Description |
---|---|
Subscriptions | The list of topic filters and the additional options that are associated with each subscription. |
Subscription Identifier | The subscription identifier for the SUBSCRIBE. |
User Properties | Additional diagnostic or other information (optional). |
Subscribe Inbound Input
The SubscribeInboundInput
parameter contains the following unmodifiable information.
- MQTT SUBSCRIBE packet
- Connection information
- Client information
The JavaDoc can be found here.
Subscribe Inbound Output
The SubscribeInboundOutput
parameter contains the modifiable SUBSCRIBE
packet.
You can use the SUBSCRIBE
packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in packets.
Some values cause exceptions to be thrown.
For example, null or non-UTF8 characters.
For more information on ways to manipulate inbound subscribe packets, see the JavaDoc
here.
Example Usage
Set Up a SubscribeInboundInterceptor in a ClientInitializer
...
@Override
public void extensionStart(final @NotNull ExtensionStartInput input, final @NotNull ExtensionStartOutput output) {
// create a new subscribe inbound interceptor
final SubscribeInboundInterceptor subscribeInboundInterceptor = new SubscribeInboundInterceptor() {
@Override
public void onInboundSubscribe(
final @NotNull SubscribeInboundInput subscribeInboundInput,
final @NotNull SubscribeInboundOutput subscribeInboundOutput) {
// do something with the subscribe, for example logging
log.debug("Inbound subscribe message intercepted from client: "
+ subscribeInboundInput.getClientInformation().getClientId());
}
};
// create a new client initializer
final ClientInitializer clientInitializer = new ClientInitializer() {
@Override
public void initialize(
final @NotNull InitializerInput initializerInput,
final @NotNull ClientContext clientContext) {
// add the interceptor to the context of the connecting client
clientContext.addSubscribeInboundInterceptor(subscribeInboundInterceptor);
}
};
// register the client initializer
Services.initializerRegistry().setClientInitializer(clientInitializer);
}
...
Modify a SUBSCRIBE Message
This example shows how to implement a SubscribeInboundInterceptor
that modifies all parameters of an inbound
SUBSCRIBE
message:
public class ModifyingSubscribeInboundInterceptor implements SubscribeInboundInterceptor {
@Override
public void onInboundSubscribe(
final @NotNull SubscribeInboundInput subscribeInboundInput,
final @NotNull SubscribeInboundOutput subscribeInboundOutput) {
// get the modifiable subscribe packet from the output
final ModifiableSubscribePacket subscribe = subscribeInboundOutput.getSubscribePacket();
// modify / overwrite any parameter of the subscribe packet
final ModifiableSubscription subscription = subscribe.getSubscriptions().get(0);
subscription.setTopicFilter("modified-topic");
subscription.setQos(Qos.AT_LEAST_ONCE);
subscription.setRetainHandling(RetainHandling.DO_NOT_SEND);
subscription.setRetainAsPublished(true);
subscription.setNoLocal(true);
subscribe.getUserProperties().addUserProperty("additional", "value");
}
}
Modify a SUBSCRIBE Message Asynchronously
This example shows how to implement a SubscribeInboundInterceptor
that asynchronously modifies an inbound SUBSCRIBE
message with the Managed Extension Executor Service.
public class ModifyingAsyncSubscribeInboundInterceptor implements SubscribeInboundInterceptor {
@Override
public void onInboundSubscribe(
final @NotNull SubscribeInboundInput subscribeInboundInput,
final @NotNull SubscribeInboundOutput subscribeInboundOutput) {
// make output asynchronous with a timeout duration of 10 seconds and the timeout fallback failure
final Async<SubscribeInboundOutput> asyncOutput =
subscribeInboundOutput.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);
// submit external task to the extension executor service
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(() -> {
// get the modifiable subscribe packet from the output
final ModifiableSubscribePacket subscribe = subscribeInboundOutput.getSubscribePacket();
// call external task that modifies the subscribe packet (method not provided)
callExternalTask(subscribe);
});
// add a callback for completion of the task
taskFuture.whenComplete((ignored, throwable) -> {
if (throwable != null) {
throwable.printStackTrace(); // please use more sophisticated logging
}
// resume output to tell HiveMQ that asynchronous processing is done
asyncOutput.resume();
});
}
}
Suback Outbound Interceptor
With the SubackOutboundInterceptor
interface, you can intercept outbound MQTT SUBACK
messages.
You can use the SubackOutboundInterceptor
to modify or analyse the data that is contained in outbound MQTT SUBACK
messages.
For this task, HiveMQ provides SubackOutboundInput
and SubackOutboundOutput
parameters.
Extension input and output principles apply.
The SubackOutboundOutput
parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext
.
When an asynchronous SubackOutboundOutput
parameter times out, all pending changes to the SUBACK
are discarded and the unmodified SUBACK
is sent to the client.
Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The SubackOutboundInput
and SubackOutboundOutput
parameters are updated after each interception.
The following parameters can be modified:
Parameter | Description |
---|---|
Reason Codes | The reason codes of the SUBACK, one for each subscription in the corresponding SUBSCRIBE message. |
Reason String | A UTF-8 encoded string that shows the status of the SUBACK. This Reason String is human readable and designed for diagnostics. |
User Properties | Additional diagnostic or other information. |
Interception of an outbound SUBACK
message has the following limitations:
- You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
- It is not possible to prevent sending a SUBACK message to a client.
When an interceptor throws an exception or causes a timeout using the async method, HiveMQ ignores the changes that the interceptor made to the modifiable SUBACK packet and calls the next interceptor.
If there are no further interceptors, the SUBACK is sent to the client.
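The reason code limitation can be expressed as a small check. The sketch below uses raw MQTT 5 reason code values, where values below 0x80 indicate success and values from 0x80 upwards indicate failure. The helper names are hypothetical, and the requirement that the number of codes stays the same is an assumption based on a SUBACK carrying one reason code per subscription.

```java
import java.util.List;

// Hypothetical validation helper; not part of the HiveMQ SDK.
final class SubackReasonCodeCheckSketch {

    /** MQTT 5 reason codes below 0x80 indicate success, 0x80 and above indicate failure. */
    static boolean isSuccess(final int reasonCode) {
        return reasonCode < 0x80;
    }

    /**
     * Returns true when the replacement keeps the number of reason codes and does not
     * switch any individual code between success and failure.
     */
    static boolean isAllowedChange(final List<Integer> original, final List<Integer> replacement) {
        if (original.size() != replacement.size()) {
            return false; // assumed: one reason code per subscription must remain
        }
        for (int i = 0; i < original.size(); i++) {
            if (isSuccess(original.get(i)) != isSuccess(replacement.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        // 0x00 = granted QoS 0, 0x01 = granted QoS 1, 0x80 = unspecified error, 0x87 = not authorized
        System.out.println(isAllowedChange(List.of(0x00, 0x80), List.of(0x01, 0x87)));
        System.out.println(isAllowedChange(List.of(0x00), List.of(0x80)));
    }
}
```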
Suback Outbound Input
The input parameter contains the following unmodifiable information:
- MQTT SUBACK packet
- Connection information
- Client information
The JavaDoc can be found here.
Suback Outbound Output
The output parameter contains the modifiable SUBACK
packet.
You can use the SUBACK
packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate the packet values.
Some values cause exceptions to be thrown.
For example, null or non-UTF8 characters.
For more information on ways to manipulate outbound suback packets, see
here.
Example Usage
This section provides examples on how to implement different SubackOutboundInterceptor
and add them with the ClientContext
.
Examples:
Set Up Interceptor in ClientContext
This example shows how to set up a SubackOutboundInterceptor
with the help of a ClientContext
and the ClientInitializer
:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final SubackOutboundInterceptor interceptor = (subackOutboundInput, subackOutboundOutput) -> {
log.debug("intercepted a SUBACK packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addSubackOutboundInterceptor(interceptor);
});
}
...
Modify Outbound SUBACK
This example shows how to implement a SubackOutboundInterceptor
that modifies the parameters of an outbound SUBACK
message:
public class ModifyingSubackOutboundInterceptor implements SubackOutboundInterceptor {
@Override
public void onOutboundSuback(final @NotNull SubackOutboundInput subackOutboundInput, final @NotNull SubackOutboundOutput subackOutboundOutput) {
//get the modifiable suback object from the output
final ModifiableSubackPacket subackPacket = subackOutboundOutput.getSubackPacket();
subackPacket.setReasonString("The reason for the unsuccessful suback");
//the reason codes are left unchanged here; a SUBACK carries a list with one reason code per subscription
subackPacket.getUserProperties().addUserProperty("my-prop", "some value");
subackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
}
}
Modify Outbound SUBACK Asynchronously
This example shows how to implement a SubackOutboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an outbound SUBACK
message.
If the task takes more than 10 seconds, the unmodified SUBACK
is forwarded to the next interceptor or finally sent to the client.
public class AsyncModifyingSubackOutboundInterceptor implements SubackOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingSubackOutboundInterceptor.class);
@Override
public void onOutboundSuback(final @NotNull SubackOutboundInput subackOutboundInput, final @NotNull SubackOutboundOutput subackOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<SubackOutboundOutput> async = subackOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable suback object from the output
final ModifiableSubackPacket subackPacket = subackOutboundOutput.getSubackPacket();
// modify / overwrite parameters of a suback packet.
try {
//Set the reason string. The reason codes are left unchanged.
subackPacket.setReasonString("The reason for the unsuccessful Suback");
subackPacket.getUserProperties().addUserProperty("my-prop", "some value");
subackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Suback outbound interception failed:", e);
}
async.resume();
}
});
}
}
Unsubscribe Inbound Interceptor
With the UnsubscribeInboundInterceptor
interface, you can intercept inbound MQTT UNSUBSCRIBE
messages.
You can use the UnsubscribeInboundInterceptor
to modify or analyse the data that is contained in inbound MQTT UNSUBSCRIBE
messages.
For this task, HiveMQ provides UnsubscribeInboundInput
and UnsubscribeInboundOutput
parameters.
Extension input and output principles apply.
The UnsubscribeInboundOutput
parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext
.
Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The UnsubscribeInboundInput
and UnsubscribeInboundOutput
parameters are updated after each interception.
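The ordering rule can be illustrated without the HiveMQ SDK. The following is a minimal sketch in plain Java. `InterceptorChainSketch`, `Entry`, and `run` are hypothetical names that do not exist in the extension SDK, and the list of topic filters stands in for the modifiable UNSUBSCRIBE packet. The sketch only models the two statements above: the interceptor with the highest priority runs first, and each interceptor sees the changes of the previous one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of sequential interception; not part of the HiveMQ Extension SDK.
final class InterceptorChainSketch {

    // One registered interceptor: the priority of its extension and the change it applies.
    static final class Entry {
        final int priority;
        final UnaryOperator<List<String>> interceptor;

        Entry(final int priority, final UnaryOperator<List<String>> interceptor) {
            this.priority = priority;
            this.interceptor = interceptor;
        }
    }

    // Calls the interceptors in descending priority order.
    // The result of one interceptor is the input of the next one.
    static List<String> run(final List<Entry> entries, final List<String> topicFilters) {
        final List<Entry> ordered = new ArrayList<>(entries);
        ordered.sort((a, b) -> Integer.compare(b.priority, a.priority));

        List<String> current = new ArrayList<>(topicFilters);
        for (final Entry entry : ordered) {
            current = new ArrayList<>(entry.interceptor.apply(current));
        }
        return Collections.unmodifiableList(current);
    }
}
```

In this model, an interceptor registered with priority 10 always sees the packet before an interceptor registered with priority 1, regardless of the order in which the two were added.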
The following parameters can be modified:
Parameter | Description |
---|---|
|
Lists the topic filters from which to unsubscribe the client. |
|
Additional diagnostic or other information. |
When an interceptor throws an exception or causes a timeout using the async method,
then HiveMQ will prevent the unsubscribe and will return an UNSUBACK packet to the client.
For an MQTT 5 client the UNSUBACK will contain the reason code Unspecified error for each topic filter.
|
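The failure handling in the note above can be sketched in plain Java. This is a simplified model, not HiveMQ code: `UnsubscribeFailureSketch` and `process` are hypothetical names, only exceptions are modeled (no timeouts), and the success path assumes that every remaining topic filter is removed successfully.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the failure handling; not part of the HiveMQ Extension SDK.
final class UnsubscribeFailureSketch {

    // MQTT 5 UNSUBACK reason code values.
    static final int SUCCESS = 0x00;
    static final int UNSPECIFIED_ERROR = 0x80;

    // Runs one interceptor against a copy of the topic filters and returns the UNSUBACK reason codes.
    static List<Integer> process(final List<String> topicFilters, final Consumer<List<String>> interceptor) {
        final List<String> modifiable = new ArrayList<>(topicFilters);
        try {
            interceptor.accept(modifiable);
        } catch (final RuntimeException e) {
            // The unsubscribe is prevented: one "Unspecified error" per topic filter of the request.
            return Collections.nCopies(topicFilters.size(), UNSPECIFIED_ERROR);
        }
        // Simplification: every remaining topic filter is assumed to be removed successfully.
        return Collections.nCopies(modifiable.size(), SUCCESS);
    }
}
```

For a request with two topic filters and an interceptor that throws, the model returns two `0x80` entries, which matches the "Unspecified error for each topic filter" behavior described for MQTT 5 clients.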
Unsubscribe Inbound Input
The input parameter contains the following unmodifiable information:
-
MQTT
UNSUBSCRIBE
packet -
Connection information
-
Client information
The JavaDoc can be found here.
Unsubscribe Inbound Output
The output parameter contains the modifiable UNSUBSCRIBE
packet.
You can use the UNSUBSCRIBE
packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values cause exceptions to be thrown. For example, null or non-UTF8 characters.
For more information on ways to manipulate inbound unsubscribe packets, see
here.
Example Usage
This section provides examples on how to implement different UnsubscribeInboundInterceptor
and add them with the ClientContext
.
Examples:
Set Up Interceptor in Client Context
This example shows how to set up an UnsubscribeInboundInterceptor
with the help of a ClientContext
and the ClientInitializer
:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final UnsubscribeInboundInterceptor interceptor = (unsubscribeInboundInput, unsubscribeInboundOutput) -> {
log.info("intercepted an UNSUBSCRIBE packet for client {}.", unsubscribeInboundInput.getClientInformation().getClientId());
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addUnsubscribeInboundInterceptor(interceptor);
});
}
...
Modify Inbound UNSUBSCRIBE
This example shows how to implement an UnsubscribeInboundInterceptor
that modifies the parameters of an inbound UNSUBSCRIBE
message:
public class ModifyingUnsubscribeInboundInterceptor implements UnsubscribeInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingUnsubscribeInboundInterceptor.class);
@Override
public void onInboundUnsubscribe(final @NotNull UnsubscribeInboundInput unsubscribeInboundInput, final @NotNull UnsubscribeInboundOutput unsubscribeInboundOutput) {
//get the modifiable unsubscribe object from the output
final ModifiableUnsubscribePacket unsubscribePacket = unsubscribeInboundOutput.getUnsubscribePacket();
// modify / overwrite parameters of the unsubscribe packet.
try {
unsubscribePacket.getUserProperties().addUserProperty("my-prop", "some value");
unsubscribePacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Unsubscribe inbound interception failed:", e);
}
}
}
Modify Inbound UNSUBSCRIBE Asynchronously
This example shows how to implement an UnsubscribeInboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an inbound UNSUBSCRIBE
message.
public class AsyncModifyingUnsubscribeInboundInterceptor implements UnsubscribeInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingUnsubscribeInboundInterceptor.class);
@Override
public void onInboundUnsubscribe(final @NotNull UnsubscribeInboundInput unsubscribeInboundInput, final @NotNull UnsubscribeInboundOutput unsubscribeInboundOutput) {
//make output object async with a duration of 10 seconds
final Async<UnsubscribeInboundOutput> async = unsubscribeInboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable unsubscribe object from the output
final ModifiableUnsubscribePacket unsubscribePacket = unsubscribeInboundOutput.getUnsubscribePacket();
// modify / overwrite parameters of an unsubscribe packet.
try {
//An UNSUBSCRIBE packet has no reason string or reason code. Only the topic filters and the user properties can be modified.
unsubscribePacket.getUserProperties().addUserProperty("my-prop", "some value");
unsubscribePacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Unsubscribe inbound interception failed:", e);
}
async.resume();
}
});
}
}
Unsuback Outbound Interceptor
All changeable properties in the UNSUBACK packet are MQTT 5 properties and will not be sent to MQTT 3 clients.
Therefore, to optimize resource usage, the UNSUBACK packet should not be modified for MQTT 3 clients.
|
With the UnsubackOutboundInterceptor
interface, you can intercept outbound MQTT UNSUBACK
messages.
You can use the UnsubackOutboundInterceptor
to modify or analyse the data that is contained in outbound MQTT UNSUBACK
messages.
For this task, HiveMQ provides UnsubackOutboundInput
and UnsubackOutboundOutput
parameters.
Extension input and output principles apply.
The UnsubackOutboundOutput
parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext
.
When an asynchronous UnsubackOutboundOutput
parameter times out, all pending changes to the UNSUBACK
are discarded
and the unmodified UNSUBACK
is sent to the client.
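The timeout rule can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not the SDK implementation: `AsyncTimeoutSketch` and `modifyWithTimeout` are hypothetical names, a `String` stands in for the UNSUBACK packet, and a single-thread executor replaces the Managed Extension Executor Service.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of the timeout behavior; not part of the HiveMQ Extension SDK.
final class AsyncTimeoutSketch {

    // Returns the modified value if the task finishes in time, otherwise the original value.
    static String modifyWithTimeout(final String original, final Callable<String> task, final long timeoutMillis) {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final Future<String> future = executor.submit(task);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (final TimeoutException | ExecutionException e) {
            // Timeout or exception in the task: pending changes are discarded.
            return original;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return original;
        } finally {
            // Interrupts a task that is still running.
            executor.shutdownNow();
        }
    }
}
```

A task that returns quickly yields the modified value. A task that sleeps past the timeout, or that throws, yields the original value.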
Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The UnsubackOutboundInput
and UnsubackOutboundOutput
parameters are updated after each interception.
The following parameters can be modified:
Parameter | Description |
---|---|
|
The list of reason codes that correspond to the different topic filters that are used in the UNSUBSCRIBE packet. |
|
A UTF-8 encoded string that shows the cause of the |
|
Additional diagnostic or other information (optional). |
Interception of an outbound UNSUBACK
message has the following limitations:
-
You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
-
It is not possible to prevent sending an
UNSUBACK
message to a client.
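The first limitation can be checked before a list of reason codes is set. The following plain-Java guard is a sketch. `ReasonCodeGuardSketch` and its methods are hypothetical and work on the numeric MQTT 5 reason code values instead of the SDK enum. The length check is an assumption based on the table above, which describes one reason code per topic filter.

```java
import java.util.List;

// Hypothetical guard; not part of the HiveMQ Extension SDK.
final class ReasonCodeGuardSketch {

    // In MQTT 5, reason code values below 0x80 indicate success and values of 0x80 or above indicate failure.
    static boolean isSuccess(final int reasonCode) {
        return reasonCode < 0x80;
    }

    // Returns true if no entry switches between success and failure.
    // Assumption: the replacement list must have the same length as the original list.
    static boolean isAllowedChange(final List<Integer> original, final List<Integer> replacement) {
        if (original.size() != replacement.size()) {
            return false;
        }
        for (int i = 0; i < original.size(); i++) {
            if (isSuccess(original.get(i)) != isSuccess(replacement.get(i))) {
                return false;
            }
        }
        return true;
    }
}
```

With this guard, a change from `0x00` (Success) to `0x11` (No subscription existed) passes, while a change from `0x00` to `0x80` (Unspecified error) is rejected.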
When an interceptor throws an exception or causes a timeout using the async method,
then HiveMQ will ignore changes to the modifiable UNSUBACK packet made by the interceptor and will call the next interceptor.
If there are no further interceptors, the UNSUBACK is sent to the client.
|
Unsuback Outbound Input
The input parameter contains the following unmodifiable information:
-
MQTT
UNSUBACK
packet -
Connection information
-
Client information
The JavaDoc can be found here.
Unsuback Outbound Output
The output parameter contains the modifiable UNSUBACK
packet.
You can use the UNSUBACK
packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values cause exceptions to be thrown. For example, null or non-UTF8 characters.
For more information on ways to manipulate outbound unsuback packets, see
here.
Example Usage
This section provides examples on how to implement different UnsubackOutboundInterceptor
and add them with the ClientContext
.
Examples:
Set Up Interceptor in Client Context
This example shows how to set up an UnsubackOutboundInterceptor
with the help of a ClientContext
and the ClientInitializer
:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final UnsubackOutboundInterceptor interceptor = (unsubackOutboundInput, unsubackOutboundOutput) -> {
log.debug("intercepted an UNSUBACK packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addUnsubackOutboundInterceptor(interceptor);
});
}
...
Modify Outbound UNSUBACK
This example shows how to implement an UnsubackOutboundInterceptor
that modifies the parameters of an outbound UNSUBACK
message:
public class ModifyingUnsubackOutboundInterceptor implements UnsubackOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingUnsubackOutboundInterceptor.class);
@Override
public void onOutboundUnsuback(final @NotNull UnsubackOutboundInput unsubackOutboundInput, final @NotNull UnsubackOutboundOutput unsubackOutboundOutput) {
//get the modifiable unsuback object from the output
final ModifiableUnsubackPacket unsubackPacket = unsubackOutboundOutput.getUnsubackPacket();
// modify / overwrite parameters of the unsuback packet.
try {
//Set the reason string and the reason codes.
unsubackPacket.setReasonString("The reason for the unsuccessful unsuback");
List<UnsubackReasonCode> reasonCodes = new ArrayList<>();
reasonCodes.add(UnsubackReasonCode.SUCCESS);
unsubackPacket.setReasonCodes(reasonCodes);
unsubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
unsubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Unsuback outbound interception failed:", e);
}
}
}
Modify Outbound UNSUBACK Asynchronously
This example shows how to implement an UnsubackOutboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an outbound UNSUBACK
message.
If the task takes more than 10 seconds, the unmodified UNSUBACK
is sent to the client.
public class AsyncModifyingUnsubackOutboundInterceptor implements UnsubackOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingUnsubackOutboundInterceptor.class);
@Override
public void onOutboundUnsuback(final @NotNull UnsubackOutboundInput unsubackOutboundInput, final @NotNull UnsubackOutboundOutput unsubackOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<UnsubackOutboundOutput> async = unsubackOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable unsuback object from the output
final ModifiableUnsubackPacket unsubackPacket = unsubackOutboundOutput.getUnsubackPacket();
// modify / overwrite parameters of an unsuback packet.
try {
//Set reason string and reason codes.
unsubackPacket.setReasonString("The reason for the unsuccessful unsuback");
List<UnsubackReasonCode> reasonCodes = new ArrayList<>();
reasonCodes.add(UnsubackReasonCode.SUCCESS);
unsubackPacket.setReasonCodes(reasonCodes);
unsubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
unsubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Unsuback outbound interception failed:", e);
}
async.resume();
}
});
}
}
Disconnect Inbound Interceptor
With the DisconnectInboundInterceptor
interface, you can intercept inbound MQTT DISCONNECT
messages.
You can use the DisconnectInboundInterceptor
to modify or analyse the data that is contained in inbound MQTT DISCONNECT
messages.
For this task, HiveMQ provides DisconnectInboundInput
and DisconnectInboundOutput
parameters.
Extension input and output principles apply.
The DisconnectInboundOutput
parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext
.
When an asynchronous DisconnectInboundOutput
parameter times out, all pending changes to the DISCONNECT
are discarded and the unmodified DISCONNECT
is processed by the broker.
Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The DisconnectInboundInput
and DisconnectInboundOutput
parameters are updated after each interception.
The following parameters can be modified:
Parameter | Description |
---|---|
|
The cause of the disconnection. Clients can use this information to determine whether to retry the connection, and how long to wait before a reconnection attempt. |
|
A UTF-8 encoded string that shows the cause of the disconnect. This Reason String is human-readable and designed for diagnostics. |
|
Additional diagnostic or other information. |
|
The amount of time (in seconds) after the client disconnects that the session data of the client is still valid on the HiveMQ broker. The time is counted from the moment that the client disconnects. If the Session Expiry Interval of the corresponding Connect packet is set to zero, the Session Expiry Interval of the Disconnect must also be set to zero. |
Interception of an inbound DISCONNECT
message has the following limitations:
-
You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
-
It is not possible to prevent sending a
DISCONNECT
message to the server.
When an interceptor throws an exception or causes a timeout using the async method, HiveMQ ignores the changes that the interceptor made to the modifiable DISCONNECT packet and calls the next interceptor. If there are no further interceptors, HiveMQ processes the disconnect request of the client. |
Disconnect Inbound Input
The input parameter contains the following unmodifiable information:
-
MQTT
DISCONNECT
packet -
Connection information
-
Client Information
The JavaDoc can be found here.
Disconnect Inbound Output
The output parameter contains the modifiable DISCONNECT
packet.
You can use the DISCONNECT
packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values cause exceptions to be thrown.
For example, null or non-UTF8 characters.
For more information on ways to manipulate inbound disconnect packets, see
here.
Example Usage
This section provides examples on how to implement different DisconnectInboundInterceptor
and add them with the ClientContext
.
Examples:
Set Up Interceptor in ClientContext
This example shows how to set up a DisconnectInboundInterceptor
with the help of a ClientContext
and the ClientInitializer
:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final DisconnectInboundInterceptor interceptor = (disconnectInboundInput, disconnectInboundOutput) -> {
log.debug("intercepted a DISCONNECT packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addDisconnectInboundInterceptor(interceptor);
});
}
...
Modify Inbound DISCONNECT
This example shows how to implement a DisconnectInboundInterceptor
that modifies the parameters of an inbound DISCONNECT
message:
public class ModifyingDisconnectInboundInterceptor implements DisconnectInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingDisconnectInboundInterceptor.class);
@Override
public void onInboundDisconnect(final @NotNull DisconnectInboundInput disconnectInboundInput, final @NotNull DisconnectInboundOutput disconnectInboundOutput) {
//get the modifiable disconnect object from the output
final ModifiableInboundDisconnectPacket disconnectPacket = disconnectInboundOutput.getDisconnectPacket();
// modify / overwrite parameters of the disconnect packet.
try {
if (disconnectPacket.getReasonCode() != DisconnectReasonCode.NORMAL_DISCONNECTION) {
//Set reason string and reason code if reason code is not success.
disconnectPacket.setReasonString("The reason for the unsuccessful disconnect");
disconnectPacket.setReasonCode(DisconnectReasonCode.MALFORMED_PACKET);
}
disconnectPacket.getUserProperties().addUserProperty("my-prop", "some value");
disconnectPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Disconnect inbound interception failed:", e);
}
}
}
Modify Inbound DISCONNECT Asynchronously
This example shows how to implement a DisconnectInboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an inbound DISCONNECT
message.
If the task takes more than 10 seconds, the unmodified DISCONNECT
is forwarded to the next interceptor or finally processed by the broker.
public class AsyncModifyingDisconnectInboundInterceptor implements DisconnectInboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingDisconnectInboundInterceptor.class);
@Override
public void onInboundDisconnect(final @NotNull DisconnectInboundInput disconnectInboundInput, final @NotNull DisconnectInboundOutput disconnectInboundOutput) {
//make output object async with a duration of 10 seconds
final Async<DisconnectInboundOutput> async = disconnectInboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable disconnect object from the output
final ModifiableInboundDisconnectPacket disconnectPacket = disconnectInboundOutput.getDisconnectPacket();
// modify / overwrite parameters of a disconnect packet.
try {
//Set reason string and reason code.
disconnectPacket.setReasonString("The reason for the unsuccessful disconnect");
disconnectPacket.setReasonCode(DisconnectReasonCode.UNSPECIFIED_ERROR);
disconnectPacket.getUserProperties().addUserProperty("my-prop", "some value");
disconnectPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Disconnect inbound interception failed:", e);
}
async.resume();
}
});
}
}
Modify Session Expiry Interval
This example shows the precautions to take when you modify the session expiry interval. It is not possible to change the session expiry interval of clients that connect with a session expiry value of 0.
public class SessionExpiryDisconnectInboundInterceptor implements DisconnectInboundInterceptor {
@Override
public void onInboundDisconnect(final @NotNull DisconnectInboundInput disconnectInboundInput, final @NotNull DisconnectInboundOutput disconnectInboundOutput) {
if (disconnectInboundInput.getDisconnectPacket().getSessionExpiryInterval().isPresent()) {
if (disconnectInboundInput.getDisconnectPacket().getSessionExpiryInterval().get() != 0) {
disconnectInboundOutput.getDisconnectPacket().setSessionExpiryInterval(10L);
}
}
}
}
Disconnect Outbound Interceptor
Server-sent DISCONNECT packets only exist in MQTT 5. This interceptor is not called for MQTT 3 clients.
|
With the DisconnectOutboundInterceptor
interface, you can intercept outbound MQTT DISCONNECT
messages.
You can use the DisconnectOutboundInterceptor
to modify or analyse the data that is contained in outbound MQTT DISCONNECT
messages.
For this task, HiveMQ provides DisconnectOutboundInput
and DisconnectOutboundOutput
parameters.
Extension input and output principles apply.
The DisconnectOutboundOutput
parameter can easily be used for asynchronous processing.
The interceptor can be set up in the ClientContext
.
When an asynchronous DisconnectOutboundOutput
parameter times out, all pending changes to the DISCONNECT
are discarded and the unmodified DISCONNECT
is sent to the client.
Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
The DisconnectOutboundInput
and DisconnectOutboundOutput
parameters are updated after each interception.
The following parameters can be modified:
Parameter | Description |
---|---|
|
The cause of the disconnection. Clients can use this information to determine whether to retry the connection, and how long to wait before a reconnection attempt. |
|
A UTF-8 encoded string that shows the cause of the disconnect. This Reason String is human-readable and designed for diagnostics. |
|
Additional diagnostic or other information. |
|
Specifies an alternative broker to which the client redirects on Disconnect. |
Interception of an outbound DISCONNECT
message has the following limitations:
-
You cannot change a successful reason code to an unsuccessful reason code (or vice versa).
-
It is not possible to prevent sending a
DISCONNECT
message to a client.
When an interceptor throws an exception or causes a timeout using the async method, HiveMQ ignores the changes that the interceptor made to the modifiable DISCONNECT packet and calls the next interceptor. If there are no further interceptors, HiveMQ disconnects the client. |
Disconnect Outbound Input
The input parameter contains the following unmodifiable information:
-
MQTT
DISCONNECT
packet -
Connection information
-
Client Information
The JavaDoc can be found here.
Disconnect Outbound Output
The output parameter contains the modifiable DISCONNECT
packet.
You can use the DISCONNECT
packet to manipulate the parameters that are listed in the table above.
Use caution when you manipulate values in the packet.
Some values cause exceptions to be thrown.
For example, null or non-UTF8 characters.
For more information on ways to manipulate outbound disconnect packets, see
here.
Example Usage
This section provides examples on how to implement different DisconnectOutboundInterceptor
and add them with the ClientContext
.
Examples:
Set Up Interceptor in ClientContext
This example shows how to set up a DisconnectOutboundInterceptor
with the help of a ClientContext
and the ClientInitializer
:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final DisconnectOutboundInterceptor interceptor = (disconnectOutboundInput, disconnectOutboundOutput) -> {
log.debug("intercepted a DISCONNECT packet.");
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addDisconnectOutboundInterceptor(interceptor);
});
}
...
Modify Outbound DISCONNECT
This example shows how to implement a DisconnectOutboundInterceptor
that modifies the parameters of an outbound DISCONNECT
message:
public class ModifyingDisconnectOutboundInterceptor implements DisconnectOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(ModifyingDisconnectOutboundInterceptor.class);
@Override
public void onOutboundDisconnect(final @NotNull DisconnectOutboundInput disconnectOutboundInput, final @NotNull DisconnectOutboundOutput disconnectOutboundOutput) {
//get the modifiable disconnect object from the output
final ModifiableOutboundDisconnectPacket disconnectPacket = disconnectOutboundOutput.getDisconnectPacket();
// modify / overwrite parameters of the disconnect packet.
try {
if (disconnectPacket.getReasonCode() != DisconnectReasonCode.NORMAL_DISCONNECTION) {
//Set reason string and reason code if reason code is not success.
disconnectPacket.setReasonString("The reason for the unsuccessful disconnect");
disconnectPacket.setReasonCode(DisconnectReasonCode.MALFORMED_PACKET);
}
disconnectPacket.getUserProperties().addUserProperty("my-prop", "some value");
disconnectPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Disconnect outbound interception failed:", e);
}
}
}
Modify Outbound DISCONNECT Asynchronously
This example shows how to implement a DisconnectOutboundInterceptor
that uses the Managed Extension Executor Service
to asynchronously modify an outbound DISCONNECT
message.
If the task takes more than 10 seconds, the unmodified DISCONNECT
is sent to the client.
public class AsyncModifyingDisconnectOutboundInterceptor implements DisconnectOutboundInterceptor {
private static final Logger log = LoggerFactory.getLogger(AsyncModifyingDisconnectOutboundInterceptor.class);
@Override
public void onOutboundDisconnect(final @NotNull DisconnectOutboundInput disconnectOutboundInput, final @NotNull DisconnectOutboundOutput disconnectOutboundOutput) {
//make output object async with a duration of 10 seconds
final Async<DisconnectOutboundOutput> async = disconnectOutboundOutput.async(Duration.ofSeconds(10));
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable disconnect object from the output
final ModifiableOutboundDisconnectPacket disconnectPacket = disconnectOutboundOutput.getDisconnectPacket();
// modify / overwrite parameters of a disconnect packet.
try {
//Set reason string and reason code.
disconnectPacket.setReasonString("The reason for the unsuccessful disconnect");
disconnectPacket.setReasonCode(DisconnectReasonCode.UNSPECIFIED_ERROR);
disconnectPacket.getUserProperties().addUserProperty("my-prop", "some value");
disconnectPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
} catch (final Exception e){
log.debug("Disconnect outbound interception failed:", e);
}
async.resume();
}
});
}
}
Pingreq Inbound Interceptor
With the PingReqInboundInterceptor
interface, you can intercept inbound MQTT PINGREQ
messages.
You can use the PingReqInboundInterceptor
to monitor received PINGREQ
packets.
For this task, HiveMQ provides PingReqInboundInput
and PingReqInboundOutput
parameters.
Extension input and output principles apply.
The interceptor can be set up in the ClientContext
.
Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
It is not possible to prevent sending an inbound PINGREQ.
|
Pingreq Inbound Input
The input parameter contains the following unmodifiable information:
-
Connection information
-
Client Information
The JavaDoc can be found here.
Pingreq Inbound Output
Since PINGREQ
messages are static by nature, none of their values can be modified.
Example Usage
This section provides examples on how to implement different PingReqInboundInterceptor
and add them with the ClientContext
.
Examples:
Set Up Interceptor in ClientContext
This example shows how to set up a PingReqInboundInterceptor
with the help of a ClientContext
and the ClientInitializer
:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PingReqInboundInterceptor interceptor = (pingReqInboundInput, pingReqInboundOutput) -> {
final String clientId = pingReqInboundInput.getClientInformation().getClientId();
log.info("intercepted a PINGREQ packet of client '{}'.", clientId);
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPingReqInboundInterceptor(interceptor);
});
}
...
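Because PINGREQ interception is limited to monitoring, a typical use is to count keep-alive traffic per client. The following plain-Java helper is a sketch. `PingCounterSketch`, `record`, and `count` are hypothetical names and not part of the extension SDK.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper; not part of the HiveMQ Extension SDK.
final class PingCounterSketch {

    private final ConcurrentMap<String, LongAdder> pingsPerClient = new ConcurrentHashMap<>();

    // Records one PINGREQ for the given client ID. Safe to call from multiple threads.
    void record(final String clientId) {
        pingsPerClient.computeIfAbsent(clientId, id -> new LongAdder()).increment();
    }

    // Returns the number of PINGREQ packets recorded for the client ID, or 0 if none were recorded.
    long count(final String clientId) {
        final LongAdder adder = pingsPerClient.get(clientId);
        return adder == null ? 0L : adder.sum();
    }
}
```

Inside the interceptor from the example above, a call such as `counter.record(pingReqInboundInput.getClientInformation().getClientId())` would feed the counter, where `counter` is an instance that the extension creates once at start.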
Pingresp Outbound Interceptor
With the PingRespOutboundInterceptor
interface, you can intercept outbound MQTT PINGRESP
messages.
You can use the PingRespOutboundInterceptor
to monitor sent PINGRESP
packets.
For this task, HiveMQ provides PingRespOutboundInput
and PingRespOutboundOutput
parameters.
Extension input and output principles apply.
The interceptor can be set up in the ClientContext
.
Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.
It is not possible to prevent sending an outbound PINGRESP.
|
Pingresp Outbound Input
The input parameter contains the following unmodifiable information:
-
Connection information
-
Client Information
The JavaDoc can be found here.
Pingresp Outbound Output
Since PINGRESP
messages are static by nature, none of their values can be modified.
Example Usage
This section provides examples on how to implement different PingRespOutboundInterceptor
and add them with the ClientContext
.
Examples:
Set Up Interceptor in ClientContext
This example shows how to set up a PingRespOutboundInterceptor
with the help of a ClientContext
and the ClientInitializer
:
...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {
final PingRespOutboundInterceptor interceptor = (pingRespOutboundInput, pingRespOutboundOutput) -> {
final String clientId = pingRespOutboundInput.getClientInformation().getClientId();
log.info("intercepted a PINGRESP packet of client '{}'.", clientId);
};
Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
clientContext.addPingRespOutboundInterceptor(interceptor);
});
}
...