HiveMQ Enterprise Data Lake Extension
Data lakes are centralized repositories that allow organizations to store vast amounts of raw and processed data in its native format. This type of storage system can handle large volumes of structured, semi-structured, and unstructured data. The flexibility to store such a wide range of data types is a key feature of data lakes. Unlike traditional relational databases and data warehouses that typically require data to be structured and processed before storage, data lakes let you store data without extensive pre-processing or data transformation.
The HiveMQ Enterprise Data Lake Extension makes it possible to forward MQTT messages directly to your data lake without the need for additional infrastructure.
Features
- Convert MQTT messages into Parquet table rows with column mappings.
- Forward MQTT messages from IoT devices to one or more Amazon S3 buckets via your HiveMQ broker.
- Forward MQTT messages from IoT devices to one or more Azure Blob Storage containers via your HiveMQ broker.
The HiveMQ Enterprise Data Lake Extension does not offer message delivery guarantees. MQTT data transferred to the data lake is sent with the equivalent of a QoS 0 guarantee. In the event of network or disk failure, data being transferred may be lost.
Requirements
- A running HiveMQ Professional or Enterprise Edition installation, version 4.23 or higher.
- An active Amazon Web Services (AWS) or Microsoft Azure account.
- For production use, a valid HiveMQ Enterprise Data Lake Extension license.
If you do not provide a valid license, HiveMQ automatically uses a free trial license. Trial licenses for HiveMQ Enterprise Extensions are valid for 5 hours. For more license information or to request an extended evaluation license, contact HiveMQ sales.
Installation
1. Place your HiveMQ Enterprise Data Lake Extension license file (`.elic`) in the `license` folder of your HiveMQ installation. (Skip this step if you are using a trial version of the extension.)

   └─ <HiveMQ folder>
       ├─ bin
       ├─ conf
       ├─ data
       ├─ extensions
       │   ├─ hivemq-data-lake-extension
       │   └─ ...
       ├─ license
       ├─ log
       └─ ...
2. Before you enable the extension, you need to configure the extension to match your individual data lake setup.
   For your convenience, we provide an example configuration, `conf/examples/config.xml`, that you can copy and modify as desired.
   The included `config.xsd` file outlines the schema and elements that can be used in the XML configuration.
   Your completed configuration file must be named `config.xml` and located in `HIVEMQ_HOME/extensions/hivemq-data-lake-extension/conf/config.xml`.
   For detailed information on configuration options, see Configuration.
3. To enable the HiveMQ Enterprise Data Lake Extension, locate the `hivemq-data-lake-extension` folder in the `extensions` directory of your HiveMQ installation and remove the `DISABLED` file (if present). A command-line sketch of steps 2 and 3 follows below.
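For a default installation, steps 2 and 3 might look like the following. This is a minimal sketch that assumes the `HIVEMQ_HOME` environment variable points to your HiveMQ installation directory:

# Step 2: copy the example configuration and adapt it to your data lake setup
cp "$HIVEMQ_HOME/extensions/hivemq-data-lake-extension/conf/examples/config.xml" \
   "$HIVEMQ_HOME/extensions/hivemq-data-lake-extension/conf/config.xml"

# Step 3: enable the extension by removing the DISABLED marker file (if present)
rm "$HIVEMQ_HOME/extensions/hivemq-data-lake-extension/DISABLED"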
To function properly, the HiveMQ Enterprise Data Lake Extension must be installed on all HiveMQ broker nodes in your HiveMQ cluster, and the configuration file on each node must be identical.
Configuration
The extension configuration depends on the cloud provider you want to use.
For Amazon Web Services S3:

- AWS Credential Profiles: Provides information about the credential profiles used to interact with AWS.
- MQTT to S3 Routes: Defines how MQTT messages are sent from your HiveMQ broker to the configured Amazon S3 bucket.

For Microsoft Azure Blob Storage:

- Azure Blob Storage Connections: Provides information about the authentication method used to interact with Azure.
- MQTT to Azure Blob Storage Routes: Defines how MQTT messages are sent from your HiveMQ broker to the configured Azure Blob Storage container.
Extension Configuration File
The `config.xml` file for your HiveMQ Enterprise Data Lake Extension must be located in the `hivemq-data-lake-extension/conf/` folder within the `extensions` folder of your HiveMQ installation.
The extension uses a simple but powerful XML-based configuration.
The `conf/examples/config.xml` file is a configuration example that has all the parameters you need to send MQTT messages from your HiveMQ MQTT broker to your data lake.

If you copy and reuse the `conf/examples/config.xml` file, be sure to move the file to `/conf/config.xml` before you enable your extension. For more information, see Installation.
Amazon Web Services S3 configuration
<hivemq-data-lake-extension xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                            xsi:noNamespaceSchemaLocation="config.xsd">
    <aws-credential-profiles>
        <aws-credential-profile>
            <id>my-aws-credential-profile-id</id>
            <profile-name>default</profile-name>
            <profile-file>~/.aws/credentials</profile-file>
        </aws-credential-profile>
    </aws-credential-profiles>
    <mqtt-to-s3-routes>
        <mqtt-to-s3-route>
            <id>my-s3-route</id>
            <enabled>true</enabled>
            <mqtt-topic-filters>
                <mqtt-topic-filter>#</mqtt-topic-filter>
            </mqtt-topic-filters>
            <aws-credential-profile-id>my-aws-credential-profile-id</aws-credential-profile-id>
            <bucket>my-bucket</bucket>
            <region>eu-central-1</region>
            <file-name-template>${VAR:DATE_ISO_8601}/${VAR:NODE_ID}-${VAR:ROUTE_ID}-${VAR:TIMESTAMP_ISO_8601}.${VAR:FILE_EXTENSION}</file-name-template>
            <processor>
                <parquet>
                    <columns>
                        <column>
                            <name>topic</name>
                            <value>mqtt-topic</value>
                        </column>
                        <column>
                            <name>payload</name>
                            <value>mqtt-payload</value>
                        </column>
                    </columns>
                </parquet>
            </processor>
        </mqtt-to-s3-route>
    </mqtt-to-s3-routes>
</hivemq-data-lake-extension>
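With the `file-name-template` shown above, an uploaded object might receive a key such as the following. The values are purely illustrative; the actual node ID, timestamps, and file extension are resolved at runtime:

2023-11-07/hivemq-node-1-my-s3-route-2023-11-07T12:34:56.789Z.parquet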
AWS Credential Profiles
To interact with Amazon S3, your HiveMQ extension must provide AWS security credentials to verify your identity and access permissions.
When you set up your AWS IAM (Identity and Access Management) security credential on the AWS management console, verify that your setup includes the required permissions for the configured routes. For more information, see Controlling Access to S3 Resources Using IAM.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "mqttToS3",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject"
      ],
      "Resource": "*"
    }
  ]
}
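The policy above allows `s3:PutObject` on all resources. To follow the principle of least privilege, you can scope the `Resource` element to the bucket your route writes to. The bucket name below is a placeholder:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "mqttToS3",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject"
      ],
      "Resource": "arn:aws:s3:::my-bucket/*"
    }
  ]
}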
The AWS Identity and Access Management (IAM) service helps to securely control access to Amazon S3 and other AWS resources. In the AWS management console, you can create users and assign user permissions. To access resources on AWS, you create security credentials in the AWS management console and save the credentials for use in the extension profile file. For more information, see IAM access policies.
The `aws-credential-profiles` section of the extension configuration defines one or more sets of security credentials for your connections to AWS. If desired, use this section to override the default AWS credential provider chain.
<aws-credential-profiles>
    <aws-credential-profile>
        <id>my-aws-credential-profile-id</id>
        <profile-file>/opt/hivemq/extensions/hivemq-data-lake-extension/aws-credentials</profile-file>
        <profile-name>my-profile-name</profile-name>
    </aws-credential-profile>
</aws-credential-profiles>
Each `<aws-credential-profile>` tag contains the file path to the credentials file that stores the associated security credentials. If your credentials file holds more than one set of credentials, use the optional `<profile-name>` tag to specify the credentials set that you want to use.
You can define as many `<aws-credential-profile>` tags as your use case requires.
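For reference, an AWS credentials file that holds two sets of credentials might look like this (all values are placeholders):

[default]
aws_access_key_id = AKIAEXAMPLEKEYID
aws_secret_access_key = exampleSecretAccessKey

[my-profile-name]
aws_access_key_id = AKIAANOTHEREXAMPLEID
aws_secret_access_key = anotherExampleSecretAccessKey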
| Parameter | Required | Type | Description |
|---|---|---|---|
| `id` | yes | ID | The unique identifier of the AWS credential profile. This string can only contain lowercase alphanumeric characters, dashes, and underscores. |
| `profile-file` | no | String | Optional setting that provides the path to a file that contains AWS profile credentials. If unset, information is taken from the default AWS credentials location. |
| `profile-name` | no | String | Optional setting to select a specific profile in the defined `profile-file`. |
MQTT to S3 Routes
The `<mqtt-to-s3-routes>` section of your extension configuration defines how MQTT messages are sent from the HiveMQ broker to Amazon S3 buckets. You can define as many `<mqtt-to-s3-route>` tags as your use case requires (see the multi-route sketch at the end of this section).
<mqtt-to-s3-route>
    <id>my-s3-route</id>
    <enabled>true</enabled>
    <mqtt-topic-filters>
        <mqtt-topic-filter>#</mqtt-topic-filter>
    </mqtt-topic-filters>
    <aws-credential-profile-id>my-aws-credential-profile-id</aws-credential-profile-id>
    <bucket>my-bucket</bucket>
    <region>eu-central-1</region>
    <file-name-template>${VAR:DATE_ISO_8601}/${VAR:NODE_ID}-${VAR:ROUTE_ID}-${VAR:TIMESTAMP_ISO_8601}.${VAR:FILE_EXTENSION}</file-name-template>
    <processor>
        <parquet>
            <columns>
                <column>
                    <name>topic</name>
                    <value>mqtt-topic</value>
                </column>
                <column>
                    <name>payload</name>
                    <value>mqtt-payload</value>
                </column>
            </columns>
        </parquet>
    </processor>
</mqtt-to-s3-route>
| Parameter | Required | Type | Description |
|---|---|---|---|
| `id` | yes | ID | The unique identifier of the `mqtt-to-s3-route`. |
| `enabled` | no | Boolean | Optional setting that defines whether the selected `mqtt-to-s3-route` is enabled. |
| `mqtt-topic-filters` | yes | Complex | A list of one or more MQTT topic filters to apply on this route. |
| `aws-credential-profile-id` | no | IDREF | Optional setting that identifies the `aws-credential-profile` the route uses. If unset, the default AWS credential provider chain is used. |
| `bucket` | yes | String | The name of the Amazon S3 bucket. |
| `region` | no | String | Optional setting that defines the AWS region the extension uses to access S3. When unset, the default AWS region selection logic determines the region. If the default region is not found, the selected route cannot be established. |
| `file-name-template` | no | String | Optional setting to configure the naming of the files and folders in the Amazon S3 bucket. Use forward slashes to organize files in your Amazon S3 folders. Defaults to `${VAR:DATE_ISO_8601}/${VAR:NODE_ID}-${VAR:ROUTE_ID}-${VAR:TIMESTAMP_ISO_8601}.${VAR:FILE_EXTENSION}`. |
| `processor` | yes | Complex | Defines the format HiveMQ uses to transfer MQTT messages to the Amazon S3 bucket in the selected route. |
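To illustrate multiple routes, the following sketch forwards two separate topic trees to two different buckets using the same credential profile. All IDs, topic filters, and bucket names are hypothetical:

<mqtt-to-s3-routes>
    <mqtt-to-s3-route>
        <id>telemetry-route</id>
        <mqtt-topic-filters>
            <mqtt-topic-filter>factory/+/telemetry</mqtt-topic-filter>
        </mqtt-topic-filters>
        <aws-credential-profile-id>my-aws-credential-profile-id</aws-credential-profile-id>
        <bucket>telemetry-bucket</bucket>
        <processor>
            <parquet>
                <columns>
                    <column>
                        <name>topic</name>
                        <value>mqtt-topic</value>
                    </column>
                    <column>
                        <name>payload</name>
                        <value>mqtt-payload</value>
                    </column>
                </columns>
            </parquet>
        </processor>
    </mqtt-to-s3-route>
    <mqtt-to-s3-route>
        <id>alerts-route</id>
        <mqtt-topic-filters>
            <mqtt-topic-filter>factory/+/alerts</mqtt-topic-filter>
        </mqtt-topic-filters>
        <aws-credential-profile-id>my-aws-credential-profile-id</aws-credential-profile-id>
        <bucket>alerts-bucket</bucket>
        <processor>
            <parquet>
                <columns>
                    <column>
                        <name>topic</name>
                        <value>mqtt-topic</value>
                    </column>
                    <column>
                        <name>payload</name>
                        <value>mqtt-payload</value>
                    </column>
                </columns>
            </parquet>
        </processor>
    </mqtt-to-s3-route>
</mqtt-to-s3-routes>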
Azure Blob Storage configuration
<hivemq-data-lake-extension xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                            xsi:noNamespaceSchemaLocation="config.xsd">
    <azure-blob-storage-connections>
        <azure-blob-storage-connection>
            <id>my-abs-connection-id</id>
            <authentication>
                <connection-string>DefaultEndpointsProtocol=https;AccountName=my-account;AccountKey=my-key;EndpointSuffix=core.windows.net</connection-string>
            </authentication>
        </azure-blob-storage-connection>
    </azure-blob-storage-connections>
    <mqtt-to-azure-blob-storage-routes>
        <mqtt-to-azure-blob-storage-route>
            <id>my-abs-route</id>
            <enabled>true</enabled>
            <mqtt-topic-filters>
                <mqtt-topic-filter>#</mqtt-topic-filter>
            </mqtt-topic-filters>
            <azure-blob-storage-connection-id>my-abs-connection-id</azure-blob-storage-connection-id>
            <container>my-container</container>
            <file-name-template>${VAR:DATE_ISO_8601}/${VAR:NODE_ID}-${VAR:ROUTE_ID}-${VAR:TIMESTAMP_ISO_8601}.${VAR:FILE_EXTENSION}</file-name-template>
            <processor>
                <parquet>
                    <columns>
                        <column>
                            <name>topic</name>
                            <value>mqtt-topic</value>
                        </column>
                        <column>
                            <name>payload</name>
                            <value>mqtt-payload</value>
                        </column>
                    </columns>
                </parquet>
            </processor>
        </mqtt-to-azure-blob-storage-route>
    </mqtt-to-azure-blob-storage-routes>
</hivemq-data-lake-extension>
Azure Blob Storage Connections
To interact with Azure Blob Storage, your HiveMQ extension must provide Azure security credentials to verify your identity and access permissions.
The `azure-blob-storage-connections` section of the extension configuration defines one or more sets of security credentials for your connections to Azure.
Connection String
The connection string authentication includes the required authorization information to connect to your Azure Blob Storage account using Shared Key authorization.
<azure-blob-storage-connections>
    <azure-blob-storage-connection>
        <id>my-abs-connection-id</id>
        <authentication>
            <connection-string>DefaultEndpointsProtocol=https;AccountName=my-account;AccountKey=my-key;EndpointSuffix=core.windows.net</connection-string>
        </authentication>
    </azure-blob-storage-connection>
</azure-blob-storage-connections>
Microsoft Entra ID
Microsoft Entra ID enables secure passwordless connections to your Azure Blob Storage account.
This feature enables several authentication methods, such as:

- Environment (example: service principal with secret)
- Workload Identity (example: HiveMQ cluster running on Azure Kubernetes Service)
- Managed Identity (example: HiveMQ cluster running on virtual machines in Azure)
Regardless of the method, permission to Azure Blob Storage must be granted on Azure. The way you obtain the permission varies based on the Azure service on which your HiveMQ cluster runs. For more information, see the passwordless connections with Azure Blob Storage documentation for your Azure hosting environment.
To learn more about this feature and all authentication methods Azure supports, see DefaultAzureCredential in the official Azure documentation.
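For example, if you use the Environment method with a service principal, DefaultAzureCredential reads the standard Azure SDK environment variables. Make sure the variables are set in the context in which HiveMQ starts; all values below are placeholders:

export AZURE_TENANT_ID="00000000-0000-0000-0000-000000000000"
export AZURE_CLIENT_ID="11111111-1111-1111-1111-111111111111"
export AZURE_CLIENT_SECRET="my-client-secret"

The extension configuration itself then only needs the blob endpoint and the empty `<microsoft-entra-id/>` authentication element: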
<azure-blob-storage-connections>
    <azure-blob-storage-connection>
        <id>my-abs-connection-id</id>
        <azure-blob-storage-endpoint>https://my-storage-account.blob.core.windows.net</azure-blob-storage-endpoint>
        <authentication>
            <microsoft-entra-id/>
        </authentication>
    </azure-blob-storage-connection>
</azure-blob-storage-connections>
You can define as many `<azure-blob-storage-connection>` tags as your use case requires.
| Parameter | Required | Type | Description |
|---|---|---|---|
| `id` | yes | ID | The unique identifier of the Azure Blob Storage connection. This string can only contain lowercase alphanumeric characters, dashes, and underscores. |
| `azure-blob-storage-endpoint` | no | String | Defines the Azure Blob Storage endpoint to which the extension connects. The endpoint setting is required for Microsoft Entra ID authentication. You can usually find this endpoint via the Azure portal on Storage Account > Settings > Endpoints > Blob service. |
| `authentication` | yes | Complex | Defines the authentication mechanism to use to connect to Azure Blob Storage. |
MQTT to Azure Blob Storage Routes
The `<mqtt-to-azure-blob-storage-routes>` section of your extension configuration defines how MQTT messages are sent from the HiveMQ broker to Azure Blob Storage containers. You can define as many `<mqtt-to-azure-blob-storage-route>` tags as your use case requires.
<mqtt-to-azure-blob-storage-routes>
    <mqtt-to-azure-blob-storage-route>
        <id>my-abs-route</id>
        <enabled>true</enabled>
        <mqtt-topic-filters>
            <mqtt-topic-filter>#</mqtt-topic-filter>
        </mqtt-topic-filters>
        <azure-blob-storage-connection-id>my-abs-connection-id</azure-blob-storage-connection-id>
        <container>my-container</container>
        <file-name-template>${VAR:DATE_ISO_8601}/${VAR:NODE_ID}-${VAR:ROUTE_ID}-${VAR:TIMESTAMP_ISO_8601}.${VAR:FILE_EXTENSION}</file-name-template>
        <processor>
            <parquet>
                <columns>
                    <column>
                        <name>topic</name>
                        <value>mqtt-topic</value>
                    </column>
                    <column>
                        <name>payload</name>
                        <value>mqtt-payload</value>
                    </column>
                </columns>
            </parquet>
        </processor>
    </mqtt-to-azure-blob-storage-route>
</mqtt-to-azure-blob-storage-routes>
| Parameter | Required | Type | Description |
|---|---|---|---|
| `id` | yes | ID | The unique identifier of the `mqtt-to-azure-blob-storage-route`. |
| `enabled` | no | Boolean | Optional setting that defines whether the selected `mqtt-to-azure-blob-storage-route` is enabled. |
| `mqtt-topic-filters` | yes | Complex | A list of one or more MQTT topic filters to apply on this route. |
| `azure-blob-storage-connection-id` | yes | IDREF | Identifies the `azure-blob-storage-connection` the route uses. |
| `container` | yes | String | The name of the Azure Blob Storage container. |
| `file-name-template` | no | String | Optional setting to configure the naming of the files and folders in the Azure Blob Storage container. Use forward slashes to organize files in your Azure Blob Storage virtual directories. Defaults to `${VAR:DATE_ISO_8601}/${VAR:NODE_ID}-${VAR:ROUTE_ID}-${VAR:TIMESTAMP_ISO_8601}.${VAR:FILE_EXTENSION}`. |
| `processor` | yes | Complex | Defines the format HiveMQ uses to transfer MQTT messages to the Azure Blob Storage container in the selected route. |
Parquet processor
The Parquet processor helps you convert MQTT messages into Parquet rows. The extension binds each configured value to the corresponding MQTT PUBLISH property.
<processor>
    <parquet>
        <columns>
            <column>
                <name>topic</name>
                <value>mqtt-topic</value>
            </column>
            <column>
                <name>payload</name>
                <value>mqtt-payload</value>
            </column>
            <column>
                <name>my_user_property</name>
                <value>mqtt-user-properties$myUserPropertyName</value>
            </column>
        </columns>
    </parquet>
</processor>
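To illustrate the binding, assume a hypothetical PUBLISH to topic `sensors/temperature` with payload `23.5` and user property `myUserPropertyName=celsius`. With the configuration above, the extension writes a Parquet row along these lines:

topic            = "sensors/temperature"
payload          = 0x32 0x33 0x2E 0x35   (the raw bytes of "23.5")
my_user_property = "celsius"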
The following table lists all values the extension recognizes:
| Name | Type | Logical Type | Description |
|---|---|---|---|
| `mqtt-topic` | BYTE_ARRAY | STRING | The topic of the MQTT PUBLISH. |
| `mqtt-payload` | BYTE_ARRAY | | The payload of the MQTT PUBLISH as raw binary data. |
| | BYTE_ARRAY | STRING | The payload of the MQTT PUBLISH as a UTF-8 string. |
| | BOOLEAN | | The retain flag of the MQTT PUBLISH. |
| | INT32 | | The packet ID of the MQTT PUBLISH. |
| | BYTE_ARRAY | STRING | The payload format indicator of the MQTT PUBLISH. |
| | BYTE_ARRAY | STRING | The response topic of the MQTT PUBLISH. |
| | BYTE_ARRAY | | The correlation data of the MQTT PUBLISH as raw binary data. |
| | BYTE_ARRAY | STRING | The correlation data of the MQTT PUBLISH as a UTF-8 string. |
| `mqtt-user-properties` | REPEATED GROUP | LIST | The user properties of the MQTT PUBLISH as an array. |
| `mqtt-user-properties$<property-name>` | BYTE_ARRAY | STRING | The value of the user property of the MQTT PUBLISH with the matching property name. Example: `mqtt-user-properties$myUserPropertyName`. |
| | INT64 | TIMESTAMP(isAdjustedToUTC=false, unit=MILLIS) | The arrival timestamp of the PUBLISH message represented as a UNIX timestamp value in milliseconds. |
Some properties in an MQTT PUBLISH message are optional. The number of values the Data Lake extension binds varies based on the properties that are present in the MQTT PUBLISH message.
Environment variables
HiveMQ offers placeholders that can be replaced with the content of environment variables when the configuration file is read. For many use cases, it can be beneficial or necessary to use environment variables to configure items such as ports and bind addresses on the system on which you run HiveMQ, for example, when you run HiveMQ in a containerized environment.
You can use `${ENV:YOUR_ENVVAR_NAME}` in the `config.xml` file. HiveMQ replaces the placeholder with the value of the specified environment variable during startup. For example, set an environment variable and reference it in your extension configuration:
export MY_PROFILE_FILE="/path/to/profile/file"
<hivemq-data-lake-extension>
    <aws-credential-profiles>
        <aws-credential-profile>
            <id>my-aws-credential-profile-id</id>
            <profile-name>default</profile-name>
            <profile-file>${ENV:MY_PROFILE_FILE}</profile-file>
        </aws-credential-profile>
    </aws-credential-profiles>
</hivemq-data-lake-extension>
During startup, HiveMQ resolves the placeholder, which results in the following effective configuration:

<hivemq-data-lake-extension>
    <aws-credential-profiles>
        <aws-credential-profile>
            <id>my-aws-credential-profile-id</id>
            <profile-name>default</profile-name>
            <profile-file>/path/to/profile/file</profile-file>
        </aws-credential-profile>
    </aws-credential-profiles>
</hivemq-data-lake-extension>
Make sure that HiveMQ is started in the same context as your environment variables are set; otherwise, HiveMQ cannot access them.