Interceptors

HiveMQ Interceptors provide a convenient way for extensions to intercept and modify MQTT messages.

Based on the type of interceptor, you can register a ClientInitializer or use the GlobalInterceptorRegistry to add the interceptor that you want.

The following table lists all available HiveMQ Interceptors:

Table 1. Available HiveMQ Interceptors
Interceptor Description

Connect Inbound Interceptor

Enables extensions to modify incoming CONNECT messages.

Connack Outbound Interceptor

Enables extensions to intercept and modify outbound CONNACK messages.

Publish Inbound Interceptor

Enables extensions to intercept inbound PUBLISH messages, allowing modification or prevention of onward delivery.

Publish Outbound Interceptor

Enables extensions to intercept outgoing PUBLISH messages, allowing modification or prevention of onward delivery.

Puback Inbound Interceptor

Enables extensions to intercept and modify inbound PUBACK messages.

Puback Outbound Interceptor

Enables extensions to intercept and modify outbound PUBACK messages.

Pubrec Inbound Interceptor

Enables extensions to intercept and modify inbound PUBREC messages.

Pubrec Outbound Interceptor

Enables extensions to intercept and modify outbound PUBREC messages.

Pubrel Inbound Interceptor

Enables extensions to intercept and modify inbound PUBREL messages.

Pubrel Outbound Interceptor

Enables extensions to intercept and modify outbound PUBREL messages.

Pubcomp Inbound Interceptor

Enables extensions to intercept and modify inbound PUBCOMP messages.

Pubcomp Outbound Interceptor

Enables extensions to intercept and modify outbound PUBCOMP messages.

Subscribe Inbound Interceptor

Enables extensions to intercept and modify inbound SUBSCRIBE messages.

Suback Outbound Interceptor

Enables extensions to intercept and modify outbound SUBACK messages.

Unsubscribe Inbound Interceptor

Enables extensions to intercept and modify inbound UNSUBSCRIBE messages.

Unsuback Outbound Interceptor

Enables the extensions to intercept and modify outbound UNSUBACK messages.

Disconnect Inbound Interceptor

Enables extensions to intercept and modify inbound DISCONNECT messages.

Disconnect Outbound Interceptor

Enables extensions to intercept and modify outbound DISCONNECT messages.

PingReq Inbound Interceptor

Enables extensions to intercept inbound PINGREQ messages.

PingResp Outbound Interceptor

Enables extensions to intercept outbound PINGRESP messages.



Connect Inbound Interceptor

With the ConnectInboundInterceptor interface you can modify incoming MQTT CONNECT messages. You can use the ConnectInboundInterceptor to modify or analyse the data that is contained in inbound MQTT Connect messages. For this task, HiveMQ provides ConnectInboundInput and ConnectInboundOutput parameters. Extension input and output principles apply.

A ClientInitializer that is registered through the Initializer Registry can be used to add the ConnectInboundInterceptor.

When an asynchronous ConnectInboundOutput parameter with a TimeoutFallback.FAILURE fallback strategy times out, a CONNACK with the UNSPECIFIED_ERROR (128) reason code is sent to the client and the client is disconnected.

Multiple interceptors are called sequentially. For example, when various extensions provide interceptors HiveMQ calls the interceptor with the highest priority first.

The ConnectInboundInput and ConnectInboundOutput parameters are updated after each interception.

The JavaDoc for the ConnectInboundInterceptor can be found here.

It is important to note that changes made to the CONNECT packet are usually not expected by the client implementation. For example, if a client with a modified client ID connects and expects a session to be present, the client may disconnect because no session-present flag on the server corresponds to the modified client ID.

The following parameters can be modified:

Parameter Description

Client ID

The unique ID of a client.

Clean Start flag

A flag that shows whether the client wants to start a new “clean” session (true) or resume a previous session if present (false)

Session Expiry Interval

The number of seconds that the broker stores session information for the MQTT client. The time is calculated from the moment that the client disconnects.

Keep alive

The time interval that is allowed between keep alive messages. When set to 0, the keep-alive feature is disabled.

Receive Maximum

Limits the number of QoS 1 and QoS 2 publications that the connected client can process concurrently.

Maximum packet size

The maximum packet size the connected client accepts.

Topic alias maximum

The maximum number of topic aliases that the client can hold.

Request response information

A flag that requests the server to return response information in the corresponding Connack packet.

Request problem information

A flag that requests the server to return problem information (for example reason string and user properties).

Authentication Method

A UTF-8 value that shows the authentication method that is used for the connect.

Authentication Data

The data that is used for authentication. If no authentication method is given, this parameter is not used.

Password

The password the client uses to connect.

Will publish packet

The Will publish that is set for the CONNECT. The Will can be modified or overridden.

User Properties

Additional user defined information (optional).

When an interceptor makes changes to the CONNECT message of a client, the client is not notified. To provide feedback to the client, you can use the ConnectInboundInterceptor to add user properties to the CONNECT packet.

Set Up Connect Inbound Interceptor

The following example shows how to set up a ConnectInboundInterceptor.

...

@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {

    try {
        final ConnectInboundInterceptorA interceptorA = new ConnectInboundInterceptorA();
        final ConnectInboundInterceptorB interceptorB = new ConnectInboundInterceptorB();
        Services.interceptorRegistry().setConnectInboundInterceptorProvider(input -> {
            final String clientId = input.getClientInformation().getClientId();
            if (clientId.startsWith("a")) {
                return interceptorA;
            } else {
                return interceptorB;
            }
        });

    } catch (Exception e) {
        log.error("Exception thrown at extension start: ", e);
    }
}

...

Modify a CONNECT Message

The following example ensures that a client connects with a clean start and a session expiry of zero.

public class ModifyConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
       final ModifiableConnectPacket connectPacket = output.getConnectPacket();
       final ModifiableUserProperties userProperties = connectPacket.getUserProperties();
       if (connectPacket.getUserProperties().getFirst("clean").isPresent()) {
           connectPacket.setSessionExpiryInterval(0);
           connectPacket.setCleanStart(true);
       }
    }
}

The following example requests a new ID for a connecting client and sets the new ID asynchronously.

public class ModifyConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
        final Async<ConnectInboundOutput> async = output.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);
        final ModifiableConnectPacket connectPacket = output.getConnectPacket();
        final CompletableFuture<String> modifiedClientIdFuture = modifyClientId(connectPacket.getClientId());
        modifiedClientIdFuture.whenComplete((modifiedClientId, throwable) -> {
            if (modifiedClientId != null) {
                connectPacket.setClientId(modifiedClientId);
                async.resume();
            }
        });
    }
}

Modify a Last Will Message

The Will message of a CONNECT packet can be modified or removed. If the CONNECT does not include a Will message, a new Will can be created via the WillPublishBuilder and added to the CONNECT.

public class ModifyWillConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
        final ModifiableConnectPacket connectPacket = output.getConnectPacket();
        if (connectPacket.getModifiableWillPublish().isEmpty()) {
            final String clientId = connectPacket.getClientId();
            final ByteBuffer payload = StandardCharsets.UTF_8.encode(clientId + " disconnected");
            final WillPublishBuilder builder = Builders.willPublish();
            final WillPublishPacket will = builder.willDelay(0)
                    .topic(clientId + "will")
                    .payload(payload)
                    .build();
            connectPacket.setWillPublish(will);
        }
    }
}

To remove a last-will message, you can set the will-message to 'null' as shown in the following example:

public class ModifyWillConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
        final ModifiableConnectPacket connectPacket = output.getConnectPacket();
        if (connectPacket.getModifiableWillPublish().isPresent()) {
            connectPacket.setWillPublish(null);
        }
    }
}

The following example shows how to modify and existing will message:

public class ModifyWillConnectInboundInterceptor implements ConnectInboundInterceptor {
        @Override
        public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
            final ModifiableConnectPacket connectPacket = output.getConnectPacket();
            if (connectPacket.getModifiableWillPublish().isPresent()) {
                final ModifiableWillPublish modifiableWill = connectPacket.getModifiableWillPublish().get();
                modifiableWill.setTopic(connectPacket.getClientId() + "/will");
            }
        }
    }



Connack Outbound Interceptor

With the ConnackOutboundInterceptor interface you can intercept outbound MQTT CONNACK messages. You can use the ConnackOutboundInterceptor interface to modify or analyse the data that is contained in outbound MQTT CONNACK packet. For this task, HiveMQ provides ConnackOutboundInput and ConnackOutboundOutput parameters. Extension input and output principles apply.

You can use a ConnackOutboundInterceptor to modify outbound CONNACK messages.

The ConnackOutboundOutput parameter can easily be used for asynchronous processing.

The interceptor is set up in the ClientContext.

When an asynchronous ConnackOutboundOutput parameter with a TimeoutFallback.FAILURE fallback strategy times out, the connection of the client is closed without sending a CONNACK.

Multiple interceptors are called sequentially. For example, when various extensions provide interceptors HiveMQ calls the interceptor with the highest priority first.

The ConnackOutboundOutput and ConnackOutboundInput parameters are updated after each interception.

The following parameters can be modified:

Parameter Description

Reason Code

This parameter specifies if a Connect was successful and if not, holds the reason why it was unsuccessful.

Reason String

A UTF-8 encoded string that holds a string containing the reason.

User Properties

Additional diagnostic or other information (optional).

Server Reference

This parameter is used to pass another server for the client to connect to.

Response Information

A UTF-8 encoded string that holds the response information.

Client ID

The assigned client ID that is sent in the ConnackOutboundOutput.

Interception of a CONNACK message has the following limitations:

  • You cannot change a successful reason code to an unsuccessful reason code (or vice versa).

  • It is not possible to send response information to a client that did not request response information.

  • If the reason code is "SUCCESS", you cannot set a reason string.

  • It is not possible to prevent sending a CONNACK message to a client.

  • This interceptor can only modify the assigned client ID that is sent in the CONNACK packet. To modify the client ID that HiveMQ uses, see Connect Inbound Interceptor.

If response information is requested, you can use the ConnectInboundInterceptor to get the information as shown in this example. Furthermore Reason Code, Reason String and MQTT User Properties are MQTT 5 features. In MQTT 3 Environments the Reason Code will always be set to SUCCESS, the Reason String is set to null and it does not contain any user properties.

Connack Outbound Input

The input parameter contains the following unmodifiable information:

  • MQTT CONNACK packet

  • MQTT Version

  • Client ID

  • IP Address

  • Listener with port, bind address and type

  • ConnectionAttributeStore

  • TLS

The JavaDoc can be found here.

Connack Outbound Output

The output parameter contains a modifiable CONNACK packet. You can use the CONNACK packet to manipulate the parameters that are listed in the table above. For more information on ways to manipulate outbound CONNACK packets, see here.

Example Usage

This section provides examples of how to implement different ConnackOutboundInterceptor and add them through the GlobalInterceptorRegistry.

Examples:

Set Up In GlobalInterceptorRegistry

This example shows how to set up a ConnackOutboundInterceptor by a ConnackOutboundInterceptorProvider with the GlobalInterceptorRegistry:

...

@Override
public void extensionStart(@NotNull final ExtensionStartInput extensionStartInput, @NotNull final ExtensionStartOutput extensionStartOutput) {

    final ConnackOutboundInterceptorProvider interceptorProvider = new ConnackOutboundInterceptorProvider() {
        @Override
        public @NotNull ConnackOutboundInterceptor getConnackOutboundInterceptor(final @NotNull ConnackOutboundProviderInput connackOutboundProviderInput) {
            return new ConnackOutboundInterceptor() {
                @Override
                public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {
                    //do something with the connack eg. logging
                    System.out.println("Connack intercepted for client: " + connackOutboundInput.getClientInformation().getClientId());
                }
            };
        }
    };

    Services.interceptorRegistry().setConnackOutboundInterceptorProvider(interceptorProvider);
}

...


Modify a CONNACK Message

This example shows how to implement a ConnackOutboundInterceptor that modifies parameters of an outbound CONNACK message:

public class ModifyingConnackOutboundInterceptor implements ConnackOutboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(ModifyingConnackOutboundInterceptor.class);

    @Override
    public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {
        //get the modifiable connack object from the output
        final ModifiableConnackPacket connackPacket = connackOutboundOutput.getConnackPacket();

        // modify / overwrite parameters of a connack packet.

        try {
            if (connackPacket.getReasonCode() != ConnackReasonCode.SUCCESS) {
                //Set reason string and reason code if reason code is not success.
                connackPacket.setReasonString("The reason for the not successful connack");
                connackPacket.setReasonCode(ConnackReasonCode.UNSPECIFIED_ERROR);
            }

            connackPacket.setServerReference("server reference");
            connackPacket.getUserProperties().addUserProperty("my-prop", "some value");
            connackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

        } catch (Exception e){
            log.debug("Connack outbound interception failed:", e);
        }
    }
}


Modify a CONNACK Message Asynchronously

This example shows how to implement a ConnackOutboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an outbound CONNACK message.

The default timeout strategy is failure. If the task takes more than 10 seconds, the connection of the client is closed without sending a CONNACK.

public class AsyncModifyingConnackOutboundInterceptor implements ConnackOutboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(AsyncModifyingConnackOutboundInterceptor.class);

    @Override
    public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {

        //make output object async with a duration of 10 seconds
        final Async<ConnackOutboundOutput> async = connackOutboundOutput.async(Duration.ofSeconds(10));

        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {

                //get the modifiable connack object from the output
                final ModifiableConnackPacket connackPacket = connackOutboundOutput.getConnackPacket();
                // modify / overwrite parameters of a connack packet.

                try {
                    if (connackPacket.getReasonCode() != ConnackReasonCode.SUCCESS) {
                        //Set reason string and reason code if reason code is not success.
                        connackPacket.setReasonString("The reason for the not successful connack");
                        connackPacket.setReasonCode(ConnackReasonCode.UNSPECIFIED_ERROR);
                    }

                    connackPacket.setServerReference("server reference");
                    connackPacket.getUserProperties().addUserProperty("my-prop", "some value");
                    connackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

                } catch (Exception e){
                    log.debug("Connack outbound interception failed:", e);
                }
            }
        });

        //wait for completion of the task
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
                if(throwable != null){
                    log.debug("Connack outbound interception failed:", throwable);
                }

                //resume output to tell HiveMQ it's done.
                async.resume();
            }
        });

    }
}


