HiveMQ Data Hub Quick Start Guide

The HiveMQ Data Hub provides mechanisms to define how MQTT data is handled in your HiveMQ broker and to model how MQTT clients work with the broker.

This quick start guide shows you how to get started with HiveMQ Data Hub in two different ways:

For brevity, the quick start guides focus on the creation of JSON data schemas and data policies.

Requirements

Installation

  1. Download and install HiveMQ.

  2. Place your valid HiveMQ Data Hub license file (.plic) in the license folder of your HiveMQ installation.
    (Skip this step if you are using the Data Hub in Free Mode or Trial Mode).

Your HiveMQ Platform bundle includes a free version of the HiveMQ Data Hub with basic functionality.
Additionally, you can start a five-hour trial mode of the Data Hub to try out the full range of Data Hub capabilities.
To learn more about the different ways you can explore the HiveMQ Data Hub, see HiveMQ Data Hub Licensing.

Configuration

The HiveMQ Data Hub is enabled by default (<enabled>true</enabled>).

Example default Data Hub configuration in the HiveMQ config.xml file
<hivemq>
    <data-hub>
        <data-validation>
            <enabled>true</enabled>
        </data-validation>
        <behavior-validation>
            <enabled>true</enabled>
        </behavior-validation>
    </data-hub>
</hivemq>
To disable Data Hub, set the enabled tags for data validation and behavior validation in the data-hub section of your HiveMQ config.xml file to false.

Data Hub Quick Start with the HiveMQ Control Center

The easiest way to get started with the HiveMQ Data Hub is with your HiveMQ Control Center.

This guide gives you step-by-step instructions to do the following with the HiveMQ Control Center:

  • Install and configure the HiveMQ Data Hub.

  • Add a schema to define the expected structure and format of incoming MQTT message payload data.

  • Add a data policy to tell your HiveMQ broker how you want your incoming MQTT messages to be handled.

  • Test your setup with valid and invalid incoming MQTT message payload data.

Get Started on the HiveMQ Control Center

  • Use your browser to navigate to the HiveMQ Control Center.
    For more information, see HiveMQ Control Center.

Add a Schema

  • In the HiveMQ Control Center main navigation, expand the Data Hub menu and select Schemas.
    The Schemas overview for your HiveMQ deployment opens:

    Schema Overview
  • On the Schemas overview, select Add New Schema. A Schema Detail page opens:

    Schema Detail
  • When you create a schema, you need to specify a unique schema ID, the type of schema you want to create, and a description of the schema:

    Data Schema creation
  • Enter an identifier for your schema in the Schema ID field. The schema ID is used to reference the schema in a policy and must be unique in the HiveMQ broker.
    Each schema ID must begin with a letter and contain 1 to 1024 lowercase letters, uppercase letters, numbers, periods (.), hyphens (-), or underscores (_). In this guide, we use the schema ID gps_coordinates.

  • Select JSON in the Schema type field.
    Possible values are JSON or PROTOBUF.
    In this guide, we create a JSON Schema.

  • Copy and paste the following example schema definition into the Schema Definition field. The schema definition describes the JSON Schema and is used to validate and deserialize MQTT message payloads.

    Example schema definition
    {
      "$id": "https://example.com/geographical-location.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "Longitude and Latitude Values",
      "description": "A geographical coordinate.",
      "required": [
        "latitude",
        "longitude"
      ],
      "type": "object",
      "properties": {
        "latitude": {
          "type": "number",
          "minimum": -90,
          "maximum": 90
        },
        "longitude": {
          "type": "number",
          "minimum": -180,
          "maximum": 180
        }
      }
    }
  • To save the gps_coordinates JSON Schema, select Save Schema.

Add a Data Policy

