HiveMQ Enterprise Data Lake Extension

Data lakes are centralized repositories that allow organizations to store vast amounts of raw and processed data in its native format. This type of storage system can handle large volumes of structured, semi-structured, and unstructured data. The flexibility to store such a wide range of data types is a key feature of data lakes. Unlike traditional relational databases and data warehouses that typically require data to be structured and processed before storage, data lakes let you store data without extensive pre-processing or data transformation.

The HiveMQ Enterprise Data Lake Extension makes it possible to forward MQTT messages directly to your data lake without the need for additional infrastructure.

Features

  • Convert MQTT messages into Parquet table rows with column mappings.

  • Forward MQTT messages from IoT devices to one or more Amazon S3 buckets via your HiveMQ broker.

  • Forward MQTT messages from IoT devices to one or more Azure Blob Storage containers via your HiveMQ broker.

The HiveMQ Enterprise Data Lake Extension does not offer message delivery guarantees. MQTT data transferred to the data lake is sent with the equivalent of a QoS 0 guarantee. In the event of network or disk failure, data being transferred may be lost.

Requirements

If you do not provide a valid license, HiveMQ automatically uses a free trial license. Trial licenses for HiveMQ Enterprise Extensions are valid for 5 hours. For more license information or to request an extended evaluation license, contact HiveMQ sales.

Installation

  1. Place your HiveMQ Enterprise Data Lake Extension license file (.elic) in the license folder of your HiveMQ installation. (Skip this step if you are using a trial version of the extension.)

    └─ <HiveMQ folder>
        ├─ bin
        ├─ conf
        ├─ data
        ├─ extensions
        │   ├─ hivemq-data-lake-extension
        │   └─ ...
        ├─ license
        ├─ log
        └─ ...
  2. Before you enable the extension, configure it to match your data lake setup.
    For your convenience, we provide an example configuration conf/examples/config.xml that you can copy and modify as desired.
    The included config.xsd file outlines the schema and elements that can be used in the XML configuration.
    Your completed configuration file must be named config.xml and located in HIVEMQ_HOME/extensions/hivemq-data-lake-extension/conf/config.xml.
    For detailed information on configuration options, see Configuration.

  3. To enable the HiveMQ Enterprise Data Lake Extension, locate the hivemq-data-lake-extension folder in the extensions directory of your HiveMQ installation and remove the DISABLED file (if present).

To function properly, the HiveMQ Enterprise Data Lake Extension must be installed on all HiveMQ broker nodes in your HiveMQ cluster, and the configuration file must be identical on every node.

Configuration

The extension configuration depends on the cloud provider you want to use.

Extension Configuration File

The config.xml file for your HiveMQ Enterprise Data Lake Extension must be located in the hivemq-data-lake-extension/conf/ folder within the extensions folder of your HiveMQ installation.

The extension uses a simple but powerful XML-based configuration.

The conf/examples/config.xml file is a configuration example that has all the parameters you need to send MQTT messages from your HiveMQ MQTT broker to your data lake.

If you copy and reuse the conf/examples/config.xml file, be sure to place the copy at conf/config.xml in the hivemq-data-lake-extension folder before you enable your extension. For more information, see Installation.

Amazon Web Services S3 configuration

Example data lake connection configuration for Amazon S3
<hivemq-data-lake-extension xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                           xsi:noNamespaceSchemaLocation="config.xsd">

    <aws-credential-profiles>
        <aws-credential-profile>
            <id>my-aws-credential-profile-id</id>
            <profile-name>default</profile-name>
            <profile-file>~/.aws/credentials</profile-file>
        </aws-credential-profile>
    </aws-credential-profiles>

    <mqtt-to-s3-routes>
        <mqtt-to-s3-route>
            <id>my-s3-route</id>
            <enabled>true</enabled>
            <mqtt-topic-filters>
                <mqtt-topic-filter>#</mqtt-topic-filter>
            </mqtt-topic-filters>
            <aws-credential-profile-id>my-aws-credential-profile-id</aws-credential-profile-id>
            <bucket>my-bucket</bucket>
            <region>eu-central-1</region>
            <file-name-template>${VAR:DATE_ISO_8601}/${VAR:NODE_ID}-${VAR:ROUTE_ID}-${VAR:TIMESTAMP_ISO_8601}.${VAR:FILE_EXTENSION}</file-name-template>
            <processor>
                <parquet>
                    <columns>
                        <column>
                            <name>topic</name>
                            <value>mqtt-topic</value>
                        </column>
                        <column>
                            <name>payload</name>
                            <value>mqtt-payload</value>
                        </column>
                    </columns>
                </parquet>
            </processor>
        </mqtt-to-s3-route>
    </mqtt-to-s3-routes>

</hivemq-data-lake-extension>

AWS Credential Profiles

To interact with Amazon S3, your HiveMQ extension must provide AWS security credentials to verify your identity and access permissions.

When you set up your AWS IAM (Identity and Access Management) security credential on the AWS management console, verify that your setup includes the required permissions for the configured routes. For more information, see Controlling Access to S3 Resources Using IAM.

Example AWS IAM policy configuration for an MQTT to S3 route:
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "mqttToS3",
			"Effect": "Allow",
			"Action": [
				"s3:PutObject"
			],
			"Resource": "*"
		}
	]
}
The AWS Identity and Access Management (IAM) service helps to securely control access to Amazon S3 and other AWS resources. In the AWS management console, you can create users and assign user permissions. To access resources on AWS, you create security credentials in the AWS management console and save the credentials for use in the extension profile file. For more information, see IAM access policies.

The aws-credential-profiles section of the extension configuration defines one or more sets of security credentials for your connections to AWS. If desired, use this section to override the default AWS credential provider chain.

Example AWS credential profile configuration
<aws-credential-profiles>
    <aws-credential-profile>
        <id>my-aws-credential-profile-id</id>
        <profile-file>/opt/hivemq/extensions/hivemq-data-lake-extension/aws-credentials</profile-file>
        <profile-name>my-profile-name</profile-name>
    </aws-credential-profile>
</aws-credential-profiles>
Each <aws-credential-profile> tag can contain the file path to the credentials file that stores the associated security credentials. If your credentials file holds more than one set of credentials, use the optional <profile-name> tag to specify the credentials set that you want to use.

You can define as many <aws-credential-profile> tags as your use case requires.

Table 1. AWS credential profile parameters
Parameter Required Type Description

id

ID

The unique identifier of the AWS credential profile. This string can only contain lowercase alphanumeric characters, dashes, and underscores.

profile-file

String

Optional setting that provides the path to a file that contains AWS profile credentials. If unset, information is taken from the default AWS credentials location.

profile-name

String

Optional setting to select a specific profile in the file defined in the <profile-file> tag. If unset, the profile named default is used.
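
As a sketch of how several profiles can sit side by side, the following fragment defines two credential profiles that read different named profiles from the same credentials file. The IDs and the profile names staging and production are hypothetical placeholders chosen for illustration, not values the extension requires.

Example with two AWS credential profiles (illustrative sketch)
<aws-credential-profiles>
    <!-- Hypothetical ID and profile name: credentials for a staging account -->
    <aws-credential-profile>
        <id>staging-credentials</id>
        <profile-file>/opt/hivemq/extensions/hivemq-data-lake-extension/aws-credentials</profile-file>
        <profile-name>staging</profile-name>
    </aws-credential-profile>
    <!-- Hypothetical ID and profile name: credentials for a production account -->
    <aws-credential-profile>
        <id>production-credentials</id>
        <profile-file>/opt/hivemq/extensions/hivemq-data-lake-extension/aws-credentials</profile-file>
        <profile-name>production</profile-name>
    </aws-credential-profile>
</aws-credential-profiles>
A route selects one of these profiles by its ID through the <aws-credential-profile-id> tag.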

MQTT to S3 Routes

The <mqtt-to-s3-routes> section of your extension configuration defines how MQTT messages are sent from the HiveMQ broker to Amazon S3 buckets.

You can define as many <mqtt-to-s3-route> tags as your use case requires.

Example MQTT to S3 route
<mqtt-to-s3-route>
    <id>my-s3-route</id>
    <enabled>true</enabled>
    <mqtt-topic-filters>
        <mqtt-topic-filter>#</mqtt-topic-filter>
    </mqtt-topic-filters>
    <aws-credential-profile-id>my-aws-credential-profile-id</aws-credential-profile-id>
    <bucket>my-bucket</bucket>
    <region>eu-central-1</region>
    <file-name-template>${VAR:DATE_ISO_8601}/${VAR:NODE_ID}-${VAR:ROUTE_ID}-${VAR:TIMESTAMP_ISO_8601}.${VAR:FILE_EXTENSION}</file-name-template>
    <processor>
        <parquet>
            <columns>
                <column>
                    <name>topic</name>
                    <value>mqtt-topic</value>
                </column>
                <column>
                    <name>payload</name>
                    <value>mqtt-payload</value>
                </column>
            </columns>
        </parquet>
    </processor>
</mqtt-to-s3-route>
Table 2. S3 parameters
Parameter Required Type Description

id

ID

The unique identifier of the mqtt-to-s3-route. This string can only contain lowercase alphanumeric characters, dashes, and underscores.

enabled

Boolean

Optional setting that defines whether the selected mqtt-to-s3-route is enabled or disabled. The default setting is true. To disable the route, set to false.

mqtt-topic-filters

Complex

A list of one or more MQTT topic filters to apply on this route.

  • mqtt-topic-filter: The topic filter to apply.

aws-credential-profile-id

IDREF

Optional setting that identifies the aws-credential-profile from your <aws-credential-profiles> configuration to use for the route. When unset, the default AWS credential retrieval order applies. If the default credentials are not found, the selected <mqtt-to-s3-route> does not start and HiveMQ logs an error message.

bucket

String

The name of the Amazon S3 bucket.

region

String

Optional setting that defines the AWS region the extension uses to access S3. When unset, the default AWS region selection logic determines the region. If the default region is not found, the selected <mqtt-to-s3-route> does not start and HiveMQ logs an error message.

file-name-template

String

Optional setting to configure the naming of the files and folders in the Amazon S3 bucket. Use forward slashes to organize files in your Amazon S3 folders. Defaults to ${VAR:DATE_ISO_8601}/${VAR:NODE_ID}-${VAR:ROUTE_ID}-${VAR:TIMESTAMP_ISO_8601}.${VAR:FILE_EXTENSION}. You can use ${VAR:PLACEHOLDER_NAME} to refer to the following placeholders:

  • TIMESTAMP_ISO_8601: The instant the selected file was created in ISO 8601 format.

  • DATE_ISO_8601: The date the selected file was created in ISO 8601 format.

  • YEAR: The year the selected file was created as a number.

  • MONTH: The month the selected file was created as a number.

  • DAY: The day the selected file was created as a number.

  • HOUR: The hour the selected file was created as a number.

  • MINUTE: The minute the selected file was created as a number.

  • SECOND: The second the selected file was created as a number.

  • MILLISECOND: The millisecond the selected file was created as a number.

  • NODE_ID: The identifier of the HiveMQ broker node on which the extension runs.

  • ROUTE_ID: The identifier of the mqtt-to-s3-route.

  • FILE_EXTENSION: The file suffix that identifies the format of the file. Currently, only parquet is supported.

processor

Complex

Defines the format HiveMQ uses to transfer MQTT messages to the Amazon S3 bucket in the selected route.

  • parquet: Configures how the extension inserts rows into parquet files. For an example configuration, see Parquet processor.

    • columns: A list of the columns where values are inserted.

      • column: A column name and value binding.

        • name: The name of the column where the value is inserted.

        • value: The MQTT PUBLISH property that the extension binds to the selected column.
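
To illustrate the file-name-template placeholders, the following sketch groups files into year, month, and day folders and limits the route to two topic filters. The route ID and the topic filters are hypothetical; the bucket, region, and credential profile ID are reused from the example above. The parameter description does not state whether numeric placeholders such as MONTH are zero-padded, so check the resulting object keys in your bucket before you rely on a particular layout.

Example MQTT to S3 route with a date-based folder layout (illustrative sketch)
<mqtt-to-s3-route>
    <!-- Hypothetical route ID and topic filters -->
    <id>sensor-data-route</id>
    <enabled>true</enabled>
    <mqtt-topic-filters>
        <mqtt-topic-filter>sensors/+/temperature</mqtt-topic-filter>
        <mqtt-topic-filter>sensors/+/humidity</mqtt-topic-filter>
    </mqtt-topic-filters>
    <aws-credential-profile-id>my-aws-credential-profile-id</aws-credential-profile-id>
    <bucket>my-bucket</bucket>
    <region>eu-central-1</region>
    <!-- Forward slashes create one folder level each for year, month, and day -->
    <file-name-template>${VAR:YEAR}/${VAR:MONTH}/${VAR:DAY}/${VAR:NODE_ID}-${VAR:ROUTE_ID}-${VAR:TIMESTAMP_ISO_8601}.${VAR:FILE_EXTENSION}</file-name-template>
    <processor>
        <parquet>
            <columns>
                <column>
                    <name>topic</name>
                    <value>mqtt-topic</value>
                </column>
                <column>
                    <name>payload</name>
                    <value>mqtt-payload</value>
                </column>
            </columns>
        </parquet>
    </processor>
</mqtt-to-s3-route>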

Azure Blob Storage configuration

Example data lake connection configuration for Azure Blob Storage
<hivemq-data-lake-extension xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                           xsi:noNamespaceSchemaLocation="config.xsd">

    <azure-blob-storage-connections>
        <azure-blob-storage-connection>
            <id>my-abs-connection-id</id>
            <authentication>
                <connection-string>DefaultEndpointsProtocol=https;AccountName=my-account;AccountKey=my-key;EndpointSuffix=core.windows.net</connection-string>
            </authentication>
        </azure-blob-storage-connection>
    </azure-blob-storage-connections>

    <mqtt-to-azure-blob-storage-routes>
        <mqtt-to-azure-blob-storage-route>
            <id>my-abs-route</id>
            <enabled>true</enabled>
            <mqtt-topic-filters>
                <mqtt-topic-filter>#</mqtt-topic-filter>
            </mqtt-topic-filters>
            <azure-blob-storage-connection-id>my-abs-connection-id</azure-blob-storage-connection-id>
            <container>my-container</container>
            <file-name-template>${VAR:DATE_ISO_8601}/${VAR:NODE_ID}-${VAR:ROUTE_ID}-${VAR:TIMESTAMP_ISO_8601}.${VAR:FILE_EXTENSION}</file-name-template>
            <processor>
                <parquet>
                    <columns>
                        <column>
                            <name>topic</name>
                            <value>mqtt-topic</value>
                        </column>
                        <column>
                            <name>payload</name>
                            <value>mqtt-payload</value>
                        </column>
                    </columns>
                </parquet>
            </processor>
        </mqtt-to-azure-blob-storage-route>
    </mqtt-to-azure-blob-storage-routes>

</hivemq-data-lake-extension>

Azure Blob Storage Connections

To interact with Azure Blob Storage, your HiveMQ extension must provide Azure security credentials to verify your identity and access permissions.

The azure-blob-storage-connections section of the extension configuration defines one or more sets of security credentials for your connections to Azure.

Example Azure Blob Storage connection configuration
<azure-blob-storage-connections>
    <azure-blob-storage-connection>
        <id>my-abs-connection-id</id>
        <authentication>
            <connection-string>DefaultEndpointsProtocol=https;AccountName=my-account;AccountKey=my-key;EndpointSuffix=core.windows.net</connection-string>
        </authentication>
    </azure-blob-storage-connection>
</azure-blob-storage-connections>

You can define as many <azure-blob-storage-connection> tags as your use case requires.

Table 3. Azure Blob Storage connection parameters
Parameter Required Type Description

id

ID

The unique identifier of the Azure Blob Storage connection. This string can only contain lowercase alphanumeric characters, dashes, and underscores.

authentication

Complex

Defines the authentication mechanism to use to connect to Azure Blob Storage.

  • connection-string: Uses an Azure Blob Storage-provided connection string to authenticate.
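
Because a connection string contains the account key, you may prefer not to store it in config.xml as plain text. One possible approach, sketched here, combines the connection with the environment variable placeholders described in Environment variables. The variable name AZURE_STORAGE_CONNECTION_STRING is an assumption chosen for illustration and must be set in the environment in which HiveMQ starts.

Example Azure Blob Storage connection that reads the connection string from an environment variable (illustrative sketch)
<azure-blob-storage-connections>
    <azure-blob-storage-connection>
        <id>my-abs-connection-id</id>
        <authentication>
            <!-- Hypothetical variable name; HiveMQ replaces the placeholder when it reads the configuration -->
            <connection-string>${ENV:AZURE_STORAGE_CONNECTION_STRING}</connection-string>
        </authentication>
    </azure-blob-storage-connection>
</azure-blob-storage-connections>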

MQTT to Azure Blob Storage Routes

The <mqtt-to-azure-blob-storage-routes> section of your extension configuration defines how MQTT messages are sent from the HiveMQ broker to Azure Blob Storage containers.

You can define as many <mqtt-to-azure-blob-storage-route> tags as your use case requires.

Example MQTT to Azure Blob Storage route
<mqtt-to-azure-blob-storage-routes>
    <mqtt-to-azure-blob-storage-route>
        <id>my-abs-route</id>
        <enabled>true</enabled>
        <mqtt-topic-filters>
            <mqtt-topic-filter>#</mqtt-topic-filter>
        </mqtt-topic-filters>
        <azure-blob-storage-connection-id>my-abs-connection-id</azure-blob-storage-connection-id>
        <container>my-container</container>
        <file-name-template>${VAR:DATE_ISO_8601}/${VAR:NODE_ID}-${VAR:ROUTE_ID}-${VAR:TIMESTAMP_ISO_8601}.${VAR:FILE_EXTENSION}</file-name-template>
        <processor>
            <parquet>
                <columns>
                    <column>
                        <name>topic</name>
                        <value>mqtt-topic</value>
                    </column>
                    <column>
                        <name>payload</name>
                        <value>mqtt-payload</value>
                    </column>
                </columns>
            </parquet>
        </processor>
    </mqtt-to-azure-blob-storage-route>
</mqtt-to-azure-blob-storage-routes>
Table 4. Azure Blob Storage parameters
Parameter Required Type Description

id

ID

The unique identifier of the mqtt-to-azure-blob-storage-route. This string can only contain lowercase alphanumeric characters, dashes, and underscores.

enabled

Boolean

Optional setting that defines whether the selected mqtt-to-azure-blob-storage-route is enabled or disabled. The default setting is true. To disable the route, set to false.

mqtt-topic-filters

Complex

A list of one or more MQTT topic filters to apply on this route.

  • mqtt-topic-filter: The topic filter to apply.

azure-blob-storage-connection-id

IDREF

Identifies the azure-blob-storage-connection from your <azure-blob-storage-connections> configuration to use for the route.

container

String

The name of the Azure Blob Storage container.

file-name-template

String

Optional setting to configure the naming of the files and folders in the Azure Blob Storage container. Use forward slashes to organize files in your Azure Blob Storage virtual directories. Defaults to ${VAR:DATE_ISO_8601}/${VAR:NODE_ID}-${VAR:ROUTE_ID}-${VAR:TIMESTAMP_ISO_8601}.${VAR:FILE_EXTENSION}. You can use ${VAR:PLACEHOLDER_NAME} to refer to the following placeholders:

  • TIMESTAMP_ISO_8601: The instant the selected file was created in ISO 8601 format.

  • DATE_ISO_8601: The date the selected file was created in ISO 8601 format.

  • YEAR: The year the selected file was created as a number.

  • MONTH: The month the selected file was created as a number.

  • DAY: The day the selected file was created as a number.

  • HOUR: The hour the selected file was created as a number.

  • MINUTE: The minute the selected file was created as a number.

  • SECOND: The second the selected file was created as a number.

  • MILLISECOND: The millisecond the selected file was created as a number.

  • NODE_ID: The identifier of the HiveMQ broker node on which the extension runs.

  • ROUTE_ID: The identifier of the mqtt-to-azure-blob-storage-route.

  • FILE_EXTENSION: The file suffix that identifies the format of the file. Currently, only parquet is supported.

processor

Complex

Defines the format HiveMQ uses to transfer MQTT messages to the Azure Blob Storage container in the selected route.

  • parquet: Configures how the extension inserts rows into parquet files. For an example configuration, see Parquet processor.

    • columns: A list of the columns where values are inserted.

      • column: A column name and value binding.

        • name: The name of the column where the value is inserted.

        • value: The MQTT PUBLISH property that the extension binds to the selected column.
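
As a sketch of how multiple routes can share one connection, the following fragment sends two separate topic trees to two containers. The route IDs, topic filters, and container names are hypothetical; the connection ID is reused from the example above. Optional settings such as enabled and file-name-template are omitted so that their documented defaults apply.

Example with two MQTT to Azure Blob Storage routes (illustrative sketch)
<mqtt-to-azure-blob-storage-routes>
    <!-- Hypothetical route for telemetry messages -->
    <mqtt-to-azure-blob-storage-route>
        <id>telemetry-route</id>
        <mqtt-topic-filters>
            <mqtt-topic-filter>telemetry/#</mqtt-topic-filter>
        </mqtt-topic-filters>
        <azure-blob-storage-connection-id>my-abs-connection-id</azure-blob-storage-connection-id>
        <container>telemetry</container>
        <processor>
            <parquet>
                <columns>
                    <column>
                        <name>topic</name>
                        <value>mqtt-topic</value>
                    </column>
                    <column>
                        <name>payload</name>
                        <value>mqtt-payload</value>
                    </column>
                </columns>
            </parquet>
        </processor>
    </mqtt-to-azure-blob-storage-route>
    <!-- Hypothetical route for alert messages -->
    <mqtt-to-azure-blob-storage-route>
        <id>alerts-route</id>
        <mqtt-topic-filters>
            <mqtt-topic-filter>alerts/#</mqtt-topic-filter>
        </mqtt-topic-filters>
        <azure-blob-storage-connection-id>my-abs-connection-id</azure-blob-storage-connection-id>
        <container>alerts</container>
        <processor>
            <parquet>
                <columns>
                    <column>
                        <name>topic</name>
                        <value>mqtt-topic</value>
                    </column>
                    <column>
                        <name>payload</name>
                        <value>mqtt-payload</value>
                    </column>
                </columns>
            </parquet>
        </processor>
    </mqtt-to-azure-blob-storage-route>
</mqtt-to-azure-blob-storage-routes>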

Parquet processor

The Parquet processor helps you convert MQTT messages into Parquet rows. The extension binds the configured values with the respective MQTT PUBLISH properties.

Example parquet configuration
<processor>
    <parquet>
        <columns>
            <column>
                <name>topic</name>
                <value>mqtt-topic</value>
            </column>
            <column>
                <name>payload</name>
                <value>mqtt-payload</value>
            </column>
        </columns>
    </parquet>
</processor>

The following table lists all values the extension recognizes:

Table 5. Available values
Name | Type | Logical Type | Description
mqtt-topic | BYTE_ARRAY | STRING | The topic of the MQTT PUBLISH.
mqtt-payload | BYTE_ARRAY | - | The payload of the MQTT PUBLISH as raw binary data.
mqtt-payload-utf8 | BYTE_ARRAY | STRING | The payload of the MQTT PUBLISH as a UTF-8 string.
mqtt-retain | BOOLEAN | - | The retain flag of the MQTT PUBLISH.
mqtt-packet-id | INT32 | - | The packet ID of the MQTT PUBLISH.
mqtt-payload-format-indicator | BYTE_ARRAY | STRING | The payload format indicator of the MQTT PUBLISH.
mqtt-response-topic | BYTE_ARRAY | STRING | The response topic of the MQTT PUBLISH.
mqtt-correlation-data | BYTE_ARRAY | - | The correlation data of the MQTT PUBLISH as raw binary data.
mqtt-correlation-data-utf8 | BYTE_ARRAY | STRING | The correlation data of the MQTT PUBLISH as a UTF-8 string.
timestamp | INT64 | TIMESTAMP(isAdjustedToUTC=false, unit=MILLIS) | The arrival timestamp of the PUBLISH message represented as a UNIX timestamp value in milliseconds.

  • The timestamp of an incoming PUBLISH message records the moment the message arrived at the broker.

  • The timestamp of a message created via the Extension SDK records the moment the message passed to the PublishService.

  • The timestamp of a Will Publish message sent to subscribers records the moment the delivery of the message started.

Some properties in an MQTT PUBLISH message are optional. The number of values the Data Lake extension binds varies based on the properties that are present in the MQTT PUBLISH message.
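
As an illustrative sketch of binding more of the values listed in Table 5, the following processor stores the topic, the payload as a UTF-8 string, the retain flag, and the arrival timestamp. The column names (topic, payload_text, retained, arrived_at) are hypothetical; only the <value> entries are taken from the table.

Example parquet configuration with four columns (illustrative sketch)
<processor>
    <parquet>
        <columns>
            <column>
                <name>topic</name>
                <value>mqtt-topic</value>
            </column>
            <!-- Hypothetical column name; stores the payload as a UTF-8 string -->
            <column>
                <name>payload_text</name>
                <value>mqtt-payload-utf8</value>
            </column>
            <!-- Hypothetical column name; stores the retain flag as BOOLEAN -->
            <column>
                <name>retained</name>
                <value>mqtt-retain</value>
            </column>
            <!-- Hypothetical column name; stores the arrival time in milliseconds -->
            <column>
                <name>arrived_at</name>
                <value>timestamp</value>
            </column>
        </columns>
    </parquet>
</processor>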

Environment variables

HiveMQ offers placeholders that can be replaced with the content of environment variables when the configuration file is read. For many use cases, such as running HiveMQ in a containerized environment, it can be beneficial or necessary to use environment variables to configure items such as ports and bind addresses on the system on which you run HiveMQ.

You can use ${ENV:YOUR_ENVVAR_NAME} in the config.xml file. HiveMQ replaces the placeholder with the value of the specified environment variable during startup.

Example to set an environment variable
export MY_PROFILE_FILE="/path/to/profile/file"
Example use of the environment variable in the configuration
<hivemq-data-lake-extension>

    <aws-credential-profiles>
        <aws-credential-profile>
            <id>my-aws-credential-profile-id</id>
            <profile-name>default</profile-name>
            <profile-file>${ENV:MY_PROFILE_FILE}</profile-file>
        </aws-credential-profile>
    </aws-credential-profiles>

</hivemq-data-lake-extension>
Result of the example configuration in HiveMQ
<hivemq-data-lake-extension>

    <aws-credential-profiles>
        <aws-credential-profile>
            <id>my-aws-credential-profile-id</id>
            <profile-name>default</profile-name>
            <profile-file>/path/to/profile/file</profile-file>
        </aws-credential-profile>
    </aws-credential-profiles>

</hivemq-data-lake-extension>
Make sure that HiveMQ is started in the same context in which your environment variables are set. Otherwise, HiveMQ cannot access them.