Policies

Policies describe how you want the HiveMQ broker to apply the selected schema to incoming MQTT message payload data.

The integrated policy engine of the HiveMQ Data Governance Hub gives you the ability to build individual data governance policies that maximize the value of your data pipelines right from the source.

The policies you create tell your HiveMQ broker how you want your incoming MQTT messages to be handled.

The specification format of a policy is JSON.

Policy Management

The HiveMQ REST API provides the following methods to manage data validation policies for the HiveMQ Data Governance Hub:

Create a Policy

The fields in the body of the Create a Policy HTTP request determine the type of policy that is created.

To create a new policy, all cluster nodes must run HiveMQ version 4.15.0 or higher and data validation must be enabled. For more information, see Data Governance Hub.
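As a sketch of how such a request can be composed programmatically, the following Python snippet builds (but does not send) a create-policy request. The endpoint path, host, and port are illustrative assumptions; consult the HiveMQ REST API reference for your version for the exact URL and authentication requirements.

```python
import json
import urllib.request

# ASSUMPTION: this endpoint path is illustrative -- check the REST API
# reference of your HiveMQ version for the exact policies URL.
POLICIES_ENDPOINT = "/api/v1/data-validation/policies"

def build_create_policy_request(base_url: str, policy: dict) -> urllib.request.Request:
    """Compose (but do not send) the HTTP request that creates a policy."""
    return urllib.request.Request(
        url=base_url + POLICIES_ENDPOINT,
        data=json.dumps(policy).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

policy = {
    "id": "com.hivemq.policy.coordinates",
    "matching": {"topicFilter": "coordinates/+"},
}
request = build_create_policy_request("http://localhost:8888", policy)
print(request.method, request.full_url)
```

Sending the composed request (for example, with `urllib.request.urlopen`) is left out so the sketch stays independent of a running broker.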
Table 1. REST API Create Policy HTTP Request Body Fields

  • id (String): The identifier of the policy. The ID must be unique within one HiveMQ cluster. We recommend the use of proper namespacing. For example, com.hivemq.geo_application.only_gps_coordinates.

  • matching (Object): Defines the matching rules of the policy. For more information, see Matching in Policy Definitions.

  • validation (Object): Defines a list of the validations that are executed for all incoming MQTT messages that match the MQTT topics configured in the matching definition. If no validations are present, the policy always evaluates successfully. For more information, see Validation in Policy Definitions.

  • onSuccess (Object): The action that defines the pipeline of operations the HiveMQ broker executes when the validation is successful. If no action is defined, no operations are executed when the policy evaluates successfully. If this is the last matching policy, the publish is forwarded to the broker. For more information, see Actions in Policy Definitions.

  • onFailure (Object): The action that defines the pipeline of operations the HiveMQ broker executes when the validation is unsuccessful. If no action is defined, no operations are executed when the policy evaluates unsuccessfully. If this is the last matching policy, the publish is dropped. For more information, see Actions in Policy Definitions.

Your HiveMQ broker automatically replicates the schemas and policies you create to all nodes in the HiveMQ cluster.
Example policy specification
{
  "id": "com.hivemq.policy.coordinates",
  "matching": {
    "topicFilter": "coordinates/+"
  },
  "validation": {
    "validators": [
      {
        "type": "schema",
        "arguments": {
          "strategy": "ALL_OF",
          "schemas": [
            {
              "schemaId": "gps_coordinates",
              "version": "latest"
            }
          ]
        }
      }
    ]
  },
  "onFailure": {
    "pipeline": [
      {
        "id": "logFailure",
        "functionId": "System.log",
        "arguments": {
          "level": "WARN",
          "message": "${clientId} sent an invalid publish on topic '${topic}' with result '${validationResult}'"
        }
      }
    ]
  }
}

Update an Existing Policy

The fields in the body of the Update a Policy HTTP request determine the type of policy that is updated.

The id of the policy and the request parameter policyId must match.

Table 2. REST API Update Policy Parameters

  • policyId (String): The path parameter that provides the ID of the policy to update. This parameter must match the id in the policy request body. For example, policy1.

You cannot change the matching part of a policy, but it is possible to add, remove, and update the content of the validation, onSuccess, and onFailure parts of a policy.
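Both constraints can be checked client-side before an update request is sent. The helper below is a hypothetical sketch for illustration, not part of the HiveMQ API:

```python
def check_update_request(policy_id: str, body: dict, existing: dict) -> None:
    """Client-side checks that mirror the documented update constraints:
    the policyId path parameter must match the body id, and the matching
    part of a policy cannot change."""
    if body.get("id") != policy_id:
        raise ValueError(
            f"policyId {policy_id!r} does not match body id {body.get('id')!r}"
        )
    if body.get("matching") != existing.get("matching"):
        raise ValueError("the matching part of a policy cannot be updated")

existing = {"id": "policy1", "matching": {"topicFilter": "coordinates/+"}}
update = {"id": "policy1", "matching": {"topicFilter": "coordinates/+"}, "onSuccess": {}}
check_update_request("policy1", update, existing)  # passes silently
```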

To update a policy, all cluster nodes must run HiveMQ version 4.17.0 or higher and data validation must be enabled. For more information, see Data Governance Hub.
Table 3. REST API Update Policy HTTP Request Body Fields

  • id (String): The identifier of the policy. The ID must be unique within one HiveMQ cluster. We recommend the use of proper namespacing. For example, com.hivemq.geo_application.only_gps_coordinates.

  • matching (Object): Defines the matching rules of the policy. The matching part of a policy cannot be updated. For more information, see Matching in Policy Definitions.

  • validation (Object): Defines a list of the validations that are executed for all incoming MQTT messages that match the MQTT topics configured in the matching definition. If no validations are present, the policy always evaluates successfully. For more information, see Validation in Policy Definitions.

  • onSuccess (Object): The action that defines the pipeline of operations the HiveMQ broker executes when the validation is successful. If no action is defined, no operations are executed when the policy evaluates successfully. If this is the last matching policy, the publish is forwarded to the broker. For more information, see Actions in Policy Definitions.

  • onFailure (Object): The action that defines the pipeline of operations the HiveMQ broker executes when the validation is unsuccessful. If no action is defined, no operations are executed when the policy evaluates unsuccessfully. If this is the last matching policy, the publish is dropped. For more information, see Actions in Policy Definitions.

Your HiveMQ broker automatically replicates policies you update to all nodes in the HiveMQ cluster.
Example policy specification
{
  "id": "com.hivemq.policy.coordinates",
  "matching": {
    "topicFilter": "coordinates/+"
  },
  "validation": {
    "validators": [
      {
        "type": "schema",
        "arguments": {
          "strategy": "ALL_OF",
          "schemas": [
            {
              "schemaId": "gps_coordinates",
              "version": "latest"
            }
          ]
        }
      }
    ]
  },
  "onFailure": {
    "pipeline": [
      {
        "id": "logFailure",
        "functionId": "System.log",
        "arguments": {
          "level": "WARN",
          "message": "${clientId} sent an invalid publish on topic '${topic}' with result '${validationResult}'"
        }
      }
    ]
  }
}

Delete a Policy

To delete a policy that is no longer in use, reference the policyId of the policy.

If you want to delete a policy and schema, you must delete the policy that references the schema before you can delete the schema.
Table 4. REST API Delete Policy Parameters

  • policyId (String): The path parameter that provides the ID of the policy to delete. For example, policy1.
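The ordering constraint can be sketched as follows; the deletion_order helper and the resource path strings are illustrative assumptions, not HiveMQ API calls:

```python
def deletion_order(schema_id: str, policies: list) -> list:
    """List resources in a safe deletion order: every policy that references
    the schema first, then the schema itself."""
    referencing = [
        policy["id"]
        for policy in policies
        for validator in policy.get("validation", {}).get("validators", [])
        for schema in validator.get("arguments", {}).get("schemas", [])
        if schema.get("schemaId") == schema_id
    ]
    return [f"policies/{pid}" for pid in referencing] + [f"schemas/{schema_id}"]

policies = [{
    "id": "com.hivemq.policy.coordinates",
    "validation": {"validators": [{
        "type": "schema",
        "arguments": {"strategy": "ALL_OF",
                      "schemas": [{"schemaId": "gps_coordinates", "version": "latest"}]},
    }]},
}]
print(deletion_order("gps_coordinates", policies))
```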

Get a Policy

To view the content of an existing policy, you reference the policyId of the policy.

To retrieve the content of a policy, all cluster nodes must run HiveMQ version 4.15.0 or higher.
Table 5. REST API Get Policy Parameters

  • policyId (String): The path parameter that provides the ID of the policy.

  • fields (String): The query parameter that provides a comma-separated list of the fields to include in the response. Allowed values are: id, createdAt, matching, onSuccess, onFailure, and validation.

Matching in Policy Definitions

The first section of your policy identifies the policy and specifies the conditions the policy enforces.

Your matching configuration gives you highly customizable control over which MQTT messages a policy influences.

Topic-based matching in a policy definition functions similarly to MQTT topic subscriptions. For more information, see MQTT Topic Tree & Topic Matching: Challenges and Best Practices Explained.

Example minimal topic matching configuration in a policy
{
  "id": "com.hivemq.policy.coordinates",
  "matching": {
    "topicFilter": "coordinates/+"
  }
}

The example policy matches all MQTT messages that are published along the topic tree coordinates/+.
The + symbol defines a single-level wildcard.
For example, the policy matches MQTT messages published to coordinates/europe, coordinates/africa, and coordinates/asia.

Because + is a single-level wildcard, publishes to multi-level topics such as coordinates/usa/north are not part of the topic tree.
For more information, see MQTT Topics, Wildcards, and Best Practices.
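To illustrate the matching behavior described above, here is a minimal Python sketch of MQTT-style topic filter matching (edge cases such as $-prefixed system topics and shared subscriptions are ignored):

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """MQTT-style matching: '+' spans exactly one level, '#' spans the rest."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":          # multi-level wildcard: parent and everything below match
            return True
        if i >= len(topic_levels):
            return False          # filter is deeper than the topic
        if level not in ("+", topic_levels[i]):
            return False          # literal level mismatch
    return len(filter_levels) == len(topic_levels)

assert topic_matches("coordinates/+", "coordinates/europe")
assert not topic_matches("coordinates/+", "coordinates/usa/north")
```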

Policy Execution in a Topic Tree

The following examples illustrate how policies and topic filters interact.

MQTT topics are arranged in a hierarchical tree structure, similar to the file system of a computer. A forward slash / separates each level of the tree.
Example Topic Tree with Policies
Figure 1. Example topic tree and associated policies

The example diagram depicts a topic tree with a top-level topic called myhome, four subtopics, and five policies.

  • Policy P1 is configured to match the topic filter myhome.

  • Policy P2 is configured to match the topic filter myhome/firstfloor.

  • Policy P3 is configured to match the topic filter myhome/groundfloor.

  • Policy P4 is configured to match the topic filter myhome/groundfloor/livingroom.

  • Policy P5 is configured to match the topic filter myhome/groundfloor/kitchen.

Table 6. Policy execution order for MQTT messages sent to MQTT topics in the example topic tree

  • myhome: P1

  • myhome/firstfloor: P2

  • myhome/groundfloor: P3

  • myhome/groundfloor/livingroom: P4

  • myhome/groundfloor/kitchen: P5

It is also possible to add policies to match wildcard topic filters.

MQTT supports two types of wildcard characters.
+ matches any single level of the topic tree.
# matches multiple levels of the topic tree.
  • Policy P6 is configured to match the single-level wildcard topic filter myhome/+.

  • Policy P7 is configured to match the multi-level wildcard topic filter myhome/#.

Example Topic Wildcards with Policies
Figure 2. Example wildcard topics and associated policies
Table 7. Policy execution order for MQTT messages sent to MQTT wildcard topics

  • myhome: P7, P6, P1

  • myhome/firstfloor: P7, P6, P2

  • myhome/groundfloor: P7, P6, P3

  • myhome/groundfloor/livingroom: P7, P4

  • myhome/groundfloor/kitchen: P7, P5

In the HiveMQ Data Governance Hub data validation feature, policies execute in order from the least specific topic filter to the most specific:

  • The policies along a topic tree execute from the least specific to the most specific.

  • A multi-level wildcard # executes before a single-level wildcard +.

  • A single-level wildcard + executes before a literal topic segment.

For example, in a use case that aims to simplify data handling, a wildcard can be used to ensure that all MQTT message payloads are JSON formatted. Using the example topic tree, policy P1 can be defined with a multi-level topic filter # and a simple JSON schema validation.
Since every MQTT message to any MQTT topic must pass P1, all MQTT messages must be JSON.
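The ordering rules above can be sketched in Python. The per-level generality ranking is an illustrative interpretation of "least specific first" (# before +, + before a literal segment), not the broker's internal algorithm:

```python
def generality(topic_filter: str) -> list:
    """Per-level generality used to order matching policies:
    '#' (2) is more general than '+' (1), which is more general than a literal (0)."""
    rank = {"#": 2, "+": 1}
    return [rank.get(level, 0) for level in topic_filter.split("/")]

def matches(topic_filter: str, topic: str) -> bool:
    """Minimal MQTT-style topic matching: '+' spans one level, '#' the rest."""
    f, t = topic_filter.split("/"), topic.split("/")
    for i, level in enumerate(f):
        if level == "#":
            return True
        if i >= len(t) or level not in ("+", t[i]):
            return False
    return len(f) == len(t)

def execution_order(policies: dict, topic: str) -> list:
    """IDs of the policies whose filter matches the topic, most general first."""
    hits = [(pid, flt) for pid, flt in policies.items() if matches(flt, topic)]
    hits.sort(key=lambda hit: generality(hit[1]), reverse=True)
    return [pid for pid, _ in hits]

policies = {"P1": "myhome", "P2": "myhome/firstfloor",
            "P6": "myhome/+", "P7": "myhome/#"}
print(execution_order(policies, "myhome/firstfloor"))  # wildcard filters run first
```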

Schema-based Validation in Policy Definitions

Since MQTT is data agnostic, MQTT clients can publish data to downstream services through the broker regardless of whether the data is valid.
In practice, invalid or incorrectly formatted data can cause unpredictable behavior, for example, in a microservice that needs to process sensor data.

The validation section of your HiveMQ Data Governance Hub policy ensures that the MQTT data in your broker is valid, reliable, consistent, and conforms to your predefined standards.

The validation section of your policy definition determines how incoming messages are validated. Currently, HiveMQ Data Hub data validation supports validators of the type schema only.

  • The array of validators in the validation section lists the validators the policy executes for all incoming MQTT messages.

The validation step has one of two outcomes:

  • success: All validators evaluate to true.

  • failure: One or more validators evaluate to false.

Schema-based data validation is an effective way to enhance the value of your data pipelines.
Validation against appropriately configured schemas can ensure data quality, reduce errors, and improve the overall usability and interoperability of your data.

The HiveMQ Data Governance Hub supports schema validation for JSON Schema and Protobuf.

To set up schema-based validation in your policy, set the validator type to schema and define the arguments that you want to use.

  • schemas: Lists an array of one or more schemas that are used for the validation.

    • schemaId: The unique string that references the schema in the HiveMQ Data Governance Hub.

    • version: The version of the schema to use. Specify a version number, or use "latest" to reference the most recent version of the schema.

  • strategy: Defines how the success or failure of the validator is evaluated. Possible values are ALL_OF and ANY_OF.

    • ALL_OF: The validation is only considered successful (success) if all listed schemas are valid; otherwise, it is unsuccessful (failure).

    • ANY_OF: The validation is considered successful (success) if any of the listed schemas is valid; otherwise, it is unsuccessful (failure).

Example minimal validation configuration in a policy
"validation": {
    "validators": [
      {
        "type": "schema",
        "arguments": {
          "strategy": "ALL_OF",
          "schemas": [
            {
              "schemaId": "gps_coordinates",
              "version": "1"
            }
          ]
        }
      }
    ]
}
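A minimal sketch of the two strategies: ALL_OF folds the per-schema validation results with a logical AND, ANY_OF with a logical OR.

```python
def combine(strategy: str, schema_results: list) -> bool:
    """Fold individual schema outcomes into the overall validator result."""
    if strategy == "ALL_OF":
        return all(schema_results)   # every schema must be valid
    if strategy == "ANY_OF":
        return any(schema_results)   # one valid schema suffices
    raise ValueError(f"unknown strategy: {strategy}")

assert combine("ALL_OF", [True, False]) is False  # one invalid schema fails ALL_OF
assert combine("ANY_OF", [True, False]) is True   # one valid schema satisfies ANY_OF
```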

Actions in Schema-based Policy Definitions

The result of each MQTT message validation in the validations portion of your policy can be success or failure.

Based on the result, you can define what the HiveMQ broker does with the MQTT message in optional onSuccess and onFailure actions.

  • onSuccess or onFailure:

    • pipeline: Lists a sequence of operations in an array. HiveMQ executes the operations in the order they are listed. Each operation contains the following information:

      • id: The user-defined string that identifies the operation. For example, log-my-log-message.

      • functionId: The string that identifies the type of function you want the HiveMQ broker to execute. Possible values are currently System.log, Metrics.Counter.increment, UserProperties.add, and Delivery.redirectTo.

      • arguments: Defines the necessary parameters of the function referenced in the functionId field of the onSuccess or onFailure pipeline. For more information, see Non-terminal Functions and Terminal Functions.

Example onSuccess and onFailure configurations in a policy
  "onSuccess": {
    "pipeline": [
      {
        "id": "logSuccess",
        "functionId": "System.log",
        "arguments": {
          "level": "INFO",
          "message": "${clientId} sent a valid publish on topic '${topic}' with result '${validationResult}'"
        }
      }
    ]
  },
  "onFailure": {
    "pipeline": [
      {
        "id": "logFailure",
        "functionId": "System.log",
        "arguments": {
          "level": "WARN",
          "message": "${clientId} sent an invalid publish on topic '${topic}' with result '${validationResult}'"
        }
      }
    ]
  }

You can use two categories of functions in your policy definition. Both categories of functions can be executed on any outcome.

  • non-terminal function: Allows further operations in the pipeline to be executed. For example, the System.log function logs a message to the hivemq.log file, but further steps are executed.

  • terminal function: Ends further operations in the pipeline. The first terminal function in a pipeline stops the execution. For example, the Delivery.redirectTo function publishes an MQTT message to a certain topic and stops further executions.

The onSuccess and onFailure pipelines have different default behaviors:

  • onSuccess: If there is no terminal function in the pipeline, the MQTT message is published to the original topic. The MQTT message is acknowledged according to the MQTT specification.

  • onFailure: If there is no terminal function in the pipeline, the MQTT message is dropped.
    The publishing MQTT client is handled as follows:

    • MQTT 3: The client is disconnected.

    • MQTT 5: A PUBACK with the reason string The publish processing was prevented by a policy. and the reason code 131 (Implementation Specific Error) is sent.

Non-terminal Functions

Non-terminal functions are functions which either manipulate data or cause side effects without terminating the pipeline of the action.

Currently, the following non-terminal functions are supported:

System.log

Logs a message on the given level.

Table 8. Available System.log function arguments

  • level (String; one of DEBUG, ERROR, WARN, INFO, TRACE): Specifies the log level of the function in the hivemq.log file.

  • message (String): Adds a user-defined string that prints to the log file. For more information, see Example log message.

Example user-defined System.log function arguments
{
  "id": "log-my-log-message",
  "functionId": "System.log",
  "arguments": {
     "level": "INFO",
     "message": "My defined log message"
  }
}

To learn more about how the System.log function can be used in a policy, see the full running example in our HiveMQ policy cookbooks repository.

Metrics.Counter.increment

Increments a metric of type counter, which can be accessed through monitoring. If the metric does not exist yet, the first execution of the function creates it.

Table 9. Available Metrics.Counter.increment function arguments

  • metricName (String): Specifies the name of the metric to increment. The metric name is prefixed with com.hivemq.data-governance-hub.data-validation.custom.counters. NOTE: Interpolation is currently not supported for this argument.

  • incrementBy (Number): Specifies the amount by which the counter is incremented. Negative values are supported to enable decrement operations.

Example user-defined Metrics.Counter.increment function arguments
{
  "id": "increment-my-counter",
  "functionId": "Metrics.Counter.increment",
  "arguments": {
     "metricName": "my-counter",
     "incrementBy": 1
  }
}

To learn more about how the Metrics.Counter.increment function can be used in a policy, see the full running example in our HiveMQ policy cookbooks repository.
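The counter semantics described above can be sketched with a plain dictionary standing in for the broker's metric registry (the registry itself is internal to HiveMQ; the prefix is the documented one):

```python
PREFIX = "com.hivemq.data-governance-hub.data-validation.custom.counters."
counters = {}  # stand-in for the broker-internal metric registry

def increment_counter(metric_name: str, increment_by: int) -> int:
    """Create the counter on first use, then adjust it; a negative
    increment_by acts as a decrement."""
    full_name = PREFIX + metric_name
    counters[full_name] = counters.get(full_name, 0) + increment_by
    return counters[full_name]

increment_counter("my-counter", 1)   # created on first execution
increment_counter("my-counter", -1)  # negative values decrement
```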

UserProperties.add

Adds a user property to the MQTT message.

Table 10. Available UserProperties.add function arguments

  • name (String): Specifies the name of the user property. Note that multiple user properties with the same name are allowed.

  • value (String): Specifies the value of the user property.

Example user-defined UserProperties.add function arguments
{
  "id": "add-my-user-property",
  "functionId": "UserProperties.add",
  "arguments": {
     "name": "my-user-property",
     "value": "my-value"
  }
}

To learn more about how the UserProperties.add function can be used in a policy, see the full running example in our HiveMQ policy cookbooks repository.

Terminal Functions

Terminal functions are functions which manipulate data or cause side effects, and additionally terminate the pipeline of the action. When a terminal function terminates a pipeline, no further operations after the operation containing the terminal function will be executed. Additionally, no further matching policies will be evaluated.

Currently, the following terminal functions are supported:

Delivery.redirectTo

Redirects a publish to another given topic.

Table 11. Available Delivery.redirectTo function arguments

  • topic (String): The destination MQTT topic according to the MQTT specification.

  • applyPolicies (Boolean; true or false): Defines whether policies are executed after publishing to a different topic. NOTE: If multiple policies have the terminal action Delivery.redirectTo with applyPolicies set to true, HiveMQ evaluates a maximum of 20 policies. To avoid endless loops, no additional policies are evaluated once the limit is reached.

Example user-defined Delivery.redirectTo function arguments
{
  "id": "redirect-to-operation",
  "functionId": "Delivery.redirectTo",
  "arguments": {
     "topic": "redirected/my-topic",
     "applyPolicies": false
  }
}

To learn more about how the Delivery.redirectTo function can be used in a policy, see the full running example in our HiveMQ policy cookbooks repository.
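The evaluation cap can be illustrated with a small simulation. The redirects mapping and the final_topic helper are hypothetical; only the limit of 20 evaluations comes from the documentation.

```python
MAX_POLICY_EVALUATIONS = 20  # documented cap that breaks endless redirect loops

def final_topic(redirects: dict, topic: str) -> str:
    """Follow Delivery.redirectTo targets; stop when no policy redirects the
    current topic, when applyPolicies is false, or when the cap is reached.

    redirects maps a topic to (target_topic, apply_policies)."""
    for _ in range(MAX_POLICY_EVALUATIONS):
        if topic not in redirects:
            return topic
        target, apply_policies = redirects[topic]
        if not apply_policies:
            return target  # redirected, but no further policies run
        topic = target
    return topic  # limit hit: no additional policies are evaluated
```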

String Interpolation

You can interpolate function arguments of type String. Variables that begin with a dollar sign and are enclosed in curly braces, such as ${topic}, are replaced during policy execution. For example, invalid_messages/${topic} interpolates to invalid_messages/sensor_data when the variable topic is set to sensor_data.

Interpolation enhances your ability to design flexible topic redirects, custom log messages, and more.
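A sketch of the ${} interpolation semantics using Python's string.Template; the broker's actual implementation may differ in edge cases such as unknown variable names (here they are left untouched):

```python
from string import Template

def interpolate(argument: str, variables: dict) -> str:
    """Replace ${name} placeholders with values; unknown names stay untouched."""
    return Template(argument).safe_substitute(variables)

message = interpolate(
    "${clientId} sent an invalid publish on topic '${topic}'",
    {"clientId": "client-1", "topic": "sensor_data"},
)
print(message)
```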

Table 12. Available predefined variables

  • clientId (String): The MQTT client ID. Example: a6dc66f2-efec-45e8-922a-3ee1ee7a68d7

  • topic (String): The MQTT topic to which the MQTT message was published. Example: myhome/groundfloor/livingroom

  • policyId (String): The id of the policy that is currently executed. Example: com.hivemq.policy.coordinates

  • validationResult (String): A textual description of the validation result. This text can contain schema validation errors for further debugging. Example: ValidationResults:[{schemaId=schema1, success=true}]