HiveMQ Enterprise Extension for Amazon Kinesis Customization
Version 4.14.0 of HiveMQ and the Enterprise Extension for Amazon Kinesis introduces the HiveMQ Amazon Kinesis Extension Customization SDK.
 
Our flexible new API gives you the ability to programmatically specify sophisticated custom handling of message transformations bidirectionally between HiveMQ and the Amazon Kinesis Data Streams service.
Features
The HiveMQ Amazon Kinesis Extension Customization SDK gives you fine-grained control over message content, topics, and the bidirectional flow of exchanged messages between HiveMQ and Amazon Kinesis Data Streams.
Requirements
- 
Java 11 
- 
Java IDE that supports Gradle (for example, IntelliJ) 
- 
HiveMQ Platform Professional or Enterprise Edition version 4.14.0 or higher 
- 
Active AWS account with access to an Amazon Kinesis Data Streams service account 
Quick Start with Customization SDK
The Customization SDK for the HiveMQ Enterprise Extension for Amazon Kinesis uses the same Input/Output principle as the HiveMQ Extension SDK.
The quickest way to learn about the customization SDK is to check out our Hello World Customization project on GitHub and use it as the basis for your own customization.
The hivemq-amazon-kinesis-hello-world-customization project provides two transformer examples to get you started with transformations in the HiveMQ Amazon Kinesis Extension Customization SDK:
- 
An MqttToKinesisTransformerto transform and forward MQTT PUBLISH messages to Amazon Kinesis.
- 
An KinesisToMqttTransformerto transform and forward Amazon Kinesis records to HiveMQ.
Transformer Configuration
With the Transformer processor, you can add your own code to implement the transformation of MQTT messages to Amazon Kinesis records and the transformation of Amazon Kinesis records to MQTT messages.
To enable the use of a transformer, you extend the conf/config.xml file of your HiveMQ Enterprise Extension for Amazon Kinesis.
Transformers are configured as part of the <processor> configuration. For more information see, Transformer Processor in MQTT to Amazon Kinesis Routes and Transformer Processor in Amazon Kinesis to MQTT Routes.
| Parameter | Required | Type | Description | 
|---|---|---|---|
| 
 | String | The fully qualified class name of the transformer that is used. | |
| 
 | Complex | A list of the custom settings that are available in the input of the transformer. 
 | 
<mqtt-to-kinesis-route>
    <id>my-mqtt-to-kinesis-route</id>
    <aws-credential-profile-id>aws-credential-profile-01</aws-credential-profile-id>
    <region>eu-central-1</region>
    <mqtt-topic-filters>
        <mqtt-topic-filter>mqtt/topic/in</mqtt-topic-filter>
    </mqtt-topic-filters>
    <processor>
        <transformer>
            <implementation>fully.qualified.classname.to.YourTransformer</implementation>
            <custom-settings>
                <custom-setting>
                    <name>your-setting-name</name>
                    <value>your-setting-value-01</value>
                </custom-setting>
                <custom-setting>
                    <name>your-setting-name</name>
                    <value>your-setting-value-02</value>
                </custom-setting>
            </custom-settings>
        </transformer>
    </processor>
</mqtt-to-kinesis-route>MQTT to Amazon Kinesis Customization
The MQTT to Amazon Kinesis Transformer lets you extend the capabilities of your HiveMQ Enterprise Extension for Amazon Kinesis to meet the specific needs of your individual use case.
Implement this transformer to programmatically create customized Amazon Kinesis records from MQTT PUBLISH messages:
- 
Access data from all fields and properties of specific incoming MQTT PUBLISH messages: - 
MQTT message payload 
- 
MQTT QoS 
- 
MQTT retained message flag 
- 
MQTT topic 
- 
MQTT payload format indicator 
- 
MQTT message expiry 
- 
MQTT response topic 
- 
MQTT correlation data 
- 
MQTT content type 
- 
MQTT user properties 
 
