HiveMQ Enterprise Extension for Amazon Kinesis

Amazon Kinesis is a suite of fully managed Amazon Web Services (AWS) services designed to ingest, process, and analyze large-scale data streams in real time. Amazon Kinesis Data Streams is a component of Amazon Kinesis that continuously captures, stores, and processes large amounts of data from various sources.

The HiveMQ Enterprise Extension for Amazon Kinesis makes it easy to move MQTT messaging data between your HiveMQ broker and the Amazon Kinesis Data Streams service. Our native integration with Amazon Kinesis Data Streams is a fast, cost-effective way to establish bidirectional communication between IoT devices and Amazon Kinesis Data Streams for further ingestion into other AWS services.

The data you forward from HiveMQ to Amazon Kinesis Data Streams is immediately available for continuous stream processing and can be used to power live dashboards, generate real-time metrics and reporting, or deliver data into data stores and databases.

Features

  • Flexible extension configuration to leverage Amazon Kinesis Data Streams and unlock the use of further AWS data processing services for your MQTT data.

  • Forward MQTT messages from IoT devices to one or more Amazon Kinesis Data Streams via your HiveMQ broker.

  • Consume Kinesis records from Amazon Kinesis Data Streams and publish this information to one or more MQTT topics.

  • Benefit from the unique scalability and high availability of the HiveMQ broker to ensure that records from your Kinesis Data Streams are forwarded to MQTT clients as expected.

  • Filter and transform MQTT and Kinesis messages bidirectionally at runtime with the HiveMQ customization SDK.

Requirements

If you do not provide a valid license, HiveMQ automatically uses a free trial license. Trial licenses for HiveMQ Enterprise Extensions are valid for 5 hours. For more license information or to request an extended evaluation license, contact HiveMQ sales.

Installation

  1. Place your HiveMQ Enterprise Extension for Amazon Kinesis license file (.elic) in the license folder of your HiveMQ installation. (Skip this step if you are using a trial version of the extension.)

    All HiveMQ Enterprise Extensions are preinstalled in your HiveMQ release bundle and disabled by default.
    └─ <HiveMQ folder>
    ├── README.txt
    ├── audit
    ├── backup
    ├── bin
    ├── conf
    ├── data
    ├── extensions
    │   ├── hivemq-amazon-kinesis-extension
    │   │   ├── conf
    │   │   │   ├── config.xml (needs to be added by the user)
    │   │   │   ├── config.xsd
    │   │   │   └── examples
    │   │   │       └── ...
    │   │   ├── hivemq-amazon-kinesis-extension.jar
    │   │   ├── hivemq-extension.xml
    │   │   └── third-party-licenses
    │   │       └── ...
    ├── license
    ├── log
    ├── third-party-licenses
    └── tools
  2. Before you enable the extension, you need to configure the extension to match your individual Amazon Kinesis Data Streams setup.
    For your convenience, we provide an example configuration conf/examples/config.xml that you can copy and modify as desired.
    The included config.xsd file outlines the schema and elements that can be used in the XML configuration.
    Your completed configuration file must be named config.xml and located in HIVEMQ_HOME/extensions/hivemq-amazon-kinesis-extension/conf/config.xml.
    For detailed information on configuration options, see Configuration.

    Starting with HiveMQ 4.15.0, the configuration for the HiveMQ Enterprise Extension for Amazon Kinesis is located in HIVEMQ_HOME/extensions/hivemq-amazon-kinesis-extension/conf/config.xml. Support for the previous location HIVEMQ_HOME/extensions/hivemq-amazon-kinesis-extension/conf/hivemq-amazon-kinesis-extension.xml will be removed in a future release.

    If applicable, move the configuration from HIVEMQ_HOME/extensions/hivemq-amazon-kinesis-extension/conf/hivemq-amazon-kinesis-extension.xml to HIVEMQ_HOME/extensions/hivemq-amazon-kinesis-extension/conf/config.xml.

  3. To enable the HiveMQ Enterprise Extension for Amazon Kinesis, locate the hivemq-amazon-kinesis-extension folder in the extensions directory of your HiveMQ installation and remove the DISABLED file (if present).

To function properly, the HiveMQ Enterprise Extension for Amazon Kinesis must be installed on all HiveMQ broker nodes in your HiveMQ cluster and the configuration file on each node must be identical.
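
The file operations in steps 2 and 3 can be sketched in Python as follows. This is only an illustration of the folder layout shown above, not a HiveMQ-provided tool. The function names are hypothetical, and the sketch assumes the default folder names from the release bundle. The two steps are kept separate so that you can edit config.xml before you enable the extension.

```python
import shutil
from pathlib import Path

# Folder name taken from the release bundle layout shown above.
EXTENSION_FOLDER = "hivemq-amazon-kinesis-extension"


def copy_example_config(hivemq_home: Path) -> Path:
    """Step 2: copy conf/examples/config.xml to conf/config.xml if it is missing."""
    conf_dir = hivemq_home / "extensions" / EXTENSION_FOLDER / "conf"
    target = conf_dir / "config.xml"
    if not target.exists():
        # Never overwrite an existing configuration.
        shutil.copyfile(conf_dir / "examples" / "config.xml", target)
    return target


def enable_extension(hivemq_home: Path) -> None:
    """Step 3: remove the DISABLED marker file (if present)."""
    marker = hivemq_home / "extensions" / EXTENSION_FOLDER / "DISABLED"
    marker.unlink(missing_ok=True)  # requires Python 3.8 or later
```

Run the same steps on every node of the cluster so that all nodes end up with an identical config.xml.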

Configuration

The HiveMQ Enterprise Extension for Amazon Kinesis supports hot reload of the extension configuration. Changes that you make to the configuration are applied while the extension is running, without a restart. When the extension detects that a valid configuration has been loaded, the previous configuration file is automatically archived in the config-archive folder of the extension home folder.

If you load an invalid configuration at runtime and a previous valid configuration exists in the archive, HiveMQ uses the previous configuration.
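
Because an invalid configuration is rejected at runtime, it can help to run a quick check before you save a changed file. The following Python sketch is not part of the extension and does not replace validation against config.xsd. It only checks that the XML is well-formed, that IDs use the allowed characters, and that every route references a defined credential profile. The function name and the messages are hypothetical.

```python
import re
import xml.etree.ElementTree as ET
from typing import List

# IDs may only contain alphanumeric characters, dashes, and underscores.
ID_PATTERN = re.compile(r"[A-Za-z0-9_-]+")

ROUTE_PATHS = (
    "./mqtt-to-kinesis-routes/mqtt-to-kinesis-route",
    "./kinesis-to-mqtt-routes/kinesis-to-mqtt-route",
)


def check_config(xml_text: str) -> List[str]:
    """Return a list of problems found in an extension configuration."""
    root = ET.fromstring(xml_text)  # raises ParseError if not well-formed
    problems = []

    profile_ids = set()
    for profile in root.findall("./aws-credential-profiles/aws-credential-profile"):
        profile_id = (profile.findtext("id") or "").strip()
        if not ID_PATTERN.fullmatch(profile_id):
            problems.append(f"invalid credential profile id: {profile_id!r}")
        profile_ids.add(profile_id)

    for path in ROUTE_PATHS:
        for route in root.findall(path):
            route_id = (route.findtext("id") or "").strip()
            if not ID_PATTERN.fullmatch(route_id):
                problems.append(f"invalid route id: {route_id!r}")
            # The reference is optional; only check it when it is present.
            ref = route.findtext("aws-credential-profile-id")
            if ref is not None and ref.strip() not in profile_ids:
                problems.append(f"route {route_id!r}: unknown profile {ref.strip()!r}")
    return problems
```

An empty list means that the checks in this sketch passed. The extension can still reject the file for other reasons.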

The extension configuration is divided into three sections:

  • AWS Credential Profiles: Provides information about the credential profiles used to interact with Amazon Web Services.

  • MQTT to Kinesis Routes: Defines how MQTT messages are sent from your HiveMQ broker to Amazon Kinesis Data Streams.

  • Kinesis to MQTT Routes: Defines how Amazon Kinesis records are sent from Amazon Kinesis Data Streams to your HiveMQ broker.

The customization SDK for the HiveMQ Enterprise Extension for Amazon Kinesis gives you the ability to add custom message transformations. The configuration for the customization is specified as part of the routes. For more information, see Transformer Processor in MQTT to Amazon Kinesis Routes and Transformer Processor in Amazon Kinesis to MQTT Routes.

Extension Configuration File

The config.xml file must be located in the conf folder of the hivemq-amazon-kinesis-extension folder within the extensions folder of your HiveMQ installation.

The extension uses a simple but powerful XML-based configuration.

The conf/examples/config.xml file is a configuration example that contains all the parameters you need to send MQTT messages from your HiveMQ MQTT broker to Amazon Kinesis Data Streams, and to retrieve records from Amazon Kinesis Data Streams and publish them as MQTT messages to your MQTT clients.

If you copy and reuse the example file, be sure to rename and move the file to conf/config.xml before you enable your extension.
Example Amazon Kinesis Data Streams extension configuration
<hivemq-amazon-kinesis-extension xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                                 xsi:noNamespaceSchemaLocation="config.xsd">
    <aws-credential-profiles>
        <aws-credential-profile>
            <id>aws-credential-profile-01</id>
            <profile-file>/opt/hivemq/extensions/hivemq-amazon-kinesis-extension/aws-credentials</profile-file>
        </aws-credential-profile>
    </aws-credential-profiles>

    <mqtt-to-kinesis-routes>
        <mqtt-to-kinesis-route>
            <id>my-mqtt-to-kinesis-route</id>
            <enabled>true</enabled>
            <aws-credential-profile-id>aws-credential-profile-01</aws-credential-profile-id>
            <region>eu-central-1</region>
            <mqtt-topic-filters>
                <mqtt-topic-filter>mqtt/topic/in</mqtt-topic-filter>
            </mqtt-topic-filters>
            <processor>
                <mapping>
                    <kinesis-streams>
                        <kinesis-stream>
                            <name>my-kinesis-stream-out</name>
                            <partition-key>
                                <mqtt-topic/>
                            </partition-key>
                            <explicit-hash-key>
                                <random/>
                            </explicit-hash-key>
                        </kinesis-stream>
                    </kinesis-streams>
                </mapping>
            </processor>
        </mqtt-to-kinesis-route>
    </mqtt-to-kinesis-routes>
    <kinesis-to-mqtt-routes>
        <kinesis-to-mqtt-route>
            <id>my-kinesis-to-mqtt-route</id>
            <enabled>true</enabled>
            <aws-credential-profile-id>aws-credential-profile-01</aws-credential-profile-id>
            <region>eu-central-1</region>
            <kinesis-streams>
                <kinesis-stream>
                    <name>my-kinesis-stream-in</name>
                    <consumer-application-name>my-consumer-application</consumer-application-name>
                    <consumer-retrieval-mode>
                        <polling/>
                    </consumer-retrieval-mode>
                </kinesis-stream>
            </kinesis-streams>
            <processor>
                <mapping>
                    <mqtt-topics>
                        <mqtt-topic>mqtt/topic/out</mqtt-topic>
                    </mqtt-topics>
                </mapping>
            </processor>
        </kinesis-to-mqtt-route>
    </kinesis-to-mqtt-routes>
</hivemq-amazon-kinesis-extension>

AWS Credential Profiles

To interact with Amazon Kinesis Data Streams, your HiveMQ extension must provide AWS security credentials to verify your identity and access permissions.

When you set up your AWS IAM (Identity and Access Management) security credentials in the AWS management console, verify that your setup includes the required permissions for the configured routes. For more information, see Controlling Access to Amazon Kinesis Data Streams Resources Using IAM.

Example AWS IAM policy configuration for an MQTT to Kinesis route:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "KinesisPublishing",
            "Effect": "Allow",
            "Action": "kinesis:PutRecords",
            "Resource": "*"
        }
    ]
}
Example AWS IAM policy configuration for a Kinesis to MQTT route (with polling data retrieval mode)
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "KCLPolling",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:Scan",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem",
                "dynamodb:GetItem",
                "kinesis:ListShards",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": "*"
        }
    ]
}
Example AWS IAM policy configuration for a Kinesis to MQTT route (with enhanced fan-out data retrieval mode)
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "KCLEnhancedFanOut",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:Scan",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem",
                "dynamodb:GetItem",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:DescribeStreamConsumer",
                "kinesis:RegisterStreamConsumer",
                "kinesis:SubscribeToShard"
            ],
            "Resource": "*"
        }
    ]
}
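
The example policies above use "Resource": "*". If you prefer to limit a policy to specific streams, the policy document can be generated from stream ARNs, as in the following Python sketch. The account ID is a placeholder and the function names are hypothetical. The sketch only covers the MQTT to Kinesis policy. Verify that the narrowed policy still grants everything your routes need before you rely on it.

```python
import json
from typing import Iterable


def stream_arn(region: str, account_id: str, stream_name: str) -> str:
    """Build the ARN of a Kinesis data stream."""
    return f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"


def mqtt_to_kinesis_policy(stream_arns: Iterable[str]) -> dict:
    """Same statement as the MQTT to Kinesis example, limited to the given streams."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "KinesisPublishing",
                "Effect": "Allow",
                "Action": "kinesis:PutRecords",
                "Resource": list(stream_arns),
            }
        ],
    }


if __name__ == "__main__":
    # 123456789012 is a placeholder account ID, not a real account.
    arn = stream_arn("eu-central-1", "123456789012", "my-kinesis-stream-out")
    print(json.dumps(mqtt_to_kinesis_policy([arn]), indent=4))
```
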
The AWS Identity and Access Management (IAM) service helps to securely control access to Amazon Kinesis Data Streams and other AWS resources. In the AWS management console, you can create users and assign user permissions. To access resources on AWS, you create security credentials in the AWS management console and save the credentials for use in the extension profile file. For more information, see IAM access policies.

The aws-credential-profiles section of the extension configuration defines one or more sets of security credentials for your connections to AWS. If desired, use this section to override the default AWS credential provider chain.

Example AWS credential profile configuration
    <aws-credential-profiles>
        <aws-credential-profile>
            <id>aws-credential-profile-01</id>
            <profile-file>/opt/hivemq/extensions/hivemq-amazon-kinesis-extension/aws-credentials</profile-file>
            <profile-name>your-profile-name</profile-name>
        </aws-credential-profile>
    </aws-credential-profiles>
Each <aws-credential-profile> tag contains the file path to the credentials file that stores the associated security credentials. If your credentials file holds more than one set of credentials, use the optional <profile-name> tag to specify the credentials set that you want to use.

You can define as many <aws-credential-profile> tags as your use case requires.

Table 1. AWS credential profile parameters
Parameter Required Type Description

id

String

The unique identifier of the AWS credential profile. This string can only contain alphanumeric characters, dashes, and underscores.

profile-file

String

Optional setting that provides the path to a file that contains AWS profile credentials. If unset, information is taken from the default AWS credentials location.

profile-name

String

Optional setting to select a specific profile in the defined <profile-file> tag. If unset, the profile name default is used.
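
A profile file follows the AWS shared credentials file format: one section per profile, with the profile name in square brackets. The Python sketch below lists the profile names in such a file so that you can check which value to use for <profile-name>. The sample keys are placeholders and the function name is hypothetical.

```python
import configparser
from typing import List

# Placeholder content in the AWS shared credentials file format.
SAMPLE_PROFILE_FILE = """\
[default]
aws_access_key_id = EXAMPLE_KEY_ID
aws_secret_access_key = EXAMPLE_SECRET

[your-profile-name]
aws_access_key_id = ANOTHER_EXAMPLE_KEY_ID
aws_secret_access_key = ANOTHER_EXAMPLE_SECRET
"""


def list_profiles(profile_file_text: str) -> List[str]:
    """Return the profile names defined in a credentials file."""
    parser = configparser.ConfigParser()
    parser.read_string(profile_file_text)
    # Section names are case-sensitive, so a lowercase [default]
    # section is listed like any other profile.
    return parser.sections()


if __name__ == "__main__":
    print(list_profiles(SAMPLE_PROFILE_FILE))
```

In this sample, leaving <profile-name> unset selects the default profile, and setting it to your-profile-name selects the second set of credentials.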

MQTT to Amazon Kinesis Routes

The <mqtt-to-kinesis-routes> section of your extension configuration defines how MQTT messages are sent from the HiveMQ broker to Amazon Kinesis Data Streams.

You can define as many individual <mqtt-to-kinesis-route> tags as your use case requires.

Example MQTT to Kinesis Route
<mqtt-to-kinesis-route>
    <id>my-mqtt-to-kinesis-route</id>
    <enabled>true</enabled>
    <aws-credential-profile-id>aws-credential-profile-01</aws-credential-profile-id>
    <region>eu-central-1</region>
    <mqtt-topic-filters>
        <mqtt-topic-filter>mqtt/topic/in</mqtt-topic-filter>
    </mqtt-topic-filters>
    <processor>
        ...
    </processor>
</mqtt-to-kinesis-route>
Table 2. MQTT to Amazon Kinesis route parameters
Parameter Required Type Description

id

String

The unique identifier of the MQTT to Amazon Kinesis route. This string can only contain alphanumeric characters, dashes, and underscores.

enabled

Boolean

Optional setting that defines whether the selected route is enabled or disabled. The default setting is true.

aws-credential-profile-id

String

Optional setting that identifies the aws-credential-profile from your <aws-credential-profiles> configuration to use for the route. When unset, the default AWS credential retrieval order applies. If the default credentials are not found, the selected <mqtt-to-kinesis-route> does not start and HiveMQ logs an error message.

mqtt-topic-filters

String

A list of one or more MQTT topic filters.

  • mqtt-topic-filter: The source MQTT topic from which MQTT messages are routed to Amazon Kinesis Data Streams. You can define as many individual <mqtt-topic-filter> tags as your use case requires.

region

String

Optional setting that defines the AWS region the extension uses to access AWS Kinesis Data Streams. When unset, the default AWS region selection logic determines the region. If the default region is not found, the selected <mqtt-to-kinesis-route> does not start and HiveMQ logs an error message.

processor

Complex

The method HiveMQ uses to forward MQTT messages received in the selected <mqtt-to-kinesis-route> to Amazon Kinesis Data Streams.

  • mapping: The routing information for MQTT messages received on the configured <mqtt-topic-filters> of the <mqtt-to-kinesis-route> to Amazon Kinesis.

  • transformer: The transformer invoked for MQTT messages received on the configured <mqtt-topic-filters> of the <mqtt-to-kinesis-route> to Amazon Kinesis.

Processors in MQTT to Amazon Kinesis Routes

In an MQTT to Kinesis route, the <processor> section defines which method HiveMQ uses to transfer MQTT messages to Amazon Kinesis Data Streams in the selected route.

Mapping Processor in MQTT to Amazon Kinesis Routes

The <kinesis-streams> section of the <mapping> processor configuration in your <mqtt-to-kinesis-route> lists the destination Amazon Kinesis data streams to which configured MQTT messages are routed.

You can define as many destination <kinesis-stream> tags as your use case requires.

Example MQTT to Kinesis Mapping Processor
<mapping>
    <kinesis-streams>
        <kinesis-stream>
            <name>my-kinesis-stream-out</name>
            <partition-key>
                <mqtt-topic/>
            </partition-key>
            <explicit-hash-key>
                <random/>
            </explicit-hash-key>
        </kinesis-stream>
    </kinesis-streams>
</mapping>
Table 3. Mapping processor parameters
Parameter Required Type Description

kinesis-stream

Complex

The destination data stream in Amazon Kinesis to which MQTT messages are routed. Each <mqtt-to-kinesis-route> must contain at least one destination <kinesis-stream>.

Table 4. Stream parameters
Parameter Required Type Description

name

String

The name of the destination data stream in Amazon Kinesis Data Streams.

partition-key

Complex

Determines the shard in the destination Amazon Kinesis data stream to which the MQTT message is sent.
NOTE: If you configure an <explicit-hash-key> and a <partition-key>, the <explicit-hash-key> determines the destination shard.

  • <random>: Sets a randomly generated partition key for each MQTT message. This setting distributes messages across all available shards. For typical use cases, we recommend the <random> setting for best performance and throughput.

  • <mqtt-topic>: Sets the topic of the MQTT message as the partition key. This setting sorts MQTT messages with the same topic into the same shard. Due to Kinesis limits, the topic string is truncated after 256 bytes.

  • <fixed>: Sets the same partition key for all MQTT messages. This setting sorts all MQTT messages into the same shard.

NOTE: When you configure the <partition-key> with <mqtt-topic> or <fixed>, Kinesis attempts to preserve message ordering in the destination shard. However, errors, resends, and load situations that trigger dynamic resharding of the AWS Kinesis Data Stream can impact message ordering in destination shards.

explicit-hash-key

Complex

Explicitly defines the shard in the destination Amazon Kinesis Data Stream to which the MQTT message is sent. This setting overrides the <partition-key> shard assignment. When an explicit-hash-key is set, the <partition-key> can be used to transfer metadata such as the topic the MQTT message is published to.

  • <random>: Sets a randomly generated hash key for each MQTT message. Use this setting to distribute messages across all available shards. For typical use cases, we recommend the <random> setting for best performance.

  • <fixed>: Sets the same hash key for all MQTT messages. Use this setting to sort all MQTT messages into the same shard. The hash key must be a natural number >= 0 and < 2^128.

TIP: To transfer the source MQTT topic to the consumer and ensure efficient performance, you can combine a <partition-key> <mqtt-topic> configuration with an <explicit-hash-key> <random> setting.
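
To see why a key selects a shard, recall that Kinesis Data Streams maps each partition key to a 128-bit integer with an MD5 hash, and that each shard owns a range of that hash space. An explicit hash key supplies the integer directly and bypasses the MD5 step. The Python sketch below illustrates this mapping. It assumes that the shards split the hash space into equal ranges, which may not hold after resharding. The function names are hypothetical and the code is not taken from the extension.

```python
import hashlib
import secrets

HASH_SPACE = 2 ** 128  # explicit hash keys must be >= 0 and < 2^128


def partition_key_to_hash(partition_key: str) -> int:
    """Map a partition key to a 128-bit integer with MD5, as Kinesis does."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def random_explicit_hash_key() -> int:
    """Pick a hash key uniformly from the whole hash space."""
    return secrets.randbelow(HASH_SPACE)


def shard_index(hash_key: int, shard_count: int) -> int:
    """Index of the shard that owns hash_key, ASSUMING equal hash key ranges."""
    if not 0 <= hash_key < HASH_SPACE:
        raise ValueError("hash key must be >= 0 and < 2^128")
    return hash_key * shard_count // HASH_SPACE


if __name__ == "__main__":
    # Messages with the same topic always map to the same shard.
    topic_hash = partition_key_to_hash("mqtt/topic/in")
    print(shard_index(topic_hash, shard_count=4))
    # A random explicit hash key spreads messages across shards.
    print(shard_index(random_explicit_hash_key(), shard_count=4))
```

With the combination from the tip above, the random explicit hash key decides the shard, while the partition key still carries the MQTT topic to the consumer.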

Transformer Processor in MQTT to Amazon Kinesis Routes

The customizable <transformer> processor gives you fine-grained control over the routing and transformation of MQTT messages to Amazon Kinesis. The <transformer> processor is implemented with the HiveMQ Enterprise Extension for Amazon Kinesis Customization SDK. The customization SDK provides a flexible API with the ability to programmatically specify the custom handling of message transformations.

Example MQTT to Kinesis Transformer Processor
<transformer>
    <implementation>fully.qualified.classname.to.YourTransformer</implementation>
    <custom-settings>
        <custom-setting>
            <name>your-setting-name</name>
            <value>your-setting-value-01</value>
        </custom-setting>
        <custom-setting>
            <name>your-setting-name</name>
            <value>your-setting-value-02</value>
        </custom-setting>
    </custom-settings>
</transformer>
Table 5. Transformer processor parameters
Parameter Required Type Description

implementation

String

The fully qualified class name of the transformer that is used.

custom-settings

Complex

A list of custom settings that are available as an input to the transformer.

  • custom-setting: The name and value pair of a custom attribute the transformer implements. The name does not need to be unique. You can configure as many custom-setting tags as your use case requires.
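
Because setting names do not need to be unique, one name can carry several values. The Python sketch below reads the <custom-settings> of a <transformer> block into a mapping from each name to a list of values. It only illustrates the shape of the data. It does not show how the customization SDK exposes the settings, and the function name is hypothetical.

```python
import xml.etree.ElementTree as ET
from collections import defaultdict
from typing import Dict, List


def read_custom_settings(transformer_xml: str) -> Dict[str, List[str]]:
    """Group custom-setting values by name, keeping the configured order."""
    root = ET.fromstring(transformer_xml)
    settings = defaultdict(list)
    for setting in root.findall("./custom-settings/custom-setting"):
        name = setting.findtext("name", default="")
        value = setting.findtext("value", default="")
        settings[name].append(value)
    return dict(settings)
```

For the transformer example above, the result contains one entry, your-setting-name, with the two values your-setting-value-01 and your-setting-value-02.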

Amazon Kinesis to MQTT Routes

The <kinesis-to-mqtt-routes> section of your extension configuration defines the Amazon Kinesis data streams from which Kinesis records are sent to one or more topics on your HiveMQ broker.

All Amazon Kinesis extensions of your HiveMQ cluster nodes with the same <consumer-application-name> work together to pull messages from your Kinesis data stream.

The extension uses the Kinesis Client Library (KCL) to consume Kinesis records. The KCL internally uses the Amazon DynamoDB service to share the consumer application state between multiple HiveMQ nodes.

Every <kinesis-stream> represents an Amazon Kinesis Data Streams consumer application (using the KCL) and creates a related DynamoDB table that uses the <consumer-application-name> as the table name. These tables require operational awareness and introduce additional AWS costs.

Be cautious when changing the correlation between a <consumer-application-name> and a Kinesis data stream. The consumer application state stored in the DynamoDB table is only applicable for the initial stream. If the table is reinitialized, the Kinesis Data Streams messages are re-consumed from the earliest messages in the stream.
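
Since the consumer application state is tied to one stream, a simple safeguard is to check that no <consumer-application-name> is configured for more than one stream name. The Python sketch below performs that check on the text of an extension configuration. It is an illustration only. The function name is hypothetical, and the check does not detect a name that was used with a different stream in an earlier version of the file.

```python
import xml.etree.ElementTree as ET
from collections import defaultdict
from typing import Dict, Set

STREAM_PATH = (
    "./kinesis-to-mqtt-routes/kinesis-to-mqtt-route"
    "/kinesis-streams/kinesis-stream"
)


def find_shared_consumer_names(xml_text: str) -> Dict[str, Set[str]]:
    """Return consumer application names that point at more than one stream."""
    root = ET.fromstring(xml_text)
    streams_by_app = defaultdict(set)
    for stream in root.findall(STREAM_PATH):
        app = stream.findtext("consumer-application-name", default="").strip()
        name = stream.findtext("name", default="").strip()
        streams_by_app[app].add(name)
    # Keep only the names that are associated with several streams.
    return {app: names for app, names in streams_by_app.items() if len(names) > 1}
```

An empty result means that every consumer application name in the file maps to exactly one stream.
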
Example Kinesis to MQTT Route
<kinesis-to-mqtt-route>
    <id>my-kinesis-to-mqtt-route</id>
    <enabled>true</enabled>
    <aws-credential-profile-id>aws-credential-profile-01</aws-credential-profile-id>
    <region>eu-central-1</region>
    <kinesis-streams>
        <kinesis-stream>
            <name>my-kinesis-stream-in</name>
            <consumer-application-name>my-consumer-application</consumer-application-name>
            <consumer-retrieval-mode>
                <polling/>
            </consumer-retrieval-mode>
        </kinesis-stream>
    </kinesis-streams>
    <processor>
        ...
    </processor>
</kinesis-to-mqtt-route>
Table 6. Amazon Kinesis to MQTT route parameters
Parameter Required Type Description

id

String

The unique identifier of the Amazon Kinesis to MQTT route. This string can only contain alphanumeric characters, dashes, and underscores.

enabled

Boolean

Optional setting that defines whether the selected route is enabled or disabled. The default setting is true.

aws-credential-profile-id

String

Optional setting that identifies the aws-credential-profile from your <aws-credential-profiles> configuration to use for the route. When unset, the default AWS credential retrieval order applies. If the default credentials are not found, the selected <kinesis-to-mqtt-route> continues to retry and HiveMQ logs an error message.

region

String

Optional setting that defines the AWS region the extension uses to access AWS Kinesis Data Streams. When unset, the default AWS region selection logic determines the region. If the default region is not found, the selected <kinesis-to-mqtt-route> continues to retry and HiveMQ logs an error message.

kinesis-streams

Complex

A list of one or more Kinesis streams.

  • kinesis-stream: A Kinesis data stream from which the extension retrieves records in the selected route, using the Kinesis Client Library (KCL) as a consumer.

processor

Complex

The method HiveMQ uses to convert Kinesis records from the defined <kinesis-streams> into MQTT publishes.

  • mapping: The mapping rules for Kinesis records received on the configured <kinesis-streams> of the <kinesis-to-mqtt-route>.

  • transformer: The transformer invoked for Kinesis records received on the configured <kinesis-streams> of the <kinesis-to-mqtt-route>.

Kinesis Streams

In a Kinesis to MQTT route, the <kinesis-streams> section defines how data records are retrieved and processed from the configured Amazon Kinesis data streams.

You can define as many individual <kinesis-stream> tags as your use case requires.

Table 7. Amazon Kinesis stream parameters
Parameter Required Type Description

name

String

The name of the Amazon Kinesis data stream source.

consumer-application-name

String

The unique name of the consumer application that the Kinesis Client Library (KCL) uses to consume data from the stream. For more information, see Kinesis Data Streams Terminology. This name is used as the name of the DynamoDB table and as the name of the enhanced fan-out consumer.

consumer-retrieval-mode

Complex

The way the Kinesis consumer collects data from the Amazon Kinesis Data Streams.

  • <polling>: The consumer continuously polls the selected source stream for data records. For more information, see Developing Custom Consumers.

  • <enhanced-fan-out>: Kinesis Data Streams pushes data records from the stream to the selected consumers that use the AWS enhanced fan-out feature. For more information, see Enhanced Fan-Out.
    TIP: The dedicated throughput that the AWS enhanced fan-out feature provides generates additional AWS costs.

Processors in Amazon Kinesis to MQTT Routes

In a Kinesis to MQTT route, the <processor> section defines which method HiveMQ uses to convert Amazon Kinesis records into MQTT messages in the selected route.

Mapping Processor in Amazon Kinesis to MQTT Routes

The <mqtt-topics> section of the <mapping> processor configuration in your <kinesis-to-mqtt-route> lists the destination MQTT topics to which configured Amazon Kinesis Data Streams records are routed.

You can define as many destination <mqtt-topic> tags as your use case requires.

Example Kinesis To MQTT Mapping Processor
<mapping>
    <mqtt-topics>
        <mqtt-topic>mqtt/topic/out</mqtt-topic>
    </mqtt-topics>
</mapping>
Table 8. Mapping processor parameters
Parameter Required Type Description

mqtt-topic

String

The destination MQTT topic to which Amazon Kinesis records are routed. Each <kinesis-to-mqtt-route> must contain at least one destination <mqtt-topic>.

mqtt-publish-fields

Complex

Optional metadata that is defined for the MQTT publish message. For more information, see MQTT Publish Fields for Amazon Kinesis to MQTT Mapping.

MQTT Publish Fields for Amazon Kinesis to MQTT Mapping

In the <mqtt-publish-fields> of your Amazon Kinesis to MQTT mapping configuration, you can specify the metadata that outgoing MQTT publish messages in the <kinesis-to-mqtt> mapping contain.

Example MQTT publish fields configuration
<mapping>
    <mqtt-topics>
        <mqtt-topic>mqtt/topic/out</mqtt-topic>
    </mqtt-topics>
    <mqtt-publish-fields>
        <retained-flag>true</retained-flag>
        <payload-format-indicator>UTF_8</payload-format-indicator>
        <message-expiry-interval>3600</message-expiry-interval>
        <response-topic>response/topic</response-topic>
        <correlation-data>correlation/data</correlation-data>
        <content-type>xml</content-type>
        <qos>2</qos>
        <user-properties>
            <user-property>
                <name>prop1</name>
                <value>val1</value>
            </user-property>
        </user-properties>
    </mqtt-publish-fields>
</mapping>
Parameter Default Required Type Description

retained-flag

false

Boolean

Defines whether all MQTT messages from the corresponding mapping are retained. Possible values are true or false. If set to true and retain is disabled by HiveMQ, messages are not retained.

payload-format-indicator

String

Sets the format of the payload of the corresponding Amazon Kinesis to MQTT mapping. Possible values are UTF-8 or UNSPECIFIED.

message-expiry-interval

Integer

Sets the lifetime in seconds of all MQTT messages from the corresponding Amazon Kinesis to MQTT mapping. The message expiry timer starts at the moment that the MQTT message is published. The value must be positive.

response-topic

String

Sets the response topic for all MQTT messages from the corresponding Amazon Kinesis to MQTT mapping.

correlation-data

String

Sets the data used to match response messages to the correct requests for all MQTT messages from the corresponding Amazon Kinesis to MQTT mapping. The correlation data must be base64 encoded.

user-properties

Complex

Sets one or more <user-property> entries, each with a <name> and <value> pair, for the MQTT messages.

content-type

String

Describes the content type of the message payload for all MQTT messages from the corresponding Amazon Kinesis to MQTT mapping as a UTF-8 encoded string.

qos

1

Integer

Sets the Quality of Service (QoS) for all MQTT messages from the corresponding Amazon Kinesis to MQTT mapping. Possible values are 0 (at most once), 1 (at least once), and 2 (exactly once).

Each MQTT publish field can contain only one variable.
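
Two of the constraints above are easy to get wrong by hand: the correlation data must be base64 encoded, and the numeric fields have limited ranges. The Python sketch below prepares a <correlation-data> value from raw bytes and checks the <qos> and <message-expiry-interval> values against the ranges described in the table. The function names are hypothetical and the checks are not the validation that the extension itself performs.

```python
import base64
from typing import List, Optional


def encode_correlation_data(raw: bytes) -> str:
    """Base64-encode raw bytes for use as the <correlation-data> value."""
    return base64.b64encode(raw).decode("ascii")


def check_publish_fields(qos: int = 1,
                         message_expiry_interval: Optional[int] = None) -> List[str]:
    """Return a list of problems for the given publish field values."""
    problems = []
    if qos not in (0, 1, 2):
        problems.append("qos must be 0, 1, or 2")
    if message_expiry_interval is not None and message_expiry_interval <= 0:
        problems.append("message-expiry-interval must be positive")
    return problems


if __name__ == "__main__":
    # "request-42" is an arbitrary example value.
    print(encode_correlation_data(b"request-42"))
    print(check_publish_fields(qos=2, message_expiry_interval=3600))
```
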
Transformer Processor in Amazon Kinesis to MQTT Routes

The customizable <transformer> processor gives you fine-grained control over the routing and transformation of Amazon Kinesis records to MQTT. The <transformer> processor is implemented with the HiveMQ Enterprise Extension for Amazon Kinesis Customization SDK. The customization SDK provides a flexible API with the ability to programmatically specify the custom handling of message transformations.

Example Kinesis to MQTT Transformer Processor
<transformer>
    <implementation>fully.qualified.classname.to.YourTransformer</implementation>
    <custom-settings>
        <custom-setting>
            <name>your-setting-name</name>
            <value>your-setting-value-01</value>
        </custom-setting>
        <custom-setting>
            <name>your-setting-name</name>
            <value>your-setting-value-02</value>
        </custom-setting>
    </custom-settings>
</transformer>
Table 9. Transformer processor parameters
Parameter Required Type Description

implementation

String

The fully qualified class name of the transformer that is used.

custom-settings

Complex

A list of custom settings that are available as an input of the transformer.

  • custom-setting: The name and value pair of a custom attribute the transformer implements. The name does not need to be unique. You can configure as many custom-setting tags as your use case requires.

Metrics

The HiveMQ Enterprise Extension for Amazon Kinesis provides several metrics that can be monitored to observe how the extension behaves over time.

All HiveMQ Enterprise Extension for Amazon Kinesis metrics start with the prefix com.hivemq.extensions.amazon-kinesis.
The following table lists each metric the extension exposes.
For more information on HiveMQ metrics, see metric types.

Table 10. HiveMQ Enterprise Extension for Amazon Kinesis metrics
Name Type Description

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.mapping.count.current

Gauge

The current number of MQTT to Amazon Kinesis mappings.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.transformer.count.current

Gauge

The current number of MQTT to Amazon Kinesis transformers.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.consumer.count.current

Gauge

The current number of MQTT to Amazon Kinesis consumers.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.in-progress.count.current

Gauge

The current number of MQTT to Amazon Kinesis messages that MQTT to Amazon Kinesis consumers process.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.total.dropped.count

Counter

The total number of messages the extension drops from MQTT to Amazon Kinesis.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.total.failed.count

Counter

The total number of MQTT to Amazon Kinesis messages the extension cannot successfully forward.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.total.ignored.count

Counter

The total number of MQTT to Amazon Kinesis messages the extension ignores.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.total.resent.count

Counter

The total number of MQTT to Amazon Kinesis messages the extension resends.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.total.sent.count

Counter

The total number of MQTT to Amazon Kinesis messages the extension sends.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.total.success.count

Counter

The total number of MQTT to Amazon Kinesis messages that the extension successfully forwards.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.[route-id].latency

Timer

The length of time route [route-id] requires to map MQTT to Amazon Kinesis messages.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.[route-id].dropped.count

Counter

The number of MQTT to Amazon Kinesis messages route [route-id] drops.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.[route-id].failed.count

Counter

The number of MQTT to Amazon Kinesis messages route [route-id] cannot successfully forward.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.[route-id].ignored.count

Counter

The number of MQTT to Amazon Kinesis messages route [route-id] ignores.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.[route-id].resent.count

Counter

The number of MQTT to Amazon Kinesis messages route [route-id] resends.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.[route-id].sent.count

Counter

The number of MQTT to Amazon Kinesis messages route [route-id] sends.

com.hivemq.extensions.amazon-kinesis.mqtt-to-kinesis.[route-id].success.count

Counter

The number of MQTT to Amazon Kinesis messages route [route-id] successfully forwards.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.mapping.count.current

Gauge

The current number of Amazon Kinesis to MQTT mappings.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.transformer.count.current

Gauge

The current number of Amazon Kinesis to MQTT transformers.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.consumer.count.current

Gauge

The current number of Amazon Kinesis to MQTT consumers.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.in-progress.count.current

Gauge

The current number of Amazon Kinesis to MQTT messages that the Amazon Kinesis to MQTT consumers are processing.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.total.dropped.count

Counter

The total number of messages the extension drops from Amazon Kinesis to MQTT.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.total.failed.count

Counter

The total number of Amazon Kinesis to MQTT messages the extension cannot forward successfully.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.total.ignored.count

Counter

The total number of Amazon Kinesis to MQTT messages the extension ignores.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.total.rate-limit-exceeded.count

Counter

The total number of Amazon Kinesis to MQTT messages that exceed the configured rate-limit.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.total.received.count

Counter

The total number of Amazon Kinesis to MQTT messages the extension receives.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.total.sent.count

Counter

The total number of Amazon Kinesis to MQTT messages the extension sends.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.total.success.count

Counter

The total number of Amazon Kinesis to MQTT messages the extension successfully forwards.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.[route-id].latency

Timer

The length of time route [route-id] requires to map Amazon Kinesis to MQTT messages.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.[route-id].dropped.count

Counter

The number of Amazon Kinesis to MQTT messages route [route-id] drops.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.[route-id].failed.count

Counter

The number of Amazon Kinesis to MQTT messages route [route-id] cannot successfully forward.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.[route-id].ignored.count

Counter

The number of Amazon Kinesis to MQTT messages route [route-id] ignores.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.[route-id].rate-limit-exceeded.count

Counter

The number of Amazon Kinesis to MQTT messages in route [route-id] that exceed the configured rate-limit.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.[route-id].received.count

Counter

The number of Amazon Kinesis to MQTT messages route [route-id] receives.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.[route-id].sent.count

Counter

The number of Amazon Kinesis to MQTT messages route [route-id] sends.

com.hivemq.extensions.amazon-kinesis.kinesis-to-mqtt.[route-id].success.count

Counter

The number of Amazon Kinesis to MQTT messages route [route-id] successfully forwards.
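
The metric names in the table above follow a regular pattern: the common prefix, a direction segment (mqtt-to-kinesis or kinesis-to-mqtt), then either total or a route ID, and finally the metric suffix. The following Java sketch shows one way to assemble such names, for example when you build dashboard queries or alert rules. The class KinesisMetricNames, its methods, and the route ID my-route are hypothetical and are not part of the extension or the HiveMQ customization SDK. The sketch also assumes that the [route-id] placeholder is replaced literally by the ID of the configured route.

```java
import java.util.Objects;

/** Hypothetical helper for illustration; not part of the HiveMQ extension or SDK. */
final class KinesisMetricNames {

    // Prefix shared by all metrics of the extension, as stated in this section.
    static final String PREFIX = "com.hivemq.extensions.amazon-kinesis";

    /** Direction segment that appears in the metric names. */
    enum Direction {
        MQTT_TO_KINESIS("mqtt-to-kinesis"),
        KINESIS_TO_MQTT("kinesis-to-mqtt");

        final String segment;

        Direction(String segment) {
            this.segment = segment;
        }
    }

    /** Builds a per-route name: prefix.direction.route-id.suffix */
    static String routeMetric(Direction direction, String routeId, String suffix) {
        Objects.requireNonNull(direction, "direction");
        Objects.requireNonNull(suffix, "suffix");
        if (routeId == null || routeId.isEmpty()) {
            throw new IllegalArgumentException("routeId must not be empty");
        }
        // Assumption: the route ID is inserted into the name unchanged.
        return String.join(".", PREFIX, direction.segment, routeId, suffix);
    }

    /** Builds an aggregate name: prefix.direction.total.suffix */
    static String totalMetric(Direction direction, String suffix) {
        Objects.requireNonNull(direction, "direction");
        Objects.requireNonNull(suffix, "suffix");
        return String.join(".", PREFIX, direction.segment, "total", suffix);
    }

    public static void main(String[] args) {
        // "my-route" is a made-up route ID used only for this example.
        System.out.println(routeMetric(Direction.MQTT_TO_KINESIS, "my-route", "sent.count"));
        System.out.println(totalMetric(Direction.KINESIS_TO_MQTT, "received.count"));
    }
}
```

Keeping the prefix and direction segments in one place avoids typos when the same names are repeated across several monitoring configurations. Only the suffixes listed in the table are valid for each direction; the helper does not check them.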