Kafka Producer

In this article

Overview

You can use the Kafka Producer Snap to send messages to a Kafka topic.


Prerequisites

  • Valid client ID.
  • A Kafka server with a valid account and the required permissions.

Support for Ultra Pipelines  

Works in Ultra Pipelines.

Limitations

None.

Known Issues

Recent changes in Kafka to enable idempotent writes by default may result in authorization errors when using the Kafka Snaps if your Kafka cluster version is lower than 2.8.

To fix this issue, we recommend you take one of the following actions:

  • Upgrade your Kafka cluster to at least version 2.8.
  • Use Kafka’s ACL management to ensure that IdempotentWrite is allowed for the cluster.

Snap Input and Output

Input/OutputType of ViewNumber of viewsExamples of Upstream and Downstream SnapsDescription
InputDocument
  • Min:1
  • Max:1
  • Mapper Snap
  • Copy Snap
  • Sequence Snap
A key/value pair for the Kafka message.
OutputDocument
  • Min:1
  • Max:1
  • None: The Consumer Snap is decoupled from the Producer Snap.

Kafka message records. Example,

[{"kafka_offset":"7139","kafka_partition":"0","kafka_topic":"PC_Test","original":{"msgID":"id1100","msgVal":"msg1100"}}]

Snap Settings

Parameter NameData TypeDescriptionDefault ValueExample
LabelString
Required. The name for the Snap. You can modify the default name to be specific and meaningful, especially if you have more than one of the same Snaps in your pipeline.
Kafka ProducerProducer
Client IDString/Expression

Specify the Client ID to be used in the Kafka message. If you do not specify a Client ID, a random string is used.

This Client ID is used for correlating the requests sent to the brokers with each client.

N/Atestclient001
Topic


String/Expression/Suggestion

Specify the topic to publish the message.
If the topic does not exist, a new topic and its partitions are created based on the configuration.

N/At1
Partition NumberString/Expression/Suggestion

Specify the partition number in which the messages should be published.
If you do not specify the partition number, the server publishes the messages to the next partition chosen, based on the configuration.

N/A0
Message KeyString/Expression

Enter the key for the message to be sent out.

N/AcustomerID
Message ValueString/Expression

Required. Provide the value for the corresponding message key.
The messages are sent with a value and, optionally, a key.


N/A
  • JSON.stringify($)
  • 1000
TimestampString/Expression

Specify a timestamp for the message in number, string, or date format.

A timestamp value is the number of milliseconds that have elapsed since midnight, January 1, 1970 UTC.

Each record's timestamp must be greater than or equal to the timestamp of the preceding record in the same partition; that is, the records must be in chronological order within each partition.


If you enter the timestamp in number or Epoch format, the value must be non-negative, (for example, 01-01-1970), else an error message is displayed. If you do not set a timestamp or if the timestamp resolves to null or a blank string, the record's timestamp defaults to the current time.

The Timestamp field is relevant only if the Kafka topic is configured with message.timestamp.type = CreateTime (which is the default). For more information, see the official Kafka documentation.

