Data Hub On Edge

Data Hub is available on HiveMQ Edge and offers the same functionality as for the HiveMQ Enterprise broker. For detailed information, see our Data Hub Quick Start Guide.

Data Hub Configuration on HiveMQ Edge

Data Hub on HiveMQ Edge can be configured with the following settings:

Example Data Hub on HiveMQ Edge configuration
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<hivemq>
    <modules>
       <data-hub>
           <data-validation>
               <enabled>true</enabled>
           </data-validation>
           <behavior-validation>
                <enabled>true</enabled>
           </behavior-validation>
           <scripting>
                <enabled>true</enabled>
           </scripting>
       </data-hub>
    </modules>
</hivemq>

The example configuration enables all three main components in Data Hub by setting their respective flag to true.

Data Hub Preset Loading

When HiveMQ Edge is installed with Helm, Data Hub loads its model from a JSON file called preset.

The preset file contains a single JSON object with four properties:

  • "scripts": array of instances of type Script

  • "schemas": array of instances of type PolicySchema

  • "behaviorPolicies": array of instances of type BehaviorPolicy

  • "dataPolicies": array of instances of type DataPolicy

The schemas, Script, PolicySchema, BehaviorPolicy and DataPolicy are defined in the version-controlled HiveMQ Edge Open API document.