- 
- 
Set information for the target Amazon Kinesis record as desired: - 
Record data 
- 
Stream name 
- 
Partition key 
- 
Explicit hash key 
 
- 
- 
Create multiple Amazon Kinesis records from a single MQTT PUBLISH message. 
MQTT to Amazon Kinesis Objects and Methods
MqttToKinesisTransformer
The MqttToKinesisTransformer interface is a transformer for the programmatic creation of one or more OutboundKinesisRecords from MQTT PublishPackets.
The HiveMQ Enterprise Extension for Amazon Kinesis executes the MqttToKinesisTransformer for each MQTT PUBLISH message your HiveMQ cluster receives that matches the <mqtt-topic-filters> you configure in the <mqtt-to-kinesis-route> tag of your conf/config.xml file.
The transformer can publish an unlimited number of OutboundKinesisMessages via the MqttToKinesisOutput object.
| Your compiled implementation (.class files) of the MqttToKinesisTransformerand all associated dependencies must be placed in a java archive (.jar) in the customizations folder of your HiveMQ Enterprise Extension for Amazon Kinesis.
In addition, you must configure a<transformer>that references the canonical name of the implementing class in theconf/config.xmlfile via<implementation>tag. | 
| Multiple threads can call methods concurrently.
To ensure that all threads behave properly and prevent unintended interactions, your implementation of the MqttToKinesisTransformerinterface must be thread-safe.
Also, all exception handling must be done inside the methods of yourMqttToKinesisTransformerimplementation. | 
Transformer methods are not permitted to throw an Exception of any kind.
If a method in your transformer throws an exception, fix and redeploy your transformer and disable/enable the Kinesis extension.
If the HiveMQ Amazon Kinesis extension cannot successfully complete a transformation, the extension drops the message, logs a warning, and increments the mqtt-to-kinesis.[route-id].failed.count metric.
To troubleshoot your transformer, monitor the mqtt-to-kinesis.[route-id].failed.count metric in the Amazon Kinesis Extension Metrics.
An increase in the failed metric can indicate an issue with your transformer.
Additionally, check your hivemq.log file for warnings related to the performance of your transformer.
| Interface | Description | 
|---|---|
| Contains the information of the MQTT  | |
| Returns the post-transformation  | 
@Override
public void transformMqttToKinesis(
        final @NotNull MqttToKinesisInput mqttToKinesisInput,
        final @NotNull MqttToKinesisOutput mqttToKinesisOutput) {
}MqttToKinesisInitInput
The MqttToKinesisInitInput interface provides context for the setup of the MqttToKinesisTransformer.
This interface is used to retrieve the associated custom-settings and the MetricRegistry of HiveMQ after the extension configuration loads or reloads.
| Method | Information | 
|---|---|
| 
 | Returns the custom settings that the user defined in the extension configuration. If no custom settings are configured, the underlying map is empty. | 
| 
 | Provides the metric registry of the HiveMQ node. The registry can be used to add custom metrics that fulfill the monitoring needs of your specific business logic. | 
@Override
public void init(final @NotNull MqttToKinesisInitInput mqttToKinesisInitInput) {
    final CustomSettings customSettings = mqttToKinesisInitInput.getCustomSettings();
    final MetricRegistry metricRegistry = mqttToKinesisInitInput.getMetricRegistry();
}MqttToKinesisInput
The MqttToKinesisInput object is the input parameter of the MqttToKinesisTransformer.
The MqttToKinesisInput object contains the PublishPacket to which the transformation is applied.
| Method | Information | 
|---|---|
| 
 | Contains all the MQTT Publish information about the message that triggers the transformer call | 
| 
 | Returns the custom settings that the user defined in the extension configuration. If no custom settings are configured, the underlying map is empty. | 
| 
 | Provides the metric registry of the HiveMQ node. The registry can be used to add custom metrics that fulfill the monitoring needs of your specific business logic. | 
