Data Hub On Edge
Data Hub is available on HiveMQ Edge and offers the same functionality as for the HiveMQ Enterprise broker. For detailed information, see our Data Hub Quick Start Guide.
Data Hub Configuration on HiveMQ Edge
Data Hub on HiveMQ Edge can be configured with the following settings:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<hivemq>
<modules>
<data-hub>
<data-validation>
<enabled>true</enabled>
</data-validation>
<behavior-validation>
<enabled>true</enabled>
</behavior-validation>
<scripting>
<enabled>true</enabled>
</scripting>
</data-hub>
</modules>
</hivemq>
The example configuration enables all three main components in Data Hub by setting their respective flag to true
.
Data Hub Preset Loading
When HiveMQ Edge is installed with Helm, Data Hub loads its model from a JSON file called preset
.
The preset file contains a single JSON object with four properties:
-
"scripts"
: array of instances of type Script -
"schemas"
: array of instances of type PolicySchema -
"behaviorPolicies"
: array of instances of type BehaviorPolicy -
"dataPolicies"
: array of instances of type DataPolicy
The schemas, Script, PolicySchema, BehaviorPolicy and DataPolicy are defined in the version-controlled HiveMQ Edge Open API document.
Modifications to the Data Hub model must be entered directly in the preset file.
|
{
"scripts" : [
{
"createdAt" : "2025-02-20T12:51:22.428Z",
"description" : "This function transforms a publish.",
"functionType" : "TRANSFORMATION",
"id" : "my-transform.js",
"source" : "ZnVuY3Rpb24gdHJhbnNmb3JtKHB1Ymxpc2gsIGNvbnRleHQpIHsgcmV0dXJuIHB1Ymxpc2g7IH0=",
"version" : 1
}
],
"schemas" : [
{
"arguments" : {},
"createdAt" : "2025-02-20T14:07:01.604Z",
"id" : "schema",
"schemaDefinition" : "eyAgIiRpZCI6ICJodHRwczovL2V4YW1wbGUuY29tL3BlcnNvbi5zY2hlbWEuanNvbiIsICAidHlwZSI6ICJvYmplY3QiLCAgInByb3BlcnRpZXMiOiB7ICAgICJzdHJlZXRfYWRkcmVzcyI6IHsgInR5cGUiOiAic3RyaW5nIiB9LCAgICAiY2l0eSI6IHsgInR5cGUiOiAic3RyaW5nIiB9LCAgICAic3RhdGUiOiB7ICJ0eXBlIjogInN0cmluZyIgfSAgfSwgICJyZXF1aXJlZCI6IFsic3RyZWV0X2FkZHJlc3MiLCAiY2l0eSIsICJzdGF0ZSJdfQ==",
"type" : "JSON",
"version" : 1
}
],
"behaviorPolicies" : [
{
"behavior" : {
"id" : "Publish.quota",
"arguments" : {
"minPublishes" : 1,
"maxPublishes" : 10
}
},
"id" : "wildcardLogBehaviorPolicy",
"matching" : {
"clientIdRegex" : ".*"
},
"createdAt" : "2025-02-20T15:24:02.403Z",
"deserialization" : {
"publish" : {
"schema" : {
"schemaId" : "schema",
"version" : "latest"
}
},
"will" : {
"schema" : {
"schemaId" : "schema",
"version" : "latest"
}
}
},
"lastUpdatedAt" : "2025-02-20T15:24:02.403Z",
"onTransitions" : [
{
"fromState" : "Initial",
"toState" : "Connected",
"Mqtt.OnInboundConnect" : {
"pipeline" : [
{
"arguments" : {
"message" : "Behavior policy ${policyId}: ${fromState} to ${toState} on ${triggerEvent}",
"level" : "INFO"
},
"functionId" : "System.log",
"id" : "logFunction"
}
]
}
},
{
"fromState" : "Any.*",
"toState" : "Publishing",
"Mqtt.OnInboundPublish" : {
"pipeline" : [
{
"arguments" : {
"message" : "Behavior policy ${policyId}: any to publishing on ${triggerEvent}",
"level" : "INFO"
},
"functionId" : "System.log",
"id" : "logFunction"
}
]
}
},
{
"fromState" : "Any.*",
"toState" : "Violated",
"Connection.OnDisconnect" : {
"pipeline" : [
{
"arguments" : {
"message" : "Behavior policy ${policyId}: any to violated on ${triggerEvent}",
"level" : "WARN"
},
"functionId" : "System.log",
"id" : "logFunction"
}
]
},
"Event.OnAny" : {
"pipeline" : [
{
"arguments" : {
"message" : "Behavior policy ${policyId}: any to violated on any",
"level" : "WARN"
},
"functionId" : "System.log",
"id" : "logFunction"
}
]
},
"Mqtt.OnInboundPublish" : {
"pipeline" : [
{
"arguments" : {
"message" : "Behavior policy ${policyId}: any to violated on ${triggerEvent}",
"level" : "WARN"
},
"functionId" : "System.log",
"id" : "logFunction"
}
]
}
}
]
}
],
"dataPolicies" : [
{
"createdAt" : "2025-02-20T12:51:22.428Z",
"id" : "my-policy",
"lastUpdatedAt" : "2025-02-20T12:51:22.428Z",
"matching" : {
"topicFilter" : "#"
},
"onFailure" : {
"pipeline" : [
{
"arguments" : {
"level" : "WARN",
"message" : "${clientId} sent an invalid publish on topic '${topic}' with result '${validationResult}'."
},
"functionId" : "System.log",
"id" : "logFailure"
}
]
},
"onSuccess" : {
"pipeline" : [
{
"arguments" : {
"level" : "INFO",
"message" : "${clientId} sent a valid publish on topic '${topic}' with result '${validationResult}'."
},
"functionId" : "System.log",
"id" : "logSuccess"
}
]
},
"validation" : {
"validators" : [
{
"arguments" : {
"schemas" : [
{
"schemaId" : "schema",
"version" : "latest"
}
],
"strategy" : "ANY_OF"
},
"type" : "SCHEMA"
}
]
}
}
]
}
The following JSON Schema is used to validate the preset file:
{
"$schema" : "https://json-schema.org/draft/2020-12/schema",
"$id" : "https://hivemq.com/schemas/datahub_preset_schema.json",
"type" : "object",
"title" : "Preset",
"description" : "A top level container for schemas, scripts, and behavior/data policies",
"properties" : {
"scripts" : {
"type" : "array",
"items" : {
"$ref" : "#/$defs/Script"
}
},
"schemas" : {
"type" : "array",
"items" : {
"$ref" : "#/$defs/PolicySchema"
}
},
"behaviorPolicies" : {
"type" : "array",
"items" : {
"$ref" : "#/$defs/BehaviorPolicy"
}
},
"dataPolicies" : {
"type" : "array",
"items" : {
"$ref" : "#/$defs/DataPolicy"
}
}
},
"required" : [
"scripts",
"schemas",
"behaviorPolicies",
"dataPolicies"
],
"$defs" : {
"BehaviorPolicy" : {
"type" : "object",
"description" : "A policy which is used to validate and execute certain actions based on the validation result.",
"properties" : {
"behavior" : {
"type" : "object",
"description" : "The behavior referenced by the policy, that is validated by the policy.",
"properties" : {
"arguments" : {
"type" : "object",
"description" : "The arguments that the referenced validator type requires."
},
"id" : {
"type" : "string",
"description" : "The unique identifier of a pre-defined behavior."
}
},
"required" : [
"id"
]
},
"createdAt" : {
"type" : "string",
"format" : "date-time",
"description" : "The UTC formatted timestamp when the policy was created."
},
"deserialization" : {
"type" : "object",
"description" : "The deserializers used by the policy for particular message and/or payload types.",
"properties" : {
"publish" : {
"$ref" : "#/$defs/BehaviorPolicyDeserializer"
},
"will" : {
"$ref" : "#/$defs/BehaviorPolicyDeserializer"
}
}
},
"id" : {
"type" : "string",
"description" : "The unique identifier of the policy."
},
"lastUpdatedAt" : {
"type" : "string",
"format" : "date-time",
"description" : "The UTC formatted timestamp when the policy was most recently updated."
},
"matching" : {
"type" : "object",
"description" : "The matching rules the policy applies.",
"properties" : {
"clientIdRegex" : {
"type" : "string",
"description" : "The regex pattern to match the client id against."
}
},
"required" : [
"clientIdRegex"
]
},
"onTransitions" : {
"type" : "array",
"items" : {
"type" : "object",
"description" : "The actions that are executed for the specified transition.",
"properties" : {
"fromState" : {
"type" : "string",
"description" : "The exact state from which the transition happened."
},
"toState" : {
"type" : "string",
"description" : "The exact state to which the transition happened."
},
"Connection.OnDisconnect" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
},
"Event.OnAny" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
},
"Mqtt.OnInboundConnect" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
},
"Mqtt.OnInboundDisconnect" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
},
"Mqtt.OnInboundPublish" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
},
"Mqtt.OnInboundSubscribe" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
}
},
"required" : [
"fromState",
"toState"
]
}
}
},
"required" : [
"behavior",
"id",
"matching"
]
},
"BehaviorPolicyDeserializer" : {
"type" : "object",
"description" : "The deserializer applied to a particular message or payload type.",
"properties" : {
"schema" : {
"type" : "object",
"description" : "A schema reference is a unique identifier for a schema.",
"properties" : {
"schemaId" : {
"type" : "string",
"description" : "The identifier of the schema."
},
"version" : {
"type" : "string",
"description" : "The version of the schema. The value 'latest' may be used to always refer to the latest schema."
}
},
"required" : [
"schemaId",
"version"
]
}
},
"required" : [
"schema"
]
},
"BehaviorPolicyOnEvent" : {
"type" : "object",
"description" : "One or more operations that are triggered when the event occurs. If this field is empty, the transition does not trigger any operations.",
"properties" : {
"pipeline" : {
"type" : "array",
"items" : {
"$ref" : "#/$defs/PolicyOperation"
}
}
}
},
"DataPolicy" : {
"type" : "object",
"description" : "A data policy which is used to validate and execute certain actions based on the validation result.",
"properties" : {
"id" : {
"type" : "string",
"description" : "The unique identifier of the policy."
},
"matching" : {
"type" : "object",
"description" : "The matching rules the policy applies.",
"properties" : {
"topicFilter" : {
"type" : "string",
"description" : "The topic filter for which the policy is matched."
}
},
"required" : [
"topicFilter"
]
},
"onFailure" : {
"$ref" : "#/$defs/DataPolicyAction"
},
"onSuccess" : {
"$ref" : "#/$defs/DataPolicyAction"
},
"validation" : {
"type" : "object",
"description" : "The section of the policy that defines how incoming MQTT messages are validated. If this section is empty, the result of the policy validation is always successful.",
"properties" : {
"validators" : {
"type" : "array",
"description" : "The validators of the policy.",
"items" : {
"type" : "object",
"description" : "A policy validator which executes the defined validation.",
"properties" : {
"arguments" : {
"type" : "object",
"description" : "The required arguments of the referenced validator type."
},
"type" : {
"type" : "string",
"description" : "The type of the validator.",
"enum" : [
"SCHEMA"
]
}
},
"required" : [
"arguments",
"type"
]
}
}
}
}
},
"required" : [
"id",
"matching"
]
},
"DataPolicyAction" : {
"type" : "object",
"description" : "One or more operations the outcome of the validation triggers. When this field is empty, the outcome of the policy validation does not trigger any operations.",
"properties" : {
"pipeline" : {
"type" : "array",
"description" : "The pipeline to execute, when this action is triggered. The operations in the pipeline are executed in-order.",
"items" : {
"$ref" : "#/$defs/PolicyOperation"
}
}
}
},
"PolicySchema" : {
"type" : "object",
"properties" : {
"id" : {
"type" : "string",
"description" : "The unique identifier of the schema."
},
"schemaDefinition" : {
"type" : "string",
"description" : "The base64 encoded schema definition."
},
"arguments" : {
"type" : "object",
"description" : "The schema type dependent arguments.",
"properties" : {
"additionalProperties" : {
"type" : "string",
"description" : "The schema type dependent arguments."
}
}
},
"type" : {
"type" : "string",
"description" : "The type of the schema."
},
"version" : {
"type" : "integer",
"format": "int32",
"description" : "The version of the schema."
}
},
"required" : [
"id",
"schemaDefinition",
"type"
]
},
"PolicyOperation" : {
"type" : "object",
"description" : "The pipeline to execute when this action is triggered. The operations in the pipeline are executed in order.",
"properties" : {
"arguments" : {
"type" : "object",
"description" : "The required arguments of the referenced function."
},
"functionId" : {
"type" : "string",
"description" : "The unique ID of the referenced function to execute in this operation."
},
"id" : {
"type" : "string",
"description" : "The unique ID of the operation in the pipeline."
}
},
"required" : [
"arguments",
"functionId",
"id"
]
},
"Script" : {
"type" : "object",
"properties" : {
"functionType" : {
"type" : "string",
"description" : "The type of the function.",
"enum" : [
"TRANSFORMATION"
]
},
"id" : {
"type" : "string",
"description" : "The unique identifier of the script."
},
"source" : {
"type" : "string",
"description" : "The base64 encoded function source code."
},
"version" : {
"type" : "integer",
"format": "int32",
"description" : "The version of the script."
}
},
"required" : [
"functionType",
"id",
"source"
]
}
}
}
Data Hub Preset Reloading
The preset
file that defines the initial Data Hub model is checked for modifications every 60 seconds. If changes are detected, HiveMQ Edge attempts to apply them dynamically. The extent to which the running Data Hub model can be altered depends on the type of model element that you modify.
Here is a brief description of the update behavior for each element type:
-
Scripts (of type
Script
) and schemas (of typePolicySchema
) are uniquely identified by their ID and version number. Limitations: Once HiveMQ Edge starts and loads the initialpreset
file, existing scripts and schemas cannot be modified or removed. These operations require a restart of HiveMQ Edge to ensure data consistency and proper initialization. Allowed Operations: You can add new scripts or new versions of existing scripts, and add new schemas or new versions of existing schemas to thepreset
file. The newly added elements can then be referenced in subsequently defined or modified data and behavior policies. -
Behavior policies (of type
BehaviorPolicy
) and data policies (of typeDataPolicy
) are uniquely identified by their ID. They are not versioned. Limitations: Existing behavior policies and data policies cannot be removed while HiveMQ Edge is running. References to scripts and schemas must use the exact ID number. The keywordlatest
is not supported. Allowed Operations: You can modify existing behavior and data policies and add new behavior and data policies to thepreset
file. These changes will be applied dynamically without requiring a restart.
Changes detected in the preset file are applied in the following order:
-
Schemas
-
Scripts
-
Data Policies
-
Behavior Policies
This order ensures that any dependencies between these elements are correctly resolved during the update process. For example, new scripts or schemas must be processed before policies that might reference them.
Editing the preset file at runtime offers flexibility, but it is important to understand its limitations. Modifying or removing existing scripts and schemas requires a restart, whereas adding new scripts/schemas or scripts/schemas versions and modifying or adding behavior or data policies can be done dynamically. To avoid unexpected behavior, always ensure that your preset file changes maintain a valid Data Hub model. For more information, see the Script , PolicySchema , BehaviorPolicy , and DataPolicy definitions in the HiveMQ Edge Open API documentation.
|
Data Hub in the HiveMQ Edge REST API
The Data Hub portions of the HiveMQ REST API and the HiveMQ Edge REST API are identical. All available API interactions work the same way.
However, access to the HiveMQ Edge REST API requires JWT authorization. For more information on JWTs in HiveMQ Edge, see HiveMQ Edge API . An example to obtain a token is shown using cURL.
User Interface
Data Hub on Edge provides an easy-to-use way to manage policies, schemas and scripts.
List Data Hub Resources
In the screenshot below all available resources like policies, schemas and scripts are listed with additional information such. On the left-hand side, further actions per resource can be executed, such as downloading, or deleting the resource.
Downloading in particular is very helpful when transferring the resource from a testing environment to a production environment. However, all these actions can be executed via REST API as well.
Data Hub Policy Designer
Data Hub on Edge provides a Policy Designer, a visual tool to build resources. The Policy Designer is built to offer a completely revamped user experience. With the help of the designer, users can effortlessly build policies, schemas, and scripts all from a single view. Users can easily open the designer via the menu, as depicted in the screenshot below.
JavaScript Editing
The Policy Designer provides an easy-to-use JavaScript editor with syntax highlight shown in the screenshot below directly editable via a browser.