Kafka Producer
Overview
You can use the Kafka Producer Snap to send messages to a Kafka topic.
Prerequisites
Valid client ID.
A Kafka server with a valid account and the required permissions.
Support for Ultra Pipelines
Works in Ultra Pipelines.
Known Issues
Recent changes in Kafka to enable idempotent writes by default may result in authorization errors when using the Kafka Snaps if your Kafka cluster version is lower than 2.8.
To fix this issue, we recommend you take one of the following actions:
Upgrade your Kafka cluster to at least version 2.8.
Use Kafka’s ACL management to ensure that IdempotentWrite is allowed for the cluster.
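The version condition in this known issue can be written as a small check. The following Python sketch is illustrative only: the helper names `parse_version` and `needs_idempotent_write_acl` are hypothetical and are not part of the Snap or of Kafka's tooling. It assumes the cluster version is available as a dotted string.

```python
def parse_version(version: str) -> tuple:
    """Turn a dotted version string such as "2.7.1" into a tuple of ints."""
    return tuple(int(part) for part in version.split("."))


def needs_idempotent_write_acl(cluster_version: str) -> bool:
    """Return True when the cluster version is lower than 2.8.

    Per the known issue, such clusters may raise authorization errors
    unless IdempotentWrite is allowed for the cluster (or the cluster
    is upgraded).
    """
    # Compare only major.minor; tuple comparison handles 2.10 > 2.8 correctly.
    return parse_version(cluster_version)[:2] < (2, 8)


if __name__ == "__main__":
    for version in ("2.7.1", "2.8.0", "3.4.0"):
        print(version, needs_idempotent_write_acl(version))
```

Comparing integer tuples rather than raw strings avoids treating a version such as 2.10 as lower than 2.8.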
Snap Views
Input/Output | Type of View | Number of Views | Examples of Upstream and Downstream Snaps | Description |
|---|---|---|---|---|
Input | Document | | | A key/value pair for the Kafka message. |
Output | Document | | | Kafka message records. |
Snap Settings
Field Name | Field Type | Description |
|---|---|---|
Label | String | Required. The name for the Snap. You can modify the default name to be specific and meaningful, especially if you have more than one of the same Snaps in your pipeline. Default Value: Kafka Producer |
Client ID | String/Expression | Specify the Client ID to be used in the Kafka message. If you do not specify a Client ID, a random string is used. This Client ID is used for correlating the requests sent to the brokers with each client. Default Value: N/A |
Topic | String/Expression/Suggestion | Specify the topic to publish the message. Default Value: N/A |
Partition Number | String/Expression/Suggestion | Specify the partition number in which the messages should be published. Default Value: N/A |
Message Key | String/Expression | Enter the key for the message to be sent out. Default Value: N/A |
Message Value | String/Expression | Required. Provide the value for the corresponding message key. Default Value: N/A |
Timestamp | String/Expression | Specify a timestamp for the message in number, string, or date format. Default Value: CreateTime. Each record's timestamp must be greater than or equal to the timestamp of the preceding record in the same partition; that is, the records must be in chronological order within each partition. If you enter the timestamp as a number (Epoch format), the value must be non-negative (for example, 0, which corresponds to 01-01-1970); otherwise, an error message is displayed. If you do not set a timestamp, or if the timestamp resolves to null or a blank string, the record's timestamp defaults to the current time. The Timestamp field is relevant only if the Kafka topic is configured with |
Key Serializer | Dropdown list | Select the target data type for the Key Serializer of each record. The available options are:
Default Value: String. The Key Deserializer field in the corresponding Consumer Snap should be set to the same data type. |
Value Serializer | Dropdown list | Choose the target data type for the Value Serializer of each record. The available options are:
Default Value: String. The Avro and JSON_SR formats depend on the schema registered in the Kafka Schema Registry. The Kafka account associated with the Snap must have the Schema Registry configured. The Value Deserializer field in the corresponding Consumer Snap should be set to the same data type. |
Headers | Use this field set to define an optional set of headers for each record. Click to add a new row for record headers and define key, value, and serializer values. The Kafka Producer Snap supports the addition of multiple headers with the same key. When converting a Kafka record to an output document, multiple headers with the same key are combined into a single header whose value is a list or an array. | |
Key | String/Expression | Specify a name to use for the Header. You can add multiple headers with the same key in either of the following ways:
Default Value: N/A |
Value | String/Expression | Specify a value for the header.
Default Value: N/A |
Serializer | Dropdown list | Select a Serializer for the header to convert the value from a specific data type such as string or Int32.
Default Value: String |
Acknowledgements | Dropdown list | Select the type of acknowledgment for the message, which determines the number of acknowledgments that the Producer requires the server to have received before considering a request to be complete.
Default Value: 1 - Send with leader acknowledgment |
Batch Size (bytes) | Integer/Expression | Specify the number of bytes that are received before messages are sent to the server. Default Value: 16384 |
Linger Time (milliseconds) | Integer/Expression | Specify the linger time for the messages to be sent. Linger time is the time, in milliseconds, that the Producer Snap waits before it sends the messages. Default Value: 0 |
Retries | Integer/Expression | Set the total number of times to attempt a message delivery in case of failure. Default Value: 0. Whenever the Snap needs to connect to an endpoint and the Connection object is either null or closed, the Snap opens the connection and tries to connect three times before aborting. |
Preserve Message Order With Retry | Checkbox | Select this check box to resend the messages in the same order each time. Default status: Deselected. This field applies only when Retries is greater than 0. In that case, retried messages can be delivered out of order; selecting this check box preserves the message order. |
Compression Type | Dropdown list | Choose the compression type before sending the messages. The available options are:
Default Value: None |
Send Synchronously | Checkbox | Select this check box to send each message synchronously. The Snap blocks after sending each message until it receives the record's metadata from the broker.
Default status: Deselected |
Output Records | Checkbox | Select this check box to include the record’s data and metadata in each output document, that is, key, value, headers, and timestamp. Default status: Deselected |
Message Publish Timeout (milliseconds) | Integer/Expression | Specify the timeout value in milliseconds to publish a message. Default Value: 60000 |
Snap Execution | Dropdown list | Select one of the three modes in which the Snap executes. Available options are:
Default Value: Execute only |
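Several of the settings above correspond closely to standard Kafka producer properties. The following Python sketch shows one plausible mapping; it is not the Snap's actual implementation. The helper `producer_properties` and its parameter names are hypothetical, and the use of `max.in.flight.requests.per.connection` to preserve order on retry is an assumption based on general Kafka producer behavior. The property keys themselves are standard Kafka producer configuration names, and the defaults mirror the default values listed in the table.

```python
def producer_properties(
    client_id=None,
    acks="1",
    batch_size=16384,
    linger_ms=0,
    retries=0,
    preserve_order=False,
    compression_type="none",
):
    """Build a dict of Kafka producer properties (hypothetical helper)."""
    if acks not in ("0", "1", "all"):
        raise ValueError("acks must be '0', '1', or 'all'")
    if batch_size < 0 or linger_ms < 0 or retries < 0:
        raise ValueError("batch_size, linger_ms, and retries must be non-negative")

    props = {
        "acks": acks,                        # Acknowledgements
        "batch.size": batch_size,            # Batch Size (bytes)
        "linger.ms": linger_ms,              # Linger Time (milliseconds)
        "retries": retries,                  # Retries
        "compression.type": compression_type,  # Compression Type
    }
    if client_id:
        # Client ID is optional; when it is omitted, nothing is set here.
        props["client.id"] = client_id
    if preserve_order and retries > 0:
        # Assumption: allowing only one in-flight request per connection
        # prevents a retried batch from being overtaken by a later batch.
        props["max.in.flight.requests.per.connection"] = 1
    return props


if __name__ == "__main__":
    print(producer_properties(client_id="demo-client", retries=3, preserve_order=True))
```

The order-preservation property is added only when retries are enabled, which matches the table's note that the check box is relevant only when Retries is greater than 0.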
Example
Sending documents to a Kafka topic
This example Pipeline demonstrates how we can send documents to a Kafka topic using the Kafka Producer Snap. We use the Sequence Snap to enable the Pipeline to send documents in large numbers.
First, we configure the Sequence Snap to send 1000 documents. Hence, we set the Initial value as 1 and Number of documents as 1000.
Next, we configure the Kafka Producer Snap to send the documents to the Topic named SampleKafkaTopic. We set the Partition number to 0, to let the broker decide which partition to use.
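For readers who think in code, the two steps above can be approximated outside SnapLogic. The following Python sketch is an illustration under stated assumptions, not an export of the Pipeline: the functions `sequence` and `publish`, the `value` field name, and the `send` callable are all hypothetical. An in-memory list stands in for a Kafka client so that the sketch runs without a broker.

```python
from typing import Callable, Dict, Iterable, Iterator

TOPIC = "SampleKafkaTopic"  # Topic from the example
PARTITION = 0               # Partition Number from the example


def sequence(initial_value: int, number_of_documents: int) -> Iterator[Dict[str, int]]:
    """Mimic the Sequence Snap: yield documents with consecutive values.

    The "value" field name is an assumption made for this sketch.
    """
    for offset in range(number_of_documents):
        yield {"value": initial_value + offset}


def publish(
    documents: Iterable[Dict[str, int]],
    send: Callable[[str, str, int], None],
) -> int:
    """Pass each document to `send` and return the number of documents sent."""
    count = 0
    for doc in documents:
        send(TOPIC, str(doc["value"]), PARTITION)
        count += 1
    return count


if __name__ == "__main__":
    sent = []  # in-memory stand-in for a Kafka client

    def fake_send(topic: str, value: str, partition: int) -> None:
        sent.append((topic, value, partition))

    total = publish(sequence(1, 1000), fake_send)
    print(total, sent[0], sent[-1])
```

With a real Kafka client library, `send` would wrap that library's own send call; the stand-in here only records what would have been published.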
Upon successful validation, the Pipeline displays the output preview as shown below:
Downloads
Important Steps to Successfully Reuse Pipelines
Download and import the Pipeline into SnapLogic.
Configure Snap accounts as applicable.
Provide Pipeline parameters as applicable.
© 2017-2025 SnapLogic, Inc.