@Override
public void transformMqttToKinesis(
        final @NotNull MqttToKinesisInput mqttToKinesisInput,
        final @NotNull MqttToKinesisOutput mqttToKinesisOutput) {
    final PublishPacket publishPacket = mqttToKinesisInput.getPublishPacket();
    final CustomSettings customSettings = mqttToKinesisInput.getCustomSettings();
    final MetricRegistry metricRegistry = mqttToKinesisInput.getMetricRegistry();
}MqttToKinesisOutput
The MqttToKinesisOutput object is the output parameter of the MqttToKinesisTransformer.
The MqttToKinesisOutput object provides the following methods:
| Method | Description | 
|---|---|
| 
 | Creates an empty Builder to create  | 
| 
 | Sets the created  | 
@Override
public void transformMqttToKinesis(
        final @NotNull MqttToKinesisInput mqttToKinesisInput,
        final @NotNull MqttToKinesisOutput mqttToKinesisOutput) {
    try {
        final PublishPacket publishPacket = mqttToKinesisInput.getPublishPacket();
        final OutboundKinesisRecordBuilder builder = mqttToKinesisOutput.newOutboundKinesisRecordBuilder()
                .streamName("destination-stream")
                .data(publishPacket.getPayload().orElseGet(() -> ByteBuffer.allocate(0)))
                .partitionKey(publishPacket.getTopic())
                .randomExplicitHashKey();
        mqttToKinesisOutput.setOutboundKinesisRecords(List.of(builder.build()));
    } catch (final Exception e) {
        mqttToKinesisInput.getMetricRegistry().counter("mqtt2kinesis.failed.counter").inc();
    }
}Amazon Kinesis to MQTT Customization
The Amazon Kinesis to MQTT Transformer lets you extend the capabilities of your HiveMQ Enterprise Extension for Amazon Kinesis to meet the specific needs of your individual use case.
Implement this transformer to programmatically create customized MQTT PublishPackets from Amazon Kinesis records.
- 
Access fields of a specific Amazon Kinesis record: - 
Record data 
- 
Stream name 
- 
Partition key 
- 
Sequence number 
- 
Approximate arrival timestamp 
- 
Encryption type 
 
- 
- 
Set information in the generated MQTT PUBLISH message as desired: - 
MQTT Payload 
- 
MQTT QoS 
- 
MQTT Retain flag 
- 
MQTT Topic 
- 
MQTT Payload format indicator 
- 
MQTT Message expiry 
- 
MQTT Response topic 
- 
MQTT Correlation data 
- 
MQTT Content type 
- 
MQTT User properties 
 
- 
- 
Create multiple MQTT PUBLISH messages from an Amazon Kinesis record (fan-out) 
Amazon Kinesis to MQTT Objects and Methods
KinesisToMqttTransformer
The KinesisToMqttTransformer interface is a transformer for the programmatic creation of MQTT PublishPackets from one or more InboundKinesisRecords.
The HiveMQ Enterprise Extension for Amazon Kinesis executes the KinesisToMqttTransformer for every Kinesis record your HiveMQ cluster polls from the <kinesis-streams> you configure in the <kinesis-to-mqtt-route> tag of your conf/config.xml file.
| Your compiled implementation (.class files) of the KinesisToMqttTransformerand all associated dependencies must be placed in a java archive (.jar) in the customizations folder of your HiveMQ Enterprise Extension for Amazon Kinesis.
Additionally, you must configure an<kinesis-to-mqtt-transformer>that references the canonical name of the implementing class in theconf/config.xmlfile. | 
| Multiple threads can call methods concurrently.
To ensure that all threads behave properly and prevent unintended interactions, your implementation of the KinesisToMqttTransformerinterface must be thread-safe.
In addition, exception handling must be done inside the methods of yourKinesisToMqttTransformerimplementation. | 
Transformer methods are not permitted to throw an Exception of any kind.
If a method in your transformer throws an exception, fix and redeploy your transformer and disable/enable the Amazon Kinesis extension.
If the HiveMQ Amazon Kinesis extension cannot successfully complete a transformation, the extension drops the message, logs a warning, and increments the kinesis-to-mqtt.transformer.[transformer_id].failed.count metric.
To troubleshoot your transformer, monitor the kinesis-to-mqtt.transformer.[transformer_id].failed.count metric in the Amazon Kinesis Extension Metrics.
An increase in the resend metric can indicate an issue with your transformer.
Additionally, check your hivemq.log file for warnings related to the performance of your transformer.
hivemq.log file2023-03-31 12:34:27,825 WARN - HiveMQ Enterprise Extension for Amazon Kinesis: Message from stream 'kinesis-stream-01' dropped for kinesis-to-mqtt-route 'kinesis-to-mqtt-route-01'. Reason: Transformer 'fully.qualified.classname.to.YourTransformer' for kinesis-to-mqtt-route 'kinesis-to-mqtt-route-01' threw an unhandled 'RuntimeException' while processing an Amazon Kinesis record from stream 'kinesis-stream-01'. Transformers are responsible for their own exception handling.
| One instance of your KinesisToMqttTransformerimplementation is created for each<transformer>that you reference in yourconf/config.xmlfile. | 
| Method | Description | 
|---|---|
| 
 | This callback is executed for every  | 
