Data Transformations in HiveMQ Data Hub
IoT devices can send a wide range of data sets.
As a result, the MQTT data you receive often contains diverse data points, formats, and units.
Data transformation is sometimes necessary to bring diverse data into a common format that an application can understand and process.
The HiveMQ Data Hub transformation feature gives you the ability to add custom JavaScript-based transformation functions to your Data Hub data policies. For more information, see Modify Your MQTT Data In-Flight with Data Transformation.
This section of our documentation is part of the Early Access Preview (EAP) of the HiveMQ Data Hub Transformation feature. The documentation and the release candidate that it references are not intended for production use and can contain errors. Please note that we do not guarantee compatibility between EAP and final versions of the same feature. We appreciate your feedback as you try out the EAP and are happy to help. The best place to get in contact is our community forum.
Configuration
On Linux systems, HiveMQ Data Hub scripting requires GLIBC version 2.29 or higher.
You can configure the HiveMQ Data Hub scripting engine in the config.xml file as follows:
JavaScript
JavaScript is currently one of the most widely adopted programming languages worldwide. Based on the ECMAScript standard, JavaScript is a well-established and highly versatile language that offers ease of use and a vast ecosystem of resources that can simplify development. In addition to the support of a large and active community of developers, JavaScript is a dynamically typed language with a rich set of built-in functions that cover a wide range of tasks.
The HiveMQ Data Hub supports the following JavaScript versions:
For more information about available JavaScript functions, visit Mozilla Developer Network.
Transformation Scripting with JavaScript
HiveMQ Data Hub provides an easy-to-use interface with JavaScript.
The diagram illustrates an incoming MQTT payload in JSON format with a single value field.
The transform box represents a JavaScript function that transforms the incoming payload.
The resulting JSON object, shown on the right, contains the value field with a transformed value and a timestamp field with a transformed timestamp.
The transformation function is invoked for incoming MQTT PUBLISH packets and creates a payload.
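As a minimal sketch of the transformation described above, the following function replaces a payload that has a single value field with a converted value and a timestamp. The Celsius-to-Fahrenheit conversion and the sample topic are assumptions chosen for illustration and are not part of the Data Hub API.

```javascript
// Sketch: turn { value } into { value, timestamp }.
// Assumption: the incoming value is a temperature in degrees Celsius.
function transform(publish, context) {
  const celsius = publish.payload.value;
  publish.payload = {
    value: celsius * 9 / 5 + 32,     // converted to degrees Fahrenheit
    timestamp: new Date().toJSON()   // ISO 8601 time of the transformation
  };
  return publish;
}

// Local check outside of HiveMQ with a hand-written publish object.
const result = transform({ topic: "sensors/1", payload: { value: 20 } }, {});
console.log(result.payload.value); // 68
```

Running the function locally with hand-written objects like this is a quick way to check the payload logic before the script is used in a data policy.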
Transformation API
The transformation function must implement the following API:
function transform(publish, context) {
  return publish;
}
To define the entry point, the function must be named transform. The function has two parameters: publish and context.
Publish Object
Field | Type | Description |
---|---|---|
topic | String | The MQTT topic that is currently specified for this PUBLISH packet. |
qos | Number | The Quality of Service level for the PUBLISH packet. Possible values are 0, 1, and 2. |
retain | Boolean | Defines whether the PUBLISH packet is a retained message. Possible values are true and false. |
userProperties | Array | A list of the name and value of all user properties of the MQTT 5 PUBLISH packet. This setting has no effect on MQTT 3 clients. |
payload | Object | The JSON object representation of the deserialized MQTT payload. |
The publish object is passed as a parameter into the transform function.
The same object or a new object is returned as the transformed object.
The script can alter all of these fields and returns the publish object.
If the retain or qos fields are absent in the returned publish object, the values from the original publish object that was passed to the transform function are used.
Example publish object in a JavaScript function
function transform(publish, context) {
  const newPublish = {
    payload: { value: "42" },
    topic: "universal_topic",
    // MQTT user property values are strings.
    userProperties: [ { name: "transformed", value: "true" } ],
    retain: false,
    qos: 1,
  };
  return newPublish;
}
The example function creates a new publish object as a constant and fills all fields with the appropriate values.
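The fallback for absent retain and qos fields can be illustrated with a short sketch. The transform function below returns a new object without those two fields. The withFallback helper is a hypothetical local stand-in that imitates the documented behavior so the result can be inspected outside of HiveMQ; it is not a Data Hub API, and the topic suffix is an assumption.

```javascript
// Returns a new publish object that leaves out retain and qos on purpose.
function transform(publish, context) {
  return {
    topic: publish.topic + "/normalized",
    payload: { value: String(publish.payload.value) },
    userProperties: publish.userProperties
  };
}

// Hypothetical stand-in for the documented fallback: values that are
// absent in the returned object are taken from the original publish.
function withFallback(original, returned) {
  return {
    ...returned,
    qos: returned.qos ?? original.qos,
    retain: returned.retain ?? original.retain
  };
}

const original = {
  topic: "plant/line1",
  qos: 2,
  retain: true,
  userProperties: [],
  payload: { value: 42 }
};
const effective = withFallback(original, transform(original, {}));
console.log(effective.qos, effective.retain); // 2 true
```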
Context Object
The context object contains additional context information.
Field | Type | Description |
---|---|---|
arguments | Object | The arguments provided to the script. Currently, arguments can only be provided via a data policy. |
 | | The |
 | | The |
branches | Object | The branch objects to which newly created messages are sent. |
Example context object in a JavaScript function
function transform(publish, context) {
  const payload = publish.payload;
  publish.payload = {
    value: payload.value + context.arguments.offset
  };
  return publish;
}
The example function adds a constant offset value, taken from the arguments specified in a data policy, to the value field of the payload.
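Because the + operator concatenates when either operand is a string, a defensive variant can convert both operands to numbers first. The sketch below assumes that the offset argument might reach the script as a string such as "5"; whether that is the case depends on how the data policy supplies its arguments.

```javascript
// Defensive variant: convert the argument and the payload value to
// numbers before adding them.
// Assumption: context.arguments.offset may be a string such as "5".
function transform(publish, context) {
  const offset = Number(context.arguments.offset);
  if (Number.isNaN(offset)) {
    return publish; // leave the message untouched if the offset is unusable
  }
  publish.payload = { value: Number(publish.payload.value) + offset };
  return publish;
}

// Local check with a hand-written context object.
const shifted = transform(
  { topic: "sensors/1", payload: { value: 10 } },
  { arguments: { offset: "5" } }
);
console.log(shifted.payload.value); // 15
```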
addPublish Function
The addPublish function creates a new MQTT PUBLISH message from a transformation script.
Each new message is added to a named branch.
A branch is a grouping of newly created messages.
When a data policy uses a transformation script, the policy must define the function pipeline for each branch of the script.
For example, a data policy can choose to serialize the new messages using a different schema from the original message.
For more information, see Branches in Data Policies.
The arguments of the addPublish function are defined as follows:
Field | Type | Description |
---|---|---|
|
|
An object that represents an MQTT PUBLISH object.
|
The Data Hub context object provides access to the branches object, which can hold as many branches as you need for your use case.
You access a branch with a given name as follows: context.branches["branchName"]. HiveMQ creates the branch if it does not yet exist.
The addPublish function can then be used on this branch object to create messages.
For example, the code context.branches["metric"].addPublish( publishObject ) creates a new message according to publishObject on the branch with the name metric.
Each branch has a unique name and contains a list of messages.
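To experiment with branches outside of HiveMQ, the create-on-access behavior can be imitated with a small mock. The createMockContext helper below is hypothetical and exists only for local testing. It uses a standard JavaScript Proxy to create a branch the first time its name is accessed and says nothing about how Data Hub implements branches internally.

```javascript
// Hypothetical test helper: mimics context.branches for local experiments.
function createMockContext(args = {}) {
  const store = Object.create(null); // branch name -> branch object
  const branches = new Proxy(store, {
    get(target, name) {
      if (typeof name !== "string") {
        return target[name]; // ignore symbol lookups made by tooling
      }
      if (!(name in target)) {
        // Create the branch on first access.
        const messages = [];
        target[name] = {
          messages,
          addPublish: (publish) => { messages.push(publish); }
        };
      }
      return target[name];
    }
  });
  return { arguments: args, branches, store };
}

const context = createMockContext();
context.branches["metric"].addPublish({ topic: "a/b", payload: { value: 1 } });
context.branches["metric"].addPublish({ topic: "a/c", payload: { value: 2 } });
console.log(Object.keys(context.store));                 // [ 'metric' ]
console.log(context.branches["metric"].messages.length); // 2
```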
The following examples illustrate the Data Hub branch concept.
Sparkplug Metric Fan-out Example
function transform(publish, context) {
  const metrics = publish.payload.metrics;
  metrics.forEach( metric => {
    const metricPublish = {
      topic: publish.topic + '/' + metric.metricName,
      payload: metric
    };
    context.branches['branchMetric'].addPublish(metricPublish);
  });
  return publish;
}
The script iterates over the metrics array and adds one new message per metric to the branchMetric branch.
Each new message has an updated topic and a payload that contains one element of the metrics array.
Finally, the original publish is returned unmodified. A Data Hub data policy defines how further processing of new messages in the branches is handled. For more information, see Branches in Data Policies.
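The fan-out script throws an error if a payload arrives without a metrics array. The following sketch adds a guard for that case and runs the function locally. The hand-written context object, the sample topic, and the sample metrics are assumptions made for illustration.

```javascript
// Fan-out with a guard for payloads that do not carry a metrics array.
function transform(publish, context) {
  const metrics = publish.payload && publish.payload.metrics;
  if (!Array.isArray(metrics)) {
    return publish; // nothing to fan out
  }
  metrics.forEach((metric) => {
    context.branches["branchMetric"].addPublish({
      topic: publish.topic + "/" + metric.metricName,
      payload: metric
    });
  });
  return publish;
}

// Local stand-in for the branch so that the new messages can be inspected.
const sent = [];
const context = {
  branches: { branchMetric: { addPublish: (p) => sent.push(p) } }
};

transform({
  topic: "plant/node1",
  payload: {
    metrics: [
      { metricName: "temperature", value: 21 },
      { metricName: "rpm", value: 900 }
    ]
  }
}, context);
console.log(sent.map((p) => p.topic)); // [ 'plant/node1/temperature', 'plant/node1/rpm' ]
```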
If you use Sparkplug workloads in your HiveMQ deployment, be sure to check out our new Sparkplug Module for Data Hub. For more information, see Data Hub Modules.
Publish message for temperature violation (filtering)
function transform(publish, context) {
  const temperatureThreshold = context.arguments.temperatureThreshold;
  const payload = publish.payload;
  if (payload.temperature > temperatureThreshold) {
    // a new message about violation is created and published
    const message = { "topic": publish.topic + "/exceeded", payload: payload };
    context.branches["violations"].addPublish( message );
  }
  return publish;
}
The function in the example creates a new message when the temperature field exceeds the predefined threshold.
The threshold is defined in a data policy and is provided via arguments.
New messages are added to the violations branch.
The handling of the new messages is defined in the data policy.
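The threshold comparison can be checked locally for values above, below, and exactly at the threshold. In this sketch, the run helper, the threshold of 30, and the topic are assumptions used only for the test harness.

```javascript
// Same filtering logic as in the example above.
function transform(publish, context) {
  const temperatureThreshold = context.arguments.temperatureThreshold;
  const payload = publish.payload;
  if (payload.temperature > temperatureThreshold) {
    const message = { topic: publish.topic + "/exceeded", payload: payload };
    context.branches["violations"].addPublish(message);
  }
  return publish;
}

// Hypothetical harness: runs the script once and returns the new messages.
function run(temperature) {
  const violations = [];
  const context = {
    arguments: { temperatureThreshold: 30 },
    branches: { violations: { addPublish: (p) => violations.push(p) } }
  };
  transform({ topic: "room/1", payload: { temperature } }, context);
  return violations;
}

console.log(run(35).length); // 1
console.log(run(25).length); // 0
console.log(run(30).length); // 0, because the comparison is strict
```

A reading that equals the threshold does not create a message. If the threshold itself should count as a violation, the comparison would need to be >= instead.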
Script Requirements
The interface between HiveMQ Data Hub and the script function must fulfill the following requirements:
- The complete implementation of the transformation script must be contained in a single file.
- The transform function must be named transform.
- The transform function must accept two parameters: function transform(publish, context).
- The transform function must return a publish object.
- Any branches accessed through the context.branches object must have defined pipelines in the corresponding data policy.
The maximum number of Data Hub scripts per HiveMQ Data Hub deployment is 5000, and the maximum source size per Data Hub script is 100KB.
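Some of these requirements can be checked locally before a script is uploaded. The checkScript helper below is a hypothetical pre-flight check and not part of Data Hub. It assumes that 100KB means 102,400 bytes of UTF-8 source, and it evaluates the script with the standard Function constructor, so it should only be used on scripts you trust.

```javascript
const MAX_SOURCE_BYTES = 100 * 1024; // assumption: 100KB counted as 102,400 bytes

// Returns a list of problems found in the script source (empty if none).
function checkScript(source) {
  const problems = [];
  const size = new TextEncoder().encode(source).length;
  if (size > MAX_SOURCE_BYTES) {
    problems.push("source is larger than 100KB");
  }
  let candidate;
  try {
    // Evaluate the script and look up its transform function.
    candidate = new Function(
      source + "\nreturn typeof transform === 'function' ? transform : undefined;"
    )();
  } catch (error) {
    problems.push("script does not evaluate: " + error.message);
    return problems;
  }
  if (candidate === undefined) {
    problems.push("no function named transform");
  } else if (candidate.length !== 2) {
    problems.push("transform must accept two parameters");
  }
  return problems;
}

console.log(checkScript("function transform(publish, context) { return publish; }")); // []
console.log(checkScript("function convert(publish) { return publish; }"));
// [ 'no function named transform' ]
```

The check does not verify the return value or the branch pipelines, which depend on the runtime payload and on the data policy.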
Runtime Considerations
When you implement a transformation, it is important to keep CPU and memory resource usage in mind.
The execution of a transformation function is synchronous.
When a client publishes an MQTT payload, the payload is passed to the transformation script.
Next, the payload is processed and the clients receive an acknowledgment.
You can monitor your resource consumption levels with built-in Data Hub metrics.
For more information, see Data Hub metrics for HiveMQ Enterprise Edition and Data Hub metrics for HiveMQ Edge.
Log messages from the script engine and from console.log calls in your scripts are written to the script.log log file.
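As a brief sketch of script logging, the following function logs a message only when the payload has an unexpected shape. Because the transformation runs synchronously for each PUBLISH, logging only the exceptional case keeps the extra work per message small. The type check on the value field and the sample topic are assumptions made for illustration.

```javascript
// Logs only when the value field is not a number, then passes the
// message on unchanged.
function transform(publish, context) {
  const value = publish.payload.value;
  if (typeof value !== "number") {
    console.log(
      "unexpected value type on topic " + publish.topic + ": " + typeof value
    );
  }
  return publish;
}

// Local check: the message is returned unchanged and one line is logged.
const unchanged = transform({ topic: "sensors/2", payload: { value: "n/a" } }, {});
```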
Example Transform Functions
Here are some common examples of how to use the transformation functions in typical IoT scenarios.
Naturally, you can adapt the functions according to your individual needs.
Add a payload field
This example shows how to add a new field to incoming MQTT message payloads.
function transform(publish, context) {
  publish.payload.timestamp = new Date().toJSON();
  return publish;
}
In this example, a new field called timestamp with the current date is added to the message payload.
Restructure the message payload
Transformation can also be used to reduce the structural complexity of a message payload:
function transform(publish, context) {
  publish.payload = { "value": publish.payload.metrics.map( metric => metric.value ) };
  return publish;
}
The transform script assumes that the incoming payloads contain the following structure:
{
  "metrics": [
    {
      "value": 1
    },
    {
      "value": 5
    },
    {
      "value": 100
    }
  ]
}
After the script is applied, the following payload is generated:
{
  "value": [
    1,
    5,
    100
  ]
}
Compute a payload field
This example builds on the array of values from the previous Restructure the message payload example.
Here, we want to compute statistical properties from the array.
function transform(publish, context) {
  publish.payload.max = Math.max( ...publish.payload.value );
  publish.payload.min = Math.min( ...publish.payload.value );
  publish.payload.avg = publish.payload.value.reduce( ( x, y ) => x + y, 0 ) / publish.payload.value.length;
  return publish;
}
The computation of the maximum, minimum, and average of the values yields the following payload:
{
  "value": [
    1,
    5,
    100
  ],
  "max": 100,
  "min": 1,
  "avg": 35.333333333333336
}
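For an empty array, Math.max() returns -Infinity, Math.min() returns Infinity, and the average becomes NaN, none of which can be represented in JSON. The following sketch is one possible way to guard against that. Returning the message without statistics in that case is an assumption about the desired behavior.

```javascript
// Computes max, min, and avg only when the value array has entries.
function transform(publish, context) {
  const values = publish.payload.value;
  if (!Array.isArray(values) || values.length === 0) {
    return publish; // no statistics for missing or empty input
  }
  publish.payload.max = Math.max(...values);
  publish.payload.min = Math.min(...values);
  publish.payload.avg = values.reduce((x, y) => x + y, 0) / values.length;
  return publish;
}

// Local checks with hand-written publish objects.
const stats = transform({ topic: "t", payload: { value: [1, 5, 100] } }, {}).payload;
console.log(stats.max, stats.min, stats.avg); // 100 1 35.333333333333336

const empty = transform({ topic: "t", payload: { value: [] } }, {}).payload;
console.log("max" in empty); // false
```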
Rename a message payload field
In some cases, devices provide the same types of values but use different naming conventions.
The transformation function can be used to harmonize the incoming device data.
function getFieldsToRename() {
  return [ { old: "Timestamp", new: "timestamp" }, { old: "temp", new: "temperature" } ];
}
function transform(publish, context) {
  const payload = publish.payload;
  getFieldsToRename().forEach( field => {
    // Rename only fields that are present in the payload.
    if (field.old in payload) {
      payload[field.new] = payload[field.old];
      delete payload[field.old];
    }
  } );
  publish.payload = payload;
  return publish;
}
The script defines a helper function that returns the list of fields to be renamed.
In the example, Timestamp is renamed to timestamp and temp is renamed to temperature.
The transform function iterates through all fields, sets the new field name as specified where applicable, and deletes the old field.
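The renaming logic can be tried locally with a sample payload. The sketch below is a self-contained variant that keeps the rename list in a constant and skips fields that are absent. The sample payload, including the humidity field, is an assumption made for illustration.

```javascript
// List of fields to rename, kept in a constant for this sketch.
const RENAMES = [
  { old: "Timestamp", new: "timestamp" },
  { old: "temp", new: "temperature" }
];

function transform(publish, context) {
  const payload = publish.payload;
  RENAMES.forEach((field) => {
    // Skip fields that the device did not send.
    if (Object.prototype.hasOwnProperty.call(payload, field.old)) {
      payload[field.new] = payload[field.old];
      delete payload[field.old];
    }
  });
  return publish;
}

// Local check: renamed fields move, all other fields stay as they are.
const renamed = transform({
  topic: "devices/7",
  payload: { Timestamp: "2024-01-01T00:00:00Z", temp: 21.5, humidity: 40 }
}, {});
console.log(renamed.payload.temperature); // 21.5
console.log("temp" in renamed.payload);   // false
```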
Script Management from the HiveMQ Control Center
The Data Hub Script view in the HiveMQ Control Center simplifies script management with an intuitive user interface. The script creation wizard guides you through creating new scripts with context-sensitive help and immediate feedback on configuration validity.
For more information, see HiveMQ Control Center.