Modify the Response Information

This example shows how to implement a ConnackOutboundInterceptor that modifies the response information by storing the information if response information is requested in the ConnectInboundInterceptor:

public class ModifyResponseInformationMain implements ExtensionMain {

    private static final Logger log = LoggerFactory.getLogger(ModifyResponseInformationMain.class);

    private final Map<String, Boolean> clientIdResponseInformationRequestedMap = new HashMap<>();

    @Override
    public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {

        final ConnectInboundInterceptor connectInboundInterceptor = new ConnectInboundInterceptor() {
            @Override
            public void onConnect(final @NotNull ConnectInboundInput connectInboundInput, final @NotNull ConnectInboundOutput connectInboundOutput) {

                try {
                    //get connect packet from input
                    final ConnectPacket connectPacket = connectInboundInput.getConnectPacket();

                    //store client id if response information is requested.
                    clientIdResponseInformationRequestedMap.put(connectPacket.getClientId(), connectPacket.getRequestResponseInformation());

                } catch (final Exception e){
                    log.debug("Connect inbound interception failed: ", e);
                }
            }
        };

        final ConnackOutboundInterceptor connackOutboundInterceptor = new ConnackOutboundInterceptor() {
            @Override
            public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {

                try {
                    //get client id from input
                    final String clientId = connackOutboundInput.getClientInformation().getClientId();

                    //check if problem information is requested
                    final Boolean responseInformationRequested = clientIdResponseInformationRequestedMap.get(clientId);
                    if (responseInformationRequested != null && responseInformationRequested) {
                        connackOutboundOutput.getConnackPacket().setResponseInformation("Some response information");
                    }

                    //remove from map since it is not needed anymore
                    clientIdResponseInformationRequestedMap.remove(clientId);
                } catch (final Exception e){
                    log.debug("Connack outbound interception failed: ", e);
                }
            }
        };

        // set connect inbound interceptor provider with interceptor
        Services.interceptorRegistry().setConnectInboundInterceptorProvider(new ConnectInboundInterceptorProvider() {
            @Override
            public ConnectInboundInterceptor getConnectInboundInterceptor(final @NotNull ConnectInboundProviderInput input) {
                return connectInboundInterceptor;
            }
        });

        // set connack outbound interceptor provider with interceptor
        Services.interceptorRegistry().setConnackOutboundInterceptorProvider(new ConnackOutboundInterceptorProvider() {
            @Override
            public ConnackOutboundInterceptor getConnackOutboundInterceptor(final @NotNull ConnackOutboundProviderInput input) {
                return connackOutboundInterceptor;
            }
        });

    }

    @Override
    public void extensionStop(final @NotNull ExtensionStopInput extensionStopInput, final @NotNull ExtensionStopOutput extensionStopOutput) {

    }
}



Publish Inbound Interceptor

With the PublishInboundInterceptor interface, you can intercept inbound MQTT PUBLISH packets. You can use the PublishInboundInterceptor to modify, analyse, or prevent the delivery of the data that is contained in inbound MQTT PUBLISH messages. For this task, HiveMQ provides PublishInboundInput and PublishInboundOutput parameters. Extension input and output principles apply.

The PublishInboundOutput is blocking , it can easily be used to create an asynchronous PublishInboundOutput object.

The interceptor can be set up in the ClientContext. For more information, see Initializer Registry.

When the delivery of a PUBLISH packet is prevented, the message is dropped.

In the HiveMQ Control Center, you can see the number of dropped messages from every PublishInboundInterceptor.

When delivery is prevented with an AckReasonCode other than SUCCESS, MQTT 3 clients are disconnected. However, an MQTT 5 client receives the PUBACK or PUBREC with the provided reason code and optional reason string.

Based on the quality of service level (QoS), when the AckReasonCode is SUCCESS an acknowledgement is sent (PUBACK or PUBREC).

When an async PublishInboundOutput times out with a TimeoutFallback.FAILURE fallback strategy, the process is the same.

Multiple interceptors are called sequentially. For example, when various extensions provide interceptors HiveMQ calls the interceptor with the highest priority first. The PublishInboundOutput and the PublishInboundInput are updated after each interception.

The following parameters can be modified:

Parameter Description

QoS

The quality of service level for the message. Possible values are 0, 1, and 2.

Topic

The information channel to which the data in the payload of the message is published.

Payload Format Indicator

A flag that represents the encoding of the payload. Possible values are UNSPECIFIED and UTF_8.

Message Expiry Interval

The number of seconds until the messages expire. If the server cannot begin message delivery to a specific subscriber within the message-expiry interval that is defined for the message, the copy of the message for that subscriber is deleted.

Response Topic

An optional UTF-8 string that can be used as a topic for response messages.

Correlation Data

Optional binary data that follows the response topic and enables the original sender of the request to handle asynchronous responses that can possibly be sent from multiple receivers.

Content Type

String parameter that describes the content of the publish message.

Payload

The payload of the publish message that contains the data that is published.

If any interceptor prevents delivery of a PUBLISH packet, no additional interceptors are called.

Publish Inbound Input

The input parameter contains the following unmodifiable information:

  • MQTT PUBLISH packet

  • MQTT Version

  • Client ID

  • IP Address

  • Listener with port, bind address and type

  • ConnectionAttributeStore

  • TLS

The JavaDoc can be found here.

Publish Inbound Output

The output parameter contains a modifiable PUBLISH packet. You can use the PUBLISH packet to manipulate the parameters that are listed in the table above. Use caution when you manipulate values in the packet. Some values cause exceptions to be thrown. For example, null or non-UTF8 characters. For more information on ways to manipulate inbound PUBLISH packets, see here.

Example Usage

This section provides examples of how to implement different PublishInboundInterceptor and add them to a ClientInitializer via the InitializerRegistry.

Examples:

Set Up a PublishInboundInterceptor in the ClientInitializer

This example shows how to set up a PublishInboundInterceptor with the ClientInitializer.

...

@Override
public void extensionStart(final @NotNull ExtensionStartInput input, final @NotNull ExtensionStartOutput output) {

    // create a new publish inbound interceptor
    final PublishInboundInterceptor publishInboundInterceptor = new PublishInboundInterceptor() {
        @Override
        public void onInboundPublish(
                final @NotNull PublishInboundInput publishInboundInput,
                final @NotNull PublishInboundOutput publishInboundOutput) {
            // do something with the publish, for example, logging
            System.out.println("Inbound publish message intercepted from client: "
                    + publishInboundInput.getClientInformation().getClientId());
        }
    };

    // create a new client initializer
    final ClientInitializer clientInitializer = new ClientInitializer() {
        @Override
        public void initialize(
                final @NotNull InitializerInput initializerInput,
                final @NotNull ClientContext clientContext) {
            // add the interceptor to the context of the connecting client
            clientContext.addPublishInboundInterceptor(publishInboundInterceptor);
        }
    };

    //register the client initializer
    Services.initializerRegistry().setClientInitializer(clientInitializer);
}

...


Modify a PUBLISH Packet

This example shows how to implement a PublishInboundInterceptor that modifies all parameters of an inbound PUBLISH message.

public class ModifyingPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(
            final @NotNull PublishInboundInput publishInboundInput,
            final @NotNull PublishInboundOutput publishInboundOutput) {

        // get the modifiable publish packet from the output
        final ModifiablePublishPacket publish = publishInboundOutput.getPublishPacket();

        // modify / overwrite any parameter of the publish packet.
        publish.setContentType("modified contentType");
        publish.setCorrelationData(ByteBuffer.wrap("modified correlation data".getBytes()));
        publish.setMessageExpiryInterval(120L);
        publish.setPayload(ByteBuffer.wrap("modified payload".getBytes()));
        publish.setPayloadFormatIndicator(PayloadFormatIndicator.UNSPECIFIED);
        publish.setQos(Qos.AT_MOST_ONCE);
        publish.setTopic("modified topic");
        publish.setResponseTopic("modified response topic");
        publish.setRetain(false);
        publish.getUserProperties().clear();
        publish.getUserProperties().addUserProperty("mod-key", "mod-val");
    }
}


Modify a PUBLISH Packet Asynchronously

This example shows how to implement a PublishInboundInterceptor that modifies an inbound PUBLISH message asynchronously with the Managed Extension Executor Service.

The timeout strategy is failure. Onward delivery of the PUBLISH message will be prevented if the task takes longer than 10 seconds.

public class ModifyingAsyncPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(
            final @NotNull PublishInboundInput publishInboundInput,
            final @NotNull PublishInboundOutput publishInboundOutput) {

        // make the output object async with a timeout duration of 10 seconds and the timeout fallback failure
        final Async<PublishInboundOutput> asyncOutput =
                publishInboundOutput.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);

        //submit external task to extension executor service
        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {
                // get the modifiable publish packet from the output
                final ModifiablePublishPacket publish = publishInboundOutput.getPublishPacket();

                // call external task that modifies the publish packet (method not provided)
                callExternalTask(publish);
            }
        });

        // add a callback for completion of the task
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object object, final @Nullable Throwable throwable) {
                if (throwable != null) {
                    throwable.printStackTrace(); // please use more sophisticated logging
                }

                // resume output to tell HiveMQ that asynchronous precessing is done
                asyncOutput.resume();
            }
        });
    }
}


Prevent Delivery of a PUBLISH Packet

This example shows how to implement a PublishInboundInterceptor that prevents the delivery of PUBLISH packets that come from clients with IDs that contain "prevent": and the topic is "topic/to/prevent".

public class PreventingPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(
            final @NotNull PublishInboundInput publishInboundInput,
            final @NotNull PublishInboundOutput publishInboundOutput) {

        // check some parameters if publish delivery must be prevented, for example, client id and topic.
        final String clientId = publishInboundInput.getClientInformation().getClientId();
        final String topic = publishInboundInput.getPublishPacket().getTopic();

        if (clientId.contains("prevent") && topic.equals("topic/to/prevent")) {

            // prevent publish delivery on the output object
            publishInboundOutput.preventPublishDelivery();
        }
    }
}


Prevent Delivery of a PUBLISH Packet with Reason Code

This example shows how to implement a PublishInboundInterceptor that prevents the delivery of PUBLISH packets that come from clients with IDs that contain "prevent": and the topic is "topic/to/prevent".

When the quality of service level of the PUBLISH is greater than zero and the client is an MQTT 5 client, the client receives a PUBACK or PUBREC with reason code "TOPIC_NAME_INVALID".

Reason codes for PUBACK or PUBREC are an MQTT 5 feature. If the reason code is not "SUCCESS", MQTT 3 clients are disconnected ungracefully.

public class PreventAndAckPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(
            final @NotNull PublishInboundInput publishInboundInput,
            final @NotNull PublishInboundOutput publishInboundOutput) {

        // check some parameters if publish delivery must be prevented, for example client id and topic.
        final String clientId = publishInboundInput.getClientInformation().getClientId();
        final String topic = publishInboundInput.getPublishPacket().getTopic();

        if (clientId.contains("prevent") && topic.equals("topic/to/prevent")) {

            // prevent publish delivery with a reason code on the output object
            publishInboundOutput.preventPublishDelivery(AckReasonCode.TOPIC_NAME_INVALID);
        }
    }
}


Prevent Delivery of a PUBLISH Packet with Reason String

This example shows how to implement a PublishInboundInterceptor that prevents the delivery of PUBLISH packets that come from clients with IDs that contain "prevent" and the topic is "topic/to/prevent".

When the quality of service level of the PUBLISH is greater than zero and the client is an MQTT 5 client, the client receives a PUBACK or PUBREC with the reason code "TOPIC_NAME_INVALID" and the reason string "It is not allowed to publish to topic: topic/to/prevent".

Reason codes for PUBACK or PUBREC are an MQTT 5 feature. If the reason code is not "SUCCESS", MQTT 3 clients are disconnected ungracefully.

public class PreventWithReasonPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(
            final @NotNull PublishInboundInput publishInboundInput,
            final @NotNull PublishInboundOutput publishInboundOutput) {

        // check some parameters if publish delivery must be prevented, foe example client id and topic.
        final String clientId = publishInboundInput.getClientInformation().getClientId();
        final String topic = publishInboundInput.getPublishPacket().getTopic();

        if (clientId.contains("prevent") && topic.equals("topic/to/prevent")) {

            // prevent publish delivery with a reason code and reason string on the output object
            publishInboundOutput.preventPublishDelivery(AckReasonCode.TOPIC_NAME_INVALID, "It is not allowed to publish to topic: " + topic);
        }

    }
}


Publish Outbound Interceptor

With the PublishOutboundInterceptor interface, you can modify an MQTT PUBLISH at the moment that the packet is sent to a subscribed client or prevent the delivery of the publish completely. For this task, HiveMQ provides PublishOutboundInput, and PublishOutboundOutput parameters. Extension input and extension output principles apply.

On the PublishOutboundInterceptor interface, you can modify an MQTT PUBLISH at the moment that the packet is sent to a subscribed client or prevent the delivery of the publish completely. For this task a PublishOutboundInput and a PublishOutboundOutput are provided by HiveMQ. Extension input and extension output principles apply.

The interceptor can be set up in the ClientInitializer. For more information, see Initializer Registry.