public class MyKinesisToMqttTransformer implements KinesisToMqttTransformer {
    @Override
    public void init(final @NotNull KinesisToMqttInitInput kinesisToMqttInitInput) {
        // Insert your own business logic
    }
    @Override
    public void transformKinesisToMqtt(
            final @NotNull KinesisToMqttInput kinesisToMqttInput,
            final @NotNull KinesisToMqttOutput kinesisToMqttOutput) {
        // Insert your own business logic
    }
}KinesisToMqttInitInput
The KinesisToMqttInitInput interface provides context for the set up of a KinesisToMqttTransformer and is used to call the associated custom-settings and the MetricRegistry of HiveMQ after (re)loading of the extension configuration.
| Method | Information | 
|---|---|
| 
 | Returns the custom settings that the user defined in the extension configuration. If no custom settings are configured, the underlying map is empty. | 
| 
 | Provides the metric registry of the HiveMQ node. The registry can be used to add custom metrics that fulfil the monitoring needs of your specific business logic. | 
@Override
public void init(final @NotNull KinesisToMqttInitInput kinesisToMqttInitInput) {
    final CustomSettings customSettings = kinesisToMqttInitInput.getCustomSettings();
    final MetricRegistry metricRegistry = kinesisToMqttInitInput.getMetricRegistry();
}KinesisToMqttInput
The KinesisToMqttInput object is the input parameter of the KinesisToMqttTransformer.
The KinesisToMqttInput object contains the information of the InboundKinesisRecord to which the transformation is applied.
| Method | Description | 
|---|---|
| 
 | Contains information of the Amazon Kinesis record that triggered the transformer call. | 
| 
 | Returns the custom settings that the user defined in the extension configuration. If no custom settings are configured, the underlying map is empty. | 
| 
 | Provides the metric registry of the HiveMQ node. The registry can be used to add custom metrics that fulfil the monitoring needs of your specific business logic. | 
@Override
public void transformKinesisToMqtt(
        final @NotNull KinesisToMqttInput kinesisToMqttInput,
        final @NotNull KinesisToMqttOutput kinesisToMqttOutput) {
    final InboundKinesisRecord inboundKinesisRecord = kinesisToMqttInput.getInboundKinesisRecord();
    final CustomSettings customSettings = kinesisToMqttInput.getCustomSettings();
    final MetricRegistry metricRegistry = kinesisToMqttInput.getMetricRegistry();
}KinesisToMqttOutput
The KinesisToMqttOutput object is the output parameter of the KinesisToMqttTransformer.
The KinesisToMqttOutput object allows access to the MQTT PublishBuilder.
After the KinesisToMqttTransformer transforms the KinesisToMqttInput, HiveMQ publishes the output in MQTT messages.
| Method | Description | 
|---|---|
| 
 | Returns a new  | 
