HiveMQ Pulse Expression Language

The HiveMQ Pulse Expression Language is a domain-specific language for real-time data processing and analysis in smart manufacturing and IoT environments. It lets users write expressions that operate on sensor data, metrics, and time-series values.

Getting Started with HiveMQ Pulse Expression Language

The expression language provides the following capabilities:

  • Mathematical operations for numerical computations

  • Logical operations for conditional processing

  • Statistical functions for data analysis

  • Window functions for time-series analysis

  • Variable system for referencing live data streams

  • Type safety with automatic type promotion

Basic Syntax

Literals

// Integers
42
1000000000000000000000000000000

// Decimals
3.14
0.123456789012345678901234567890

// Booleans
true
false

Variables

Variables reference live data sources (sensors, metrics, computed values):

temperature_sensor_1
motor_speed
production_count
quality_index

Arithmetic Operations

// Basic arithmetic
temperature + 10
pressure * 1.5
flow_rate / 60
production_count % 100

// Complex expressions
(temperature - 32) * 5 / 9    // Fahrenheit to Celsius
motor_rpm * torque / 5252     // Calculate horsepower

Comparison Operations

temperature > 85.0           // Temperature threshold check
pressure >= operating_min    // Minimum pressure validation
quality_score == target      // Quality target achievement
error_count != 0             // Error detection

Logical Operations

// Logical AND/OR
temperature > 80 && humidity < 60
motor_running || backup_motor_running

// Logical NOT
!system_fault

// Complex conditions
(temperature > critical_temp || pressure > max_pressure) && !emergency_stop

Conditional Expressions (Ternary Operator)

// Basic conditional
temperature > 75 ? "HOT" : "NORMAL"

// Nested conditionals
quality_score > 95 ? "EXCELLENT" :
quality_score > 80 ? "GOOD" : "NEEDS_IMPROVEMENT"

// Numeric conditionals
error_count > 0 ? alert_level * 2 : normal_level

Built-in Functions

Basic Mathematical Functions

ABS - Absolute Value

Returns the absolute value of a number.

ABS(-15.5)        // Returns 15.5
ABS(motor_torque) // Absolute torque value

ROUND - Round to Decimal Places

Returns a number rounded to a specified number of decimal places. This function uses standard mathematical rounding (HALF_UP): a value exactly halfway between two candidates rounds away from zero.

The ROUND function requires Agent version 2.2.2 or higher.

Syntax: ROUND(x, n)

  • x - The number to round (BIG_DECIMAL or BIG_INT)

  • n - The number of decimal places (must be a non-negative integer)

// Rounding examples
ROUND(3.14159, 2)              // Returns 3.14
ROUND(sensor_value / 100, 3)   // Returns 0.985 (when sensor_value = 98.5)
ROUND(-2.5, 0)                 // Returns -3 (HALF_UP rounds away from zero)

// Manufacturing use cases
ROUND(temperature_celsius, 1)   // Round temperature to 1 decimal place
ROUND(efficiency_ratio * 100, 2) // Convert ratio to percentage with 2 decimals
ROUND(production_cost, 2)       // Round cost to 2 decimal places (cents)

Use the ROUND function to control calculation precision and to avoid confusing BigDecimal trailing zeros (for example, 0.9850000000) in percentage and ratio conversions.
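
The HALF_UP behavior described for ROUND can be reproduced outside the expression language to check expected results. The following is a minimal Python sketch, not the Pulse implementation; the helper name round_half_up is hypothetical, and it assumes that ROUND behaves like decimal quantization with ties going away from zero.

```python
from decimal import Decimal, ROUND_HALF_UP


def round_half_up(x, n):
    """Round x to n decimal places, with ties going away from zero."""
    if n < 0:
        raise ValueError("n must be a non-negative integer")
    quantum = Decimal(1).scaleb(-n)  # n=2 gives a quantum of 0.01
    return Decimal(str(x)).quantize(quantum, rounding=ROUND_HALF_UP)


print(round_half_up("3.14159", 2))             # 3.14
print(round_half_up(Decimal("98.5") / 100, 3))  # 0.985
print(round_half_up("-2.5", 0))                # -3
```

Note that Python's built-in round(-2.5) returns -2 because it rounds ties to the nearest even number, so it is not a substitute for HALF_UP rounding.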

Statistical Functions

COUNT - Count Values

Counts the number of values in a window.

COUNT(sensor_readings)     // Number of readings in time window
COUNT(production_events)   // Number of production events

FIRST - First Value

Returns the first (oldest) value in a time window.

FIRST(temperature_window)  // First temperature reading
FIRST(pressure_samples)    // Initial pressure value

LAST - Last Value

Returns the last (newest) value in a time window.

LAST(sensor_data)         // Most recent sensor reading
LAST(quality_measurements) // Latest quality measurement

MAX / MAX_WINDOW - Maximum Value

Returns the maximum value from arguments or a window.

MAX(temp1, temp2, temp3)      // Max of individual values
MAX_WINDOW(temperature_data)  // Max from time window

MIN / MIN_WINDOW - Minimum Value

Returns the minimum value from arguments or a window.

MIN(pressure_A, pressure_B)   // Min of two pressures
MIN_WINDOW(vibration_data)    // Min vibration in window

MEDIAN - Median Value

Returns the median value from a data window.

MEDIAN(response_times)    // Median response time
MEDIAN(quality_scores)    // Median quality score

SMA - Simple Moving Average

Calculates the simple moving average of values in a window.

SMA(temperature_readings) // Average temperature over time
SMA(production_rate)      // Average production rate

STDDEV - Standard Deviation

Calculates the standard deviation of values in a window.

STDDEV(measurement_data)  // Measurement variability
STDDEV(cycle_times)       // Process consistency metric
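
To make the window semantics concrete, the sketch below models a window as a Python list ordered oldest to newest and computes the equivalent of each statistical function. It is an illustration under assumptions: the readings are hypothetical, and because this reference does not state whether STDDEV is a population or a sample standard deviation, both variants are shown.

```python
import statistics

# Hypothetical temperature window, ordered from oldest to newest reading.
window = [71.5, 72.0, 73.5, 72.5, 74.0]

count = len(window)                  # COUNT
first = window[0]                    # FIRST (oldest value)
last = window[-1]                    # LAST (newest value)
maximum = max(window)                # MAX_WINDOW
minimum = min(window)                # MIN_WINDOW
median = statistics.median(window)   # MEDIAN
sma = statistics.fmean(window)       # SMA (arithmetic mean of the window)

# Assumption: which of these two STDDEV corresponds to is not documented here.
stddev_population = statistics.pstdev(window)
stddev_sample = statistics.stdev(window)

print(count, first, last, maximum, minimum, median)
```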

State and Event Functions

State and event functions detect state transitions in your data and measure time or counts between those transitions. Use these functions to track how long equipment operates in a specific condition or to count events during a particular state.

Function names are case-insensitive. For example, TimeAtState, TIMEATSTATE, and timeatstate are equivalent.

TimeAtState - Timestamp at State Observation

Emits a timestamp when the payload matches a specified state. If the predicate evaluates to false, the datapoint is ignored, and no value is emitted.

Syntax: TimeAtState(<STATE_ACTIVATION_PREDICATE>, <TIMESTAMP_SOURCE>)

  • STATE_ACTIVATION_PREDICATE - A boolean predicate expression evaluated against input fields. For example, voltage > 100 && id > 30.

  • TIMESTAMP_SOURCE - EVENT_TIME(<field>) or INGESTION_TIME()

Returns: DateTime when the predicate is true; otherwise, no value is emitted.

Output tag type: DateTime

Constraints:

  • Must be used as the root-level function.

  • Cannot be nested inside other operations.

  • All referenced fields must belong to the same input tag.

// Emit the event timestamp when voltage exceeds 100 and id exceeds 30
TimeAtState(voltage > 100 && id > 30, EVENT_TIME(tsField))

// Emit the broker ingestion timestamp when motor temperature is critical
TimeAtState(motor_temp > 85.0, INGESTION_TIME())
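
The filtering behavior of TimeAtState can be pictured as a generator that emits a timestamp only for matching datapoints. The Python sketch below is a simplified model, not the Pulse runtime; the function time_at_state and the sample datapoints are hypothetical.

```python
from datetime import datetime, timezone


def time_at_state(datapoints, predicate, timestamp_of):
    """Yield one timestamp per datapoint that satisfies the predicate.

    Datapoints that fail the predicate are skipped, so nothing is emitted for them.
    """
    for datapoint in datapoints:
        if predicate(datapoint):
            yield timestamp_of(datapoint)


def at(second):
    """Build a UTC timestamp on a fixed, hypothetical day."""
    return datetime(2024, 1, 1, 8, 0, second, tzinfo=timezone.utc)


# Hypothetical datapoints; tsField plays the role of the DateTime payload field.
datapoints = [
    {"voltage": 95, "id": 31, "tsField": at(0)},    # voltage too low: ignored
    {"voltage": 120, "id": 42, "tsField": at(5)},   # matches: emitted
    {"voltage": 130, "id": 12, "tsField": at(10)},  # id too low: ignored
    {"voltage": 110, "id": 35, "tsField": at(15)},  # matches: emitted
]

emitted = list(
    time_at_state(
        datapoints,
        predicate=lambda d: d["voltage"] > 100 and d["id"] > 30,
        timestamp_of=lambda d: d["tsField"],  # stands in for EVENT_TIME(tsField)
    )
)
print([ts.isoformat() for ts in emitted])
```

Only the second and fourth datapoints produce an output value in this model.
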

BetweenState - Aggregate Values During a State

Accumulates values while a state is active (predicate is true) and emits an aggregated result when the state deactivates (predicate becomes false). The function automatically determines the state deactivation condition as the negation of the activation predicate.

Syntax: BetweenState(<STATE_ACTIVATION_PREDICATE>, <AGGREGATION_FUNCTION>)

  • STATE_ACTIVATION_PREDICATE - A boolean predicate expression over input fields

  • AGGREGATION_FUNCTION - TimeElapsed(…) or CountBetween()

Returns: Depends on the aggregation function (Integer for both TimeElapsed and CountBetween).

Constraints:

  • Must be used as the root-level function.

  • Cannot be nested inside other operations.

  • All referenced fields must belong to the same input tag.

With TimeElapsed:

Computes the elapsed time in milliseconds (maximum timestamp minus minimum timestamp) across all accumulated datapoints while the state is active.

Syntax: BetweenState(<predicate>, TimeElapsed(<TIMESTAMP_SOURCE>))

Output tag type: Integer

// Measure how long voltage stays above 100 using event timestamps
BetweenState(voltage > 100, TimeElapsed(EVENT_TIME(tsField)))

// Measure duration of high-temperature state using ingestion time
BetweenState(motor_temp > 85.0, TimeElapsed(INGESTION_TIME()))

With CountBetween:

Counts the number of datapoints where the predicate evaluates to true. The count does not include the datapoint that deactivates the state.

Syntax: BetweenState(<predicate>, CountBetween())

Output tag type: Integer

// Count how many datapoints had voltage above 100
BetweenState(voltage > 100, CountBetween())

// Count readings during high-pressure state
BetweenState(pressure > max_safe_pressure, CountBetween())
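
The accumulate-then-emit behavior of BetweenState can be sketched as a small state machine. The Python model below is a simplification under assumptions: all names and datapoints are hypothetical, timestamps are plain epoch milliseconds, and a state that is still active when the input ends emits nothing.

```python
def between_state(datapoints, predicate, aggregate):
    """Accumulate datapoints while the predicate holds; emit when it turns false."""
    active = []
    for datapoint in datapoints:
        if predicate(datapoint):
            active.append(datapoint)
        elif active:
            # The deactivating datapoint triggers the emit but is not aggregated.
            yield aggregate(active)
            active = []


def time_elapsed(timestamp_of):
    """Model of TimeElapsed: maximum timestamp minus minimum timestamp, in ms."""
    def aggregate(points):
        stamps = [timestamp_of(p) for p in points]
        return max(stamps) - min(stamps)
    return aggregate


def count_between(points):
    """Model of CountBetween: number of datapoints seen while the state was active."""
    return len(points)


# Hypothetical stream of (timestamp in ms, voltage) datapoints.
stream = [(1000, 90), (2000, 120), (3500, 130), (5000, 110),
          (6000, 95), (7000, 105), (8000, 80)]


def is_high(dp):
    return dp[1] > 100


durations = list(between_state(stream, is_high, time_elapsed(lambda dp: dp[0])))
counts = list(between_state(stream, is_high, count_between))
print(durations)  # [3000, 0]
print(counts)     # [3, 1]
```

The second episode contains a single datapoint, so its elapsed time is 0 while its count is 1.
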

EVENT_TIME - Event Timestamp Source

Identifies a payload field as the event timestamp source and returns the value as a DateTime.

Syntax: EVENT_TIME(<field>)

  • field - A single scalar field of type DateTime

Returns: DateTime

The input tag field must be configured with a DateTime data type. EVENT_TIME does not accept fields of type Integer or Decimal.

Usage: Use as the timestamp source inside TimeAtState and TimeElapsed.

// Reference an event timestamp field
EVENT_TIME(tsField)
EVENT_TIME(event_timestamp)

INGESTION_TIME - Broker Ingestion Timestamp

Returns the MQTT broker ingestion timestamp of the current datapoint.

Syntax: INGESTION_TIME()

Arguments: None

Returns: DateTime (epoch milliseconds)

Usage: Use as the timestamp source inside TimeAtState and TimeElapsed.

// Use broker ingestion time as the timestamp source
TimeAtState(voltage > 100, INGESTION_TIME())
BetweenState(motor_temp > 85.0, TimeElapsed(INGESTION_TIME()))

TimeElapsed - Elapsed Time Aggregation

Computes the elapsed time in milliseconds across accumulated timestamps. Use as the aggregation function inside BetweenState.

Syntax: TimeElapsed(<TIMESTAMP_SOURCE>)

  • TIMESTAMP_SOURCE - A single DateTime scalar (from EVENT_TIME or INGESTION_TIME)

Returns: Integer (elapsed time in milliseconds)

Usage: Only valid inside BetweenState.

// Elapsed time based on event timestamps
BetweenState(voltage > 100, TimeElapsed(EVENT_TIME(tsField)))

// Elapsed time based on ingestion timestamps
BetweenState(pressure > 200, TimeElapsed(INGESTION_TIME()))

CountBetween - Count Datapoints Between State Transitions

Counts the number of datapoints between state transitions. Use as the aggregation function inside BetweenState.

Syntax: CountBetween()

Arguments: None

Returns: Integer (count of datapoints where the predicate evaluates to true)

Usage: Only valid inside BetweenState.

// Count datapoints during high-voltage state
BetweenState(voltage > 100, CountBetween())

Smart Manufacturing Examples

Temperature Monitoring

// Convert Fahrenheit to Celsius and check if within range
celsius_temp = (fahrenheit_sensor - 32) * 5 / 9
celsius_temp >= 18 && celsius_temp <= 24

// Temperature stability check using moving average
temperature_stable = ABS(current_temp - SMA(temp_history)) < 2.0

// Critical temperature alert
temperature_alert = temperature > critical_threshold ?
                   "CRITICAL" : temperature > warning_threshold ?
                   "WARNING" : "NORMAL"

Production Line Monitoring

// Calculate Overall Equipment Effectiveness (OEE)
availability = actual_runtime / planned_runtime
performance = actual_output / target_output
quality_rate = good_parts / total_parts
oee = availability * performance * quality_rate

// Display OEE as a clean percentage with 2 decimal places
oee_percentage = ROUND(oee * 100, 2)  // Returns 85.47 instead of 85.4700000000

// Production rate analysis
current_rate = COUNT(production_events) / time_window_hours
rate_variance = STDDEV(hourly_production_rates)
rate_consistent = rate_variance < acceptable_variance

// Track efficiency trends with rounded display values
efficiency_improving = LAST(efficiency_window) > FIRST(efficiency_window)
current_efficiency = ROUND(LAST(efficiency_window) * 100, 1)  // For example, 92.5 (a percentage value)
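
As a worked check of the OEE arithmetic above, the following Python sketch evaluates the same formulas with arbitrary-precision decimals. The input figures are hypothetical, and the rounding step assumes the HALF_UP behavior described for ROUND.

```python
from decimal import Decimal, ROUND_HALF_UP

# Hypothetical shift data.
actual_runtime, planned_runtime = Decimal(432), Decimal(480)  # minutes
actual_output, target_output = Decimal(950), Decimal(1000)    # parts
good_parts, total_parts = Decimal(990), Decimal(1000)         # parts

availability = actual_runtime / planned_runtime  # 0.9
performance = actual_output / target_output      # 0.95
quality_rate = good_parts / total_parts          # 0.99
oee = availability * performance * quality_rate  # 0.84645

# Equivalent of ROUND(oee * 100, 2): the tie at 84.645 rounds up to 84.65.
oee_percentage = (oee * 100).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
print(oee_percentage)  # 84.65
```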

Quality Control

// Statistical Process Control
measurement_average = SMA(quality_measurements)
measurement_stddev = STDDEV(quality_measurements)
upper_control_limit = measurement_average + (3 * measurement_stddev)
lower_control_limit = measurement_average - (3 * measurement_stddev)

// Out of control detection
out_of_control = measurement > upper_control_limit ||
                 measurement < lower_control_limit

// Quality trend analysis
quality_trend = LAST(quality_scores) - FIRST(quality_scores)
quality_improving = quality_trend > 0
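
The control-limit expressions above can be checked with a short numeric example. This Python sketch uses hypothetical measurements and assumes a population standard deviation; whether STDDEV uses the population or the sample formula is not stated in this reference.

```python
import statistics

# Hypothetical quality measurements in the current window.
quality_measurements = [10.0, 12.0, 11.0, 9.0, 13.0]

measurement_average = statistics.fmean(quality_measurements)  # 11.0
measurement_stddev = statistics.pstdev(quality_measurements)  # sqrt(2), about 1.414

upper_control_limit = measurement_average + 3 * measurement_stddev
lower_control_limit = measurement_average - 3 * measurement_stddev


def out_of_control(measurement):
    """True when a measurement falls outside the three-sigma control limits."""
    return measurement > upper_control_limit or measurement < lower_control_limit


print(round(lower_control_limit, 3), round(upper_control_limit, 3))
print(out_of_control(11.5), out_of_control(16.0))
```

With these values the limits are roughly 6.757 and 15.243, so a measurement of 11.5 is in control and 16.0 is not.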

Predictive Maintenance

// Vibration analysis
vibration_rms = SMA(vibration_readings)  // Approximation: SMA is an arithmetic mean, not a true root mean square
vibration_peak = MAX_WINDOW(vibration_data)
vibration_ratio = vibration_peak / vibration_rms

// Maintenance alert conditions
maintenance_needed = vibration_ratio > 3.0 ||
                    temperature > normal_temp + 10 ||
                    efficiency < normal_efficiency * 0.8

// Bearing condition monitoring
bearing_frequency = 1750 / 60 // 1750 RPM converted to Hz (about 29.17 Hz)
bearing_amplitude = MAX_WINDOW(fft_analysis_at_bearing_freq)
bearing_condition = bearing_amplitude < warning_level ? "GOOD" :
                   bearing_amplitude < critical_level ? "WATCH" : "REPLACE"
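
The vibration expressions above take their baseline from SMA, which is an arithmetic mean. A true root mean square (RMS) is the square root of the mean of the squared samples, and the two can differ considerably for signed signals. The Python sketch below, using hypothetical readings, shows the difference and the resulting peak-to-RMS ratio (crest factor).

```python
import math

# Hypothetical signed vibration samples.
vibration_readings = [3.0, -4.0, 3.0, -4.0]

n = len(vibration_readings)
arithmetic_mean = sum(vibration_readings) / n               # what a plain average gives
rms = math.sqrt(sum(v * v for v in vibration_readings) / n)  # true RMS
peak = max(abs(v) for v in vibration_readings)
crest_factor = peak / rms

print(arithmetic_mean)         # -0.5
print(round(rms, 4))           # 3.5355
print(round(crest_factor, 4))  # 1.1314
```

If the input stream already carries rectified or RMS amplitudes, averaging those values with SMA is a reasonable baseline; for raw signed samples it is not.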

Energy Management

// Power consumption analysis
power_average = SMA(power_consumption)
power_efficiency = production_output / power_average
energy_cost = power_consumption * electricity_rate

// Peak demand management
demand_approaching_peak = power_consumption > peak_threshold * 0.9
load_shed_required = power_consumption > peak_limit

// Energy efficiency trending
efficiency_last_hour = SMA(recent_efficiency_data)
efficiency_yesterday = SMA(yesterday_efficiency_data)
efficiency_improvement = efficiency_last_hour - efficiency_yesterday

Process Control

// Proportional control logic (the P term of a PID controller)
setpoint_error = target_value - actual_value
control_output = proportional_gain * setpoint_error

// Flow rate control
flow_deviation = ABS(actual_flow - target_flow)
flow_adjustment_needed = flow_deviation > tolerance

// Pressure regulation
pressure_trend = LAST(pressure_window) - FIRST(pressure_window)
pressure_rising = pressure_trend > 0
pressure_action_required = pressure > max_safe_pressure ||
                          (pressure_rising && pressure > warning_level)

Data Types

The expression language supports these data types:

  • BIG_INT: Arbitrary precision integers for counting, indexing

  • BIG_DECIMAL: Arbitrary precision decimals for measurements, calculations

  • BOOLEAN: True/false values for conditions, flags

  • STRING: Text values for labels, states (limited support)

  • DATETIME: Timestamp values for date and time data, used by EVENT_TIME, INGESTION_TIME, and TimeAtState. Supports multiple serialization formats, including Unix timestamps in milliseconds, Unix timestamps in seconds, and ISO-8601 strings

  • WINDOW: Time-series data collections for statistical analysis
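
The three DATETIME serialization formats listed above all describe the same instant in different encodings. The Python sketch below shows how each one maps to a single timestamp. It is illustrative only: the function to_datetime and its format labels are hypothetical, and how Pulse itself selects the format for a field is not covered here.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_datetime(value, fmt):
    """Convert a serialized timestamp to a timezone-aware UTC datetime.

    fmt is a hypothetical label: "epoch_ms", "epoch_s", or "iso8601".
    """
    if fmt == "epoch_ms":
        return EPOCH + timedelta(milliseconds=int(value))
    if fmt == "epoch_s":
        return EPOCH + timedelta(seconds=int(value))
    if fmt == "iso8601":
        # Older Python versions do not accept a trailing "Z", so normalize it.
        parsed = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
        return parsed.astimezone(timezone.utc)
    raise ValueError(f"unsupported format: {fmt}")


from_ms = to_datetime(1700000000000, "epoch_ms")
from_s = to_datetime(1700000000, "epoch_s")
from_iso = to_datetime("2023-11-14T22:13:20Z", "iso8601")
print(from_ms.isoformat())  # 2023-11-14T22:13:20+00:00
```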

Variable Naming Conventions

For smart manufacturing applications, use descriptive variable names:

// Sensor readings
temperature_zone_1
pressure_hydraulic_main
vibration_motor_bearing_a

// Production metrics
parts_per_minute
cycle_time_seconds
downtime_minutes_today

// Quality measurements
dimensional_tolerance_mm
surface_roughness_ra
defect_rate_percentage

// Process parameters
spindle_speed_rpm
feed_rate_mm_min
coolant_flow_lpm

Best Practices

  1. Use descriptive names: motor_temperature instead of temp1

  2. Include units in names: pressure_psi, flow_rate_gpm

  3. Group related variables: zone_1_temp, zone_2_temp

  4. Use consistent naming: snake_case for multi-word names

  5. Validate ranges: Always check that sensor values are within expected ranges

  6. Handle edge cases: Use conditional logic for startup, shutdown conditions

  7. Document thresholds: Make alarm and warning levels explicit

Integration with HiveMQ Pulse

The expression language integrates with HiveMQ Pulse’s data pipeline:

  • Variables automatically map to configured data sources

  • Results can trigger actions, alerts, or data storage

  • Real-time evaluation processes expressions as data arrives

  • Historical analysis works with time-windowed data

  • Scalable execution handles high-frequency industrial data streams

This combination enables real-time analytics for smart manufacturing, predictive maintenance, quality control, and process optimization use cases.