When an asynchronous PublishOutboundOutput parameter with a TimeoutFallback.FAILURE fallback strategy times out, delivery is prevented.

Multiple interceptors are called sequentially. For example, when various extensions provide interceptors HiveMQ calls the interceptor with the highest priority first.

The PublishOutboundOutput and PublishOutboundInput parameters are updated after each interception.

The following parameters can be modified:

Parameter Description

QoS

The Quality of Service level for the message. Possible values are 0, 1, and 2.

Topic

The information channel to which the data in the payload of the message is published.

Payload Format Indicator

A flag that indicates the encoding of the payload.

Message Expiry Interval

The number of seconds until the messages expire. If the server cannot begin message delivery to a specific subscriber within the message-expiry interval that is defined for the message, the copy of the message for that subscriber is deleted.

Response Topic

An optional UTF-8 string that can be used as a topic for response messages.

Correlation Data

Optional binary data that follows the response topic and enables the original sender of the request to handle asynchronous responses that can possibly be sent from multiple receivers.

Content Type

String parameter that describes the content of the publish message.

Payload

The payload of the publish message that contains the data that is published.

The JavaDoc for the PublishOutboundInterceptor can be found here.

Example Usage

This section provides examples of how to implement different PublishOutboundInterceptor and add them to a ClientInitializer via the InitializerRegistry.

Examples:

Set up a PublishOutboundInterceptor in the ClientInitializer

...
@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput,
                           final @NotNull ExtensionStartOutput extensionStartOutput) {

    try {

        Services.initializerRegistry().setClientInitializer(new ClientInitializer() {
            @Override
            public void initialize(@NotNull InitializerInput initializerInput, @NotNull ClientContext clientContext) {
                clientContext.addPublishOutboundInterceptor(new MyPublishOutboundInterceptor());
            }
        });

    } catch (Exception e) {
        log.error("Exception thrown at extension start: ", e);
    }
}
...


Prevent Delivery of a PUBLISH Message to a Client

You can use the PublishOutboundOutput parameter to prevent an outgoing publish. This method causes the message to be dropped. The message is not sent to the subscriber. Use of the parameter in this way has no effect on other subscribers for the same message. The com.hivemq.messages.dropped.extension-prevented.count metric tracks the number of messages that are dropped because the extension system prevented delivery.

public class PreventPublishOutboundInterceptor implements PublishOutboundInterceptor {
    @Override
    public void onOutboundPublish(final @NotNull PublishOutboundInput input, final @NotNull PublishOutboundOutput output) {
        final String clientId = input.getClientInformation().getClientId();
        final String topic = input.getPublishPacket().getTopic();
        if (!topic.startsWith(clientId)) {
            output.preventPublishDelivery();
        }
    }
}


Modify Outgoing PUBLISH Message

Nearly all aspects of a PUBLISH packet can be modified in the interceptor, except the packet ID and the duplicate-delivery flag. It is important to note that these modifications only affect the way the subscriber receives the PUBLISH. There is no impact on the way that HiveMQ processes the message.

This affects the following properties:

Property Description

Topic

Only the subscribers that have matching subscriptions for the original topic receive the PUBLISH, regardless of the changes.

Retained

The original state of the retain flag determines whether or not the PUBLISH is retained by the broker.

QoS

Changes to the QoS level do not change the delivery guarantees.

Expiry Interval

The original interval is still used to decide if a message is expired.

Modify Topic Example

The following example modifies the topic of an outgoing PUBLISH message, adds a response topic, and stores the original topic as a user property:

public class ModifyPublishOutboundInterceptor implements PublishOutboundInterceptor {
    @Override
    public void onOutboundPublish(final @NotNull PublishOutboundInput input, final @NotNull PublishOutboundOutput output) {
        final String clientId = input.getClientInformation().getClientId();
        final ModifiableOutboundPublish publish = output.getPublishPacket();
        final String originalTopic = publish.getTopic();

        publish.setTopic(originalTopic + "/" + clientId);
        publish.getUserProperties().addUserProperty("original-topic", originalTopic);

        publish.setResponseTopic("response/"+clientId);
    }
}

Modify Payload Example

The following example decodes the payload of an outgoing PUBLISH, alters the string, and sets the altered string as the new payload asynchronously:

public class ModifyPublishOutboundInterceptor implements PublishOutboundInterceptor {
    @Override
    public void onOutboundPublish(final @NotNull PublishOutboundInput input, final @NotNull PublishOutboundOutput output) {
        final Async<PublishOutboundOutput> async = output.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);
        Services.extensionExecutorService().submit(() -> {
            final ModifiableOutboundPublish publish = output.getPublishPacket();
            final String payloadString = StandardCharsets.UTF_8.decode(publish.getPayload().get()).toString();
            final String newPayload = payloadString.toLowerCase();
            publish.setPayload(StandardCharsets.UTF_8.encode(newPayload));
            async.resume();
        });
    }
}


Puback Inbound Interceptor

On the PubackInboundInterceptor interface, you can intercept inbound MQTT PUBACK messages. At the moment HiveMQ receives the acknowledgement of a QoS 1 publish message from any client. You can use the PubackInboundInterceptor to modify or analyse the data that is contained in inbound MQTT PUBACK messages. For this task, HiveMQ provides PubackInboundInput and PubackInboundOutput parameters. Extension input and output principles apply.

The PubackInboundOutput parameter can easily be used for asynchronous processing.

The interceptor can be set up in the ClientContext.

Multiple interceptors can be called sequentially. For example, when various extensions provide interceptors HiveMQ calls the interceptor with the highest priority first. The PubackInboundOutput and PubackInboundInput parameters are updated after each interception.

The following parameters can be modified:

Parameter Description

Reason Code

The reason code which determines the status of the PUBACK.

Reason String

An UTF-8 encoded string that shows the status of the PUBACK. This Reason String is human readable and designed for diagnostics.

User Properties

Additional diagnostic or other information.

Interception of an inbound PUBACK message has the following limitations:

  • You cannot change a successful reason code to an unsuccessful reason code (or vice versa).

  • It is not possible to prevent sending a PUBACK message to the server.

Should an interceptor throw an exception or cause a timeout using the async method, then HiveMQ will ignore changes made to the modifiable puback packet by the interceptor and will call the next interceptor or should no interceptor be left to call will process the PUBACK.

Puback Inbound Input

The input parameter contains the following unmodifiable information:

  • MQTT PUBACK packet

  • Connection information

  • Client Information

PUBACK reason code, PUBACK reason string, and MQTT user properties are MQTT 5 features. MQTT 3 PUBACK packets are represented by the same PUBACK packet, but always have a SUCCESS Reason Code. The reason string of the MQTT 3 PUBACK packet is set to 'null' and the packet does not contain any user properties.

The JavaDoc can be found here.

Puback Inbound Output

The output parameter contains the modifiable PUBACK packet. You can use the PUBACK packet to manipulate the parameters that are listed in the table above. Use caution when you manipulate the packets values. Some values cause exceptions to be thrown. For example, null or non-UTF8 characters. For more information on ways to manipulate inbound puback packets, see here.

Example Usage

This section provides examples on how to implement different PubackInboundInterceptor and add them with the ClientContext.

Examples:

Set Up In ClientContext

This example shows how to set up a PubackInboundInterceptor with the ClientContext and the ClientInitializer:

...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {

    final PubackInboundInterceptor interceptor = (pubackInboundInput, pubackInboundOutput) -> {
        log.debug("intercepted a PUBACK packet.");
    };

    Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
        clientContext.addPubackInboundInterceptor(interceptor);
    });
}
...

Modify Inbound PUBACK

This example shows how to implement a PubackInboundInterceptor that modifies parameters of an inbound PUBACK message:

public class ModifyingPubackInboundInterceptor implements PubackInboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(ModifyingPubackInboundInterceptor.class);

    @Override
    public void onInboundPuback(final @NotNull PubackInboundInput pubackInboundInput, final @NotNull PubackInboundOutput pubackInboundOutput) {
        //get the modifiable puback object from the output
        final ModifiablePubackPacket pubackPacket = pubackInboundOutput.getPubackPacket();

        // modify / overwrite parameters of a puback packet.

        try {
            if (pubackPacket.getReasonCode() != AckReasonCode.SUCCESS &&
                pubackPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {

                //Set reason string and reason code if reason code is not success.
                pubackPacket.setReasonString("The reason for the unsuccessful puback");
                pubackPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
            }

            pubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
            pubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
        } catch (final Exception e){
            log.debug("Puback inbound interception failed:", e);
        }
    }
}

Modify Inbound PUBACK Asynchronously

This example shows how to implement a PubackInboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an inbound PUBACK message.

If the task takes more than 10 seconds, the unmodified PUBACK is forwarded to the next interceptor or finally processed by the broker.

public class AsyncModifyingPubackInboundInterceptor implements PubackInboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubackInboundInterceptor.class);

    @Override
    public void onInboundPuback(final @NotNull PubackInboundInput pubackInboundInput, final @NotNull PubackInboundOutput pubackInboundOutput) {

        //make output object async with a duration of 10 seconds
        final Async<PubackInboundOutput> async = pubackInboundOutput.async(Duration.ofSeconds(10));

        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {

                //get the modifiable puback object from the output
                final ModifiablePubackPacket pubackPacket = pubackInboundOutput.getPubackPacket();
                // modify / overwrite parameters of a puback packet.

                try {
                    if (pubackPacket.getReasonCode() != AckReasonCode.SUCCESS &&
                        pubackPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {

                        //Set reason string and reason code if reason code is not success.
                        pubackPacket.setReasonString("The reason for the unsuccessful puback");
                        pubackPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
                    }

                    pubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
                    pubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

                } catch (final Exception e){
                    log.debug("Puback inbound interception failed:", e);
                }
            }
        });

        //wait for completion of the task
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
                if (throwable != null){
                    log.debug("Puback inbound interception failed:", throwable);
                }

                //resume output to tell HiveMQ it's done.
                async.resume();
            }
        });
    }
}

Puback Outbound Interceptor

On the PubackOutboundInterceptor interface, you can intercept outbound MQTT PUBACK messages. You can use the PubackOutboundInterceptor to modify or analyse the data that is contained in outbound MQTT PUBACK messages. For this task, HiveMQ provides PubackOutboundInput, and PubackOutboundOutput parameters. Extension input and output principles apply.

The PubackOutboundOutput parameter can easily be used for asynchronous processing.

The interceptor can be set up in the ClientContext.

Multiple interceptors can be called sequentially. For example, when various extensions provide interceptors HiveMQ calls the interceptor with the highest priority first. The PubackOutboundInput and PubackOutboundOutput parameters are updated after each interception.

The following parameters can be modified:

Parameter Description

Reason Code

The reason code which determines the status of the PUBACK.

Reason String

An UTF-8 encoded string that shows the status of the PUBACK. This Reason String is human readable and designed for diagnostics.

User Properties

Additional diagnostic or other information.

Interception of an outbound PUBACK message has the following limitations:

  • You can not change a successful reason code to an unsuccessful reason code (or vice versa).

  • It is not possible to prevent sending a PUBACK message to a client.

Should an interceptor throw an exception or cause a timeout using the async method, then HiveMQ will ignore changes made to the modifiable puback packet by the interceptor and will call the next interceptor or should no interceptor be left to call will sent the PUBACK to the client.

Puback Outbound Input

The input parameter contains the following unmodifiable information:

  • MQTT PUBACK packet

  • Connection information

  • Client Information

PUBACK reason code, PUBACK reason string, and MQTT user properties are MQTT 5 features. MQTT 3 PUBACK packets are represented by the same PUBACK packet, but always have a SUCCESS Reason Code. The reason string of the MQTT 3 PUBACK packet is set to 'null' and the packet does not contain any user properties.

The JavaDoc can be found here.

Puback Outbound Output

The output parameter contains a modifiable PUBACK packet. The following properties can be modified: You can use the PUBACK packet to manipulate the parameters that are listed in the table above. Use caution when you manipulate the packets values. Some values cause exceptions to be thrown. For example, null or non-UTF8 characters. For more information on ways to manipulate inbound PUBACK packets, see here.

Example Usage

This section provides examples on how to implement different PubackOutboundInterceptor and add them with the ClientContext.

Examples:

Set Up In ClientContext

This example shows how to set up a PubackOutboundInterceptor with the ClientContext and the ClientInitializer:

...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {

    final PubackOutboundInterceptor interceptor = (pubackOutboundInput, pubackOutboundOutput) -> {
        log.debug("intercepted a PUBACK packet.");
    };

    Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
        clientContext.addPubackOutboundInterceptor(interceptor);
    });
}
...

Modify Outbound PUBACK

This example shows how to implement a PubackOutboundInterceptor that modifies parameters of an outbound PUBACK message:

public class ModifyingPubackOutboundInterceptor implements PubackOutboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(ModifyingPubackOutboundInterceptor.class);

    @Override
    public void onOutboundPuback(final @NotNull PubackOutboundInput pubackOutboundInput, final @NotNull PubackOutboundOutput pubackOutboundOutput) {
        //get the modifiable puback object from the output
        final ModifiablePubackPacket pubackPacket = pubackOutboundOutput.getPubackPacket();

        // modify / overwrite parameters of a puback packet.

        try {
            if (pubackPacket.getReasonCode() != AckReasonCode.SUCCESS &&
                pubackPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {

                //Set reason string and reason code if reason code is not success.
                pubackPacket.setReasonString("The reason for the unsuccessful puback");
                pubackPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
            }

            pubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
            pubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
        } catch (final Exception e){
            log.debug("Puback outbound interception failed:", e);
        }
    }
}

Modify Outbound PUBACK Asynchronously

This example shows how to implement a PubackOutboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an outbound PUBACK message.