| 
 | Defines the MQTT publish messages that HiveMQ publishes once the transformation is complete. Each call of the  
 | 
| If publishesor any element isnull, a NullPointerException is thrown.If a publish contains any element that is not created via a PublishBuilder, an IllegalArgumentException is thrown.If the publisheslist is empty, HiveMQ does not publish any MQTT messages. | 
@Override
public void transformKinesisToMqtt(
        final @NotNull KinesisToMqttInput kinesisToMqttInput,
        final @NotNull KinesisToMqttOutput kinesisToMqttOutput) {
    try {
        final InboundKinesisRecord record = kinesisToMqttInput.getInboundKinesisRecord();
        final PublishBuilder builder = kinesisToMqttOutput.newPublishBuilder()
                .topic("destination-topic")
                .payload(record.getData());
        kinesisToMqttOutput.setPublishes(List.of(builder.build()));
    } catch (final Exception e) {
        kinesisToMqttInput.getMetricRegistry().counter("kinesis2mqtt.failed.counter").inc();
    }
}Customization Use Cases
MQTT to Amazon Kinesis Customization Use Case Examples
Build and send custom Amazon Kinesis records in response to received MQTT messages: In this use case, the goal is to transform MQTT messages based on a defined business logic and output a customized Amazon Kinesis record.
 
public class BusinessMqttToKinesisTransformer implements MqttToKinesisTransformer {
    @Override
    public void transformMqttToKinesis(
            final @NotNull MqttToKinesisInput mqttToKinesisInput,
            final @NotNull MqttToKinesisOutput mqttToKinesisOutput) {
        // Get MQTT payload
        final ByteBuffer mqttPayload =
                mqttToKinesisInput.getPublishPacket().getPayload().orElseGet(() -> ByteBuffer.allocate(0));
        // Manipulate the MQTT payload according to your business logic
        // For example, deserialize or add fields
        final byte[] data = ownBusinessLogic(mqttPayload);
        final OutboundKinesisRecord record = mqttToKinesisOutput.newOutboundKinesisRecordBuilder()
                .data(data)
                .streamName("stream-to-my-service")
                .partitionKey(mqttToKinesisInput.getPublishPacket().getTopic())
                .build();
        // Set record as output
        mqttToKinesisOutput.setOutboundKinesisRecords(List.of(record));
    }
}Multicast one MQTT message to multiple Amazon Kinesis records: In this use case, the goal is to transform one MQTT message into multiple customized Amazon Kinesis records that each contain a specific part of the information from the original MQTT message:
 
public class MulticastMqttToKinesisTransformer implements MqttToKinesisTransformer {
    private static final @NotNull String LOCATION_SERVICE_STREAM = "location-service";
    private static final @NotNull String SPEED_SERVICE_STREAM = "speed-control-service";
    private static final @NotNull String FUEL_SERVICE_STREAM = "fuel-control-service";
    @Override
    public void transformMqttToKinesis(
            final @NotNull MqttToKinesisInput mqttToKinesisInput,
            final @NotNull MqttToKinesisOutput mqttToKinesisOutput) {
        // Get MQTT payload
        final ByteBuffer mqttPayload =
                mqttToKinesisInput.getPublishPacket().getPayload().orElse(ByteBuffer.allocate(0));
        // Extract location from MQTT payload and create record
        final byte[] location = extractLocation(mqttPayload);
        final OutboundKinesisRecord locationRecord = mqttToKinesisOutput.newOutboundKinesisRecordBuilder()
                .streamName(LOCATION_SERVICE_STREAM)
                .partitionKey(publishPacket.getTopic())
                .data(location)
                .build();
        // Extract speed from MQTT payload and create record
        final byte[] speed = extractSpeed(mqttPayload);
        final OutboundKinesisRecord speedRecord = mqttToKinesisOutput.newOutboundKinesisRecordBuilder()
                .streamName(SPEED_SERVICE_STREAM)
                .partitionKey(publishPacket.getTopic())
                .data(speed)
                .build();
        // Extract fuel from MQTT payload and create record
        final byte[] fuel = extractSpeed(mqttPayload);
        final OutboundKinesisRecord fuelRecord = mqttToKinesisOutput.newOutboundKinesisRecordBuilder()
                .streamName(FUEL_SERVICE_STREAM)
                .partitionKey(publishPacket.getTopic())
                .data(fuel)
                .build();
        // Set the records as output
        mqttToKinesisOutput.setOutboundKinesisRecords(List.of(locationRecord, speedRecord, fuelRecord));
    }
}Use custom logic to drop specific MQTT messages: In this use case, the goal is to drop received MQTT messages from topic drop/me and to create Amazon Kinesis records for all other.
 
