HiveMQ Enterprise Extension for Snowflake

Snowflake is a popular cloud-based data management and data warehousing platform designed to store, process, and analyze data. The fully managed Snowflake data platform offers efficient data storage, transformation, and management. The HiveMQ Enterprise Extension for Snowflake makes it possible to forward MQTT messages directly to Snowflake via the Snowpipe Streaming SDK without additional infrastructure.

Features

  • Forward MQTT messages from IoT devices to one or more Snowflake Data Clouds via your HiveMQ broker.

  • Convert MQTT messages into Snowflake table rows with convenient column mappings.

The Snowflake extension forwards messages through the Snowflake SDK and does not offer message delivery guarantees. MQTT data transferred to Snowflake is sent with the equivalent of a QoS 0 guarantee. If a network or disk failure occurs, data that is in transit may be lost.

Requirements

If you do not provide a valid license, HiveMQ automatically uses a free trial license. Trial licenses for HiveMQ Enterprise Extensions are valid for 5 hours. For more license information or to request an extended evaluation license, contact HiveMQ sales.

Installation

  1. Place your HiveMQ Enterprise Extension for Snowflake license file (.elic) in the license folder of your HiveMQ installation. (Skip this step if you are using a trial version of the extension.)

    └─ <HiveMQ folder>
        ├─ bin
        ├─ conf
        ├─ data
        ├─ extensions
        │   ├─ hivemq-snowflake-extension
        │   └─ ...
        ├─ license
        ├─ log
        └─ ...
  2. Before you enable the extension, you need to configure the extension to match your individual Snowflake setup.
    For your convenience, we provide an example configuration conf/examples/config.xml that you can copy and modify as desired.
    The included config.xsd file outlines the schema and elements that can be used in the XML configuration.
    Your completed configuration file must be named config.xml and located in HIVEMQ_HOME/extensions/hivemq-snowflake-extension/conf/config.xml.
    For detailed information on configuration options, see Configuration.

  3. To enable the HiveMQ Enterprise Extension for Snowflake, locate the hivemq-snowflake-extension folder in the extensions directory of your HiveMQ installation and remove the DISABLED file (if present).

To function properly, the HiveMQ Enterprise Extension for Snowflake must be installed on all HiveMQ broker nodes in your HiveMQ cluster and the configuration file on each node must be identical.

Configuration

The extension configuration is divided into two sections:

  • Snowflakes: Provides information about the Snowflake Data Cloud accounts to which your HiveMQ broker connects.

  • MQTT to Snowflake Routes: Defines how MQTT messages are sent from your HiveMQ broker to the configured Snowflake databases.

Extension Configuration File

The config.xml file for your Snowflake extension must be located in the hivemq-snowflake-extension/conf/ folder within the extensions folder of your HiveMQ installation.

The extension uses a simple but powerful XML-based configuration.

The conf/examples/config.xml file is a configuration example that has all the parameters you need to send MQTT messages from your HiveMQ MQTT broker to Snowflake.

If you copy and reuse the conf/examples/config.xml file, be sure to save the copy as HIVEMQ_HOME/extensions/hivemq-snowflake-extension/conf/config.xml before you enable your extension. For more information, see Installation.
Example Snowflake database connection configuration
<hivemq-snowflake-extension xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                            xsi:noNamespaceSchemaLocation="config.xsd">
    <snowflakes>
        <snowflake>
            <id>my-snowflake-id</id>
            <account-url>https://myorg-myaccountname.snowflakecomputing.com</account-url>
            <user>my-user</user>
            <role>my-role</role>
            <private-key>path/to/rsa_key.p8</private-key>
            <private-key-passphrase>my-passphrase</private-key-passphrase>
        </snowflake>
    </snowflakes>

    <mqtt-to-snowflake-routes>
        <mqtt-to-snowflake-route>
            <id>my-snowflake-route</id>
            <snowflake-id>my-snowflake-id</snowflake-id>
            <enabled>true</enabled>
            <mqtt-topic-filters>
                <mqtt-topic-filter>#</mqtt-topic-filter>
            </mqtt-topic-filters>
            <processor>
                <snowpipe-streaming>
                    <database>my_database</database>
                    <schema>my_schema</schema>
                    <table>my_table</table>
                    <columns>
                        <column>
                            <name>topic</name>
                            <value>mqtt-topic</value>
                        </column>
                        <column>
                            <name>payload</name>
                            <value>mqtt-payload-utf8</value>
                        </column>
                    </columns>
                </snowpipe-streaming>
            </processor>
        </mqtt-to-snowflake-route>
    </mqtt-to-snowflake-routes>
</hivemq-snowflake-extension>

Snowflake Connection Configuration

The <snowflakes> section of your configuration lists the Snowflake Data Clouds to which your MQTT messages can be routed.

You can define as many <snowflake> tags as your use case requires.

Example Snowflake connection configuration
<snowflakes>
    <snowflake>
        <id>my-snowflake-id</id>
        <account-url>https://myorg-myaccountname.snowflakecomputing.com</account-url>
        <user>my-user</user>
        <role>my-role</role>
        <private-key>path/to/rsa_key.p8</private-key>
        <private-key-passphrase>my-passphrase</private-key-passphrase>
    </snowflake>
</snowflakes>
You can use environment variables to set properties such as the <private-key-passphrase>.
Table 1. Snowflake configuration parameters

| Parameter | Required | Type | Description |
|---|---|---|---|
| id | Yes | ID | The ID of the Snowflake connection. This string can only contain lowercase alphanumeric characters, dashes, and underscores. |
| account-url | Yes | String | The account URL for the Snowflake database. The format of the URL is usually https://<orgname>-<account_name>.snowflakecomputing.com. To find your organization and account name, see Figure 1, Example Snowflake account and organization name location. For more information, see Standard Account URLs. |
| user | Yes | String | The user to connect with. The user must have a role with sufficient permissions. For more information, see User Management. |
| role | Yes | String | The user role to connect with. The role must have INSERT permissions on the destination table. For more information, see Required access privileges. |
| private-key | Yes | String | The path to a private key file in PEM format for key pair authentication. For more information and instructions on how to generate a key pair, see Using Key Pair Authentication. |
| private-key-passphrase | No | String | Optional setting that defines the passphrase to decrypt the private key. If the private key is not encrypted, a passphrase is unnecessary. |

Figure 1. Example Snowflake account and organization name location
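
As a further illustration of the parameters in Table 1, the following sketch defines two connections in one <snowflakes> section. The second connection assumes an unencrypted private key and therefore omits the optional <private-key-passphrase>. All IDs, account URLs, users, roles, and key paths are hypothetical placeholders, not values from a real account.

```xml
<snowflakes>
    <!-- Hypothetical production connection that uses an encrypted private key -->
    <snowflake>
        <id>snowflake-production</id>
        <account-url>https://myorg-production.snowflakecomputing.com</account-url>
        <user>my-production-user</user>
        <role>my-production-role</role>
        <private-key>path/to/production_rsa_key.p8</private-key>
        <private-key-passphrase>my-passphrase</private-key-passphrase>
    </snowflake>
    <!-- Hypothetical staging connection; the key is assumed to be unencrypted,
         so the optional private-key-passphrase element is left out -->
    <snowflake>
        <id>snowflake-staging</id>
        <account-url>https://myorg-staging.snowflakecomputing.com</account-url>
        <user>my-staging-user</user>
        <role>my-staging-role</role>
        <private-key>path/to/staging_rsa_key.p8</private-key>
    </snowflake>
</snowflakes>
```

Each <id> is what a route later references in its <snowflake-id>, so the IDs should be unique and follow the character rules listed for the id parameter.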

MQTT to Snowflake Routes

The <mqtt-to-snowflake-routes> section of your extension configuration defines how MQTT messages are sent from the HiveMQ broker to Snowflake.

You can define as many <mqtt-to-snowflake-route> tags as your use case requires.

Example MQTT to Snowflake route with a Snowpipe Streaming configuration
<mqtt-to-snowflake-route>
    <id>my-snowflake-route</id>
    <snowflake-id>my-snowflake-id</snowflake-id>
    <enabled>true</enabled>
    <mqtt-topic-filters>
        <mqtt-topic-filter>#</mqtt-topic-filter>
    </mqtt-topic-filters>
    <processor>
        <snowpipe-streaming>
            <database>my_database</database>
            <schema>my_schema</schema>
            <table>my_table</table>
            <columns>
                <column>
                    <name>topic</name>
                    <value>mqtt-topic</value>
                </column>
                <column>
                    <name>payload</name>
                    <value>mqtt-payload-utf8</value>
                </column>
            </columns>
        </snowpipe-streaming>
    </processor>
</mqtt-to-snowflake-route>
Table 2. MQTT to Snowflake route parameters

| Parameter | Required | Type | Description |
|---|---|---|---|
| id | Yes | ID | The ID of the mqtt-to-snowflake-route. This string can only contain lowercase alphanumeric characters, dashes, and underscores. |
| snowflake-id | Yes | IDREF | The ID of the Snowflake connection that the route uses. The value references the <id> of a configured <snowflake>. |
| enabled | No | Boolean | Optional setting that defines whether the selected mqtt-to-snowflake-route is enabled or disabled. The default setting is true. To disable the route, set to false. |
| mqtt-topic-filters | Yes | String | A list of one or more MQTT topic filters. Each <mqtt-topic-filter> is a source MQTT topic filter that selects which MQTT messages are routed to the Snowflake database. You can define as many individual <mqtt-topic-filter> tags as your use case requires. |
| processor | Yes | Complex | Defines the method HiveMQ uses to transfer MQTT messages to Snowflake in the selected route. |

The processor parameter contains the following elements:

  • snowpipe-streaming: Configures how the extension inserts rows into the selected Snowflake database.

    • database: The name of the destination Snowflake database.

    • schema: The name of the database schema.

    • table: The name of the table where rows are inserted.

    • columns: A list of the table columns where values are inserted.

      • column: A column name and value binding.

        • name: The name of the column where the value is inserted.

        • value: The MQTT PUBLISH property whose value is bound to the selected column.
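
To show how several routes can share one connection, here is a minimal sketch with two routes that both reference the my-snowflake-id connection. The first route lists two topic filters and relies on the default enabled value of true. The second route is switched off with <enabled>false</enabled>. The route IDs, topic filters, and table names are hypothetical and chosen only for illustration.

```xml
<mqtt-to-snowflake-routes>
    <!-- Hypothetical route: no enabled element, so the default (true) applies -->
    <mqtt-to-snowflake-route>
        <id>sensor-data-route</id>
        <snowflake-id>my-snowflake-id</snowflake-id>
        <mqtt-topic-filters>
            <mqtt-topic-filter>sensors/+/temperature</mqtt-topic-filter>
            <mqtt-topic-filter>sensors/+/humidity</mqtt-topic-filter>
        </mqtt-topic-filters>
        <processor>
            <snowpipe-streaming>
                <database>my_database</database>
                <schema>my_schema</schema>
                <table>sensor_readings</table>
                <columns>
                    <column>
                        <name>topic</name>
                        <value>mqtt-topic</value>
                    </column>
                    <column>
                        <name>payload</name>
                        <value>mqtt-payload-utf8</value>
                    </column>
                </columns>
            </snowpipe-streaming>
        </processor>
    </mqtt-to-snowflake-route>
    <!-- Hypothetical route that stays in the configuration but is disabled -->
    <mqtt-to-snowflake-route>
        <id>alerts-route</id>
        <snowflake-id>my-snowflake-id</snowflake-id>
        <enabled>false</enabled>
        <mqtt-topic-filters>
            <mqtt-topic-filter>alerts/#</mqtt-topic-filter>
        </mqtt-topic-filters>
        <processor>
            <snowpipe-streaming>
                <database>my_database</database>
                <schema>my_schema</schema>
                <table>alerts</table>
                <columns>
                    <column>
                        <name>payload</name>
                        <value>mqtt-payload-utf8</value>
                    </column>
                </columns>
            </snowpipe-streaming>
        </processor>
    </mqtt-to-snowflake-route>
</mqtt-to-snowflake-routes>
```

Setting enabled to false keeps a route definition available for later use without deleting it from the configuration.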

Snowpipe Streaming

The Snowpipe Streaming processor helps you convert MQTT messages into Snowflake rows.

To configure the Snowpipe Streaming processor, specify the destination database, schema, and table, the target columns, and the value that is bound to each column. The extension binds each configured value to the corresponding MQTT PUBLISH property.

Example Snowpipe Streaming configuration
<snowpipe-streaming>
    <database>my_database</database>
    <schema>my_schema</schema>
    <table>my_table</table>
    <columns>
        <column>
            <name>topic</name>
            <value>mqtt-topic</value>
        </column>
        <column>
            <name>payload</name>
            <value>mqtt-payload-utf8</value>
        </column>
    </columns>
</snowpipe-streaming>

The following table lists all values the HiveMQ Snowflake extension recognizes:

Table 3. Available values

| Name | Type | Description |
|---|---|---|
| mqtt-topic | String | The topic of the MQTT PUBLISH. |
| mqtt-payload-utf8 | String | The payload of the MQTT PUBLISH as a UTF-8 string. |
| mqtt-payload-base64 | String | The payload of the MQTT PUBLISH as a Base64 string. |
| mqtt-retain | boolean | The retain flag of the MQTT PUBLISH. |
| mqtt-packet-id | int | The packet ID of the MQTT PUBLISH. |
| mqtt-payload-format-indicator | String | The payload format indicator of the MQTT PUBLISH. |
| mqtt-response-topic | String | The response topic of the MQTT PUBLISH. |
| mqtt-correlation-data-utf8 | String | The correlation data of the MQTT PUBLISH as a UTF-8 string. |
| mqtt-correlation-data-base64 | String | The correlation data of the MQTT PUBLISH as a Base64 string. |
| mqtt-user-properties-json | JSON Array | The user properties of the MQTT PUBLISH as a JSON array string. |
| mqtt-user-properties$property-name | String | The value of the user property of the MQTT PUBLISH with the matching property name. Example: mqtt-user-properties$myUserPropertyName binds the value of the first user property with the name myUserPropertyName. |
| timestamp-ms | long | The arrival timestamp of the PUBLISH message represented as a UNIX timestamp value in milliseconds. For an incoming PUBLISH message, the timestamp records the moment the message arrived at the broker. For a message created via the Extension SDK, the timestamp records the moment the message was passed to the PublishService. For a Will Publish message sent to subscribers, the timestamp records the moment the delivery of the message started. |
| timestamp-iso-8601 | Instant | The same information as timestamp-ms represented as an ISO 8601 string. |

Some properties in an MQTT PUBLISH message are optional. The number of values the Snowflake extension binds varies based on the properties that are present in the MQTT PUBLISH message.
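
The available values can be combined freely in one processor. The sketch below binds five of them, including a single user property and the ISO 8601 arrival timestamp. The column names (payload_base64, retained, device_id, received_at) and the user property name deviceId are hypothetical. The example assumes that the destination table already has columns with these names and compatible types.

```xml
<snowpipe-streaming>
    <database>my_database</database>
    <schema>my_schema</schema>
    <table>my_table</table>
    <columns>
        <column>
            <name>topic</name>
            <value>mqtt-topic</value>
        </column>
        <!-- Base64 keeps binary payloads intact as text -->
        <column>
            <name>payload_base64</name>
            <value>mqtt-payload-base64</value>
        </column>
        <column>
            <name>retained</name>
            <value>mqtt-retain</value>
        </column>
        <!-- Binds the first user property named deviceId (hypothetical name);
             user properties are optional and may be absent from a PUBLISH -->
        <column>
            <name>device_id</name>
            <value>mqtt-user-properties$deviceId</value>
        </column>
        <column>
            <name>received_at</name>
            <value>timestamp-iso-8601</value>
        </column>
    </columns>
</snowpipe-streaming>
```

Because optional properties such as user properties are not present in every message, it is probably safest to let the corresponding table columns accept missing values.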

Environment variables

HiveMQ offers placeholders that can be replaced with the content of environment variables when the configuration file is read. For many use cases, it can be beneficial or necessary to use environment variables to configure items such as ports and bind addresses on the system on which you run HiveMQ, for example, when you run HiveMQ in a containerized environment.

You can use ${YOUR_ENVVAR_NAME} in the config.xml file. HiveMQ replaces the placeholder with the value of the specified environment variable during startup.

Example to set an environment variable
export MY_SNOWFLAKE_PRIVATE_KEY_PASSPHRASE="some long sentence"
Example use of the environment variable in the configuration
<hivemq-snowflake-extension xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                            xsi:noNamespaceSchemaLocation="config.xsd">
    <snowflakes>
        <snowflake>
            <id>my-snowflake-id</id>
            <account-url>https://myorg-myaccountname.snowflakecomputing.com</account-url>
            <user>my-user</user>
            <role>my-role</role>
            <private-key>path/to/rsa_key.p8</private-key>
            <private-key-passphrase>${MY_SNOWFLAKE_PRIVATE_KEY_PASSPHRASE}</private-key-passphrase>
        </snowflake>
    </snowflakes>
</hivemq-snowflake-extension>
Result of the example configuration in HiveMQ
<hivemq-snowflake-extension xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                            xsi:noNamespaceSchemaLocation="config.xsd">
    <snowflakes>
        <snowflake>
            <id>my-snowflake-id</id>
            <account-url>https://myorg-myaccountname.snowflakecomputing.com</account-url>
            <user>my-user</user>
            <role>my-role</role>
            <private-key>path/to/rsa_key.p8</private-key>
            <private-key-passphrase>some long sentence</private-key-passphrase>
        </snowflake>
    </snowflakes>
</hivemq-snowflake-extension>
Make sure that HiveMQ is started in the same context in which your environment variables are set. Otherwise, HiveMQ cannot access them.
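
Placeholders are not limited to the passphrase. As a sketch, and assuming the ${...} substitution applies to any element value in config.xml as the description above suggests, the same connection could take its account URL, user, and key path from the environment as well. The variable names MY_SNOWFLAKE_ACCOUNT_URL, MY_SNOWFLAKE_USER, and MY_SNOWFLAKE_PRIVATE_KEY_PATH are hypothetical.

```xml
<snowflakes>
    <snowflake>
        <id>my-snowflake-id</id>
        <!-- Hypothetical variable names; each one must be set before HiveMQ starts -->
        <account-url>${MY_SNOWFLAKE_ACCOUNT_URL}</account-url>
        <user>${MY_SNOWFLAKE_USER}</user>
        <role>my-role</role>
        <private-key>${MY_SNOWFLAKE_PRIVATE_KEY_PATH}</private-key>
        <private-key-passphrase>${MY_SNOWFLAKE_PRIVATE_KEY_PASSPHRASE}</private-key-passphrase>
    </snowflake>
</snowflakes>
```

This keeps one identical config.xml on every cluster node while environment-specific values stay outside the file.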