Next, create a data policy that outlines how you want the HiveMQ broker to handle incoming MQTT message packets.

  • In the HiveMQ Control Center main navigation, expand the Data Hub menu and select Data Policies.
    The Data Policies overview for your HiveMQ deployment opens:

    Data Policy Overview
  • On the Data Policies overview, select Add New Policy. A Data Policy Detail page opens:

    Data Policy Detail
  • When you create a data policy, the first step is to define a unique policy ID, specify a matching topic filter that determines which MQTT messages the policy validates, and add the validation method you want the policy to use:

    Data Policy Detail - Validation
  • Enter an identifier for your policy in the Policy ID field. The Schema ID is used to reference the schema in a policy and must be unique in the HiveMQ broker.
    Each schema ID must begin with a letter and contain 1 to 1024 lowercase letters, uppercase letters, numbers, periods (.), hyphens (-), or underscores (_). In this guide, we use the schema ID policy-coordinates.

  • Specify an MQTT topic filter in the Matching Topic Filter field to determine which MQTT message payloads the policy validates. The filter can be any valid MQTT topic filter, including wildcards. Each topic filter can only be used in one policy per HiveMQ cluster.
    In this guide, we use the Matching Topic Filter coordinates/+.

  • To select how you want the data policy to validate MQTT topic payloads, select Add Schema Validator on the Validation tab of the Data Policy Detail. If no validator is specified, no validation takes place and every message payload is evaluated as a success. One policy can contain multiple validators.

    • In this guide, on the Validations tab, select Add Schema in the Schemas area.

    • To select the schema that we created earlier in this guide, select gps_coordinates from the Schema ID menu.
      Data Hub also supports the use of schema versions. For this example, select latest from the Version menu.

    • It is possible to set multiple schemas for a single validator. The Validation Strategy determines how successful validation of a message is calculated. In this guide, we only set one schema and use the All Of strategy.

  • The result of your schema-based validation can be success or failure.
    Based on the validation outcome, you can define optional onFailure and onSuccess actions that take place when a validation finishes.

    Data Policy Action On Failure
  • On the Action On Failure tab of the Data Policy Detail, select Add Task to define one or more tasks to take place if the outcome of a validation in this policy is failure. Data Hub executes the tasks of an action pipeline in your defined order.

  • From the Function menu, select an operation that occurs when a validation fails.
    In this guide, we use System - Log Message to provide additional information.

    • Use the Level menu to set the log level of the system message.
      Possible log levels are TRACE, DEBUG, INFO, WARN, and ERROR. In this guide, we set the WARN level.

    • In the Message field, cut and paste the following variable to define the content of the log message:

      ${clientId} sent an invalid publish on topic '${topic}' with result '${validationResult}
      Data Policy Action On Succes
  • On the Action On Success tab of the Data Policy Detail, select Add Task to define one or more tasks to take place if the outcome of a validation in this policy is success. Data Hub executes the tasks of an action pipeline in your defined order.

  • From the Function menu, select an operation that occurs when a validation succeeds.
    In this guide, we use System - Log Message to provide additional information.

    • Use the Level menu to set the log level of the system message. In this guide, we set the INFO level.

    • In the Message field, cut and paste the following variable to define the content of the log message:

      ${clientId} sent a valid publish on topic '${topic}'
  • To complete your Data Policy, switch to the Summary tab of the Data Policy Detail and review the content of your policy draft:

    Data Policy Summary
  • To save your data policy, select Save Policy.

Edit or Delete a Data Policy

  • To edit or delete a Data Policy, return to the Policy Detail of the policy. To edit the policy, select the edit icon.
    To delete the policy, select Delete.

For more information on all the Data Hub options the HiveMQ Control Center offers, see HiveMQ Control Center.

Test Your Data Policy

The Data Hub dashboard on the HiveMQ Control Center gives you a clear visual overview of the health of your data pipelines.

Data Hub Dashboard
  • To test your Data Hub setup, follow the procedure in the Test section to send messages that generate success and failure outcomes.

Data Hub Quick Start with the HiveMQ REST API

You can also use the HiveMQ REST API to create and manage the HiveMQ Data Hub.

Before you get started, make sure that the HiveMQ REST API is enabled in the config.xml file of your HiveMQ installation.

To enable the HiveMQ REST API, set the <enabled> option in the <rest-api> section of the HiveMQ configuration to true.

Example to enable the HiveMQ Data Governance Hub and HiveMQ REST API
<hivemq>
    ...
    <!-- Enable the REST-API -->
    <rest-api>
        <enabled>true</enabled>
    </rest-api>
    ...
</hivemq>

This guide gives you step-by-step instructions to do the following with the HiveMQ REST API:

  • Install and configure the HiveMQ Data Hub.

  • Add a schema to define the expected structure and format of incoming MQTT message payload data.

  • Add a data policy to tell your HiveMQ broker how you want your incoming MQTT messages to be handled.

  • Test your setup with valid and invalid incoming MQTT message payload data.

For more information on all the Data Hub options the HiveMQ REST API offers, see HiveMQ Data Management - Open API Specification.

Add Schemas

The HiveMQ Data Hub supports JSON Schemas and Protobuf.

