HiveMQ Enterprise Extension for Google Cloud Pub/Sub Customization
Version 4.9.0 of HiveMQ and the Enterprise Extension for Google Cloud Pub/Sub introduces the HiveMQ Google Cloud Pub/Sub Extension Customization SDK.
Our flexible new API gives you the ability to programmatically specify sophisticated custom handling of message transformations between HiveMQ and Google Cloud Pub/Sub.
Features
The HiveMQ Google Cloud Pub/Sub Extension Customization SDK gives you greater control over message content, topics, and the bidirectional flow of exchanged messages between HiveMQ and Google Cloud Pub/Sub.
Requirements
-
Java 11
-
Java IDE that supports Gradle (for example, IntelliJ)
-
HiveMQ Professional or Enterprise Edition version 4.9.0 or higher
-
HiveMQ Enterprise Extension for Google Cloud Pub/Sub version 4.9.0 or higher
-
Active Google Cloud Pub/Sub account
Quick Start with Customization SDK
The Customization SDK for the HiveMQ Enterprise Extension for Google Cloud Pub/Sub uses the same Input/Output principle as the HiveMQ Extension SDK.
The quickest way to learn about the new Customization SDK is to check out our Hello World Customization project on GitHub and use it as the basis for your own customization.
The hivemq-google-cloud-pubsub-hello-world-customization
project provides two transformers to get you started with the following transformations in the HiveMQ Google Cloud Pub/Sub Extension Customization SDK:
-
One transformer that forwards MQTT PUBLISH messages to your Google Cloud Pub/Sub service:
-
Reads the Google Pub/Sub topic from the custom settings in the
google-cloud-pubsub-configuration.xml
of your HiveMQ instance. -
Preserves the MQTT topic, retained message flag, QoS level, and MQTT 5 user properties of the MQTT message as Google Cloud Pub/Sub message attributes.
-
-
One transformer that forwards Google Cloud Pub/Sub messages to HiveMQ:
-
Reads the QoS level from the custom settings of the
google-cloud-pubsub-configuration.xml
of your HiveMQ instance. -
Sets
mqtt/topic
as the destination MQTT topic. -
Preserves all Google Cloud Pub/Sub attributes as MQTT user properties.
-
MQTT to Google Cloud Pub/Sub Customization
The MQTT to Google Pub/Sub Transformer lets you extend the capabilities of your HiveMQ Enterprise Extension for Google Cloud Pub/Sub to meet the specific needs of your individual use case.
Implement this transformer to programmatically create customized Google Cloud Pub/Sub messages from MQTT PUBLISH messages:
-
Access data from all fields and properties of specific incoming MQTT PUBLISH messages:
-
MQTT message payload
-
MQTT QoS
-
MQTT retained message flag
-
MQTT topic
-
MQTT payload format indicator
-
MQTT message expiry
-
MQTT response topic
-
MQTT correlation data
-
MQTT content type
-
MQTT user properties
-
-
Set information for the target Google Cloud Pub/Sub topic as desired:
-
Google Cloud Pub/Sub attributes
-
Google Cloud Pub/Sub custom settings
-
-
Create multiple Google Cloud Pub/Sub messages from a single MQTT PUBLISH message.
MQTT to Google Cloud Pub/Sub Transformer Configuration
With the MQTT to Google Cloud Pub/Sub Transformer, you can add your own code to implement the transformation of MQTT messages to Google Cloud Pub/Sub messages.
To enable the use of a custom MQTT to Google Cloud Pub/Sub transformer, you extend the google-cloud-pubsub-configuration.xml
file of your HiveMQ Enterprise Extension for Google Cloud Pub/Sub.
Parameter | Required | Type | Description |
---|---|---|---|
|
Boolean |
Optional setting that defines whether the selected transformer is enabled or disabled. The default setting is |
|
|
String |
The unique identifier of the transformer. |
|
|
String |
The |
|
|
String |
The fully qualified class name of the transformer that is used. |
|
|
String |
A list of MQTT topic filters.
|
|
|
Element |
A list of the custom settings that are available in the init input method of the transformer.
|
<mqtt-to-pubsub-transformers>
<mqtt-to-pubsub-transformer>
<id>my-mqtt-to-pubsub-transformer-01</id>
<enabled>true</enabled>
<pubsub-connection>your-custom-connection-id</pubsub-connection>
<mqtt-topic-filters>
<mqtt-topic-filter>mqtt/topic/a</mqtt-topic-filter>
<mqtt-topic-filter>mqtt/topic/+/a</mqtt-topic-filter>
<mqtt-topic-filter>mqtt/topic/c/#</mqtt-topic-filter>
</mqtt-topic-filters>
<transformer>fully.qualified.classname.to.YourTransformer</transformer>
<custom-settings>
<custom-setting>
<name>destination</name>
<value>destination/pubsub/topic/a</value>
</custom-setting>
<custom-setting>
<name>destination</name>
<value>destination/pubsub/topic/b</value>
</custom-setting>
<custom-setting>
<name>filepath</name>
<value>path-to-file</value>
</custom-setting>
</custom-settings>
</mqtt-to-pubsub-transformer>
</mqtt-to-pubsub-transformers>
MQTT to Google Cloud Pub/Sub Objects and Methods
MqttToPubSubTransformer
The MqttToPubSubTransformer
interface is a transformer for the programmatic creation of one or more OutboundPubSubMessages
from MQTT PublishPackets
.
The HiveMQ Enterprise Extension for Google Cloud Pub/Sub executes the MqttToPubSubTransformer
for each MQTT PUBLISH message your HiveMQ cluster receives that matches the <mqtt-topic-filters>
you configure in the <mqtt-to-pubsub-transformer>
tag of your google-cloud-pubsub-configuration.xml
file. The transformer can publish a virtually unlimited number of OutboundPubSubMessage
via the MqttToPubSubOutput
object.
Your compiled implementation (.class files) of the MqttToPubSubTransformer and all associated dependencies must be placed in a java archive (.jar) in the customizations folder of your HiveMQ Enterprise Extension for Google Pub/Sub. Additionally, you must configure an <mqtt-to-pubsub-transformer> that references the canonical name of the implementing class in the google-cloud-pubsub-configuration.xml file.
|
+
Multiple threads can call methods concurrently. To ensure that all threads behave properly and prevent unintended interactions, your implementation of the MqttToPubSubTransformer
interface must be thread-safe. Additionally, exception handling must be done inside the methods of your MqttToPubSubTransformer
implementation.
Transformer methods are not permitted to throw an Exception
of any kind. If a method in your transformer throws an exception, fix and redeploy your transformer and disable/enable the Google Cloud Pub/Sub extension.+
If the HiveMQ Google Cloud Pub/Sub extension cannot successfully complete a transformation, the extension drops the message, logs a warning, and increments the pubsub-to-mqtt.transformer.[transformer_id].failed.count`metric.
metric in the Google Cloud Pub/Sub Extension Customization SDK Metrics. An increase in the resend metric can indicate an issue with your transformer. Additionally, check your
To troubleshoot your transformer, monitor the `mqtt-to-pubsub.transformer.[transformer_id].failed.counthivemq.log
file for warnings related to the performance of your transformer.
Interface | Description |
---|---|
Contains the information of the MQTT |
|
Returns the post-transformation |
@Override
public void transformMqttToPubSub(
final @NotNull MqttToPubSubInput mqttToPubSubInput,
final @NotNull MqttToPubSubOutput mqttToPubSubOutput) {
final PublishPacket publishPacket = mqttToPubSubInput.getPublishPacket();
}
MqttToPubSubInitInput
The MqttToPubSubInitInput
interface provides context for the setup of an MqttToPubSubTransformer
and is used to retrieve the associated custom-settings
and pubsub-connection
of the selected transformer and the MetricRegistry
of HiveMQ after (re)loading of the extension configuration.
Method | Information |
---|---|
|
Returns the custom settings that the user defined in the extension configuration. If no custom settings are configured, the underlying map is empty. |
|
Provides the connection identifier and the Google Cloud project identifier with which the transformer is associated as defined in the extension configuration. |
|
Provides the metric registry of the HiveMQ node. The registry can be used to add custom metrics that fulfil the monitoring needs of your specific business logic. |
@Override
public void init(final @NotNull MqttToPubSubInitInput input) {
final CustomSettings customSettings = input.getCustomSettings();
final PubSubConnection pubSubConnection = input.getPubSubConnection();
final MetricRegistry metricRegistry = input.getMetricRegistry();
}
MqttToPubSubInput
The MqttToPubSubInput
object is the input parameter of the MqttToPubSubTransformer
. The MqttToPubSubInput
object contains the PublishPacket
to which the transformation is applied.
Method | Information |
---|---|
|
Contains all the MQTT Publish information about the message that triggered the transformer call |
@Override
public void transformMqttToPubSub(
final @NotNull MqttToPubSubInput mqttToPubSubInput,
final @NotNull MqttToPubSubOutput mqttToPubSubOutput) {
final PublishPacket publishPacket = mqttToPubSubInput.getPublishPacket();
}
MqttToPubSubOutput
The MqttToPubSubOutput
object is the output parameter of the MqttToPubSubTransformer
. The MqttToPubSubOutput
object provides the following methods:
Method | Description |
---|---|
|
Creates an empty Builder to create |
|
Sets the created |
@Override
public void transformMqttToPubSub(
final @NotNull MqttToPubSubInput mqttToPubSubInput,
final @NotNull MqttToPubSubOutput mqttToPubSubOutput) {
final PublishPacket publishPacket = mqttToPubSubInput.getPublishPacket();
final OutboundPubSubMessageBuilder builder = mqttToPubSubOutput.newOutboundPubSubMessageBuilder();
publishPacket.getPayload().ifPresent(builder::data);
mqttToPubSubOutput.setOutboundPubSubMessages(List.of(builder.build()));
}
Google Cloud Pub/Sub to MQTT Customization
The Google Cloud Pub/Sub to MQTT Transformer lets you extend the capabilities of your HiveMQ Enterprise Extension for Google Cloud Pub/Sub to meet the specific needs of your individual use case.
Implement this transformer to programmatically create customized MQTT PublishPackets
from Google Cloud Pub/Sub messages.
-
Access data from all fields of a specific Google Cloud Pub/Sub message:
-
Pub/Sub data
-
Pub/Sub attributes
-
Pub/Sub ordering key
-
Pub/Sub publish time
-
Pub/Sub message ID
-
Pub/Sub subscription
-
-
Set information in the generated MQTT PUBLISH message as desired:
-
MQTT Payload
-
MQTT QoS
-
MQTT Retain flag
-
MQTT Topic
-
MQTT Payload format indicator
-
MQTT Message expiry
-
MQTT Response topic
-
MQTT Correlation data
-
MQTT Content type
-
MQTT User properties
-
-
Create multiple MQTT PUBLISH messages from a Google Cloud Pub/Sub message
Google Cloud Pub/Sub to MQTT Transformer Configuration
With the Google Cloud Pub/Sub to MQTT Transformer, you can add your own code to implement the transformation of Google Cloud Pub/Sub messages into MQTT messages.
To enable the use of a custom Google Cloud Pub/Sub to MQTT transformer, you extend the google-cloud-pubsub-configuration.xml
file of your HiveMQ Enterprise Extension for Google Cloud Pub/Sub.
Parameter | Required | Type | Description |
---|---|---|---|
|
Boolean |
Optional setting that defines whether the selected transformer is enabled or disabled. The default setting is |
|
|
String |
The unique identifier of the transformer. |
|
|
String |
The |
|
|
String |
The fully qualified class name of the transformer that is used. |
|
|
String |
A list of the Google Cloud Pub/Sub source subscriptions.
|
|
|
Element |
A list of the custom settings that are available in the init input method of the transformer.
|
<pubsub-to-mqtt-transformers>
<pubsub-to-mqtt-transformer>
<id>my-pubsub-to-mqtt-transformer-01</id>
<enabled>true</enabled>
<pubsub-connection>your-custom-connection-id</pubsub-connection>
<pubsub-subscriptions>
<pubsub-subscription>
<name>sub-for-a</name>
</pubsub-subscription>
<pubsub-subscription>
<name>sub-for-b</name>
</pubsub-subscription>
</pubsub-subscriptions>
<transformer>fully.qualified.classname.to.YourTransformer</transformer>
<custom-settings>
<custom-setting>
<name>destination</name>
<value>destination/mqtt/topic/a</value>
</custom-setting>
<custom-setting>
<name>destination</name>
<value>destination/mqtt/topic/b</value>
</custom-setting>
<custom-setting>
<name>filepath</name>
<value>path-to-file</value>
</custom-setting>
</custom-settings>
</pubsub-to-mqtt-transformer>
</pubsub-to-mqtt-transformers>
Google Pub/Sub to MQTT Objects and Methods
PubSubToMqttTransformer
The PubSubToMqttTransformer
interface is a transformer for the programmatic creation of MQTT PublishPackets
from one or more InboundPubSubMessages
.
The HiveMQ Enterprise Extension for Google Pub/Sub executes the PubSubToMqttTransformer
implementation for every InboundPubSubMessage
that matches the <pubsub-subscriptions>
you configure in the <pubsub-to-mqtt-transformer>
tag of your google-cloud-pubsub-configuration.xml
file. The transformer can publish a virtually unlimited number of MQTT PublishPackets
via the PubSubToMqttOutput
object.
Your compiled implementation (.class files) of the PubSubToMqttTransformer and all associated dependencies must be placed in a java archive (.jar) in the customizations folder of your HiveMQ Enterprise Extension for Google Cloud Pub/Sub. Additionally, you must configure a <pubsub-to-mqtt-transformer> that references the canonical name of the implementing class in the google-cloud-pubsub-configuration.xml file.
|
Multiple threads can call methods concurrently. To ensure that all threads behave properly and prevent unintended interactions, your implementation of the PubSubToMqttTransformer interface must be thread-safe. Additionally, exception handling must be done inside the methods of your PubSubToMqttTransformer implementation.Transformer methods are not permitted to throw an Exception of any kind. If a method in your transformer throws an exception, fix and redeploy your transformer and disable/enable the Google Cloud Pub/Sub extension.+
|
If the HiveMQ Google Cloud Pub/Sub extension cannot successfully complete a transformation, the extension drops the message, logs a warning, and increments the pubsub-to-mqtt.transformer.[transformer_id].failed.count`metric.
metric in the Google Cloud Pub/Sub Extension Customization SDK Metrics. An increase in the resend metric can indicate an issue with your transformer. Additionally, check your
To troubleshoot your transformer, monitor the `pubsub-to-mqtt.transformer.[transformer_id].failed.counthivemq.log
file for warnings related to the performance of your transformer.
hivemq.log
fileTransformer with id test-transformer threw an unhandled RuntimeException with message null while processing a record from topic test, partition test-1 with offset 1000. Transformers are responsible for their own exception handling.
One instance of your PubSubToMqttTransformer implementation is created for each <pubsub-to-mqtt-transformer> that you reference in your google-cloud-pubsub-configuration.xml file.
|
Method | Description |
---|---|
|
This callback is executed for every |
public class PubSubToMqttHelloWorldTransformer implements PubSubToMqttTransformer {
@Override
public void init(final @NotNull PubSubToMqttInitInput input) {
// Insert your own business logic
}
@Override
public void transformPubSubToMqtt(
final @NotNull PubSubToMqttInput pubSubToMqttInput,
final @NotNull PubSubToMqttOutput pubSubToMqttOutput) {
// Insert your own business logic
}
}
PubSubToMqttInitInput
The PubSubToMqttInitInput
interface provides context for the set up of a PubSubToMqttTransformer
and is used to call the associated custom-settings
and pubsub-connection
and the MetricRegistry
of HiveMQ after (re)loading of the extension configuration.
Method | Information |
---|---|
|
Returns the custom settings that the user defined in the extension configuration. If no custom settings are configured, the underlying map is empty. |
|
Provides the connection identifier and the Google Cloud project identifier with which the transformer is associated as defined in the extension configuration. |
|
Provides the metric registry of the HiveMQ node. The registry can be used to add custom metrics that fulfil the monitoring needs of your specific business logic. |
@Override
public void init(final @NotNull PubSubToMqttInitInput input) {
final CustomSettings customSettings = input.getCustomSettings();
final PubSubConnection pubSubConnection = input.getPubSubConnection();
final MetricRegistry metricRegistry = input.getMetricRegistry();
}
PubSubToMqttInput
The PubSubToMqttInput
object is the input parameter of the PubSubToMqttTransformer
. The PubSubToMqttInput
object contains the information of the InboundPubSubMessage
to which the transformation is applied.
Method | Description |
---|---|
|
Contains all the Google Cloud Pub/Sub message information that triggered the transformer call. |
@Override
public void transformPubSubToMqtt(
final @NotNull PubSubToMqttInput pubSubToMqttInput,
final @NotNull PubSubToMqttOutput pubSubToMqttOutput) {
final InboundPubSubMessage inboundPubSubMessage = pubSubToMqttInput.getInboundPubSubMessage();
}
PubSubToMqttOutput
The PubSubToMqttOutput
object is the output parameter of the PubSubToMqttTransformer
. The PubSubToMqttOutput
object allows access to the MQTT PublishBuilder
. After the PubSubToMqttTransformer
transforms the PubSubToMqttInput
, HiveMQ publishes the outputs in MQTT messages.
Method | Description |
---|---|
|
Returns a new |
|
Defines the MQTT publish messages that HiveMQ publishes once the transformation is complete. Each call of the
|
If a publish or any element of a publish is null, a NullPointerException is thrown. If a publish contains any element that is not created via a PublishBuilder , an IllegalArgumentException is thrown.If the publishes list is empty, HiveMQ does not publish any MQTT message.
|
@Override
public void transformPubSubToMqtt(
final @NotNull PubSubToMqttInput pubSubToMqttInput,
final @NotNull PubSubToMqttOutput pubSubToMqttOutput) {
final InboundPubSubMessage inboundPubSubMessage = pubSubToMqttInput.getInboundPubSubMessage();
final PublishBuilder publishBuilder = pubSubToMqttOutput.newPublishBuilder();
inboundPubSubMessage.getData().ifPresent(publishBuilder::payload);
pubSubToMqttOutput.setPublishes(List.of(publishBuilder.build()));
}
Customization Use Cases
MQTT to Google Cloud Pub/Sub Customization Use Case Examples
Build and send custom Google Cloud Pub/Sub messages in response to received MQTT messages: In this use case, the goal is to transform MQTT messages based on a defined business logic and output a customized Google Cloud Pub/Sub message.
public class BusinessLogicTransformer implements MqttToPubSubTransformer {
@Override
public void transformMqttToPubSub(
final @NotNull MqttToPubSubInput mqttToPubSubInput,
final @NotNull MqttToPubSubOutput mqttToPubSubOutput) {
// Get MQTT payload
final ByteBuffer mqttPayload = mqttToPubSubInput.getPublishPacket().getPayload().orElse(ByteBuffer.allocate(0));
// Manipulate the MQTT payload according to your business logic
// For example, deserialize or add fields
final byte[] data = ownBusinessLogic(mqttPayload);
final OutboundPubSubMessage message = output.newOutboundPubSubMessageBuilder()
.topicName(input.getPublishPacket().getTopic())
.data(input.getPublishPacket().getPayload().orElse(ByteBuffer.allocate(0)))
.build();
// Set message as output
mqttToPubSubOutput.setOutboundPubSubMessages(List.of(message));
}
}
Multicast one MQTT message to multiple Google Cloud Pub/Sub topics: In this use case, the goal is to transform one MQTT message into multiple customized Google Cloud Pub/Sub messages that each contain a specific part of the information from the original MQTT message:
public class MulticastTransformer implements MqttToPubSubTransformer {
private static final String PUBSUB_TOPIC_LOCATION_SERVICE = "location-service";
private static final String PUBSUB_TOPIC_SPEED_SERVICE = "speed-control-service";
private static final String PUBSUB_TOPIC_FUEL_SERVICE = "fuel-control-service";
@Override
public void transformMqttToPubSub(
final @NotNull MqttToPubSubInput input,
final @NotNull MqttToPubSubOutput output) {
// Get MQTT payload
final ByteBuffer mqttPayload = input.getPublishPacket().getPayload().orElse(ByteBuffer.allocate(0));
// Extract information from MQTT payload
byte[] location = extractLocation(mqttPayload);
// Create record for the location information
final OutboundPubSubMessage locationMessage = output.newOutboundPubSubMessageBuilder()
.topicName(PUBSUB_TOPIC_LOCATION_SERVICE)
.data(location)
.build();
// Create record for the speed information
byte[] speed = extractSpeed(mqttPayload);
final OutboundPubSubMessage speedMessage = output.newOutboundPubSubMessageBuilder()
.topicName(PUBSUB_TOPIC_SPEED_SERVICE)
.data(speed)
.build();
// Create record for the fuel information
byte[] fuel = extractFuel(mqttPayload);
final OutboundPubSubMessage fuelMessage = output.newOutboundPubSubMessageBuilder()
.topicName(PUBSUB_TOPIC_FUEL_SERVICE)
.data(fuel)
.build();
// Set the 3 PubSub messages as output
output.setOutboundPubSubMessages(List.of(locationMessage, speedMessage, fuelMessage));
}
private byte[] extractFuel(final @NotNull ByteBuffer mqttPayload) {
// Pseudo-implementation, add custom code
return null;
}
private byte[] extractSpeed(final @NotNull ByteBuffer mqttPayload) {
// Pseudo-implementation, add custom code
return null;
}
private byte[] extractLocation(final @NotNull ByteBuffer mqttPayload) {
// Pseudo-implementation, add custom code
return null;
}
}
Use custom logic to drop specific MQTT messages: In this use case, the goal is to filter out received MQTT messages that have empty message payloads and to create Google Cloud Pub/Sub messages only for received MQTT messages with message payloads that contain data.
public class DroppingTransformer implements MqttToPubSubTransformer {
@Override
public void transformMqttToPubSub(
final @NotNull MqttToPubSubInput input,
final @NotNull MqttToPubSubOutput output) {
if (input.getPublishPacket().getPayload().isEmpty()) {
output.setOutboundPubSubMessages(List.of());
return;
}
final OutboundPubSubMessage message = output.newOutboundPubSubMessageBuilder()
.topicName(input.getPublishPacket().getTopic())
.data(input.getPublishPacket().getPayload())
.build();
output.setOutboundPubSubMessages(List.of(message));
}
}
Use custom settings to transform MQTT PUBLISH messages: In this use case, the goal is to use custom settings from the configuration XML file to transform MQTT publishes.
<mqtt-to-pubsub-transformers>
<mqtt-to-pubsub-transformer>
<enabled>true</enabled>
<id>my-mqtt-to-pubsub-transformer-01</id>
<pubsub-connection>your-custom-connection-id</pubsub-connection>
<transformer>fully.qualified.classname.to.CustomSettingsTransformer</transformer>
<mqtt-topic-filters>
<mqtt-topic-filter>mqtt/topic/a</mqtt-topic-filter>
</mqtt-topic-filters>
<custom-settings>
<custom-setting>
<name>destination-topic</name>
<value>destination/pubsub/topic/a</value>
</custom-setting>
</custom-settings>
</mqtt-to-pubsub-transformer>
</mqtt-to-pubsub-transformers>
public class CustomSettingsTransformer implements MqttToPubSubTransformer {
private @NotNull Optional<String> destinationTopic = Optional.empty();
@Override
public void init(@NotNull MqttToPubSubInitInput transformerInitInput) {
final CustomSettings customSettings = transformerInitInput.getCustomSettings();
destinationTopic = customSettings.getFirst("destination-topic");
}
@Override
public void transformMqttToPubSub(
final @NotNull MqttToPubSubInput input,
final @NotNull MqttToPubSubOutput output) {
final OutboundPubSubMessage message = output.newOutboundPubSubMessageBuilder()
.topicName(destinationTopic.orElse("default/destination/topic"))
.data(input.getPublishPacket().getPayload().orElse(ByteBuffer.allocate(0)))
.build();
output.setOutboundPubSubMessages(List.of(message));
}
}
Google Cloud Pub/Sub to MQTT Customization Use Case Examples
Build custom MQTT topic from incoming Google Cloud Pub/Sub messages: In this use case, the goal is to extract selected information from an incoming Google Cloud Pub/Sub message and publish this information to a specific MQTT topic based on your own custom business logic.
public class CustomTransformer implements PubSubToMqttTransformer {
@Override
public void transformPubSubToMqtt(
final @NotNull PubSubToMqttInput input,
final @NotNull PubSubToMqttOutput output) {
// Get the PubSub message
final InboundPubSubMessage pubSubMessage = input.getInboundPubSubMessage();
final byte[] mqttPayload = ownBusinessLogic(pubSubMessage.getDataAsByteArray().orElse(new byte[0]));
// Build MQTT publish
final Publish publish = output.newPublishBuilder()
.topic(pubSubMessage.getSubscriptionName())
.qos(Qos.AT_LEAST_ONCE)
.payload(ByteBuffer.wrap(mqttPayload))
.build();
// Set MQTT publish as output
output.setPublishes(List.of(publish));
}
private @NotNull byte[] ownBusinessLogic(final byte @NotNull [] mqttPayload) {
// Insert your own business logic
return mqttPayload;
}
}
Use custom logic to skip specific Google Cloud Pub/Sub messages: In this use case, the goal is to filter out incoming Google Cloud Pub/Sub messages that are empty and to create MQTT messages only for received messages that contain data.
public class SkippingTransformer implements PubSubToMqttTransformer {
@Override
public void transformPubSubToMqtt(
final @NotNull PubSubToMqttInput input,
final @NotNull PubSubToMqttOutput output) {
// Get the PubSub message
final InboundPubSubMessage message = input.getInboundPubSubMessage();
// Skip the PubSub message if the data is missing
if (message.getData().isEmpty()) {
output.setPublishes(List.of());
return;
}
// Build MQTT publish
final Publish publish = output.newPublishBuilder()
.topic(message.getSubscriptionName())
.payload(message.getData().get())
.build();
output.setPublishes(List.of(publish));
}
}
Use custom settings to transform Google Cloud Pub/Sub messages: In this use case, the goal is to use custom settings from the configuration XML file to transform Google Cloud Pub/Sub messages.
<pubsub-to-mqtt-transformers>
<pubsub-to-mqtt-transformer>
<id>my-pubsub-to-mqtt-transformer-01</id>
<enabled>true</enabled>
<pubsub-connection>your-custom-connection-id</pubsub-connection>
<pubsub-subscriptions>
<pubsub-subscription>
<name>your-subscription</name>
</pubsub-subscription>
</pubsub-subscriptions>
<transformer>fully.qualified.classname.to.CustomSettingsTransformer</transformer>
<custom-settings>
<custom-setting>
<name>destination-topic</name>
<value>destination/mqtt/topic/a</value>
</custom-setting>
</custom-settings>
</pubsub-to-mqtt-transformer>
</pubsub-to-mqtt-transformers>
public class CustomSettingsTransformer implements PubSubToMqttTransformer {
private @NotNull Optional<String> destinationTopic = Optional.empty();
@Override
public void init(@NotNull PubSubToMqttInitInput transformerInitInput) {
final CustomSettings customSettings = transformerInitInput.getCustomSettings();
destinationTopic = customSettings.getFirst("destination-topic");
}
@Override
public void transformPubSubToMqtt(
final @NotNull PubSubToMqttInput input,
final @NotNull PubSubToMqttOutput output) {
// Get the PubSub message
final InboundPubSubMessage message = input.getInboundPubSubMessage();
// Build MQTT publish
final Publish publish = output.newPublishBuilder()
.topic(destinationTopic.orElse("default/destination/topic"))
.payload(message.getData().orElse(ByteBuffer.allocate(0)))
.build();
output.setPublishes(List.of(publish));
}
}
Deploy Customization
You can deploy your customization with three simple steps:
-
Configure your
google-cloud-pubsub-configuration.xml
to add the MQTT to Google Cloud Pub/Sub transformer. -
Run the
./gradlew jar
task from your Gradle project to build your customization. For example, the task from the Hello World project. -
Move the jar file from your local
build/libs/
folder to theHIVEMQ_HOME/extensions/hivemq-google-cloud-pubsub-extension/customizations
directory of your HiveMQ installation.
SDK Metrics
When you implement a transformer, The HiveMQ Google Cloud Pub/Sub Extension Customization SDK adds useful metrics to your HiveMQ Enterprise Extension for Google Cloud Pub/Sub . Monitor the metrics to gain valuable insights into the behavior of your applications over time.
For more information, see HiveMQ Google Pub/Sub Extension Customization SDK Metrics.
In addition to the default metrics, the Metric Registry that the Google Cloud Pub/Sub Extension Customization SDK exposes gives you the ability to create your own metrics to measure specific business aspects of your individual use case.
For more information, see PubSubToMqttInitInput.