public class DroppingMqttToKinesisTransformer implements MqttToKinesisTransformer {
    @Override
    public void transformMqttToKinesis(
            final @NotNull MqttToKinesisInput mqttToKinesisInput,
            final @NotNull MqttToKinesisOutput mqttToKinesisOutput) {
        final @NotNull PublishPacket publishPacket = mqttToKinesisInput.getPublishPacket();
        if ("drop/me".equals(publishPacket.getTopic())) {
            return;
        }
        final OutboundKinesisRecord record = mqttToKinesisOutput.newOutboundKinesisRecordBuilder()
                .data(publishPacket.getPayload().orElseGet(() -> ByteBuffer.allocate(0)))
                .streamName("my-destination-stream")
                .partitionKey(publishPacket.getTopic())
                .build();
        mqttToKinesisOutput.setOutboundKinesisRecords(List.of(record));
    }
}Use custom settings to transform MQTT PUBLISH messages: In this use case, the goal is to use custom settings from the configuration XML file to transform MQTT publishes.
 
<transformer>
    <implementation>fully.qualified.classname.to.CustomSettingsMqttToKinesisTransformer</implementation>
    <custom-settings>
        <custom-setting>
            <name>destination-topic</name>
            <value>destination/pubsub/topic/a</value>
        </custom-setting>
    </custom-settings>
</transformer>public class CustomSettingsMqttToKinesisTransformer implements MqttToKinesisTransformer {
    @SuppressWarnings("NotNullFieldNotInitialized")
    private @NotNull String destinationStream;
    @Override
    public void init(final @NotNull MqttToKinesisInitInput mqttToKinesisInitInput) {
        final CustomSettings customSettings = mqttToKinesisInitInput.getCustomSettings();
        destinationStream = customSettings.getFirst("destination-stream").orElse("default");
    }
    @Override
    public void transformMqttToKinesis(
            final @NotNull MqttToKinesisInput mqttToKinesisInput,
            final @NotNull MqttToKinesisOutput mqttToKinesisOutput) {
        final PublishPacket publishPacket = mqttToKinesisInput.getPublishPacket();
        final OutboundKinesisRecord record = mqttToKinesisOutput.newOutboundKinesisRecordBuilder()
                .streamName(destinationStream)
                .partitionKey(publishPacket.getTopic())
                .data(publishPacket.getPayload().orElseGet(() -> ByteBuffer.allocate(0)))
                .build();
        mqttToKinesisOutput.setOutboundKinesisRecords(List.of(record));
    }
}Amazon Kinesis to MQTT Customization Use Case Examples
Build custom MQTT topic from incoming Amazon Kinesis records: In this use case, the goal is to extract selected information from an incoming Amazon Kinesis record and publish this information to a specific MQTT topic based on your own custom business logic.
 