Add a JSON Schema

  1. Create a JSON Schema called coordinates.json:

    coordinates.json
    {
      "$id": "https://example.com/geographical-location.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "Longitude and Latitude Values",
      "description": "A geographical coordinate.",
      "required": [
        "latitude",
        "longitude"
      ],
      "type": "object",
      "properties": {
        "latitude": {
          "type": "number",
          "minimum": -90,
          "maximum": 90
        },
        "longitude": {
          "type": "number",
          "minimum": -180,
          "maximum": 180
        }
      }
    }
  2. Encode the JSON Schema file in base64 (encoding is required):

    cat coordinates.json | base64 > coordinates_base64.txt
    Resulting coordinates_base64.txt
    ewogICIkaWQiOiAiaHR0cHM6Ly9leGFtcGxlLmNvbS9nZW9ncmFwaGljYWwtbG9jYXRpb24uc2NoZW1hLmpzb24iLAogICIkc2NoZW1hIjogImh0dHBzOi8vanNvbi1zY2hlbWEub3JnL2RyYWZ0LzIwMjAtMTIvc2NoZW1hIiwKICAidGl0bGUiOiAiTG9uZ2l0dWRlIGFuZCBMYXRpdHVkZSBWYWx1ZXMiLAogICJkZXNjcmlwdGlvbiI6ICJBIGdlb2dyYXBoaWNhbCBjb29yZGluYXRlLiIsCiAgInJlcXVpcmVkIjogWwogICAgImxhdGl0dWRlIiwKICAgICJsb25naXR1ZGUiCiAgXSwKICAidHlwZSI6ICJvYmplY3QiLAogICJwcm9wZXJ0aWVzIjogewogICAgImxhdGl0dWRlIjogewogICAgICAidHlwZSI6ICJudW1iZXIiLAogICAgICAibWluaW11bSI6IC05MCwKICAgICAgIm1heGltdW0iOiA5MAogICAgfSwKICAgICJsb25naXR1ZGUiOiB7CiAgICAgICJ0eXBlIjogIm51bWJlciIsCiAgICAgICJtaW5pbXVtIjogLTE4MCwKICAgICAgIm1heGltdW0iOiAxODAKICAgIH0KICB9Cn0K
  3. Define the body of the create schema POST request that you need to upload the schema to your HiveMQ broker in a file called coordinates-body.json:

    Body for the create schema POST request
    {
      "id": "gps_coordinates",
      "type": "JSON",
      "schemaDefinition": "ewogICIkaWQiOiAiaHR0cHM6Ly9leGFtcGxlLmNvbS9nZW9ncmFwaGljYWwtbG9jYXRpb24uc2NoZW1hLmpzb24iLAogICIkc2NoZW1hIjogImh0dHBzOi8vanNvbi1zY2hlbWEub3JnL2RyYWZ0LzIwMjAtMTIvc2NoZW1hIiwKICAidGl0bGUiOiAiTG9uZ2l0dWRlIGFuZCBMYXRpdHVkZSBWYWx1ZXMiLAogICJkZXNjcmlwdGlvbiI6ICJBIGdlb2dyYXBoaWNhbCBjb29yZGluYXRlLiIsCiAgInJlcXVpcmVkIjogWwogICAgImxhdGl0dWRlIiwKICAgICJsb25naXR1ZGUiCiAgXSwKICAidHlwZSI6ICJvYmplY3QiLAogICJwcm9wZXJ0aWVzIjogewogICAgImxhdGl0dWRlIjogewogICAgICAidHlwZSI6ICJudW1iZXIiLAogICAgICAibWluaW11bSI6IC05MCwKICAgICAgIm1heGltdW0iOiA5MAogICAgfSwKICAgICJsb25naXR1ZGUiOiB7CiAgICAgICJ0eXBlIjogIm51bWJlciIsCiAgICAgICJtaW5pbXVtIjogLTE4MCwKICAgICAgIm1heGltdW0iOiAxODAKICAgIH0KICB9Cn0K"
    }
    1. The id can be any identifier, you will need this as the schema’s reference in a policy.

    2. The type must be PROTOBUF or JSON.

    3. The schemaDefinition is the content of coordinates_base64.txt.

  4. To upload the coordinates-body.json schema to your broker, run the following command:

    curl -H "Content-Type: application/json" -d "@coordinates-body.json" -X POST http://localhost:8888/api/v1/data-validation/schemas
This example assumes that your HIVEMQ REST API runs at http://localhost:8888.

