Data Hub Preset Loading
When HiveMQ Edge is installed with Helm, Data Hub loads its model from a JSON file called preset
.
The preset file contains a single JSON object with four properties:
-
"scripts"
: array of instances of type Script -
"schemas"
: array of instances of type PolicySchema -
"behaviorPolicies"
: array of instances of type BehaviorPolicy -
"dataPolicies"
: array of instances of type DataPolicy
The schemas, Script, PolicySchema, BehaviorPolicy and DataPolicy are defined in the version-controlled HiveMQ Edge Open API document (the relevant file will match in name with your HiveMQ Edge’s installed version).
Modifications to the Data Hub model must be entered directly in the preset file.
|
{
"scripts" : [
{
"createdAt" : "2025-02-20T12:51:22.428Z",
"description" : "This function transforms a publish.",
"functionType" : "TRANSFORMATION",
"id" : "my-transform.js",
"source" : "ZnVuY3Rpb24gdHJhbnNmb3JtKHB1Ymxpc2gsIGNvbnRleHQpIHsgcmV0dXJuIHB1Ymxpc2g7IH0=",
"version" : 1
}
],
"schemas" : [
{
"arguments" : {},
"createdAt" : "2025-02-20T14:07:01.604Z",
"id" : "schema",
"schemaDefinition" : "eyAgIiRpZCI6ICJodHRwczovL2V4YW1wbGUuY29tL3BlcnNvbi5zY2hlbWEuanNvbiIsICAidHlwZSI6ICJvYmplY3QiLCAgInByb3BlcnRpZXMiOiB7ICAgICJzdHJlZXRfYWRkcmVzcyI6IHsgInR5cGUiOiAic3RyaW5nIiB9LCAgICAiY2l0eSI6IHsgInR5cGUiOiAic3RyaW5nIiB9LCAgICAic3RhdGUiOiB7ICJ0eXBlIjogInN0cmluZyIgfSAgfSwgICJyZXF1aXJlZCI6IFsic3RyZWV0X2FkZHJlc3MiLCAiY2l0eSIsICJzdGF0ZSJdfQ==",
"type" : "JSON",
"version" : 1
}
],
"behaviorPolicies" : [
{
"behavior" : {
"id" : "Publish.quota",
"arguments" : {
"minPublishes" : 1,
"maxPublishes" : 10
}
},
"id" : "wildcardLogBehaviorPolicy",
"matching" : {
"clientIdRegex" : ".*"
},
"createdAt" : "2025-02-20T15:24:02.403Z",
"deserialization" : {
"publish" : {
"schema" : {
"schemaId" : "schema",
"version" : "latest"
}
},
"will" : {
"schema" : {
"schemaId" : "schema",
"version" : "latest"
}
}
},
"lastUpdatedAt" : "2025-02-20T15:24:02.403Z",
"onTransitions" : [
{
"fromState" : "Initial",
"toState" : "Connected",
"Mqtt.OnInboundConnect" : {
"pipeline" : [
{
"arguments" : {
"message" : "Behavior policy ${policyId}: ${fromState} to ${toState} on ${triggerEvent}",
"level" : "INFO"
},
"functionId" : "System.log",
"id" : "logFunction"
}
]
}
},
{
"fromState" : "Any.*",
"toState" : "Publishing",
"Mqtt.OnInboundPublish" : {
"pipeline" : [
{
"arguments" : {
"message" : "Behavior policy ${policyId}: any to publishing on ${triggerEvent}",
"level" : "INFO"
},
"functionId" : "System.log",
"id" : "logFunction"
}
]
}
},
{
"fromState" : "Any.*",
"toState" : "Violated",
"Connection.OnDisconnect" : {
"pipeline" : [
{
"arguments" : {
"message" : "Behavior policy ${policyId}: any to violated on ${triggerEvent}",
"level" : "WARN"
},
"functionId" : "System.log",
"id" : "logFunction"
}
]
},
"Event.OnAny" : {
"pipeline" : [
{
"arguments" : {
"message" : "Behavior policy ${policyId}: any to violated on any",
"level" : "WARN"
},
"functionId" : "System.log",
"id" : "logFunction"
}
]
},
"Mqtt.OnInboundPublish" : {
"pipeline" : [
{
"arguments" : {
"message" : "Behavior policy ${policyId}: any to violated on ${triggerEvent}",
"level" : "WARN"
},
"functionId" : "System.log",
"id" : "logFunction"
}
]
}
}
]
}
],
"dataPolicies" : [
{
"createdAt" : "2025-02-20T12:51:22.428Z",
"id" : "my-policy",
"lastUpdatedAt" : "2025-02-20T12:51:22.428Z",
"matching" : {
"topicFilter" : "#"
},
"onFailure" : {
"pipeline" : [
{
"arguments" : {
"level" : "WARN",
"message" : "${clientId} sent an invalid publish on topic '${topic}' with result '${validationResult}'."
},
"functionId" : "System.log",
"id" : "logFailure"
}
]
},
"onSuccess" : {
"pipeline" : [
{
"arguments" : {
"level" : "INFO",
"message" : "${clientId} sent a valid publish on topic '${topic}' with result '${validationResult}'."
},
"functionId" : "System.log",
"id" : "logSuccess"
}
]
},
"validation" : {
"validators" : [
{
"arguments" : {
"schemas" : [
{
"schemaId" : "schema",
"version" : "latest"
}
],
"strategy" : "ANY_OF"
},
"type" : "SCHEMA"
}
]
}
}
]
}
The following JSON Schema is used to validate the preset file:
{
"$schema" : "https://json-schema.org/draft/2020-12/schema",
"$id" : "https://hivemq.com/schemas/datahub_preset_schema.json",
"type" : "object",
"title" : "Preset",
"description" : "A top level container for schemas, scripts, and behavior/data policies",
"properties" : {
"scripts" : {
"type" : "array",
"items" : {
"$ref" : "#/$defs/Script"
}
},
"schemas" : {
"type" : "array",
"items" : {
"$ref" : "#/$defs/PolicySchema"
}
},
"behaviorPolicies" : {
"type" : "array",
"items" : {
"$ref" : "#/$defs/BehaviorPolicy"
}
},
"dataPolicies" : {
"type" : "array",
"items" : {
"$ref" : "#/$defs/DataPolicy"
}
}
},
"required" : [
"scripts",
"schemas",
"behaviorPolicies",
"dataPolicies"
],
"$defs" : {
"BehaviorPolicy" : {
"type" : "object",
"description" : "A policy which is used to validate and execute certain actions based on the validation result.",
"properties" : {
"behavior" : {
"type" : "object",
"description" : "The behavior referenced by the policy, that is validated by the policy.",
"properties" : {
"arguments" : {
"type" : "object",
"description" : "The arguments that the referenced validator type requires."
},
"id" : {
"type" : "string",
"description" : "The unique identifier of a pre-defined behavior."
}
},
"required" : [
"id"
]
},
"createdAt" : {
"type" : "string",
"format" : "date-time",
"description" : "The UTC formatted timestamp when the policy was created."
},
"deserialization" : {
"type" : "object",
"description" : "The deserializers used by the policy for particular message and/or payload types.",
"properties" : {
"publish" : {
"$ref" : "#/$defs/BehaviorPolicyDeserializer"
},
"will" : {
"$ref" : "#/$defs/BehaviorPolicyDeserializer"
}
}
},
"id" : {
"type" : "string",
"description" : "The unique identifier of the policy."
},
"lastUpdatedAt" : {
"type" : "string",
"format" : "date-time",
"description" : "The UTC formatted timestamp when the policy was most recently updated."
},
"matching" : {
"type" : "object",
"description" : "The matching rules the policy applies.",
"properties" : {
"clientIdRegex" : {
"type" : "string",
"description" : "The regex pattern to match the client id against."
}
},
"required" : [
"clientIdRegex"
]
},
"onTransitions" : {
"type" : "array",
"items" : {
"type" : "object",
"description" : "The actions that are executed for the specified transition.",
"properties" : {
"fromState" : {
"type" : "string",
"description" : "The exact state from which the transition happened."
},
"toState" : {
"type" : "string",
"description" : "The exact state to which the transition happened."
},
"Connection.OnDisconnect" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
},
"Event.OnAny" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
},
"Mqtt.OnInboundConnect" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
},
"Mqtt.OnInboundDisconnect" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
},
"Mqtt.OnInboundPublish" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
},
"Mqtt.OnInboundSubscribe" : {
"$ref" : "#/$defs/BehaviorPolicyOnEvent"
}
},
"required" : [
"fromState",
"toState"
]
}
}
},
"required" : [
"behavior",
"id",
"matching"
]
},
"BehaviorPolicyDeserializer" : {
"type" : "object",
"description" : "The deserializer applied to a particular message or payload type.",
"properties" : {
"schema" : {
"type" : "object",
"description" : "A schema reference is a unique identifier for a schema.",
"properties" : {
"schemaId" : {
"type" : "string",
"description" : "The identifier of the schema."
},
"version" : {
"type" : "string",
"description" : "The version of the schema. The value 'latest' may be used to always refer to the latest schema."
}
},
"required" : [
"schemaId",
"version"
]
}
},
"required" : [
"schema"
]
},
"BehaviorPolicyOnEvent" : {
"type" : "object",
"description" : "One or more operations that are triggered when the event occurs. If this field is empty, the transition does not trigger any operations.",
"properties" : {
"pipeline" : {
"type" : "array",
"items" : {
"$ref" : "#/$defs/PolicyOperation"
}
}
}
},
"DataPolicy" : {
"type" : "object",
"description" : "A data policy which is used to validate and execute certain actions based on the validation result.",
"properties" : {
"id" : {
"type" : "string",
"description" : "The unique identifier of the policy."
},
"matching" : {
"type" : "object",
"description" : "The matching rules the policy applies.",
"properties" : {
"topicFilter" : {
"type" : "string",
"description" : "The topic filter for which the policy is matched."
}
},
"required" : [
"topicFilter"
]
},
"onFailure" : {
"$ref" : "#/$defs/DataPolicyAction"
},
"onSuccess" : {
"$ref" : "#/$defs/DataPolicyAction"
},
"validation" : {
"type" : "object",
"description" : "The section of the policy that defines how incoming MQTT messages are validated. If this section is empty, the result of the policy validation is always successful.",
"properties" : {
"validators" : {
"type" : "array",
"description" : "The validators of the policy.",
"items" : {
"type" : "object",
"description" : "A policy validator which executes the defined validation.",
"properties" : {
"arguments" : {
"type" : "object",
"description" : "The required arguments of the referenced validator type."
},
"type" : {
"type" : "string",
"description" : "The type of the validator.",
"enum" : [
"SCHEMA"
]
}
},
"required" : [
"arguments",
"type"
]
}
}
}
}
},
"required" : [
"id",
"matching"
]
},
"DataPolicyAction" : {
"type" : "object",
"description" : "One or more operations the outcome of the validation triggers. When this field is empty, the outcome of the policy validation does not trigger any operations.",
"properties" : {
"pipeline" : {
"type" : "array",
"description" : "The pipeline to execute, when this action is triggered. The operations in the pipeline are executed in-order.",
"items" : {
"$ref" : "#/$defs/PolicyOperation"
}
}
}
},
"PolicySchema" : {
"type" : "object",
"properties" : {
"id" : {
"type" : "string",
"description" : "The unique identifier of the schema."
},
"schemaDefinition" : {
"type" : "string",
"description" : "The base64 encoded schema definition."
},
"arguments" : {
"type" : "object",
"description" : "The schema type dependent arguments.",
"properties" : {
"additionalProperties" : {
"type" : "string",
"description" : "The schema type dependent arguments."
}
}
},
"type" : {
"type" : "string",
"description" : "The type of the schema."
},
"version" : {
"type" : "integer",
"format": "int32",
"description" : "The version of the schema."
}
},
"required" : [
"id",
"schemaDefinition",
"type"
]
},
"PolicyOperation" : {
"type" : "object",
"description" : "The pipeline to execute when this action is triggered. The operations in the pipeline are executed in order.",
"properties" : {
"arguments" : {
"type" : "object",
"description" : "The required arguments of the referenced function."
},
"functionId" : {
"type" : "string",
"description" : "The unique ID of the referenced function to execute in this operation."
},
"id" : {
"type" : "string",
"description" : "The unique ID of the operation in the pipeline."
}
},
"required" : [
"arguments",
"functionId",
"id"
]
},
"Script" : {
"type" : "object",
"properties" : {
"functionType" : {
"type" : "string",
"description" : "The type of the function.",
"enum" : [
"TRANSFORMATION"
]
},
"id" : {
"type" : "string",
"description" : "The unique identifier of the script."
},
"source" : {
"type" : "string",
"description" : "The base64 encoded function source code."
},
"version" : {
"type" : "integer",
"format": "int32",
"description" : "The version of the script."
}
},
"required" : [
"functionType",
"id",
"source"
]
}
}
}
Data Hub Preset Reloading
The preset
file that defines the initial Data Hub model is checked for modifications every 5 seconds. If changes are detected, HiveMQ Edge attempts to apply them dynamically, by means of calls to the
HiveMQ Edge Open API in the following order:
-
Schemas
-
Scripts
-
Data Policies
-
Behavior Policies
The order of elements is important to ensure that dependencies are properly resolved during the update process. For example, scripts or schemas must be loaded before any policies that reference them, to avoid errors and ensure correct behavior. |
The extent to which you can modify a running Data Hub model varies based on the model type.
Scripts and Schemas
Scripts (Script
) and schemas (PolicySchema
) are identified with a unique ID and version number. Version numbers start at 1 and must increment sequentially with no gaps (1, 2, 3, …). The first script
or schema
in a preset
file must be explicitly declared as version 1, and a version 3 cannot exist without a preceding version 2.
Limitations: After HiveMQ Edge starts and loads the initial preset
file, existing scripts and schemas cannot be modified or removed, all of their attributes are exclusively read-only. This means that if version v
of a script or schema was present in the preset file when HiveMQ Edge started, none of its attributes can be modified. To ensure data consistency and proper initialization, modifications or deletions require a restart of HiveMQ Edge. To update a script or schema without a restart, you can create a new version (+1). All versions of a script or schema must be presented in ascending order in the preset
file.
Allowed Operations: You can add new scripts or new versions of existing scripts, and add new schemas or new versions of existing schemas to the preset
file. The newly added elements can then be referenced in subsequently defined or modified data and behavior policies.
Behavior policies and Data policies
Behavior policies (BehaviorPolicy
) and data policies (DataPolicy
) are uniquely identified by their ID. They are not versioned.
Limitations: You cannot remove an existing behavior or data policy while HiveMQ Edge is running. References to scripts and schemas must use the exact ID and version number. The keyword latest
is not supported.
Allowed Operations: You can modify existing behavior and data policies and add new behavior and data policies to the preset
file. These changes will be applied dynamically without requiring a restart. When changes are detected, the affected policy is deleted and created anew with the latest definition found in the preset file.
Editing the preset file at runtime offers flexibility, but it is important to understand its limitations. Modifying or removing existing scripts and schemas requires a restart, whereas adding new scripts/schemas or scripts/schemas versions and modifying or adding behavior or data policies can be done dynamically. To avoid unexpected behavior, always ensure that your preset file changes maintain a valid Data Hub model. For more information, see the Script , PolicySchema , BehaviorPolicy , and DataPolicy definitions in the HiveMQ Edge Open API documentation.
|