HiveMQ Enterprise Extension for Amazon Kinesis Customization

Version 4.14.0 of HiveMQ and the Enterprise Extension for Amazon Kinesis introduces the HiveMQ Amazon Kinesis Extension Customization SDK.

HiveMQ Enterprise Extension for Amazon Kinesis Customization SDK

Our flexible new API gives you the ability to programmatically specify sophisticated custom handling of message transformations bidirectionally between HiveMQ and the Amazon Kinesis Data Streams service.

Features

The HiveMQ Amazon Kinesis Extension Customization SDK gives you fine-grained control over message content, topics, and the bidirectional flow of exchanged messages between HiveMQ and Amazon Kinesis Data Streams.

Requirements

Quick Start with Customization SDK

The Customization SDK for the HiveMQ Enterprise Extension for Amazon Kinesis uses the same Input/Output principle as the HiveMQ Extension SDK.

The quickest way to learn about the customization SDK is to check out our Hello World Customization project on GitHub and use it as the basis for your own customization.

The hivemq-amazon-kinesis-hello-world-customization project provides two transformer examples to get you started with transformations in the HiveMQ Amazon Kinesis Extension Customization SDK:

  • An MqttToKinesisTransformer to transform and forward MQTT PUBLISH messages to Amazon Kinesis.

  • An KinesisToMqttTransformer to transform and forward Amazon Kinesis records to HiveMQ.

Transformer Configuration

With the Transformer processor, you can add your own code to implement the transformation of MQTT messages to Amazon Kinesis records and the transformation of Amazon Kinesis records to MQTT messages.

To enable the use of a transformer, you extend the conf/config.xml file of your HiveMQ Enterprise Extension for Amazon Kinesis.

Transformers are configured as part of the <processor> configuration. For more information see, Transformer Processor in MQTT to Amazon Kinesis Routes and Transformer Processor in Amazon Kinesis to MQTT Routes.

Table 1. Transformer configuration settings
Parameter Required Type Description

implementation

String

The fully qualified class name of the transformer that is used.

custom-settings

Complex

A list of the custom settings that are available in the input of the transformer.

  • custom-setting: The name and value pair that the custom setting of the transformer implements. The name does not need to be unique. You can configure as many custom-setting tags as your use case requires.

Example transformer configuration for an MQTT to Kinesis route
<mqtt-to-kinesis-route>
    <id>my-mqtt-to-kinesis-route</id>
    <aws-credential-profile-id>aws-credential-profile-01</aws-credential-profile-id>
    <region>eu-central-1</region>
    <mqtt-topic-filters>
        <mqtt-topic-filter>mqtt/topic/in</mqtt-topic-filter>
    </mqtt-topic-filters>
    <processor>
        <transformer>
            <implementation>fully.qualified.classname.to.YourTransformer</implementation>
            <custom-settings>
                <custom-setting>
                    <name>your-setting-name</name>
                    <value>your-setting-value-01</value>
                </custom-setting>
                <custom-setting>
                    <name>your-setting-name</name>
                    <value>your-setting-value-02</value>
                </custom-setting>
            </custom-settings>
        </transformer>
    </processor>
</mqtt-to-kinesis-route>

MQTT to Amazon Kinesis Customization

The MQTT to Amazon Kinesis Transformer lets you extend the capabilities of your HiveMQ Enterprise Extension for Amazon Kinesis to meet the specific needs of your individual use case.
Implement this transformer to programmatically create customized Amazon Kinesis records from MQTT PUBLISH messages:

  • Access data from all fields and properties of specific incoming MQTT PUBLISH messages:

    • MQTT message payload

    • MQTT QoS

    • MQTT retained message flag

    • MQTT topic

    • MQTT payload format indicator

    • MQTT message expiry

    • MQTT response topic

    • MQTT correlation data

    • MQTT content type

    • MQTT user properties

  • Set information for the target Amazon Kinesis record as desired:

    • Record data

    • Stream name

    • Partition key

    • Explicit hash key

  • Create multiple Amazon Kinesis records from a single MQTT PUBLISH message.

MQTT to Amazon Kinesis Objects and Methods

MqttToKinesisTransformer

