Policies

Policies describe how you want the HiveMQ broker to apply the selected schema to incoming MQTT message payload data.

The integrated policy engine of the HiveMQ Data Governance Hub gives you the ability to build individual data governance policies that maximize the value of your data pipelines right from the source.

The policies you create tell your HiveMQ broker how you want your incoming MQTT messages to be handled.

The specification format of a policy is JSON.

Policy Management

The HiveMQ REST API provides the following methods to manage data validation policies for the HiveMQ Data Governance Hub:

Create a Policy

The fields in the body of the HTTP request that creates a policy determine the type of policy that is created.

To create a new policy, all cluster nodes must run HiveMQ version 4.15.0 or higher and data validation must be enabled. For more information, see Requirements.
Table 1. REST API Create Policy HTTP Request Body Fields
Field | Type | Required | Description
id | String | | The identifier of the policy. The ID must be unique within one HiveMQ cluster. We recommend the use of proper namespacing. For example, com.hivemq.geo_application.only_gps_coordinates.
matching | Object | | Defines the matching rules of the policy. For more information, see Matching in Policy Definitions.
validation | Object | | Defines a list of the validations that are executed for all incoming MQTT messages that match MQTT topics configured in the matching definition. If no validations are present, the policy always evaluates successfully. For more information, see Validation in Policy Definitions.
onSuccess | Object | | The action that defines the pipeline of operations the HiveMQ broker executes when the validation is successful. If no action is defined, no operations are executed when the policy evaluates successfully. If this is the last matching policy, then the publish gets forwarded to the broker. For more information, see Actions in Policy Definitions.
onFailure | Object | | The action that defines the pipeline of operations the HiveMQ broker executes when the validation is unsuccessful. If no action is defined, no operations are executed when the policy evaluates unsuccessfully. If this is the last matching policy, then the publish is dropped. For more information, see Actions in Policy Definitions.

Your HiveMQ broker automatically replicates the schemas and policies you create to all nodes in the HiveMQ cluster.
Example policy specification
{
  "id": "com.hivemq.policy.coordinates",
  "matching": {
    "topicFilter": "coordinates/+"
  },
  "validation": {
    "validators": [
      {
        "type": "schema",
        "arguments": {
          "strategy": "ALL_OF",
          "schemas": [
            {
              "schemaId": "gps_coordinates"
            }
          ]
        }
      }
    ]
  },
  "onFailure": {
    "pipeline": [
      {
        "id": "logFailure",
        "functionId": "log",
        "arguments": {
          "level": "WARN",
          "message": "$clientId sent an invalid publish on topic '$topic' with result '$validationResult'"
        }
      }
    ]
  }
}
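
As a rough illustration, the following Python sketch wraps the example policy in an HTTP request using only the standard library. The endpoint path /api/v1/data-validation/policies and the base URL http://localhost:8888 are assumptions that do not appear in the text above, so check the REST API reference of your HiveMQ version before you rely on them. The request is only built here, not sent.

```python
import json
from urllib.request import Request

# Assumption: this path is not stated in the text above and may differ
# between HiveMQ versions.
POLICIES_PATH = "/api/v1/data-validation/policies"


def build_create_policy_request(base_url: str, policy: dict) -> Request:
    """Build (but do not send) a request that creates a policy."""
    body = json.dumps(policy).encode("utf-8")
    return Request(
        url=base_url.rstrip("/") + POLICIES_PATH,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# The example policy specification from above, expressed as a Python dict.
policy = {
    "id": "com.hivemq.policy.coordinates",
    "matching": {"topicFilter": "coordinates/+"},
    "validation": {
        "validators": [
            {
                "type": "schema",
                "arguments": {
                    "strategy": "ALL_OF",
                    "schemas": [{"schemaId": "gps_coordinates"}],
                },
            }
        ]
    },
    "onFailure": {
        "pipeline": [
            {
                "id": "logFailure",
                "functionId": "log",
                "arguments": {
                    "level": "WARN",
                    "message": "$clientId sent an invalid publish on topic "
                    "'$topic' with result '$validationResult'",
                },
            }
        ]
    },
}

# Assumption: the REST API is reachable at this hypothetical base URL.
request = build_create_policy_request("http://localhost:8888", policy)
```

To send the request against a running broker, pass it to urllib.request.urlopen.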

Delete a Policy

To delete a policy that is no longer in use, reference the policyId of the policy.

If you want to delete a policy and schema, you must delete the policy that references the schema before you can delete the schema.
Table 2. REST API Delete Policy Parameters
Parameter | Type | Required | Description
policyId | String | | The path parameter that provides the ID of the policy to delete. For example, policy1.
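
A minimal sketch of how the policyId path parameter could be placed into a delete request. The endpoint path and base URL are assumptions that are not given in the text above. The request is built but not sent.

```python
from urllib.parse import quote
from urllib.request import Request

# Assumption: this path is not stated in the text above.
POLICIES_PATH = "/api/v1/data-validation/policies"


def build_delete_policy_request(base_url: str, policy_id: str) -> Request:
    """Build (but do not send) a request that deletes one policy."""
    # Percent-encode the ID so that it is safe to use as a single path segment.
    encoded_id = quote(policy_id, safe="")
    url = f"{base_url.rstrip('/')}{POLICIES_PATH}/{encoded_id}"
    return Request(url=url, method="DELETE")


# Hypothetical base URL, with the policy ID from the table above.
request = build_delete_policy_request("http://localhost:8888", "policy1")
```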

Get a Policy

To view the content of an existing policy, you reference the policyId of the policy.

To retrieve the content of a policy, all cluster nodes must run HiveMQ version 4.15.0 or higher.
Table 3. REST API Get Policy Parameters
Parameter | Type | Required | Description
policyId | String | | The path parameter that provides the ID of the policy.
fields | String | | The query parameter that provides a comma-separated list of the fields to include in the response. Allowed values are: id, createdAt, matching, onSuccess, onFailure, and validation.
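
The following sketch shows one way to combine the policyId path parameter and the fields query parameter. The endpoint path and base URL are assumptions. The allowed field names are taken from the table above.

```python
from urllib.parse import quote, urlencode
from urllib.request import Request

# Assumption: this path is not stated in the text above.
POLICIES_PATH = "/api/v1/data-validation/policies"

# Field names listed as allowed values for the fields parameter.
ALLOWED_FIELDS = ("id", "createdAt", "matching", "onSuccess", "onFailure", "validation")


def build_get_policy_request(base_url: str, policy_id: str, fields=()) -> Request:
    """Build (but do not send) a request that retrieves one policy."""
    unknown = [name for name in fields if name not in ALLOWED_FIELDS]
    if unknown:
        raise ValueError(f"unsupported fields: {unknown}")
    url = f"{base_url.rstrip('/')}{POLICIES_PATH}/{quote(policy_id, safe='')}"
    if fields:
        # Keep the commas literal so that the list stays comma-separated.
        url += "?" + urlencode({"fields": ",".join(fields)}, safe=",")
    return Request(url=url, method="GET")


# Hypothetical base URL; only the id and matching fields are requested.
request = build_get_policy_request(
    "http://localhost:8888", "com.hivemq.policy.coordinates", ["id", "matching"]
)
```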

Matching in Policy Definitions

The first section of your policy identifies the policy and specifies the conditions the policy enforces.

Your matching configuration gives you highly customizable control over which MQTT messages a policy influences.

Topic-based matching in a policy definition works in a similar way to MQTT topic subscriptions. For more information, see MQTT Topic Tree & Topic Matching: Challenges and Best Practices Explained.

Example minimal topic matching configuration in a policy
{
  "id": "com.hivemq.policy.coordinates",
  "matching": {
    "topicFilter": "coordinates/+"
  }
}

The example policy matches all MQTT messages that are published to topics matching the topic filter coordinates/+.
The + symbol is the single-level wildcard: it matches exactly one topic level.
For example, the policy matches MQTT messages published to coordinates/europe, coordinates/africa, and coordinates/asia.

Because the + wildcard matches a single level only, publishes to multi-level topics such as coordinates/usa/north do not match the topic filter.
For more information, see MQTT Topics, Wildcards, and Best Practices.
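
The matching behavior described above can be sketched in a few lines of Python. This is a simplified illustration of MQTT topic filter matching, not HiveMQ's implementation. It ignores special cases such as topics that begin with $, and the function name topic_matches is hypothetical.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if the topic matches the MQTT topic filter (simplified)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if index >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            # A literal level must be identical; + accepts any single level.
            return False
    # Without a trailing #, the filter and the topic must have the same depth.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("coordinates/+", "coordinates/europe"))
print(topic_matches("coordinates/+", "coordinates/usa/north"))
```

The first call matches because + stands in for the single level europe. The second call does not match because the topic has one level more than the filter.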

Policy Execution in a Topic Tree

The following examples illustrate how policies and topic filters interact.

MQTT topics are arranged in a hierarchical tree structure, similar to the file system of a computer. A forward slash / separates each level of the tree.
Figure 1. Example topic tree and associated policies

The example diagram depicts a topic tree with a top-level topic called myhome, four subtopics, and five policies.

  • Policy P1 is configured to match the topic filter myhome.

  • Policy P2 is configured to match the topic filter myhome/firstfloor.

  • Policy P3 is configured to match the topic filter myhome/groundfloor.

  • Policy P4 is configured to match the topic filter myhome/groundfloor/livingroom.

  • Policy P5 is configured to match the topic filter myhome/groundfloor/kitchen.

Table 4. Policy execution order for MQTT messages sent to MQTT topics in the example topic tree
Topic | Policies Executed
myhome | P1
myhome/firstfloor | P2
myhome/groundfloor | P3
myhome/groundfloor/livingroom | P4
myhome/groundfloor/kitchen | P5

You can also add policies that match wildcard topic filters.

MQTT supports two types of wildcard characters:

  • + matches any single level of the topic tree.

  • # matches multiple levels of the topic tree.

The following two policies use wildcard topic filters:

  • Policy P6 is configured to match the single-level wildcard topic filter myhome/+.

  • Policy P7 is configured to match the multi-level wildcard topic filter myhome/#.

Figure 2. Example wildcard topics and associated policies
Table 5. Policy execution order for MQTT messages sent to MQTT wildcard topics
Topic | Policies Executed
myhome | P7, P6, P1
myhome/firstfloor | P7, P6, P2
myhome/groundfloor | P7, P6, P3
myhome/groundfloor/livingroom | P7, P4
myhome/groundfloor/kitchen | P7, P5

In the HiveMQ Data Governance Hub data validation feature, policies execute in the following order:

  • The policies along a topic tree execute from the least specific to the most specific.

  • A multi-level wildcard # executes before a single-level wildcard +.

  • A single-level wildcard + executes before a single topic segment.
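
One way to picture the ordering rules above is as a sort key. The following sketch assumes that the policies passed in are already known to match the topic and ranks each level of a topic filter: # before +, + before a literal segment. The names specificity_key and execution_order are hypothetical, and the way ties between unrelated filters are resolved is an assumption, not documented behavior.

```python
# Rank per filter level: lower ranks are less specific and therefore run first.
LEVEL_RANK = {"#": 0, "+": 1}
LITERAL_RANK = 2
END_OF_FILTER = 3  # Assumption: a filter that ends here is the most specific.


def specificity_key(topic_filter: str) -> tuple:
    """Translate a topic filter into a tuple that sorts least specific first."""
    ranks = [LEVEL_RANK.get(level, LITERAL_RANK) for level in topic_filter.split("/")]
    return tuple(ranks) + (END_OF_FILTER,)


def execution_order(matching_policies: dict) -> list:
    """Order policy IDs (mapped to their topic filters) for execution."""
    return sorted(
        matching_policies,
        key=lambda policy_id: specificity_key(matching_policies[policy_id]),
    )


# Policies from the example that match the topic myhome/firstfloor.
matching = {"P2": "myhome/firstfloor", "P6": "myhome/+", "P7": "myhome/#"}
print(execution_order(matching))
```

For the topic myhome/firstfloor, the sketch yields the order P7, P6, P2 that is listed in the execution order table.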

For example, in a use case that aims to simplify data handling, a wildcard can be used to ensure that all MQTT message payloads are JSON formatted. Using the example topic tree, policy P1 can be defined with the multi-level topic filter # and a simple JSON schema validation.
Since every MQTT message to any MQTT topic must pass P1, all MQTT messages must be JSON.

Schema-based Validation in Policy Definitions

Since MQTT is data agnostic, MQTT clients can publish data to downstream services through the broker regardless of whether the data is valid.
In practice, invalid or incorrectly formatted data can cause unpredictable behavior, for example, in a microservice that needs to process sensor data.

The validation section of your HiveMQ Data Governance Hub policy ensures that the MQTT data in your broker is valid, reliable, consistent, and conforms to your predefined standards.

The validation section of your policy definition determines how incoming messages are validated. Currently, HiveMQ Data Governance Hub data validation supports validators of the type schema only.

  • The array of validators in the validation section lists the validators the policy executes for all incoming MQTT messages.

The validation can have one of two outcomes:

  • success: All validators evaluate to true.

  • failure: At least one validator evaluates to false.

Schema-based data validation is an effective way to enhance the value of your data pipelines.
Validation against appropriately configured schemas can ensure data quality, reduce errors, and improve the overall usability and interoperability of your data.

The HiveMQ Data Governance Hub supports schema validation for JSON Schema and Protobuf.

To set up schema-based validation in your policy, set the validator type to schema and define the arguments that you want to use.

  • schemas: Lists an array of one or more schemas that are used for the validation.

    • schemaId: The unique string that references the schema in the HiveMQ Data Governance Hub.

  • strategy: Defines how the success or failure of the validator is evaluated. Possible entries are ALL_OF and ANY_OF.

    • ALL_OF: The validation is successful (success) only if the message is valid against all listed schemas. Otherwise, the validation is unsuccessful (failure).

    • ANY_OF: The validation is successful (success) if the message is valid against at least one of the listed schemas. Otherwise, the validation is unsuccessful (failure).

Example minimal validation configuration in a policy
"validation": {
    "validators": [
      {
        "type": "schema",
        "arguments": {
          "strategy": "ALL_OF",
          "schemas": [
            {
              "schemaId": "gps_coordinates"
            }
          ]
        }
      }
    ]
},
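
To make the two strategies concrete, the following sketch evaluates a validator definition against a payload. The schema checks are hypothetical Python stand-ins: real schemas are JSON Schema or Protobuf definitions stored in the HiveMQ Data Governance Hub, and the schema ID any_json is invented for this illustration.

```python
import json


def is_json(payload: bytes) -> bool:
    """Stand-in for a schema that accepts any JSON document."""
    try:
        json.loads(payload)
    except ValueError:
        return False
    return True


def has_gps_fields(payload: bytes) -> bool:
    """Stand-in for a gps_coordinates schema (field names are assumptions)."""
    try:
        document = json.loads(payload)
    except ValueError:
        return False
    return isinstance(document, dict) and {"latitude", "longitude"} <= document.keys()


# Hypothetical registry that maps schema IDs to the stand-in checks.
SCHEMA_CHECKS = {"any_json": is_json, "gps_coordinates": has_gps_fields}


def run_validator(validator: dict, payload: bytes) -> bool:
    """Evaluate one schema validator with the ALL_OF or ANY_OF strategy."""
    arguments = validator["arguments"]
    results = [
        SCHEMA_CHECKS[schema["schemaId"]](payload) for schema in arguments["schemas"]
    ]
    if arguments["strategy"] == "ALL_OF":
        return all(results)
    if arguments["strategy"] == "ANY_OF":
        return any(results)
    raise ValueError(f"unknown strategy: {arguments['strategy']}")


schemas = [{"schemaId": "any_json"}, {"schemaId": "gps_coordinates"}]
all_of = {"type": "schema", "arguments": {"strategy": "ALL_OF", "schemas": schemas}}
any_of = {"type": "schema", "arguments": {"strategy": "ANY_OF", "schemas": schemas}}

payload = b'{"latitude": 48.1}'  # Valid JSON, but the longitude field is missing.
print(run_validator(all_of, payload), run_validator(any_of, payload))
```

The payload is valid JSON but lacks a longitude, so the ALL_OF validator fails while the ANY_OF validator succeeds.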

Actions in Schema-based Policy Definitions

The result of each MQTT message validation in the validation portion of your policy can be success or failure.

Based on the result, you can define what the HiveMQ broker does with the MQTT message in the optional onSuccess and onFailure actions.

  • onSuccess or onFailure:

    • pipeline: Lists a sequence of operations in an array. HiveMQ executes the operations in the order they are listed. Each operation contains the following information:

      • id: The user-defined string that identifies the function. For example, log-my-log-message.

      • functionId: The string that identifies the type of function you want the HiveMQ broker to execute. Possible values are currently log and to.

      • arguments: Defines the necessary parameters of the log or to function referenced in the functionId field of the onSuccess or onFailure pipeline. For more information, see Non-terminal Functions and Terminal Functions.

Example onSuccess and onFailure configurations in a policy
  "onSuccess": {
    "pipeline": [
      {
        "id": "logSuccess",
        "functionId": "log",
        "arguments": {
          "level": "INFO",
          "message": "$clientId sent a valid publish on topic '$topic' with result '$validationResult'"
        }
      }
    ]
  },
  "onFailure": {
    "pipeline": [
      {
        "id": "logFailure",
        "functionId": "log",
        "arguments": {
          "level": "WARN",
          "message": "$clientId sent an invalid publish on topic '$topic' with result '$validationResult'"
        }
      }
    ]
  }

You can use two categories of functions in your policy definition. Both categories of functions can be executed on any outcome.

  • non-terminal function: Allows further operations in the pipeline to be executed. For example, the log function logs a message to the hivemq.log file, but further steps are executed.

  • terminal function: Ends further operations in the pipeline. The first terminal function in a pipeline stops the execution. For example, the to function publishes an MQTT message to a certain topic and stops further executions.

The onSuccess and onFailure pipelines have different default behaviors:

  • onSuccess: If there is no terminal function in the pipeline, the MQTT message is published to the original topic. The MQTT message is acknowledged according to the MQTT specification.

  • onFailure: If there is no terminal function in the pipeline, the MQTT message is dropped.
    The publishing MQTT client is handled as follows:

    • MQTT 3: The client is disconnected.

    • MQTT 5: A PUBACK with the reason code 131 (Implementation Specific Error) and the reason string "The publish processing was prevented by a policy." is sent.
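
The pipeline rules above can be summarized in a small simulation. This is a sketch of the described behavior under simplifying assumptions, not the broker's implementation. The names PipelineResult and run_pipeline and the action labels publish, redirect, and drop are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Per the text above: to is a terminal function, log is a non-terminal function.
TERMINAL_FUNCTIONS = {"to"}


@dataclass
class PipelineResult:
    action: str  # "publish", "redirect", or "drop"
    topic: Optional[str]  # Topic the message ends up on, if any.
    logs: List[str] = field(default_factory=list)


def run_pipeline(outcome: str, original_topic: str, pipeline: list) -> PipelineResult:
    """Simulate an onSuccess or onFailure pipeline for one MQTT message."""
    logs = []
    for operation in pipeline:
        arguments = operation.get("arguments", {})
        if operation["functionId"] == "log":
            # Non-terminal: record the message and continue with the next operation.
            logs.append(f"{arguments['level']}: {arguments['message']}")
        elif operation["functionId"] in TERMINAL_FUNCTIONS:
            # Terminal: the first terminal function stops the pipeline.
            return PipelineResult("redirect", arguments["topic"], logs)
    if outcome == "success":
        # Default for onSuccess: publish to the original topic.
        return PipelineResult("publish", original_topic, logs)
    # Default for onFailure: drop the message.
    return PipelineResult("drop", None, logs)


log_only = [
    {"id": "logFailure", "functionId": "log",
     "arguments": {"level": "WARN", "message": "invalid publish"}}
]
result = run_pipeline("failure", "coordinates/europe", log_only)
print(result.action, result.logs)
```

With only a log operation in the onFailure pipeline, the simulated message is logged and then dropped, which mirrors the default behavior described above.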

Non-terminal Functions

The log function is currently the only non-terminal function available for use in your onSuccess and onFailure pipelines.

Table 6. Available log function arguments
Argument | Type | Values | Description
level | String | DEBUG, ERROR, WARN, INFO, TRACE | Specifies the log level of the function in the hivemq.log file.
message | String | | Adds a user-defined string that prints to the log file. For more information, see Example log message.

Example user-defined log message
{
  "id": "log-my-log-message",
  "functionId": "log",
  "arguments": {
    "level": "INFO",
    "message": "My defined log message"
  }
}

Terminal Functions

The to function is currently the only terminal function available for use in your onSuccess and onFailure pipelines.

Table 7. Available to function arguments
Argument | Type | Values | Description
topic | String | | The destination MQTT topic according to the MQTT specification.
applyPolicies | Boolean | true, false | Defines whether policies are executed after publishing to a different topic. Possible values are true and false.

NOTE: If multiple policies have a terminal action to with applyPolicies set to true, HiveMQ evaluates a maximum of 20 policies. To avoid endless loops, no additional policies are evaluated once the limit is reached.
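
As an illustration, the following sketch assembles an onFailure pipeline that logs a message and then redirects the publish with the to function. The operation IDs and the destination topic invalid/coordinates are hypothetical. The argument names topic and applyPolicies come from the table above.

```python
import json


def redirect_operation(operation_id: str, topic: str, apply_policies: bool = False) -> dict:
    """Build a pipeline operation that uses the terminal to function."""
    return {
        "id": operation_id,
        "functionId": "to",
        "arguments": {"topic": topic, "applyPolicies": apply_policies},
    }


on_failure = {
    "pipeline": [
        {
            "id": "logFailure",
            "functionId": "log",
            "arguments": {"level": "WARN", "message": "Redirecting an invalid publish"},
        },
        # The terminal function comes last because it stops the pipeline.
        redirect_operation("redirectInvalid", "invalid/coordinates", apply_policies=False),
    ]
}

# Serialize the fragment in the JSON form that a policy definition uses.
fragment = json.dumps({"onFailure": on_failure}, indent=2)
print(fragment)
```

Setting applyPolicies to false in this sketch keeps the redirected message from being evaluated against further policies, which avoids the evaluation limit described in the note above.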

String Interpolation

You can interpolate function arguments that have the type string. Variables that begin with a dollar sign $ are interpolated during policy execution. For example, when the variable topic is set to sensor_data, invalid_messages/$topic interpolates to invalid_messages/sensor_data.

Interpolation enhances your ability to design flexible topic redirects, custom log messages, and more.
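
The effect of interpolation can be approximated with string.Template from the Python standard library, which uses the same $name notation. This is only an analogy for illustration. How HiveMQ treats unknown variables or a literal dollar sign is not covered by the text above, so leaving unknown variables untouched is an assumption here.

```python
from string import Template


def interpolate(argument: str, variables: dict) -> str:
    """Replace $name placeholders in a string argument with variable values."""
    # safe_substitute leaves unknown placeholders in place (an assumption,
    # not documented HiveMQ behavior).
    return Template(argument).safe_substitute(variables)


variables = {"clientId": "a6dc66f2-efec-45e8-922a-3ee1ee7a68d7", "topic": "sensor_data"}

print(interpolate("invalid_messages/$topic", variables))
print(interpolate("$clientId sent an invalid publish on topic '$topic'", variables))
```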

Table 8. Available predefined variables
Variable | Type | Description | Example Value
clientId | String | The MQTT client ID | a6dc66f2-efec-45e8-922a-3ee1ee7a68d7
topic | String | The MQTT topic to which the MQTT message was published | myhome/groundfloor/livingroom
policyId | String | The ID of the policy that is currently executed | com.hivemq.policy.coordinates
validationResult | String | A textual description of the validation result. This text can contain schema validation errors for further debugging. | ValidationResults:[{schemaId=schema1, success=true}]