HiveMQ Enterprise Extension for Amazon Kinesis

Amazon Kinesis is a suite of fully managed services from Amazon Web Services (AWS) designed to ingest, process, and analyze large-scale data streams in real time. As part of the Kinesis suite, Amazon Kinesis Data Streams offers the ability to continuously capture enormous amounts of data from various sources.

The HiveMQ Enterprise Extension for Amazon Kinesis makes it easy to move MQTT device data from your HiveMQ broker directly to the Amazon Kinesis Data Streams service. Our native integration with Amazon Kinesis Data Streams is a fast, cost-effective way to stream MQTT data from all IoT devices connected to your HiveMQ broker.

The data you forward from HiveMQ to Amazon Kinesis Data Streams is immediately available for continuous stream processing and can be used to power live dashboards, generate real-time metrics and reporting, or deliver data into data stores and databases.

Features

  • Forward MQTT messages from IoT devices to one or more Amazon Kinesis Data Streams via your HiveMQ broker.

  • Flexible extension configuration to leverage Amazon Kinesis Data Streams and unlock the use of further AWS data processing services for your MQTT data.

Requirements

If you do not provide a valid license, HiveMQ automatically uses a free trial license. Trial licenses for HiveMQ Enterprise Extensions are valid for 5 hours. For more license information or to request an extended evaluation license, contact HiveMQ sales.

Installation

  1. Place your HiveMQ Enterprise Extension for Amazon Kinesis license file (.elic) in the license folder of your HiveMQ installation. (Skip this step if you are using a trial version of the extension.)

    All HiveMQ Enterprise Extensions are preinstalled in your HiveMQ release bundle and disabled by default:
    └─ <HiveMQ folder>
        ├─ bin
        ├─ config
        ├─ data
        ├─ extensions
        │   ├─ hivemq-amazon-kinesis-extension
        │   └─ ...
        ├─ license
        ├─ log
        └─ ...
  2. Before you enable the extension, you need to configure the extension to match your individual Amazon Kinesis Data Streams setup.
    For your convenience, we provide an amazon-kinesis-configuration-example.xml configuration file that you can copy and modify as desired.
    The included amazon-kinesis-configuration.xsd file outlines the schema and elements that can be used in the XML configuration.
    Your completed configuration file must be named amazon-kinesis-configuration.xml.
    For detailed information on configuration options, see Configuration.

  3. To enable the HiveMQ Enterprise Extension for Amazon Kinesis, locate the hivemq-amazon-kinesis-extension folder in the extensions directory of your HiveMQ installation and remove the DISABLED file (if present).

To function properly, the HiveMQ Enterprise Extension for Amazon Kinesis must be installed on all HiveMQ broker nodes in your HiveMQ cluster and the configuration file on each node must be identical.
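
The installation steps above can be scripted. The following is a minimal sketch, not part of the HiveMQ tooling: the function name enable_extension and the idea of refusing to enable an unconfigured extension are assumptions made for illustration. The folder and file names are the ones used in this guide. The sketch requires Python 3.8 or later.

```python
from pathlib import Path

CONFIG_NAME = "amazon-kinesis-configuration.xml"


def enable_extension(hivemq_home: Path) -> bool:
    """Remove the DISABLED marker once a configuration file is in place.

    Hypothetical helper: returns True if the extension folder ends up
    without a DISABLED file, False if no configuration file exists yet.
    """
    ext_dir = hivemq_home / "extensions" / "hivemq-amazon-kinesis-extension"
    if not (ext_dir / CONFIG_NAME).is_file():
        # Step 2 is not done yet: do not enable an unconfigured extension.
        return False
    # Step 3: delete the DISABLED file if it is present.
    (ext_dir / "DISABLED").unlink(missing_ok=True)
    return True
```

In a cluster, you would run a helper like this on every node, for example enable_extension(Path("/opt/hivemq")), after you copy the identical configuration file to each node.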

Configuration

The HiveMQ Enterprise Extension for Amazon Kinesis supports hot reload of the extension configuration. Changes that you make to the configuration are applied while the extension is running, with no need to restart. When the extension detects that a valid configuration has been loaded, the previous configuration file is automatically archived in the config-archive folder of the extension home folder.

If you load an invalid configuration at runtime and a previous valid configuration exists in the archive, HiveMQ uses the previous configuration.

The extension configuration is divided into two sections:

  • AWS Credential Profiles: Provides information about the credential profiles used to interact with Amazon Web Services.

  • MQTT to Kinesis Routes: Defines how MQTT messages are sent from your HiveMQ broker to Amazon Kinesis Data Streams.

Extension Configuration File

The amazon-kinesis-configuration.xml file is located in the hivemq-amazon-kinesis-extension folder within the extensions folder of your HiveMQ installation.

The extension uses a simple but powerful XML-based configuration.

The amazon-kinesis-configuration-example.xml file contains a basic configuration example that has all the parameters you need to send MQTT messages from your HiveMQ MQTT broker to Amazon Kinesis Data Streams.

If you copy and reuse the amazon-kinesis-configuration-example.xml file, be sure to rename the file to amazon-kinesis-configuration.xml before you enable your extension. For more information, see Installation.

Example Amazon Kinesis Data Streams extension configuration
<hivemq-amazon-kinesis-extension xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                                 xsi:noNamespaceSchemaLocation="amazon-kinesis-configuration.xsd">
    <aws-credential-profiles>
        <aws-credential-profile>
            <id>aws-credential-profile-01</id>
            <profile-file>/opt/hivemq/extensions/hivemq-amazon-kinesis-extension/aws-credentials</profile-file>
        </aws-credential-profile>
    </aws-credential-profiles>

    <mqtt-to-kinesis-routes>
        <mqtt-to-kinesis-route>
            <id>my-mqtt-to-kinesis-route</id>
            <enabled>true</enabled>
            <aws-credential-profile-id>aws-credential-profile-01</aws-credential-profile-id>
            <region>eu-central-1</region>
            <mqtt-topic-filters>
                <mqtt-topic-filter>mqtt/topic/a</mqtt-topic-filter>
            </mqtt-topic-filters>
            <processor>
                <mapping>
                    <kinesis-streams>
                        <kinesis-stream>
                            <name>my-kinesis-stream</name>
                            <partition-key>
                                <mqtt-topic/>
                            </partition-key>
                            <explicit-hash-key>
                                <random/>
                            </explicit-hash-key>
                        </kinesis-stream>
                    </kinesis-streams>
                </mapping>
            </processor>
        </mqtt-to-kinesis-route>
    </mqtt-to-kinesis-routes>
</hivemq-amazon-kinesis-extension>
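
Before you rely on hot reload, it can help to check a configuration file yourself. The following sketch uses only the Python standard library to confirm that the XML is well formed and that each route references a defined credential profile. It does not validate against amazon-kinesis-configuration.xsd, and the function name summarize_routes is a hypothetical helper, not part of the extension.

```python
import xml.etree.ElementTree as ET


def summarize_routes(config_xml: str) -> list:
    """Return one summary dict per <mqtt-to-kinesis-route> in the configuration."""
    root = ET.fromstring(config_xml)  # raises ParseError on malformed XML
    profile_ids = {p.findtext("id") for p in root.iter("aws-credential-profile")}
    routes = []
    for route in root.iter("mqtt-to-kinesis-route"):
        profile = route.findtext("aws-credential-profile-id")
        routes.append({
            "id": route.findtext("id"),
            # <enabled> is optional and defaults to true
            "enabled": (route.findtext("enabled") or "true").strip() == "true",
            "filters": [f.text for f in route.iter("mqtt-topic-filter")],
            "streams": [s.findtext("name") for s in route.iter("kinesis-stream")],
            # a route may omit the profile; otherwise it must name a defined one
            "profile_ok": profile is None or profile in profile_ids,
        })
    return routes
```

For the example configuration above, a call such as summarize_routes(Path("amazon-kinesis-configuration.xml").read_text()) would report a single enabled route with one topic filter and one destination stream.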

AWS Credential Profiles

To successfully route MQTT messages to Amazon Kinesis Data Streams, your HiveMQ extension must provide AWS security credentials to verify your identity and access permissions.

When you set up your AWS IAM security credential on the AWS management console, verify that your setup includes permission for the kinesis:PutRecords action. For more information, see Controlling Access to Amazon Kinesis Data Streams Resources Using IAM.

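Example AWS IAM permission for kinesis:PutRecords (the region and account ID in the ARN are placeholders)
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "kinesis:PutRecords",
                "Resource": "arn:aws:kinesis:eu-central-1:123456789012:stream/my-kinesis-stream"
            }
        ]
    }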
The AWS Identity and Access Management (IAM) service helps you securely control access to Amazon Kinesis Data Streams and other AWS resources. In the AWS management console, you can create users and assign user permissions. To access resources on AWS, you create security credentials in the AWS management console and save the credentials for use in the extension profile file. For more information, see IAM access policies.
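
IAM identifies a stream in the Resource element by its Amazon Resource Name (ARN). As a minimal sketch, the following helper builds a policy like the one above for a given stream. The function name put_records_policy and the region, account ID, and stream name passed to it are placeholders chosen for illustration.

```python
import json


def put_records_policy(region: str, account_id: str, stream_name: str) -> str:
    """Build an IAM policy document that allows kinesis:PutRecords on one stream."""
    # Kinesis stream ARNs follow arn:aws:kinesis:<region>:<account-id>:stream/<name>.
    arn = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": "kinesis:PutRecords", "Resource": arn}
        ],
    }
    return json.dumps(policy, indent=4)


# Placeholder values: replace them with your own region, account ID, and stream.
print(put_records_policy("eu-central-1", "123456789012", "my-kinesis-stream"))
```

If a route maps to several streams, you would need one Resource entry per stream ARN.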

The aws-credential-profiles section of the extension configuration defines one or more sets of security credentials for your connections to AWS. If desired, use this section to override the default AWS credential provider chain.

Example AWS credential profile configuration
    <aws-credential-profiles>
        <aws-credential-profile>
            <id>aws-credential-profile-01</id>
            <profile-file>/opt/hivemq/extensions/hivemq-amazon-kinesis-extension/aws-credentials</profile-file>
            <profile-name>your-profile-name</profile-name>
        </aws-credential-profile>
    </aws-credential-profiles>
Each <aws-credential-profile> tag contains the file path to the credentials file that stores the associated security credentials. If your credentials file holds more than one set of credentials, use the optional <profile-name> tag to specify the credentials set that you want to use.

You can define as many <aws-credential-profile> tags as your use case requires.

Table 1. AWS credential profile parameters
Parameter | Type | Description
id | String | The unique identifier of the AWS credential profile. This string can only contain alphanumeric characters, dashes, and underscores.
profile-file | String | Optional setting that provides the path to a file that contains AWS profile credentials. If unset, information is taken from the default AWS credentials location.
profile-name | String | Optional setting to select a specific profile in the file that the <profile-file> tag defines. If unset, the profile named default is used.
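
An AWS credentials file uses an INI-style layout with one section per profile. To illustrate how <profile-name> selects one set of credentials, the following sketch reads such a file with the Python standard library. The helper name read_profile and the key values are placeholders, and the extension itself resolves credentials through the AWS SDK rather than with code like this.

```python
import configparser

# Placeholder credentials file content with two profiles.
SAMPLE_CREDENTIALS = """\
[default]
aws_access_key_id = EXAMPLE_DEFAULT_KEY_ID
aws_secret_access_key = EXAMPLE_DEFAULT_SECRET

[your-profile-name]
aws_access_key_id = EXAMPLE_OTHER_KEY_ID
aws_secret_access_key = EXAMPLE_OTHER_SECRET
"""


def read_profile(profile_file_text: str, profile_name: str = "default") -> dict:
    """Return the key/value pairs of one profile from credentials-file text."""
    # Disable interpolation so that a "%" in a value is read literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(profile_file_text)
    if not parser.has_section(profile_name):
        raise KeyError(f"profile {profile_name!r} not found")
    return dict(parser[profile_name])
```

Calling read_profile(SAMPLE_CREDENTIALS) returns the default profile, which mirrors the behavior when <profile-name> is unset. Passing "your-profile-name" selects the second set of credentials.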

MQTT to Kinesis Routes

The <mqtt-to-kinesis-routes> section of your extension configuration defines how MQTT messages are sent from the HiveMQ broker to Amazon Kinesis Data Streams.

Currently, the HiveMQ Enterprise Extension for Amazon Kinesis supports <mqtt-to-kinesis-routes> only.

You can define as many individual <mqtt-to-kinesis-route> tags as your use case requires.

Table 2. MQTT to Kinesis route parameters
Parameter | Type | Description
id | String | The unique identifier of the MQTT to Amazon Kinesis route. This string can only contain alphanumeric characters, dashes, and underscores.
enabled | String | Optional setting that defines whether the selected route is enabled or disabled. The default setting is true.
aws-credential-profile-id | String | Optional setting that identifies the aws-credential-profile from your <aws-credential-profiles> configuration to use for the route. When unset, the standard AWS credential retrieval order applies. If the credentials are not found, the selected <mqtt-to-kinesis-route> does not start and HiveMQ logs an error message.
mqtt-topic-filters | String | A list of one or more MQTT topic filters.
  • mqtt-topic-filter: The source MQTT topic filter from which MQTT messages are routed to Amazon Kinesis Data Streams. You can define as many individual <mqtt-topic-filter> tags as your use case requires.
region | String | Optional setting that defines the AWS region the extension uses to access Amazon Kinesis Data Streams. When unset, the standard AWS region selection logic determines the region. If the region is not found, the selected <mqtt-to-kinesis-route> does not start and HiveMQ logs an error message.
processor | Complex | The method HiveMQ uses to forward MQTT messages received in the selected <mqtt-to-kinesis-route> to Amazon Kinesis Data Streams.
  • mapping: The routing information for MQTT messages received on the configured <mqtt-topic-filters> of the <mqtt-to-kinesis-route> to Amazon Kinesis.
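
To see which published topics a configured <mqtt-topic-filter> would select, the following sketch implements topic filter matching as the MQTT specification defines it, with + as the single-level wildcard and # as the multi-level wildcard. It assumes a syntactically valid filter, with # only as the last level. The function name topic_matches is a hypothetical helper and not the extension's own code.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a topic filter (+ and # wildcards)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    # Wildcards at the first level never match topics that start with "$".
    if topic.startswith("$") and filter_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below.
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without "#", filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

For example, the filter mqtt/topic/a from the example configuration selects only that exact topic, while mqtt/+/a or mqtt/# would also select messages published to mqtt/other/a.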

Processors

The <processor> section defines which method HiveMQ uses to transfer MQTT messages to Amazon Kinesis in the selected route.

Currently, the HiveMQ Enterprise Extension for Amazon Kinesis provides a <mapping> processor only.

Mapping Processor

The <kinesis-streams> section of your <mapping> processor configuration lists one or more destination data streams in Amazon Kinesis to which configured MQTT messages are routed.

You can define as many destination <kinesis-stream> tags as your use case requires.

Table 3. Mapping processor parameters
Parameter | Type | Description
kinesis-stream | Complex | The destination data stream in Amazon Kinesis to which MQTT messages are routed. Each <mqtt-to-kinesis-route> must contain at least one destination <kinesis-stream>.

Table 4. Stream parameters
Parameter | Type | Description
name | String | The name of the destination data stream in Amazon Kinesis Data Streams.
partition-key | Complex | Determines the shard in the destination Amazon Kinesis data stream to which the MQTT message is sent.
  NOTE: If you configure both an <explicit-hash-key> and a <partition-key>, the <explicit-hash-key> determines the destination shard.
  • <random>: Sets a randomly generated partition key for each MQTT message. This setting distributes messages across all available shards. For typical use cases, we recommend the <random> setting for best performance and throughput.
  • <mqtt-topic>: Sets the topic of the MQTT message as the partition key. This setting sorts MQTT messages with the same topic into the same shard. Due to Kinesis limits, the topic string is truncated after 256 bytes.
  • <fixed>: Sets the same partition key for all MQTT messages. This setting sorts all MQTT messages into the same shard.
  NOTE: When you configure the <partition-key> with <mqtt-topic> or <fixed>, Kinesis attempts to preserve message ordering in the destination shard. However, errors, resends, and load situations that trigger dynamic resharding of the Amazon Kinesis data stream can impact message ordering in destination shards.
explicit-hash-key | Complex | Explicitly defines the shard in the destination Amazon Kinesis data stream to which the MQTT message is sent. This setting overrides the <partition-key> shard assignment. When an <explicit-hash-key> is set, the <partition-key> can be used to transfer metadata such as the topic the MQTT message is published to.
  • <random>: Sets a randomly generated hash key for each MQTT message. Use this setting to distribute messages across all available shards. For typical use cases, we recommend the <random> setting for best performance.
  • <fixed>: Sets the same hash key for all MQTT messages. Use this setting to sort all MQTT messages into the same shard. The hash key must be a natural number >= 0 and < 2^128.
  TIP: To transfer the source MQTT topic to the consumer and ensure efficient performance, you can combine a <partition-key> <mqtt-topic> configuration with an <explicit-hash-key> <random> setting.
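
To make the relationship between partition key, explicit hash key, and shard more concrete: Amazon Kinesis Data Streams maps a partition key to a 128-bit integer with an MD5 hash and assigns the record to the shard whose hash key range contains that integer. An explicit hash key replaces the computed value. The following sketch illustrates this on the client side. The function names are hypothetical, and shard_index assumes that the shards split the hash key space into equally sized ranges, which is not guaranteed after resharding.

```python
import hashlib

MAX_HASH_KEY = 2**128  # hash keys are 128-bit unsigned integers


def hash_key_for(partition_key: str, explicit_hash_key=None) -> int:
    """Return the 128-bit hash key that determines the destination shard."""
    if explicit_hash_key is not None:
        if not 0 <= explicit_hash_key < MAX_HASH_KEY:
            raise ValueError("explicit hash key must be >= 0 and < 2^128")
        return explicit_hash_key  # overrides the partition key
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index(hash_key: int, shard_count: int) -> int:
    """Map a hash key to a shard, ASSUMING equally sized hash key ranges."""
    return hash_key * shard_count // MAX_HASH_KEY
```

With a <partition-key> of <mqtt-topic> and no explicit hash key, every message on the same topic yields the same hash key and therefore lands in the same shard. With a random explicit hash key, the partition key no longer influences the shard and can carry the topic as metadata.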

Metrics

The HiveMQ Enterprise Extension for Amazon Kinesis provides several metrics that can be monitored to observe how the extension behaves over time.

All HiveMQ Enterprise Extension for Amazon Kinesis metrics start with the prefix com.hivemq.extensions.amazon-kinesis.
The following table lists each metric the extension exposes.
For increased readability, the com.hivemq.extensions.amazon-kinesis. prefix is omitted in the table.
For more information on HiveMQ metrics, see metric types.

Table 5. HiveMQ Enterprise Extension for Amazon Kinesis metrics
Name | Type | Description
mqtt-to-kinesis.mapping.count.current | Counter | The current number of MQTT to Amazon Kinesis mappings.
mqtt-to-kinesis.consumer.count.current | Counter | The current number of MQTT to Amazon Kinesis consumers.
mqtt-to-kinesis.in-progress.count.current | Counter | The current number of MQTT to Amazon Kinesis messages that MQTT to Amazon Kinesis consumers are processing.
mqtt-to-kinesis.total.success.count | Counter | The total number of MQTT to Amazon Kinesis messages from all transformers and mappings that the extension successfully forwards.
mqtt-to-kinesis.total.failed.count | Counter | The total number of MQTT to Amazon Kinesis messages the extension cannot successfully forward.
mqtt-to-kinesis.total.sent.count | Counter | The total number of MQTT to Amazon Kinesis messages the extension sends.
mqtt-to-kinesis.total.resent.count | Counter | The total number of MQTT to Amazon Kinesis messages the extension resends.
mqtt-to-kinesis.total.ignored.count | Counter | The total number of MQTT to Amazon Kinesis messages the extension ignores.
mqtt-to-kinesis.total.dropped.count | Counter | The total number of MQTT to Amazon Kinesis messages the extension drops.
mqtt-to-kinesis.[route-id].latency | Timer | The length of time route [route-id] requires to map MQTT to Amazon Kinesis messages.
mqtt-to-kinesis.[route-id].success.count | Counter | The number of MQTT to Amazon Kinesis messages route [route-id] successfully forwards.
mqtt-to-kinesis.[route-id].failed.count | Counter | The number of MQTT to Amazon Kinesis messages route [route-id] cannot successfully forward.
mqtt-to-kinesis.[route-id].sent.count | Counter | The number of MQTT to Amazon Kinesis messages route [route-id] sends.
mqtt-to-kinesis.[route-id].resent.count | Counter | The number of MQTT to Amazon Kinesis messages route [route-id] resends.
mqtt-to-kinesis.[route-id].ignored.count | Counter | The number of MQTT to Amazon Kinesis messages route [route-id] ignores.
mqtt-to-kinesis.[route-id].dropped.count | Counter | The number of MQTT to Amazon Kinesis messages route [route-id] drops.
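
When you configure dashboards or alerts, you need the full metric names, including the prefix and the ID of the route. The following sketch assembles them from the naming scheme in the table above. The helper names are hypothetical, and the route ID is taken from the example configuration in this guide.

```python
PREFIX = "com.hivemq.extensions.amazon-kinesis."


def total_metric(suffix: str) -> str:
    """Full name of an extension-wide metric, for example 'failed.count'."""
    return f"{PREFIX}mqtt-to-kinesis.total.{suffix}"


def route_metric(route_id: str, suffix: str) -> str:
    """Full name of a per-route metric; route_id replaces [route-id]."""
    return f"{PREFIX}mqtt-to-kinesis.{route_id}.{suffix}"


# Names to watch for one route, using the route ID from the example configuration.
for suffix in ("success.count", "failed.count", "dropped.count", "latency"):
    print(route_metric("my-mqtt-to-kinesis-route", suffix))
```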