The MqttToKinesisTransformer interface is a transformer for the programmatic creation of one or more OutboundKinesisRecords from MQTT PublishPackets.

The HiveMQ Enterprise Extension for Amazon Kinesis executes the MqttToKinesisTransformer for each MQTT PUBLISH message your HiveMQ cluster receives that matches the <mqtt-topic-filters> you configure in the <mqtt-to-kinesis-route> tag of your conf/config.xml file. The transformer can publish an unlimited number of OutboundKinesisMessages via the MqttToKinesisOutput object.

Your compiled implementation (.class files) of the MqttToKinesisTransformer and all associated dependencies must be placed in a java archive (.jar) in the customizations folder of your HiveMQ Enterprise Extension for Amazon Kinesis. In addition, you must configure a <transformer> that references the canonical name of the implementing class in the conf/config.xml file via <implementation> tag.
Multiple threads can call methods concurrently. To ensure that all threads behave properly and prevent unintended interactions, your implementation of the MqttToKinesisTransformer interface must be thread-safe. Also, all exception handling must be done inside the methods of your MqttToKinesisTransformer implementation.

Transformer methods are not permitted to throw an Exception of any kind. If a method in your transformer throws an exception, fix and redeploy your transformer and disable/enable the Kinesis extension.

If the HiveMQ Amazon Kinesis extension cannot successfully complete a transformation, the extension drops the message, logs a warning, and increments the mqtt-to-kinesis.[route-id].failed.count metric.
To troubleshoot your transformer, monitor the mqtt-to-kinesis.[route-id].failed.count metric in the Amazon Kinesis Extension Metrics. An increase in the failed metric can indicate an issue with your transformer. Additionally, check your hivemq.log file for warnings related to the performance of your transformer.

Interface Description

MqttToKinesisInput

Contains the information of the MQTT PublishPacket to which the transformation is applied.

MqttToKinesisOutput

Returns the post-transformation OutboundKinesisRecords that the extension publishes to the associated Amazon Kinesis data stream.

@Override
public void transformMqttToKinesis(
        final @NotNull MqttToKinesisInput mqttToKinesisInput,
        final @NotNull MqttToKinesisOutput mqttToKinesisOutput) {
}

MqttToKinesisInitInput

The MqttToKinesisInitInput interface provides context for the setup of the MqttToKinesisTransformer. This interface is used to retrieve the associated custom-settings and the MetricRegistry of HiveMQ after the extension configuration loads or reloads.

Method Information

getCustomSettings()

Returns the custom settings that the user defined in the extension configuration. If no custom settings are configured, the underlying map is empty.

getMetricRegistry()

Provides the metric registry of the HiveMQ node. The registry can be used to add custom metrics that fulfill the monitoring needs of your specific business logic.

@Override
public void init(final @NotNull MqttToKinesisInitInput mqttToKinesisInitInput) {
    final CustomSettings customSettings = mqttToKinesisInitInput.getCustomSettings();
    final MetricRegistry metricRegistry = mqttToKinesisInitInput.getMetricRegistry();
}

MqttToKinesisInput

The MqttToKinesisInput object is the input parameter of the MqttToKinesisTransformer. The MqttToKinesisInput object contains the PublishPacket to which the transformation is applied.

Method Information

getPublishPacket()

Contains all the MQTT Publish information about the message that triggers the transformer call

getCustomSettings()

Returns the custom settings that the user defined in the extension configuration. If no custom settings are configured, the underlying map is empty.

getMetricRegistry()

Provides the metric registry of the HiveMQ node. The registry can be used to add custom metrics that fulfill the monitoring needs of your specific business logic.

@Override
public void transformMqttToKinesis(
        final @NotNull MqttToKinesisInput mqttToKinesisInput,
        final @NotNull MqttToKinesisOutput mqttToKinesisOutput) {
    final PublishPacket publishPacket = mqttToKinesisInput.getPublishPacket();
    final CustomSettings customSettings = mqttToKinesisInput.getCustomSettings();
    final MetricRegistry metricRegistry = mqttToKinesisInput.getMetricRegistry();
}

MqttToKinesisOutput

