Data Hub Behavior Models
The HiveMQ Data Hub uses state machines to model the behavior of an MQTT client as it moves through your HiveMQ broker.
Behavior validation relies on the interaction between predefined state machine models and policies:
State machines offer a simple and powerful way to represent the behavior of an MQTT client.
Here are some key aspects of a state machine as they apply to HiveMQ Data Hub:
-
States: Shows the current state of the MQTT client in the state machine.
-
Initial State: The starting point of the state machine which will be entered as soon as a client is matched by the policy.
-
Intermediate State: A transient state that the state machine can pass through after the initial state, but before reaching a terminal state.
-
Terminal State: The terminal state type comprises two subtypes: Success (indicating successful behavior) and Failed (signifying a failed behavior).
-
-
Transitions: The movement of the MQTT client from one state to another. Each transition consists of a from state, a to state and a specific event.
-
Actions: Optional pipelines of functions that the event in a transition triggers.
Mqtt.events Model
The Mqtt.events
behavior model allows you to intercept specific MQTT packets for further actions.
The model itself does not enforce any particular behavior and is very useful in debugging scenarios.
States | Type | Description |
---|---|---|
|
|
The starting point of the model which is entered as soon as a client is matched by the policy. |
|
|
The state models that a client is successfully connected to the broker. After a connection is established, every MQTT packet loops into the |
|
|
Once the client connection is closed, the model transitions into the |
Mqtt.events
model{
"id": "drop-subscribes",
"matching": {
"clientIdRegex": ".*"
},
"behavior": {
"id": "Mqtt.events"
},
"onTransitions": [
{
"fromState": "Any.*",
"toState": "Any.*",
"Mqtt.OnInboundSubscribe": {
"pipeline": [
{
"id": "log-subscribe-attempt",
"functionId": "System.log",
"arguments": {
"level": "DEBUG",
"message": "A client is subscribing to a topic but dropped by the policy."
}
},
{
"id": "drop-subscribe",
"functionId": "Mqtt.drop",
"arguments": {}
}
]
}
}
]
}
The example behavior policy matches every client ID (as indicated with the regular expression .*
).
The behavior model Mqtt.events
is instantiated and has no further arguments.
There is one action defined for the pinpointed transition Any.*
to Any.*
on the Mqtt.OnInboundSubscribe
event.
This transition is triggered for each SUBSCRIBE the client sends.
The action of the policy executes the following functions in the defined order:
-
The
System.log
function prints a log message with the log levelDEBUG
. -
The
Mqtt.drop
function drops the MQTT SUBSCRIBE packet.
As a result, no client is able to subscribe.
Publish.duplicate Model
The Publish.duplicate
model identifies consecutive identical client messages to prevent unnecessary resource consumption.
For example, this model can save bandwidth and storage costs by dropping duplicate readings from sensors such as a temperature sensor that sends the same value repeatedly.
The following example illustrates the use of a behavior policy to drop MQTT messages.
States | Type | Description |
---|---|---|
|
|
The starting point of the model which is entered as soon as a client is matched by the policy. |
|
|
The state models that a client has successfully connected to the broker. |
|
|
Indicates that either the client has sent its first message or two consecutive messages are different. |
|
|
Indicates that the client has sent a message which is equal to the previous one. |
|
|
When a client has sent two equal consecutive messages at any point in time and disconnects the state |
|
|
When a client has always sent different consecutive messages and disconnects the state |
Publish.duplicate
model{
"id": "drop-duplicate-messages-policy",
"matching": {
"clientIdRegex": ".*"
},
"behavior": {
"id": "Publish.duplicate"
},
"onTransitions": [
{
"fromState": "Any.*",
"toState": "Duplicated",
"Mqtt.OnInboundPublish": {
"pipeline": [
{
"id": "count-duplicate-messages",
"functionId": "Metrics.Counter.increment",
"arguments": {
"metricName": "repetitive-messages-count",
"incrementBy": 1
}
},
{
"id": "log",
"functionId": "System.log",
"arguments": {
"message": "Client is sending the same payload and the broker drops the message.",
"level": "INFO"
}
},
{
"id": "drop",
"functionId": "Mqtt.drop",
"arguments": {}
}
]
}
}
]
}
In this example, the behavior policy is configured to match every client and refers to the Publish.duplicate behavior model.
Additionally, each transition to the Duplicated
state is identified using the state filter Any.*
on the Mqtt.OnInboundPublish event
.
Effectively, this means that whenever a client sends a PUBLISH message that is identical to the previously sent message (with equal payload and topic), the defined action is triggered.
Here, the action increments the metric com.hivemq.data-hub.custom.counters.repetitive-messages-count
, followed by logging a message, and ultimately dropping the PUBLISH packet.
As a result, all duplicate PUBLISH messages are discarded.
Publish.quota Model
The Publish.quota
model tracks the number of MQTT PUBLISH messages a client sends after a client connects to the broker to identify unusual behavior.
Unusual behavior can include a client sending too few or too many messages before disconnecting.
Detecting a client that sends too few messages is particularly useful to identify clients that follow a pattern of connecting, publishing a single message, and disconnecting.
If this pattern occurs repeatedly at scale it can inflict unnecessary strain on infrastructure resources in HiveMQ and your downstream services.
Argument | Type | Description |
---|---|---|
|
|
Defines the minimal number of published messages that must be reached before disconnecting. |
|
|
Defines the maximal number of published messages that most be reached before disconnecting. |
When you configure a publish-quota model, at least one of the available arguments must be present.
Data Hub uses the default value for the missing parameter.The default value for minPublishes is 0 .
The default value for maxPublishes is UNLIMITED .
|
States | Type | Description |
---|---|---|
|
|
The starting point of the model which is entered as soon as a client is matched by the policy. |
|
|
The state models that a client has successfully connected to the broker. |
|
|
This state is entered when a client sends the first PUBLISH message, and the client remains in this state for each consecutive PUBLISH until it disconnects or violates the specified behavior. |
|
|
When the client disconnects after sending too few PUBLISH messages, or a connected client sends too many PUBLISH messages, the model transitions into the |
|
|
When a client disconnects after sending the configured amount of PUBLISH messages, the model transitions into the |
Publish.quota
model{
"id": "duplicate-policy",
"matching": {
"clientIdRegex": ".*"
},
"behavior": {
"id": "Publish.quota",
"arguments": {
"minPublishes": 2
}
},
"onTransitions": [
{
"fromState": "Any.*",
"toState": "Violated",
"Event.OnAny": {
"pipeline": [
{
"id": "log",
"functionId": "System.log",
"arguments": {
"level": "INFO",
"message": "The client does not send the configured amount of PUBLISH messages."
}
},
{
"id": "increment-metric",
"functionId": "Metrics.Counter.increment",
"arguments": {
"metricName": "incorrect-mqtt-messages",
"incrementBy": 1
}
}
]
}
}
]
}
In this example, the behavior policy is set to apply to every client and is associated with the Publish.quota
behavior model.
The behavior model is configured with a minPublishes
setting of 2
, indicating that the model will verify that the client sends a minimum of 2 PUBLISH messages.
The maximum allowed number of PUBLISH messages is unrestricted.
Additionally, any transition from any state (State.*
) to the Violated
state triggered by any event (Event.OnAny
) is configured to specify an action when it is executed.
The action logs a message and increments a metric (com.hivemq.data-hub.custom.counters.incorrect-mqtt-messages
) to track the total number of clients that violate the behavior.