Actions
In HiveMQ Data Hub, an action is a collection of operations the broker performs in response to the outcome of a data-validation or a state transition in behavior-validation.
Actions give you the option to specify what happens once a validation finishes or a client transitions through various states to which the configured behavior model applies.
The available functions vary based on the type of the policy.
For more information, see Actions in Data Policies and Actions in Behavior Policies.
Actions in Data Policies
The outcome of each MQTT message validation in the `validations` portion of a data policy can be `success` or `failure`.
Based on the validation outcome, you can define what the HiveMQ broker does in optional `onSuccess` and `onFailure` actions.
The `onSuccess` and `onFailure` actions each contain a pipeline of operations that executes tasks in the defined order to achieve the desired result.
Parameter | Description |
---|---|
`pipeline` | Lists a sequence of operations in an array. HiveMQ executes the operations in the order they are listed. |
Example `onSuccess` and `onFailure` configurations in a data policy:
"onSuccess": {
"pipeline": [
{
"id": "logSuccess",
"functionId": "System.log",
"arguments": {
"level": "INFO",
"message": "${clientId} sent a valid publish on topic '${topic}' with result '${validationResult}'"
}
}
]
},
"onFailure": {
"pipeline": [
{
"id": "logFailure",
"functionId": "System.log",
"arguments": {
"level": "WARN",
"message": "${clientId} sent an invalid publish on topic '${topic}' with result '${validationResult}'"
}
}
]
}
The `onSuccess` and `onFailure` action pipelines in a data policy have different default behaviors:
- `onSuccess`: If there is no terminal function in the pipeline, the MQTT message is published to the original topic. The MQTT message is acknowledged according to the MQTT specification.
- `onFailure`: If there is no terminal function in the pipeline, the MQTT message is dropped rather than published to the original topic. However, other actions such as redirecting the message can be defined. The publishing MQTT client is handled as follows:
  - MQTT 3: The client is disconnected.
  - MQTT 5: A `PUBACK` with the reason string `The publish processing was prevented by a policy.` and the reason code `131` (Implementation Specific Error) is sent.
For more information, see Functions.
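As a hedged illustration of overriding the default failure handling, the following sketch counts each invalid message and then disconnects the publishing client with the terminal `Mqtt.disconnect` function. The operation IDs and the metric name `invalid-publishes` are hypothetical. The functions and their arguments are the ones described in Functions.

```json
{
  "onFailure": {
    "pipeline": [
      {
        "id": "count-invalid-publish",
        "functionId": "Metrics.Counter.increment",
        "arguments": {
          "metricName": "invalid-publishes",
          "incrementBy": 1
        }
      },
      {
        "id": "disconnect-client",
        "functionId": "Mqtt.disconnect",
        "arguments": {}
      }
    ]
  }
}
```

Because `Mqtt.disconnect` is a terminal function, it is placed last in the pipeline.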
Transformations in Data Policies
This section of our documentation is part of the Early Access Preview (EAP) of the HiveMQ Data Hub Transformation feature. The documentation and the release candidate that it references are not intended for production use and can contain errors. Please note that we do not guarantee compatibility between EAP and final versions of the same feature. We appreciate your feedback as you try out the EAP and are happy to help. The best place to get in contact is our community forum.
The Data Hub policy engine can execute transformation functions in the action pipelines of a data policy.
All transformation functions are non-terminal functions.
For each MQTT PUBLISH packet the data policy processes and references, the policy engine invokes the configured transformation function with the MQTT publish and a Context object.
The resulting `publish` object is returned by the script and passed to the next function in a pipeline.
Multiple transformation functions can be executed sequentially.
For more information, see Transformations.
Transformation Syntax in Data Policies
The following syntax can be used to reference a transformation function in an action pipeline of a data policy:
- `fn:function-id:latest`, where `fn` is a required prefix that identifies a script, `function-id` is the function identifier specified during script creation, and `latest` selects the latest version of the script. If desired, you can also specify a particular script version.
For more information on versioning, see Create Script. For more information on transformation functions, see Non-terminal Functions.
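As a sketch of pinning a particular script version, the operation below references version `1` of a script instead of `latest`. The numeric version format is an assumption that this section does not confirm; see Create Script for the version identifiers your scripts actually use. The operation ID is hypothetical.

```json
{
  "id": "convert-with-pinned-version",
  "functionId": "fn:fahrenheit-to-celsius:1",
  "arguments": {}
}
```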
Deserialization & Serialization
Incoming and outgoing MQTT message payloads can be any arbitrary sequence of bytes.
However, in various application scenarios it can be useful to execute an intermediate transformation step as a script to ensure data is suitable for its intended purpose.
Every transformation script requires two steps:
- A deserialization step to make incoming data accessible
- A serialization step to transmit a byte sequence to consumers
Transformation functions can be embedded into your action pipelines. Each function is passed a `publish` object. The `publish` object includes a serialized payload that must be deserialized first. The deserialization step converts incoming MQTT payloads into an internal JSON representation. The JSON object is made available to the scripts for standardized data manipulation, regardless of the incoming data representation.
For example, for JavaScript, we provide JavaScript-flavored object manipulation.
A corresponding serialization step is required to serialize the transformed data into the desired data format.
The following sequence is executed:
- The incoming MQTT payload is deserialized according to a specified schema using a deserialization function. For more information, see `Serdes.deserialize`.
- Transformation scripts are executed step by step. Multiple functions can be executed sequentially.
- Finally, the transformed data is serialized for publishing to the actual topic. For more information, see `Serdes.serialize`.
Example transformation in an action pipeline
The following example shows a transformation script with the prerequisite deserialization and serialization:
"onSuccess": {
  "pipeline": [
    {
      "id": "operation-2eng0",
      "functionId": "Serdes.deserialize",
      "arguments": {
        "schemaId": "schema-from-sensor",
        "schemaVersion": "latest"
      }
    },
    {
      "id": "operation-ek2Mx",
      "functionId": "fn:fahrenheit-to-celsius:latest",
      "arguments": {}
    },
    {
      "id": "operation-4DBF3",
      "functionId": "Serdes.serialize",
      "arguments": {
        "schemaId": "schema-for-fan",
        "schemaVersion": "latest"
      }
    }
  ]
}
The first action in this pipeline configures a `Serdes.deserialize` function.
This function deserializes the incoming MQTT message according to the `schema-from-sensor` schema.
Next, the `fahrenheit-to-celsius` transformation function is executed to perform a unit conversion.
Finally, to complete the action pipeline, the `Serdes.serialize` function serializes the transformed message into JSON format according to a new `schema-for-fan` schema.
Branches in Data Policies
A `branch` is a named and bounded list of MQTT messages created in a transformation script. For more information, see the `addPublish` function in Transformation Script.
The total number of messages created in a single script invocation is limited to 100.
The limit applies per transformation script, even if the messages are split across multiple branches.
For each created message in a branch, further actions are defined in a data policy.
Parameter | Description |
---|---|
`onBranch` | Defines a list of one or more pipelines of operations to be executed for the messages created on each specified branch. |
Currently, `onBranch` pipelines support the `Serdes.serialize` function only.
Each pipeline must contain a single `Serdes.serialize` function.
Empty pipelines are not allowed. Additional functions can be added in future releases.
The following data flow is used for publish messages created by a transformation script.
A script can add multiple messages to a named branch.
The pipeline in the data policy with the corresponding `branchId` is executed for these messages.
The `Serdes.serialize` function in the branch pipeline defines the schema used for serialization of the created messages in that branch.
All messages of a branch are serialized and published to their designated topic.
The original message is handled according to the pipeline defined in `onSuccess` or `onFailure`.
{
"onSuccess": {
"pipeline": [
{
"id": "deserialize",
"functionId": "Serdes.deserialize",
"arguments": {
"schemaId": "schema-from-sensor",
"schemaVersion": "latest"
}
},
{
"id": "script-fahrenheit-to-celsius",
"functionId": "fn:fahrenheit-to-celsius:latest",
"arguments": {},
"onBranch": [
{
"branchId": "branch1",
"pipeline": [
{
"id": "serde-alert-schema",
"functionId": "Serdes.serialize",
"arguments": {
"schemaId": "alert-schema-json",
"schemaVersion": "latest"
}
}
]
}
]
},
{
"id": "serialize",
"functionId": "Serdes.serialize",
"arguments": {
"schemaId": "schema-for-fan",
"schemaVersion": "latest"
}
}
]
}
}
The example pipeline executes the `fahrenheit-to-celsius` transformation function, which creates additional messages in the `branch1` branch.
All messages in `branch1` are serialized as specified in the single operation for `branch1` using the `alert-schema-json` schema.
Afterward, all messages of `branch1` are published.
No further policies are executed for the messages created from the branch, and no operations other than the `Serdes.serialize` function are allowed in the branch pipeline.
The message the transformation script returns is passed to the `Serdes.serialize` function as defined in the pipeline operation with the `serialize` ID.
Actions in Behavior Policies
HiveMQ Data Hub behavior policies can model the journey of an MQTT client from the initial state, through intermediate states, to a final `success` or `failed` terminal state.
Each transition the client makes from one state to another state includes an event that uniquely specifies the to state and from state.
For example, an `Mqtt.OnInboundConnect` event transitions a client from an initial state to a connected state.
In a behavior policy, the transitions an MQTT client makes from one state to the next in the HiveMQ broker in response to an event can be used to trigger an optional pipeline of actions.
The action pipeline contains one or more operations that HiveMQ performs in the configured order. For example, an action pipeline can contain functions that print a log entry and increment a specific metric. For more information, see Functions.
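A minimal sketch of such an action pipeline, assuming it is attached to a transition in a behavior policy: the first operation prints a log entry and the second increments a counter metric. The operation IDs, the log message, and the metric name `client-transitions` are hypothetical.

```json
{
  "pipeline": [
    {
      "id": "log-transition",
      "functionId": "System.log",
      "arguments": {
        "level": "INFO",
        "message": "A client changed its state"
      }
    },
    {
      "id": "count-transition",
      "functionId": "Metrics.Counter.increment",
      "arguments": {
        "metricName": "client-transitions",
        "incrementBy": 1
      }
    }
  ]
}
```

Both functions are non-terminal, so HiveMQ executes them one after the other in the listed order.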
Transitions in Behavior Policies
In the `onTransition` configuration, you can precisely identify a particular transition in the referenced behavior model with a `fromState` and `toState` along with an event.
Depending on the type of event, an action pipeline comprising a set of available functions can be triggered.
For more information, see Available behavior policy events and Functions.
Event | Description |
---|---|
`Mqtt.OnInboundConnect` | The event that occurs when the MQTT client connects to the broker with an MQTT CONNECT packet. |
| The event that occurs when the MQTT client publishes an MQTT PUBLISH packet. |
| The event that occurs when the MQTT client sends an MQTT SUBSCRIBE packet. |
| The event that occurs when the MQTT client sends an MQTT DISCONNECT packet. |
| The event that occurs when the TCP connection of the MQTT client closes. |
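The following sketch shows how a transition, an event, and an action pipeline might fit together. Several details are assumptions that this section does not confirm: the plural `onTransitions` array key, the use of the event name as the key that holds the pipeline, and the state names `Initial` and `Connected`. Check the behavior policy reference for the exact structure before using it.

```json
{
  "onTransitions": [
    {
      "fromState": "Initial",
      "toState": "Connected",
      "Mqtt.OnInboundConnect": {
        "pipeline": [
          {
            "id": "log-connect",
            "functionId": "System.log",
            "arguments": {
              "level": "INFO",
              "message": "A client connected"
            }
          }
        ]
      }
    }
  ]
}
```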
Functions
You can use two categories of functions in your policies. Both categories of functions can be executed on any outcome.
- `non-terminal` functions: Allow further operations in the pipeline to be executed. For example, the `System.log` function logs a message to the `hivemq.log` file and allows further steps to be executed.
- `terminal` functions: End further operations in the pipeline. The first terminal function in a pipeline stops the execution. For example, the `Delivery.redirectTo` function publishes an MQTT message to a certain topic and stops further executions.
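To illustrate the two categories, the sketch below combines a non-terminal and a terminal function in one data policy pipeline: `System.log` runs first and lets execution continue, then `Mqtt.drop` ends the pipeline. The operation IDs and the log message are hypothetical.

```json
{
  "pipeline": [
    {
      "id": "log-before-drop",
      "functionId": "System.log",
      "arguments": {
        "level": "WARN",
        "message": "Dropping a message from ${clientId} on topic '${topic}'"
      }
    },
    {
      "id": "drop-message",
      "functionId": "Mqtt.drop",
      "arguments": {}
    }
  ]
}
```

Placing the terminal function last keeps every listed operation reachable.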
Non-terminal Functions
Non-terminal functions manipulate data or initiate additional activity without terminating the action pipeline.
Currently, the following non-terminal functions are supported:
- `System.log`: This function can be used in data and behavior policies.
- `Metrics.Counter.increment`: This function can be used in data and behavior policies.
- `Mqtt.UserProperties.add`: This function can be used in data and behavior policies.
- `Serdes.deserialize`: This function can be used in data policies only.
- `Serdes.serialize`: This function can be used in data policies only.
System.log (data and behavior policies)
Logs a message on the given level.
Argument | Type | Values | Description |
---|---|---|---|
`level` | String | `DEBUG`, `ERROR`, `WARN`, `INFO`, `TRACE` | Specifies the log level of the function in the `hivemq.log` file. |
`message` | String | | Adds a user-defined string that prints to the log file. For more information, see Example log message. |
Example `System.log` function arguments:
{
"id": "log-my-log-message",
"functionId": "System.log",
"arguments": {
"level": "INFO",
"message": "My defined log message"
}
}
To learn more about how the `System.log` function can be used in a policy, see the full running example in our HiveMQ policy cookbooks repository.
Metrics.Counter.increment (data and behavior policies)
Increments a metric of type counter, which can be accessed with monitoring. If the metric does not exist yet, it will be created by the first execution of the function.
Argument | Type | Values | Description |
---|---|---|---|
`metricName` | String | | Specifies the name of the metric to be incremented. NOTE: Interpolation is currently not supported for this argument. |
`incrementBy` | Number | | Specifies the amount by which the counter is incremented. Negative values are supported to allow decrement operations. |
For HiveMQ Enterprise Edition, the created metric has the prefix `com.hivemq.data-hub.custom.counters`. For HiveMQ Edge, the created metric has the prefix `com.hivemq.edge.data-hub.custom.counters`.
Example `Metrics.Counter.increment` function arguments:
{
"id": "increment-my-counter",
"functionId": "Metrics.Counter.increment",
"arguments": {
"metricName": "my-counter",
"incrementBy": 1
}
}
To learn more about how the `Metrics.Counter.increment` function can be used in a policy, see the full running example in our HiveMQ policy cookbooks repository.
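Because negative values are supported, the same function can act as a decrement. The sketch below lowers the `my-counter` metric from the previous example by one. The operation ID is hypothetical.

```json
{
  "id": "decrement-my-counter",
  "functionId": "Metrics.Counter.increment",
  "arguments": {
    "metricName": "my-counter",
    "incrementBy": -1
  }
}
```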
Mqtt.UserProperties.add (data and behavior policies)
Adds a user property to the MQTT message.
The `Mqtt.UserProperties.add` function is available for both data and behavior policies.
Argument | Type | Values | Description |
---|---|---|---|
`name` | String | | Specifies the name of the user property. Note that multiple user properties with the same name are allowed. |
`value` | String | | Specifies the value of the user property. |
Example `Mqtt.UserProperties.add` function arguments:
{
"id": "add-my-user-property",
"functionId": "Mqtt.UserProperties.add",
"arguments": {
"name": "my-user-property",
"value": "my-value"
}
}
To learn more about how the `Mqtt.UserProperties.add` function can be used in a policy, see the full running example in our HiveMQ policy cookbooks repository.
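Since multiple user properties with the same name are allowed, a pipeline can add the same property name more than once. The following sketch adds two `tag` properties with different values. The operation IDs, the property name, and the values are hypothetical.

```json
{
  "pipeline": [
    {
      "id": "add-first-tag",
      "functionId": "Mqtt.UserProperties.add",
      "arguments": {
        "name": "tag",
        "value": "validated"
      }
    },
    {
      "id": "add-second-tag",
      "functionId": "Mqtt.UserProperties.add",
      "arguments": {
        "name": "tag",
        "value": "sensor"
      }
    }
  ]
}
```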
Serdes.deserialize (data policies)
Deserializes a binary MQTT message payload into a data object based on the configured JSON Schema or Protobuf schema.
Argument | Type | Values | Description |
---|---|---|---|
`schemaId` | String | | The identifier of the JSON Schema or Protobuf schema to be used for deserialization. |
`schemaVersion` | String | | The version of the schema to be used for deserialization. |
{
"id": "my-deserialization",
"functionId": "Serdes.deserialize",
"arguments": {
"schemaId": "schema1",
"schemaVersion": "latest"
}
}
Serdes.serialize (data policies)
Serializes a data object into a binary MQTT message payload based on the configured JSON or Protobuf Schema.
Argument | Type | Values | Description |
---|---|---|---|
`schemaId` | String | | The identifier of the JSON or Protobuf Schema to be used for serialization. |
`schemaVersion` | String | | The version of the schema to be used for serialization. |
{
"id": "my-serialization",
"functionId": "Serdes.serialize",
"arguments": {
"schemaId": "schema1",
"schemaVersion": "latest"
}
}
Terminal Functions
Terminal functions manipulate data or initiate additional activity and terminate the pipeline of the action.
When an operation in a pipeline contains a terminal function, no further operations after that operation are executed.
Additionally, no further matching policies are evaluated.
Currently, the following terminal functions are supported. Some functions are limited to specific types of policies:
- `Delivery.redirectTo`: This function can be used in data policies only.
- `Mqtt.disconnect`: This function can be used in data and behavior policies.
- `Mqtt.drop`: This function can be used in data and behavior policies.
Delivery.redirectTo (data policies only)
Redirects an MQTT PUBLISH message to a specified topic.
The `Delivery.redirectTo` function is available for data policies only.
Argument | Type | Values | Description |
---|---|---|---|
`topic` | String | | The destination MQTT topic according to the MQTT specification. |
`applyPolicies` | Boolean | `true`, `false` | Defines whether policies are executed after publishing to a different topic. Possible values are `true` and `false`. |
Example `Delivery.redirectTo` function arguments:
{
"id": "redirect-to-operation",
"functionId": "Delivery.redirectTo",
"arguments": {
"topic": "redirected/my-topic",
"applyPolicies": false
}
}
To learn more about how the `Delivery.redirectTo` function can be used in a policy, see the full running example in our HiveMQ policy cookbooks repository.
Mqtt.disconnect (data and behavior policies)
Disconnects an MQTT client.
The `Mqtt.disconnect` function is available for both data and behavior policies.
The `Mqtt.disconnect` function has no arguments.
Example `Mqtt.disconnect` function:
{
"id": "client-disconnect",
"functionId": "Mqtt.disconnect",
"arguments": {}
}
Mqtt.drop (data and behavior policies)
Drops the MQTT packet that is currently processed.
The `Mqtt.drop` function is available for data and behavior policies.
In a behavior policy, the `Mqtt.drop` function is suitable for the `Mqtt.OnInboundPublish` and `Mqtt.OnInboundSubscribe` events only.
The `Mqtt.drop` function is not supported for DISCONNECT packets.
For CONNECT packets, use the `Mqtt.disconnect` function instead.
The `Mqtt.drop` function has no arguments.
Example `Mqtt.drop` function:
{
"id": "drop-publish",
"functionId": "Mqtt.drop",
"arguments": {}
}
String Interpolation
You can interpolate function arguments that have the type `string`.
For example, `invalid_messages/${topic}` interpolates to `invalid_messages/sensor_data` for the variable `topic` set to `sensor_data`.
Variables that begin with a dollar sign and are enclosed in curly brackets (`${}`) are interpolated during policy execution.
Interpolation enhances your ability to design flexible topic redirects, custom log messages, and more.
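As a sketch of a flexible topic redirect, the operation below uses the `${topic}` variable in the `topic` argument of `Delivery.redirectTo`, so a message published to `sensor_data` would be redirected to `invalid_messages/sensor_data`. The operation ID is hypothetical.

```json
{
  "id": "redirect-invalid-publish",
  "functionId": "Delivery.redirectTo",
  "arguments": {
    "topic": "invalid_messages/${topic}",
    "applyPolicies": false
  }
}
```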
Variable | Type | Description | Example Value | Data Policy | Behavior Policy |
---|---|---|---|---|---|
`clientId` | String | The MQTT client ID | | | |
`topic` | String | The MQTT topic to which the MQTT message was published | | | |
| String | The | | | |
`validationResult` | String | A textual description of the validation result. This text can contain schema validation errors for further debugging. | | | |
| String | Textual representation of the state of the state machine before the transition. | | | |
| String | Textual representation of the state to which the state machine transitions. | | | |
| String | Textual representation of the event that triggered the state machine transition. | | | |
| Long | Current time in milliseconds since the UNIX epoch (Jan 1, 1970). | | | |