Actions

In HiveMQ Data Hub, an action is a collection of operations the broker performs in response to the outcome of a data validation or a state transition during behavior validation.
Actions let you specify what happens once a validation finishes or a client transitions through the states to which the configured behavior model applies.

The available functions vary based on the type of the policy.
For more information, see Actions in Data Policies and Actions in Behavior Policies.

Actions in Data Policies

The outcome of each MQTT message validation in the validations portion of a data policy can be success or failure.

Based on the validation outcome, you can define what the HiveMQ broker does in optional onSuccess and onFailure actions. The onSuccess and onFailure actions each contain a pipeline of operations that executes tasks in the defined order to achieve the desired result.

Table 1. Available onSuccess and onFailure parameters

  • pipeline: Lists a sequence of operations in an array. HiveMQ executes the operations in the order they are listed. Each operation contains the following information:

      • id: The user-defined string that identifies the operation. For example, log-my-log-message.

      • functionId: The string that identifies the type of function you want the HiveMQ broker to execute. For example, Delivery.redirectTo.

      • arguments: Defines the necessary parameters of the function referenced in the functionId field of the onSuccess or onFailure pipeline.
        For more information, see non-terminal-functions and terminal-functions.

Example onSuccess and onFailure configurations in a data policy
  "onSuccess": {
    "pipeline": [
      {
        "id": "logSuccess",
        "functionId": "System.log",
        "arguments": {
          "level": "INFO",
          "message": "${clientId} sent a valid publish on topic '${topic}' with result '${validationResult}'"
        }
      }
    ]
  },
  "onFailure": {
    "pipeline": [
      {
        "id": "logFailure",
        "functionId": "System.log",
        "arguments": {
          "level": "WARN",
          "message": "${clientId} sent an invalid publish on topic '${topic}' with result '${validationResult}'"
        }
      }
    ]
  }

The onSuccess and onFailure action pipelines in a data policy have different default behaviors:

  • onSuccess: If there is no terminal function in the pipeline, the MQTT message is published to the original topic. The MQTT message is acknowledged according to the MQTT specification.

  • onFailure: If there is no terminal function in the pipeline, the MQTT message is published to the original topic. However, actions such as dropping the message can be defined.
    The publishing MQTT client is handled as follows:

    • MQTT 3: The client is disconnected.

    • MQTT 5: A PUBACK with the reason string "The publish processing was prevented by a policy." and the reason code 131 (Implementation Specific Error) is sent.

For more information, see Functions.

Transformations in Data Policies

This section of our documentation is part of the Early Access Preview (EAP) of the HiveMQ Data Hub Transformation feature. The documentation and the release candidate that it references are not intended for production use and can contain errors. Please note that we do not guarantee compatibility between EAP and final versions of the same feature. We appreciate your feedback as you try out the EAP and are happy to help. The best place to get in contact is our community forum.

The Data Hub policy engine can execute transformation functions in the action pipelines of a data policy.
All transformation functions are non-terminal functions. For each MQTT PUBLISH packet that a data policy processes, the policy engine invokes the configured transformation function with the MQTT publish and a Context object.
The script returns the resulting publish object, which is passed to the next function in the pipeline. Multiple transformation functions can be executed sequentially.

For more information, see Transformations.
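As a rough illustration of this invocation contract, the following is a minimal sketch of a transformation function, assuming scripts expose a transform(publish, context) entry point where publish carries the topic and payload of the processed message:

```javascript
// Minimal sketch of a transformation function as the policy engine invokes it.
// Assumption: scripts expose a transform(publish, context) entry point, where
// publish carries the topic and payload of the processed MQTT message.
function transform(publish, context) {
  // A non-terminal transformation returns the (possibly modified) publish
  // object so the next function in the pipeline receives it.
  return publish;
}
```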

Transformation Syntax in Data Policies

The following syntax can be used to reference a transformation function in an action pipeline of a data policy:

  • fn:function-id:latest, where fn is a required prefix that identifies a script, function-id is the function identifier specified during script creation, and latest selects the latest version of the script. If desired, you can also specify a particular script version.
    For more information on versioning, see Create Script. For more information on transformation functions, see Non-terminal Functions.
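For illustration, an operation that references a transformation script in an action pipeline might look as follows (the fahrenheit-to-celsius identifier is taken from the example later in this section; the operation id is illustrative):

```json
{
  "id": "apply-unit-conversion",
  "functionId": "fn:fahrenheit-to-celsius:latest",
  "arguments": {}
}
```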

Deserialization & Serialization

Incoming and outgoing MQTT message payloads can be any arbitrary sequence of bytes.
However, in various application scenarios it can be useful to execute an intermediate transformation step as a script to ensure data is suitable for its intended purpose.

Every transformation script requires two steps:

  1. A deserialization step to make incoming data accessible

  2. A serialization step to transmit a byte sequence to consumers

Transformation functions can be embedded into your action pipelines. Functions are passed a publish object that includes a serialized payload, which requires deserialization. The deserialization step converts incoming MQTT payloads into an internal JSON representation. This JSON object is made available to the scripts for standardized data manipulation, regardless of the incoming data representation.

For example, for JavaScript, we provide JavaScript-flavored object manipulation.

A corresponding serialization step is required to serialize the transformed data into the desired data format.

A transformation pipeline follows this structure:

  • The incoming MQTT payload is deserialized according to a specified schema using a deserialization function. For more information, see Serdes.deserialize.

  • Transformation scripts are executed step by step. Multiple functions can be executed sequentially.

  • Finally, the transformed data is serialized to publish to the actual topic. For more information, see Serdes.serialize.

Example transformation in an action pipeline

The following example shows a transformation script with the prerequisite deserialization and serialization:

Example action pipeline
"onSuccess": {
    "pipeline": [
      {
        "id": "operation-2eng0",
        "functionId": "Serdes.deserialize",
        "arguments": {
          "schemaId": "schema-from-sensor"
          "schemaVersion": "latest",
        }
      },
      {
        "id": "operation-ek2Mx",
        "functionId": "fn:fahrenheit-to-celsius:latest",
        "arguments": {}
      },
      {
        "id": "operation-4DBF3",
        "functionId": "Serdes.serialize",
        "arguments": {
          "schemaId": "schema-for-fan"
          "schemaVersion": "latest",
        }
      }
    ]
}

The first action in this pipeline configures a Serdes.deserialize function.
This function deserializes the incoming MQTT message according to the schema-from-sensor schema.
Next, the fahrenheit-to-celsius transformation function is executed to perform a unit conversion.
Finally, to complete the action pipeline, the Serdes.serialize function serializes the transformed message into JSON format according to a new schema-for-fan schema.
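To make the middle step concrete, here is a hypothetical sketch of the fahrenheit-to-celsius script body. It assumes the preceding Serdes.deserialize step has made the payload available as a JavaScript object with a numeric fahrenheit field (the field names are illustrative, not part of the documented schemas):

```javascript
// Hypothetical sketch of the fahrenheit-to-celsius transformation script.
// Assumption: Serdes.deserialize has produced publish.payload as a JavaScript
// object with a numeric "fahrenheit" field (field names are illustrative).
function transform(publish, context) {
  const f = publish.payload.fahrenheit;
  // Standard unit conversion: C = (F - 32) * 5/9
  publish.payload.celsius = (f - 32) * 5 / 9;
  delete publish.payload.fahrenheit;
  return publish; // handed to Serdes.serialize by the next pipeline step
}
```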

Branches in Data Policies

A branch is a named and bounded list of MQTT messages created in a transformation script. For more information, see the addPublish function in Transformation Script.

The total number of created messages in a single script invocation is limited to 100. This limit applies per transformation script, even if the messages are split across multiple branches.

For each created message in a branch, further actions are defined in a data policy.

Table 2. Available onBranch parameters

  • onBranch: Defines a list of one or more pipelines of operations to be executed for created messages on each specified branchId.

      • branchId: The unique branch identifier used in the transformation script.

      • pipeline: A pipeline of operations executed for all messages in the specified branch.

Currently, onBranch pipelines support only the Serdes.serialize function. Each pipeline must contain exactly one Serdes.serialize operation. Empty pipelines are not allowed. Additional functions may be added in future releases.

The following data flow applies to publish messages created by a transformation script:

  • A script can add multiple messages to a named branch.

  • The pipeline in the data policy with the corresponding branchId is executed for these messages.

  • The Serdes.serialize function in the branch pipeline defines the schema used to serialize the created messages in that branch.

  • All messages of a branch are serialized and published to their designated topics.

  • The original message is handled according to the pipeline defined in onSuccess or onFailure.
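The script side of this flow can be sketched as follows. This is a hedged example that assumes the Context object exposes branches with an addPublish method, matching the addPublish function referenced above; the branch name branch1 mirrors the configuration below, while the field names and the alerts/ topic prefix are illustrative:

```javascript
// Hedged sketch of a transformation script that adds messages to a branch.
// Assumption: context exposes branches with an addPublish method; the
// threshold, field names, and topic prefix are illustrative only.
function transform(publish, context) {
  if (publish.payload.celsius > 80) {
    // Add an alert copy of the message to the "branch1" branch; the onBranch
    // pipeline with branchId "branch1" serializes and publishes it separately.
    context.branches["branch1"].addPublish({
      topic: "alerts/" + publish.topic,
      payload: { alert: "overheating", celsius: publish.payload.celsius }
    });
  }
  return publish; // the original message continues through onSuccess/onFailure
}
```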

Example branch stream pipeline configuration
{
  "onSuccess": {
    "pipeline": [
      {
        "id": "deserialize",
        "functionId": "Serdes.deserialize",
        "arguments": {
          "schemaId": "schema-from-sensor",
          "schemaVersion": "latest"
        }
      },
      {
        "id": "script-fahrenheit-to-celsius",
        "functionId": "fn:fahrenheit-to-celsius:latest",
        "arguments": {},
        "onBranch": [
          {
            "branchId": "branch1",
            "pipeline": [
              {
                "id": "serde-alert-schema",
                "functionId": "Serdes.serialize",
                "arguments": {
                  "schemaId": "alert-schema-json",
                  "schemaVersion": "latest"
                }
              }
            ]
          }
        ]
      },
      {
        "id": "serialize",
        "functionId": "Serdes.serialize",
        "arguments": {
          "schemaId": "schema-for-fan",
          "schemaVersion": "latest"
        }
      }
    ]
  }
}

The example branch pipeline executes the fahrenheit-to-celsius transformation function and creates additional messages in the branch1 branch. All messages in branch1 are serialized as specified in the single operation for branch1 using the alert-schema-json schema. Afterward, all messages of branch1 are published. No further policies are executed for the messages created from the branch and no operations other than the Serdes.serialize function are allowed in the pipeline.

The message the transformation script returns is passed to the Serdes.serialize function as defined in the pipeline with the serialize ID.

Actions in Behavior Policies

HiveMQ Data Hub behavior policies can model the journey of an MQTT client from an initial state, through intermediate states, to a successful or failed terminal state.
Each transition the client makes from one state to another is identified by an event together with the from state and to state.
For example, on an Mqtt.OnInboundConnect event, a client transitions from an initial state to a connected state.

In a behavior policy, the transitions an MQTT client makes from one state to the next in the HiveMQ broker in response to an event can be used to trigger an optional pipeline of actions.

The action pipeline contains one or more operations that HiveMQ performs in the configured order. For example, an action pipeline can contain functions that print a log entry and increment a specific metric. For more information, see Functions.

Transitions in Behavior Policies

In the onTransition configuration, you can precisely identify a particular transition in the referenced behavior model with a fromState and toState along with an event. Depending on the type of event, an action pipeline comprising a set of available functions can be triggered.
For more information, see Available behavior policy events and Functions.

Table 3. Available behavior policy events

  • Mqtt.OnInboundConnect: The event that occurs when the MQTT client connects to the broker with an MQTT CONNECT packet.

  • Mqtt.OnInboundPublish: The event that occurs when the MQTT client publishes an MQTT PUBLISH packet.

  • Mqtt.OnInboundSubscribe: The event that occurs when the MQTT client sends an MQTT SUBSCRIBE packet.

  • Mqtt.OnInboundDisconnect: The event that occurs when the MQTT client sends an MQTT DISCONNECT packet.

  • Connection.OnDisconnect: The event that occurs when the TCP connection of the MQTT client closes.

Functions

You can use two categories of functions in your policies. Both categories of functions can be executed on any outcome.

  • non-terminal functions: Allow further operations in the pipeline to be executed.
    For example, the System.log function logs a message to the hivemq.log file and allows further steps to be executed.

  • terminal functions: End further operations in the pipeline. The first terminal function in a pipeline stops the execution.
    For example, the Delivery.redirectTo function publishes an MQTT message to a certain topic and stops further executions.

Non-terminal Functions

Non-terminal functions manipulate data or initiate additional activity without terminating the action pipeline.

Currently, the following non-terminal functions are supported:

System.log (data and behavior policies)

Logs a message at the given level.

Table 4. Available System.log function arguments

  • level (String; one of DEBUG, ERROR, WARN, INFO, TRACE): Specifies the log level of the function in the hivemq.log file.

  • message (String): Adds a user-defined string that prints to the log file. For more information, see Example log message.

Example user-defined System.log function arguments
{
  "id": "log-my-log-message",
  "functionId": "System.log",
  "arguments": {
    "level": "INFO",
    "message": "My defined log message"
  }
}

To learn more about how the System.log function can be used in a policy, see the full running example in our HiveMQ policy cookbooks repository.

Metrics.Counter.increment (data and behavior policies)

Increments a metric of type counter, which can be accessed through monitoring. If the metric does not exist yet, it is created by the first execution of the function.

Table 5. Available Metrics.Counter.increment function arguments

  • metricName (String): Specifies the name of the metric to be incremented.
    NOTE: Interpolation is currently not supported for this argument.

  • incrementBy (Number): Specifies the amount by which the counter is incremented. Negative values are supported to enable decrement operations.

For HiveMQ Enterprise Edition, the created metric has the prefix com.hivemq.data-hub.custom.counters. For HiveMQ Edge, the created metric has the prefix com.hivemq.edge.data-hub.custom.counters.
Example user-defined Metrics.Counter.increment function arguments
{
  "id": "increment-my-counter",
  "functionId": "Metrics.Counter.increment",
  "arguments": {
    "metricName": "my-counter",
    "incrementBy": 1
  }
}

To learn more about how the Metrics.Counter.increment function can be used in a policy, see the full running example in our HiveMQ policy cookbooks repository.

Mqtt.UserProperties.add (data and behavior policies)

Adds a user property to the MQTT message. The Mqtt.UserProperties.add function is available for both data and behavior policies.

Table 6. Available Mqtt.UserProperties.add function arguments

  • name (String): Specifies the name of the user property. Note that multiple user properties with the same name are allowed.

  • value (String): Specifies the value of the user property.

Example user-defined Mqtt.UserProperties.add function arguments
{
  "id": "add-my-user-property",
  "functionId": "Mqtt.UserProperties.add",
  "arguments": {
    "name": "my-user-property",
    "value": "my-value"
  }
}

To learn more about how the Mqtt.UserProperties.add function can be used in a policy, see the full running example in our HiveMQ policy cookbooks repository.

Serdes.deserialize (data policies)

Deserializes a binary MQTT message payload into a data object based on the configured JSON Schema or Protobuf schema.

Table 7. Available Serdes.deserialize function arguments

  • schemaId (String): The identifier of the JSON Schema or Protobuf schema to be used for deserialization.

  • schemaVersion (String): The version of the schema to be used for deserialization.

Example
{
  "id": "my-deserialization",
  "functionId": "Serdes.deserialize",
  "arguments": {
    "schemaId": "schema1",
    "schemaVersion": "latest"
  }
}

Serdes.serialize (data policies)

Serializes a data object into a binary MQTT message payload based on the configured JSON or Protobuf Schema.

Table 8. Available Serdes.serialize function arguments

  • schemaId (String): The identifier of the JSON Schema or Protobuf schema to be used for serialization.

  • schemaVersion (String): The version of the schema to be used for serialization.

Example
{
  "id": "my-serialization",
  "functionId": "Serdes.serialize",
  "arguments": {
    "schemaId": "schema1",
    "schemaVersion": "latest"
  }
}

Terminal Functions

Terminal functions manipulate data or initiate additional activity and terminate the pipeline of the action.
When an operation in a pipeline contains a terminal function, no further operations after that operation are executed.
Additionally, no further matching policies are evaluated.

Currently, the following terminal functions are supported. Some functions are limited to specific types of policies:

Delivery.redirectTo (data policies only)

Redirects an MQTT PUBLISH message to a specified topic.
The Delivery.redirectTo function is available for data policies only.

Table 9. Available Delivery.redirectTo function arguments

  • topic (String): The destination MQTT topic according to the MQTT specification.

  • applyPolicies (Boolean; true, false): Defines whether policies are executed after publishing to a different topic.
    NOTE: If multiple policies have a terminal Delivery.redirectTo action with applyPolicies set to true, HiveMQ evaluates a maximum of 20 policies. To avoid endless loops, no additional policies are evaluated once the limit is reached.

Example user-defined Delivery.redirectTo function arguments
{
  "id": "redirect-to-operation",
  "functionId": "Delivery.redirectTo",
  "arguments": {
    "topic": "redirected/my-topic",
    "applyPolicies": false
  }
}

To learn more about how the Delivery.redirectTo function can be used in a policy, see the full running example in our HiveMQ policy cookbooks repository.

Mqtt.disconnect (data and behavior policies)

Disconnects an MQTT client. The Mqtt.disconnect function is available for both data and behavior policies. The Mqtt.disconnect function has no arguments.

Example Mqtt.disconnect
{
  "id": "client-disconnect",
  "functionId": "MQTT.disconnect",
  "arguments": {}
}

Mqtt.drop (data and behavior policies)

Drops the MQTT packet that is currently processed. The Mqtt.drop function is available for data and behavior policies.

In a behavior policy, the Mqtt.drop function is suitable for the Mqtt.OnInboundPublish and Mqtt.OnInboundSubscribe events only. The Mqtt.drop function is not supported for DISCONNECT packets. For CONNECT packets, use the Mqtt.disconnect function instead.

The Mqtt.drop function has no arguments.

Example Mqtt.drop
{
  "id": "drop-publish",
  "functionId": "Mqtt.drop",
  "arguments": {}
}

String Interpolation

You can interpolate function arguments that have the type string.
For example, invalid_messages/${topic} interpolates to invalid_messages/sensor_data when the variable topic is set to sensor_data.
Variables that begin with a dollar sign and are enclosed in curly brackets ${} are interpolated during policy execution.

Interpolation enhances your ability to design flexible topic redirects, custom log messages, and more.
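For example, interpolation can be combined with Delivery.redirectTo to route messages into a per-topic quarantine tree. The redirect-invalid operation id and the invalid_messages/ topic prefix below are illustrative; the argument names follow the Delivery.redirectTo documentation above:

```json
{
  "id": "redirect-invalid",
  "functionId": "Delivery.redirectTo",
  "arguments": {
    "topic": "invalid_messages/${topic}",
    "applyPolicies": false
  }
}
```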

Table 10. Available predefined variables

  • clientId (String): The MQTT client ID. Example value: a6dc66f2-efec-45e8-922a-3ee1ee7a68d7

  • topic (String): The MQTT topic to which the MQTT message was published. Example value: myhome/groundfloor/livingroom

  • policyId (String): The id of the policy that is currently executed. Example value: com.hivemq.policy.coordinates

  • validationResult (String): A textual description of the validation result. This text can contain schema validation errors for further debugging. Example value: ValidationResults:[{schemaId=schema1, success=true}]

  • fromState (String): Textual representation of the state of the state machine before the transition. Example value: Initial

  • toState (String): Textual representation of the state to which the state machine transitions. Example value: Connected

  • triggerEvent (String): Textual representation of the event that triggered the state machine transition. Example value: MQTT - Inbound PUBLISH

  • timestamp (Long): Current time in milliseconds since the UNIX epoch (January 1, 1970). Example value: 1696925620296