
The amqp_consume node

Consume data from an amqp-broker like RabbitMQ.

This node accepts regular amqp routing keys as well as MQTT style topic strings for bindings/routing_key.
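
A minimal sketch of a binding written as an MQTT style topic string instead of a dot-separated amqp routing key. The topic string 'my/routing/key' is only an illustrative value; the host, exchange and queue names are reused from the examples further down this page.

```dfs
|amqp_consume()
.host('deves-amqp-cluster1.internal')
.bindings('my/routing/key')
.exchange('x_xchange')
.queue('faxe_test')
```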

In safe mode

Once a data-item is received by the node, it will be immediately stored in an on-disk queue for data-safety. Only after this will the item be acknowledged to the amqp broker.
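
According to the parameters table, safe mode is off by default (safe is false). A minimal sketch of turning it on, assuming the same broker, exchange and queue as in the examples on this page:

```dfs
|amqp_consume()
.host('deves-amqp-cluster1.internal')
.bindings('my.routing.key')
.exchange('x_xchange')
.queue('faxe_test')
.safe(true)
```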

Message deduplication

If the amqp correlation-id property is set (to a unique value per message), this node can perform efficient message deduplication.

See amqp_publish for details on this.


Prefetch count, ack_every and dedup_size

For a description of these settings, see table below.

These settings are related to one another. As a rule of thumb, when you change prefetch, set ack_every and dedup_size as follows:

  • set ack_every to roughly one third of prefetch
  • set dedup_size to 3 times the prefetch value

Example: prefetch = 100, ack_every = 35, dedup_size = 300
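
Written as a node declaration, the example values above could look like the following sketch. The host, binding, exchange and queue are taken from the examples on this page and stand in for your own values.

```dfs
|amqp_consume()
.host('deves-amqp-cluster1.internal')
.bindings('my.routing.key')
.exchange('x_xchange')
.queue('faxe_test')
.prefetch(100)
.ack_every(35)
.dedup_size(300)
```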

Queue migration

Since v1.4.2

This feature can be used to migrate the flow of data to a new queue.

The reasons for migrating a queue could be:

  • Rename a queue (maybe to use a different policy)
  • Use a different type of queue
  • Migrate a queue to a different vhost

To do this without losing any data that is still living in the existing queue, the node temporarily sets up a second consumer for the existing queue and consumes from both queues in parallel, until it sees the same messages coming in on both queues (duplicates will be removed). At this point the consumer for the existing queue is stopped and that queue is deleted from the broker.

If a flow that is set up to do a queue migration is restarted after the migration has already finished, the takeover_queue is not declared again. This is because the takeover_queue is declared in passive mode, which means that a queue can only be used if it already exists on the broker.

This feature adds the following node parameters (see the parameters table below):

  • takeover: enable/disable queue takeover mode
  • takeover_queue: Name of the existing queue from which to takeover the data stream.
  • takeover_queue_type: "x-queue-type" amqp argument
  • takeover_queue_prefix: prefix for takeover queues
  • takeover_vhost: name of the vhost for the takeover queue

All other parameters are the same for both consumers, except:

  • queue_prefix: Does not affect the takeover_queue, you must use the full (existing) queue name.
  • exchange and exchange_prefix: This will always be the same for both consumers.
  • consumer_tag: a postfix will be added for the take-over consumer.

When vhost and takeover_vhost are different, you must make sure that the data is published to both vhosts, and the exchange name must be the same on each vhost. Exchanges and queues are specific to a vhost, so a queue on one vhost cannot be bound to an exchange on another vhost.


At the moment this node can only set up and work with topic exchanges.

Examples

|amqp_consume()
.host('deves-amqp-cluster1.internal') 
.bindings('my.routing.key')
.exchange('x_xchange')
.queue('faxe_test')
.dt_field('UTC-Time')
.dt_format('float_micro')

|amqp_consume()
.host('deves-amqp-cluster1.internal') 
.bindings('my.routing.key')
.exchange('x_xchange')
.queue('faxe_test')
.queue_type('quorum')
.takeover_queue('existing_queue') 
.dt_field('UTC-Time')
.dt_format('float_micro')

The second example migrates from a queue named 'existing_queue' to a new queue named 'faxe_test' with type 'quorum'.

Parameters

| Parameter | Description | Default |
|---|---|---|
| host( string ) | IP address or hostname of the broker | config: amqp.host/FAXE_AMQP_HOST |
| port( integer ) | The broker's port | config: amqp.port/FAXE_AMQP_PORT |
| user( string ) | AMQP user | config: amqp.user/FAXE_AMQP_USER |
| pass( string ) | AMQP password | config: amqp.pass/FAXE_AMQP_PASS |
| vhost( string ) | vhost to connect to on the broker | '/' |
| vhost_prefix( string ) | prefix for the vhost and takeover_vhost name; applied only if vhost or takeover_vhost is not the default of '/' | config: rabbitmq.vhost_prefix/FAXE_RABBITMQ_VHOST_PREFIX |
| routing_key( string ) | deprecated; routing key to use for queue binding | undefined |
| bindings( string_list ) | list of queue binding keys | [] |
| queue( string ) | name of the queue to bind to the exchange | FlowName + '_' + NodeName |
| queue_prefix( string ) | prefix for the queue name that will be ensured to exist for queue | config: rabbitmq.queue_prefix/FAXE_RABBITMQ_QUEUE_PREFIX |
| queue_type( string ) | Queue type. Valid values are: '', 'quorum', 'classic'. The value will be used for the "x-queue-type" argument when declaring a queue. With '', the type of the queue will be the default type defined for the vhost. | '' |
| exchange( string ) | name of the exchange to bind to the source exchange | FlowName + '_' + NodeName |
| exchange_prefix( string ) | prefix for the exchange name that will be ensured to exist for exchange | config: rabbitmq.exchange_prefix/FAXE_RABBITMQ_EXCHANGE_PREFIX |
| prefetch( integer ) | prefetch count to use | 70 |
| consumer_tag( string ) | Identifier for the queue consumer | 'c_' + FlowName + '_' + NodeName |
| ack_every( integer ) | number of messages to consume before acknowledging them to the broker | 20 |
| ack_after( duration ) | timeout after which all currently unacknowledged messages will be acknowledged, regardless of the ack_every setting | 5s |
| use_flow_ack( bool ) | special ack mode, where message acknowledgement depends on other nodes in the flow (see crate_out); ack_every and ack_after have no effect in this mode | false |
| dedup_size( integer ) | number of correlation-ids to hold in memory for message deduplication | 200 |
| dt_field( string ) | name of the timestamp field that is expected | 'ts' |
| dt_format( string ) | timestamp or datetime format that is expected (see datetime-parsing) | 'millisecond' |
| include_topic( bool ) | whether to include the routing key in the resulting datapoints | true |
| topic_as( string ) | if include_topic is true, this will be the fieldname for the routing key value | 'topic' |
| as( string ) | base object for the output data-point | undefined |
| ssl( is_set ) | whether to use tls; if true, ssl options from faxe's config for amqp connections will be used | config: amqp.ssl.enable/FAXE_AMQP_SSL_ENABLE |
| confirm( boolean ) | whether to acknowledge consumed messages to the amqp broker; when set to false, the channel will be set to auto-ack, which can increase throughput at the risk of data loss | true |
| safe( boolean ) | whether to use faxe's internal queue. If true, messages consumed from the amqp broker will be stored in an internal on-disk queue before they get sent to downstream nodes, to avoid losing data. | false |
| takeover( boolean ) | Enable/disable queue takeover mode. | config: rabbitmq.takeover/FAXE_RABBITMQ_TAKEOVER |
| takeover_queue( string ) | Name for the take-over queue. | FlowName + '_' + NodeName |
| takeover_queue_prefix( string ) | prefix for the queue name that will be ensured to exist for takeover_queue | config: rabbitmq.takeover_queue_prefix/FAXE_RABBITMQ_TAKEOVER_QUEUE_PREFIX |
| takeover_queue_type( string ) | Queue type for the take-over queue. Valid values are: '', 'quorum', 'classic'. The value will be used for the "x-queue-type" argument when declaring a queue. With '', the type of the queue will be the default type defined for the vhost. | config: rabbitmq.takeover_queue_type/FAXE_RABBITMQ_TAKEOVER_QUEUE_TYPE |
| takeover_queue_vhost( string ) | Name of the vhost for the take-over queue. | value of vhost |

Exactly one of these must be provided: routing_key, bindings.