The MqttToKinesisOutput object is the output parameter of the MqttToKinesisTransformer. The MqttToKinesisOutput object provides the following methods:

Method Description

newOutboundKinesisRecordBuilder()

Creates an empty Builder to create OutboundKinesisRecords.

setOutboundKinesisRecords()

Sets the created OutboundKinesisRecords that are published to Amazon Kinesis.

@Override
public void transformMqttToKinesis(
        final @NotNull MqttToKinesisInput mqttToKinesisInput,
        final @NotNull MqttToKinesisOutput mqttToKinesisOutput) {
    try {
        final PublishPacket publishPacket = mqttToKinesisInput.getPublishPacket();
        final OutboundKinesisRecordBuilder builder = mqttToKinesisOutput.newOutboundKinesisRecordBuilder()
                .streamName("destination-stream")
                .data(publishPacket.getPayload().orElseGet(() -> ByteBuffer.allocate(0)))
                .partitionKey(publishPacket.getTopic())
                .randomExplicitHashKey();
        mqttToKinesisOutput.setOutboundKinesisRecords(List.of(builder.build()));
    } catch (final Exception e) {
        mqttToKinesisInput.getMetricRegistry().counter("mqtt2kinesis.failed.counter").inc();
    }
}

Amazon Kinesis to MQTT Customization

The Amazon Kinesis to MQTT Transformer lets you extend the capabilities of your HiveMQ Enterprise Extension for Amazon Kinesis to meet the specific needs of your individual use case.
Implement this transformer to programmatically create customized MQTT PublishPackets from Amazon Kinesis records.

  • Access fields of a specific Amazon Kinesis record:

    • Record data

    • Stream name

    • Partition key

    • Sequence number

    • Approximate arrival timestamp

    • Encryption type

  • Set information in the generated MQTT PUBLISH message as desired:

    • MQTT Payload

    • MQTT QoS

    • MQTT Retain flag

    • MQTT Topic

    • MQTT Payload format indicator

    • MQTT Message expiry

    • MQTT Response topic

    • MQTT Correlation data

    • MQTT Content type

    • MQTT User properties

  • Create multiple MQTT PUBLISH messages from an Amazon Kinesis record (fan-out)

Amazon Kinesis to MQTT Objects and Methods

KinesisToMqttTransformer

The KinesisToMqttTransformer interface is a transformer for the programmatic creation of MQTT PublishPackets from one or more InboundKinesisRecords.

The HiveMQ Enterprise Extension for Amazon Kinesis executes the KinesisToMqttTransformer for every Kinesis record your HiveMQ cluster polls from the <kinesis-streams> you configure in the <kinesis-to-mqtt-route> tag of your conf/config.xml file.

Your compiled implementation (.class files) of the KinesisToMqttTransformer and all associated dependencies must be placed in a java archive (.jar) in the customizations folder of your HiveMQ Enterprise Extension for Amazon Kinesis. Additionally, you must configure an <kinesis-to-mqtt-transformer> that references the canonical name of the implementing class in the conf/config.xml file.
Multiple threads can call methods concurrently. To ensure that all threads behave properly and prevent unintended interactions, your implementation of the KinesisToMqttTransformer interface must be thread-safe. In addition, exception handling must be done inside the methods of your KinesisToMqttTransformer implementation.

Transformer methods are not permitted to throw an Exception of any kind. If a method in your transformer throws an exception, fix and redeploy your transformer and disable/enable the Amazon Kinesis extension.

If the HiveMQ Amazon Kinesis extension cannot successfully complete a transformation, the extension drops the message, logs a warning, and increments the kinesis-to-mqtt.transformer.[transformer_id].failed.count metric.
To troubleshoot your transformer, monitor the kinesis-to-mqtt.transformer.[transformer_id].failed.count metric in the Amazon Kinesis Extension Metrics. An increase in the resend metric can indicate an issue with your transformer. Additionally, check your hivemq.log file for warnings related to the performance of your transformer.