Modifications to the Data Hub model must be entered directly in the preset file.
Example Data Hub Preset on HiveMQ Edge
{
  "scripts" : [
    {
      "createdAt" : "2025-02-20T12:51:22.428Z",
      "description" : "This function transforms a publish.",
      "functionType" : "TRANSFORMATION",
      "id" : "my-transform.js",
      "source" : "ZnVuY3Rpb24gdHJhbnNmb3JtKHB1Ymxpc2gsIGNvbnRleHQpIHsgcmV0dXJuIHB1Ymxpc2g7IH0=",
      "version" : 1
    }
  ],
  "schemas" : [
    {
      "arguments" : {},
      "createdAt" : "2025-02-20T14:07:01.604Z",
      "id" : "schema",
      "schemaDefinition" : "eyAgIiRpZCI6ICJodHRwczovL2V4YW1wbGUuY29tL3BlcnNvbi5zY2hlbWEuanNvbiIsICAidHlwZSI6ICJvYmplY3QiLCAgInByb3BlcnRpZXMiOiB7ICAgICJzdHJlZXRfYWRkcmVzcyI6IHsgInR5cGUiOiAic3RyaW5nIiB9LCAgICAiY2l0eSI6IHsgInR5cGUiOiAic3RyaW5nIiB9LCAgICAic3RhdGUiOiB7ICJ0eXBlIjogInN0cmluZyIgfSAgfSwgICJyZXF1aXJlZCI6IFsic3RyZWV0X2FkZHJlc3MiLCAiY2l0eSIsICJzdGF0ZSJdfQ==",
      "type" : "JSON",
      "version" : 1
    }
  ],
  "behaviorPolicies" : [
    {
      "behavior" : {
        "id" : "Publish.quota",
        "arguments" : {
          "minPublishes" : 1,
          "maxPublishes" : 10
        }
      },
      "id" : "wildcardLogBehaviorPolicy",
      "matching" : {
        "clientIdRegex" : ".*"
      },
      "createdAt" : "2025-02-20T15:24:02.403Z",
      "deserialization" : {
        "publish" : {
          "schema" : {
            "schemaId" : "schema",
            "version" : "latest"
          }
        },
        "will" : {
          "schema" : {
            "schemaId" : "schema",
            "version" : "latest"
          }
        }
      },
      "lastUpdatedAt" : "2025-02-20T15:24:02.403Z",
      "onTransitions" : [
        {
          "fromState" : "Initial",
          "toState" : "Connected",
          "Mqtt.OnInboundConnect" : {
            "pipeline" : [
              {
                "arguments" : {
                  "message" : "Behavior policy ${policyId}: ${fromState} to ${toState} on ${triggerEvent}",
                  "level" : "INFO"
                },
                "functionId" : "System.log",
                "id" : "logFunction"
              }
            ]
          }
        },
        {
          "fromState" : "Any.*",
          "toState" : "Publishing",
          "Mqtt.OnInboundPublish" : {
            "pipeline" : [
              {
                "arguments" : {
                  "message" : "Behavior policy ${policyId}: any to publishing on ${triggerEvent}",
                  "level" : "INFO"
                },
                "functionId" : "System.log",
                "id" : "logFunction"
              }
            ]
          }
        },
        {
          "fromState" : "Any.*",
          "toState" : "Violated",
          "Connection.OnDisconnect" : {
            "pipeline" : [
              {
                "arguments" : {
                  "message" : "Behavior policy ${policyId}: any to violated on ${triggerEvent}",
                  "level" : "WARN"
                },
                "functionId" : "System.log",
                "id" : "logFunction"
              }
            ]
          },
          "Event.OnAny" : {
            "pipeline" : [
              {
                "arguments" : {
                  "message" : "Behavior policy ${policyId}: any to violated on any",
                  "level" : "WARN"
                },
                "functionId" : "System.log",
                "id" : "logFunction"
              }
            ]
          },
          "Mqtt.OnInboundPublish" : {
            "pipeline" : [
              {
                "arguments" : {
                  "message" : "Behavior policy ${policyId}: any to violated on ${triggerEvent}",
                  "level" : "WARN"
                },
                "functionId" : "System.log",
                "id" : "logFunction"
              }
            ]
          }
        }
      ]
    }
  ],
  "dataPolicies" : [
    {
      "createdAt" : "2025-02-20T12:51:22.428Z",
      "id" : "my-policy",
      "lastUpdatedAt" : "2025-02-20T12:51:22.428Z",
      "matching" : {
        "topicFilter" : "#"
      },
      "onFailure" : {
        "pipeline" : [
          {
            "arguments" : {
              "level" : "WARN",
              "message" : "${clientId} sent an invalid publish on topic '${topic}' with result '${validationResult}'."
            },
            "functionId" : "System.log",
            "id" : "logFailure"
          }
        ]
      },
      "onSuccess" : {
        "pipeline" : [
          {
            "arguments" : {
              "level" : "INFO",
              "message" : "${clientId} sent a valid publish on topic '${topic}' with result '${validationResult}'."
            },
            "functionId" : "System.log",
            "id" : "logSuccess"
          }
        ]
      },
      "validation" : {
        "validators" : [
          {
            "arguments" : {
              "schemas" : [
                {
                  "schemaId" : "schema",
                  "version" : "latest"
                }
              ],
              "strategy" : "ANY_OF"
            },
            "type" : "SCHEMA"
          }
        ]
      }
    }
  ]
}

The following JSON Schema is used to validate the preset file:

Data Hub Preset schema on HiveMQ Edge
{
  "$schema" : "https://json-schema.org/draft/2020-12/schema",
  "$id" : "https://hivemq.com/schemas/datahub_preset_schema.json",
  "type" : "object",
  "title" : "Preset",
  "description" : "A top level container for schemas, scripts, and behavior/data policies",
  "properties" : {
    "scripts" : {
      "type" : "array",
      "items" : {
        "$ref" : "#/$defs/Script"
      }
    },
    "schemas" : {
      "type" : "array",
      "items" : {
        "$ref" : "#/$defs/PolicySchema"
      }
    },
    "behaviorPolicies" : {
      "type" : "array",
      "items" : {
        "$ref" : "#/$defs/BehaviorPolicy"
      }
    },
    "dataPolicies" : {
      "type" : "array",
      "items" : {
        "$ref" : "#/$defs/DataPolicy"
      }
    }
  },
  "required" : [
    "scripts",
    "schemas",
    "behaviorPolicies",
    "dataPolicies"
  ],
  "$defs" : {
    "BehaviorPolicy" : {
      "type" : "object",
      "description" : "A policy which is used to validate and execute certain actions based on the validation result.",
      "properties" : {
        "behavior" : {
          "type" : "object",
          "description" : "The behavior referenced by the policy, that is validated by the policy.",
          "properties" : {
            "arguments" : {
              "type" : "object",
              "description" : "The arguments that the referenced validator type requires."
            },
            "id" : {
              "type" : "string",
              "description" : "The unique identifier of a pre-defined behavior."
            }
          },
          "required" : [
            "id"
          ]
        },
        "createdAt" : {
          "type" : "string",
          "format" : "date-time",
          "description" : "The UTC formatted timestamp when the policy was created."
        },
        "deserialization" : {
          "type" : "object",
          "description" : "The deserializers used by the policy for particular message and/or payload types.",
          "properties" : {
            "publish" : {
              "$ref" : "#/$defs/BehaviorPolicyDeserializer"
            },
            "will" : {
              "$ref" : "#/$defs/BehaviorPolicyDeserializer"
            }
          }
        },
        "id" : {
          "type" : "string",
          "description" : "The unique identifier of the policy."
        },
        "lastUpdatedAt" : {
          "type" : "string",
          "format" : "date-time",
          "description" : "The UTC formatted timestamp when the policy was most recently updated."
        },
        "matching" : {
          "type" : "object",
          "description" : "The matching rules the policy applies.",
          "properties" : {
            "clientIdRegex" : {
              "type" : "string",
              "description" : "The regex pattern to match the client id against."
            }
          },
          "required" : [
            "clientIdRegex"
          ]
        },
        "onTransitions" : {
          "type" : "array",
          "items" : {
            "type" : "object",
            "description" : "The actions that are executed for the specified transition.",
            "properties" : {
              "fromState" : {
                "type" : "string",
                "description" : "The exact state from which the transition happened."
              },
              "toState" : {
                "type" : "string",
                "description" : "The exact state to which the transition happened."
              },
              "Connection.OnDisconnect" : {
                "$ref" : "#/$defs/BehaviorPolicyOnEvent"
              },
              "Event.OnAny" : {
                "$ref" : "#/$defs/BehaviorPolicyOnEvent"
              },
              "Mqtt.OnInboundConnect" : {
                "$ref" : "#/$defs/BehaviorPolicyOnEvent"
              },
              "Mqtt.OnInboundDisconnect" : {
                "$ref" : "#/$defs/BehaviorPolicyOnEvent"
              },
              "Mqtt.OnInboundPublish" : {
                "$ref" : "#/$defs/BehaviorPolicyOnEvent"
              },
              "Mqtt.OnInboundSubscribe" : {
                "$ref" : "#/$defs/BehaviorPolicyOnEvent"
              }
            },
            "required" : [
              "fromState",
              "toState"
            ]
          }
        }
      },
      "required" : [
        "behavior",
        "id",
        "matching"
      ]
    },
    "BehaviorPolicyDeserializer" : {
      "type" : "object",
      "description" : "The deserializer applied to a particular message or payload type.",
      "properties" : {
        "schema" : {
          "type" : "object",
          "description" : "A schema reference is a unique identifier for a schema.",
          "properties" : {
            "schemaId" : {
              "type" : "string",
              "description" : "The identifier of the schema."
            },
            "version" : {
              "type" : "string",
              "description" : "The version of the schema. The value 'latest' may be used to always refer to the latest schema."
            }
          },
          "required" : [
            "schemaId",
            "version"
          ]
        }
      },
      "required" : [
        "schema"
      ]
    },
    "BehaviorPolicyOnEvent" : {
      "type" : "object",
      "description" : "One or more operations that are triggered when the event occurs. If this field is empty, the transition does not trigger any operations.",
      "properties" : {
        "pipeline" : {
          "type" : "array",
          "items" : {
            "$ref" : "#/$defs/PolicyOperation"
          }
        }
      }
    },
    "DataPolicy" : {
      "type" : "object",
      "description" : "A data policy which is used to validate and execute certain actions based on the validation result.",
      "properties" : {
        "id" : {
          "type" : "string",
          "description" : "The unique identifier of the policy."
        },
        "matching" : {
          "type" : "object",
          "description" : "The matching rules the policy applies.",
          "properties" : {
            "topicFilter" : {
              "type" : "string",
              "description" : "The topic filter for which the policy is matched."
            }
          },
          "required" : [
            "topicFilter"
          ]
        },
        "onFailure" : {
          "$ref" : "#/$defs/DataPolicyAction"
        },
        "onSuccess" : {
          "$ref" : "#/$defs/DataPolicyAction"
        },
        "validation" : {
          "type" : "object",
          "description" : "The section of the policy that defines how incoming MQTT messages are validated. If this section is empty, the result of the policy validation is always successful.",
          "properties" : {
            "validators" : {
              "type" : "array",
              "description" : "The validators of the policy.",
              "items" : {
                "type" : "object",
                "description" : "A policy validator which executes the defined validation.",
                "properties" : {
                  "arguments" : {
                    "type" : "object",
                    "description" : "The required arguments of the referenced validator type."
                  },
                  "type" : {
                    "type" : "string",
                    "description" : "The type of the validator.",
                    "enum" : [
                      "SCHEMA"
                    ]
                  }
                },
                "required" : [
                  "arguments",
                  "type"
                ]
              }
            }
          }
        }
      },
      "required" : [
        "id",
        "matching"
      ]
    },
    "DataPolicyAction" : {
      "type" : "object",
      "description" : "One or more operations the outcome of the validation triggers.  When this field is empty, the outcome of the policy validation does not trigger any operations.",
      "properties" : {
        "pipeline" : {
          "type" : "array",
          "description" : "The pipeline to execute, when this action is triggered. The operations in the pipeline are executed in-order.",
          "items" : {
            "$ref" : "#/$defs/PolicyOperation"
          }
        }
      }
    },
    "PolicySchema" : {
      "type" : "object",
      "properties" : {
        "id" : {
          "type" : "string",
          "description" : "The unique identifier of the schema."
        },
        "schemaDefinition" : {
          "type" : "string",
          "description" : "The base64 encoded schema definition."
        },
        "arguments" : {
          "type" : "object",
          "description" : "The schema type dependent arguments.",
          "properties" : {
            "additionalProperties" : {
              "type" : "string",
              "description" : "The schema type dependent arguments."
            }
          }
        },
        "type" : {
          "type" : "string",
          "description" : "The type of the schema."
        },
        "version" : {
          "type" : "integer",
          "format": "int32",
          "description" : "The version of the schema."
        }
      },
      "required" : [
        "id",
        "schemaDefinition",
        "type"
      ]
    },
    "PolicyOperation" : {
      "type" : "object",
      "description" : "The pipeline to execute when this action is triggered. The operations in the pipeline are executed in order.",
      "properties" : {
        "arguments" : {
          "type" : "object",
          "description" : "The required arguments of the referenced function."
        },
        "functionId" : {
          "type" : "string",
          "description" : "The unique ID of the referenced function to execute in this operation."
        },
        "id" : {
          "type" : "string",
          "description" : "The unique ID of the operation in the pipeline."
        }
      },
      "required" : [
        "arguments",
        "functionId",
        "id"
      ]
    },
    "Script" : {
      "type" : "object",
      "properties" : {
        "functionType" : {
          "type" : "string",
          "description" : "The type of the function.",
          "enum" : [
            "TRANSFORMATION"
          ]
        },
        "id" : {
          "type" : "string",
          "description" : "The unique identifier of the script."
        },
        "source" : {
          "type" : "string",
          "description" : "The base64 encoded function source code."
        },
        "version" : {
          "type" : "integer",
          "format": "int32",
          "description" : "The version of the script."
        }
      },
      "required" : [
        "functionType",
        "id",
        "source"
      ]
    }
  }
}