If the task takes more than 10 seconds, the unmodified PUBACK is forwarded to the next interceptor or finally sent to the client.

public class AsyncModifyingPubackOutboundInterceptor implements PubackOutboundPubackInterceptor {

    private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubackOutboundInterceptor.class);

    @Override
    public void onOutboundPuback(final @NotNull PubackOutboundInput pubackOutboundInput, final @NotNull PubackOutboundOutput pubackOutboundOutput) {

        //make output object async with a duration of 10 seconds
        final Async<PubackOutboundOutput> async = pubackOutboundOutput.async(Duration.ofSeconds(10));

        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {

                //get the modifiable puback object from the output
                final ModifiablePubackPacket pubackPacket = pubackOutboundOutput.getPubackPacket();
                // modify / overwrite parameters of a puback packet.

                try {
                    if (pubackPacket.getReasonCode() != AckReasonCode.SUCCESS &&
                        pubackPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {

                        //Set reason string and reason code if reason code is not success.
                        pubackPacket.setReasonString("The reason for the unsuccessful puback");
                        pubackPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
                    }

                    pubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
                    pubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

                } catch (final Exception e){
                    log.debug("Puback outbound interception failed:", e);
                }
            }
        });

        //wait for completion of the task
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
                if (throwable != null){
                    log.debug("Puback outbound interception failed:", throwable);
                }

                //resume output to tell HiveMQ it's done.
                async.resume();
            }
        });
    }
}

Pubrec Inbound Interceptor

On the PubrecInboundInterceptor interface, you can intercept inbound MQTT PUBREC messages at the moment HiveMQ receives the acknowledgement of a QoS 1 publish message from any client. You can use the PubrecInboundInterceptor to modify or analyse the data that is contained in inbound MQTT PUBREC messages. For this task, HiveMQ provides PubrecInboundInput and PubrecInboundOutput parameters. Extension input and output principles apply.

The PubrecInboundOutput parameter can easily be used for asynchronous processing.

The interceptor can be set up in the ClientContext.

Multiple interceptors can be called sequentially. When various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first. The PubrecInboundOutput and PubrecInboundInput parameters are updated after each interception.

The following parameters can be modified:

Parameter Description

Reason Code

The reason code which determines the status of the PUBREC.

Reason String

An UTF-8 encoded string that shows the status of the PUBREC. This Reason String is human readable and designed for diagnostics.

User Properties

Additional diagnostic or other information.

Interception of an inbound PUBREC message has the following limitations:

  • You cannot change a successful reason code to an unsuccessful reason code (or vice versa).

  • It is not possible to prevent sending a PUBREC message to the server.

Should an interceptor throw an exception or cause a timeout using the async method, then HiveMQ will ignore changes made to the modifiable pubrec packet by the interceptor and will call the next interceptor or should no interceptor be left to call will process the PUBREC.

Pubrec Inbound Input

The input parameter contains the following unmodifiable information:

  • MQTT PUBREC packet

  • Connection information

  • Client information

PUBREC reason code, PUBREC reason string, and MQTT user properties are MQTT 5 features. MQTT 3 PUBREC packets are represented by the same PUBREC packet, but always have a SUCCESS Reason Code. The reason string of the MQTT 3 PUBREC packet is set to 'null' and the packet does not contain any user properties.

The JavaDoc can be found here.

Pubrec Inbound Output

The output parameter contains the modifiable PUBREC packet. You can use the PUBREC packet to manipulate the parameters that are listed in the table above. Use caution when you manipulate values in the packet. Some values cause exceptions to be thrown. For example, null or non-UTF8 characters. More information on ways to manipulate inbound PUBREC packets can be found here.

Example Usage

This section provides examples on how to implement different PubrecInboundInterceptor and add them with the ClientContext.

Examples:

Set Up In ClientContext

This example shows how to set up a PubrecInboundInterceptor with the ClientContext and the ClientInitializer:

...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {

    final PubrecInboundInterceptor interceptor = (pubrecInboundInput, pubrecInboundOutput) -> {
        log.debug("intercepted a PUBREC packet.");
    };

    Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
        clientContext.addPubrecInboundInterceptor(interceptor);
    });
}
...

Modify Inbound PUBREC

This example shows how to implement a PubrecInboundInterceptor that modifies parameters of an inbound PUBREC message:

public class ModifyingPubrecInboundInterceptor implements PubrecInboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(ModifyingPubrecInboundInterceptor.class);

    @Override
    public void onInboundPubrec(final @NotNull PubrecInboundInput pubrecInboundInput, final @NotNull PubrecInboundOutput pubrecInboundOutput) {
        //get the modifiable pubrec object from the output
        final ModifiablePubrecPacket pubrecPacket = pubrecInboundOutput.getPubrecPacket();

        // modify / overwrite parameters of a pubrec packet.

        try {
            if (pubrecPacket.getReasonCode() != AckReasonCode.SUCCESS&&
                pubrecPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {

                //Set reason string and reason code if reason code is not success.
                pubrecPacket.setReasonString("The reason for the unsuccessful pubrec");
                pubrecPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
            }

            pubrecPacket.getUserProperties().addUserProperty("my-prop", "some value");
            pubrecPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
        } catch (final Exception e){
            log.debug("Pubrec inbound interception failed:", e);
        }
    }
}

Modify Inbound PUBREC Asynchronously

This example shows how to implement a PubrecInboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an inbound PUBREC message.

If the task takes more than 10 seconds, the unmodified PUBREC is forwarded to the next interceptor or finally processed by the broker.

public class AsyncModifyingPubrecInboundInterceptor implements PubrecInboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubrecInboundInterceptor.class);

    @Override
    public void onInboundPubrec(final @NotNull PubrecInboundInput pubrecInboundInput, final @NotNull PubrecInboundOutput pubrecInboundOutput) {

        //make output object async with a duration of 10 seconds
        final Async<PubrecInboundOutput> async = pubrecInboundOutput.async(Duration.ofSeconds(10));

        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {

                //get the modifiable pubrec object from the output
                final ModifiablePubrecPacket pubrecPacket = pubrecInboundOutput.getPubrecPacket();
                // modify / overwrite parameters of a pubrec packet.

                try {
                    if (pubrecPacket.getReasonCode() != AckReasonCode.SUCCESS&&
                        pubrecPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {

                        //Set reason string and reason code if reason code is not success.
                        pubrecPacket.setReasonString("The reason for the unsuccessful pubrec");
                        pubrecPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
                    }

                    pubrecPacket.getUserProperties().addUserProperty("my-prop", "some value");
                    pubrecPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

                } catch (final Exception e){
                    log.debug("Pubrec inbound interception failed:", e);
                }
            }
        });

        //wait for completion of the task
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
                if (throwable != null){
                    log.debug("Pubrec inbound interception failed:", throwable);
                }

                //resume output to tell HiveMQ it's done.
                async.resume();
            }
        });
    }
}

Pubrec Outbound Interceptor

On the PubrecOutboundInterceptor interface, you can intercept outbound MQTT PUBREC messages. You can use the PubrecOutboundInterceptor to modify or analyse the data that is contained in outbound MQTT PUBREC messages. For this task, HiveMQ provides PubrecOutboundInput, and PubrecOutboundOutput parameters. Extension input and output principles apply.

The PubrecOutboundOutput parameter can easily be used for asynchronous processing.

The interceptor can be set up in the ClientContext.

Multiple interceptors are called sequentially. The interceptor with the highest priority is called first. The PubrecOutboundOutput and PubrecOutboundInput parameters are updated after each interceptor.

The following parameters can be modified:

Parameter Description

Reason Code

The reason code which determines the status of the PUBREC.

Reason String

An UTF-8 encoded string that shows the status of the PUBREC. This Reason String is human readable and designed for diagnostics.

User Properties

Additional diagnostic or other information.

Interception of an outbound PUBREC message has the following limitations:

  • You can not change a successful reason code to an unsuccessful reason code (or vice versa).

  • It is not possible to prevent sending a PUBREC message to a client.

Should an interceptor throw an exception or cause a timeout using the async method, then HiveMQ will ignore changes made to the modifiable pubrec packet by the interceptor and will call the next interceptor or should no interceptor be left to call will sent the PUBREC to the client.

Pubrec Outbound Input

The input parameter contains the following unmodifiable information:

  • MQTT PUBREC packet

  • Connection information

  • Client Information

PUBREC reason code, PUBREC reason string, and MQTT user properties are MQTT 5 features. MQTT 3 PUBREC packets are represented by the same PUBREC packet, but always have a SUCCESS Reason Code. The reason string of the MQTT 3 PUBREC packet is set to 'null' and the packet does not contain any user properties.

The JavaDoc can be found here.

Pubrec Outbound Output

The output parameter contains a modifiable PUBREC packet. The following properties can be modified: You can use the PUBREC packet to manipulate the parameters that are listed in the table above. Use caution when you manipulate the packets values. Some values cause exceptions to be thrown. For example, null or non-UTF8 characters. More information on ways to manipulate outbound PUBREC packets can be found here.

Example Usage

This section provides examples on how to implement different PubrecOutboundInterceptor and add them with the ClientContext.

Examples:

Set Up In ClientContext

This example shows how to set up a PubrecOutboundInterceptor with the ClientContext and the ClientInitializer:

...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {

    final PubrecOutboundInterceptor interceptor = (pubrecOutboundInput, pubrecOutboundOutput) -> {
        log.debug("intercepted a PUBREC packet.");
    };

    Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
        clientContext.addPubrecOutboundInterceptor(interceptor);
    });
}
...

Modify Outbound PUBREC

This example shows how to implement a PubrecOutboundInterceptor that modifies parameters of an outbound PUBREC message:

public class ModifyingPubrecOutboundInterceptor implements PubrecOutboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(ModifyingPubrecOutboundInterceptor.class);

    @Override
    public void onOutboundPubrec(final @NotNull PubrecOutboundInput pubrecOutboundInput, final @NotNull PubrecOutboundOutput pubrecOutboundOutput) {
        //get the modifiable pubrec object from the output
        final ModifiablePubrecPacket pubrecPacket = pubrecOutboundOutput.getPubrecPacket();

        // modify / overwrite parameters of a pubrec packet.

        try {
            if (pubrecPacket.getReasonCode() != AckReasonCode.SUCCESS&&
                pubrecPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {

                //Set reason string and reason code if reason code is not success.
                pubrecPacket.setReasonString("The reason for the unsuccessful pubrec");
                pubrecPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
            }

            pubrecPacket.getUserProperties().addUserProperty("my-prop", "some value");
            pubrecPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
        } catch (final Exception e){
            log.debug("Pubrec outbound interception failed:", e);
        }
    }
}

Modify Outbound PUBREC Asynchronously

This example shows how to implement a PubrecOutboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an outbound PUBREC message.

If the task takes more than 10 seconds, the unmodified PUBREC is forwarded to the next interceptor or finally sent to the client.

public class AsyncModifyingPubrecOutboundInterceptor implements PubrecOutboundPubrecInterceptor {

    private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubrecOutboundInterceptor.class);

    @Override
    public void onOutboundPubrec(final @NotNull PubrecOutboundInput pubrecOutboundInput, final @NotNull PubrecOutboundOutput pubrecOutboundOutput) {

        //make output object async with a duration of 10 seconds
        final Async<PubrecOutboundOutput> async = pubrecOutboundOutput.async(Duration.ofSeconds(10));

        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {

                //get the modifiable pubrec object from the output
                final ModifiablePubrecPacket pubrecPacket = pubrecOutboundOutput.getPubrecPacket();
                // modify / overwrite parameters of a pubrec packet.

                try {
                    if (pubrecPacket.getReasonCode() != AckReasonCode.SUCCESS&&
                        pubrecPacket.getReasonCode() != AckReasonCode.NO_MATCHING_SUBSCRIBERS) {

                        //Set reason string and reason code if reason code is not success.
                        pubrecPacket.setReasonString("The reason for the unsuccessful pubrec");
                        pubrecPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
                    }

                    pubrecPacket.getUserProperties().addUserProperty("my-prop", "some value");
                    pubrecPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

                } catch (final Exception e){
                    log.debug("Pubrec outbound interception failed:", e);
                }
            }
        });

        //wait for completion of the task
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
                if (throwable != null){
                    log.debug("Pubrec outbound interception failed:", throwable);
                }

                //resume output to tell HiveMQ it's done.
                async.resume();
            }
        });
    }
}

Pubrel Inbound Interceptor

On the PubrelInboundInterceptor interface, you can intercept inbound MQTT PUBREL messages. You can use the PubrelInboundInterceptor to modify or analyse the data that is contained in inbound MQTT PUBACK messages. For this task, HiveMQ provides PubrelInboundInput and PubrelInboundOutput parameters. Extension input and output principles apply.

The PubrelInboundOutput parameter can easily be used for asynchronous processing.

The interceptor can be set up in the ClientContext.

Multiple interceptors can be called sequentially. For example, when various extensions provide interceptors HiveMQ calls the interceptor with the highest priority first. The PubrelInboundOutput and PubrelInboundInput parameters are updated after each interception.

The following parameters can be modified:

Parameter Description

Reason String

An UTF-8 encoded string that shows the status of the PUBREL. This Reason String is human readable and designed for diagnostics.

User Properties

Additional diagnostic or other information.

Interception of an inbound PUBREL message has the following limitations:

  • It is not possible to prevent sending a PUBREL message to the server.

Should an interceptor throw an exception or cause a timeout using the async method, then HiveMQ will ignore changes made to the modifiable pubrel packet by the interceptor and will call the next interceptor or should no interceptor be left to call will process the PUBREL.

Pubrel Inbound Input

The input parameter contains the following unmodifiable information:

  • MQTT PUBREL packet

  • Connection information

  • Client information