CreateTimeDate.parse($timestamp, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
Key SerializerDrop-down list

Select the target data type for the Key Serializer of each record. The available options are:

  • String 
  • JSON (arbitrary complex data, schema-less)
  • Avro (arbitrary complex data, schema-centric)

    • The Avro format depends on an Avro schema registered in Kafka Schema Registry. The Kafka account associated with the Snap must have the Schema Registry configured.
    • The Kafka Producer Snap looks up the Avro schema by name using Kafka’s standard naming convention: “topic-key” for a key schema, or “topic-value” for a value schema. For example, if the topic is “order”, then the schema name for the key schema would be “order-key”.
  • ByteArray (binary content, such as images)
  • Int16 (16-bit integer)
  • Int32 (32-bit integer)
  • Int64 (64-bit integer)
  • Float32 (32-bit floating-point number)
  • Float64 (64-bit floating-point number)
  • BigInteger (arbitrary precision integer)
  • BigDecimal (arbitrary precision decimal number)

The Key Deserializer field's value in the corresponding Consumer Snap should also be of the same data type.

StringJSON
Value SerializerDrop-down list

Choose the target data type for the Value Serializer of each record. The available options are:

  • String 
  • JSON
  • Avro

    • The Avro format depends on an Avro schema registered in Kafka Schema Registry. The Kafka account associated with the Snap must have the Schema Registry configured.
    • The Kafka Producer Snap looks up the Avro schema by name using Kafka’s standard naming convention: “topic-key” for a key schema, or “topic-value” for a value schema. For example, if the topic is “order”, then the schema name for the key schema would be “order-key”.
  • ByteArray
  • Int16 (16-bit integer)
  • Int32 (32-bit integer)
  • Int64 (64-bit integer)
  • Float32 (32-bit floating-point number)
  • Float64 (64-bit floating-point number)
  • BigInteger (arbitrary precision integer)
  • BigDecimal (arbitrary precision decimal number)

The Value Deserializer field's value in the corresponding Consumer Snap should also be of the same value.

StringByteArray

Headers

Use this field set to define an optional set of headers for each record. Click  to add a new row for record headers and define key, value, and serializer values. This field set contains the following fields:

  • Key
  • Value
  • Serializer

The Kafka Producer Snap supports the addition of multiple headers with the same key. When converting a Kafka record to an output document, multiple headers with the same key are combined into a single header whose value is a list or an array.

KeyString/Expression

Specify a name to use for the Header.

You can add multiple headers with the same key in either of the following ways:

  • Configure multiple rows in the Header table with the same key.
  • Configure a row in the Header table with a value that evaluates to a list or an array.
N/ACity
ValueString/Expression

Specify a value for the header.

N/ASFO
SerializerDrop-down listSelect a Serializer for the header to convert the value from a specific data type such as string or Int32.StringByteArray
AcknowledgementsDrop-down list

Select the type of acknowledgment for the message, which determines the number of acknowledgments that the Producer requires the server to have received before considering a request to be complete.
The available options are:

  • 0 - Send without any acknowledgment: The Producer does not wait for any acknowledgement by the server.
  • 1 - Send with leader acknowledgment: The Producer waits for the replication leader to acknowledge the message received by the server.
  • all - Send with full acknowledgment: The Producer waits for all replicas to acknowledge the message was received by the server.
1 - Send with leader acknowledgmentall - Send with full acknowledgement
Batch Size (bytes)Integer/Expression

Specify the number of bytes that are received before messages are sent to the server.

1638415000
Linger Time (milliseconds)Integer/Expression

Specify the linger time for the messages to be sent.

Linger time is the time in milliseconds, the Producer Snap should wait before it sends
out the data. During this period, the Producer will keep sending data in batches. The end of the sending process is triggered by both the Batch Size and Linger Time depending on which setting matches first.

01
RetriesInteger/Expression

Set the total number of times to attempt a message delivery in case of failure.

Whenever the Snap needs to connect to an Endpoint and the Connection object is either null or closed, the Snap opens the connection and tries to connect thrice before aborting.

03
Preserve Message Order With RetryCheck box

Select this check box to resend the messages in the same order each time.

This field is applicable when Retires is more than 0, in which case, the order of messages may get out of the set order; activating this check box ensures the messages order is preserved.

Not selected Selected
Compression TypeDrop-down list

Choose the compression type before sending the messages. The available options are:

  • NONE
  • GZIP
  • LZ4
  • SNAPPY
  • ZSTD

Learn more: Optimizing for Throughput | Confluent Documentation

NoneGZIP
Send SynchronouslyCheck boxSelect this check box to send each message synchronously. The consumer blocks the message until metadata from the broker is received. Not SelectedSelected
Output RecordsCheck box

Select this check box to include the record’s data and metadata in each output document, that is, key, value, headers, and timestamp.
If deselected, each document includes only the basic metadata (topic, partition, offset) for each record, including the original input document. 
A copy of each input document is stored in the output document under the key original.

Not SelectedSelected
Message Publish Timeout (milliseconds)Integer/ExpressionSpecify the timeout value in milliseconds to publish a message.
If the Snap fails to publish the message until the specified time, a timeout error appears.
6000060000
Snap ExecutionDrop-down list

Select one of the three modes in which the Snap executes. Available options are:

  • Validate & Execute: Performs limited execution of the Snap, and generates a data preview during Pipeline validation. Subsequently, performs full execution of the Snap (unlimited records) during Pipeline runtime.
  • Execute only: Performs full execution of the Snap during Pipeline execution without generating preview data.
  • Disabled: Disables the Snap and all Snaps that are downstream from it.

Execute onlyValidate & Execute

Example

Sending Documents to a Kafka Topic Using Kafka Producer Snap

This example Pipeline demonstrates how we can send documents to a Kafka topic using the Kafka Producer Snap. We use the Sequence Snap to enable the Pipeline to send documents in large numbers.

First, we configure the Sequence Snap to send 1000 documents. Hence, we set the Initial value as 1 and Number of documents as 1000.

Next, we configure the Kafka Producer Snap to send the documents to the Topic named SampleKafkaTopic. We set the Partition number to 0, to let the broker decide which partition to use.

Upon successful validation, the Pipeline displays the output preview as shown below:

Download this Pipeline.

Downloads

Important Steps to Successfully Reuse Pipelines

  1. Download and import the Pipeline into SnapLogic.
  2. Configure Snap accounts as applicable.
  3. Provide Pipeline parameters as applicable.


  File Modified

File Example_Kafka_Producer_Writing_Messages.slp

Feb 08, 2021 by Kalpana Malladi

Snap Pack History

 Click to view/expand
Release Snap Pack VersionDateType  Updates
August 2024438patches28225 Latest

Added the Kafka MSK IAM Account, which enables you to use IAM access control for secure communication with the MSK cluster. You can securely integrate your Kafka pipelines with the MSK cluster.

August 2024

main27765

 

Stable

Upgraded the org.json.json library from v20090211 to v20240303, which is fully backward compatible.

August 2024437patches27730 Latest

Fixed an issue with the Kafka Consumer Snap where the Seek Type of Beginning or End (skip all existing messages) may not work correctly after a rebalance. 

May 2024437patches27404 Latest

Fixed an issue with the Kafka SSL Account that displayed an error when the Kafka Consumer Snap could not commit offsets during the rebalancing process. This behavior avoids unstable partition allocation. Now, the Snap logs a warning, allowing the Kafka Consumer to continue processing instead of displaying an exception.

May 2024437patches26651 Latest

Added Kafka OAuth2 Account to the Kafka Snap Pack to enable the usage of OAuth within the Kafka SASL_SSL framework.

May 2024main26341 Stable

Kafka Snap Pack: Enhanced the following fields to support expressions through pipeline parameters:

  • Truststore Password

  • Keystore File Password

  • SSL Key Password

  • SASL Password or Secret

  • Registry Password or Secret

February 2024main25112 StableUpdated and certified against the current SnapLogic Platform release.
November 2023main23721

 

StableUpdated and certified against the current SnapLogic Platform release.

August 2023

main22460

 

Stable

Updated and certified against the current SnapLogic Platform release.

May 2023

433patches21070

 Latest

Fixed an issue with the Kafka Consumer Snap that caused it to skip records in a partition when a pipeline failed without successfully processing any records from that partition. This was an edge case that was only possible with a specific configuration of the Snap and a specific ordering of events during termination of the pipeline.

May 2023

main21015 

Stable

Upgraded with the latest SnapLogic Platform release.

February 2023432patches20143 Latest/StableFixed an issue with the Kafka Producer Snap to avoid an error about delivery.timeout.ms. The Snap now adjusts the value of the Message Publish Timeout setting automatically and displays a warning in the Pipeline Execution Statistics.
February 2023main19844 StableUpgraded with the latest SnapLogic Platform release.
November 2022431patches19770 LatestFixed a memory issue that occurred when the Kafka Producer Snap was used to produce many records in a long-running pipeline.
November 2022431patches19211 Latest

Enhanced the Kafka Consumer Snap with the new Max Poll Interval (ms) field, which allows you to specify the time limit between subsequent calls to poll. This field corresponds to the Kafka max.poll.interval.ms property.

November 2022main18944 StableUpgraded with the latest SnapLogic Platform release.
August 2022main17386 Stable
  • Upgraded Kafka client libraries to Apache-Kafka Version 3.1.1 and Confluent-Kafka Version 7.1.1.

  • Optimized the performance of the Snap with the support for LZ4 and ZSTD compression types of the messages produced.

  • The Kafka account that connects to the Kafka Schema Registry uses TLS v1.2 protocol instead of the older version of TLS, which has security vulnerabilities.

4.29 Patch429patches17109 Latest
  • Enhanced the Kafka Producer Snap with LZ4 and ZSTD Compression Types for compressing the messages that optimize Snap's performance.

  • Fixed an issue with the Kafka Producer Snap where the Snap wrote an output document even when there was an error.

  • Fixed an issue with Kafka Account where the expression buttons were enabled, by default, for the Bootstrap Server and Advanced Property Value fields.

  • Upgraded Kafka client libraries to Apache-Kafka Version 3.1.1 and Confluent-Kafka Version 7.1.1.

  • Enhanced the Kafka account, which is configured to connect to the Kafka Schema Registry, to use TLS v1.2 protocol instead of TLS that has security vulnerabilities.

4.29main15993 StableThe Kafka Snap Pack now supports Azure Event Hubs streaming service. Learn more: Azure Event Hubs configuration in Azure portal for Kafka SSL Account.
4.28 Patch428patches14904 Latest

Fixed an issue with Kafka Snaps where the Snaps failed with null pointer exceptions when logging the cluster information with the controller’s identity not known.

4.28main14627 StableUpgraded with the latest SnapLogic Platform release.

4.27

main12833

 

Stable

Upgraded with the latest SnapLogic Platform release.
4.26main11181 StableUpgraded with the latest SnapLogic Platform release.
4.25 Patch425patches10543 Latest
  • Improved the handling of interrupted/aborted Kafka Snaps to ensure proper clean-up of metrics.

  • Optimized the Kafka Producer Snap to initialize the Kafka API only if there is at least one input document.

  • Fixed the issue of account passwords being included in the log messages output of Kafka  Snaps. The password is now hidden in the logs for both Kafka Consumer and Kafka Producer Snaps.
  • The Kafka Producer and Consumer Snaps create a default Client ID string to identify the specific Snap instance.

  • Enhanced the Kafka Producer Snap to support Avro serialization format with a new target data type Avro under Key and Value Serializer.

  • Enhanced the Kafka SSL Account with new fields (Registry Username or Key and Registry Password or Secret) for Schema Registry authentication. The two existing SASL properties (SASL Username and SASL Password) have been revised to SASL Username or Key and SASL Password or Secret respectively.

  • Apache Kafka client library is upgraded from version 2.6.0 to 2.8.0.

  • Confluent Kafka client libraries are updated from version 5.2.1 to 6.2.0.

4.25main9554
 
StableUpgraded with the latest SnapLogic Platform release.
4.24 Patch424patches8805 Latest

Fixed an issue in the Kafka Producer Snap by removing the validation of the account when the Snap initialized in a Pipeline execution.

4.24main8556
StableUpgraded with the latest SnapLogic Platform release.
4.23 Patch423patches7900 Latest
  • Removed the Confluent prefix from the Snaps and accounts in the Snap Pack. However, the Snap Pack name continues to be Confluent Kafka Snap Pack.

  • Added the Wait For Full Count check box to the Kafka Consumer Snap to determine how a positive value for the Message Count field must be interpreted.

    • Enabled (by default): The Snap continues polling for messages until the specified count is reached.

    • Disabled: If the number of messages that are currently available is less than the specified count, then the Snap consumes the available messages and terminates.

      Known Issue

      The Wait For Full Count check box activates only when you provide a positive integer value in the Message Count field. However, it does not activate when you use an expression for Message Count even if the value evaluates to a positive number.

      Workaround: To activate this check box, temporarily replace the Message Count expression () with a positive integer, select the desired state for Wait For Full Count, and then restore the original value in the Message Count field.

  • Added support for writing and reading record headers.

    • The Kafka Producer Snap has a new Headers table to configure the Key, Value, and Serializer for each header to be written.

    • The Kafka Consumer Snap reads any headers present on the records it consumes. It provides two new fields to configure how the header values should be deserialized – Default Header Deserializer and Header Deserializers, for any headers that require a deserializer other than the default.

  • Added support for writing and reading each record’s timestamp.

    • The Kafka Producer Snap has a new Timestamp field that can be configured to set each record’s timestamp, which is the number of milliseconds since the epoch (00:00:00 UTC on January 1, 1970). This can be set to an expression that evaluates to a long integer, or a string that can be parsed as a long integer, or a date. If you specify no expression, or the value is empty, then the timestamp is set to the current time.

      The Timestamp field is relevant only if the Kafka topic is configured with message.timestamp.type = CreateTime (which is the default). For more information, see the official Kafka documentation.

    • The Kafka Consumer Snap has a new check box, Include Timestamp, which by default is disabled for backward compatibility. If enabled, the output for each record includes its timestamp in the metadata.

  • The Kafka Producer Snap has a new check box, Output Records, to determine the format of each output document when configured with an output view.

    • Disabled (by default): The Snap’s output includes only the basic metadata (topic, partition, offset) for each record, along with the original input document.

    • Enabled: Each output document contains a more complete representation of the record produced, including its key, value, headers, and timestamp.

  • The Kafka Consumer Snap has a new field, Output Mode, with two options:

    • One output document per record (the default): Every record received from Kafka has a corresponding output document.

    • One output document per batch: Preserves the batching of records as received from Kafka. Every poll that returns a non-empty set of records results in a single output document containing the list of records as batchbatch_size and batch_index. This mode is especially useful when Auto Commit is disabled and Acknowledge Mode is Wait after each batch of records, depending on the nature of the processing between the Kafka Consumer and the Kafka Acknowledge Snaps.

  • Removed the Add 1 to Offsets check box from the Kafka Consumer Snap.

  • Removed the Account tab from the Kafka Acknowledge Snap, because this Snap no longer needs an account.
4.23main7430
 
Stable

Fixed an intermittent issue of Confluent Kafka Consumer Snap stopping when Message Count is 0 while the records are available. The Snap will now stop only if a poll returns no records twice in a row, and has partitions assigned before and after each poll.

4.22main6403
 
StableUpgraded with the latest SnapLogic Platform release.
4.21 Patch421patches6136 Latest

Enhanced the Confluent Kafka Consumer Snap with an option to allow reprocessing of records that remain unacknowledged due to acknowledge timeouts. This ensures that all records from Kafka are processed.

4.21 Patch

421patches5862 Latest

Fixes the Confluent Kafka Consumer Snap that aborts abruptly upon getting a CommitFailedException error. Upgraded the Kafka client library from version 2.2 to 2.5.

4.21snapsmrc542

 

Stable

Enhanced the Confluent Kafka Consumer Snap to significantly reduce the volume of commit requests for offsets when using the Record acknowledgement mode (Wait after each record), and thereby, improve performance and stability.

4.20 Patch

confluentkafka8747 Latest

Updated the Confluent Kafka SSL Account settings to make the Truststore filepath and Truststore password properties optional. 

4.20snapsmrc535
 
StableUpgraded with the latest SnapLogic Platform release.
4.19snaprsmrc528
 
StableUpgraded with the latest SnapLogic Platform release.
4.18 Patchconfluentkafka8111 Latest

Fixed an issue with the Confluent Kafka Producer Snap wherein the Topic property doesn't evaluate expression against input documents.

4.18 Patchconfluentkafka8006 Latest

Fixed an exception that occurs while consuming log-compacted topics, in the Confluent Kafka Consumer Snap.

4.18 Patchconfluentkafka7784 Latest

Fixed the Consumer Snap to stop quickly and cleanly on CLOSE and STOP lifecycle events. 

4.18 Patch

confluentkafka7732 Latest

Added support for Kerberos-based authentication to enable connecting to Kerberos-enabled Kafka servers.

4.18snapsmrc523
 
StableUpgraded with the latest SnapLogic Platform release.

4.17 Patch

confluentkafka7537 Latest

Updated the following Snaps:

  • Confluent Kafka Consumer Snap
    • Properties added: Max poll recordsAcknowledge mode, Key deserializer, Value deserializer, Add 1 to offsets.
    • Properties removed: Auto commit interval (msec), Standardize error output.
    • Diagnostic fields added in the metadata object in the Snap's output:
      • client_id: The unique ID assigned to a single Kafka consumer client instance. All documents written by a Consumer Snap during a single pipeline execution or validation will have the same value for this field.
      • batch_size: The number of messages in the batch of messages that the current message came from. Note that “batch” isn’t Kafka terminology; we’re using it to mean the set of messages received from the broker in a single call to Kafka’s poll method.
      • batch_index: The current batch number relative to a single consumer instance.  This number starts at 0 for the first batch and is only incremented for batches which contain at least one message.
      • record_index: The current record number relative to its batch.  The first message in a batch has an index of 0; the last has an index of batch_size - 1.
  • Confluent Kafka Acknowledge Snap: Added new property–Metadata path.
  • Confluent Kafka Producer Snap: Added new properties–Key serializer and Value serializer.
4.17ALL7402
 
Latest

Pushed automatic rebuild of the latest version of each Snap Pack to SnapLogic UAT and Elastic servers.

4.17snapsmrc515
 
Latest
  • Tested and certified all Snaps against CDH 6.1.
  • Added the Snap Execution field to all Standard-mode Snaps. In some Snaps, this field replaces the existing Execute during preview check box.
4.16 Patchconfluentkafka7118 Latest

Fixed an issue with the Confluent Kafka Producer Snap wherein the Snap fails to produce the upstream Snap's input schema for Partition Number property in the output preview.

4.16 Patch

confluentkafka6891 Latest

Certified Confluent Platform version 5.x.

4.16snapsmrc508
 
StableUpgraded with the latest SnapLogic Platform release.
4.15 Patchconfluentkafka6446 Latest

Fixed an issue with the Confluent Kafka Consumer Snap wherein some connections were not closed after a Pipeline is aborted.

4.15snapsmrc500
 
StableUpgraded with the latest SnapLogic Platform release.
4.14 Patchconfluentkafka5737 Latest

Added a new property to the Confluent Kafka Producer Snap, Message publish timeout (milliseconds), that lets you specify a timeout value.

4.14 Patchconfluentkafka5663 Latest

Fixed the Confluent Kafka Snaps that do not delete temp files after pipeline execution.

4.14snapsmrc490
 
StableUpgraded with the latest SnapLogic Platform release.
4.13 Patch confluentkafka5614 Latest

Fixed the Confluent Kafka Snap Pack that does not delete the temp files after executing pipelines.

4.13 Patch confluentkafka5330 Latest

Added support in Confluent Kafka Producer Snap the ability to write data to multiple partitions, in a round robin fashion.  

4.13

snapsmrc486

 
StableUpgraded with the latest SnapLogic Platform release.

4.12 Patch 

confluentkafka5087 Latest

Fixed an issue with the Consumer Snap that fails when the specified timeout value is less than the pipeline execution time. 

4.12

snapsmrc480

 
Stable

Enhanced the performance of the Confluent Kafka Producer Snap in writing incoming documents into the Confluent Kafka Cluster.

4.11 Patch

confluentkafka4626 Latest

Fixed an issue with the Confluent Kafka Consumer Snap wherein the Consumer session is not closing properly when manually stopping a pipeline, which causes the subsequent pipeline execution to wait for the previous session to die and not consume messages.

4.11 Patchconfluentkafka4302 Latest

Fixed an issue with the Confluent Kafka Consumer Snap that fetched 500 messages only, when message count set to 0 caused due to driver default value change.

4.11snapsmrc465
 
Stable

Updated the Confluent Kafka SSL Account with SCRAM_SASL support.

4.10

snapsmrc414

 
Stable

Confluent v3.2 support for Confluent Kafka Snap Pack.

4.9 Patchconfluentkafka3156 Latest

Fixed an issue regarding account validation failure due to unauthorized topics; Enhance the error message for empty topic suggestions

4.9snapsmrc405
 
Stable
  • Introduced the Confluent Kafka Acknowledge Snap in this release.
  • Enhanced the Consumer and the Producer Snaps to use Kafka Connect API to enhance the performance.
4.8 Patchconfluentkafka2804 Latest

Addressed an issue with Confluent Kafka Producer not including input document schema in Message Key and Value expression properties

4.8

snapsmrc398

 
Stable

New Snap Pack for 4.8. It consists of Confluent Kafka Consumer and Confluent Kafka Producer.

See Also