HiveMQ Enterprise Extension for Kafka

Apache Kafka is a popular open-source streaming platform that makes it easy to share data between enterprise systems and applications.

The HiveMQ Enterprise Extension for Kafka implements the native Kafka protocol inside your HiveMQ MQTT broker. This implementation solves the difficulty of using Kafka for IoT by seamlessly integrating MQTT messages into the Kafka messaging flow.

Use the HiveMQ Enterprise Extension for Kafka to add monitored, bi-directional MQTT messaging to and from your Kafka clusters for a highly scalable and resilient end-to-end IoT solution.

HiveMQ plus Kafka

Features

  • Forward MQTT messages from IoT devices that are connected to your HiveMQ MQTT broker to topics in one or more Kafka clusters.

  • Poll information from a Kafka topic and publish this information as MQTT messages to one or more MQTT topics.

  • Use placeholders to dynamically generate MQTT messages at runtime with values extracted from a Kafka record.

  • Use multiple MQTT topic filters with full support of wildcards to route MQTT messages to the desired Kafka topics.

  • Buffer messages on the HiveMQ MQTT broker to ensure high-availability and failure tolerance whenever a Kafka cluster is temporarily unavailable.

  • Monitor all MQTT messages that are written to and from Kafka on the centralized HiveMQ Control Center.

  • Validate messages that you read from Kafka with the help of a schema registry.

  • Use the HiveMQ Kafka Extension Customization SDK to programmatically specify sophisticated custom-handling of message transformations between HiveMQ and Kafka.

With the HiveMQ Enterprise Extension for Kafka, your HiveMQ MQTT broker can forward MQTT messages from multiple MQTT topic filters to as many Kafka topics as you choose. Conversely, the extension enables you to poll information from the records in a Kafka topic, extract the data you need, and publish the information to as many MQTT topics as your use case requires.

Figure 1. Bi-directional communication

The MQTT-to-Kafka function of the HiveMQ Enterprise Extension for Kafka acts as multiple Kafka producers that route the selected MQTT publish messages to the desired Kafka topics. Each HiveMQ extension on every HiveMQ node in the HiveMQ cluster automatically opens connections to all Kafka brokers that are needed in the desired Kafka clusters.

Figure 2. MQTT to Kafka

HiveMQ sends the MQTT messages that are published to the selected MQTT topic to Kafka with the original MQTT topic from the publish message as Kafka key and the payload of the MQTT publish message as the Kafka value.

The Kafka-to-MQTT function of the extension makes it possible to transform Kafka records and publish values that are extracted from the different areas of the Kafka topic in new MQTT messages. There are many ways to configure how HiveMQ uses the information from Kafka. All HiveMQ Enterprise Extensions for Kafka in a HiveMQ cluster work together as a consumer group to balance the workload and retrieve data from an individual Kafka topic.

By default, the Kafka topic is used as the MQTT topic and the Kafka value is the MQTT payload.

The in-order guarantee of Kafka records applies per partition of a Kafka topic. This order is preserved for the MQTT client that receives the created MQTT publishes.
Figure 3. Kafka to MQTT

The ability to multicast targeted MQTT messages from individual records in a Kafka topic is attractive for a wide range of possible use cases. For example, you can collect event information from particular services in a Kafka topic, listen to the Kafka topic, and react with dynamically generated MQTT messages to the appropriate MQTT topics.

Figure 4. Kafka to MQTT Message Multicasting

Since the HiveMQ extension manages both the MQTT-to-Kafka and the Kafka-to-MQTT functions, you can easily maintain a clear overview of all communication from the control center of your HiveMQ broker.

The HiveMQ Enterprise Extension for Kafka does not interfere with the publish/subscribe mechanism of MQTT and can be used in addition to normal MQTT subscriptions and shared subscriptions.

Requirements

If you do not provide a valid license, HiveMQ uses a free trial license automatically. The trial license for the HiveMQ Enterprise Extension for Kafka is valid for 5 hours. For more license information or to request an extended evaluation license, contact HiveMQ sales.

Installation

  1. Place your HiveMQ Enterprise Extension for Kafka license file (.elic) in the license folder of your HiveMQ installation. (Skip this step if you are using a trial version of the extension. For information on evaluation limits, see the HiveMQ website.)

    Since HiveMQ 4.4, the HiveMQ Enterprise Extensions are preinstalled in your HiveMQ release bundle and disabled.
    └─ <HiveMQ folder>
    ├── README.txt
    ├── audit
    ├── backup
    ├── bin
    ├── conf
    ├── data
    ├── extensions
    │   ├── hivemq-kafka-extension
    │   │   ├── conf
    │   │   │   ├── config.xml (needs to be added by the user)
    │   │   │   ├── config.xsd
    │   │   │   └── examples
    │   │   │       └── ...
    │   │   ├── hivemq-kafka-extension.jar
    │   │   ├── hivemq-extension.xml
    │   │   └── third-party-licenses
    │   │       └── ...
    ├── license
    ├── log
    ├── third-party-licenses
    └── tools
  2. Before you enable the extension, you need to configure the extension for your individual Kafka clusters and Kafka topics. For your convenience, we provide an example configuration conf/examples/config.xml that you can copy and modify as desired.
    The included config.xsd file outlines the schema and elements that can be used in the XML configuration.
    Your completed configuration file must be named config.xml and located at HIVEMQ_HOME/extensions/hivemq-kafka-extension/conf/config.xml
    For information on all your configuration options, see Configuration.

  3. To enable the HiveMQ Enterprise Extension for Kafka, locate the hivemq-kafka-extension folder in the extensions directory of your HiveMQ installation and remove the DISABLED file.

    Starting with HiveMQ 4.15.0, the configuration for the HiveMQ Kafka Extension is located in HIVEMQ_HOME/extensions/hivemq-kafka-extension/conf/config.xml. Support for the previous location HIVEMQ_HOME/extensions/hivemq-kafka-extension/kafka-configuration.xml will be removed in a future release.

    If applicable, move the configuration from HIVEMQ_HOME/extensions/hivemq-kafka-extension/kafka-configuration.xml to HIVEMQ_HOME/extensions/hivemq-kafka-extension/conf/config.xml.

  4. To verify that the extension is installed and configured properly, check the HiveMQ logs and the newly created Kafka tab that is added to your HiveMQ Control Center.

Figure 5. HiveMQ Control Center
To function properly, the HiveMQ Enterprise Extension for Kafka must be installed on all HiveMQ broker nodes in a HiveMQ cluster.
If any of the configured Kafka topics do not already exist, the HiveMQ Enterprise Extension for Kafka attempts to create the topic automatically. The topic is created with a default replication factor of 1 and a default partition count of 10.

Configuration

The HiveMQ Enterprise Extension for Kafka supports hot reload of your extension configuration. Changes that you make to the configuration of the extension are updated while the extension is running, without the need for a restart.

Version 1.1.0 of the HiveMQ Enterprise Extension for Kafka renamed two topic-mapping tags:

  • <mqtt-to-kafka-mappings> replaces the <topic-mappings> tag.

  • <mqtt-to-kafka-mapping> replaces the <topic-mapping> tag.

The new tags reflect increased topic-mapping functionality in the extension that makes it possible to read records from Kafka and publish them as MQTT messages. Although the old topic-mapping tags are backward compatible, we highly recommend that you replace the old tags when you update to the new version of the extension to ensure clarity.

The HiveMQ Enterprise Extension for Kafka is preconfigured with sensible default settings. These default values are usually sufficient to get you started.

Configuration of the extension is divided into four sections:

  • Kafka Clusters: General information and connection settings for one or more Kafka clusters.

  • MQTT-to-Kafka Mappings: Routing information for one or more MQTT topic filters to a Kafka topic.

  • Kafka-to-MQTT Mappings: Routing information for one or more records in a Kafka topic to one or more MQTT topics.

  • Schema Registry: Schema used to deserialize values in the Kafka topic.

Each mapping refers to exactly one Kafka cluster. However, a single Kafka cluster can use multiple mappings.
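The relationship between mappings and clusters can be checked mechanically. The following Python sketch is not part of the extension. It parses a hypothetical configuration that reuses the element names from the example configuration and reports every mapping whose <cluster-id> does not match a defined <kafka-cluster>. The function name and the sample IDs (cluster99 in particular) are assumptions made for illustration.

```python
import xml.etree.ElementTree as ET

# Hypothetical sample that reuses the element names of the extension configuration.
SAMPLE_CONFIG = """
<kafka-configuration>
    <kafka-clusters>
        <kafka-cluster>
            <id>cluster01</id>
            <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>
        </kafka-cluster>
    </kafka-clusters>
    <mqtt-to-kafka-mappings>
        <mqtt-to-kafka-mapping>
            <id>mapping01</id>
            <cluster-id>cluster01</cluster-id>
        </mqtt-to-kafka-mapping>
    </mqtt-to-kafka-mappings>
    <kafka-to-mqtt-mappings>
        <kafka-to-mqtt-mapping>
            <id>mapping02</id>
            <cluster-id>cluster99</cluster-id>
        </kafka-to-mqtt-mapping>
    </kafka-to-mqtt-mappings>
</kafka-configuration>
"""


def find_unknown_cluster_refs(config_xml: str) -> list[tuple[str, str]]:
    """Return (mapping id, cluster id) pairs that point at an undefined cluster."""
    root = ET.fromstring(config_xml)
    known = {cluster.findtext("id") for cluster in root.iter("kafka-cluster")}
    problems = []
    for tag in ("mqtt-to-kafka-mapping", "kafka-to-mqtt-mapping"):
        for mapping in root.iter(tag):
            cluster_id = mapping.findtext("cluster-id")
            if cluster_id not in known:
                problems.append((mapping.findtext("id"), cluster_id))
    return problems


print(find_unknown_cluster_refs(SAMPLE_CONFIG))  # [('mapping02', 'cluster99')]
```

A check of this kind can run before you copy a configuration into place. It complements, but does not replace, the validation that the extension performs.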

Schema Validation of the Extension Configuration File

Starting with HiveMQ 4.31.0, the Kafka extension gives you the option to activate schema validation for the config.xml configuration file of your extension. Currently, schema validation is disabled by default.

The schema validation uses the provided config.xsd file to validate the config.xml before it is used by the extension. Schema validation catches configuration issues that the extension does not detect programmatically.

For example, tags with invalid or misspelled names are detected with the schema validation. Duplicate tags result in ambiguous behavior depending on which of the values is parsed. For optional tags, these configuration errors often go unnoticed when schema validation is not enabled.

We recommend that you activate schema validation to highlight possible configuration issues.

You can enable the schema validation behavior in two ways:

  • Add a line with the following Java system property to your bin/run.sh file: JAVA_OPTS="$JAVA_OPTS -Dhivemq.extensions.kafka.config.skip-schema-validation=false"

  • Provide the environment variable: export HIVEMQ_EXTENSIONS_KAFKA_CONFIG_SKIP_SCHEMA_VALIDATION=false

The provided config.xsd file can be used with XML editors and IDEs that have schema support to add helpful content completion assistance for your XML configuration.

Configuration File

The config.xml file is located in HIVEMQ_HOME/extensions/hivemq-kafka-extension/conf/config.xml.

The extension uses a simple but powerful XML-based configuration.

If you want to copy and reuse the example configuration, be sure to move the file to conf/config.xml before you enable your extension.
Example configuration
<?xml version="1.0" encoding="UTF-8" ?>
<kafka-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xsi:noNamespaceSchemaLocation="config.xsd">

    <kafka-clusters>
        <kafka-cluster>
            <id>cluster01</id>
            <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>
        </kafka-cluster>
    </kafka-clusters>

    <mqtt-to-kafka-mappings>
        <mqtt-to-kafka-mapping>
            <id>mapping01</id>
            <cluster-id>cluster01</cluster-id>
            <mqtt-topic-filters>
                <mqtt-topic-filter>mytopic/#</mqtt-topic-filter>
            </mqtt-topic-filters>
            <kafka-topic>my-kafka-topic</kafka-topic>
        </mqtt-to-kafka-mapping>
    </mqtt-to-kafka-mappings>

    <kafka-to-mqtt-mappings>
        <kafka-to-mqtt-mapping>
            <id>mapping02</id>
            <cluster-id>cluster01</cluster-id>
            <kafka-topics>
                <kafka-topic>first-kafka-topic</kafka-topic>
                <kafka-topic>second-kafka-topic</kafka-topic>
                <!-- Arbitrary number of Kafka topics -->
            </kafka-topics>
        </kafka-to-mqtt-mapping>
    </kafka-to-mqtt-mappings>

</kafka-configuration>
All HiveMQ Enterprise Extensions support the use of environment variables. The ${ENV:VARIABLE_NAME} pattern allows you to add placeholders to your XML configuration file that are replaced with the value of an environment variable at runtime. Environment variables cannot be hot reloaded. You must set your environment variables before HiveMQ startup. For more information, see Environment Variables.
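To illustrate how a ${ENV:VARIABLE_NAME} placeholder resolves, here is a minimal Python sketch of the substitution. It is not the extension's implementation. The variable name KAFKA_BOOTSTRAP is hypothetical, and raising an error for an unset variable is an assumption of this sketch.

```python
import os
import re

# Matches placeholders of the form ${ENV:VARIABLE_NAME}.
ENV_PLACEHOLDER = re.compile(r"\$\{ENV:([^}]+)\}")


def resolve_env_placeholders(text: str, env=None) -> str:
    """Replace every ${ENV:NAME} placeholder with the value of NAME."""
    env = os.environ if env is None else env

    def substitute(match):
        name = match.group(1)
        if name not in env:
            # Assumption: this sketch treats an unset variable as an error.
            raise KeyError(f"environment variable {name} is not set")
        return env[name]

    return ENV_PLACEHOLDER.sub(substitute, text)


line = "<bootstrap-servers>${ENV:KAFKA_BOOTSTRAP}</bootstrap-servers>"
# KAFKA_BOOTSTRAP is a hypothetical variable name used for illustration.
print(resolve_env_placeholders(line, {"KAFKA_BOOTSTRAP": "127.0.0.1:9092"}))
# <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>
```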

You can apply changes to the config.xml file at runtime. There is no need to restart the extension or HiveMQ for the changes to take effect (hot reload). The previous configuration file is automatically archived to the config-archive subfolder in the extension folder.

If you change the configuration file at runtime and the new configuration contains errors, the previous configuration is kept.

When the configuration changes at runtime, topic mapping pauses and then restarts automatically to include the changes.

When a topic mapping restarts, the existing connections to Kafka for the topic mapping are closed and re-established.
If only the mqtt-topic-filters in a topic mapping change, no restart of the topic mapping is required.
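The restart rule above can be expressed as a comparison of the old and new mapping settings. The following Python sketch assumes a hypothetical dictionary representation of a topic mapping. It does not show how the extension detects changes internally.

```python
def needs_restart(old_mapping: dict, new_mapping: dict) -> bool:
    """Return True when anything other than the MQTT topic filters changed."""
    ignored = "mqtt-topic-filters"
    old_rest = {key: value for key, value in old_mapping.items() if key != ignored}
    new_rest = {key: value for key, value in new_mapping.items() if key != ignored}
    return old_rest != new_rest


# Hypothetical in-memory view of one <mqtt-to-kafka-mapping>.
current = {
    "id": "mapping01",
    "cluster-id": "cluster01",
    "mqtt-topic-filters": ["mytopic/#"],
    "kafka-topic": "my-kafka-topic",
}

only_filters_changed = dict(current, **{"mqtt-topic-filters": ["mytopic/#", "other/#"]})
kafka_topic_changed = dict(current, **{"kafka-topic": "another-kafka-topic"})

print(needs_restart(current, only_filters_changed))  # False
print(needs_restart(current, kafka_topic_changed))   # True
```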


Kafka Cluster

The extension uses the <kafka-cluster> settings to establish the connection to a Kafka cluster.

Example minimal configuration of Kafka cluster connection
<kafka-clusters>
    <kafka-cluster>
        <id>cluster01</id>
        <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>
    </kafka-cluster>
</kafka-clusters>


The following <kafka-cluster> values can be configured:

| Name | Default | Mandatory | Description |
| --- | --- | --- | --- |
| id | - | | The unique identifier of the Kafka cluster. This string can only contain the following characters: abcdefghijklmnopqrstuvwxyz0123456789-_ |
| bootstrap-servers | - | | A comma-separated list of the Kafka bootstrap servers with host and port. |
| tls | disabled | | Shows whether a TLS configuration is enabled for the Kafka cluster. |
| authentication | none | | Shows the type of authentication that is configured for the Kafka cluster. |

If you need to connect to the same Kafka cluster with different settings, simply add the same cluster multiple times with a different id and different settings.
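The character restriction for id values can be checked with a regular expression. This Python sketch assumes that the allowed set is lowercase letters, digits, hyphen, and underscore, that the period at the end of the character list is sentence punctuation, and that an empty id is not allowed. The function name is hypothetical.

```python
import re

# Assumption: lowercase letters, digits, '-' and '_' are allowed; '.' is not.
ID_PATTERN = re.compile(r"[a-z0-9_-]+")


def is_valid_id(identifier: str) -> bool:
    """Return True if the identifier uses only the permitted characters."""
    return ID_PATTERN.fullmatch(identifier) is not None


for candidate in ("cluster01", "cluster-local-1", "Cluster01", "cluster 01"):
    print(candidate, is_valid_id(candidate))
```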


Topic Mapping & Message Processing

Figure 6. HiveMQ and Kafka Architecture


MQTT-to-Kafka Topic Mapping

MQTT-to-Kafka mappings represent the routing information from MQTT topic filters to Kafka topics. The mapping from MQTT to Kafka is relatively simple: you select the Kafka topic to which messages that match your MQTT topic filters are published. The MQTT topic is used as the Kafka key and the MQTT payload is used as the Kafka value.

Example minimal configuration of an MQTT to Kafka topic mapping
<mqtt-to-kafka-mappings>

    <mqtt-to-kafka-mapping>
        <id>mapping01</id>

        <!-- Kafka cluster to use for this topic mapping -->
        <cluster-id>cluster01</cluster-id>

        <!-- List of MQTT topic filters -->
        <mqtt-topic-filters>
            <mqtt-topic-filter>topic1/#</mqtt-topic-filter>
            <mqtt-topic-filter>topic2/#</mqtt-topic-filter>
        </mqtt-topic-filters>

        <!-- Target Kafka topic -->
        <kafka-topic>topic01</kafka-topic>

        <!-- 100 * 1024 * 1024 = 104857600 = 100 MiB-->
        <kafka-max-request-size-bytes>104857600</kafka-max-request-size-bytes>

    </mqtt-to-kafka-mapping>

</mqtt-to-kafka-mappings>

The following <mqtt-to-kafka-mapping> values can be configured:

| Name | Default | Mandatory | Description |
| --- | --- | --- | --- |
| id | - | | The unique identifier for this topic mapping. This string can only contain the following characters: abcdefghijklmnopqrstuvwxyz0123456789-_ |
| cluster-id | - | | The identifier of the referenced Kafka cluster. |
| mqtt-topic-filters | - | | A list of MQTT topic filters. |
| kafka-topic | - | | The Kafka topic to which the MQTT topic is routed. |
| kafka-acks | ONE | | The Kafka acknowledgment mode. This mode specifies the way that the Kafka cluster must acknowledge incoming messages. Three Kafka acknowledgment modes are available: ZERO (do not wait for an acknowledgment); ONE (only the lead partition of the replication must send an acknowledgment, this is the default setting); ACK (all replications must send an acknowledgment, depending on Kafka server configuration). |
| kafka-max-request-size-bytes | - | | Optional setting to define the maximum size in bytes that a request to Kafka can contain. This value determines the largest MQTT message size that you can send to Kafka. The kafka-max-request-size-bytes value you define must be a positive integer greater than 0. (NOTE: The maximum size the MQTT protocol allows in a single MQTT publish message is 256 MB.) When configured, kafka-max-request-size-bytes defines the max.request.size setting for the Kafka producer client of your HiveMQ Enterprise Extension for Kafka. Messages that exceed the set request limit are dropped and not forwarded to the Kafka broker. The dropped messages increment the kafka-extension.total.failed.count and kafka-extension.topic-mapping.<topic-mapping>.failed.count metrics of your extension. For more information, see Kafka Extension Metrics. When no kafka-max-request-size-bytes is set, the Kafka extension uses the default max.request.size limit of 1 MB per message request to Kafka. |

The default maximum message size that Kafka brokers accept is 1 MB. If you configure kafka-max-request-size-bytes to forward MQTT messages that are larger than 1 MB to Kafka, check that the maximum message size of your Kafka broker is adjusted accordingly.
If you want to send the same messages to multiple Kafka topics, simply duplicate the topic mapping and change the id and the kafka-topic of the additional topic mappings.
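The effect of kafka-max-request-size-bytes can be approximated with simple arithmetic. The following Python sketch estimates the request size as key bytes plus value bytes and compares it with the limit. The estimate ignores the record and batch overhead that a real Kafka producer adds, so treat it as a lower bound. The function names are hypothetical.

```python
# Assumption: the Kafka producer default for max.request.size is 1 MiB.
DEFAULT_MAX_REQUEST_SIZE = 1024 * 1024


def approximate_request_size(mqtt_topic: str, payload: bytes) -> int:
    """Rough lower bound: Kafka key bytes plus Kafka value bytes."""
    return len(mqtt_topic.encode("utf-8")) + len(payload)


def is_forwarded(mqtt_topic: str, payload: bytes,
                 max_request_size: int = DEFAULT_MAX_REQUEST_SIZE) -> bool:
    """Return False for messages that would exceed the request limit."""
    if max_request_size <= 0:
        raise ValueError("kafka-max-request-size-bytes must be greater than 0")
    return approximate_request_size(mqtt_topic, payload) <= max_request_size


configured_limit = 100 * 1024 * 1024    # 104857600 bytes, as in the example mapping
large_payload = bytes(2 * 1024 * 1024)  # 2 MiB of zero bytes

print(is_forwarded("topic1/sensor", large_payload))                    # False
print(is_forwarded("topic1/sensor", large_payload, configured_limit))  # True
```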

Example MQTT to Kafka Use Case

Map multiple MQTT topics to one Kafka topic: In this use case, the goal is to send messages from millions of connected cars (MQTT clients) to one Kafka topic.

MQTT to Kafka
Example configuration of multiple MQTT topics to one Kafka topic
<mqtt-to-kafka-mappings>

    <mqtt-to-kafka-mapping>
        <id>many-to-one-mapping</id>

        <cluster-id>cluster-local-1</cluster-id>

        <mqtt-topic-filters>
            <mqtt-topic-filter>ger/+/speed</mqtt-topic-filter>
            <mqtt-topic-filter>usa/+/velo</mqtt-topic-filter>
        </mqtt-topic-filters>

        <kafka-topic>carsSpeed</kafka-topic>

    </mqtt-to-kafka-mapping>

</mqtt-to-kafka-mappings>
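To show how the topic filters in this mapping select messages, the following Python sketch implements standard MQTT wildcard matching (+ for one level, # for all remaining levels) and builds the resulting Kafka record with the MQTT topic as the key and the payload as the value. It is a simplified illustration: special handling of topics that start with $ is omitted, and the function names and the sample topic ger/car-17/speed are hypothetical.

```python
def matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a filter with + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level of the filter.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    return len(filter_levels) == len(topic_levels)


FILTERS = ["ger/+/speed", "usa/+/velo"]
KAFKA_TOPIC = "carsSpeed"


def to_kafka_record(mqtt_topic: str, payload: bytes):
    """Return (Kafka topic, key, value) for a matching publish, else None."""
    if any(matches(topic_filter, mqtt_topic) for topic_filter in FILTERS):
        return KAFKA_TOPIC, mqtt_topic, payload
    return None


print(to_kafka_record("ger/car-17/speed", b"87"))
# ('carsSpeed', 'ger/car-17/speed', b'87')
print(to_kafka_record("fra/car-3/speed", b"90"))
# None
```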


Kafka-to-MQTT Topic Mapping

Kafka-to-MQTT mappings represent the routing information from Kafka topics to MQTT topics. Information from the Kafka record is sent on the MQTT topics that you define in the configuration.

If no MQTT topic is set, the Kafka topic is used as the MQTT topic. Unless otherwise configured, the payload of the MQTT message is the value of the Kafka record:

Kafka to MQTT

Each Kafka-to-MQTT topic mapping in the Kafka Extension has a dedicated Kafka consumer. In the HiveMQ cluster, all Kafka consumers for the same topic mapping are combined in one consumer group. The number of Kafka-to-MQTT topic mappings equals the number of consumer groups.

The HiveMQ Enterprise Extension for Kafka automatically assigns a consumer group name to each Kafka-to-MQTT topic mapping that you create. The consumer group name consists of the following information:
hivemq- + mappingID + -group.
For example, the extension automatically assigns a topic mapping with the topic mapping ID topic-mapping-1 the consumer group name hivemq-topic-mapping-1-group.

Verify that the permissions on your Kafka cluster allow consumer group name patterns that begin with hivemq.
If your architecture uses multiple HiveMQ clusters to consume identical topics from a single Kafka cluster, your HiveMQ topic mapping IDs must be unique across all HiveMQ clusters to ensure all clusters consume all messages.
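The naming rule for consumer groups is easy to reproduce. This Python sketch derives the consumer group name from a topic mapping ID and lists the group names that two HiveMQ clusters would share if they reused mapping IDs. The function names and the sample IDs are hypothetical.

```python
def consumer_group_name(mapping_id: str) -> str:
    """Build the consumer group name: hivemq- + mappingID + -group."""
    return f"hivemq-{mapping_id}-group"


def shared_consumer_groups(cluster_a_ids, cluster_b_ids):
    """Return group names that both HiveMQ clusters would join."""
    groups_a = {consumer_group_name(mapping_id) for mapping_id in cluster_a_ids}
    groups_b = {consumer_group_name(mapping_id) for mapping_id in cluster_b_ids}
    return sorted(groups_a & groups_b)


print(consumer_group_name("topic-mapping-1"))
# hivemq-topic-mapping-1-group

# Two HiveMQ clusters that both use the mapping ID "alerts" share one group,
# so each Kafka record is delivered to only one of the two clusters.
print(shared_consumer_groups(["alerts", "updates"], ["alerts", "firmware"]))
# ['hivemq-alerts-group']
```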

The extension reads and processes Kafka records in batches. The offset of the last record of each processed batch is committed to Kafka as soon as the batch is processed. This method ensures continuity if an extension in the cluster is temporarily unavailable or any other rebalancing of the cluster occurs.

After a restart, or when one node leaves the cluster and another node takes over, the extension continues reading from the point where it stopped before the restart.
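The commit-after-processing behavior described above can be simulated without a Kafka cluster. In this Python sketch, a list stands in for one partition, list indexes stand in for offsets, and the committed offset is the offset of the last processed record, as in the description above. The function name and the batch size are assumptions made for illustration.

```python
def process_in_batches(records, committed_offset, batch_size, publish):
    """Process records after committed_offset; return the new committed offset."""
    position = committed_offset + 1
    while position < len(records):
        batch = records[position:position + batch_size]
        for record in batch:
            publish(record)
        # Commit only after the whole batch is processed.
        committed_offset = position + len(batch) - 1
        position = committed_offset + 1
    return committed_offset


partition = ["a", "b", "c", "d", "e", "f", "g"]

# First run: nothing is committed yet (offset -1).
published = []
print(process_in_batches(partition, -1, 3, published.append))  # 6

# A node that takes over after offset 2 was committed resumes with record "d".
resumed = []
process_in_batches(partition, 2, 3, resumed.append)
print(resumed)  # ['d', 'e', 'f', 'g']
```

Because the offset is committed per batch, a failure in the middle of a batch means that the records of that batch are read again after the restart.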

Since each partition in your Kafka topic has one dedicated consumer, it is a best practice to have more partitions than Kafka extensions (consumers). This configuration option for the Kafka topic ensures higher efficiency and maximum utilization of your HiveMQ extension.

To increase practical usability and fulfill a wide range of use cases, it is possible to extract information from the Kafka value. To enable the extraction of information, the Kafka value must be deserialized by the HiveMQ Enterprise Extension for Kafka. Supported information formats are listed here. The information can be extracted with a semantic that is similar to XPath. For more information, see supported path syntax.

Example minimal configuration of a Kafka-to-MQTT topic mapping
<kafka-to-mqtt-mappings>

    <kafka-to-mqtt-mapping>
        <!-- The ID is used for logging messages -->
        <id>mapping01</id>

        <!-- Kafka cluster to use for this topic mapping -->
        <cluster-id>cluster02</cluster-id>

        <!-- List of kafka topics -->
        <kafka-topics>
            <kafka-topic>topic1</kafka-topic>
            <kafka-topic>topic2</kafka-topic>
        </kafka-topics>

    </kafka-to-mqtt-mapping>

</kafka-to-mqtt-mappings>

In the minimal configuration example, the MQTT topic of the MQTT publish messages defaults to the Kafka topic and the MQTT payload defaults to the Kafka value.


The following <kafka-to-mqtt-mapping> values can be configured:

| Name | Default | Mandatory | Type in Path | Description |
| --- | --- | --- | --- | --- |
| id | - | | - | The unique identifier for this mapping. This string can only contain the following characters: abcdefghijklmnopqrstuvwxyz0123456789-_ |
| cluster-id | - | | - | The identifier of the referenced Kafka cluster. |
| kafka-topics | - | | - | At least one Kafka topic and/or Kafka topic pattern is required. Be careful when you combine kafka-topic-pattern with deserialization: you might poll messages from Kafka for which you have no schema, which can lead to lost messages. The safe route is to explicitly specify all Kafka topics here and also for each schema. |
| use-schema-registry | none | | - | Specifies whether the Kafka records are deserialized. Three modes are possible: none (no deserialization); local (use local files to deserialize AVRO or JSON objects); confluent (use Confluent Schema Registry to deliver the right schema for your message). |
| mqtt-topics | ${KAFKA_TOPIC} | | String-Array | One or more MQTT topics to which the messages are sent. |
| mqtt-payload | ${KAFKA_VALUE} | | Bytes, other | The payload for the MQTT messages. If no schema is applied, the original Kafka value is sent as the MQTT payload and cannot be modified. To send an array of bytes, specify bytes. To send arrays such as string or numbers, specify other. Other types are automatically converted to a UTF-8 string. |
| mqtt-publish-fields | - | | see below | The metadata that is defined for the MQTT publish message. See the following MQTT Publish Fields table for information on all available fields. |

If you use a schema registry, we do not recommend the use of Kafka topic patterns. The combination of Kafka topic patterns and a schema registry can cause unexpected behavior that is difficult to interpret.

When you use a JSON schema, you can select a byte array type for the mqtt-payload in the following way:

  1. Encode your payload as a base64 string.

  2. In the JSON schema, specify the type as `string` with an additional property `contentEncoding` set to `base64`.

The Kafka extension decodes your Base64 string into a byte array when selected by the path syntax as the MQTT payload. The contentEncoding parameter is available since draft version 7 of the JSON schema.

The Kafka extension only implements base64 and only permits this encoding for string type values.
Example payload field using byte arrays with JSON schema
{
    "payload": {
        "type": "string",
        "contentEncoding": "base64"
    }
}
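The decoding step can be illustrated in a few lines of Python. This sketch builds a JSON document whose payload field carries Base64 text, as the schema above requires, and decodes that field back into the original byte array. The field name payload follows the schema example. The sample bytes and the function name are hypothetical.

```python
import base64
import json

# Producer side: encode the raw bytes as Base64 text inside a JSON document.
raw_bytes = b"\x00\x01\xfe\xff"
kafka_value = json.dumps({"payload": base64.b64encode(raw_bytes).decode("ascii")})


def extract_payload_bytes(json_value: str) -> bytes:
    """Decode the Base64 'payload' field of a JSON document into bytes."""
    document = json.loads(json_value)
    # validate=True rejects characters outside the Base64 alphabet.
    return base64.b64decode(document["payload"], validate=True)


print(kafka_value)                         # {"payload": "AAH+/w=="}
print(extract_payload_bytes(kafka_value))  # b'\x00\x01\xfe\xff'
```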


MQTT Publish Fields

In the <mqtt-publish-fields> of the Kafka-to-MQTT configuration, you can specify the metadata of the MQTT Publish message that the mapping creates.

Each MQTT Publish field in the configuration file can contain only one variable.
| Name | Default | Mandatory | Type in Path | Description |
| --- | --- | --- | --- | --- |
| retained-flag | false | | Boolean | Defines whether all MQTT messages from the corresponding Kafka-to-MQTT mapping are retained. Possible values are true or false. If set to true and retain is disabled by HiveMQ, messages are not retained. |
| payload-format-indicator | - | | String | Sets the format of the payload of the corresponding Kafka-to-MQTT mapping. Possible values are UTF-8 or UNSPECIFIED. |
| message-expiry-interval | - | | Long, integer, or string | A long value that indicates the lifetime in seconds of a published message. The value must be positive. |
| response-topic | - | | String | Sets the response topic for the MQTT messages. |
| correlation-data | - | | Bytes | Sets the correlation data for the MQTT messages. This data is used to show to which request a response message belongs. If you insert the correlation-data directly into the config, which means you don't use path syntax to extract it, you have to encode it in Base64. |
| user-properties | - | | String | Sets multiple <user-properties> with <key> and <value> pairs for the MQTT messages. |
| content-type | - | | String | Specifies the content type of the MQTT message. |
| qos | 1 | | Long, integer, or string | Sets the Quality of Service (QoS) for the MQTT message. Possible values are 0 (at most once), 1 (at least once), and 2 (exactly once). |

Example configuration of MQTT publish fields
<kafka-to-mqtt-mapping>
    ...

    <mqtt-publish-fields>
        <retained-flag>true</retained-flag>
        <payload-format-indicator>UTF-8</payload-format-indicator>
        <message-expiry-interval>10000</message-expiry-interval>
        <response-topic>my/response/topic</response-topic>
        <!-- Base64-encoded form of "my/correlation/data" -->
        <correlation-data>bXkvY29ycmVsYXRpb24vZGF0YQ==</correlation-data>
        <user-properties>
            <user-property>
                <key>key1</key>
                <value>value1</value>
            </user-property>
            <user-property>
                <key>key2</key>
                <value>value2</value>
            </user-property>
        </user-properties>
        <content-type>JSON</content-type>
        <qos>2</qos>
    </mqtt-publish-fields>

</kafka-to-mqtt-mapping>
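The value constraints in the MQTT Publish Fields table can be checked before a configuration is deployed. The following Python sketch validates a hypothetical dictionary of publish fields against the constraints that the table states: the QoS level, a positive message expiry interval, the allowed payload format indicators, and Base64-encoded correlation data. It is an illustration, not the validation logic of the extension.

```python
import base64
import binascii


def validate_publish_fields(fields: dict) -> list:
    """Return a list of problems found in a dictionary of publish fields."""
    errors = []
    if int(fields.get("qos", 1)) not in (0, 1, 2):
        errors.append("qos must be 0, 1, or 2")
    if "message-expiry-interval" in fields and int(fields["message-expiry-interval"]) <= 0:
        errors.append("message-expiry-interval must be positive")
    if fields.get("payload-format-indicator", "UNSPECIFIED") not in ("UTF-8", "UNSPECIFIED"):
        errors.append("payload-format-indicator must be UTF-8 or UNSPECIFIED")
    if "correlation-data" in fields:
        try:
            base64.b64decode(fields["correlation-data"], validate=True)
        except (binascii.Error, ValueError):
            errors.append("correlation-data must be Base64 encoded")
    return errors


# Hypothetical correlation data, encoded for direct use in the configuration.
encoded = base64.b64encode(b"request-42").decode("ascii")

print(validate_publish_fields({"qos": "2", "correlation-data": encoded}))
# []
print(validate_publish_fields({"qos": 3, "correlation-data": "not base64!"}))
# ['qos must be 0, 1, or 2', 'correlation-data must be Base64 encoded']
```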

Example Kafka to MQTT Use Cases

Map multiple Kafka topics to one MQTT topic: In this use case, the goal is to send alert messages to multiple MQTT clients from a backend service that is connected to Kafka. With the HiveMQ Enterprise Extension for Kafka, as many Kafka topics as desired can be mapped to a single MQTT topic. Through the HiveMQ broker, MQTT clients can subscribe to a single MQTT topic to get all configured alerts.

Kafka to MQTT
Example configuration of multiple Kafka topics to one MQTT topic
<kafka-to-mqtt-mappings>

    <kafka-to-mqtt-mapping>
        <id>mapping-01</id>

        <cluster-id>cluster-local</cluster-id>

        <kafka-topics>
            <kafka-topic>securityAlert</kafka-topic>
            <kafka-topic>speedAlert</kafka-topic>
            <kafka-topic>weatherAlert</kafka-topic>
        </kafka-topics>

        <mqtt-topics>
            <mqtt-topic>notify/alert</mqtt-topic>
        </mqtt-topics>

    </kafka-to-mqtt-mapping>

</kafka-to-mqtt-mappings>

Multicast one Kafka topic to several MQTT topics: In this use case, the goal is to send updates from a backend system that utilizes Kafka to several models of an IoT wearable device (MQTT clients).

Kafka to MQTT
Example configuration of multicasting from one Kafka topic to multiple MQTT topics
<kafka-to-mqtt-mappings>

    <kafka-to-mqtt-mapping>
        <!-- The ID is used for logging messages -->
        <id>mapping01</id>

        <!-- Kafka cluster to use for this topic mapping -->
        <cluster-id>cluster01</cluster-id>

        <!-- Kafka topic from which you are casting -->
        <kafka-topics>
            <kafka-topic>sysUpdate</kafka-topic>
        </kafka-topics>

        <!-- Multiple MQTT topics-->
        <mqtt-topics>
            <mqtt-topic>wearMode1</mqtt-topic>
            <mqtt-topic>wearMode2</mqtt-topic>
            <mqtt-topic>wearMode3</mqtt-topic>
            <mqtt-topic>wearModeX</mqtt-topic>
        </mqtt-topics>

    </kafka-to-mqtt-mapping>

</kafka-to-mqtt-mappings>

Use Kafka constants to efficiently map one Kafka topic to one or more MQTT topics: In this use case, the goal is to map information from a backend system that utilizes Kafka to a large number of connected cars (MQTT clients). The kafka-key contains the MQTT topic. The kafka-value uses all information that is included in the value of the Kafka record as the payload of the MQTT message. At runtime, Kafka constant placeholders are replaced with the corresponding values from the Kafka record to create the appropriate MQTT messages for each connected car (MQTT client).

Kafka to MQTT with Kafka constants
Example configuration of one Kafka topic to many MQTT topics using Kafka constants
<kafka-to-mqtt-mappings>

    <kafka-to-mqtt-mapping>
        <!-- The ID is used for logging messages -->
        <id>mapping03</id>

        <!-- Kafka cluster to use for this topic mapping -->
        <cluster-id>cluster03</cluster-id>

        <!-- Kafka topic from which you are casting  -->
        <kafka-topics>
            <kafka-topic>update-1</kafka-topic>
        </kafka-topics>

        <!-- Multiple MQTT topics-->
        <mqtt-topics>
            <mqtt-topic>${KAFKA_KEY}</mqtt-topic>
        </mqtt-topics>
        <mqtt-payload>${KAFKA_VALUE}</mqtt-payload>

    </kafka-to-mqtt-mapping>

</kafka-to-mqtt-mappings>
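The replacement of Kafka constants at runtime can be sketched as a single-pass substitution. This Python example handles the three constants that appear in this documentation (${KAFKA_TOPIC}, ${KAFKA_KEY}, and ${KAFKA_VALUE}) and represents the Kafka record as a hypothetical dictionary with string values. Real record values are bytes, and the extension's own resolution logic may differ.

```python
import re

# The three Kafka constants used in this documentation.
CONSTANT = re.compile(r"\$\{(KAFKA_TOPIC|KAFKA_KEY|KAFKA_VALUE)\}")


def render(template: str, record: dict) -> str:
    """Replace Kafka constant placeholders with values from one record."""
    values = {
        "KAFKA_TOPIC": record["topic"],
        "KAFKA_KEY": record["key"],
        "KAFKA_VALUE": record["value"],
    }
    # A single regex pass avoids re-substituting text that came from the record.
    return CONSTANT.sub(lambda match: values[match.group(1)], template)


# Hypothetical record: the key carries the MQTT topic of the target car.
record = {"topic": "update-1", "key": "cars/vin-123/update", "value": "firmware=2.4.1"}

print(render("${KAFKA_KEY}", record))    # cars/vin-123/update
print(render("${KAFKA_VALUE}", record))  # firmware=2.4.1
```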
Multicast from one Kafka topic to many MQTT topics with path syntax: In this use case, the goal is to send an identical section of information to multiple IoT devices from a backend system that utilizes Kafka. Path syntax is used to extract specific parts of data from a deserialized Kafka record. This method requires the use of a schema registry.

Kafka to MQTT path syntax
Example configuration of multicasting from one Kafka topic to many MQTT topics using path syntax
<kafka-to-mqtt-mappings>

    <kafka-to-mqtt-mapping>
        <!-- The ID is used for logging messages -->
        <id>mapping03</id>
        <!-- Kafka cluster to use for this topic mapping -->
        <cluster-id>cluster03</cluster-id>
        <!-- the schema registry to use to deserialize this Kafka topic -->
        <use-schema-registry>confluent</use-schema-registry>

        <!-- Kafka topic from which you are casting extracted information -->
        <kafka-topics>
            <kafka-topic>firmwareNew</kafka-topic>
        </kafka-topics>

        <!-- An array of MQTT topics -->
        <mqtt-topics>
            <mqtt-topic>${//mqtt-info/topic}</mqtt-topic>
        </mqtt-topics>
         <!-- The information that is sent from the Kafka record -->
        <mqtt-payload>${//mqtt-info/data}</mqtt-payload>

    </kafka-to-mqtt-mapping>

</kafka-to-mqtt-mappings>
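To give a rough idea of what a path expression such as ${//mqtt-info/topic} selects, the following Python sketch resolves it against a deserialized record that is represented as nested dictionaries. The sketch assumes that // marks the root of the deserialized value and that / separates field names. The supported path syntax of the extension is defined in its own reference and may behave differently. The record content is hypothetical.

```python
import re

# Assumption: ${//a/b} addresses field b inside field a of the record value.
PATH_EXPRESSION = re.compile(r"\$\{//([^}]+)\}")


def resolve_path(expression: str, value: dict):
    """Walk a nested dictionary along the fields named in the expression."""
    match = PATH_EXPRESSION.fullmatch(expression)
    if match is None:
        raise ValueError(f"not a path expression: {expression}")
    node = value
    for field in match.group(1).split("/"):
        node = node[field]
    return node


# Hypothetical deserialized Kafka value.
deserialized_value = {
    "mqtt-info": {
        "topic": "devices/wearable-7/firmware",
        "data": "v2.4.1",
    }
}

print(resolve_path("${//mqtt-info/topic}", deserialized_value))  # devices/wearable-7/firmware
print(resolve_path("${//mqtt-info/data}", deserialized_value))   # v2.4.1
```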


Schema Registries

Kafka itself does not check the format of a message or verify message correctness.

The use of one or more schema registries can ensure proper messaging functionality and provide message validation. For example, when you receive a message from Kafka and want to forward the message through HiveMQ, you can validate the message with a schema. If the message format is invalid and does not fit the specified schema, the HiveMQ broker does not publish the message.

A schema registry is a centralized location that stores all available schemas.

Schemas define the structure of a data format. Schemas can be used for several purposes:

  • Validate the format of a Kafka message payload before further processing

  • Make data more compact for transmission

  • Extract information from the Kafka value to create MQTT messages

If you want to use serialized values that you extract from a Kafka record, you must first deserialize the values with an appropriate schema.

When you set the <use-schema-registry> setting to local or confluent in a topic mapping, you can specify the corresponding <schema-registries> in the configuration of your extension.

As your application changes over time, it can be useful to support multiple versions of a schema so that data encoded with older schemas is handled correctly. To help you ensure schema compatibility, the Confluent Schema Registry supports schema evolution.

Currently, the HiveMQ Enterprise Extension for Kafka supports the deserialization of Kafka values that use the Avro or JSON schema formats. The extension does not yet support deserialization of the Kafka key.

Local Schema Registry

The local schema registry resides on the same machine as the HiveMQ Enterprise Extension for Kafka.

Figure 7. Local Schema Registry

Your local schema registry can contain more than one schema file of the same type.
Each schema file that you want to use must be added to your local schema registry.

If you use a schema file, the HiveMQ extension checks each message that arrives against the selected schema and only processes valid messages.

A Kafka topic can only have one schema file in the local schema registry. If a Kafka topic matches multiple entries in the local schema registry, the first schema that appears in the configuration file is used.
Table 1. Schema Registry
Name Default Mandatory Description

local-schema-registry

-

A schema registry where schemas are placed in a local folder.

confluent-schema-registry

-

A schema registry from Confluent.

Table 2. Local Schema Registry
Name Default Mandatory Description

name

-

The name of the local schema registry.

avro-schema-entries

-

Contains multiple Avro schema entries.

avro-schema-entry

-

Defines an Avro schema with serialization, Avro file, and Kafka topic fields.

avro-file

-

Specifies the path to the Avro file, relative to the local schema registry folder of the extension. The extension looks for the file at extensionhome/local-schema-registry/<your-configured-path>.

serialization

binary

Specifies whether the Avro object is serialized in binary or json format. The format must be provided for objects that you want to deserialize.

json-schema-entries

-

Contains multiple JSON schema entries.

json-schema-entry

-

Defines a JSON schema with serialization, JSON schema file, and Kafka topic fields.

json-file

-

Specifies the relative path to the JSON schema file.

kafka-topics

-

The Kafka topics to which the specified schema applies. At least one Kafka topic or one Kafka topic pattern is required.

kafka-topic

-

The Kafka topic of the entry.

kafka-topic-pattern

-

Identifies which schema file the Kafka topic uses. Use kafka-topic-pattern cautiously. Inaccurate use of a topic pattern can poll messages from topics that do not use the selected schema and cannot be deserialized. The mismatch can cause lost messages. Best practice is to explicitly specify all Kafka topics to which the schema applies.

To use the schema registry for all polled kafka-topics, use <kafka-topic-pattern>.*</kafka-topic-pattern>. This pattern is a regular expression that matches every topic.
Since version 4.5.0 of the HiveMQ Enterprise Extension for Kafka, it is possible to use multiple <kafka-topic-pattern> in combination with multiple <kafka-topic>.
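
The first-match rule and the regular-expression patterns described above can be illustrated with a short Python sketch. It is not the extension's implementation: the SchemaEntry class, the select_schema function, and the file names are hypothetical, Avro and JSON entries are merged into one list for brevity, and the sketch assumes that a pattern must match the whole topic name.

```python
import re
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SchemaEntry:
    """Hypothetical stand-in for one schema entry of the local schema registry."""
    schema_file: str
    topics: List[str] = field(default_factory=list)
    topic_patterns: List[str] = field(default_factory=list)

    def applies_to(self, topic: str) -> bool:
        # Explicit topic names are compared literally.
        if topic in self.topics:
            return True
        # Assumption: a pattern has to match the complete topic name.
        return any(re.fullmatch(pattern, topic) for pattern in self.topic_patterns)


def select_schema(entries: List[SchemaEntry], topic: str) -> Optional[str]:
    """Return the schema file of the first entry, in configuration order, that applies."""
    for entry in entries:
        if entry.applies_to(topic):
            return entry.schema_file
    return None


# Entries in the order in which they would appear in the configuration file.
entries = [
    SchemaEntry("schemafile.avsc", topics=["the-first-kafka-topic", "a-second-kafka-topic"]),
    SchemaEntry("sensor-schema.json", topic_patterns=["a-regex-.*"]),
    SchemaEntry("catch-all.avsc", topic_patterns=[".*"]),
]

print(select_schema(entries, "a-regex-42"))     # matched by the second entry
print(select_schema(entries, "another-topic"))  # only the catch-all pattern matches
```

Because the catch-all entry comes last, it only applies to topics that no earlier entry claims. Note that a glob-style pattern such as a-regex-* is not equivalent to a-regex-.* when the pattern is interpreted as a regular expression.
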
Example configuration of a local schema registry
<kafka-configuration>
    ...
    <schema-registries>
        <local-schema-registry>
            <name>local-register</name>
            <avro-schema-entries>
                <avro-schema-entry>
                    <serialization>binary</serialization>
                    <!-- The relative path where the avro file is searched is extensionhome/local-schema-registry/schemafile.avsc -->
                    <avro-file>schemafile.avsc</avro-file>
                    <kafka-topics>
                        <kafka-topic>the-first-kafka-topic</kafka-topic>
                        <kafka-topic>a-second-kafka-topic</kafka-topic>
                    </kafka-topics>
                </avro-schema-entry>
            </avro-schema-entries>
            <json-schema-entries>
                <json-schema-entry>
                    <!-- The relative path where the JSON schema file is searched is extensionhome/local-schema-registry/schemafile.json -->
                    <json-file>schemafile.json</json-file>
                    <kafka-topics>
                        <kafka-topic-pattern>a-regex-.*</kafka-topic-pattern>
                    </kafka-topics>
                </json-schema-entry>
            </json-schema-entries>
        </local-schema-registry>
    </schema-registries>
    ...
</kafka-configuration>
Confluent Schema Registry
Figure 8. Confluent Schema Registry

With the HiveMQ Enterprise Extension for Kafka, you can also use Confluent Schema Registry as a source for your schema files.

The Confluent Schema Registry is a distributed storage layer for schemas that uses Kafka as the underlying storage mechanism. Schema Registry automatically stores a versioned history of all your Avro schemas, checks compatibility, and retrieves the right schema for every incoming message to HiveMQ.

Through the ability to store and retrieve multiple versions of a schema, Schema Registry supports schema evolution, which makes it easier to accommodate changes in data over time. For example, you can keep new schema versions backward compatible to avoid loss of data.

To prevent unnecessary calls to the Confluent Schema Registry, schema files that you have already used are cached in the extension.
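
The effect of such a cache can be sketched in Python. The sketch assumes that records use the Confluent wire format, in which the value starts with a magic byte 0 followed by a 4-byte big-endian schema ID. The CachingSchemaClient class and the fetch callback are hypothetical and do not reflect the internal design of the extension.

```python
import struct
from typing import Callable, Dict, Tuple


def split_confluent_record(value: bytes) -> Tuple[int, bytes]:
    """Split a Kafka value in Confluent wire format into (schema_id, payload)."""
    if len(value) < 5 or value[0] != 0:
        raise ValueError("value is not in Confluent wire format")
    (schema_id,) = struct.unpack(">I", value[1:5])
    return schema_id, value[5:]


class CachingSchemaClient:
    """Hypothetical client that asks the registry only once per schema ID."""

    def __init__(self, fetch: Callable[[int], str]) -> None:
        self._fetch = fetch                 # stands in for the HTTP call to the registry
        self._cache: Dict[int, str] = {}
        self.registry_calls = 0

    def schema_for(self, schema_id: int) -> str:
        if schema_id not in self._cache:
            self.registry_calls += 1
            self._cache[schema_id] = self._fetch(schema_id)
        return self._cache[schema_id]


# Two lookups for the same schema ID result in a single registry call.
client = CachingSchemaClient(lambda schema_id: f"schema-{schema_id}")
record = b"\x00" + struct.pack(">I", 42) + b"avro-bytes"
schema_id, payload = split_confluent_record(record)
client.schema_for(schema_id)
client.schema_for(schema_id)
print(schema_id, client.registry_calls)
```
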

Table 3. Confluent Schema Registry
Name Default Mandatory Description

name

-

The name of the Confluent schema registry.

url

-

The URL of the Confluent schema registry. To use a TLS connection, start the URL with https://.

tls

-

Defines options for the TLS connection. Specify cipher suites, protocols, a truststore, and whether hostname verification is used.

authentication

-

Defines HTTP authentication options to authenticate on the schema registry.

authentication.plain

-

Use plain HTTP authentication. For more information, see HTTP authentication.

authentication.plain.username

-

The username to use for plain authentication.

authentication.plain.password

-

The password to use for plain authentication.

authentication.token

-

Specifies an authentication token to use.

truststore

-

Specifies a trust store that is used to trust the certificate of the schema registry server.

truststore.path

-

Specifies a path to a trust store file.

truststore.password

-

The password to open the trust store.

keystore

-

Specifies a key store that can be used to authenticate on the schema registry.

keystore.path

-

Specifies a path to a key store file.

keystore.password

-

The password to open the key store.

keystore.private-key-password

-

The private key password.

protocols

All protocols enabled by the JVM

The enabled protocols.

cipher-suites

All cipher suites enabled by the JVM

The enabled cipher-suites.

hostname-verification

true

Specifies whether hostname verification is enabled. This option can be set to true or false. Disabling hostname verification can increase vulnerability to man-in-the-middle attacks.

kafka-topics

-

The Kafka topics to which schemas from the specified schema registry apply. At least one Kafka topic or one Kafka topic pattern is required.

kafka-topic

-

The Kafka topic of the entry.

kafka-topic-pattern

-

A pattern that identifies to which Kafka topics the schema registry applies. Use Kafka topic patterns carefully. It is possible to poll messages from topics that don’t use the selected schema registry and therefore can’t be deserialized. This can lead to lost messages. The best practice is to explicitly specify all Kafka topics to which the schema registry applies.

Example configuration of Confluent Schema Registry
<kafka-configuration>
    ...
    <schema-registries>
        <confluent-schema-registry>
                <name>confluent-sr</name>
                <url>https://localhost:8081</url> <!-- If the URL starts with https:, TLS is enabled -->
                <kafka-topics>
                    <kafka-topic>kafka-topic</kafka-topic>
                </kafka-topics>

                <!-- optional TLS configuration -->
                <tls>
                  <!-- truststore, to trust the schema registry server's certificate -->
                  <truststore>
                      <path>/opt/hivemq/conf/kafka-trust.jks</path>
                      <password>truststorepassword</password>
                  </truststore>

                  <!-- keystore, when using SSL client authentication -->
                  <keystore>
                      <path>/opt/hivemq/conf/kafka-keystore.jks</path>
                      <password>keystorepassword</password>
                      <private-key-password>keypassword</private-key-password>
                  </keystore>

                  <!-- supported cipher suites -->
                  <cipher-suites>
                      <cipher-suite>TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384</cipher-suite>
                      <cipher-suite>TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384</cipher-suite>
                      <cipher-suite>TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256</cipher-suite>
                  </cipher-suites>

                  <!-- supported TLS protocols -->
                  <protocols>
                      <protocol>TLSv1.2</protocol>
                  </protocols>

                  <!-- Specifies whether the client verifies the hostname of the server -->
                  <hostname-verification>true</hostname-verification>
                </tls>

                <!-- optional HTTP authentication configuration -->
                <authentication>
                      <plain>
                          <username>srUser</username>
                          <password>srPassword</password>
                      </plain>
                </authentication>
        </confluent-schema-registry>
    </schema-registries>
    ...
</kafka-configuration>
Supported Formats

The HiveMQ Enterprise Extension for Kafka currently supports deserialization of the following formats with the local schema registry:

  • AVRO

  • JSON (with JSON Schema Draft v4, v6, and v7 specifications for validation)

The HiveMQ Enterprise Extension for Kafka currently supports deserialization of the following formats with Confluent Schema Registry:

  • AVRO

Authentication for Confluent Schema Registry

There are several ways to authenticate on a Confluent Schema Registry instance. For more information on how to configure authentication on the Schema Registry, see Schema Registry Security. The extension supports the following methods:

  • HTTP authentication methods (configured in the authentication tag)

    • Basic authentication: A simple username/password combination encoded in the HTTP headers of each request to the Schema Registry.

    • Bearer (token) authentication: Token-based authentication that encodes a token string into an HTTP header of each request to the Schema Registry.

  • TLS client authentication (configured in the tls.keystore tag): The server requests a certificate from the client within the TLS handshake, which the client provides from its keystore.

For the HTTP authentication methods you can use either basic or token authentication but not both at the same time. However, you can combine any HTTP authentication method (none, basic, bearer) with TLS client authentication.
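
As a rough illustration of the two HTTP methods, the following Python sketch builds the Authorization header for each case and rejects a configuration that sets both. The authorization_header function is a hypothetical helper, not part of the extension. The header formats follow the standard HTTP Basic and Bearer schemes.

```python
import base64
from typing import Dict, Optional


def authorization_header(username: Optional[str] = None,
                         password: Optional[str] = None,
                         token: Optional[str] = None) -> Dict[str, str]:
    """Build the HTTP Authorization header for basic or bearer authentication."""
    has_basic = username is not None or password is not None
    if has_basic and token is not None:
        raise ValueError("use either basic or token authentication, not both")
    if token is not None:
        return {"Authorization": f"Bearer {token}"}
    if has_basic:
        if username is None or password is None:
            raise ValueError("basic authentication needs a username and a password")
        credentials = f"{username}:{password}".encode("utf-8")
        return {"Authorization": "Basic " + base64.b64encode(credentials).decode("ascii")}
    # No HTTP authentication configured.
    return {}


print(authorization_header(username="srUser", password="srPassword"))
print(authorization_header(token="example-token"))
```

Basic authentication only encodes the credentials and does not encrypt them, which is one reason to combine it with a TLS connection to the Schema Registry.
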
Kafka Constants

Kafka Constants are placeholders that represent various sections of information from a Kafka record.
At runtime, the HiveMQ Enterprise Extension for Kafka replaces the placeholders in the generated MQTT message with specific content from the Kafka record.
HiveMQ places the information that is read from the Kafka record directly into the placeholder without any transformation or deserialization.

In a Kafka to MQTT topic mapping, variables such as Kafka Constant placeholders are denoted with '${}'.

Name Description

${KAFKA_TOPIC}

The value from the topic of the Kafka record.

${KAFKA_KEY}

The value from the key of the Kafka record.

${KAFKA_VALUE}

The value from the value field of the Kafka record.

Example of Kafka Constants usage
<mqtt-topics>
    <mqtt-topic>topic/test/topic-1</mqtt-topic>
    <mqtt-topic>${KAFKA_TOPIC}</mqtt-topic>
    <mqtt-topic>topic/test/${KAFKA_KEY}</mqtt-topic>
</mqtt-topics>
<mqtt-payload>${KAFKA_VALUE}</mqtt-payload>
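
To make the substitution concrete, the following Python sketch expands the three constants in the templates from the example above. The expand_constants function and the sample record are hypothetical, and the record values are modeled as strings for readability.

```python
import re
from typing import Dict

# Only the three Kafka Constants are recognized by this sketch.
_CONSTANT = re.compile(r"\$\{(KAFKA_TOPIC|KAFKA_KEY|KAFKA_VALUE)\}")


def expand_constants(template: str, record: Dict[str, str]) -> str:
    """Replace each Kafka Constant placeholder with the raw value from the record."""
    return _CONSTANT.sub(lambda match: record[match.group(1)], template)


# Hypothetical Kafka record.
record = {
    "KAFKA_TOPIC": "firmwareNew",
    "KAFKA_KEY": "device-17",
    "KAFKA_VALUE": "v2.1.0",
}

topic_templates = ["topic/test/topic-1", "${KAFKA_TOPIC}", "topic/test/${KAFKA_KEY}"]
mqtt_topics = [expand_constants(template, record) for template in topic_templates]
mqtt_payload = expand_constants("${KAFKA_VALUE}", record)

print(mqtt_topics)
print(mqtt_payload)
```
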
Supported Path Syntax

To extract values from fields of the deserialized Kafka value, the HiveMQ Enterprise Extension for Kafka uses a path syntax that is similar to the XPath syntax. The path syntax is denoted by ${}.

  • // selects the root element

  • / moves down in the tree

  • nodename specifies the node that gets selected or moved to

  • nodenames[x] selects the x-th element in the nodenames array. x is a 0-indexed value (unlike regular XPath syntax, which is 1-indexed).

The path must begin with the root element //.
The path must point to a value of the type that the target setting expects. For example, the path used for mqtt-payload must point to a field of the type bytes.
Example path syntax
<mqtt-topics>
    <mqtt-topic>topic/example/topic-1/${//mqtt-information/topic}</mqtt-topic>
</mqtt-topics>
<mqtt-payload>${//mqtt-information/client[2]/payload}</mqtt-payload>
Path Examples

This section provides examples of how to use the path syntax.

The examples use the following Avro schema as a basis:

Example Avro Schema
{
  "type": "record",
  "name": "userInfo",
  "namespace": "my.example",
  "fields": [
    {
      "name": "clientIDs",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "payload",
      "type": "bytes"
    },
    {
      "name": "publish_metadata",
      "type": {
        "type": "record",
        "name": "publish_information",
        "fields": [
          {
            "name": "retained",
            "type": "boolean",
            "default": false
          },
          {
            "name": "payload_format_indicator",
            "type": "string"
          },
          {
            "name": "expiry",
            "type": "long"
          }
        ]
      }
    }
  ]
}

This example shows how to extract client IDs and insert them into MQTT topics. The format of the topics is test/client/<clientId>. This format makes it possible to send one Kafka record to many MQTT topics. To get the client IDs, we can use the path //clientIDs.

Configure the path in the config.xml file as follows:

Example path syntax configuration
<mqtt-topics>
    <mqtt-topic>test/client/${//clientIDs}</mqtt-topic>
</mqtt-topics>
The path must be placed within ${ and }.

In this example, we assume that the clientIDs field contains the values ["clientId1", "clientId2", "clientId3"]. As a result, the MQTT publish is sent to the following three topics:

  • test/client/clientId1

  • test/client/clientId2

  • test/client/clientId3

To omit the first and last client ID and only send the MQTT Publish to the second client ID, we can add [1] at the end of the desired entry: //clientIDs[1]. Since array indexes begin at 0, [1] selects the second entry.

Example path syntax configuration
<mqtt-topics>
    <mqtt-topic>test/client/${//clientIDs[1]}</mqtt-topic>
</mqtt-topics>
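
The two path examples can be reproduced with a small Python sketch that resolves a path against an already deserialized record. The resolve_path and expand_topic functions are hypothetical illustrations of the syntax rules described in this section, not the extension's code. The sketch handles one placeholder per topic template and models the record as nested dictionaries and lists.

```python
import re
from typing import Any, List

_SEGMENT = re.compile(r"^([^\[\]/]+)(?:\[(\d+)\])?$")
_PLACEHOLDER = re.compile(r"\$\{(//[^}]+)\}")


def resolve_path(path: str, value: Any) -> Any:
    """Walk a path such as //publish_metadata/expiry or //clientIDs[1]."""
    if not path.startswith("//"):
        raise ValueError("the path must begin with the root element //")
    current = value
    for segment in path[2:].split("/"):
        match = _SEGMENT.match(segment)
        if match is None:
            raise ValueError(f"invalid path segment: {segment!r}")
        name, index = match.group(1), match.group(2)
        current = current[name]
        if index is not None:
            current = current[int(index)]  # 0-indexed, unlike regular XPath
    return current


def expand_topic(template: str, value: Any) -> List[str]:
    """Create one MQTT topic per value that the path in the template selects."""
    match = _PLACEHOLDER.search(template)
    if match is None:
        return [template]
    resolved = resolve_path(match.group(1), value)
    items = resolved if isinstance(resolved, list) else [resolved]
    return [template[:match.start()] + str(item) + template[match.end():] for item in items]


# Hypothetical deserialized Kafka value that follows the example Avro schema.
record = {
    "clientIDs": ["clientId1", "clientId2", "clientId3"],
    "payload": b"hello",
    "publish_metadata": {"retained": False, "payload_format_indicator": "UTF_8", "expiry": 60},
}

print(expand_topic("test/client/${//clientIDs}", record))
print(expand_topic("test/client/${//clientIDs[1]}", record))
```
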

TLS

To connect to your Kafka clusters over a secure TCP connection, configure the TLS options in your <kafka-cluster> settings.

Table 4. TLS configuration settings for Kafka cluster
Name Default Mandatory Description

enabled

false

Specifies whether TLS is enabled. This setting can be set to true or false.

protocols

All protocols enabled by the JVM

The enabled protocols.

cipher-suites

All cipher suites enabled by the JVM

The enabled cipher-suites.

keystore.path

-

The path to the key store where your certificate and private key are included.

keystore.password

-

The password to open the key store.

keystore.private-key-password

-

The password for the private key.

truststore.path

-

The path to the trust store that contains the trusted certificates of the Kafka servers.

truststore.password

-

The password to open the trust store.

hostname-verification

true

Specifies whether hostname verification is enabled. This option can be set to true or false.

Example TLS configuration
<kafka-clusters>
    <kafka-cluster>
        <id>cluster01</id>
        <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>

        <tls>
            <enabled>true</enabled>

            <!-- Truststore, to trust the Kafka server's certificate -->
            <truststore>
                <path>/opt/hivemq/conf/kafka-trust.jks</path>
                <password>truststorepassword</password>
            </truststore>

            <!-- Keystore, when mutual TLS is used -->
            <keystore>
                <path>/opt/hivemq/conf/kafka-key.jks</path>
                <password>keystorepassword</password>
                <private-key-password>privatekeypassword</private-key-password>
            </keystore>

            <!-- Cipher suites supported by the client -->
            <cipher-suites>
                <cipher-suite>TLS_AES_128_GCM_SHA256</cipher-suite>
                <cipher-suite>TLS_AES_256_GCM_SHA384</cipher-suite>
            </cipher-suites>

            <!-- Supported TLS protocols (the TLS_AES_* cipher suites above are TLS 1.3 cipher suites) -->
            <protocols>
                <protocol>TLSv1.3</protocol>
            </protocols>

            <!-- If the client should verify the server's hostname -->
            <hostname-verification>true</hostname-verification>
        </tls>

    </kafka-cluster>
</kafka-clusters>

Authentication

Authentication against Kafka servers can be configured per Kafka cluster in the <kafka-cluster> settings.

The HiveMQ Enterprise Extension for Kafka supports the following authentication mechanisms:

  • No Authentication

  • Plain

  • SCRAM SHA256

  • SCRAM SHA512

  • GSSAPI

  • Microsoft Entra ID

By default, the No Authentication option is enabled.
Example authentication configuration
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <plain>
                <username>kafka-user</username>
                <password>kafka-password</password>
            </plain>
        </authentication>

    </kafka-cluster>
</kafka-clusters>

No Authentication

To disable authentication against the Kafka cluster, enter none as your authentication option.

Example no-authentication configuration
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <none/>
        </authentication>

    </kafka-cluster>
</kafka-clusters>

Plain

Plain authentication uses only the username and password for authentication (not recommended).

Example plain authentication configuration
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <plain>
                <username>kafka-user</username>
                <password>kafka-password</password>
            </plain>
        </authentication>

    </kafka-cluster>
</kafka-clusters>

SCRAM SHA256

Authentication with Salted Challenge Response Authentication Mechanism (SCRAM) alleviates most of the security concerns that come with a plain username/password authentication.

Example SCRAM SHA256 authentication configuration
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <scram-sha256>
                <username>kafka-user</username>
                <password>kafka-user-secret</password>
            </scram-sha256>
        </authentication>

    </kafka-cluster>
</kafka-clusters>

SCRAM SHA512

Authentication with Salted Challenge Response Authentication Mechanism (SCRAM) alleviates most of the security concerns that come with a plain username/password authentication.

Example SCRAM SHA512 authentication configuration
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <scram-sha512>
                <username>kafka-user</username>
                <password>kafka-user-secret</password>
            </scram-sha512>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
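
The following Python sketch outlines why SCRAM avoids sending the password itself. It follows the key derivation and client proof defined in RFC 5802 in simplified form: the password normalization step and the actual message exchange are omitted, and the auth_message value is a placeholder. The function names are hypothetical. The Kafka client performs these steps internally, so the sketch is for illustration only.

```python
import hashlib
import hmac
from typing import Tuple


def scram_keys(password: str, salt: bytes, iterations: int,
               hash_name: str = "sha256") -> Tuple[bytes, bytes]:
    """Derive (client_key, stored_key) from the salted password."""
    salted = hashlib.pbkdf2_hmac(hash_name, password.encode("utf-8"), salt, iterations)
    client_key = hmac.new(salted, b"Client Key", hash_name).digest()
    stored_key = hashlib.new(hash_name, client_key).digest()
    return client_key, stored_key


def client_proof(client_key: bytes, stored_key: bytes, auth_message: bytes,
                 hash_name: str = "sha256") -> bytes:
    """The proof that the client sends instead of the password."""
    signature = hmac.new(stored_key, auth_message, hash_name).digest()
    return bytes(a ^ b for a, b in zip(client_key, signature))


def server_verifies(stored_key: bytes, proof: bytes, auth_message: bytes,
                    hash_name: str = "sha256") -> bool:
    """The server only needs the stored key, never the plain password."""
    signature = hmac.new(stored_key, auth_message, hash_name).digest()
    recovered_client_key = bytes(a ^ b for a, b in zip(proof, signature))
    candidate = hashlib.new(hash_name, recovered_client_key).digest()
    return hmac.compare_digest(candidate, stored_key)


salt = bytes(16)                       # fixed salt, for a reproducible example only
auth_message = b"placeholder-for-the-exchanged-messages"
client_key, stored_key = scram_keys("kafka-user-secret", salt, 4096, "sha512")
proof = client_proof(client_key, stored_key, auth_message, "sha512")
print(server_verifies(stored_key, proof, auth_message, "sha512"))
```

Passing "sha256" or "sha512" as hash_name corresponds to the SCRAM SHA256 and SCRAM SHA512 variants.
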

GSSAPI

The Generic Security Service Application Program Interface (GSSAPI) is used to allow authentication against Kerberos.

If you use GSSAPI to authenticate the HiveMQ Enterprise Extension to your Kafka clusters via Kerberos, the Kafka client in the extension must have administrative rights. If admin rights are not enabled, the extension is not permitted to connect to Kafka.
Example GSSAPI authentication configuration
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <gssapi>
                <key-tab-file>/opt/hivemq/kafka-client.keytab</key-tab-file>
                <kerberos-service-name>kafka</kerberos-service-name>
                <principal>kafka/kafka1.hostname.com@EXAMPLE.COM</principal>
                <store-key>true</store-key>
                <use-key-tab>true</use-key-tab>
            </gssapi>
        </authentication>

    </kafka-cluster>
</kafka-clusters>

The use of a Kerberos ticket cache can be configured as follows:

Example GSSAPI authentication configuration with a ticket cache
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <gssapi>
                <use-ticket-cache>true</use-ticket-cache>
            </gssapi>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
Name Default Mandatory Description

use-key-tab

-

Enables the use of a keytab file. This option requires a configured key-tab-file. Allowed values are true or false.

key-tab-file

-

Absolute path to the keytab file.

kerberos-service-name

-

Name of the Kerberos service.

principal

-

The name of the principal that is used. The principal can be a simple username or a service name.

store-key

-

Stores the keytab or the principal key in the private credentials of the subject. Allowed values are true or false.

use-ticket-cache

-

Enables the use of a ticket cache. Allowed values are true or false.

Microsoft Entra ID

Microsoft Entra ID enables secure passwordless connections with Azure Event Hubs for Kafka.

This feature enables several authentication methods.

Regardless of the method, permission to Azure Event Hubs must be granted on Azure. The way you obtain permission varies based on the service you use to run the HiveMQ cluster. For more information, see use passwordless connections with Azure Event Hubs for Kafka on your Azure hosting environment.

To learn more about this feature and all authentication methods Azure supports, see DefaultAzureCredential in the official Azure documentation.
Example Microsoft Entra ID authentication configuration
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <microsoft-entra-id/>
        </authentication>

    </kafka-cluster>
</kafka-clusters>

Kafka Extension Customization

Version 4.4.0 of the HiveMQ Enterprise Extension for Kafka introduces the HiveMQ Kafka Extension Customization SDK.

Kafka Extension Customization SDK

Our flexible API gives you the ability to customize the management of your Kafka topics and implement custom logic for MQTT-to-Kafka messaging. Use the API to programmatically specify sophisticated custom-handling of message transformations between HiveMQ and Kafka.

To enable the use of a custom transformer, you extend the config.xml file of your HiveMQ Enterprise Extension for Kafka.

MQTT-to-Kafka Extension Customization

Table 5. MQTT to Kafka transformer configuration settings
Setting Mandatory Description

id

The unique identifier of the transformer. This string can only contain the following characters: abcdefghijklmnopqrstuvwxyz0123456789-_

cluster-id

The identifier of the referenced Kafka cluster.

mqtt-topic-filters

The list of MQTT topic filters that determines which received MQTT publish messages are forwarded to the transformer.

transformer

The canonical class name of the transformer that is used.

Example MQTT to Kafka transformer configuration
<mqtt-to-kafka-transformers>
    <mqtt-to-kafka-transformer>
        <id>my-transformer</id>
        <cluster-id>cluster01</cluster-id>
        <mqtt-topic-filters>
            <mqtt-topic-filter>transform/#</mqtt-topic-filter>
        </mqtt-topic-filters>
        <transformer>com.hivemq.transformers.MyTransformer</transformer>
    </mqtt-to-kafka-transformer>
</mqtt-to-kafka-transformers>

For detailed information, see HiveMQ Kafka Extension Customization SDK.

Kafka-to-MQTT Extension Customization

Table 6. Kafka to MQTT transformer configuration settings
Setting Mandatory Description

id

The unique identifier of the Kafka-to-MQTT transformer. This string can only contain the following characters: abcdefghijklmnopqrstuvwxyz0123456789-_

cluster-id

The identifier of the referenced Kafka cluster.

kafka-topics

The list of Kafka topics or topic patterns that determines which records will be polled from Kafka.

transformer

The canonical class name of the Kafka-to-MQTT transformer that is used.

Example Kafka to MQTT transformer configuration
<kafka-to-mqtt-transformers>
    <kafka-to-mqtt-transformer>
        <id>transformer01</id>
        <cluster-id>cluster01</cluster-id>
        <transformer>com.hivemq.extensions.kafka.transformer.MyCustom2MqttTransformer</transformer>
        <kafka-topics>
            <kafka-topic>first-kafka-topic</kafka-topic>
            <kafka-topic>second-kafka-topic</kafka-topic>
            <!-- any number of Kafka topics -->
            <kafka-topic-pattern>first-pattern-.*</kafka-topic-pattern>
            <kafka-topic-pattern>second-pattern-.*</kafka-topic-pattern>
            <!-- any number of Kafka topic patterns -->
        </kafka-topics>
    </kafka-to-mqtt-transformer>
</kafka-to-mqtt-transformers>

For detailed information, see HiveMQ Kafka Extension Customization SDK.

Kafka Dashboard in the HiveMQ Control Center

The HiveMQ Enterprise Extension for Kafka adds metrics to your HiveMQ Control Center. These metrics allow you to monitor the messages that the extension processes. Metrics are created for every action that can be measured.


The Dashboard for the HiveMQ Enterprise Extension for Kafka provides an overview bar and detailed graphs for both messaging directions: from MQTT to Kafka and from Kafka to MQTT.

MQTT-to-Kafka Section

The MQTT-to-Kafka section of the Kafka page provides information on MQTT messages that are sent to Kafka. The overview bar in the MQTT-to-Kafka section shows you the key metrics for all MQTT-to-Kafka mappings at a glance:

Name Value type Description

Current Status

Status

The current operational status of the extension. If manual intervention is required, errors or warnings display here.

Consumed MQTT Messages

Messages per second

The number of inbound MQTT Publish messages per second that the HiveMQ Enterprise Extension for Kafka processes.

Written to Kafka

Messages per second

The number of outbound messages per second that the extension has written to Kafka.

Producer Topic Mappings

Absolute value

The number of configured MQTT-to-Kafka topic mappings.

Kafka Clusters

Absolute value

The number of configured Kafka clusters.

Kafka Brokers

Absolute value

The number of available Kafka brokers that are visible to the extension.

The detailed graphs provide additional insights into the messages the HiveMQ Enterprise Extension for Kafka processes.

The Consumed MQTT Publish Messages graph displays the inbound MQTT Publish messages per second that the extension processes. Information is shown per HiveMQ cluster node.

Figure 9. Consumed MQTT Publish Messages

The Messages Written to Kafka graph displays the outbound messages that are written to Kafka and that Kafka has acknowledged. Information is shown per Kafka cluster. Both graphs are expected to show an identical overall sum.

Figure 10. Messages Written to Kafka

Kafka-to-MQTT Section

The Kafka-to-MQTT section of the Kafka page shows data on the information that is sent from Kafka to MQTT. The overview bar in the Kafka-to-MQTT section shows you the key metrics for all Kafka-to-MQTT mappings at a glance:

Name Value type Description

Non-deserializable Kafka Messages

Absolute value

The number of Kafka messages that could not be deserialized and used.

Created MQTT messages

Messages per second

The number of inbound MQTT Publish messages per second that the HiveMQ Enterprise Extension produces.

Polled from Kafka

Messages per second

The number of inbound messages per second that are polled from Kafka.

Consumer Topic Mappings

Absolute value

The number of configured Kafka-to-MQTT topic mappings.

Kafka Clusters

Absolute value

The number of configured Kafka clusters.

Kafka Brokers

Absolute value

The number of available Kafka brokers that are visible to the extension.

The Created MQTT Publish Messages graph displays the MQTT Publish messages per second that are created by the extension. Information is shown per HiveMQ cluster node.


The Messages Polled From Kafka graph displays the inbound messages that are polled from Kafka. Information is shown per Kafka cluster. It is expected that the Created MQTT Publish Messages graph shows an identical or higher overall sum than the Messages Polled From Kafka graph. This discrepancy is due to scenarios in which multiple MQTT messages are created from a single Kafka record.


Kafka Extension Metrics

The HiveMQ Enterprise Extension for Kafka provides several metrics that you can monitor to observe how the extension behaves over time.

Name Type Description

kafka-extension.kafka-brokers.total.count

Counter

The total number of Kafka brokers to which the extension connects.

kafka-extension.producer.count

Counter

The total number of Kafka producers.

kafka-extension.producer.bytes.incoming.total

Counter

The total number of bytes read from Kafka.

kafka-extension.producer.bytes.outgoing.total

Counter

The total number of bytes written to Kafka.

kafka-extension.total.failed.count

Counter

The total number of records the extension cannot produce to Kafka.

kafka-extension.total.ignored.count

Counter

The total number of records the extension ignored.

kafka-extension.total.retry.count

Counter

The total number of records that the extension retried to produce to Kafka.

kafka-extension.total.send.count

Counter

The total number of records the extension attempts to send to Kafka.

kafka-extension.total.success.count

Counter

The total number of records the extension successfully produced to Kafka.

kafka-extension.kafka-cluster.[kafka-cluster].success.count

Counter

The number of records the extension successfully produces to Kafka per Kafka cluster.

kafka-extension.topic-mapping.[mapping-id].latency

Timer

The amount of time the extension requires to produce publishes to Kafka per topic mapping.

kafka-extension.topic-mapping.[mapping-id].failed.count

Counter

The number of records that the extension cannot produce to Kafka per topic mapping.

kafka-extension.topic-mapping.[mapping-id].retry.count

Counter

The number of records the extension attempts to produce to Kafka more than once per topic mapping.

kafka-extension.topic-mapping.[mapping-id].send.count

Counter

The number of records the extension sends to Kafka per topic mapping.

kafka-extension.topic-mapping.[mapping-id].success.count

Counter

The number of records the extension successfully produces to Kafka per topic mapping.

kafka-extension.kafka-to-mqtt-mapping.total.count

Counter

The number of configured Kafka-to-MQTT mappings.

kafka-extension.kafka-to-mqtt-transformer.total.count

Counter

The number of configured Kafka-to-MQTT transformers.

kafka-extension.kafka-to-mqtt-mapping.kafka-brokers.count

Counter

The number of connected Kafka brokers for kafka-to-mqtt messaging.

kafka-extension.kafka-to-mqtt.failure.rate-limiting.exceeded.count

Counter

The total number of times the rate limit of the PublishService is exceeded.

kafka-extension.kafka-to-mqtt.total.polled-record.count

Counter

The total number of Kafka records the Kafka to MQTT mappings and transformers receive from a Kafka cluster.

kafka-extension.kafka-consumer.incoming.record.count

Counter

The total number of Kafka records all Kafka to MQTT mappings and transformers in the extension receive.

kafka-extension.kafka-to-mqtt.total.failed.count

Counter

The total number of Kafka records, across all Kafka to MQTT mappings and transformers, that the extension cannot forward to HiveMQ.

kafka-extension.kafka-to-mqtt.total.ignored.count

Counter

The total number of Kafka records for which the extension did not create an MQTT message.

kafka-extension.kafka-to-mqtt.total.retry.count

Counter

The total number of Kafka records all Kafka to MQTT mappings and transformers retry.

kafka-extension.kafka-to-mqtt.total.send-publishes.count

Counter

The total number of MQTT messages all Kafka to MQTT mappings and transformers publish from the received Kafka records.

kafka-extension.kafka-to-mqtt-mapping.[schema-registry-name].schema.retry.count

Counter

The number of schema retrieval retry attempts per schema registry. Retries occur, for example, when the schema registry is temporarily unavailable or when authentication or authorization fails.

kafka-extension.confluent-schema-registry.[schema-registry-name].auth-failed.count

Counter

The number of failed authentication attempts per schema registry.

kafka-extension.confluent-schema-registry.[schema-registry-name].auth-successful.count

Counter

The number of successful authentication attempts per schema registry.

kafka-extension.kafka-cluster.[kafka-cluster].polled-record.count

Counter

The number of records the extension polls from Kafka per cluster.

kafka-extension.kafka-to-mqtt-mapping.[mapping-id].latency

Timer

The amount of time the extension requires to produce publishes from Kafka records per topic mapping.

kafka-extension.kafka-to-mqtt-mapping.[mapping-id].invalid-messages.count

Counter

The number of invalid records that are dropped per topic mapping. A record is invalid when, for example, its schema cannot be found, the record does not match the schema, or the record does not contain the configured field that the path syntax maps.

kafka-extension.kafka-to-mqtt-mapping.[mapping-id].polled-record.count

Counter

The number of records the extension polls from Kafka per topic mapping.

kafka-extension.kafka-to-mqtt-mapping.[mapping-id].send-publishes.count

Counter

The number of publishes the extension sends to MQTT per topic mapping.
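The per-topic-mapping counters above can be combined into a quick health summary. The following Python sketch is illustrative only: the `snapshot` dictionary (metric name to current value) is a hypothetical input that you would fill from whatever monitoring integration you use, and `topic_mapping_summary` is not part of HiveMQ. Treating `success.count` divided by `send.count` as a success ratio is also an assumption, since records that are still in flight or are being retried can skew the value.

```python
def topic_mapping_summary(snapshot, mapping_id):
    """Summarize the MQTT to Kafka counters of one topic mapping.

    snapshot: dict of metric name -> numeric value (hypothetical input).
    mapping_id: the ID that replaces [mapping-id] in the metric names.
    """
    prefix = f"kafka-extension.topic-mapping.{mapping_id}."
    sent = snapshot.get(prefix + "send.count", 0)
    succeeded = snapshot.get(prefix + "success.count", 0)
    failed = snapshot.get(prefix + "failed.count", 0)
    retried = snapshot.get(prefix + "retry.count", 0)
    return {
        "sent": sent,
        "succeeded": succeeded,
        "failed": failed,
        "retried": retried,
        # None when nothing has been sent yet, to avoid dividing by zero.
        "success_ratio": succeeded / sent if sent else None,
    }


if __name__ == "__main__":
    example = {
        "kafka-extension.topic-mapping.mapping01.send.count": 200,
        "kafka-extension.topic-mapping.mapping01.success.count": 150,
        "kafka-extension.topic-mapping.mapping01.failed.count": 5,
        "kafka-extension.topic-mapping.mapping01.retry.count": 45,
    }
    print(topic_mapping_summary(example, "mapping01"))
```

A ratio that falls while `retry.count` rises can be an early hint that a Kafka cluster is struggling, but the thresholds that make sense depend on your deployment.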

Kafka Extension Customization SDK Metrics

The following metrics are available when you implement an MQTT to Kafka or Kafka to MQTT transformer with the HiveMQ Kafka Extension Customization SDK:

Name Type Description

kafka-extension.transformer.[transformer-id].latency

Timer

The amount of time the extension requires to transform an MQTT publish message into a Kafka record per transformer.

kafka-extension.transformer.[transformer-id].failed.count

Counter

The number of records that the extension cannot produce to Kafka per transformer.

kafka-extension.transformer.[transformer-id].ignored.count

Counter

The number of MQTT messages for which the extension did not create a Kafka record per transformer.

kafka-extension.transformer.[transformer-id].retry.count

Counter

The number of records the extension attempts to produce to Kafka more than once per transformer.

kafka-extension.transformer.[transformer-id].send.count

Counter

The number of records the extension attempts to send to Kafka per transformer.

kafka-extension.transformer.[transformer-id].success.count

Counter

The number of records the extension successfully produces to Kafka per transformer.

kafka-extension.kafka-to-mqtt-transformer.[transformer-id].latency

Timer

The amount of time the extension requires to produce publishes from Kafka records per transformer.

kafka-extension.kafka-to-mqtt-transformer.[transformer-id].failed-transformation.count

Counter

The total number of unsuccessful transformation attempts, including multiple failures for the same Kafka record.

kafka-extension.kafka-to-mqtt-transformer.[transformer-id].ignored.count

Counter

The number of times the selected transformer returned an empty list of MQTT publishes.

kafka-extension.kafka-to-mqtt-transformer.[transformer-id].polled-record.count

Counter

The number of Kafka records the selected transformer receives from a Kafka cluster.

kafka-extension.kafka-to-mqtt-transformer.[transformer-id].retry.count

Counter

The number of times the selected transformer retries to transform Kafka records to MQTT publishes.

kafka-extension.kafka-to-mqtt-transformer.[transformer-id].send-publishes.count

Counter

The number of published MQTT messages the selected transformer created from Kafka records.
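Because every transformer metric embeds the transformer ID in its name, it can help to group the values per transformer before you inspect them. The Python sketch below shows one way to do this. It assumes a hypothetical `snapshot` dictionary of metric names and values, and the regular expression is derived only from the naming patterns listed in the table above; `group_by_transformer` is an illustrative helper, not a HiveMQ API.

```python
import re
from collections import defaultdict

# Matches both naming patterns from the table above:
#   kafka-extension.transformer.[transformer-id].<metric>
#   kafka-extension.kafka-to-mqtt-transformer.[transformer-id].<metric>
# where <metric> is "latency" or a "<name>.count" suffix.
_TRANSFORMER_METRIC = re.compile(
    r"^kafka-extension\."
    r"(?P<kind>transformer|kafka-to-mqtt-transformer)\."
    r"(?P<transformer_id>.+)\."
    r"(?P<metric>latency|[a-z-]+\.count)$"
)


def group_by_transformer(snapshot):
    """Group metric values by (kind, transformer ID).

    Metrics that do not follow the per-transformer naming pattern,
    such as the extension-wide totals, are skipped.
    """
    grouped = defaultdict(dict)
    for name, value in snapshot.items():
        match = _TRANSFORMER_METRIC.match(name)
        if match is None:
            continue
        key = (match["kind"], match["transformer_id"])
        grouped[key][match["metric"]] = value
    return dict(grouped)


if __name__ == "__main__":
    example = {
        "kafka-extension.transformer.my-transformer.send.count": 10,
        "kafka-extension.transformer.my-transformer.success.count": 9,
        "kafka-extension.kafka-to-mqtt-transformer.other.retry.count": 2,
        "kafka-extension.total.send.count": 99,  # skipped: not per transformer
    }
    for key, metrics in group_by_transformer(example).items():
        print(key, metrics)
```

The grouping key keeps the direction (`transformer` for MQTT to Kafka, `kafka-to-mqtt-transformer` for Kafka to MQTT) separate, so two transformers that share an ID in different directions do not overwrite each other.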

Support

If you need help with the HiveMQ Enterprise Extension for Kafka or have suggestions on how we can improve the extension, please contact us at contact@hivemq.com.