Skip to content

The mqtt_publish node

Publish data to a mqtt-broker. Incoming data is converted to JSON before sending.

If the save() parameter is given, every message first gets stored to an on-disk queue before sending, this way we can make sure no message gets lost when disconnected from the broker.

Note: This node is a sink node and does not output any flow-data, therefore any node connected to this node will not get any data.

Example

def topic = 'top/track/pressure'

|mqtt_publish() 
.topic(topic)
.retained()

Using a lambda expression for the topic:

def topic_base = 'top/'

|mqtt_publish()
.topic_lambda(lambda: str_concat([topic_base, "type", '/', "measurement"])

Here the topic string is built with a lambda expression using the topic_base declaration, the string '/' and two fields from the current data_point. The topic string may be a different one with every data_point that gets published.


Sequence check

since version 1.5.5

This node can add meta data to each data-item it sends to an mqtt broker. This data will include a sequence number that will be checked by a receiving mqtt_subscribe node. The check will be performed after n number of items received by the mqtt_subscribe node to see, if data has been lost along the way. In case of missing data, a new mqtt message will be sent out to report this. To enable this sequence check, the config value seq_check.enable/FAXE_SEQ_CHECK_ENABLE must be set to on. For more information see configuration.

Example data_point with meta data:

{
    "ts": 1748438360000,
    "data": {
        ...
        }
    },
    "_meta": {
        "topic": "data/test/seq_check/v1",
        "started": false,
        "seq": 4392,
        "nodeid": "mqtt_publish3",
        "flowid": "test1",
        "device": "3a79daae29a28b4e2f754720a2bcd31c"
    }
}

Parameters

Parameter Description Default
host( string ) Ip address or hostname of the broker config: mqtt.host/FAXE_MQTT_HOST
port( integer ) The broker's port config: mqtt.port/FAXE_MQTT_PORT
user( string ) username config: mqtt.user/FAXE_MQTT_USER
pass( string ) password config: mqtt.pass/FAXE_MQTT_PASS
client_id( string ) mqtt client id, defaults to a combination of flow-id and node-id undefined
topic( string ) mqtt topic to use undefined
topic_lambda( lambda ) mqtt topic to use evaluated via a lambda expression undefined
topic_field( string ) [since 0.19.9] path to a field in the current data-item, who's value should be used as the topic undefined
qos( integer ) Quality of service, one of 0, 1 or 2 1
retained( is_set ) whether the message should be retained on the broker false (not set)
save( is_set ) send save (on-disk queuing) false (not set)
ssl( is_set ) whether to use ssl config: mqtt.ssl.enable/FAXE_MQTT_SSL_ENABLE
max_mem_queue_size( integer ) Number of data_items that can be stored in memory, in case a connection is not available 700
use_pool( bool ) Whether to use a connection pool for the mqtt connection. If false, then the node will use its own connection. config: mqtt_pub_pool.enable/FAXE_MQTT_PUB_POOL_ENABLE
add_seq_check( bool ) whether to add meta data to each data_item sent to an mqtt broker (see ) for more info config: seq_check.enable/FAXE_SEQ_CHECK_ENABLE
seq_check_topic_depth( integer ) see for more info config: seq_check.topic_depth/FAXE_SEQ_CHECK_TOPIC_DEPTH

topic or topic_lambda or topic_field must be provided.