Overview

You can use the Kafka Consumer Snap to read or consume documents from a Kafka topic.

Prerequisites

  • A valid account with the required permissions.
  • A Kafka topic with parameters that are valid for consumption (reading). 
Info

For the notifications to work properly, all the Consumer and Acknowledge Snaps that belong to one consumer group must execute on the same node of the Snaplex. You can either use the Pipeline Execute Snap to execute different Pipelines on the same node, or place all the Consumer and Acknowledge Snaps in one Pipeline.

Support for Ultra Pipelines

Works in Ultra Pipelines.

Limitations and Known Issues

None.

Snap Input and Output

Input
  • Type of View: Document
  • Number of Views: Min: 0, Max: 1
  • Examples of Upstream Snaps: Mapper Snap, CSV Formatter, JSON Formatter, XML Formatter
  • Description: A document containing a Kafka topic, a consumer group, a partition, and an offset.

Output
  • Type of View: Document
  • Number of Views: Min: 1, Max: 1
  • Examples of Downstream Snaps: Mapper Snap, Copy Snap, JSON Formatter and File Writer
  • Description: The message data with key/value parameters and metadata.

Snap Settings

Parameter Name | Data Type | Description | Default Value | Example
Label

String

Required. The name for the Snap. You can modify the default name to be more specific and meaningful, especially if you have more than one of the same Snap in your Pipeline.

Default Value: Kafka Consumer
Example: Consumer_sales_data

Topic

String/Suggestion

Specify the topic from which messages are to be read.

Default Value: N/A
Example: T1

Partition Number

Integer/Suggestion

Specify the partition number.
Partitions allow you to parallelize a topic by splitting the data in a particular topic across multiple brokers. They allow multiple consumers to read from a topic in parallel.

Info

This field is optional unless Seek type is set to Specify offset, in which case it is required. A value that is equal to or greater than the total number of partitions for the topic is allowed so that when a target partition is added to the topic, the Snap is ready to consume the messages from the partition without restarting.


Default Value: N/A
Example: 2

Group ID

String/Expression

Required. Specify a unique string that identifies the group to which this consumer belongs.
The Consumer Snaps can be grouped together by using the same group ID value.

Info
Kafka performs group rebalancing whenever a new consumer instance joins the group. We recommend that you specify a unique Group ID for each Snap instance to avoid rebalancing all the other consumer instances in the group.


Default Value: N/A
Example: people

Message Count

Integer/Expression

Specify the number of messages to be read before the consumption process stops.

Info
  • If the message count is positive, the Snap terminates when it reaches the specified message count, or when it reads all the available messages if the number of messages is less than the specified count.
  • If the message count is negative, the Snap continues to read messages indefinitely.
  • If the message count is 0, the Snap reads all the currently available messages and then terminates.


Note

If you set 0 as the value for Message Count, you might need to increase the Fetching Timeout value to avoid premature termination when records are actually available.
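
The three Message Count cases above can be summarized in a small sketch. This is only an illustration of the documented stopping rules, not the Snap's implementation; the function name `read_messages` is hypothetical, and the sketch assumes that Wait For Full Count is deselected.

```python
from typing import List, Tuple


def read_messages(available: List[str], message_count: int) -> Tuple[List[str], bool]:
    """Return (messages read, whether the Snap terminates).

    Hypothetical helper that mirrors the documented rules, assuming
    Wait For Full Count is deselected.
    """
    if message_count > 0:
        # Positive: stop at the count, or earlier if fewer messages exist.
        return available[:message_count], True
    if message_count == 0:
        # Zero: read everything currently available, then terminate.
        return list(available), True
    # Negative: read what is available and keep waiting for more.
    return list(available), False


print(read_messages(["m1", "m2", "m3"], 2))
print(read_messages(["m1", "m2", "m3"], 0))
print(read_messages(["m1", "m2", "m3"], -1))
```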


Default Value: -1
Example: 20

Wait For Full Count

Check box

Activates when you enter a positive integer value or an expression in the Message Count field.

Select this check box to enable the Snap to read and process the messages until the specified number in the Message Count field is reached. 

If you deselect the check box, the Snap terminates when the specified number in the Message Count field is reached, or when all the available messages are read if the number of messages is less than the specified count.

Default Value: Selected
Example: Not Selected

Max Poll Records

Integer/Expression

Select or enter the maximum number of records in a batch that is to be returned by a single call to poll.

Note

Increasing this value can improve performance, but also increases the number of records that require acknowledgement when Auto commit is set to false and Acknowledge mode is set to Wait after each batch of records. In this case, the value of Acknowledge timeout should be adjusted accordingly. This value sets an upper limit to the batch size, but Kafka might choose to return a smaller number of records depending on other factors, such as the size of each record.


Default Value: 500
Example: 500

Fetching Timeout

Integer/Expression

Specify the number of milliseconds the Snap waits for a single poll request to return messages.
If the timeout expires, the Snap polls again for more messages, unless the message count is 0, in which case it stops as expected.

Default Value: 2000
Example: 2000

Auto Commit

Check box

Select this check box to enable the Snap to commit offsets automatically as messages are consumed and sent to the output view.

If you deselect the Auto Commit check box, the Pipeline must contain one or more Acknowledge Snaps downstream of the Consumer. Each document that the Consumer outputs represents an individual Kafka message, and the Acknowledge Snaps must acknowledge each document after the Snaps between the Consumer and the Acknowledge Snaps have processed it appropriately.

Default Value: Selected
Example: Not Selected

Acknowledge Mode

Drop-down list

Activates when you deselect the Auto Commit check box.

Select the Acknowledge Mode that determines when the Snap should wait for acknowledgments from the downstream Acknowledge Snaps. The available options are:

  • Wait after each record: The Snap writes a single record, and then waits for that record to be acknowledged before writing another record.
  • Wait after each batch of records: The Snap polls the broker to obtain a batch of records, writes all records in the batch, and then waits for each of these records to be individually acknowledged before polling again. This option provides higher throughput.
Note

You must configure the Acknowledge Timeout value based on the Acknowledge Mode configuration.


Default Value: Wait after each record
Example: Wait after each batch of records

Acknowledge Timeout (sec)

Expression/Integer

Activates when you deselect the Auto Commit check box.

Enter the maximum number of seconds to wait for a notification from the Acknowledge Snap before committing the offset.

Info
  • When you configure the Acknowledge Mode field as Wait after each record, the timeout value is the maximum time allowed to process one record.
  • When you configure the Acknowledge Mode field as Wait after each batch of records, the timeout value is the maximum time allowed to process all of the records in a single batch.  
  • In either acknowledge mode, if a timeout occurs, the Snap generates an error that includes a complete list of the unacknowledged records and their metadata. If an error view is configured, the error is written to it and the Consumer continues processing subsequent records. If no error view is configured, the error aborts the Snap and the Pipeline execution stops.
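
As a rough aid for choosing a timeout, the relationship between the acknowledge mode, the batch size, and the timeout can be sketched as below. This is only a back-of-the-envelope estimate and not a formula from the product: the function name, the `per_record_ms` estimate, and the `safety_factor` are all assumptions, and the sketch assumes that records are processed one after another.

```python
import math


def suggested_ack_timeout_sec(max_poll_records: int,
                              per_record_ms: float,
                              wait_per_batch: bool,
                              safety_factor: float = 2.0) -> int:
    """Estimate an Acknowledge Timeout in seconds (hypothetical heuristic).

    per_record_ms is your own estimate of downstream processing time
    per record; safety_factor adds headroom on top of that estimate.
    """
    # In batch mode the timeout covers every record in one poll;
    # in per-record mode it covers a single record.
    records_per_wait = max_poll_records if wait_per_batch else 1
    return math.ceil(records_per_wait * per_record_ms * safety_factor / 1000)


print(suggested_ack_timeout_sec(500, 50, wait_per_batch=True))
print(suggested_ack_timeout_sec(500, 50, wait_per_batch=False))
```

Under these assumptions, a batch of up to 500 records at roughly 50 ms each needs a far larger timeout than the per-record mode does, which is why the timeout should be revisited whenever Max Poll Records changes.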


Default Value: 10
Example: 15

Acknowledge Timeout Policy

Drop-down list

Activates when you deselect the Auto Commit check box.

Select an Acknowledge Timeout Policy to handle acknowledge timeout errors. The available options are: 

  • Reprocess: Reprocesses the records that remain unacknowledged because of an acknowledge timeout. The Snap writes an error detailing the unacknowledged records to the Error view (if configured), and then commits the offset of the first unacknowledged record so that reading resumes from that record.
  • Continue: Ignores the records that remain unacknowledged because of an acknowledge timeout. The Snap writes an error message detailing the unacknowledged records to the Error view (if configured), and then processes subsequent records.
Info

Acknowledge Timeout Policy determines the Snap's behavior when the following conditions are met:

  • Auto Commit is deselected.
  • The Error view configuration on the Snap Views tab is set to either Route Error Data to Error View or Discard Error Data and Continue.

When the Consumer’s Partition is not set, an acknowledge timeout triggers a group rebalance, which means that the reprocessing of a record might occur in a different Consumer instance within the same group.
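
The difference between the two policies can be illustrated with a simplified, single-partition sketch. It is not the Snap's code: the function name is hypothetical, and it relies on the general Kafka convention that a committed offset is the offset of the next record to read.

```python
from typing import Iterable, Set


def offset_to_commit(batch_offsets: Iterable[int],
                     acked_offsets: Set[int],
                     policy: str) -> int:
    """Offset committed after an acknowledge timeout (hypothetical sketch)."""
    offsets = sorted(batch_offsets)
    unacked = [o for o in offsets if o not in acked_offsets]
    next_after_batch = offsets[-1] + 1
    if not unacked:
        # Everything was acknowledged: resume after the batch.
        return next_after_batch
    if policy == "Reprocess":
        # Resume from the first unacknowledged record.
        return unacked[0]
    if policy == "Continue":
        # Skip the unacknowledged records and move on.
        return next_after_batch
    raise ValueError(f"Unknown policy: {policy}")


print(offset_to_commit([10, 11, 12, 13], {10, 12}, "Reprocess"))
print(offset_to_commit([10, 11, 12, 13], {10, 12}, "Continue"))
```

Note that with Reprocess, records after the first unacknowledged one (offset 12 in this sketch) are read again even though they were acknowledged, so downstream processing should tolerate duplicates.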


Default Value: Reprocess
Example: Continue

Seek Type

Drop-down list

Specify the starting position where the Consumer Snap should begin reading messages when the Snap is first initialized and executed.

The available options are: 

  • Beginning: The starting position is set to the first message in the queue whether or not a committed offset exists. 
  • End: The starting position is set to the last committed offset. If a committed offset is not found, the starting position is set according to the value of the Auto Offset Reset field.
  • End (skip all existing messages): The starting position is set to the end of the assigned partitions. Only the new messages that are added to the partitions are read.
  • Specify Offset: The starting position is set to the value of the Offset field. You must specify the partition number if you select this option.
Note

The Kafka Consumer Snap commits offsets to Kafka as it consumes records. If the Pipeline aborts, Kafka saves the position of the last read records in each partition. If the Pipeline is restarted, and Seek Type is set to End, the Snap continues reading from the first unacknowledged record in each partition.

If the Consumer Snap is configured with an Error View, the Snap continues processing records as if the unacknowledged records had been acknowledged (after reporting the error to the Error View). The committed offsets reflect this continuation policy. If there is no Error View when a timeout occurs, the committed offsets reflect the first unacknowledged record of each partition.


Default Value: End
Example: Beginning

Offset

Expression/String

Activates when you choose Specify Offset from the Seek Type list.

Specify the Offset from which the Consumer should start consuming the message. The Offset is specific to a partition. Hence, when you specify an offset, you must specify the partition number. The offset starts from 0 for any partition. The Consumer can start consuming from any offset in the partition regardless of the messages that have been read earlier.

Default Value: N/A
Example: 7

Auto Offset Reset

Drop-down list

Select the Auto Offset Reset option from the list.
This option determines where reading starts within the partition when the Seek Type is End and the committed offset for the partition cannot be found.

The available options are:

  • Earliest: The starting position is set to the first message in the topic.
  • Latest: The starting position is set to the last offset of each partition. 

This field is used only if no offsets have been committed for the current partition.
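
The interaction between Seek Type, Offset, and Auto Offset Reset can be summarized in a short sketch for a single partition. The function and parameter names are hypothetical; `first_offset` and `end_offset` stand for the earliest available offset and the position just past the last message, and `committed` is `None` when no offset has been committed.

```python
from typing import Optional


def starting_offset(seek_type: str,
                    first_offset: int,
                    end_offset: int,
                    committed: Optional[int] = None,
                    auto_offset_reset: str = "Earliest",
                    offset: Optional[int] = None) -> int:
    """Resolve where reading starts in one partition (illustrative only)."""
    if seek_type == "Beginning":
        # Ignores any committed offset.
        return first_offset
    if seek_type == "End":
        if committed is not None:
            return committed
        # No committed offset: fall back to Auto Offset Reset.
        return first_offset if auto_offset_reset == "Earliest" else end_offset
    if seek_type == "End (skip all existing messages)":
        return end_offset
    if seek_type == "Specify Offset":
        if offset is None:
            raise ValueError("Specify Offset requires an Offset value")
        return offset
    raise ValueError(f"Unknown seek type: {seek_type}")


print(starting_offset("End", 0, 100, committed=40))
print(starting_offset("End", 0, 100, auto_offset_reset="Latest"))
```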

Default Value: Earliest
Example: Beginning

Output Mode

Drop-down list

Activates when you select the Auto Commit field, or when you set Acknowledge Mode to Wait after each batch of records.

Select the mode to output the consumed records. The available options are:

  • One output document per record: Every record received from Kafka has a corresponding output document.
  • One output document per batch: Preserves the batching of records as received from Kafka. Every poll that returns a non-empty set of records results in a single output document containing the list of records as batch, along with batch_size and batch_index.
Info

Use the One output document per batch mode if you want to preserve the batching in your Pipeline's data flow. Depending on the nature of the processing between the Kafka Consumer and the Kafka Acknowledge Snaps, this mode is useful when Auto Commit is disabled and Acknowledge Mode is Wait after each batch of records.
For example, the processing between the Consumer and the Acknowledge Snaps might involve dealing with the records in batches, such as writing each batch to a single file or processing each batch with a child Pipeline.
We recommend that you use a JSON Splitter Snap to split each output batch into individual records, because the Acknowledge Snap requires individual records as input and not batches.
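
To make the batch shape concrete, the following sketch shows a batch document and the kind of splitting that a JSON Splitter performs before the Acknowledge Snap. The `batch`, `batch_size`, and `batch_index` names come from the description above; the contents of each record (`key`, `value`, `metadata`) are assumed for illustration, and `split_batch` is a hypothetical helper rather than a Snap API.

```python
from typing import Any, Dict, Iterator


def split_batch(batch_document: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield one document per record in a batch document (illustrative)."""
    for record in batch_document["batch"]:
        yield record


# Assumed shape of one batch output document.
batch_document = {
    "batch": [
        {"key": "k1", "value": "v1", "metadata": {"partition": 0, "offset": 10}},
        {"key": "k2", "value": "v2", "metadata": {"partition": 0, "offset": 11}},
    ],
    "batch_size": 2,
    "batch_index": 0,
}

for record in split_batch(batch_document):
    print(record["metadata"]["offset"], record["value"])
```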


Default Value: One output document per record
Example: One output document per batch

Key Deserializer

Drop-down list

Select the target data type for the deserialized key of each record. The available options are:

  • String 
  • JSON
  • Avro (requires schema registry)
  • ByteArray
  • Int16 (16-bit integer)
  • Int32 (32-bit integer)
  • Int64 (64-bit integer)
  • Float32 (32-bit floating-point number)
  • Float64 (64-bit floating-point number)
  • BigInteger (arbitrary precision integer)
  • BigDecimal (arbitrary precision decimal number)
Note

Ensure that your Confluent Kafka Account's configuration includes the schema registry if you select Avro as the Key Deserializer value.


Default Value: String
Example: Int32

Value Deserializer

Drop-down list

Select the target data type for the deserialized value of each record. The available options are:

  • String 
  • JSON
  • Avro (requires schema registry)
  • ByteArray
  • Int16 (16-bit integer)
  • Int32 (32-bit integer)
  • Int64 (64-bit integer)
  • Float32 (32-bit floating-point number)
  • Float64 (64-bit floating-point number)
  • BigInteger (arbitrary precision integer)
  • BigDecimal (arbitrary precision decimal number)
Default Value: String
Example: Int64

Default Header Deserializer

Drop-down list

Select a Header Deserializer that is to be assigned to the headers that are not configured in the Header Deserializers table. 

Default Value: String
Example: JSON

Header Deserializers

Use this field set to define the Key and Deserializer for any headers that require a deserializer other than the Default Header Deserializer. Add a new row to this table for each such header and define the values accordingly. This field set contains the following fields:

  • Key
  • Deserializer
Key

String/Expression/Drop-down list

Specify the name of the header that requires a Deserializer other than the Default Header Deserializer.

Default Value: N/A
Example: Department

Deserializer

String/Drop-down list

Select a Deserializer for the header to convert the header's value to a specific data type such as string or Int32.
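
The way the Header Deserializers table overrides the Default Header Deserializer can be pictured as a lookup with a fallback. The sketch below is illustrative only: the function and the `DESERIALIZERS` table are hypothetical, only a few of the listed types are covered, and the integer formats assume the big-endian encoding used by Kafka's standard integer serializers.

```python
import json
import struct
from typing import Any, Callable, Dict

# A subset of the deserializer options, implemented for illustration.
DESERIALIZERS: Dict[str, Callable[[bytes], Any]] = {
    "String": lambda raw: raw.decode("utf-8"),
    "JSON": lambda raw: json.loads(raw.decode("utf-8")),
    "ByteArray": lambda raw: raw,
    "Int32": lambda raw: struct.unpack(">i", raw)[0],  # 4-byte big-endian
    "Int64": lambda raw: struct.unpack(">q", raw)[0],  # 8-byte big-endian
}


def deserialize_headers(raw_headers: Dict[str, bytes],
                        header_deserializers: Dict[str, str],
                        default: str = "String") -> Dict[str, Any]:
    """Apply a per-header deserializer, falling back to the default."""
    result = {}
    for key, raw in raw_headers.items():
        name = header_deserializers.get(key, default)
        result[key] = DESERIALIZERS[name](raw)
    return result


headers = {"Department": b"\x00\x00\x00\x2a", "source": b"crm"}
print(deserialize_headers(headers, {"Department": "Int32"}))
```

Here only the Department header has its own row, so it is read as a 32-bit integer, while the source header falls back to the default String deserializer.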

Default Value: String
Example: ByteArray

Include Timestamp

Check box

Select this check box to include a timestamp in the metadata of each record that is sent to the output view. 

A timestamp value is the number of milliseconds that have elapsed since midnight, January 1, 1970 UTC.

Info

All Kafka records have a standard timestamp in milliseconds, and all records have a copy of the topic’s message.timestamp.type configuration setting at the time the record was created.
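
Because the timestamp is expressed in milliseconds since the epoch, downstream logic usually converts it to a date and time. A minimal sketch of that conversion follows; the function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def timestamp_to_utc(timestamp_ms: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=timestamp_ms)


print(timestamp_to_utc(1_700_000_000_000).isoformat())
```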


Default Value: Not selected
Example: Selected

Pass Through

Check box

Select this check box to allow the original document to pass through.
The output includes metadata in the message regardless of the configuration of this setting.

Default Value: Not selected
Example: Selected

Snap Execution

Drop-down list

Select one of the three modes in which the Snap executes:

  • Validate & Execute. Performs limited execution of the Snap and generates a data preview during Pipeline validation, then performs full execution of the Snap (unlimited records) during Pipeline runtime.
  • Execute only. Performs full execution of the Snap during Pipeline execution without generating preview data.
  • Disabled. Disables the Snap and all the Snaps that are downstream from it.

Default Value: Validate & Execute
Example: Execute only

Key Concepts

Consumers and Consumer Groups

To help scale the consumption of messages from a topic, you can split the data that is being read among multiple consumers, which allows a number of consumers to read from a topic. When you use multiple Kafka Consumer instances configured with the same consumer group, each instance is assigned a different subset of partitions in the topic.

For example, if a single Kafka Consumer Snap (c1) subscribes to a topic t1 with four partitions, and c1 is the only consumer in consumer group g1, then c1 gets all the messages from all the four partitions.

If you add a second Kafka Consumer Snap (c2) to the Pipeline within the same consumer group (g1), two partitions (for example, 0 and 2) are assigned to consumer c1 and the other two partitions (for example, 1 and 3) are assigned to consumer c2.

If you add more consumers to a single group, and the number of consumers is more than the number of existing partitions, then some of the consumers remain idle and do not receive messages. 

If you add additional consumers to a different consumer group (for example, g2), then those consumers behave independently of those within the consumer group g1. 

Consider the following example. For topic t1 with four partitions, four of the five consumers in g1 receive messages, and c5 remains in idle state (because there is no available partition to assign to this consumer). For consumer group g2, consumer c1 receives messages from partitions 0 and 2, and consumer c2 receives messages from partitions 1 and 3. 

Therefore, it is important to know how many partitions exist within a topic while adding consumers to consume the messages.
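
The partition distribution described in this section can be reproduced with a simple round-robin sketch. This is only an illustration: the function name is hypothetical, and the actual assignment is made by Kafka according to the configured assignment strategy, so the specific partitions each consumer receives may differ.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Distribute partitions over the consumers of one group, round-robin."""
    assignment: Dict[str, List[int]] = {consumer: [] for consumer in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Two consumers share four partitions.
print(assign_partitions(4, ["c1", "c2"]))
# Five consumers, four partitions: the fifth consumer stays idle.
print(assign_partitions(4, ["c1", "c2", "c3", "c4", "c5"]))
```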

Example

Consuming Messages from a Kafka Topic

This example demonstrates how messages are read from a Kafka topic, and how the selected output documents are sent to the downstream Mapper view.

First, we configure the Kafka Consumer Snap to read the messages from the Topic, Sample_Topic, under the Group ID, consumergroup. We select the Output Mode as One output document per batch because we want to view the output records in batches.

After validation, we can view the output preview of the Kafka Consumer Snap as follows:

[Figures: Kafka Consumer Output (Table) and Kafka Consumer Output (JSON)]

We use the Mapper Snap to display only those fields that we require in the output view. Note that we set the Mapping root to $batch[*] to map the fields of each record in the batch. Hence, we configure the Mapper Snap as shown below:
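
In plain Python terms, a Mapping root of $batch[*] corresponds roughly to applying the same field selection to every record in the batch list. The sketch below is an analogy and not the Mapper configuration itself; the record fields (`key`, `value`, `metadata`) and the helper name `map_batch` are assumptions made for illustration.

```python
from typing import Any, Dict, List


def map_batch(batch_document: Dict[str, Any], fields: List[str]) -> List[Dict[str, Any]]:
    """Keep only the selected fields of each record in the batch."""
    return [
        {field: record.get(field) for field in fields}
        for record in batch_document["batch"]
    ]


# Assumed sample input with hypothetical record contents.
sample = {
    "batch": [
        {"key": "k1", "value": "v1", "metadata": {"offset": 0}},
        {"key": "k2", "value": "v2", "metadata": {"offset": 1}},
    ],
    "batch_size": 2,
    "batch_index": 0,
}

print(map_batch(sample, ["key", "value"]))
```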

Upon successful execution, we see the output preview of the Mapper Snap as follows:

Download this Pipeline.

Downloads


Note

Important Steps to Successfully Reuse Pipelines

  1. Download and import the Pipeline into SnapLogic.
  2. Configure Snap accounts as applicable.
  3. Provide Pipeline parameters as applicable.