Data Hub Preset Reloading

The preset file that defines the initial Data Hub model is checked for modifications every 5 seconds. If changes are detected, HiveMQ Edge attempts to apply them dynamically, by means of calls to the HiveMQ Edge Open API in the following order:

  • Schemas

  • Scripts

  • Data Policies

  • Behavior Policies

The order of elements is important to ensure that dependencies are properly resolved during the update process. For example, scripts or schemas must be loaded before any policies that reference them, to avoid errors and ensure correct behavior.

The extent to which you can modify a running Data Hub model varies based on the model type.

Scripts and Schemas

Scripts (Script) and schemas (PolicySchema) are identified with a unique ID and version number. Version numbers start at 1 and must increment sequentially with no gaps (1, 2, 3, …​). The first script or schema in a preset file must be explicitly declared as version 1, and a version 3 cannot exist without a preceding version 2.

Limitations: After HiveMQ Edge starts and loads the initial preset file, existing scripts and schemas cannot be modified or removed, all of their attributes are exclusively read-only. This means that if version v of a script or schema was present in the preset file when HiveMQ Edge started, none of its attributes can be modified. To ensure data consistency and proper initialization, modifications or deletions require a restart of HiveMQ Edge. To update a script or schema without a restart, you can create a new version (+1). All versions of a script or schema must be presented in ascending order in the preset file.