Example warning in the hivemq.log file
2023-03-31 12:34:27,825 WARN  - HiveMQ Enterprise Extension for Amazon Kinesis: Message from stream 'kinesis-stream-01' dropped for kinesis-to-mqtt-route 'kinesis-to-mqtt-route-01'. Reason: Transformer 'fully.qualified.classname.to.YourTransformer' for kinesis-to-mqtt-route 'kinesis-to-mqtt-route-01' threw an unhandled 'RuntimeException' while processing an Amazon Kinesis record from stream 'kinesis-stream-01'. Transformers are responsible for their own exception handling.
One instance of your KinesisToMqttTransformer implementation is created for each <transformer> that you reference in your conf/config.xml file.
Method Description

transformKinesisToMqtt()

This callback is executed for every InboundKinesisRecord that the HiveMQ Enterprise Extension for Amazon Kinesis polls from Amazon Kinesis according to the configured <kinesis-streams> in the <kinesis-to-mqtt-route> tag.

public class MyKinesisToMqttTransformer implements KinesisToMqttTransformer {

    @Override
    public void init(final @NotNull KinesisToMqttInitInput kinesisToMqttInitInput) {
        // Insert your own business logic
    }

    @Override
    public void transformKinesisToMqtt(
            final @NotNull KinesisToMqttInput kinesisToMqttInput,
            final @NotNull KinesisToMqttOutput kinesisToMqttOutput) {
        // Insert your own business logic
    }
}

KinesisToMqttInitInput

The KinesisToMqttInitInput interface provides context for the set up of a KinesisToMqttTransformer and is used to call the associated custom-settings and the MetricRegistry of HiveMQ after (re)loading of the extension configuration.

Method Information

getCustomSettings()

Returns the custom settings that the user defined in the extension configuration. If no custom settings are configured, the underlying map is empty.

getMetricRegistry()

Provides the metric registry of the HiveMQ node. The registry can be used to add custom metrics that fulfil the monitoring needs of your specific business logic.

@Override
public void init(final @NotNull KinesisToMqttInitInput kinesisToMqttInitInput) {
    final CustomSettings customSettings = kinesisToMqttInitInput.getCustomSettings();
    final MetricRegistry metricRegistry = kinesisToMqttInitInput.getMetricRegistry();
}

KinesisToMqttInput

The KinesisToMqttInput object is the input parameter of the KinesisToMqttTransformer. The KinesisToMqttInput object contains the information of the InboundKinesisRecord to which the transformation is applied.

Method Description

getInboundKinesisRecord()

Contains information of the Amazon Kinesis record that triggered the transformer call.

getCustomSettings()

Returns the custom settings that the user defined in the extension configuration. If no custom settings are configured, the underlying map is empty.

getMetricRegistry()

Provides the metric registry of the HiveMQ node. The registry can be used to add custom metrics that fulfil the monitoring needs of your specific business logic.

@Override
public void transformKinesisToMqtt(
        final @NotNull KinesisToMqttInput kinesisToMqttInput,
        final @NotNull KinesisToMqttOutput kinesisToMqttOutput) {
    final InboundKinesisRecord inboundKinesisRecord = kinesisToMqttInput.getInboundKinesisRecord();
    final CustomSettings customSettings = kinesisToMqttInput.getCustomSettings();
    final MetricRegistry metricRegistry = kinesisToMqttInput.getMetricRegistry();
}

KinesisToMqttOutput

The KinesisToMqttOutput object is the output parameter of the KinesisToMqttTransformer. The KinesisToMqttOutput object allows access to the MQTT PublishBuilder. After the KinesisToMqttTransformer transforms the KinesisToMqttInput, HiveMQ publishes the output in MQTT messages.

Method Description

newPublishBuilder

Returns a new PublishBuilder. The PublishBuilder can be used to create new PublishPackets as desired.

setPublishes

Defines the MQTT publish messages that HiveMQ publishes once the transformation is complete. Each call of the setPublishes method overwrites the previous call of the method.

  • publishes: The publishes parameter of the setPublishes method lists the PublishPackets to be published. The HiveMQ Enterprise Extension for Amazon kinesis publishes messages in the order that the messages appear in the list. The same PublishPacket can be entered multiple times in the list.