PUBREL reason code, PUBREL reason string, and MQTT user properties are MQTT 5 features. MQTT 3 PUBREL packets are represented by the same PUBREL packet, but always have a SUCCESS Reason Code. The reason string of the MQTT 3 PUBREL packet is set to 'null' and the packet does not contain any user properties.

The JavaDoc can be found here.

Pubrel Inbound Output

The output parameter contains the modifiable PUBREL packet. You can use the PUBREL packet to manipulate the parameters that are listed in the table above. Use caution when you manipulate values in the packet. Some values cause exceptions to be thrown. For example, null or non-UTF8 characters. More information on ways to manipulate inbound puback packets can be found here.

Example Usage

This section provides examples on how to implement different PubrelInboundInterceptor and add them with the ClientContext.

Examples:

Set Up In ClientContext

This example shows how to set up a PubrelInboundInterceptor with the ClientContext and the ClientInitializer:

...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {

    final PubrelInboundInterceptor interceptor = (pubrelInboundInput, pubrelInboundOutput) -> {
        log.debug("intercepted a PUBREL packet.");
    };

    Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
        clientContext.addPubrelInboundInterceptor(interceptor);
    });
}
...

Modify Inbound PUBREL

This example shows how to implement a PubrelInboundInterceptor that modifies parameters of an inbound PUBREL message:

public class ModifyingPubrelInboundInterceptor implements PubrelInboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(ModifyingPubrelInboundInterceptor.class);

    @Override
    public void onInboundPubrel(final @NotNull PubrelInboundInput pubrelInboundInput, final @NotNull PubrelInboundOutput pubrelInboundOutput) {
        //get the modifiable pubrel object from the output
        final ModifiablePubrelPacket pubrelPacket = pubrelInboundOutput.getPubrelPacket();

        // modify / overwrite parameters of a pubrel packet.

        try {
            if (pubrelPacket.getReasonCode() != PubrelReasonCode.SUCCESS) {
                //Set reason string if reason code is not success.
                pubrelPacket.setReasonString("The reason for the unsuccessful pubrel");
            }

            pubrelPacket.getUserProperties().addUserProperty("my-prop", "some value");
            pubrelPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
        } catch (final Exception e){
            log.debug("Pubrel inbound interception failed:", e);
        }
    }
}

Modify Inbound PUBREL Asynchronously

This example shows how to implement a PubrelInboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an inbound PUBREL message.

If the task takes more than 10 seconds, the unmodified PUBREL is forwarded to the next interceptor or finally processed by the broker.

public class AsyncModifyingPubrelInboundInterceptor implements PubrelInboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubrelInboundInterceptor.class);

    @Override
    public void onInboundPubrel(final @NotNull PubrelInboundInput pubrelInboundInput, final @NotNull PubrelInboundOutput pubrelInboundOutput) {

        //make output object async with a duration of 10 seconds
        final Async<PubrelInboundOutput> async = pubrelInboundOutput.async(Duration.ofSeconds(10));

        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {

                //get the modifiable pubrel object from the output
                final ModifiablePubrelPacket pubrelPacket = pubrelInboundOutput.getPubrelPacket();
                // modify / overwrite parameters of a pubrel packet.

                try {
                    if (pubrelPacket.getReasonCode() != PubrelReasonCode.SUCCESS) {
                        //Set reason string if reason code is not success.
                        pubrelPacket.setReasonString("The reason for the unsuccessful pubrel");
                    }

                    pubrelPacket.getUserProperties().addUserProperty("my-prop", "some value");
                    pubrelPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

                } catch (final Exception e){
                    log.debug("Pubrel inbound interception failed:", e);
                }
            }
        });

        //wait for completion of the task
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
                if (throwable != null){
                    log.debug("Pubrel inbound interception failed:", throwable);
                }

                //resume output to tell HiveMQ it's done.
                async.resume();
            }
        });
    }
}

Pubrel Outbound Interceptor

On the PubrelOutboundInterceptor interface, you can intercept outbound MQTT PUBREL messages. You can use the PubrelOutboundInterceptor to modify or analyse the data that is contained in outbound MQTT PUBREL messages. For this task, HiveMQ provides PubrelOutboundInput, and PubrelOutboundOutput parameters. Extension input and output principles apply.

The PubrelOutboundOutput parameter can easily be used for asynchronous processing.

The interceptor can be set up in the ClientContext.

Multiple interceptors are called sequentially. The interceptor with the highest priority is called first. The PubrelOutboundOutput and PubrelOutboundInput parameters are updated after each interceptor.

The following parameters can be modified:

Parameter Description

Reason String

An UTF-8 encoded string that shows the status of the PUBREL. This Reason String is human readable and designed for diagnostics.

User Properties

Additional diagnostic or other information.

Interception of an outbound PUBREL message has the following limitations:

  • It is not possible to prevent sending a PUBREL message to a client.

Should an interceptor throw an exception or cause a timeout using the async method, then HiveMQ will ignore changes made to the modifiable pubrel packet by the interceptor and will call the next interceptor or should no interceptor be left to call will sent the PUBREL to the client.

Pubrel Outbound Input

The input parameter contains the following unmodifiable information:

  • MQTT PUBREL packet

  • Connection information

  • Client information

PUBREL reason code, PUBREL reason string, and MQTT user properties are MQTT 5 features. MQTT 3 PUBREL packets are represented by the same PUBREL packet, but always have a SUCCESS Reason Code. The reason string of the MQTT 3 PUBREL packet is set to 'null' and the packet does not contain any user properties.

The JavaDoc can be found here.

Pubrel Outbound Output

The output parameter contains a modifiable PUBREL packet. The following properties can be modified: You can use the PUBREL packet to manipulate the parameters that are listed in the table above. Use caution when you manipulate values in the packet. Some values cause exceptions to be thrown. For example, null or non-UTF8 characters. More information on ways to manipulate inbound PUBREL packets can be found here.

Example Usage

This section provides examples on how to implement different PubrelOutboundInterceptor and add them with the ClientContext.

Examples:

Set Up In ClientContext

This example shows how to set up a PubrelOutboundInterceptor with the ClientContext and the ClientInitializer:

...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {

    final PubrelOutboundInterceptor interceptor = (pubrelOutboundInput, pubrelOutboundOutput) -> {
        log.debug("intercepted a PUBREL packet.");
    };

    Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
        clientContext.addPubrelOutboundInterceptor(interceptor);
    });
}
...

Modify Outbound PUBREL

This example shows how to implement a PubrelOutboundInterceptor that modifies parameters of an outbound PUBREL message:

public class ModifyingPubrelOutboundInterceptor implements PubrelOutboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(ModifyingPubrelOutboundInterceptor.class);

    @Override
    public void onOutboundPubrel(final @NotNull PubrelOutboundInput pubrelOutboundInput, final @NotNull PubrelOutboundOutput pubrelOutboundOutput) {
        //get the modifiable pubrel object from the output
        final ModifiablePubrelPacket pubrelPacket = pubrelOutboundOutput.getPubrelPacket();

        // modify / overwrite parameters of a pubrel packet.

        try {
            if (pubrelPacket.getReasonCode() != PubrelReasonCode.SUCCESS) {
                //Set reason string if reason code is not success.
                pubrelPacket.setReasonString("The reason for the unsuccessful pubrel");
            }

            pubrelPacket.getUserProperties().addUserProperty("my-prop", "some value");
            pubrelPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
        } catch (final Exception e){
            log.debug("Pubrel outbound interception failed:", e);
        }
    }
}

Modify Outbound PUBREL Asynchronously

This example shows how to implement a PubrelOutboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an outbound PUBREL message.

If the task takes more than 10 seconds, the unmodified PUBREL is forwarded to the next interceptor or finally sent to the client.

public class AsyncModifyingPubrelOutboundInterceptor implements PubrelOutboundPubrelInterceptor {

    private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubrelOutboundInterceptor.class);

    @Override
    public void onOutboundPubrel(final @NotNull PubrelOutboundInput pubrelOutboundInput, final @NotNull PubrelOutboundOutput pubrelOutboundOutput) {

        //make output object async with a duration of 10 seconds
        final Async<PubrelOutboundOutput> async = pubrelOutboundOutput.async(Duration.ofSeconds(10));

        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {

                //get the modifiable pubrel object from the output
                final ModifiablePubrelPacket pubrelPacket = pubrelOutboundOutput.getPubrelPacket();
                // modify / overwrite parameters of a pubrel packet.

                try {
                    if (pubrelPacket.getReasonCode() != PubrelReasonCode.SUCCESS) {
                        //Set reason string if reason code is not success.
                        pubrelPacket.setReasonString("The reason for the unsuccessful pubrel");
                    }

                    pubrelPacket.getUserProperties().addUserProperty("my-prop", "some value");
                    pubrelPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

                } catch (final Exception e){
                    log.debug("Pubrel outbound interception failed:", e);
                }
            }
        });

        //wait for completion of the task
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
                if (throwable != null){
                    log.debug("Pubrel outbound interception failed:", throwable);
                }

                //resume output to tell HiveMQ it's done.
                async.resume();
            }
        });
    }
}

Pubcomp Inbound Interceptor

On the PubcompInboundInterceptor interface, you can intercept inbound MQTT PUBCOMP messages at the moment HiveMQ receives the acknowledgement of a QoS 1 publish message from any client. You can use the PubcompInboundInterceptor to modify or analyse the data that is contained in inbound MQTT PUBCOMP messages. For this task, HiveMQ provides PubcompInboundInput and PubcompInboundOutput parameters. Extension input and output principles apply.

You can use a PubcompInboundInterceptor to modify inbound PUBCOMP messages.

The PubcompInboundOutput parameter can easily be used for asynchronous processing.

The interceptor can be set up in the ClientContext.

Multiple interceptors can be called sequentially. When various extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first. The PubcompInboundOutput and `PubcompInboundInput`parameters are updated after each interception.

The following parameters can be modified:

Parameter Description

Reason String

An UTF-8 encoded string that shows the status of the PUBCOMP. This Reason String is human readable and designed for diagnostics.

User Properties

Additional diagnostic or other information.

Interception of an inbound PUBCOMP message has the following limitations:

  • It is not possible to prevent sending a PUBCOMP message to the server.

Should an interceptor throw an exception or cause a timeout using the async method, then HiveMQ will ignore changes made to the modifiable pubcomp packet by the interceptor and will call the next interceptor or should no interceptor be left to call will process the PUBCOMP.

Pubcomp Inbound Input

The input parameter contains the following unmodifiable information:

  • PUBCOMP packet identifier

  • PUBCOMP reason string

  • MQTT user properties

PUBCOMP reason code, PUBCOMP reason string, and MQTT user properties are MQTT 5 features. MQTT 3 PUBCOMP packets are represented by the same PUBCOMP packet, but always have a SUCCESS Reason Code. The reason string of the MQTT 3 PUBCOMP packet is set to 'null' and the packet does not contain any user properties.

The JavaDoc can be found here.

Pubcomp Inbound Output

The output parameter contains the modifiable PUBCOMP packet. You can use the PUBCOMP packet to manipulate the parameters that are listed in the table above. Use caution when you manipulate values in the packets. Some values cause exceptions to be thrown. For example, null or non-UTF8 characters. More information on ways to manipulate inbound pubcomp packets can be found here.

Example Usage

This section provides examples on how to implement different PubcompInboundInterceptor and add them with the ClientContext.

Examples:

Set Up In ClientContext

This example shows how to set up a PubcompInboundInterceptor with the ClientContext and the ClientInitializer:

...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {

    final PubcompInboundInterceptor interceptor = (pubcompInboundInput, pubcompInboundOutput) -> {
        log.debug("intercepted a PUBCOMP packet.");
    };

    Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
        clientContext.addPubcompInboundInterceptor(interceptor);
    });
}
...

Modify Inbound PUBCOMP

This example shows how to implement a PubcompInboundInterceptor that modifies parameters of an inbound PUBCOMP message:

public class ModifyingPubcompInboundInterceptor implements PubcompInboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(ModifyingPubcompInboundInterceptor.class);

    @Override
    public void onInboundPubcomp(final @NotNull PubcompInboundInput pubcompInboundInput, final @NotNull PubcompInboundOutput pubcompInboundOutput) {
        //get the modifiable pubcomp object from the output
        final ModifiablePubcompPacket pubcompPacket = pubcompInboundOutput.getPubcompPacket();

        // modify / overwrite parameters of a pubcomp packet.

        try {
            if (pubcompPacket.getReasonCode() != PubcompReasonCode.SUCCESS) {
                //Set reason string if reason code is not success.
                pubcompPacket.setReasonString("The reason for the unsuccessful pubcomp");
            }

            pubcompPacket.getUserProperties().addUserProperty("my-prop", "some value");
            pubcompPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
        } catch (final Exception e){
            log.debug("Pubcomp inbound interception failed:", e);
        }
    }
}

Modify Inbound PUBCOMP Asynchronously

This example shows how to implement a PubcompInboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an inbound PUBCOMP message.

If the task takes more than 10 seconds, the unmodified PUBCOMP is forwarded to the next interceptor or finally processed by the broker.

public class AsyncModifyingPubcompInboundInterceptor implements PubcompInboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubcompInboundInterceptor.class);

    @Override
    public void onInboundPubcomp(final @NotNull PubcompInboundInput pubcompInboundInput, final @NotNull PubcompInboundOutput pubcompInboundOutput) {

        //make output object async with a duration of 10 seconds
        final Async<PubcompInboundOutput> async = pubcompInboundOutput.async(Duration.ofSeconds(10));

        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {

                //get the modifiable pubcomp object from the output
                final ModifiablePubcompPacket pubcompPacket = pubcompInboundOutput.getPubcompPacket();
                // modify / overwrite parameters of a pubcomp packet.

                try {
                    if (pubcompPacket.getReasonCode() != PubcompReasonCode.SUCCESS) {
                        //Set reason string if reason code is not success.
                        pubcompPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);
                    }

                    pubcompPacket.getUserProperties().addUserProperty("my-prop", "some value");
                    pubcompPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

                } catch (final Exception e){
                    log.debug("Pubcomp inbound interception failed:", e);
                }
            }
        });

        //wait for completion of the task
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
                if (throwable != null){
                    log.debug("Pubcomp inbound interception failed:", throwable);
                }

                //resume output to tell HiveMQ it's done.
                async.resume();
            }
        });
    }
}