Add a Protobuf Schema

  1. Create a Protobuf schema called coordinates.proto:

    coordinates.proto
    syntax = "proto3";
    message GpsCoordinates {
      int32 longitude = 1;
      int32 latitude = 2;
    }
  2. Compile coordinates.proto to the descriptor set coordinates.desc:

    protoc --descriptor_set_out=coordinates.desc coordinates.proto
  3. Encode the descriptor file in base64 (encoding is required):

    cat coordinates.desc | base64 > coordinates_base64.txt
    Resulting coordinates_base64.txt
    CmcKEWNvb3JkaW5hdGVzLnByb3RvIkoKDkdwc0Nvb3JkaW5hdGVzEhwKCWxvbmdpdHVkZRgBIAEoBVIJbG9uZ2l0dWRlEhoKCGxhdGl0dWRlGAIgASgFUghsYXRpdHVkZWIGcHJvdG8z
  4. Craft the body of the create schema POST request to upload the schema called coordinates-body.json:

    Body for the post request
    {
      "id": "gps_coordinates",
      "type": "PROTOBUF",
      "schemaDefinition": "CmcKEWNvb3JkaW5hdGVzLnByb3RvIkoKDkdwc0Nvb3JkaW5hdGVzEhwKCWxvbmdpdHVkZRgBIAEoBVIJbG9uZ2l0dWRlEhoKCGxhdGl0dWRlGAIgASgFUghsYXRpdHVkZWIGcHJvdG8z",
      "arguments": {
        "messageType": "GpsCoordinates"
      }
    }
    1. The id can be any identifier, you will need this as the schema’s reference in a policy.

    2. The type must be PROTOBUF or JSON.

    3. The schemaDefinition is the content of coordinates_base64.txt.

    4. The messageType argument must be the name of the protobuf message from Step 1.

  5. Upload it via

    curl  -H "Content-Type: application/json" -d "@coordinates-body.json" -X POST http://localhost:8888/api/v1/data-validation/schemas

Add a Data Policy

The policies you create tell your HiveMQ broker how you want your incoming MQTT messages to be handled.

HiveMQ Data Hub supports two types of policies. In this guide you create a data policy.

  • Data Policies

  • Behavior Policies

When you use the Data Hub in free mode, you can create one data policy or one behavior policy and use basic functionality.
To create additional policies and explore all the capabilities of the HiveMQ Data Hub, activate your 5-hour full trial or contact our sales team for licensing information.
  1. Create a data policy in the file coordinates-policy.json

    {
      "id": "com.hivemq.policy.coordinates",
      "matching": {
        "topicFilter": "coordinates/+"
      },
      "validation": {
        "validators": [
          {
            "type": "schema",
            "arguments": {
              "strategy": "ALL_OF",
              "schemas": [
                {
                  "schemaId": "gps_coordinates",
                  "version": "latest"
                }
              ]
            }
          }
        ]
      },
      "onSuccess": {
        "pipeline": [
          {
            "id": "logSuccess",
            "functionId": "System.log",
            "arguments": {
              "level": "INFO",
              "message": "${clientId} sent a valid publish on topic '${topic}' with result '${validationResult}'"
            }
          }
        ]
      },
      "onFailure": {
        "pipeline": [
          {
            "id": "logFailure",
            "functionId": "System.log",
            "arguments": {
              "level": "WARN",
              "message": "${clientId} sent an invalid publish on topic '${topic}' with result '${validationResult}'"
            }
          }
        ]
      }
    }
  2. Upload the policy to your HiveMQ broker with the following command (you must upload the referenced schema before you upload the policy):

    curl  -H "Content-Type: application/json" -d "@coordinates-policy.json" -X POST http://localhost:8888/api/v1/data-hub/data-validation/policies

    The policy is now applied and all incoming MQTT messages are subject to your specified data validation requirements.

Test Your Data Validation Setup

Send an MQTT message with a valid payload

mqtt pub -h localhost -p 1883 -t coordinates/test -m "{\"longitude\":12, \"latitude\":42 }"
Example log output
10:27:33.449 INFO - hmq_GAht0_1_08087d5b1233e0bae4b6b10bfda40cc7 sent a valid publish on topic 'coordinates/test' with result 'ValidationResults:[{schemaId=gps_coordinates, success=true}]'

Send an MQTT message with an invalid payload

mqtt pub -h localhost -p 1883 -t coordinates/test -m "{\"longitude\":12, \"latitude\":1337 }"
Example log output
10:32:06.237 WARN - hmq_skuhj_1_04c848009887ef38f86c729269faae16 sent an invalid publish on topic 'coordinates/test' with result 'ValidationResults:[{schemaId=gps_coordinates, success=false, errors=[$.latitude: must have a maximum value of 90]}]'

Delete a Policy

To delete the policy you just created, run the following command:

curl -X DELETE http://localhost:8888/api/v1/data-validation/policies/com.hivemq.policy.coordinates

Learn More

Once you set up your first schemas and policies, you are ready to dive deeper into all the capabilities the HiveMQ Data Hub offers. To learn more, browse our full user documentation on Policies, Schemas, and Behavior Models or explore a collection of practical use cases in our HiveMQ Policy Cookbooks.