If publishes or any element is null, a NullPointerException is thrown.
If a publish contains any element that is not created via a PublishBuilder, an IllegalArgumentException is thrown.
If the publishes list is empty, HiveMQ does not publish any MQTT messages.
@Override
public void transformKinesisToMqtt(
        final @NotNull KinesisToMqttInput kinesisToMqttInput,
        final @NotNull KinesisToMqttOutput kinesisToMqttOutput) {
    try {
        final InboundKinesisRecord record = kinesisToMqttInput.getInboundKinesisRecord();
        final PublishBuilder builder = kinesisToMqttOutput.newPublishBuilder()
                .topic("destination-topic")
                .payload(record.getData());
        kinesisToMqttOutput.setPublishes(List.of(builder.build()));
    } catch (final Exception e) {
        kinesisToMqttInput.getMetricRegistry().counter("kinesis2mqtt.failed.counter").inc();
    }
}

Customization Use Cases

MQTT to Amazon Kinesis Customization Use Case Examples

Build and send custom Amazon Kinesis records in response to received MQTT messages: In this use case, the goal is to transform MQTT messages based on a defined business logic and output a customized Amazon Kinesis record.

Build Custom Amazon Kinesis records
Example configuration to send custom Amazon Kinesis records in response to a received MQTT message
public class BusinessMqttToKinesisTransformer implements MqttToKinesisTransformer {
    @Override
    public void transformMqttToKinesis(
            final @NotNull MqttToKinesisInput mqttToKinesisInput,
            final @NotNull MqttToKinesisOutput mqttToKinesisOutput) {

        // Get MQTT payload
        final ByteBuffer mqttPayload =
                mqttToKinesisInput.getPublishPacket().getPayload().orElseGet(() -> ByteBuffer.allocate(0));

        // Manipulate the MQTT payload according to your business logic
        // For example, deserialize or add fields
        final byte[] data = ownBusinessLogic(mqttPayload);

        final OutboundKinesisRecord record = mqttToKinesisOutput.newOutboundKinesisRecordBuilder()
                .data(data)
                .streamName("stream-to-my-service")
                .partitionKey(mqttToKinesisInput.getPublishPacket().getTopic())
                .build();

        // Set record as output
        mqttToKinesisOutput.setOutboundKinesisRecords(List.of(record));
    }
}

Multicast one MQTT message to multiple Amazon Kinesis records: In this use case, the goal is to transform one MQTT message into multiple customized Amazon Kinesis records that each contain a specific part of the information from the original MQTT message:

Multicast MQTT Publish Messages
Example configuration to multicast from one MQTT message to many Amazon Kinesis records
public class MulticastMqttToKinesisTransformer implements MqttToKinesisTransformer {

    private static final @NotNull String LOCATION_SERVICE_STREAM = "location-service";
    private static final @NotNull String SPEED_SERVICE_STREAM = "speed-control-service";
    private static final @NotNull String FUEL_SERVICE_STREAM = "fuel-control-service";

    @Override
    public void transformMqttToKinesis(
            final @NotNull MqttToKinesisInput mqttToKinesisInput,
            final @NotNull MqttToKinesisOutput mqttToKinesisOutput) {

        // Get MQTT payload
        final ByteBuffer mqttPayload =
                mqttToKinesisInput.getPublishPacket().getPayload().orElse(ByteBuffer.allocate(0));

        // Extract location from MQTT payload and create record
        final byte[] location = extractLocation(mqttPayload);
        final OutboundKinesisRecord locationRecord = mqttToKinesisOutput.newOutboundKinesisRecordBuilder()
                .streamName(LOCATION_SERVICE_STREAM)
                .partitionKey(publishPacket.getTopic())
                .data(location)
                .build();

        // Extract speed from MQTT payload and create record
        final byte[] speed = extractSpeed(mqttPayload);
        final OutboundKinesisRecord speedRecord = mqttToKinesisOutput.newOutboundKinesisRecordBuilder()
                .streamName(SPEED_SERVICE_STREAM)
                .partitionKey(publishPacket.getTopic())
                .data(speed)
                .build();

        // Extract fuel from MQTT payload and create record
        final byte[] fuel = extractSpeed(mqttPayload);
        final OutboundKinesisRecord fuelRecord = mqttToKinesisOutput.newOutboundKinesisRecordBuilder()
                .streamName(FUEL_SERVICE_STREAM)
                .partitionKey(publishPacket.getTopic())
                .data(fuel)
                .build();

        // Set the records as output
        mqttToKinesisOutput.setOutboundKinesisRecords(List.of(locationRecord, speedRecord, fuelRecord));
    }
}