Pubcomp Outbound Interceptor

On the PubcompOutboundInterceptor interface, you can intercept outbound MQTT PUBCOMP messages. You can use the PubcompOutboundInterceptor to modify or analyse the data that is contained in outbound MQTT PUBCOMP messages. For this task, HiveMQ provides PubcompOutboundInput, and PubcompOutboundOutput parameters. Extension input and output principles apply.

The PubcompOutboundOutput parameter can easily be used for asynchronous processing.

The interceptor can be set up in the ClientContext.

Multiple interceptors are called sequentially. The interceptor with the highest priority is called first. The PubcompOutboundOutput and PubcompOutboundInput parameters are updated after each interceptor.

The following parameters can be modified:

Parameter Description

Reason String

An UTF-8 encoded string that shows the status of the PUBCOMP. This Reason String is human readable and designed for diagnostics.

User Properties

Additional diagnostic or other information.

Interception of an outbound PUBCOMP message has the following limitations:

  • It is not possible to prevent sending a PUBCOMP message to a client.

Should an interceptor throw an exception or cause a timeout using the async method, then HiveMQ will ignore changes made to the modifiable pubrec packet by the interceptor and will call the next interceptor or should no interceptor be left to call will sent the PUBCOMP to the client.

Pubcomp Outbound Input

The input parameter contains the following unmodifiable information:

  • PUBCOMP packet

  • Connection information

  • Client information

PUBCOMP reason code, PUBCOMP reason string, and MQTT user properties are MQTT 5 features. MQTT 3 PUBCOMP packets are represented by the same PUBCOMP packet, but always have a SUCCESS Reason Code. The reason string of the MQTT 3 PUBCOMP packet is set to 'null' and the packet does not contain any user properties.

The JavaDoc can be found here.

Pubcomp Outbound Output

The output parameter contains the modifiable PUBCOMP packet. You can use the PUBCOMP packet to manipulate the parameters that are listed in the table above. Use caution when you manipulate values in the packets. Some values cause exceptions to be thrown. For example, null or non-UTF8 characters. More information on ways to manipulate inbound pubcomp packets can be found here.

Example Usage

This section provides examples on how to implement different PubcompOutboundInterceptor and add them with the ClientContext.

Examples:

Set Up In ClientContext

This example shows how to set up a PubcompOutboundInterceptor with the ClientContext and the ClientInitializer:

...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {

    final PubcompOutboundInterceptor interceptor = (pubcompOutboundInput, pubcompOutboundOutput) -> {
        log.debug("intercepted a PUBCOMP packet.");
    };

    Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
        clientContext.addPubcompOutboundInterceptor(interceptor);
    });
}
...

Modify Outbound PUBCOMP

This example shows how to implement a PubcompOutboundInterceptor that modifies parameters of an outbound PUBCOMP message:

public class ModifyingPubcompOutboundInterceptor implements PubcompOutboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(ModifyingPubcompOutboundInterceptor.class);

    @Override
    public void onOutboundPubcomp(final @NotNull PubcompOutboundInput pubcompOutboundInput, final @NotNull PubcompOutboundOutput pubcompOutboundOutput) {
        //get the modifiable pubcomp object from the output
        final ModifiablePubcompPacket pubcompPacket = pubcompOutboundOutput.getPubcompPacket();

        // modify / overwrite parameters of a pubcomp packet.

        try {
            if (pubcompPacket.getReasonCode() != PubcompReasonCode.SUCCESS) {
                //Set reason string if reason code is not success.
                pubcompPacket.setReasonString("The reason for the unsuccessful pubcomp");
            }

            pubcompPacket.getUserProperties().addUserProperty("my-prop", "some value");
            pubcompPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
        } catch (final Exception e){
            log.debug("Pubcomp outbound interception failed:", e);
        }
    }
}

Modify Outbound PUBCOMP Asynchronously

This example shows how to implement a PubcompOutboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an outbound PUBCOMP message.

If the task takes more than 10 seconds, the unmodified PUBCOMP is forwarded to the next interceptor or finally sent to the client.

public class AsyncModifyingPubcompOutboundInterceptor implements PubcompOutboundPubcompInterceptor {

    private static final Logger log = LoggerFactory.getLogger(AsyncModifyingPubcompOutboundInterceptor.class);

    @Override
    public void onOutboundPubcomp(final @NotNull PubcompOutboundInput pubcompOutboundInput, final @NotNull PubcompOutboundOutput pubcompOutboundOutput) {

        //make output object async with a duration of 10 seconds
        final Async<PubcompOutboundOutput> async = pubcompOutboundOutput.async(Duration.ofSeconds(10));

        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {

                //get the modifiable pubcomp object from the output
                final ModifiablePubcompPacket pubcompPacket = pubcompOutboundOutput.getPubcompPacket();
                // modify / overwrite parameters of a pubcomp packet.

                try {
                    if (pubcompPacket.getReasonCode() != PubcompReasonCode.SUCCESS) {
                        //Set reason string if reason code is not success.
                        pubcompPacket.setReasonString("The reason for the unsuccessful pubcomp");
                    }

                    pubcompPacket.getUserProperties().addUserProperty("my-prop", "some value");
                    pubcompPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

                } catch (final Exception e){
                    log.debug("Pubcomp outbound interception failed:", e);
                }
            }
        });

        //wait for completion of the task
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
                if (throwable != null){
                    log.debug("Pubcomp outbound interception failed:", throwable);
                }

                //resume output to tell HiveMQ it's done.
                async.resume();
            }
        });
    }
}

Subscribe Inbound Interceptor

On the SubscribeInboundInterceptor interface, you can intercept inbound MQTT Subscribe messages. You can use the SubscribeInboundInterceptor to modify or analyse the data that is contained in inbound MQTT Subscribe messages. For this task, HiveMQ provides SubscribeInboundInput, and SubscribeInboundOutput parameters. Extension input and output principles apply.

The interceptor can be set up in the ClientContext.

The SubscribeInboundOutput parameter can easily be used for asynchronous processing. By default, when the asynchronous processing times out, the SUBSCRIBE is rejected and a SUBACK message with the UNSPECIFIED_ERROR reason code is sent to the client.

Multiple interceptors can be called sequentially. For example, when various extensions provide interceptors HiveMQ calls the interceptor with the highest priority first.

Changes from the SubscribeInboundOutput parameter are reflected in the SubscribeInboundInput parameter of the next interceptor.

The following parameters can be modified:

Parameter Description

Subscriptions

The list of topic filters and additional options that are associated with the subscriptions attributes.

Subscription Identifier

The subscription identifier for the SUBSCRIBE packet. For MQTT 3 clients, this entry is always empty.

User Properties

Additional diagnostic or other information (optional).

Subscribe Inbound Input

The SubscribeInboundInput parameter contains the following unmodifiable information.

  • MQTT SUBSCRIBE packet

  • Connection information

  • Client information

The JavaDoc can be found here.

Subscribe Inbound Output

The SubscribeInboundOutput parameter contains the modifiable SUBSCRIBE packet. You can use the SUBSCRIBE packet to manipulate the parameters that are listed in the table above. Use caution when you manipulate values in packets. Some values cause exceptions to be thrown. For example, null or non-UTF8 characters. For more information on ways to manipulate inbound subscribe packets, see the JavaDoc here.

Example Usage

Set Up a SubscribeInboundInterceptor in a ClientInitializer

...

@Override
public void extensionStart(final @NotNull ExtensionStartInput input, final @NotNull ExtensionStartOutput output) {

    // create a new subscribe inbound interceptor
    final SubscribeInboundInterceptor subscribeInboundInterceptor = new SubscribeInboundInterceptor() {
        @Override
        public void onInboundSubscribe(
                final @NotNull SubscribeInboundInput subscribeInboundInput,
                final @NotNull SubscribeInboundOutput subscribeInboundOutput) {
            // do something with the subscribe, for example logging
            log.debug("Inbound subscribe message intercepted from client: "
                    + subscribeInboundInput.getClientInformation().getClientId());
        }
    };

    // create a new client initializer
    final ClientInitializer clientInitializer = new ClientInitializer() {
        @Override
        public void initialize(
                final @NotNull InitializerInput initializerInput,
                final @NotNull ClientContext clientContext) {
            // add the interceptor to the context of the connecting client
            clientContext.addSubscribeInboundInterceptor(subscribeInboundInterceptor);
        }
    };

    // register the client initializer
    Services.initializerRegistry().setClientInitializer(clientInitializer);
}

...

Modify a SUBSCRIBE Message

This example shows how to implement a SubscribeInboundInterceptor that modifies all parameters of an inbound SUBSCRIBE message:

public class ModifyingSubscribeInboundInterceptor implements SubscribeInboundInterceptor {

    @Override
    public void onInboundSubscribe(
            final @NotNull SubscribeInboundInput subscribeInboundInput,
            final @NotNull SubscribeInboundOutput subscribeInboundOutput) {

        // get the modifiable subscribe packet from the output
        final ModifiableSubscribePacket subscribe = subscribeInboundOutput.getSubscribePacket();

        // modify / overwrite any parameter of the subscribe packet
        final ModifiableSubscription subscription = subscribe.getSubscriptions().get(0);
        subscription.setTopicFilter("modified-topic");
        subscription.setQos(Qos.AT_LEAST_ONCE);
        subscription.setRetainHandling(RetainHandling.DO_NOT_SEND);
        subscription.setRetainAsPublished(true);
        subscription.setNoLocal(true);
        subscribe.getUserProperties().addUserProperty("additional", "value");
    }
}

Modify a SUBSCRIBE Message Asynchronously

This example shows how to implement a SubscribeInboundInterceptor that asynchronously modifies an inbound SUBSCRIBE message with the Managed Extension Executor Service.

public class ModifyingAsyncSubscribeInboundInterceptor implements SubscribeInboundInterceptor {

    @Override
    public void onInboundSubscribe(
            final @NotNull SubscribeInboundInput subscribeInboundInput,
            final @NotNull SubscribeInboundOutput subscribeInboundOutput) {

        // make output asynchronous with a timeout duration of 10 seconds and the timeout fallback failure
        final Async<SubscribeInboundOutput> asyncOutput =
                subscribeInboundOutput.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);

        // submit external task to the extension executor service
        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(() -> {
            // get the modifiable subscribe packet from the output
            final ModifiableSubscribePacket subscribe = subscribeInboundOutput.getSubscribePacket();

            // call external task that modifies the subscribe packet (method not provided)
            callExternalTask(subscribe);
        });

        // add a callback for completion of the task
        taskFuture.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                throwable.printStackTrace(); // please use more sophisticated logging
            }

            // resume output to tell HiveMQ that asynchronous precessing is done
            asyncOutput.resume();
        });
    }
}

Suback Outbound Interceptor

With the SubackOutboundInterceptor interface, you can intercept outbound MQTT UNSUBACK messages. You can use the SubackOutboundInterceptor to modify or analyse the data that is contained in outbound MQTT SUBACK messages. For this task, HiveMQ provides SubackOutboundInput and SubackOutboundOutput parameters. Extension input and output principles apply.

The SubackOutboundOutput parameter can easily be used for asynchronous processing.

The interceptor can be set up in the ClientContext.

When an asynchronous SubackOutboundOutput parameter times out, all pending changes to the SUBACK are discarded and the unmodified SUBACK is sent to the client.

Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.

The SubackOutboundInput and SubackOutboundOutput parameters are updated after each interception.

The following parameters can be modified:

Parameter Description

Reason Code

The cause of the disconnection. Clients can use this information to determine whether to retry the connection, and how long to wait before a reconnection attempt.

Reason String

A UTF-8 encoded string that shows the cause of the disconnect. This Reason String is human readable and designed for diagnostics.

User Properties

Additional diagnostic or other information.

Interception of an outbound SUBACK message has the following limitations:

  • You can not change a successful reason code to an unsuccessful reason code (or vice versa).

  • It is not possible to prevent sending an SUBACK message to a client.

When an interceptor throws an exception or causes a timeout using the async method, then HiveMQ will ignore changes to the modifiable SUBACK packet made by the interceptor and will call the next interceptor. If there are no further interceptors, the SUBACK is sent to the client.

Suback Outbound Input

The input parameter contains the following unmodifiable information:

  • MQTT DISCONNECT packet

  • Connection information

  • Client Information

The JavaDoc can be found here.

Suback Outbound Output

The output parameter contains the modifiable SUBACK packet. You can use the SUBACK packet to manipulate the parameters that are listed in the table above. Use caution when you manipulate the packets values. Some values cause exceptions to be thrown. For example, null or non-UTF8 characters. For more information on ways to manipulate outbound suback packets, see here.

Example Usage

This section provides examples on how to implement different SubackOutboundInterceptor and add them with the ClientContext.

Examples:

Set Up Interceptor in ClientContext

This example shows how to set up a SubackOutboundInterceptor with the help of a ClientContext and the ClientInitializer:

...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {

    final SubackOutboundInterceptor interceptor = (subackOutboundInput, subackOutboundOutput) -> {
        log.debug("intercepted a SUBACK packet.");
    };

    Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
        clientContext.addSubackOutboundInterceptor(interceptor);
    });
}
...

Modify Outbound SUBACK

