Data Hub on Edge
Data Hub is available on HiveMQ Edge and offers the same functionality as on the HiveMQ Enterprise broker. For detailed information, see our Data Hub Quick Start Guide.
Data Hub Configuration on HiveMQ Edge
Data Hub on HiveMQ Edge can be configured with the following settings:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<hivemq>
    <modules>
        <data-hub>
            <data-validation>
                <enabled>true</enabled>
            </data-validation>
            <behavior-validation>
                <enabled>true</enabled>
            </behavior-validation>
            <scripting>
                <enabled>true</enabled>
            </scripting>
        </data-hub>
    </modules>
</hivemq>
The example configuration enables all three main Data Hub components (data validation, behavior validation, and scripting) by setting the enabled flag of each component to true.
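As a minimal sketch of how the flags in this configuration could be inspected, the following Python snippet parses the XML and reports which components are enabled. The function name and the choice to treat a missing flag as disabled are assumptions made for illustration; they do not describe how HiveMQ Edge itself reads the file.

```python
import xml.etree.ElementTree as ET

# Element names taken from the example configuration above.
COMPONENTS = ("data-validation", "behavior-validation", "scripting")


def enabled_components(config_xml: str) -> dict:
    """Return a mapping of Data Hub component name to its enabled flag.

    Assumption for this sketch: a missing component or flag counts as disabled.
    """
    root = ET.fromstring(config_xml)
    data_hub = root.find("./modules/data-hub")
    flags = {}
    for name in COMPONENTS:
        value = data_hub.findtext(f"{name}/enabled") if data_hub is not None else None
        flags[name] = (value or "").strip().lower() == "true"
    return flags
```

Calling enabled_components() with the contents of the configuration file returns one boolean per component, which can be handy in deployment checks.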
Data Hub Preset Loading
When HiveMQ Edge is installed with Helm, Data Hub loads its model from a JSON file called preset.
The preset file contains a single JSON object with four properties:
- "scripts": array of instances of type Script
- "schemas": array of instances of type PolicySchema
- "behaviorPolicies": array of instances of type BehaviorPolicy
- "dataPolicies": array of instances of type DataPolicy
The Script, PolicySchema, BehaviorPolicy, and DataPolicy schemas are defined in the version-controlled HiveMQ Edge Open API document.
Modifications to the Data Hub model must be entered directly in the preset file.
{
"scripts" : [
{
"createdAt" : "2025-02-20T12:51:22.428Z",
"description" : "This function transforms a publish.",
"functionType" : "TRANSFORMATION",
"id" : "my-transform.js",
"source" : "ZnVuY3Rpb24gdHJhbnNmb3JtKHB1Ymxpc2gsIGNvbnRleHQpIHsgcmV0dXJuIHB1Ymxpc2g7IH0=",
"version" : 1
}
],
"schemas" : [
{
"arguments" : {},
"createdAt" : "2025-02-20T14:07:01.604Z",
"id" : "schema",
"schemaDefinition" : "eyAgIiRpZCI6ICJodHRwczovL2V4YW1wbGUuY29tL3BlcnNvbi5zY2hlbWEuanNvbiIsICAidHlwZSI6ICJvYmplY3QiLCAgInByb3BlcnRpZXMiOiB7ICAgICJzdHJlZXRfYWRkcmVzcyI6IHsgInR5cGUiOiAic3RyaW5nIiB9LCAgICAiY2l0eSI6IHsgInR5cGUiOiAic3RyaW5nIiB9LCAgICAic3RhdGUiOiB7ICJ0eXBlIjogInN0cmluZyIgfSAgfSwgICJyZXF1aXJlZCI6IFsic3RyZWV0X2FkZHJlc3MiLCAiY2l0eSIsICJzdGF0ZSJdfQ==",
"type" : "JSON",
"version" : 1
}
],
"behaviorPolicies" : [
{
"behavior" : {
"id" : "Publish.quota",
"arguments" : {
"minPublishes" : 1,
"maxPublishes" : 10
}
},
"id" : "wildcardLogBehaviorPolicy",
"matching" : {
"clientIdRegex" : ".*"
},
"createdAt" : "2025-02-20T15:24:02.403Z",
"deserialization" : {
"publish" : {
"schema" : {
"schemaId" : "schema",
"version" : "latest"
}
},
"will" : {
"schema" : {
"schemaId" : "schema",
"version" : "latest"
}
}
},
"lastUpdatedAt" : "2025-02-20T15:24:02.403Z",
"onTransitions" : [
{
"fromState" : "Initial",
"toState" : "Connected",
"Mqtt.OnInboundConnect" : {
"pipeline" : [
{
"arguments" : {
"message" : "Behavior policy ${policyId}: ${fromState} to ${toState} on ${triggerEvent}",
"level" : "INFO"
},
"functionId" : "System.log",
"id" : "logFunction"
}
]
}
},
{
"fromState" : "Any.*",
"toState" : "Publishing",
"Mqtt.OnInboundPublish" : {
"pipeline" : [
{
"arguments" : {
"message" : "Behavior policy ${policyId}: any to publishing on ${triggerEvent}",
"level" : "INFO"
},
"functionId" : "System.log",
"id" : "logFunction"
}
]
}
},
{
"fromState" : "Any.*",
"toState" : "Violated",
"Connection.OnDisconnect" : {
"pipeline" : [
{
"arguments" : {
"message" : "Behavior policy ${policyId}: any to violated on ${triggerEvent}",
"level" : "WARN"
},
"functionId" : "System.log",
"id" : "logFunction"
}
]
},
"Event.OnAny" : {
"pipeline" : [
{
"arguments" : {
"message" : "Behavior policy ${policyId}: any to violated on any",
"level" : "WARN"
},
"functionId" : "System.log",
"id" : "logFunction"
}
]
},
"Mqtt.OnInboundPublish" : {
"pipeline" : [
{
"arguments" : {
"message" : "Behavior policy ${policyId}: any to violated on ${triggerEvent}",
"level" : "WARN"
},
"functionId" : "System.log",
"id" : "logFunction"
}
]
}
}
]
}
],
"dataPolicies" : [
{
"createdAt" : "2025-02-20T12:51:22.428Z",
"id" : "my-policy",
"lastUpdatedAt" : "2025-02-20T12:51:22.428Z",
"matching" : {
"topicFilter" : "#"
},
"onFailure" : {
"pipeline" : [
{
"arguments" : {
"level" : "WARN",
"message" : "${clientId} sent an invalid publish on topic '${topic}' with result '${validationResult}'."
},
"functionId" : "System.log",
"id" : "logFailure"
}
]
},
"onSuccess" : {
"pipeline" : [
{
"arguments" : {
"level" : "INFO",
"message" : "${clientId} sent a valid publish on topic '${topic}' with result '${validationResult}'."
},
"functionId" : "System.log",
"id" : "logSuccess"
}
]
},
"validation" : {
"validators" : [
{
"arguments" : {
"schemas" : [
{
"schemaId" : "schema",
"version" : "latest"
}
],
"strategy" : "ANY_OF"
},
"type" : "SCHEMA"
}
]
}
}
]
}
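The "source" and "schemaDefinition" values in the preset are base64-encoded strings. A minimal Python sketch of producing and inspecting such values is shown here; the helper names are hypothetical and only the standard library is used.

```python
import base64
import json


def encode_b64(text: str) -> str:
    """Base64-encode UTF-8 text for the "source" or "schemaDefinition" property."""
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def decode_b64(encoded: str) -> str:
    """Decode a base64 value from the preset back into readable text."""
    return base64.b64decode(encoded).decode("utf-8")


def make_script_entry(script_id: str, source: str, version: int = 1) -> dict:
    """Build a Script entry (hypothetical helper) with the encoded source."""
    return {
        "functionType": "TRANSFORMATION",
        "id": script_id,
        "source": encode_b64(source),
        "version": version,
    }


if __name__ == "__main__":
    entry = make_script_entry(
        "my-transform.js",
        "function transform(publish, context) { return publish; }",
    )
    print(json.dumps(entry, indent=2))
```

Decoding the "source" value of the example script with decode_b64() yields a pass-through transform function, and decoding "schemaDefinition" yields the JSON Schema text of the example schema.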
The following JSON Schema is used to validate the preset file:
{
"$schema" : "https://json-schema.org/draft/2020-12/schema",
"$id" : "https://hivemq.com/schemas/datahub_preset_schema.json",
"type" : "object",
"title" : "Preset",
"description" : "A top level container for schemas, scripts, and behavior/data policies",
"properties" : {
"scripts" : {
"type" : "array",
"items" : {
"$ref" : "#/$defs/Script"
}
},
"schemas" : {
"type" : "array",
"items" : {
"$ref" : "#/$defs/PolicySchema"
}
},
"behaviorPolicies" : {
"type" : "array",
"items" : {
"$ref" : "#/$defs/BehaviorPolicy"
}
},
"dataPolicies" : {
"type" : "array",
"items" : {
"$ref" : "#/$defs/DataPolicy"
}
}
},
"required" : [
"scripts",
"schemas",
"behaviorPolicies",
"dataPolicies"
],
"$defs" : {
"BehaviorPolicy" : {
"type" : "object",
"description" : "A policy which is used to validate and execute certain actions based on the validation result.",
"properties" : {
"behavior" : {
"type" : "object",
"description" : "The behavior referenced by the policy, that is validated by the policy.",
"properties" : {
"arguments" : {
"type" : "object",
"description" : "The arguments that the referenced validator type requires."
},
"id" : {
"type" : "string",
"description" : "The unique identifier of a pre-defined behavior."
}
},
"required" : [
"id"
]
},
"createdAt" : {
"type" : "string",
"format" : "date-time",
"description" : "The UTC formatted timestamp when the policy was created."
},
"deserialization" : {
"type" : "object",
"description" : "The deserializers used by the policy for particular message and/or payload types.",
"properties" : {
"publish" : {
"$ref" : "#/$defs/BehaviorPolicyDeserializer"
},
"will" : {
"$ref" : "#/$defs/BehaviorPolicyDeserializer"
}
}
},
"id" : {
"type" : "string",
"description" : "The unique identifier of the policy."
},
"lastUpdatedAt" : {
"type" : "string",
"format" : "date-time",
"description" : "The UTC formatted timestamp when the policy was most recently updated."
},
"matching" : {
"type" : "object",
"description" : "The matching rules the policy applies.",
"properties" : {
"clientIdRegex" : {
"type" : "string",
"description" : "The regex pattern to match the client id against."
}
},
"required" : [
"clientIdRegex"
]
},
"onTransitions" : {
"type" : "array",
"items" : {
"type" : "object",
"description" : "The actions that are executed for the specified transition.",
"properties" : {
"fromState" : {
"type" : "string",
"description" : "The exact state from which the transition happened."
},
"toState" : {
"type" : "string",
"description" : "The exact state to which the transition happened."
},
"Connection.OnDisconnect" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
},
"Event.OnAny" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
},
"Mqtt.OnInboundConnect" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
},
"Mqtt.OnInboundDisconnect" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
},
"Mqtt.OnInboundPublish" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
},
"Mqtt.OnInboundSubscribe" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
}
},
"required" : [
"fromState",
"toState"
]
}
}
},
"required" : [
"behavior",
"id",
"matching"
]
},
"BehaviorPolicyDeserializer" : {
"type" : "object",
"description" : "The deserializer applied to a particular message or payload type.",
"properties" : {
"schema" : {
"type" : "object",
"description" : "A schema reference is a unique identifier for a schema.",
"properties" : {
"schemaId" : {
"type" : "string",
"description" : "The identifier of the schema."
},
"version" : {
"type" : "string",
"description" : "The version of the schema. The value 'latest' may be used to always refer to the latest schema."
}
},
"required" : [
"schemaId",
"version"
]
}
},
"required" : [
"schema"
]
},
"BehaviorPolicyOnEvent" : {
"type" : "object",
"description" : "One or more operations that are triggered when the event occurs. If this field is empty, the transition does not trigger any operations.",
"properties" : {
"pipeline" : {
"type" : "array",
"items" : {
"$ref" : "#/$defs/PolicyOperation"
}
}
}
},
"DataPolicy" : {
"type" : "object",
"description" : "A data policy which is used to validate and execute certain actions based on the validation result.",
"properties" : {
"id" : {
"type" : "string",
"description" : "The unique identifier of the policy."
},
"matching" : {
"type" : "object",
"description" : "The matching rules the policy applies.",
"properties" : {
"topicFilter" : {
"type" : "string",
"description" : "The topic filter for which the policy is matched."
}
},
"required" : [
"topicFilter"
]
},
"onFailure" : {
"$ref" : "#/$defs/DataPolicyAction"
},
"onSuccess" : {
"$ref" : "#/$defs/DataPolicyAction"
},
"validation" : {
"type" : "object",
"description" : "The section of the policy that defines how incoming MQTT messages are validated. If this section is empty, the result of the policy validation is always successful.",
"properties" : {
"validators" : {
"type" : "array",
"description" : "The validators of the policy.",
"items" : {
"type" : "object",
"description" : "A policy validator which executes the defined validation.",
"properties" : {
"arguments" : {
"type" : "object",
"description" : "The required arguments of the referenced validator type."
},
"type" : {
"type" : "string",
"description" : "The type of the validator.",
"enum" : [
"SCHEMA"
]
}
},
"required" : [
"arguments",
"type"
]
}
}
}
}
},
"required" : [
"id",
"matching"
]
},
"DataPolicyAction" : {
"type" : "object",
"description" : "One or more operations the outcome of the validation triggers. When this field is empty, the outcome of the policy validation does not trigger any operations.",
"properties" : {
"pipeline" : {
"type" : "array",
"description" : "The pipeline to execute, when this action is triggered. The operations in the pipeline are executed in-order.",
"items" : {
"$ref" : "#/$defs/PolicyOperation"
}
}
}
},
"PolicySchema" : {
"type" : "object",
"properties" : {
"id" : {
"type" : "string",
"description" : "The unique identifier of the schema."
},
"schemaDefinition" : {
"type" : "string",
"description" : "The base64 encoded schema definition."
},
"arguments" : {
"type" : "object",
"description" : "The schema type dependent arguments.",
"properties" : {
"additionalProperties" : {
"type" : "string",
"description" : "The schema type dependent arguments."
}
}
},
"type" : {
"type" : "string",
"description" : "The type of the schema."
},
"version" : {
"type" : "integer",
"format": "int32",
"description" : "The version of the schema."
}
},
"required" : [
"id",
"schemaDefinition",
"type"
]
},
"PolicyOperation" : {
"type" : "object",
"description" : "The pipeline to execute when this action is triggered. The operations in the pipeline are executed in order.",
"properties" : {
"arguments" : {
"type" : "object",
"description" : "The required arguments of the referenced function."
},
"functionId" : {
"type" : "string",
"description" : "The unique ID of the referenced function to execute in this operation."
},
"id" : {
"type" : "string",
"description" : "The unique ID of the operation in the pipeline."
}
},
"required" : [
"arguments",
"functionId",
"id"
]
},
"Script" : {
"type" : "object",
"properties" : {
"functionType" : {
"type" : "string",
"description" : "The type of the function.",
"enum" : [
"TRANSFORMATION"
]
},
"id" : {
"type" : "string",
"description" : "The unique identifier of the script."
},
"source" : {
"type" : "string",
"description" : "The base64 encoded function source code."
},
"version" : {
"type" : "integer",
"format": "int32",
"description" : "The version of the script."
}
},
"required" : [
"functionType",
"id",
"source"
]
}
}
}
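Before deploying a preset, it can be useful to run a quick structural check. The sketch below covers only the required properties named in the schema above; it is a simplified stand-in and not a full JSON Schema validator. The function name is hypothetical.

```python
# Required properties per section, copied from the "required" arrays of the schema above.
REQUIRED_FIELDS = {
    "scripts": ("functionType", "id", "source"),
    "schemas": ("id", "schemaDefinition", "type"),
    "behaviorPolicies": ("behavior", "id", "matching"),
    "dataPolicies": ("id", "matching"),
}


def check_preset(preset: dict) -> list:
    """Return a list of human-readable problems; an empty list means the check passed."""
    problems = []
    for section, fields in REQUIRED_FIELDS.items():
        items = preset.get(section)
        if not isinstance(items, list):
            problems.append(f'"{section}" must be present and must be an array')
            continue
        for index, item in enumerate(items):
            if not isinstance(item, dict):
                problems.append(f"{section}[{index}] must be an object")
                continue
            for field in fields:
                if field not in item:
                    problems.append(f'{section}[{index}] is missing required property "{field}"')
    return problems
```

For complete validation, including types, formats, and nested definitions, a JSON Schema validator that supports draft 2020-12 is the better choice.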
Data Hub Preset Reloading
The preset file that defines the initial Data Hub model is checked for modifications every 5 seconds. If changes are detected, HiveMQ Edge attempts to apply them dynamically through calls to the HiveMQ Edge Open API, in the following order:
- Schemas
- Scripts
- Data Policies
- Behavior Policies
The order of elements is important to ensure that dependencies are properly resolved during the update process. For example, scripts or schemas must be loaded before any policies that reference them, to avoid errors and ensure correct behavior.
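The ordering described above can be sketched as follows. This is an illustration of the dependency order only, using the property names of the preset file; the function name is hypothetical and the snippet does not reproduce the actual update mechanism.

```python
# Sections in the order in which changes are applied: dependencies first.
APPLY_ORDER = ("schemas", "scripts", "dataPolicies", "behaviorPolicies")


def plan_updates(preset: dict) -> list:
    """Return (section, id) pairs in the order in which they would be applied."""
    plan = []
    for section in APPLY_ORDER:
        for item in preset.get(section, []):
            plan.append((section, item["id"]))
    return plan
```

With this order, every schema and script already exists by the time a policy that references it is processed.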
The extent to which you can modify a running Data Hub model varies based on the model type.
Scripts and Schemas
Scripts (Script) and schemas (PolicySchema) are identified by a unique ID and a version number. Version numbers start at 1 and must increment sequentially with no gaps (1, 2, 3, …). The first version of a script or schema in the preset file must be explicitly declared as version 1, and a version 3 cannot exist without a preceding version 2.
Limitations: After HiveMQ Edge starts and loads the initial preset file, existing scripts and schemas cannot be modified or removed; all of their attributes are read-only. This means that if version v of a script or schema was present in the preset file when HiveMQ Edge started, none of its attributes can be modified. To ensure data consistency and proper initialization, modifications or deletions require a restart of HiveMQ Edge. To update a script or schema without a restart, you can create a new version (+1). All versions of a script or schema must be listed in ascending order in the preset file.
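The versioning rules above (start at 1, no gaps, ascending order, explicit version) can be checked with a short script. This is a minimal sketch with a hypothetical function name; it takes the "scripts" or "schemas" array of a preset as input.

```python
from collections import defaultdict


def check_versions(entries: list) -> list:
    """Check that the versions of each ID appear as 1, 2, 3, ... in file order."""
    versions_by_id = defaultdict(list)
    for entry in entries:
        # A missing "version" is recorded as None and reported below,
        # because the version must be declared explicitly.
        versions_by_id[entry["id"]].append(entry.get("version"))
    problems = []
    for entry_id, versions in versions_by_id.items():
        expected = list(range(1, len(versions) + 1))
        if versions != expected:
            problems.append(f"{entry_id}: found versions {versions}, expected {expected}")
    return problems
```

An empty result means every script or schema in the array has a gap-free, ascending version history.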
Allowed Operations: You can add new scripts or new versions of existing scripts, and add new schemas or new versions of existing schemas to the preset file. The newly added elements can then be referenced in subsequently defined or modified data and behavior policies.
Behavior Policies and Data Policies
Behavior policies (BehaviorPolicy) and data policies (DataPolicy) are uniquely identified by their ID. They are not versioned.
Limitations: You cannot remove an existing behavior or data policy while HiveMQ Edge is running. References to scripts and schemas must use the exact ID and version number. The keyword latest is not supported.
Allowed Operations: You can modify existing behavior and data policies and add new behavior and data policies to the preset file. These changes are applied dynamically without requiring a restart. When changes are detected, the affected policy is deleted and created anew with the latest definition found in the preset file.
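Because references must name an exact version, it can help to scan policies for the latest keyword before editing a running preset. The following is a minimal sketch with a hypothetical function name. It only looks for objects that carry a "schemaId" together with "version": "latest"; references expressed in other ways are not covered.

```python
def find_latest_refs(node, path="$") -> list:
    """Return the paths of all schema references that use the keyword "latest"."""
    found = []
    if isinstance(node, dict):
        if "schemaId" in node and node.get("version") == "latest":
            found.append(path)
        for key, value in node.items():
            found.extend(find_latest_refs(value, f"{path}.{key}"))
    elif isinstance(node, list):
        for index, value in enumerate(node):
            found.extend(find_latest_refs(value, f"{path}[{index}]"))
    return found
```

The function can be applied to a single policy or to the whole preset object; each returned path points at a reference that should be changed to an exact version number.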
Editing the preset file at runtime offers flexibility, but it is important to understand its limitations. Modifying or removing existing scripts and schemas requires a restart, whereas adding new scripts or schemas, adding new versions of existing scripts or schemas, and modifying or adding behavior or data policies can be done dynamically. To avoid unexpected behavior, always ensure that your preset file changes maintain a valid Data Hub model. For more information, see the Script, PolicySchema, BehaviorPolicy, and DataPolicy definitions in the HiveMQ Edge Open API documentation.
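The rules in this section can be summarized as a comparison between the preset that was loaded and an edited copy. The sketch below sorts each difference into changes that can be applied dynamically and changes that cannot be applied while HiveMQ Edge is running. The function name and the bucket names are hypothetical, and the snippet is an illustration of the rules, not the product's own change detection.

```python
def classify_changes(old: dict, new: dict) -> dict:
    """Compare two presets and group the differences by what the rules allow."""
    result = {"dynamic": [], "not_at_runtime": []}

    # Scripts and schemas are keyed by (id, version); existing entries are read-only.
    for section in ("schemas", "scripts"):
        old_items = {(e["id"], e.get("version")): e for e in old.get(section, [])}
        new_items = {(e["id"], e.get("version")): e for e in new.get(section, [])}
        for key, entry in new_items.items():
            if key not in old_items:
                result["dynamic"].append((section, "add", key))
            elif entry != old_items[key]:
                result["not_at_runtime"].append((section, "modify", key))
        for key in old_items.keys() - new_items.keys():
            result["not_at_runtime"].append((section, "remove", key))

    # Policies are keyed by id; they can be added or modified, but not removed.
    for section in ("dataPolicies", "behaviorPolicies"):
        old_items = {e["id"]: e for e in old.get(section, [])}
        new_items = {e["id"]: e for e in new.get(section, [])}
        for key, entry in new_items.items():
            if key not in old_items:
                result["dynamic"].append((section, "add", key))
            elif entry != old_items[key]:
                result["dynamic"].append((section, "modify", key))
        for key in old_items.keys() - new_items.keys():
            result["not_at_runtime"].append((section, "remove", key))
    return result
```

If the "not_at_runtime" list is empty, the edit stays within the operations that this section describes as dynamic.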
Data Hub in the HiveMQ Edge REST API
The Data Hub portions of the HiveMQ REST API and the HiveMQ Edge REST API are identical. All available API interactions work the same way.
However, access to the HiveMQ Edge REST API requires JWT authorization. For more information on JWTs in HiveMQ Edge, see HiveMQ Edge API. An example of how to obtain a token is shown using cURL.
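As a rough Python sketch of the token flow, the snippet below builds an authentication request and an authorized follow-up request using only the standard library. Everything specific in it is an assumption that should be checked against the HiveMQ Edge Open API document: the base URL, the /api/v1/auth/authenticate path, the userName and password field names, the "token" response property, and the Bearer authorization scheme.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8080"  # assumption: local HiveMQ Edge instance


def build_auth_request(username: str, password: str, base_url: str = BASE_URL):
    """Build the POST request that asks for a JWT (path and field names are assumptions)."""
    body = json.dumps({"userName": username, "password": password}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/api/v1/auth/authenticate",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_authorized_request(path: str, token: str, base_url: str = BASE_URL):
    """Build a GET request that carries the JWT as a bearer token (assumed scheme)."""
    return urllib.request.Request(
        f"{base_url}{path}",
        headers={"Authorization": f"Bearer {token}"},
        method="GET",
    )


def fetch_token(username: str, password: str) -> str:
    """Send the authentication request and return the token (assumed response property)."""
    with urllib.request.urlopen(build_auth_request(username, password)) as response:
        return json.load(response)["token"]
```

The token returned by fetch_token() would then be passed to build_authorized_request() together with the path of a Data Hub endpoint taken from the Open API document.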
User Interface
Data Hub on Edge provides an easy-to-use way to manage policies, schemas and scripts.
List Data Hub Resources
The screenshot below lists all available resources, such as policies, schemas, and scripts, together with additional information. On the left-hand side, further actions can be executed per resource, such as downloading or deleting the resource.
Downloading is particularly helpful when transferring a resource from a testing environment to a production environment. All of these actions can also be executed via the REST API.
Data Hub Policy Designer
Data Hub on Edge provides a Policy Designer, a visual tool to build resources. The Policy Designer is built to offer a completely revamped user experience. With the help of the designer, users can effortlessly build policies, schemas, and scripts all from a single view. Users can easily open the designer via the menu, as depicted in the screenshot below.
JavaScript Editing
The Policy Designer provides an easy-to-use JavaScript editor with syntax highlighting, shown in the screenshot below, that lets you edit scripts directly in the browser.