Use custom logic to drop specific MQTT messages: In this use case, the goal is to drop received MQTT messages from topic drop/me and to create Amazon Kinesis records for all other.

Skip Specific MQTT Publish Messages
Example configuration with custom logic to drop specific MQTT messages
public class DroppingMqttToKinesisTransformer implements MqttToKinesisTransformer {

    @Override
    public void transformMqttToKinesis(
            final @NotNull MqttToKinesisInput mqttToKinesisInput,
            final @NotNull MqttToKinesisOutput mqttToKinesisOutput) {
        final @NotNull PublishPacket publishPacket = mqttToKinesisInput.getPublishPacket();
        if ("drop/me".equals(publishPacket.getTopic())) {
            return;
        }
        final OutboundKinesisRecord record = mqttToKinesisOutput.newOutboundKinesisRecordBuilder()
                .data(publishPacket.getPayload().orElseGet(() -> ByteBuffer.allocate(0)))
                .streamName("my-destination-stream")
                .partitionKey(publishPacket.getTopic())
                .build();
        mqttToKinesisOutput.setOutboundKinesisRecords(List.of(record));
    }
}

Use custom settings to transform MQTT PUBLISH messages: In this use case, the goal is to use custom settings from the configuration XML file to transform MQTT publishes.

Use Custom Settings to Transform MQTT Messages
Example configuration with custom settings
<transformer>
    <implementation>fully.qualified.classname.to.CustomSettingsMqttToKinesisTransformer</implementation>
    <custom-settings>
        <custom-setting>
            <name>destination-topic</name>
            <value>destination/pubsub/topic/a</value>
        </custom-setting>
    </custom-settings>
</transformer>
Example implementation to transform specific MQTT publishes with custom settings
public class CustomSettingsMqttToKinesisTransformer implements MqttToKinesisTransformer {

    @SuppressWarnings("NotNullFieldNotInitialized")
    private @NotNull String destinationStream;

    @Override
    public void init(final @NotNull MqttToKinesisInitInput mqttToKinesisInitInput) {
        final CustomSettings customSettings = mqttToKinesisInitInput.getCustomSettings();
        destinationStream = customSettings.getFirst("destination-stream").orElse("default");
    }

    @Override
    public void transformMqttToKinesis(
            final @NotNull MqttToKinesisInput mqttToKinesisInput,
            final @NotNull MqttToKinesisOutput mqttToKinesisOutput) {
        final PublishPacket publishPacket = mqttToKinesisInput.getPublishPacket();

        final OutboundKinesisRecord record = mqttToKinesisOutput.newOutboundKinesisRecordBuilder()
                .streamName(destinationStream)
                .partitionKey(publishPacket.getTopic())
                .data(publishPacket.getPayload().orElseGet(() -> ByteBuffer.allocate(0)))
                .build();

        mqttToKinesisOutput.setOutboundKinesisRecords(List.of(record));
    }
}

Amazon Kinesis to MQTT Customization Use Case Examples

Build custom MQTT topic from incoming Amazon Kinesis records: In this use case, the goal is to extract selected information from an incoming Amazon Kinesis record and publish this information to a specific MQTT topic based on your own custom business logic.

Extract Information from Amazon Kinesis Record
Example configuration to build custom MQTT topic from an incoming Amazon Kinesis record
public class BusinessKinesisToMqttTransformer implements KinesisToMqttTransformer {