public class BusinessKinesisToMqttTransformer implements KinesisToMqttTransformer {
    @Override
    public void transformKinesisToMqtt(
            final @NotNull KinesisToMqttInput kinesisToMqttInput,
            final @NotNull KinesisToMqttOutput kinesisToMqttOutput) {
        final InboundKinesisRecord record = kinesisToMqttInput.getInboundKinesisRecord();
        final byte[] mqttPayload = ownBusinessLogic(record.getData());
        // Build MQTT publish
        final Publish publish = kinesisToMqttOutput.newPublishBuilder()
                .topic(record.getStreamName())
                .qos(Qos.AT_LEAST_ONCE)
                .payload(ByteBuffer.wrap(mqttPayload))
                .build();
        // Set MQTT publish as output
        kinesisToMqttOutput.setPublishes(List.of(publish));
    }
}Use custom logic to skip specific Amazon Kinesis records: In this use case, the goal is to filter out incoming Amazon Kinesis records that are empty and to create MQTT messages only for received messages that contain data.
 
public class DroppingKinesisToMqttTransformer implements KinesisToMqttTransformer {
    @Override
    public void transformKinesisToMqtt(
            final @NotNull KinesisToMqttInput kinesisToMqttInput,
            final @NotNull KinesisToMqttOutput kinesisToMqttOutput) {
        final InboundKinesisRecord record = kinesisToMqttInput.getInboundKinesisRecord();
        final ByteBuffer data = record.getData();
        if (data.remaining() == 0) {
            // Drop the record if the data is empty.
            return;
        }
        // Build MQTT publish
        final Publish publish = kinesisToMqttOutput.newPublishBuilder()
                .topic(record.getStreamName())
                .payload(data)
                .build();
        kinesisToMqttOutput.setPublishes(List.of(publish));
    }
}Use custom settings to transform Amazon Kinesis records: In this use case, the goal is to use custom settings from the configuration XML file to transform Amazon Kinesis records.
 
<transformer>
    <implementation>fully.qualified.classname.to.CustomSettingsKinesisToMqttTransformer</implementation>
    <custom-settings>
        <custom-setting>
            <name>destination-topic</name>
            <value>destination/pubsub/topic/a</value>
        </custom-setting>
    </custom-settings>
</transformer>public class CustomSettingsKinesisToMqttTransformer implements KinesisToMqttTransformer {
    @SuppressWarnings("NotNullFieldNotInitialized")
    private @NotNull String destinationTopic;
    @Override
    public void init(final @NotNull KinesisToMqttInitInput kinesisToMqttInitInput) {
        final CustomSettings customSettings = kinesisToMqttInitInput.getCustomSettings();
        destinationTopic = customSettings.getFirst("destination-topic").orElse("default/destination/topic");
    }
    @Override
    public void transformKinesisToMqtt(
            final @NotNull KinesisToMqttInput kinesisToMqttInput,
            final @NotNull KinesisToMqttOutput kinesisToMqttOutput) {
        // Get the Amazon Kinesis record.
        final InboundKinesisRecord record = kinesisToMqttInput.getInboundKinesisRecord();
        // Build MQTT publish
        final Publish publish = kinesisToMqttOutput.newPublishBuilder()
                .topic(destinationTopic)
                .payload(record.getData())
                .build();
        kinesisToMqttOutput.setPublishes(List.of(publish));
    }
}Deploy Customization
You can deploy your customization with three simple steps:
- 
Configure your conf/config.xmlto add the MQTT to Amazon Kinesis transformer.
- 
Run the ./gradlew jartask from your Gradle project to build your customization.
- 
For example, the task from the Hello World project. 
- 
Move the jar file from your local build/libs/folder to theHIVEMQ_HOME/extensions/hivemq-amazon-kinesis-extension/customizationsdirectory of your HiveMQ installation.
SDK Metrics
When you implement a transformer, The HiveMQ Amazon Kinesis Extension Customization SDK adds useful metrics to your HiveMQ Enterprise Extension for Amazon Kinesis. Monitor the metrics to gain valuable insights into the behavior of your applications over time.
For more information, see HiveMQ Amazon Kinesis Metrics.
In addition to the default metrics, the metric registry that the HiveMQ Amazon Kinesis Extension Customization SDK exposes gives you the ability to create your own metrics to measure specific business aspects of your individual use case. For more information, see KinesisToMqttInitInput.