Data Transformations in HiveMQ Data Hub

IoT devices can send a wide range of data sets. As a result, the MQTT data you receive often contains diverse data points, formats, and units.
Data transformation is sometimes necessary to bring diverse data into a common format that an application can understand and process.

The HiveMQ Data Hub transformation feature gives you the ability to add custom JavaScript-based transformation functions to your Data Hub data policies. For more information, see Modify Your MQTT Data In-Flight with Data Transformation.

Configuration

On Linux systems, HiveMQ Data Hub scripting requires GLIBC version 2.29 or higher.

HiveMQ Data Hub’s scripting engine can be configured as follows in the config.xml:

HiveMQ Enterprise Edition

<data-hub>
    <scripting>
        <enabled>true</enabled>
    </scripting>
</data-hub>

HiveMQ Edge

<modules>
   <data-hub>
       <scripting>
            <enabled>false</enabled>
       </scripting>
   </data-hub>
</modules>

JavaScript

JavaScript is currently one of the most widely adopted programming languages worldwide. Based on the ECMAScript standard, JavaScript is a well-established and highly versatile language that offers ease of use and a vast ecosystem of resources that can simplify development. In addition to the support of a large and active community of developers, JavaScript is a dynamically typed language with a rich set of built-in functions that cover a wide range of tasks.

The HiveMQ Data Hub supports the following JavaScript versions:

For more information about available JavaScript functions, visit Mozilla Developer Network.

Transformation Scripting with JavaScript

HiveMQ Data Hub provides an easy-to-use interface with JavaScript.

Diagram: Data Transformation Principle
Figure 1. Example of the transformation script principle

The diagram illustrates an incoming MQTT payload in JSON format with a single value field.
The transform box represents a JavaScript function that transforms the incoming payload.
The resulting JSON object, shown on the right, contains the transformed value field and a timestamp field.

The transformation function is invoked for each incoming MQTT PUBLISH packet and produces the transformed payload.

Transformation API

The transformation function must implement the following API:

function transform(publish, context) {
    return publish;
}
The function must be named transform, which marks it as the entry point.
The function has two parameters: publish and context.

Publish Object

Table 1. publish-object
Field | Type | Description
topic | String | The MQTT topic that is currently specified for this PUBLISH packet.
qos | Number | The Quality of Service level for the PUBLISH packet. Possible values are 0, 1, or 2.
retain | Boolean | Defines whether the PUBLISH packet is a retained message. Possible values are true and false. For more information, see Deserialization & Serialization.
userProperties | Array of Object | A list of the name and value of all user properties of the MQTT 5 PUBLISH packet. This setting has no effect on MQTT 3 clients.
payload | Object | The JSON object representation of the deserialized MQTT payload. For more information, see Deserialization & Serialization.

The publish-object is passed as a parameter into the transform function.
The same object or a new object is returned as the transformed object. The script can alter all of these fields and returns the publish object.

If the retain or qos fields are absent in the returned publish object, the values from the original publish object that was passed to the transform function are used.
Example publish-object in a JavaScript function
function transform(publish, context) {
  const newPublish = {
    payload: { value: "42" },
    topic: "universal_topic",
    // MQTT 5 user property names and values are strings.
    userProperties: [ { name: "transformed", value: "true" } ],
    retain: false,
    qos: 1,
  };

  return newPublish;
}

The example function creates a new constant publish-object and fills all fields with the appropriate values.
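
The fallback rule for absent fields can be illustrated with a minimal sketch. It returns a new publish object that deliberately omits qos and retain, so that, per the description above, the values of the original publish object apply. The topic suffix and the local stand-in for the incoming message are hypothetical and only serve to run the function outside the broker.

```javascript
// Sketch: return a new publish object without qos and retain.
// Per the text above, the original qos and retain values are then used.
function transform(publish, context) {
  return {
    topic: publish.topic + "/normalized", // hypothetical topic suffix
    payload: publish.payload,
    userProperties: publish.userProperties
  };
}

// Local stand-in for an incoming PUBLISH (hypothetical values, not broker code).
const incoming = {
  topic: "factory/line1",
  qos: 2,
  retain: true,
  userProperties: [],
  payload: { value: 1 }
};

const result = transform(incoming, {});
console.log(result.topic);    // factory/line1/normalized
console.log("qos" in result); // false
```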

Context Object

The context object contains additional context information.

Table 2. context-object
Field | Type | Description
arguments | Object | The arguments provided to the script. Currently, arguments can only be provided via a data policy.
policyId | String | The ID of the policy from which the transformation function is called.
clientId | String | The client ID of the client that sent the MQTT PUBLISH.
branches | Object | The branch objects to which newly created messages are sent.

Example context-object in a JavaScript function
function transform(publish, context) {
  const payload = publish.payload;

  publish.payload = {
    value: payload.value + context.arguments.offset
  }

  return publish;
}

The example function adds a constant offset to the value field of the payload. The offset comes from the arguments specified in the data policy.
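
If an argument might reach the script as a string rather than a number (an assumption here, not something this page states), the + operator would concatenate instead of add. The following sketch converts the argument explicitly and leaves the message unchanged when the argument is not numeric. The mock context at the end is hypothetical and only used to run the function locally.

```javascript
function transform(publish, context) {
  // Assumption: offset may arrive as a string such as "10".
  const offset = Number(context.arguments.offset);

  // Leave the message unchanged if the argument is not numeric.
  if (Number.isNaN(offset)) {
    return publish;
  }

  publish.payload = { value: publish.payload.value + offset };
  return publish;
}

// Local check with a mock context (hypothetical values).
const shifted = transform({ payload: { value: 42 } }, { arguments: { offset: "10" } });
console.log(shifted.payload.value); // 52
```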

addPublish Function

The addPublish function creates a new MQTT PUBLISH message from a transformation script. Each new message is added to a named branch. A branch is a grouping of newly created messages. When a data policy uses a transformation script, the policy must define the function pipeline for each branch of the script. For example, a data policy can choose to serialize the new messages using a different schema from the original message. For more information, see Branches in Data Policies.

The arguments of the addPublish function are defined as follows:

Table 3. addPublish: Arguments
Field Type Description

publish

Object

An object that represents an MQTT PUBLISH message with the following fields:

  • topic: The new topic of the message.

  • qos: The Quality of Service level of the message. Possible values are 0, 1, or 2. The default setting is 1.

  • retain: Defines whether the new message is a retained message. Possible values are true or false. The default setting is false.

  • userProperties: A list of name-value pairs for MQTT user properties.

  • payload: The new payload for the new message. The payload must be a JSON-object.

The Data Hub context object provides access to the branches object, which can hold as many branches as your use case requires. To access a branch by name, use context.branches["branchName"]. HiveMQ creates the branch if it does not yet exist. You can then call the addPublish function on the branch object to create messages.

The code context.branches["metric"].addPublish( publishObject ) creates a new message according to publishObject on the branch with the name metric. Each branch has a unique name and contains a list of messages. The following examples illustrate the Data Hub branch concept.
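
To try a script outside the broker, the branch behavior described above can be approximated with a small mock. This is a sketch for local testing only and not HiveMQ's implementation: createMockBranches and the messages property are hypothetical names, and the mock only mirrors two documented behaviors, creating a branch on first access and collecting messages through addPublish.

```javascript
// Hypothetical local stand-in for context.branches (not HiveMQ code).
function createMockBranches() {
  const store = {};
  return new Proxy(store, {
    get(target, name) {
      // Ignore symbol lookups made by tooling such as console.log.
      if (typeof name !== "string") {
        return undefined;
      }
      // Create the branch the first time it is accessed.
      if (!(name in target)) {
        const messages = [];
        target[name] = {
          messages,
          addPublish(publishObject) {
            messages.push(publishObject);
          }
        };
      }
      return target[name];
    }
  });
}

const context = { branches: createMockBranches() };
context.branches["metric"].addPublish({ topic: "a/b", payload: { v: 1 } });
console.log(context.branches["metric"].messages.length); // 1
```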

Sparkplug Metric Fan-out Example

Example branches to split metrics
function transform(publish, context) {
  const metrics = publish.payload.metrics;

  metrics.forEach( metric => {
    const metricPublish = {
       topic: publish.topic + '/' + metric.metricName,
       payload: metric
    }

    context.branches['branchMetric'].addPublish(metricPublish);
  });

  return publish;
}

The script iterates over the metrics array and adds one new message per metric to the branchMetric branch. Each new message has an updated topic and a payload that contains one element of the metrics array.

Finally, the original publish is returned unmodified. A Data Hub data policy defines how the new messages in the branches are processed further. For more information, see Branches in Data Policies.
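
The fan-out can be exercised locally with a stand-in for the branch. The sketch below is a variant of the script that also skips the fan-out when the payload has no metrics array, which is an added guard and not part of the example above. The collected array, the mock context, and the sample topic and metrics are hypothetical.

```javascript
function transform(publish, context) {
  const metrics = publish.payload.metrics;

  // Added guard: skip the fan-out when the payload has no metrics array.
  if (!Array.isArray(metrics)) {
    return publish;
  }

  metrics.forEach(metric => {
    context.branches['branchMetric'].addPublish({
      topic: publish.topic + '/' + metric.metricName,
      payload: metric
    });
  });

  return publish;
}

// Local stand-in for the branch (hypothetical, for testing only).
const collected = [];
const context = { branches: { branchMetric: { addPublish: m => collected.push(m) } } };

transform({
  topic: "plant/node1",
  payload: { metrics: [ { metricName: "temp", value: 21 }, { metricName: "rpm", value: 900 } ] }
}, context);

console.log(collected.map(m => m.topic)); // [ 'plant/node1/temp', 'plant/node1/rpm' ]
```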

If you use Sparkplug workloads in your HiveMQ deployment, be sure to check out our new Sparkplug Module for Data Hub. For more information, see Data Hub Modules.

Publish message for temperature violation (filtering)

Example branch to create new message for temperature violation
function transform(publish, context) {
  const temperatureThreshold = context.arguments.temperatureThreshold;
  const payload = publish.payload;

  if (payload.temperature > temperatureThreshold) {
    // a new message about violation is created and published
    const message = { "topic": publish.topic + "/exceeded", payload: payload };
    context.branches["violations"].addPublish( message );
  }

  return publish;
}

The function in the example creates a new message when the temperature field exceeds the predefined threshold. The threshold is defined in a data policy and is provided via arguments. New messages are added to the violations branch. The handling of the new messages is defined in the data policy.
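
A variant of the same idea can set the optional addPublish fields explicitly. In this sketch, qos and retain repeat the documented defaults, the threshold is converted with Number() on the assumption that it may arrive as a string, and the threshold user property is a hypothetical addition. The mock context is only there to run the function locally.

```javascript
function transform(publish, context) {
  // Assumption: the threshold may arrive as a string, so convert it.
  const threshold = Number(context.arguments.temperatureThreshold);

  if (publish.payload.temperature > threshold) {
    context.branches["violations"].addPublish({
      topic: publish.topic + "/exceeded",
      qos: 1,        // documented default, set explicitly for clarity
      retain: false, // documented default
      // Hypothetical user property that records the threshold as a string.
      userProperties: [ { name: "threshold", value: String(threshold) } ],
      payload: publish.payload
    });
  }

  return publish;
}

// Local stand-in for the context (hypothetical values, for testing only).
const violations = [];
const context = {
  arguments: { temperatureThreshold: "25" },
  branches: { violations: { addPublish: m => violations.push(m) } }
};

transform({ topic: "room/1", payload: { temperature: 30 } }, context); // above threshold
transform({ topic: "room/1", payload: { temperature: 20 } }, context); // below threshold
console.log(violations.length); // 1
```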

Script Requirements

The interface between HiveMQ Data Hub and the script function must fulfill the following requirements:

  • The complete implementation of the transformation script must be contained in a single file.

  • The transform function must be named transform.

  • The transform function must accept two parameters: function transform(publish, context).

  • The transform function must return a publish object.

  • Any branches accessed through the context.branches object must have defined pipelines in the corresponding data policy.

The maximum number of Data Hub scripts per HiveMQ Data Hub deployment is 5000 and the maximum source size per Data Hub script is 100KB.

Runtime Considerations

When you implement a transformation, it is important to keep CPU and memory resource usage in mind. The execution of a transformation function is synchronous.
When a client publishes an MQTT payload, the payload is passed to the transformation script.
The script then processes the payload, and the client receives an acknowledgment.

You can monitor your resource consumption levels with built-in Data Hub metrics.
For more information, see Data Hub metrics for HiveMQ Enterprise Edition and Data Hub metrics for HiveMQ Edge.

Log messages from the script engine and from console.log calls are written to the script.log file.
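
As a rough illustration of console.log inside a script, the sketch below logs how long one invocation took. It assumes Date.now() is available in the scripting engine, and the processed field is a hypothetical placeholder for real transformation work. Logging on every message may add overhead, so treat this as a debugging aid rather than a recommended pattern.

```javascript
function transform(publish, context) {
  const started = Date.now();

  // Placeholder for the real transformation work (hypothetical field).
  publish.payload.processed = true;

  // Per the text above, console.log output is written to script.log.
  console.log("policy " + context.policyId + ": transform took " + (Date.now() - started) + " ms");

  return publish;
}

// Local run with a mock context (hypothetical policy ID).
const out = transform({ topic: "t", payload: {} }, { policyId: "demo-policy" });
```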

Example Transform Functions

Here are some common examples of how to use the transformation functions in typical IoT scenarios.
Naturally, you can adapt the functions according to your individual needs.

Add a payload field

This example shows how to add a new field to incoming MQTT message payloads.

Example to add a new field
function transform(publish, context) {
  publish.payload.timestamp = new Date().toJSON();

  return publish;
}

In this example, a new field called timestamp with the current date is added to the message payload.

Restructure the message payload

Transformation can also be used to reduce the structural complexity of a message payload:

Example to restructure the message payload
function transform(publish, context) {
  publish.payload = { "value": publish.payload.metrics.map( metric => metric.value ) };

  return publish;
}
The transform script assumes that the incoming payloads contain the following structure:
{
  "metrics": [
    {
      "value": 1
    },
    {
      "value": 5
    },
    {
      "value": 100
    }
  ]
}

After the script is applied, the following payload is generated:

{
  "value": [
    1,
    5,
    100
  ]
}

Compute a payload field

This example builds on the array of values from the previous Restructure the message payload example.
Here, we want to compute statistical properties from the array.

Example to compute maximum, minimum, and average payload values
function transform(publish, context) {
    publish.payload.max = Math.max( ...publish.payload.value );
    publish.payload.min = Math.min( ...publish.payload.value );
    publish.payload.avg = publish.payload.value.reduce( ( x, y ) => x + y, 0 ) / publish.payload.value.length;

    return publish;
}

The computation of the maximum, minimum, and average of the values yields the following payload:

{
  "value": [
    1,
    5,
    100
  ],
  "max": 100,
  "min": 1,
  "avg": 35.333333333333336
}
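
With an empty array, Math.max() returns -Infinity, Math.min() returns Infinity, and the average becomes NaN because of the division by zero. A defensive variant, sketched here under the assumption that such payloads should simply pass through unchanged, checks the array first. The sample inputs are hypothetical.

```javascript
function transform(publish, context) {
  const values = publish.payload.value;

  // Pass the message through unchanged when there is nothing to compute.
  if (!Array.isArray(values) || values.length === 0) {
    return publish;
  }

  publish.payload.max = Math.max(...values);
  publish.payload.min = Math.min(...values);
  publish.payload.avg = values.reduce((x, y) => x + y, 0) / values.length;

  return publish;
}

// Local runs with hypothetical payloads.
const withStats = transform({ payload: { value: [1, 5, 100] } }, {});
const untouched = transform({ payload: { value: [] } }, {});
console.log(withStats.payload.max, withStats.payload.min); // 100 1
console.log("avg" in untouched.payload);                    // false
```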

Rename a message payload field

In some cases, devices provide the same types of values but use different naming conventions.
The transformation function can be used to harmonize the incoming device data.

Example to rename message payload fields
function getFieldsToRename() {
    return [{ old: "Timestamp", new: "timestamp" }, { old: "temp", new: "temperature" } ];
}

function transform(publish, context) {
  const payload = publish.payload;

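  getFieldsToRename().forEach( field => {
    // Rename only the fields that are present in the payload.
    if (field.old in payload) {
      payload[field.new] = payload[field.old];
      delete payload[field.old];
    }
  } );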

  publish.payload = payload;

  return publish;
}

The script defines a helper function that returns the list of fields to rename: from Timestamp to timestamp and from temp to temperature.
The transform function iterates through the list, copies each value to the new field name where applicable, and deletes the old field.
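
The renaming logic can also be written as a helper that returns a renamed copy instead of modifying the payload in place. This is a sketch of one possible design: renameFields is a hypothetical helper name, and the sample payload is made up for the local run.

```javascript
// Hypothetical helper: returns a renamed copy and leaves the input untouched.
function renameFields(payload, renames) {
  const result = Object.assign({}, payload);

  renames.forEach(({ old: oldName, new: newName }) => {
    // Rename only the fields that are present.
    if (oldName in result) {
      result[newName] = result[oldName];
      delete result[oldName];
    }
  });

  return result;
}

function transform(publish, context) {
  publish.payload = renameFields(publish.payload, [
    { old: "Timestamp", new: "timestamp" },
    { old: "temp", new: "temperature" }
  ]);

  return publish;
}

// Local run with a hypothetical payload.
const renamed = transform({ payload: { Timestamp: "2024-01-01T00:00:00Z", temp: 21.5, unit: "C" } }, {});
console.log(Object.keys(renamed.payload)); // [ 'unit', 'timestamp', 'temperature' ]
```

Fields that are not listed, such as unit in the sample payload, are carried over unchanged.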

Script Management from the HiveMQ Control Center

The Data Hub Script view in the HiveMQ Control Center simplifies script management with an intuitive user interface. The script creation wizard guides you through creating new scripts with context-sensitive help and immediate feedback on configuration validity.

For more information, see HiveMQ Control Center.