RabbitMQ Consumer

In this article

Overview

You can use this Snap to fetch messages from a RabbitMQ destination.

Snap Type

RabbitMQ Consumer Snap is a READ-type Snap.

Prerequisites

  • Access to the RabbitMQ Server.

  • Privileges to perform various actions on the Queues such as receive and send messages.

Support for Ultra Pipelines

Works in Ultra Pipelines

Limitations and Known Issues

None.

Snap Views

Type

Format

Number of Views

Examples of Upstream and Downstream Snaps

Description

Type

Format

Number of Views

Examples of Upstream and Downstream Snaps

Description

Input 

Document

 

 

 

  • Min:0

  • Max:1

Upstream Snap is optional. You can connect any Snap with a document output view, such as Mapper or Copy Snaps upstream of this Snap.

The Snap does not require input data. Input documents may be used to evaluate any JavaScript expression in the File property.

Output

Binary

 

 

  • Min:1

  • Max:1

  • File Writer

  • CSV Parser

  • JSON Parser

  • XML Parser

Message consumed from RabbitMQ server as binary document.

Error

Error handling is a generic way to handle errors without losing data or failing the Snap execution. You can handle the errors that the Snap might encounter while running the Pipeline by choosing one of the following options from the When errors occur list under the Views tab:

  • Stop Pipeline Execution: Stops the current pipeline execution when the Snap encounters an error.

  • Discard Error Data and Continue: Ignores the error, discards that record, and continues with the rest of the records.

  • Route Error Data to Error View: Routes the error data to an error view without stopping the Snap execution.

Learn more about Error handling in Pipelines.

Snap Settings

  • Asterisk ( * ): Indicates a mandatory field.

  • Suggestion icon (): Indicates a list that is dynamically populated based on the configuration.

  • Expression icon ( ): Indicates whether the value is an expression (if enabled) or a static value (if disabled). Learn more about Using Expressions in SnapLogic.

  • Add icon ( ): Indicates that you can add fields in the fieldset.

  • Remove icon ( ): Indicates that you can remove fields from the fieldset.

Field Name

Field Type

Field Dependency

Description

Field Name

Field Type

Field Dependency

Description

Label*

Default Value: RabbitMQ Consumer
Example: RabbitMQ Consumer

String

None

Specify a unique name for the Snap.

Exchange

Default Value: N/A
Example: DemoExchange

String/Expression

None

Specify a name of the RabbitMQ exchange bound to the queue. If you do not specify a value for exchange, the default exchange provided by RabbitMQ is considered.

Condition: Either Exchange or Queue has to be specified.

Exchange type

Default Value: direct
Example: fanout

Dropdown list

None

The type of RabbitMQ exchange used to push messages. Available options are:

  • direct: Routes the messages to the queues only on matching with the routing key.

  • fanout: Routes the messages to the queues that are bound to it regardless of the routing keys and patterns.

  • headersRoutes the messages to the queues based on header values as specified in the binding. Provide the x-match and Header Properties if selected.

  • topic: Routes the messages to the queues based on the routing key and the binding pattern.

Durable Exchange

Default Value: Selected

Checkbox

None

Select this checkbox to enable the exchange to be durable.

 

Queue

Default Value: N/A
Example: DemoQueue

String/Expression

None

Conditional. Specify a name of the RabbitMQ queue from which messages have to be consumed. If there is no queue with given queue name on RabbitMQ, then a new queue is created and consumer starts listening. If the exchange name and routing key are also specified, then the queue is bound to the exchange with the given routing key.

Either Exchange or Queue has to be specified.

Auto delete

Default Value: Deselected
Example: Selected

Checkbox

None

Select this check box to indicate that the RabbitMQ autoDelete property associated with the existing queue is set to true.

 

Routing Key

Default Value: N/A
Example: ZYA123&$

String/Expression

None

Specify the routing key to be used for binding the queue with the exchange.

Processing Mode

Default Value: Synchronous
Example: Asynchronous



Dropdown list

None

Select one of the following modes for processing messages:

Synchronous: In synchronous mode, the consumer processes messages from the destination one at a time until a STOP is read or the Message Count is reached.

If a message is not available, the consumer will retry for an available message at an interval of every 3 secs until the Message count is reached.

For example, if the Queue sends 8 messages, and the Message count is set to 10, the consumer processes the 8 messages one after the other, and sleeps for every 3 seconds for the remaining two unavailable messages.

Asynchronous: In asynchronous mode, the consumer reads the messages from the destination whenever the message arrives until a 'STOP' is read or the Message Count is reached. The consumer registers a messagelistener asynchronously. It does not block, but calls (the processmessage) immediately once a message is available.

Message Acknowledge Mode*

Default Value: AUTO_ACKNOWLEDGE
Example: PIPELINE_CONTROL



Dropdown list

None

Select one of the following modes of message acknowledgement for non-transacted sessions:

  • AUTO_ACKNOWLEDGE the session acknowledges the receipt of a message when a call to receive method or when the message listener returns successfully.

  • PIPELINE_CONTROLthe client has the power when to acknowledge the message.  