    @Override
    public void transformKinesisToMqtt(
            final @NotNull KinesisToMqttInput kinesisToMqttInput,
            final @NotNull KinesisToMqttOutput kinesisToMqttOutput) {

        final InboundKinesisRecord record = kinesisToMqttInput.getInboundKinesisRecord();

        final byte[] mqttPayload = ownBusinessLogic(record.getData());

        // Build MQTT publish
        final Publish publish = kinesisToMqttOutput.newPublishBuilder()
                .topic(record.getStreamName())
                .qos(Qos.AT_LEAST_ONCE)
                .payload(ByteBuffer.wrap(mqttPayload))
                .build();

        // Set MQTT publish as output
        kinesisToMqttOutput.setPublishes(List.of(publish));
    }
}

Use custom logic to skip specific Amazon Kinesis records: In this use case, the goal is to filter out incoming Amazon Kinesis records that are empty and to create MQTT messages only for received messages that contain data.

Skip Specific Amazon Kinesis Records
Example configuration to skip specific Amazon Kinesis records
public class DroppingKinesisToMqttTransformer implements KinesisToMqttTransformer {

    @Override
    public void transformKinesisToMqtt(
            final @NotNull KinesisToMqttInput kinesisToMqttInput,
            final @NotNull KinesisToMqttOutput kinesisToMqttOutput) {

        final InboundKinesisRecord record = kinesisToMqttInput.getInboundKinesisRecord();

        final ByteBuffer data = record.getData();
        if (data.remaining() == 0) {
            // Drop the record if the data is empty.
            return;
        }

        // Build MQTT publish
        final Publish publish = kinesisToMqttOutput.newPublishBuilder()
                .topic(record.getStreamName())
                .payload(data)
                .build();
        kinesisToMqttOutput.setPublishes(List.of(publish));
    }
}

Use custom settings to transform Amazon Kinesis records: In this use case, the goal is to use custom settings from the configuration XML file to transform Amazon Kinesis records.

Transform Amazon Kinesis Records
Example configuration custom settings
<transformer>
    <implementation>fully.qualified.classname.to.CustomSettingsKinesisToMqttTransformer</implementation>
    <custom-settings>
        <custom-setting>
            <name>destination-topic</name>
            <value>destination/pubsub/topic/a</value>
        </custom-setting>
    </custom-settings>
</transformer>
Example implementation to transform specific Amazon Kinesis records with custom settings
public class CustomSettingsKinesisToMqttTransformer implements KinesisToMqttTransformer {

    @SuppressWarnings("NotNullFieldNotInitialized")
    private @NotNull String destinationTopic;

    @Override
    public void init(final @NotNull KinesisToMqttInitInput kinesisToMqttInitInput) {
        final CustomSettings customSettings = kinesisToMqttInitInput.getCustomSettings();
        destinationTopic = customSettings.getFirst("destination-topic").orElse("default/destination/topic");
    }

    @Override
    public void transformKinesisToMqtt(
            final @NotNull KinesisToMqttInput kinesisToMqttInput,
            final @NotNull KinesisToMqttOutput kinesisToMqttOutput) {
        // Get the Amazon Kinesis record.
        final InboundKinesisRecord record = kinesisToMqttInput.getInboundKinesisRecord();

        // Build MQTT publish
        final Publish publish = kinesisToMqttOutput.newPublishBuilder()
                .topic(destinationTopic)
                .payload(record.getData())
                .build();
        kinesisToMqttOutput.setPublishes(List.of(publish));
    }
}

Deploy Customization

You can deploy your customization with three simple steps:

  1. Configure your conf/config.xml to add the MQTT to Amazon Kinesis transformer.

  2. Run the ./gradlew jar task from your Gradle project to build your customization.

  3. For example, the task from the Hello World project.

  4. Move the jar file from your local build/libs/ folder to the HIVEMQ_HOME/extensions/hivemq-amazon-kinesis-extension/customizations directory of your HiveMQ installation.

SDK Metrics

When you implement a transformer, The HiveMQ Amazon Kinesis Extension Customization SDK adds useful metrics to your HiveMQ Enterprise Extension for Amazon Kinesis. Monitor the metrics to gain valuable insights into the behavior of your applications over time.

For more information, see HiveMQ Amazon Kinesis Metrics.

In addition to the default metrics, the metric registry that the HiveMQ Amazon Kinesis Extension Customization SDK exposes gives you the ability to create your own metrics to measure specific business aspects of your individual use case. For more information, see KinesisToMqttInitInput.