This example shows how to implement a SubackOutboundInterceptor that modifies the parameters of an outbound SUBACK message:

public class ModifyingSubackOutboundInterceptor implements SubackOutboundInterceptor {

    @Override
    public void onOutboundSuback(final @NotNull SubackOutboundInput subackOutboundInput, final @NotNull SubackOutboundOutput subackOutboundOutput) {
        //get the modifiable suback object from the output
        final ModifiableSubackPacket subackPacket = subackOutboundOutput.getSubackPacket();

        subackPacket.setReasonString("The reason for the unsuccessful suback");
        subackPacket.setReasonCode(DisconnectReasonCode.MALFORMED_PACKET);

        subackPacket.getUserProperties().addUserProperty("my-prop", "some value");
        subackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
    }
}

Modify Outbound SUBACK Asynchronously

This example shows how to implement a SubackOutboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an outbound SUBACK message.

If the task takes more than 10 seconds, the unmodified SUBACK is forwarded to the next interceptor or finally sent to the client.

public class AsyncModifyingSubackOutboundInterceptor implements SubackOutboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(AsyncModifyingSubackOutboundInterceptor.class);

    @Override
    public void onOutboundSuback(final @NotNull SubackOutboundInput subackOutboundInput, final @NotNull SubackOutboundOutput subackOutboundOutput) {

        //make output object async with a duration of 10 seconds
        final Async<SubackOutboundOutput> async = subackOutboundOutput.async(Duration.ofSeconds(10));

        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {

                //get the modifiable suback object from the output
                final ModifiableSubackPacket subackPacket = subackOutboundOutput.getSubackPacket();

                // modify / overwrite parameters of a suback packet.
                try {
                    //Set reason string and reason code.
                    subackPacket.setReasonString("The reason for the unsuccessful Suback");
                    subackPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);

                    subackPacket.getUserProperties().addUserProperty("my-prop", "some value");
                    subackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

                } catch (final Exception e){
                    log.debug("Suback outbound interception failed:", e);
                }
                async.resume();
            }
        });
    }
}

Unsubscribe Inbound Interceptor

With the UnsubscribeInboundInterceptor interface, you can intercept inbound MQTT UNSUBSCRIBE messages. You can use the UnsubscribeInboundInterceptor to modify or analyse the data that is contained in inbound MQTT UNSUBSCRIBE messages. For this task, HiveMQ provides UnsubscribeInboundInput and UnsubscribeInboundOutput parameters. Extension input and output principles apply.

The UnsubscribeInboundOutput parameter can easily be used for asynchronous processing.

The interceptor can be set up in the ClientContext.

Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.

The UnsubscribeInboundInput and UnsubscribeInboundOutput parameters are updated after each interception.

The following parameters can be modified:

Parameter Description

Topic Filters

Lists the topics filters from which to unsubscribe the client.

User Properties

Additional diagnostic or other information.

When an interceptor throws an exception or causes a timeout using the async method, then HiveMQ will prevent the unsubscribe and will return an UNSUBACK packet to the client. For an MQTT 5 client the UNSUBACK will contain the reason code Unspecified error for each topic filter.

Unsubscribe Inbound Input

The input parameter contains the following unmodifiable information:

  • MQTT UNSUBSCRIBE packet

  • Connection information

  • Client information

The JavaDoc can be found here.

Unsubscribe Inbound Output

The output parameter contains the modifiable UNSUBSCRIBE packet. You can use the UNSUBSCRIBE packet to manipulate the parameters that are listed in the table above. Use caution when you manipulate values in the packet. Some values cause exceptions to be thrown. For example, null or non-UTF8 characters. For more information on ways to manipulate inbound unsubscribe packets, see here.

Example Usage

This section provides examples on how to implement different UnsubscribeInboundInterceptor and add them with the ClientContext.

Examples:

Set Up Interceptor in Client Context

This example shows how to set up an UnsubscribeInboundInterceptor with the help of a ClientContext and the ClientInitializer:

...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {

    final UnsubscribeInboundInterceptor interceptor = (unsubscribeInboundInput, unsubscribeInboundOutput) -> {
        log.info("intercepted a UNSUBSCRIBE packet for client {}.", unsubscribeInboundInput.getClientInformation().getClientId());
    };

    Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
        clientContext.addUnsubscribeInboundInterceptor(interceptor);
    });
}
...

Modify Inbound UNSUBSCRIBE

This example shows how to implement an UnsubscribeInboundInterceptor that modifies the parameters of an inbound UNSUBSCRIBE message:

public class ModifyingUnsubscribeInboundInterceptor implements UnsubscribeInboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(ModifyingUnsubscribeInboundInterceptor.class);

    @Override
    public void onInboundUnsubscribe(final @NotNull UnsubscribeInboundInput unsubscribeInboundInput, final @NotNull UnsubscribeInboundOutput unsubscribeInboundOutput) {
        //get the modifiable disconnect object from the output
        final ModifiableUnsubscribePacket unsubscribePacket = unsubscribeInboundOutput.getUnsubscribePacket();
        // modify / overwrite parameters of the disconnect packet.
        try {

            unsubscribePacket.getUserProperties().addUserProperty("my-prop", "some value");
            unsubscribePacket.getUserProperties().addUserProperty("my-prop-2", "other value");
        } catch (final Exception e){
            log.debug("Unsubscribe inbound interception failed:", e);
        }
    }
}

Modify Inbound UNSUBSCRIBE Asynchronously

This example shows how to implement an UnsubscribeInboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an inbound UNSUBSCRIBE message.

public class AsyncModifyingUnsubscribeInboundInterceptor implements UnsubscribeInboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(AsyncModifyingUnsubscribeInboundInterceptor.class);

    @Override
    public void onInboundUnsubscribe(final @NotNull UnsubscribeInboundInput unsubscribeInboundInput, final @NotNull UnsubscribeInboundOutput unsubscribeInboundOutput) {

        //make output object async with a duration of 10 seconds
        final Async<UnsubscribeInboundOutput> async = unsubscribeInboundOutput.async(Duration.ofSeconds(10));

        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {

                //get the modifiable unsubscribe object from the output
                final ModifiableUnsubscribePacket unsubscribePacket = unsubscribeInboundOutput.getUnsubscribePacket();

                // modify / overwrite parameters of a unsubscribe packet.
                try {
                    //Set reason string and reason code.
                    unsubscribePacket.setReasonString("The reason for the unsuccessful puback");
                    unsubscribePacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);

                    unsubscribePacket.getUserProperties().addUserProperty("my-prop", "some value");
                    unsubscribePacket.getUserProperties().addUserProperty("my-prop-2", "other value");

                } catch (final Exception e){
                    log.debug("Unsubscribe inbound interception failed:", e);
                }

                async.resume();
            }
        });
    }
}

Unsuback Outbound Interceptor

All changeable properties in the UNSUBACK packet are MQTT 5 properties and will not be sent to MQTT 3 clients. Therefore, to optimize resource usage, the UNSUBACK packet should not be modified for MQTT 3 clients.

With the UnsubackOutboundInterceptor interface, you can intercept outbound MQTT UNSUBACK messages. You can use the UnsubackOutboundInterceptor to modify or analyse the data that is contained in outbound MQTT UNSUBACK messages. For this task, HiveMQ provides UnsubackOutboundInput and UnsubackOutboundOutput parameters. Extension input and output principles apply.

The UnsubackOutboundOutput parameter can easily be used for asynchronous processing.

The interceptor can be set up in the ClientContext.

When an asynchronous UnsubackOutboundOutput parameter times out, all pending changes to the UNSUBACK are discarded and the unmodified UNSUBACK is sent to the client.

Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors, HiveMQ calls the interceptor with the highest priority first.

The UnsubackOutboundInput and UnsubackOutboundOutput parameters are updated after each interception.

The following parameters can be modified:

Parameter Description

Reason Codes

The list of reason codes that correspond to the different topic filters that are used in the unsubscribe package.

Reason String

An UTF-8 encoded string that shows the cause of the UNSUBACK. This reason string is human readable and designed for diagnostics.

User Properties

Additional diagnostic or other information (optional).

Interception of an outbound UNSUBACK message has the following limitations:

  • You can not change a successful reason code to an unsuccessful reason code (or vice versa).

  • It is not possible to prevent sending an UNSUBACK message to a client.

When an interceptor throws an exception or causes a timeout using the async method, then HiveMQ will ignore changes to the modifiable UNSUBACK packet made by the interceptor and will call the next interceptor. If there are no further interceptors, the UNSUBACK is sent to the client.

Unsuback Outbound Input

The input parameter contains the following unmodifiable information:

  • MQTT UNSUBACK packet

  • Connection information

  • Client information

The JavaDoc can be found here.

Unsuback Outbound Output

The output parameter contains the modifiable UNSUBACK packet. You can use the UNSUBACK packet to manipulate the parameters that are listed in the table above. Use caution when you manipulate values in the packet. Some values cause exceptions to be thrown. For example, null or non-UTF8 characters. For more information on ways to manipulate inbound unsuback packets, see here.

Example Usage

This section provides examples on how to implement different UnsubackOutboundInterceptor and add them with the ClientContext.

Examples:

Set Up Interceptor in Client Context

This example shows how to set up a UnsubackOutboundInterceptor with the help of a ClientContext and the ClientInitializer:

...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {

    final UnsubackOutboundInterceptor interceptor = (unsubackOutboundInput, unsubackOutboundOutput) -> {
        log.debug("intercepted a UNSUBACK packet.");
    };

    Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
        clientContext.addUnsubackOutboundInterceptor(interceptor);
    });
}
...

Modify Outbound UNSUBACK

This example shows how to implement an UnsubackOutboundInterceptor that modifies the parameters of an outbound UNSUBACK message:

Complete example for the usage of a unsuback outbound interceptor
public class ModifyingUnsubackOutboundInterceptor implements UnsubackOutboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(ModifyingUnsubackOutboundInterceptor.class);

    @Override
    public void onOutboundUnsuback(final @NotNull UnsubackOutboundInput unsubackOutboundInput, final @NotNull UnsubackOutboundOutput unsubackOutboundOutput) {
        //get the modifiable unsuback object from the output
        final ModifiableUnsubackPacket unsubackPacket = unsubackOutboundOutput.getUnsubackPacket();

        // modify / overwrite parameters of the unsuback packet.
        try {
            //Set reason string and reason code if reason code is not success.
            unsubackPacket.setReasonString("The reason for the unsuccessful unsuback");
            List<UnsubackReasonCode> reasonCodes = new ArrayList<>();
            reasonCodes.add(UnsubackReasonCode.SUCCESS);
            unsubackPacket.setReasonCodes(reasonCodes);

            unsubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
            unsubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
        } catch (final Exception e){
            log.debug("Unsuback outbound interception failed:", e);
        }
    }
}

Modify Outbound UNSUBACK Asynchronously

This example shows how to implement an UnsubackOutboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an outbound UNSUBACK message.

If the task takes more than 10 seconds, the unmodified UNSUBACK is sent to the client.

Complete example for the usage of a unsuback outbound interceptor
public class AsyncModifyingUnsubackOutboundInterceptor implements UnsubackOutboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(AsyncModifyingUnsubackOutboundInterceptor.class);

    @Override
    public void onOutboundUnsuback(final @NotNull UnsubackOutboundInput unsubackOutboundInput, final @NotNull UnsubackOutboundOutput unsubackOutboundOutput) {

        //make output object async with a duration of 10 seconds
        final Async<UnsubackOutboundOutput> async = unsubackOutboundOutput.async(Duration.ofSeconds(10));

        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {

                //get the modifiable unsuback object from the output
                final ModifiableOutboundPacket unsubackPacket = unsubackOutboundOutput.getUnsubackPacket();

                // modify / overwrite parameters of a unsuback packet.
                try {
                    //Set reason string and reason code.
                    unsubackPacket.setReasonString("The reason for the unsuccessful puback");
                    List<UnsubackReasonCode> reasonCodes = new ArrayList<>();
                    reasonCodes.add(UnsubackReasonCode.SUCCESS);
                    unsubackPacket.setReasonCodes(reasonCodes);

                    unsubackPacket.getUserProperties().addUserProperty("my-prop", "some value");
                    unsubackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

                } catch (final Exception e){
                    log.debug("Unsuback outbound interception failed:", e);
                }
                async.resume();
            }
        });
    }
}

Disconnect Inbound Interceptor

On the DisconnectInboundInterceptor interface, you can intercept inbound MQTT DISCONNECT messages. You can use the DisconnectInboundInterceptor to modify or analyse the data that is contained in inbound MQTT DISCONNECT messages. For this task, HiveMQ provides DisconnectInboundInput, and DisconnectInboundOutput parameters. Extension input and output principles apply.

The DisconnectInboundOutput parameter can easily be used for asynchronous processing.

The interceptor can be set up in the ClientContext.

When an asynchronous DisconnectInboundOutput parameter times out, all pending changes to the DISCONNECT are discarded and the unmodified DISCONNECT is sent to the broker.

Multiple interceptors can be called sequentially. For example, when various extensions provide interceptors HiveMQ calls the interceptor with the highest priority first.

The DisconnectInboundInput and DisconnectInboundOutput parameters are updated after each interception.

The following parameters can be modified:

Parameter Description

Reason Code

The cause of the disconnection. Clients can use this information to determine whether to retry the connection, and how long to wait before a reconnection attempt.

Reason String

An UTF-8 encoded string that shows the cause of the disconnect. This Reason String is human readable and designed for diagnostics.

User Properties

Additional diagnostic or other information.

Session Expiry Interval

The amount of time (in seconds) after the client disconnects that the session data of the client is still valid on the HiveMQ broker. The time is counted from the moment that the client disconnects. If the Session Expiry Interval of the corresponding Connect packet is set to zero, the Session Expiry Interval of the Disconnect must also be set to zero.