Allowed Operations: You can add new scripts or new versions of existing scripts, and add new schemas or new versions of existing schemas to the preset file. The newly added elements can then be referenced in subsequently defined or modified data and behavior policies.

Behavior policies and Data policies

Behavior policies (BehaviorPolicy) and data policies (DataPolicy) are uniquely identified by their ID. They are not versioned.

Limitations: You cannot remove an existing behavior or data policy while HiveMQ Edge is running. References to scripts and schemas must use the exact ID and version number. The keyword latest is not supported.

Allowed Operations: You can modify existing behavior and data policies and add new behavior and data policies to the preset file. These changes will be applied dynamically without requiring a restart. When changes are detected, the affected policy is deleted and created anew with the latest definition found in the preset file.

Editing the preset file at runtime offers flexibility, but it is important to understand its limitations. Modifying or removing existing scripts and schemas requires a restart, whereas adding new scripts/schemas or scripts/schemas versions and modifying or adding behavior or data policies can be done dynamically. To avoid unexpected behavior, always ensure that your preset file changes maintain a valid Data Hub model. For more information, see the Script, PolicySchema, BehaviorPolicy, and DataPolicy definitions in the HiveMQ Edge Open API documentation.

Data Hub in the HiveMQ Edge REST API

The Data Hub portions of the HiveMQ REST API and the HiveMQ Edge REST API are identical. All available API interactions work the same way.

However, access to the HiveMQ Edge REST API requires JWT authorization. For more information on JWTs in HiveMQ Edge, see HiveMQ Edge API . An example to obtain a token is shown using cURL.

User Interface

Data Hub on Edge provides an easy-to-use way to manage policies, schemas and scripts.

List Data Hub Resources

In the screenshot below all available resources like policies, schemas and scripts are listed with additional information such. On the left-hand side, further actions per resource can be executed, such as downloading, or deleting the resource.

Downloading in particular is very helpful when transferring the resource from a testing environment to a production environment. However, all these actions can be executed via REST API as well.

List Data Hub Resources

Data Hub Policy Designer

Data Hub on Edge provides a Policy Designer, a visual tool to build resources. The Policy Designer is built to offer a completely revamped user experience. With the help of the designer, users can effortlessly build policies, schemas, and scripts all from a single view. Users can easily open the designer via the menu, as depicted in the screenshot below.

Policy Designer

JavaScript Editing

The Policy Designer provides an easy-to-use JavaScript editor with syntax highlight shown in the screenshot below directly editable via a browser.

Policy Designer JavaScript Editor

Schema Editing

Schemas can be easily created using the editor for JSON and Protobuf. Protobuf schemas can be directly edited without any compilation locally on your compute. The Policy Designer takes care of the compilation.

Policy Esigner Schema Editor

Keyboard Shortcuts

For easier and more efficient interaction, the following keyboard shortcuts are supported:

Keyboard Shortcuts