Select the PIPELINE_CONTROL option when using the RabbitMQ Acknowledge Snap in a Pipeline.

  • The RabbitMQ Consumer Snap writes the messages to the output view along with headers containing a DeliveryTag which is then forwarded to the RabbitMQ Acknowledge Snap. 

  • The RabbitMQ Acknowledge Snap may be positioned anywhere in the pipeline following the Consumer Snap. The RabbitMQ Acknowledge Snap should be configured with necessary acknowledgement mode based on message processing. Options available are Acknowledge, Reject and Recover. 

If you select the PIPELINE_CONTROL option and the message is not acknowledged, by default, the Consumer Snap waits for 30 minutes to get the acknowledgment. If Snap does not get any acknowledgment for the message even after 30 minutes, Snap considers the status as unacknowledged and moves to the next message.

Acknowledge type

Default value: Acknowledge Message 
Example: Reject Message

Dropdown list

Appears on selecting Auto_Acknowledgment from the Message Acknowledge Mode dropdown list.

Select one of the following acknowledge types for the message.

  • Acknowledge Message: Successful acknowledgment of the message by the Snap to the RabbitMQ server and removal of that message from the queue.

  • Reject Message: Notifies the RabbitMQ server to mark the particular message as ‘dead lettered’ and route the message to the ‘Dead Lettered Exchange.’ However, if the ‘Dead Lettered Exchange’ is not configured on the RabbitMQ server, the message is dropped from the queue automatically.

  • Recover Message: Notifies the RabbitMQ server to requeue the message to be available in the queue.

x-match

 

Default Value: all
Example: any

Dropdown list

None

Specify the x-match value to be submitted for binding. The available options are:

  • any: If selected, even one matching header value is sufficient

  • all: If selected, mandates that all the values should match 

Select x-match only when the Exchange type is header

Header properties

 

Use this fieldset to define header properties to bind the queue with an exchange as arguments.  

Configure Header properties only when the Exchange type is headers.

Header key

 

Default Value: N/A
Example: org

String/Expression

None

Specify the name of the header that is being used for the binding.



Header value

Default Value: N/A
Example: snap

String/Expression

None

Specify the header value corresponding to the respective header key.

Argument properties

Use this field set to define custom argument properties to ensure all declarations for the queues use the same configuration/options/arguments.  

 

Argument key

Default value: N/A
Examplex-max-len-bytesx-max-priority, x-queue-type

String/Expression

None

Specify the name of the argument used for declaration. Learn more about Optional Arguments in RabbitMQ documentation.

 

Argument value

Default value: N/A
Example1048576, 2, quorum

String/Expression

None

Specify the value corresponding to the argument key.

To consume messages from the quorum queue, specify Argument key as x-queue-type and Argument value as quorum.

Message Count

Default value: -1
Example: 5



Integer/Expression

None

Controls the number of messages to be consumed from the destination queue before the consumer is stopped. Negative integer value, that is the default value of -1 makes the consumer run indefinitely.

  • -1: The consumer Snap runs in a never ending loop and keeps consuming messages from the destination. This is the default behavior. For example, if you set this field to the default value of -1, the Pipeline runs infinitely and reads all the messages until you stop that pipeline.

  • 0: The consumer Snap reads all the messages from the destination and then stops. For example, if there are 10 messages in the destination queue, all the 10 messages are read before it stops.

  • >0: The consumer Snap reads the specified number of messages before stopping. For example, if there are 20 messages in the queue and you have specified 4 as the value, then only 4 messages are read before it stops.

Maximum connection attempts

 

Default Value: 3
Example: 4

Integer

None

Specify the maximum number of connection attempts in case of a connection failure. The Snap retries for the configured number of attempts for establishing the connection.

If it exceeds the configured value, and the Route connection errors property is enabled, the messages are routed to the error view with the error information along with the original.

When the Route connection errors property is disabled, the Snap displays the number of published messages along with the error information, however, the Snap fails.

Connection retry interval

 

Default Value: 10
Example: 5

Integer

None

Specify the time taken in seconds to wait before retrying for another connection.

Route connection errors

 

Default Value: Deselected

Checkbox

None

Select this checkbox to route the connection errors to the error view displaying the error information along with the original. If you deselect this checkbox, the Snap fails to execute.

 

Snap Execution

 

Default Value: Validate & Execute
Example: Execute only

Dropdown list

None

Indicates how the Snap must be executed. Available options are:

  • Validate & Execute: Performs limited execution of the Snap (up to 50 records) during Pipeline validation; performs full execution of the Snap (unlimited records) during Pipeline execution.

  • Execute only: Performs full execution of the Snap during Pipeline execution; does not execute the Snap during Pipeline validation.

  • Disabled: Disables the Snap and, by extension, its downstream Snaps.

Examples

This example Pipeline demonstrates how to consume messages from a specified queue and write them to the output view.

The RabbitMQ Consumer Snap, consumes the messages from the queue DemoQueue either in synchronous or asynchronous mode.

Note that the Consumer creates the Exchange and Queue if they are not available in the RabbitMQ server, and binds them based on the properties configured such as Routing keyHeader Properties, x-match etc., and then starts consuming the messages from the server. Successful execution of the Pipeline displays the message properties in the output preview:





In the below Pipeline, the RabbitMQ Consumer Snap reads the messages from a queue and sends them to the CRM instance. The JSON Parser parses the messages and maps the messages to the CRM instance. The Script Snap is configured for a time delay so all the messages configured in the queue are processed.