Kafka Consumer


You can use the Kafka Consumer Snap to read or consume documents from a Kafka topic.


Prerequisites

  • A valid account with the required permissions.
  • A Kafka topic with parameters that are valid for consumption (reading).

In order for the notifications to work properly, all the Consumer and Acknowledge Snaps that belong to one consumer group should execute in the same node of the Snaplex. You can either use Pipeline Execute Snap to execute different Pipelines in the same node, or place all the Consumer and Acknowledge Snaps into one Pipeline.

Support for Ultra Pipelines

Works in Ultra Pipelines.

Limitations and Known Issues


Snap Input and Output

Input
  • Number of views: Min: 0, Max: 1
  • Examples of upstream Snaps: Mapper, CSV Formatter, JSON Formatter, XML Formatter
  • Description: A document containing a Kafka topic, a consumer group, a partition, and an offset.

Output
  • Number of views: Min: 1, Max: 1
  • Examples of downstream Snaps: Mapper, Copy, JSON Formatter and File Writer
  • Description: The message data with key/value parameters and metadata.

Snap Settings

Each setting below lists the parameter name, data type, description, default value, and an example.

Required. The name for the Snap. You can modify the default name to be more specific and meaningful, especially if your Pipeline contains more than one instance of the same Snap.

Default value: Kafka Consumer
Example: Consumer_sales_data


Specify the topic from which messages are to be read.

Default value: N/A
Example: T1
Partition Number


Specify the partition number.
Partitions allow you to parallelize a topic by splitting the data in a particular topic across multiple brokers. They allow multiple consumers to read from a topic in parallel.

This field is optional unless Seek type is set to Specify offset, in which case it is required. A value that is equal to or greater than the total number of partitions for the topic is allowed so that when a target partition is added to the topic, the Snap is ready to consume the messages from the partition without restarting.

Group ID (String/Expression)

Required. Specify a unique string that identifies the group to which this consumer belongs.
The Consumer Snaps can be grouped together by using the same group ID value.

Kafka performs group rebalancing whenever a new consumer instance joins the group. We recommend that you specify a unique Group ID for each Snap instance to avoid rebalancing all the other consumer instances in the group.

Message Count (Integer/Expression)

Specify the number of messages to be read before the consumption process stops.

  • If the message count is positive, the Snap terminates when it reaches the specified message count, or when it reads all the available messages if the number of messages is less than the specified count.
  • If the message count is negative, the Snap continues to read messages indefinitely.
  • If the message count is 0, the Snap reads all the currently available messages and then terminates.

If you set 0 as the value for Message Count, you might need to increase the Fetching Timeout value to avoid premature termination when records are actually available.

Wait For Full Count (Check box)

Activates when you enter a positive integer value or an expression in the Message Count field.

Select this check box to enable the Snap to read and process the messages until the specified number in the Message Count field is reached. 

If you deselect the check box, the Snap terminates when the specified number in the Message Count field is reached, or when all the available messages are read if the number of messages is less than the specified count.

Default value: Selected
Example: Not Selected
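
The stopping rules for Message Count and Wait For Full Count can be summarized as a small decision function. The following Python sketch is illustrative only and is not the Snap's implementation. The function name `should_stop` and its parameters are hypothetical, and "no more messages currently available" is reduced to a simple boolean.

```python
def should_stop(message_count: int,
                consumed: int,
                more_available: bool,
                wait_for_full_count: bool = True) -> bool:
    """Decide whether a consumer should stop reading (hypothetical helper).

    message_count        -- value of the Message Count setting
    consumed             -- number of messages read so far
    more_available       -- True if the topic currently has unread messages
    wait_for_full_count  -- value of the Wait For Full Count setting
    """
    if message_count < 0:
        # Negative count: keep reading indefinitely.
        return False
    if message_count == 0:
        # Zero: read everything currently available, then terminate.
        return not more_available
    if consumed >= message_count:
        # Positive count reached.
        return True
    # Positive count not yet reached: stop early only when the check box
    # is deselected and nothing else is currently available.
    return (not wait_for_full_count) and (not more_available)
```

For example, with Message Count set to 10 and only 4 messages read so far, the sketch keeps waiting when Wait For Full Count is selected and stops when it is deselected and no further messages are available.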
Max Poll Records (Integer/Expression)

Select or enter the maximum number of records in a batch that is to be returned by a single call to poll.

Increasing this value can improve performance, but also increases the number of records that require acknowledgement when Auto commit is set to false and Acknowledge mode is set to Wait after each batch of records. In this case, the value of Acknowledge timeout should be adjusted accordingly. This value sets an upper limit to the batch size, but Kafka might choose to return a smaller number of records depending on other factors, such as the size of each record.

Fetching Timeout (Integer/Expression)

Specify the number of milliseconds the Snap must wait for a single poll request to complete.
If the timeout expires, the Snap polls again for more messages, unless the message count is 0, in which case it stops as expected.

Default value: 2000
Example: 2000

Auto Commit (Check box)

Select this check box to enable the Snap to commit offsets automatically as messages are consumed and sent to the output view.

If you deselect the Auto Commit check box, the Pipeline must contain one or more downstream Acknowledge Snaps. These Snaps acknowledge the documents output by the Consumer, each of which represents an individual Kafka message, after the Snaps between the Consumer and Acknowledge Snaps have processed each document.

Default value: Selected
Example: Not Selected

Acknowledge Mode (Drop-down list)

Activates when you deselect the Auto Commit check box.

Select the Acknowledge Mode that determines when the Snap should wait for acknowledgments from the downstream Acknowledge Snaps. The available options are:

  • Wait after each record: The Snap writes a single record, and then waits for that record to be acknowledged before writing another record.
  • Wait after each batch of records: The Snap polls the broker to obtain a batch of records, writes all records in the batch, and then waits for each of these records to be individually acknowledged before polling again. This option provides higher throughput.

You must configure the Acknowledge Timeout value based on the Acknowledge Mode configuration.

Default value: Wait after each record
Example: Wait after each batch of records

Acknowledge Timeout (sec) (Expression/Integer)

Activates when you deselect the Auto Commit check box.

Enter the maximum number of seconds to wait for a notification from the Acknowledge Snap before committing the offset.

  • When you configure the Acknowledge Mode field as Wait after each record, the timeout value is the maximum time allowed to process one record.
  • When you configure the Acknowledge Mode field as Wait after each batch of records, the timeout value is the maximum time allowed to process all of the records in a single batch.  
  • In either acknowledge mode, if a timeout occurs, the Snap generates an error message that includes a complete list of the unacknowledged records, including metadata. If you configure an error view, the error is written to it and the Consumer continues processing subsequent records. If no error view is configured, the error aborts the Snap and the Pipeline execution stops.

Acknowledge Timeout Policy (Drop-down list)

Activates when you deselect the Auto Commit check box.

Select an Acknowledge Timeout Policy to handle acknowledge timeout errors. The available options are: 

  • Reprocess: This option reprocesses the records that remain unacknowledged because of acknowledge timeouts. The Snap writes an error detailing the unacknowledged records to the Error view (if configured) and then commits the offset of the first unacknowledged record for reprocessing.
  • Continue: This option ignores the records that remain unacknowledged because of acknowledge timeouts. The Snap writes an error message detailing the unacknowledged records to the Error view (if configured), and then processes subsequent records.

Acknowledge Timeout Policy determines the Snap's behavior when the following conditions are met:

  • Auto Commit is deselected.
  • The Error view configuration on the Snap Views tab is set to either Route Error Data to Error View or Discard Error Data and Continue.

When the Consumer’s Partition is not set, an acknowledge timeout triggers a group rebalance, which means that the reprocessing of a record might occur in a different Consumer instance within the same group.
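
The difference between the two policies comes down to which offset is committed after a timeout. The Python sketch below illustrates this under simplifying assumptions: a single partition, a batch identified by its record offsets, and the Kafka convention that a committed offset is the position of the next record to read. The function name `next_commit_offset` and the lowercase policy strings are hypothetical and are not part of the Snap.

```python
from typing import Iterable, List


def next_commit_offset(batch_offsets: List[int],
                       acknowledged: Iterable[int],
                       policy: str) -> int:
    """Return the offset to commit after waiting for acknowledgements.

    batch_offsets -- offsets of the records in the batch, in ascending order
    acknowledged  -- offsets acknowledged before the timeout expired
    policy        -- "reprocess" or "continue" (hypothetical labels)
    """
    if policy not in ("reprocess", "continue"):
        raise ValueError(f"unknown policy: {policy}")
    acked = set(acknowledged)
    unacknowledged = [o for o in batch_offsets if o not in acked]
    if not unacknowledged or policy == "continue":
        # Everything acknowledged, or timeouts are ignored:
        # move past the whole batch.
        return batch_offsets[-1] + 1
    # Reprocess: commit the first unacknowledged record so that it is
    # read again.
    return unacknowledged[0]
```

With a batch at offsets 5 to 8 where only offset 7 was not acknowledged, this sketch commits 7 under Reprocess and 9 under Continue.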

Seek Type (Drop-down list)

Specify the starting position where the Consumer Snap should begin reading messages when the Snap is first initialized and executed.

The available options are: 

  • Beginning: The starting position is set to the first message in the queue whether or not a committed offset exists. 
  • End: The starting position is set to the last committed offset. If a committed offset is not found, the starting position is set according to the value of the Auto Offset Reset field.
  • End (skip all existing messages): The starting position is set to the end of the assigned partitions. Only the new messages that are added to the partitions are read.
  • Specify Offset: The starting position is set to the value of the Offset field. You must specify the partition number if you select this option.

The Kafka Consumer Snap commits offsets to Kafka as it consumes records. If the Pipeline aborts, Kafka saves the position of the last read records in each partition. If the Pipeline is restarted, and Seek Type is set to End, the Snap continues reading from the first unacknowledged record in each partition.

If the Consumer Snap is configured with an Error View, the Snap continues processing records as if the unacknowledged records had been acknowledged (after reporting the error to the Error View). The committed offsets reflect this continuation policy. If there is no Error View when a timeout occurs, the committed offsets reflect the first unacknowledged record of each partition.
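
The way Seek Type and Auto Offset Reset combine to pick a starting position can be sketched as a lookup. The Python function below is a simplified illustration for a single partition, not the Snap's code. The name `starting_offset` and its parameters are hypothetical; `log_start` stands for the offset of the first message in the partition and `log_end` for the offset that the next new message will receive.

```python
from typing import Optional


def starting_offset(seek_type: str,
                    log_start: int,
                    log_end: int,
                    committed: Optional[int] = None,
                    specified: Optional[int] = None,
                    auto_offset_reset: str = "Earliest") -> int:
    """Return the offset at which reading starts (illustrative only)."""
    if seek_type == "Beginning":
        # First message, whether or not a committed offset exists.
        return log_start
    if seek_type == "End (skip all existing messages)":
        # Only messages added after this point are read.
        return log_end
    if seek_type == "Specify Offset":
        if specified is None:
            raise ValueError("Specify Offset requires an offset value")
        return specified
    if seek_type == "End":
        if committed is not None:
            # Resume from the last committed offset.
            return committed
        # No committed offset: fall back to Auto Offset Reset.
        return log_start if auto_offset_reset == "Earliest" else log_end
    raise ValueError(f"unknown seek type: {seek_type}")
```

For a partition holding offsets 0 through 99 (so `log_end` is 100) and a committed offset of 42, End resumes at 42, Beginning restarts at 0, and End (skip all existing messages) starts at 100.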


Activates when you choose Specify Offset from the Seek Type list.

Specify the offset from which the Consumer should start consuming messages. The offset is specific to a partition, so when you specify an offset, you must also specify the partition number. Offsets start from 0 in every partition. The Consumer can start consuming from any offset in the partition, regardless of which messages have been read earlier.

Auto Offset Reset (Drop-down list)

Select the Auto Offset Reset option from the list.
If the Seek Type is End and no committed offset is found for the partition, this option determines where in the partition the Snap starts reading.

The available options are:

  • Earliest: The starting position is set to the first message in the topic.
  • Latest: The starting position is set to the last offset of each partition. 

This field is used only if no offsets have been committed for the current partition.

Earliest Beginning
Output Mode (Drop-down list)

Activates when you select the Auto Commit field, or when you set Acknowledge Mode to Wait after each batch of records.

Select the mode to output the consumed records. The available options are:

  • One output document per record: Every record received from Kafka has a corresponding output document.
  • One output document per batch: Preserves the batching of records as received from Kafka. Every poll that returns a non-empty set of records results in a single output document containing the list of records as batch, batch_size, and batch_index.

Use the One output document per batch mode if you want to preserve the batching in your Pipeline's data flow. This mode is useful when Auto Commit is disabled and Acknowledge Mode is Wait after each batch of records, depending on the nature of processing between the Kafka Consumer and the Kafka Acknowledge Snaps.
This applies when the processing between the Consumer and the Acknowledge Snaps deals with the records in batches, such as writing each batch to a single file or processing each batch with a child Pipeline.
We recommend that you use a JSON Splitter Snap to split each output batch into individual records since the Acknowledge Snap requires individual records as input and not batches.

Default value: One output document per record
Example: One output document per batch
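
To illustrate why a batch document has to be split before it reaches the Acknowledge Snap, the Python sketch below flattens one batch document into individual records, which is conceptually what a JSON Splitter Snap does in the Pipeline. The document layout used here (a `batch` list alongside `batch_size` and `batch_index`) and the record fields are assumptions made for the example; the actual output structure may differ.

```python
from typing import Any, Dict, Iterator


def split_batch(batch_doc: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield each record of a batch document as its own document.

    Assumes the records are stored as a list under the "batch" key.
    """
    for record in batch_doc.get("batch", []):
        yield record


# Hypothetical batch document with two records.
example_batch = {
    "batch": [
        {"key": "k1", "value": "first"},
        {"key": "k2", "value": "second"},
    ],
    "batch_size": 2,
    "batch_index": 0,
}

records = list(split_batch(example_batch))
```

Each element of `records` can then be handled, and acknowledged, on its own.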
Key Deserializer (Drop-down list)

Select the target data type for the deserialized key of each record. The available options are:

  • String 
  • JSON
  • Avro (requires schema registry)
  • ByteArray
  • Int16 (16-bit integer)
  • Int32 (32-bit integer)
  • Int64 (64-bit integer)
  • Float32 (32-bit floating-point number)
  • Float64 (64-bit floating-point number)
  • BigInteger (arbitrary precision integer)
  • BigDecimal (arbitrary precision decimal number)

Ensure that your Confluent Kafka Account's configuration includes the schema registry if you select Avro as the Key Deserializer value.

Value Deserializer (Drop-down list)

Select the target data type for the deserialized value of each record. The available options are:

  • String 
  • JSON
  • Avro (requires schema registry)
  • ByteArray
  • Int16 (16-bit integer)
  • Int32 (32-bit integer)
  • Int64 (64-bit integer)
  • Float32 (32-bit floating-point number)
  • Float64 (64-bit floating-point number)
  • BigInteger (arbitrary precision integer)
  • BigDecimal (arbitrary precision decimal number)
Default Header Deserializer (Drop-down list)

Select a Header Deserializer that is to be assigned to the headers that are not configured in the Header Deserializers table. 


Header Deserializers

Use this field set to define the Key and Deserializer for any headers that require a deserializer other than the Default Header Deserializer. Add a new row to this table for each such header and define the values accordingly. This field set contains the following fields:

  • Key
  • Deserializer

Key (String/Expression/Drop-down list)

Specify the name of the header that requires a Deserializer other than the Default Header Deserializer.

Deserializer (String/Drop-down list)

Select a Deserializer for the header to convert the header's value to a specific data type such as string or Int32.
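
The relationship between the Default Header Deserializer and the Header Deserializers table amounts to a lookup with a fallback. The Python sketch below is a minimal illustration of that idea and not the Snap's implementation. The names `DESERIALIZERS` and `deserialize_headers` are hypothetical, only three of the listed types are covered, and Int32 values are assumed to be 4-byte big-endian integers.

```python
import struct
from typing import Any, Callable, Dict

# Hypothetical mapping from deserializer name to a decoding function.
DESERIALIZERS: Dict[str, Callable[[bytes], Any]] = {
    "String": lambda raw: raw.decode("utf-8"),
    # Assumption: Int32 headers are 4-byte big-endian signed integers.
    "Int32": lambda raw: struct.unpack(">i", raw)[0],
    "ByteArray": bytes,
}


def deserialize_headers(headers: Dict[str, bytes],
                        overrides: Dict[str, str],
                        default: str = "String") -> Dict[str, Any]:
    """Decode each header with its own deserializer, else the default.

    headers   -- raw header values keyed by header name
    overrides -- header name -> deserializer name (the table rows)
    default   -- deserializer used for headers not listed in overrides
    """
    return {
        key: DESERIALIZERS[overrides.get(key, default)](raw)
        for key, raw in headers.items()
    }


# Example: "retries" needs Int32; "source" falls back to the default.
raw_headers = {"source": b"crm", "retries": struct.pack(">i", 3)}
decoded = deserialize_headers(raw_headers, {"retries": "Int32"})
```

Only headers whose type differs from the default need a row in the table, which keeps the configuration short when most headers share one type.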

Include Timestamp (Check box)

Select this check box to include a timestamp in the metadata of each record that is sent to the output view. 

A timestamp value is the number of milliseconds that have elapsed since midnight, January 1, 1970 UTC.

All Kafka records have a standard timestamp in milliseconds, and all records have a copy of the topic’s message.timestamp.type configuration setting at the time the record was created.

Default value: Not selected
Example: Selected
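
Because the timestamp is expressed in milliseconds since the epoch, downstream code usually converts it to a date before displaying it. The short Python sketch below shows one way to do that; the function name `timestamp_to_utc` is hypothetical and the sample value is made up for illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def timestamp_to_utc(timestamp_ms: int) -> datetime:
    """Convert milliseconds since the epoch to a timezone-aware UTC datetime."""
    # Integer timedelta arithmetic avoids floating-point rounding.
    return EPOCH + timedelta(milliseconds=timestamp_ms)


# Illustrative value only: 1612742400000 ms is 2021-02-08 00:00:00 UTC.
sample = timestamp_to_utc(1612742400000)
```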
Pass Through (Check box)

Select this check box to allow the original document to pass through.
The output includes metadata in the message regardless of the configuration of this setting.

Default value: Not selected
Example: Selected

Snap Execution (Drop-down list)

Select one of the three modes in which the Snap executes:

  • Validate & Execute. Performs limited execution of the Snap and generates a data preview during Pipeline validation, then performs full execution of the Snap (unlimited records) during Pipeline runtime.
  • Execute only. Performs full execution of the Snap during Pipeline execution without generating preview data.
  • Disabled. Disables the Snap and all the Snaps that are downstream from it.

Default value: Validate & Execute
Example: Execute only

Key Concepts

Consumers and Consumer Groups

To help scale the consumption of messages from a topic, you can split the data that is being read among multiple consumers, which allows a number of consumers to read from a topic. When you use multiple Kafka Consumer instances configured with the same consumer group, each instance is assigned a different subset of partitions in the topic.

For example, if a single Kafka Consumer Snap (c1) subscribes to a topic t1 with four partitions, and c1 is the only consumer in consumer group g1, then c1 gets all the messages from all the four partitions.

If you add a second Kafka Consumer Snap (c2) to the Pipeline within the same consumer group (g1), Kafka assigns two partitions (for example, 0 and 2) to consumer c1 and the other two partitions (for example, 1 and 3) to consumer c2.

If you add more consumers to a single group, and the number of consumers is more than the number of existing partitions, then some of the consumers remain idle and do not receive messages. 

If you add additional consumers to a different consumer group (for example, g2), then those consumers behave independently of those within the consumer group g1. 

Consider the following example. For topic t1 with four partitions, four of the five consumers in g1 receive messages, and c5 remains in idle state (because there is no available partition to assign to this consumer). For consumer group g2, consumer c1 receives messages from partitions 0 and 2, and consumer c2 receives messages from partitions 1 and 3. 

Therefore, it is important to know how many partitions exist within a topic while adding consumers to consume the messages.
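
The partition arithmetic in these examples can be reproduced with a small simulation. The Python sketch below distributes partitions across the consumers of one group in round-robin order, chosen only because it matches the assignments used in the examples above. Kafka's actual assignment strategy is configurable and may distribute partitions differently. The function name `assign_partitions` is hypothetical.

```python
from typing import Dict, List, Sequence


def assign_partitions(partitions: Sequence[int],
                      consumers: Sequence[str]) -> Dict[str, List[int]]:
    """Spread partitions over the consumers of one group, round-robin."""
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Topic t1 with four partitions.
t1 = [0, 1, 2, 3]

# Group g2 with two consumers: each gets two partitions.
g2 = assign_partitions(t1, ["c1", "c2"])

# Group g1 with five consumers: c5 is left without a partition.
g1 = assign_partitions(t1, ["c1", "c2", "c3", "c4", "c5"])
idle = [consumer for consumer, owned in g1.items() if not owned]
```

Here `g2` maps c1 to partitions 0 and 2 and c2 to partitions 1 and 3, while `idle` contains only c5.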


Consuming Messages from a Kafka Topic

This example demonstrates how messages are read from a Kafka topic, and how the selected output documents are sent to the downstream Mapper view.

First, we configure the Kafka Consumer Snap to read the messages from the Topic, Sample_Topic, under the Group ID, consumergroup. We select the Output Mode as One output document per batch because we want to view the output records in batches.

After validation, we can view the output preview of the Kafka Consumer Snap as follows:

[Images: Kafka Consumer Output (Table) and Kafka Consumer Output (JSON)]

We use the Mapper Snap to display only those fields that we require in the output view. Note that we set the Mapping root to $batch[*] to map the fields of each record in the batch. Hence, we configure the Mapper Snap as shown below:

Upon successful execution, we see the output preview of the Mapper Snap as follows:

Download this Pipeline.


Important Steps to Successfully Reuse Pipelines

  1. Download and import the Pipeline into SnapLogic.
  2. Configure Snap accounts as applicable.
  3. Provide Pipeline parameters as applicable.

File: Example_Kafka_Consumer.slp (modified Feb 08, 2021 by Kalpana Malladi)

Snap Pack History

Release | Snap Pack Version | Date | Type | Updates
4.29 | main15993 | Stable | Kafka Snap Pack now supports the Azure Event Hubs streaming service. Learn more: Azure Event Hubs configuration in Azure portal for Kafka SSL Account.
4.28 Patch | 428patches14904 | Latest

Fixed an issue with Kafka Snaps where the Snaps failed with null pointer exceptions when logging the cluster information with the controller’s identity not known.

4.28 | main14627 | Stable | Upgraded with the latest SnapLogic Platform release.





Upgraded with the latest SnapLogic Platform release.
4.26 | main11181 | Stable | Upgraded with the latest SnapLogic Platform release.
4.25 Patch | 425patches10543 | Latest
  • Improved the handling of interrupted/aborted Kafka Snaps to ensure proper clean-up of metrics.

  • Optimized the Kafka Producer Snap to initialize the Kafka API only if there is at least one input document.

  • Fixed an issue where account passwords were included in the log messages output by Kafka Snaps. The password is now hidden in the logs for both Kafka Consumer and Kafka Producer Snaps.
  • The Kafka Producer and Consumer Snaps create a default Client ID string to identify the specific Snap instance.

  • Enhanced the Kafka Producer Snap to support Avro serialization format with a new target data type Avro under Key and Value Serializer.

  • Enhanced the Kafka SSL Account with new fields (Registry Username or Key and Registry Password or Secret) for Schema Registry authentication. The two existing SASL properties (SASL Username and SASL Password) have been revised to SASL Username or Key and SASL Password or Secret respectively.

  • Apache Kafka client library is upgraded from version 2.6.0 to 2.8.0.

  • Confluent Kafka client libraries are updated from version 5.2.1 to 6.2.0.

Stable | Upgraded with the latest SnapLogic Platform release.
4.24 Patch | 424patches8805 | Latest

Fixed an issue in the Kafka Producer Snap by removing the validation of the account when the Snap initialized in a Pipeline execution.

Stable | Upgraded with the latest SnapLogic Platform release.
4.23 Patch | 423patches7900 | Latest
  • Removed the Confluent prefix from the Snaps and accounts in the Snap Pack. However, the Snap Pack name continues to be Confluent Kafka Snap Pack.

  • Added the Wait For Full Count check box to the Kafka Consumer Snap to determine how a positive value for the Message Count field must be interpreted.

    • Enabled (by default): The Snap continues polling for messages until the specified count is reached.

    • Disabled: If the number of messages that are currently available is less than the specified count, then the Snap consumes the available messages and terminates.

      Known Issue

      The Wait For Full Count check box activates only when you provide a positive integer value in the Message Count field. However, it does not activate when you use an expression for Message Count even if the value evaluates to a positive number.

      Workaround: To activate this check box, temporarily replace the Message Count expression with a positive integer, select the desired state for Wait For Full Count, and then restore the original value in the Message Count field.

  • Added support for writing and reading record headers.

    • The Kafka Producer Snap has a new Headers table to configure the Key, Value, and Serializer for each header to be written.

    • The Kafka Consumer Snap reads any headers present on the records it consumes. It provides two new fields to configure how the header values should be deserialized – Default Header Deserializer and Header Deserializers, for any headers that require a deserializer other than the default.

  • Added support for writing and reading each record’s timestamp.

    • The Kafka Producer Snap has a new Timestamp field that can be configured to set each record’s timestamp, which is the number of milliseconds since the epoch (00:00:00 UTC on January 1, 1970). This can be set to an expression that evaluates to a long integer, or a string that can be parsed as a long integer, or a date. If you specify no expression, or the value is empty, then the timestamp is set to the current time.

      The Timestamp field is relevant only if the Kafka topic is configured with message.timestamp.type = CreateTime (which is the default). For more information, see the official Kafka documentation.

    • The Kafka Consumer Snap has a new check box, Include Timestamp, which by default is disabled for backward compatibility. If enabled, the output for each record includes its timestamp in the metadata.

  • The Kafka Producer Snap has a new check box, Output Records, to determine the format of each output document when configured with an output view.

    • Disabled (by default): The Snap’s output includes only the basic metadata (topic, partition, offset) for each record, along with the original input document.

    • Enabled: Each output document contains a more complete representation of the record produced, including its key, value, headers, and timestamp.

  • The Kafka Consumer Snap has a new field, Output Mode, with two options:

    • One output document per record (the default): Every record received from Kafka has a corresponding output document.

    • One output document per batch: Preserves the batching of records as received from Kafka. Every poll that returns a non-empty set of records results in a single output document containing the list of records as batch, batch_size, and batch_index. This mode is especially useful when Auto Commit is disabled and Acknowledge Mode is Wait after each batch of records, depending on the nature of the processing between the Kafka Consumer and the Kafka Acknowledge Snaps.

  • Removed the Add 1 to Offsets check box from the Kafka Consumer Snap.

  • Removed the Account tab from the Kafka Acknowledge Snap, because this Snap no longer needs an account.

Fixed an intermittent issue of Confluent Kafka Consumer Snap stopping when Message Count is 0 while the records are available. The Snap will now stop only if a poll returns no records twice in a row, and has partitions assigned before and after each poll.

Stable | Upgraded with the latest SnapLogic Platform release.
4.21 Patch | 421patches6136 | Latest

Enhanced the Confluent Kafka Consumer Snap with an option to allow reprocessing of records that remain unacknowledged due to acknowledge timeouts. This ensures that all records from Kafka are processed.

4.21 Patch | 421patches5862 | Latest

Fixed an issue where the Confluent Kafka Consumer Snap aborted abruptly on receiving a CommitFailedException error. Upgraded the Kafka client library from version 2.2 to 2.5.




Enhanced the Confluent Kafka Consumer Snap to significantly reduce the volume of commit requests for offsets when using the Record acknowledgement mode (Wait after each record), and thereby, improve performance and stability.

4.20 Patch | confluentkafka8747 | Latest

Updated the Confluent Kafka SSL Account settings to make the Truststore filepath and Truststore password properties optional. 

Stable | Upgraded with the latest SnapLogic Platform release.
Stable | Upgraded with the latest SnapLogic Platform release.
4.18 Patch | confluentkafka8111 | Latest

Fixed an issue with the Confluent Kafka Producer Snap wherein the Topic property did not evaluate expressions against input documents.

4.18 Patch | confluentkafka8006 | Latest

Fixed an exception that occurs while consuming log-compacted topics, in the Confluent Kafka Consumer Snap.

4.18 Patch | confluentkafka7784 | Latest

Fixed the Consumer Snap to stop quickly and cleanly on CLOSE and STOP lifecycle events. 

4.18 Patch | confluentkafka7732 | Latest

Added support for Kerberos-based authentication to enable connecting to Kerberos-enabled Kafka servers.

Stable | Upgraded with the latest SnapLogic Platform release.

4.17 Patch | confluentkafka7537 | Latest

Updated the following Snaps:

  • Confluent Kafka Consumer Snap
    • Properties added: Max poll records, Acknowledge mode, Key deserializer, Value deserializer, Add 1 to offsets.
    • Properties removed: Auto commit interval (msec), Standardize error output.
    • Diagnostic fields added in the metadata object in the Snap's output:
      • client_id: The unique ID assigned to a single Kafka consumer client instance. All documents written by a Consumer Snap during a single pipeline execution or validation will have the same value for this field.
      • batch_size: The number of messages in the batch of messages that the current message came from. Note that “batch” isn’t Kafka terminology; we’re using it to mean the set of messages received from the broker in a single call to Kafka’s poll method.
      • batch_index: The current batch number relative to a single consumer instance.  This number starts at 0 for the first batch and is only incremented for batches which contain at least one message.
      • record_index: The current record number relative to its batch.  The first message in a batch has an index of 0; the last has an index of batch_size - 1.
  • Confluent Kafka Acknowledge Snap: Added new property–Metadata path.
  • Confluent Kafka Producer Snap: Added new properties–Key serializer and Value serializer.

Pushed automatic rebuild of the latest version of each Snap Pack to SnapLogic UAT and Elastic servers.

  • Tested and certified all Snaps against CDH 6.1.
  • Added the Snap Execution field to all Standard-mode Snaps. In some Snaps, this field replaces the existing Execute during preview check box.
4.16 Patch | confluentkafka7118 | Latest

Fixed an issue with the Confluent Kafka Producer Snap wherein the Snap fails to produce the upstream Snap's input schema for Partition Number property in the output preview.

4.16 Patch | confluentkafka6891 | Latest

Certified Confluent Platform version 5.x.

Stable | Upgraded with the latest SnapLogic Platform release.
4.15 Patch | confluentkafka6446 | Latest

Fixed an issue with the Confluent Kafka Consumer Snap wherein some connections were not closed after a Pipeline is aborted.

Stable | Upgraded with the latest SnapLogic Platform release.
4.14 Patch | confluentkafka5737 | Latest

Added a new property to the Confluent Kafka Producer Snap, Message publish timeout (milliseconds), that lets you specify a timeout value.

4.14 Patch | confluentkafka5663 | Latest

Fixed an issue where the Confluent Kafka Snaps did not delete temp files after Pipeline execution.

Stable | Upgraded with the latest SnapLogic Platform release.
4.13 Patch | confluentkafka5614 | Latest

Fixed an issue where the Confluent Kafka Snap Pack did not delete the temp files after executing Pipelines.

4.13 Patch | confluentkafka5330 | Latest

Added the ability for the Confluent Kafka Producer Snap to write data to multiple partitions in a round-robin fashion.



Stable | Upgraded with the latest SnapLogic Platform release.

4.12 Patch | confluentkafka5087 | Latest

Fixed an issue with the Consumer Snap that fails when the specified timeout value is less than the pipeline execution time. 




Enhanced the performance of the Confluent Kafka Producer Snap in writing incoming documents into the Confluent Kafka Cluster.

4.11 Patch | confluentkafka4626 | Latest

Fixed an issue with the Confluent Kafka Consumer Snap wherein the Consumer session is not closing properly when manually stopping a pipeline, which causes the subsequent pipeline execution to wait for the previous session to die and not consume messages.

4.11 Patch | confluentkafka4302 | Latest

Fixed an issue with the Confluent Kafka Consumer Snap wherein the Snap fetched only 500 messages when Message Count was set to 0, caused by a change in the driver's default value.


Updated the Confluent Kafka SSL Account with SCRAM_SASL support.




Confluent v3.2 support for Confluent Kafka Snap Pack.

4.9 Patch | confluentkafka3156 | Latest

Fixed an issue regarding account validation failure due to unauthorized topics. Enhanced the error message for empty topic suggestions.

  • Introduced the Confluent Kafka Acknowledge Snap in this release.
  • Enhanced the Consumer and the Producer Snaps to use Kafka Connect API to enhance the performance.
4.8 Patch | confluentkafka2804 | Latest

Addressed an issue with the Confluent Kafka Producer Snap not including the input document schema in the Message Key and Value expression properties.




New Snap Pack for 4.8. It consists of Confluent Kafka Consumer and Confluent Kafka Producer.

See Also