Interception of an inbound DISCONNECT message has the following limitations:

  • You can not change a successful reason code to an unsuccessful reason code (or vice versa).

  • It is not possible to prevent sending a DISCONNECT message to the server.

Should an interceptor throw an exception or cause a timeout using the async method, then HiveMQ will ignore changes made to the modifiable disconnect packet by the interceptor and will call the next interceptor or should no interceptor be left to call will process the disconnect request by the client.

Disconnect Inbound Input

The input parameter contains the following unmodifiable information:

  • MQTT DISCONNECT packet

  • Connection information

  • Client Information

The JavaDoc can be found here.

Disconnect Inbound Output

The output parameter contains the modifiable DISCONNECT packet. You can use the DISCONNECT packet to manipulate the parameters that are listed in the table above. Use caution when you manipulate values in the packet. Some values cause exceptions to be thrown. For example, null or non-UTF8 characters. For more information on ways to manipulate inbound disconnect packets, see here.

Example Usage

This section provides examples on how to implement different DisconnectInboundInterceptor and add them with the ClientContext.

Examples:

Set Up Interceptor in ClientContext

This example shows how to set up a DisconnectInboundInterceptor with the help of a ClientContext and the ClientInitializer:

...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {

    final DisconnectInboundInterceptor interceptor = (disconnectInboundInput, disconnectInboundOutput) -> {
        log.debug("intercepted a DISCONNECT packet.");
    };

    Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
        clientContext.addDisconnectInboundInterceptor(interceptor);
    });
}
...

Modify Inbound DISCONNECT

This example shows how to implement a DisconnectInboundInterceptor that modifies the parameters of an inbound DISCONNECT message:

Complete example for the usage of a disconnect inbound interceptor
public class ModifyingDisconnectInboundInterceptor implements DisconnectInboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(ModifyingDisconnectInboundInterceptor.class);

    @Override
    public void onInboundDisconnect(final @NotNull DisconnectInboundInput disconnectInboundInput, final @NotNull DisconnectInboundOutput disconnectInboundOutput) {
        //get the modifiable disconnect object from the output
        final ModifiableInboundDisconnectPacket disconnectPacket = disconnectInboundOutput.getDisconnectPacket();

        // modify / overwrite parameters of the disconnect packet.
        try {
            if (disconnectPacket.getReasonCode() != DisconnectReasonCode.NORMAL_DISCONNECTION) {
                //Set reason string and reason code if reason code is not success.
                disconnectPacket.setReasonString("The reason for the unsuccessful disconnect");
                disconnectPacket.setReasonCode(DisconnectReasonCode.MALFORMED_PACKET);
            }

            disconnectPacket.getUserProperties().addUserProperty("my-prop", "some value");
            disconnectPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
        } catch (final Exception e){
            log.debug("Disconnect inbound interception failed:", e);
        }
    }
}

Modify Inbound DISCONNECT Asynchronously

This example shows how to implement a DisconnectInboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an inbound DISCONNECT message.

If the task takes more than 10 seconds, the unmodified DISCONNECT is sent to the client.

Complete example for asynchronous usage of a disconnect inbound interceptor
public class AsyncModifyingDisconnectInboundInterceptor implements DisconnectInboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(AsyncModifyingDisconnectInboundInterceptor.class);

    @Override
    public void onInboundDisconnect(final @NotNull DisconnectInboundInput disconnectInboundInput, final @NotNull DisconnectInboundOutput disconnectInboundOutput) {

        //make output object async with a duration of 10 seconds
        final Async<DisconnectInboundOutput> async = disconnectInboundOutput.async(Duration.ofSeconds(10));

        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {

                //get the modifiable disconnect object from the output
                final ModifiableDisconnectInboundPacket disconnectPacket = disconnectInboundOutput.getDisconnectPacket();

                // modify / overwrite parameters of a disconnect packet.
                try {
                    //Set reason string and reason code.
                    disconnectPacket.setReasonString("The reason for the unsuccessful puback");
                    disconnectPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);

                    disconnectPacket.getUserProperties().addUserProperty("my-prop", "some value");
                    disconnectPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

                } catch (final Exception e){
                    log.debug("Disconnect inbound interception failed:", e);
                }

                async.resume();
            }
        });
    }
}

Modify Session Expiry Interval

This example shows the precautions to take when you modify the session expiry interval. It is not possible to change the session expiry interval of clients that connect with a session expiry value of 0.

public class SessionExpiryDisconnectInboundInterceptor implements DisconnectInboundInterceptor {

    @Override
    public void onInboundDisconnect(final @NotNull DisconnectInboundInput disconnectInboundInput, final @NotNull DisconnectInboundOutput disconnectInboundOutput) {
        if (disconnectInboundInput.getDisconnectPacket().getSessionExpiryInterval().isPresent()) {
            if (disconnectInboundInput.getDisconnectPacket().getSessionExpiryInterval().get() != 0) {
                disconnectInboundOutput.getDisconnectPacket().setSessionExpiryInterval(10L);
            }
        }
    }
}

Disconnect Outbound Interceptor

Server sent DISCONNECT packets only exist in MQTT 5, this interceptor will not be called for MQTT 3 clients.

On the DisconnectOutboundInterceptor interface, you can intercept outbound MQTT DISCONNECT messages. You can use the DisconnectOutboundInterceptor to modify or analyse the data that is contained in outbound MQTT DISCONNECT messages. For this task, HiveMQ provides DisconnectOutboundInput, and DisconnectOutboundOutput parameters. Extension input and output principles apply.

The DisconnectOutboundOutput parameter can easily be used for asynchronous processing.

The interceptor can be set up in the ClientContext.

When an asynchronous DisconnectOutboundOutput parameter times out, all pending changes to the DISCONNECT are discarded and the unmodified DISCONNECT is sent to the client.

Multiple interceptors can be called sequentially. For example, when various extensions provide interceptors HiveMQ calls the interceptor with the highest priority first.

The DisconnectOutboundInput and DisconnectOutboundOutput parameters are updated after each interception.

The following parameters can be modified:

Parameter Description

Reason Code

The cause of the disconnection. Clients can use this information to determine whether to retry the connection, and how long to wait before a reconnection attempt.

Reason String

An UTF-8 encoded string that shows the cause of the disconnect. This Reason String is human readable and designed for diagnostics.

User Properties

Additional diagnostic or other information.

Server Reference

Specifies an alternative broker to which the client redirects on Disconnect .

Interception of an outbound DISCONNECT message has the following limitations:

  • You can not change a successful reason code to an unsuccessful reason code (or vice versa).

  • It is not possible to prevent sending a DISCONNECT message to a client.

Should an interceptor throw an exception or cause a timeout using the async method, then HiveMQ will ignore changes made to the modifiable disconnect packet by the interceptor and will call the next interceptor or should no interceptor be left to call will disconnect the client.

Disconnect Outbound Input

The input parameter contains the following unmodifiable information:

  • MQTT DISCONNECT packet

  • Connection information

  • Client Information

The JavaDoc can be found here.

Disconnect Outbound Output

The output parameter contains the modifiable DISCONNECT packet. You can use the DISCONNECT packet to manipulate the parameters that are listed in the table above. Use caution when you manipulate the packets values. Some values cause exceptions to be thrown. For example, null or non-UTF8 characters. For more information on ways to manipulate inbound disconnect packets, see here.

Example Usage

This section provides examples on how to implement different DisconnectOutboundInterceptor and add them with the ClientContext.

Examples:

Set Up Interceptor in ClientContext

This example shows how to set up a DisconnectOutboundInterceptor with the help of a ClientContext and the ClientInitializer:

...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {

    final DisconnectOutboundInterceptor interceptor = (disconnectOutboundInput, disconnectOutboundOutput) -> {
        log.debug("intercepted a DISCONNECT packet.");
    };

    Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
        clientContext.addDisconnectOutboundInterceptor(interceptor);
    });
}
...

Modify Outbound DISCONNECT

This example shows how to implement a DisconnectOutboundInterceptor that modifies the parameters of an outbound DISCONNECT message:

Complete example for the usage of a disconnect outbound interceptor
public class ModifyingDisconnectOutboundInterceptor implements DisconnectOutboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(ModifyingDisconnectOutboundInterceptor.class);

    @Override
    public void onOutboundDisconnect(final @NotNull DisconnectOutboundInput disconnectOutboundInput, final @NotNull DisconnectOutboundOutput disconnectOutboundOutput) {
        //get the modifiable disconnect object from the output
        final ModifiableOutboundDisconnectPacket disconnectPacket = disconnectOutboundOutput.getDisconnectPacket();

        // modify / overwrite parameters of the disconnect packet.
        try {
            if (disconnectPacket.getReasonCode() != DisconnectReasonCode.NORMAL_DISCONNECTION) {
                //Set reason string and reason code if reason code is not success.
                disconnectPacket.setReasonString("The reason for the unsuccessful disconnect");
                disconnectPacket.setReasonCode(DisconnectReasonCode.MALFORMED_PACKET);
            }

            disconnectPacket.getUserProperties().addUserProperty("my-prop", "some value");
            disconnectPacket.getUserProperties().addUserProperty("my-prop-2", "other value");
        } catch (final Exception e){
            log.debug("Disconnect outbound interception failed:", e);
        }
    }
}

Modify Outbound DISCONNECT Asynchronously

This example shows how to implement a DisconnectOutboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an outbound DISCONNECT message.

If the task takes more than 10 seconds, the unmodified DISCONNECT is sent to the client.

Complete example for the usage of a disconnect outbound interceptor
public class AsyncModifyingDisconnectOutboundInterceptor implements DisconnectOutboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(AsyncModifyingDisconnectOutboundInterceptor.class);

    @Override
    public void onOutboundDisconnect(final @NotNull DisconnectOutboundInput disconnectOutboundInput, final @NotNull DisconnectOutboundOutput disconnectOutboundOutput) {

        //make output object async with a duration of 10 seconds
        final Async<DisconnectOutboundOutput> async = disconnectOutboundOutput.async(Duration.ofSeconds(10));

        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {

                //get the modifiable disconnect object from the output
                final ModifiableDisconnectOutboundPacket disconnectPacket = disconnectOutboundOutput.getDisconnectPacket();

                // modify / overwrite parameters of a disconnect packet.
                try {
                    //Set reason string and reason code.
                    disconnectPacket.setReasonString("The reason for the unsuccessful puback");
                    disconnectPacket.setReasonCode(AckReasonCode.UNSPECIFIED_ERROR);

                    disconnectPacket.getUserProperties().addUserProperty("my-prop", "some value");
                    disconnectPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

                } catch (final Exception e){
                    log.debug("Disconnect outbound interception failed:", e);
                }
                async.resume();
            }
        });
    }
}

Pingreq Inbound Interceptor

With the PingReqInboundInterceptor interface, you can intercept inbound MQTT PINGREQ messages. You can use the PingReqInboundInterceptor to monitor sent PINGRESP packets. For this task, HiveMQ provides PingReqInboundInput and PingReqInboundOutput parameters. Extension input and output principles apply.

The interceptor can be set up in the ClientContext.

Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors HiveMQ calls the interceptor with the highest priority first.

It is not possible to prevent sending an inbound PINGREQ.

Pingreq Inbound Input

The input parameter contains the following unmodifiable information:

  • Connection information

  • Client Information

The JavaDoc can be found here.

Pingreq Inbound Output

Since PINGREQ message are static by nature, there is no way of modifying any of its values.

Example Usage

This section provides examples on how to implement different PingReqInboundInterceptor and add them with the ClientContext.

Examples:

Set Up Interceptor in ClientContext

This example shows how to set up a PingReqInboundInterceptor with the help of a ClientContext and the ClientInitializer:

...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {

    final PingReqInboundInterceptor interceptor = (pingReqInboundInput, pingReqInboundOutput) -> {

        final String clientId = pingReqInboundInput.getClientInformation().getClientId();
        log.info("intercepted a PINGREQ packet of client '{}'.", clientId);
    };

    Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
        clientContext.addPingReqInboundInterceptor(interceptor);
    });
}
...

Pingresp Outbound Interceptor

With the PingRespOutboundInterceptor interface, you can intercept outbound MQTT PINGRESP messages. You can use the PingRespOutboundInterceptor to monitor sent PINGRESP packets. For this task, HiveMQ provides PingRespOutboundInput and PingRespOutboundOutput parameters. Extension input and output principles apply.

The interceptor can be set up in the ClientContext.

Multiple interceptors are called sequentially. For example, when multiple extensions provide interceptors HiveMQ calls the interceptor with the highest priority first.

It is not possible to prevent sending an outbound PINGRESP.

Pingresp Outbound Input

The input parameter contains the following unmodifiable information:

  • Connection information

  • Client Information

The JavaDoc can be found here.

Pingresp Outbound Output

Since PINGRESP message are static by nature, there is no way of modifying any of its values.

Example Usage

This section provides examples on how to implement different PingRespOutboundInterceptor and add them with the ClientContext.

Examples:

Set Up Interceptor in ClientContext

This example shows how to set up a PingRespOutboundInterceptor with the help of a ClientContext and the ClientInitializer:

...
@Override
public void extensionStart(@NotNull final ExtensionStartInput esi, @NotNull final ExtensionStartOutput eso) {

    final PingRespOutboundInterceptor interceptor = (pingRespOutboundInput, pingRespOutboundOutput) -> {
        final String clientId = pingRespOutboundInput.getClientInformation().getClientId();
        log.info("intercepted a PINGRESP packet of client '{}'.", clientId);
    };

    Services.initializerRegistry().setClientInitializer((initializerInput, clientContext) -> {
        clientContext.addPingRespOutboundInterceptor(interceptor);
    });
}
...