
Overview

You can use the Kafka Producer Snap to send messages to a Kafka topic.

Prerequisites

  • Valid client ID.
  • A Kafka server with a valid account and the required permissions.

Support for Ultra Pipelines  

Works in Ultra Pipelines.

Limitations and Known Issues

None.

Snap Input and Output

Input (Document)

  • Number of views: Min: 1; Max: 1
  • Examples of upstream Snaps: Mapper Snap, Copy Snap, Sequence Snap
  • Description: A key/value pair for the Kafka message.

Output (Document)

  • Number of views: Min: 1; Max: 1
  • Examples of downstream Snaps: None; the Consumer Snap is decoupled from the Producer Snap.
  • Description: Kafka message records. For example:

[{"kafka_offset":"7139","kafka_partition":"0","kafka_topic":"PC_Test","original":{"msgID":"id1100","msgVal":"msg1100"}}]

Snap Settings

Each setting below lists the parameter name, its data type, a description, the default value, and an example.
Label (String)

Required. The name for the Snap. You can modify the default name to be specific and meaningful, especially if you have more than one of the same Snap in your pipeline.

Default value: Kafka Producer
Example: Producer
Client ID (String/Expression)

Specify the Client ID to be used in the Kafka message. If you do not specify a Client ID, a random string is used. This Client ID is used to correlate the requests sent to the brokers with each client.

Default value: N/A
Example: testclient001
Topic (String/Expression/Suggestion)

Specify the topic to which to publish the message. If the topic does not exist, a new topic and its partitions are created based on the configuration.

Default value: N/A
Example: t1
Partition Number (String/Expression/Suggestion)

Specify the partition number to which the messages should be published. If you do not specify a partition number, the server chooses the partition based on its configuration.

Default value: N/A
Example: 0
Message Key (String/Expression)

Enter the key for the message to be sent out.

Default value: N/A
Example: customerID
Message Value (String/Expression)

Required. Provide the value for the corresponding message key. The messages are sent with a value and, optionally, a key.

Default value: N/A
Examples: JSON.stringify($), 1000
Timestamp (String/Expression)

Specify a timestamp for the message in number, string, or date format. A timestamp value is the number of milliseconds that have elapsed since midnight, January 1, 1970 UTC.

Info

Each record's timestamp must be greater than or equal to the timestamp of the preceding record in the same partition; that is, the records must be in chronological order within each partition.

Note

If you enter the timestamp in number (Epoch) format, the value must be non-negative (that is, on or after January 1, 1970); otherwise, an error message is displayed. If you do not set a timestamp, or if the timestamp resolves to null or a blank string, the record's timestamp defaults to the current time.

The Timestamp field is relevant only if the Kafka topic is configured with message.timestamp.type = CreateTime (the default). For more information, see the official Kafka documentation.

Default value: CreateTime
Example: Date.parse($timestamp, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
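To make the epoch-milliseconds convention concrete, the following is a minimal Python sketch (illustrative only, not part of the Snap) showing how a date converts to the millisecond values this field expects:

```python
from datetime import datetime, timezone

def to_epoch_millis(dt: datetime) -> int:
    """Convert a timezone-aware datetime to Kafka-style epoch milliseconds."""
    return int(dt.timestamp() * 1000)

# Midnight, January 1, 1970 UTC is, by definition, 0 milliseconds.
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
print(to_epoch_millis(epoch))  # 0

# Any later instant yields a positive value, satisfying the
# non-negative requirement described in the note above.
print(to_epoch_millis(datetime(2021, 6, 1, 12, 0, 0, tzinfo=timezone.utc)))
```

Dates before 1970 would produce negative values, which is why such timestamps are rejected.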
Key Serializer (Drop-down list)

Select the target data type for the Key Serializer of each record. The available options are:

  • String
  • JSON (arbitrary complex data, schema-less)
  • Avro (arbitrary complex data, schema-centric)

    Note
    • The Avro format depends on an Avro schema registered in the Kafka Schema Registry. The Kafka account associated with the Snap must have the Schema Registry configured.
    • The Kafka Producer Snap looks up the Avro schema by name using Kafka's standard naming convention: "topic-key" for a key schema, or "topic-value" for a value schema. For example, if the topic is "order", then the name of the key schema would be "order-key".

  • ByteArray (binary content, such as images)
  • Int16 (16-bit integer)
  • Int32 (32-bit integer)
  • Int64 (64-bit integer)
  • Float32 (32-bit floating-point number)
  • Float64 (64-bit floating-point number)
  • BigInteger (arbitrary-precision integer)
  • BigDecimal (arbitrary-precision decimal number)

Note

The Key Deserializer field's value in the corresponding Consumer Snap should be the same data type.

Default value: String
Example: JSON
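To see why the Consumer's deserializer must match the Producer's serializer, here is an illustrative Python sketch (outside the Snap) of an Int32 round trip; Kafka's standard integer serializer encodes values as 4 big-endian bytes:

```python
import struct

def serialize_int32(value: int) -> bytes:
    """Encode a 32-bit integer as 4 big-endian bytes,
    matching the layout of Kafka's standard integer serializer."""
    return struct.pack(">i", value)

def deserialize_int32(data: bytes) -> int:
    """Decode 4 big-endian bytes back into a 32-bit integer."""
    return struct.unpack(">i", data)[0]

raw = serialize_int32(1000)
print(raw.hex())               # 000003e8
print(deserialize_int32(raw))  # 1000

# Reading these same bytes with a different deserializer (for example,
# as a UTF-8 string) would produce garbage, which is why the types
# must agree on both the Producer and Consumer sides.
```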
Value Serializer (Drop-down list)

Choose the target data type for the Value Serializer of each record. The available options are:

  • String
  • JSON
  • Avro

    Note
    • The Avro format depends on an Avro schema registered in the Kafka Schema Registry. The Kafka account associated with the Snap must have the Schema Registry configured.
    • The Kafka Producer Snap looks up the Avro schema by name using Kafka's standard naming convention: "topic-key" for a key schema, or "topic-value" for a value schema. For example, if the topic is "order", then the name of the key schema would be "order-key".

  • ByteArray
  • Int16 (16-bit integer)
  • Int32 (32-bit integer)
  • Int64 (64-bit integer)
  • Float32 (32-bit floating-point number)
  • Float64 (64-bit floating-point number)
  • BigInteger (arbitrary-precision integer)
  • BigDecimal (arbitrary-precision decimal number)

Note

The Value Deserializer field's value in the corresponding Consumer Snap should be the same data type.

Default value: String
Example: ByteArray

Headers

Use this field set to define an optional set of headers for each record. Add a new row for each record header and define its key, value, and serializer. This field set contains the following fields:

  • Key
  • Value
  • Serializer
Info

The Kafka Producer Snap supports the addition of multiple headers with the same key. When converting a Kafka record to an output document, multiple headers with the same key are combined into a single header whose value is a list or an array.

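The merging behavior described in the note above can be sketched as follows (an illustrative Python model of the documented behavior, not the Snap's actual code):

```python
def combine_headers(headers):
    """Combine a Kafka record's header list into a single mapping.

    A key that appears once keeps its scalar value; a key that appears
    multiple times is merged into a list, mirroring how the Snap combines
    duplicate headers when building the output document.
    """
    combined = {}
    for key, value in headers:
        if key not in combined:
            combined[key] = value
        elif isinstance(combined[key], list):
            combined[key].append(value)
        else:
            combined[key] = [combined[key], value]
    return combined

record_headers = [("City", "SFO"), ("Region", "CA"), ("City", "LAX")]
print(combine_headers(record_headers))
# {'City': ['SFO', 'LAX'], 'Region': 'CA'}
```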

Key (String/Expression)

Specify a name to use for the header.

You can add multiple headers with the same key in either of the following ways:

  • Configure multiple rows in the Headers table with the same key.
  • Configure a row in the Headers table with a value that evaluates to a list or an array.

Default value: N/A
Example: City
Value (String/Expression)

Specify a value for the header.

Default value: N/A
Example: SFO
Serializer (Drop-down list)

Select a serializer for the header to convert the value from a specific data type, such as String or Int32.

Default value: String
Example: ByteArray
Acknowledgements (Drop-down list)

Select the type of acknowledgment for the message, which determines the number of acknowledgments that the Producer requires the server to have received before considering a request complete. The available options are:

  • 0 - Send without any acknowledgment: The Producer does not wait for any acknowledgment from the server.
  • 1 - Send with leader acknowledgment: The Producer waits for the replication leader to acknowledge that the server received the message.
  • all - Send with full acknowledgment: The Producer waits for all replicas to acknowledge that the server received the message.

Default value: 1 - Send with leader acknowledgment
Example: all - Send with full acknowledgment
Batch Size (bytes) (Integer/Expression)

Specify the number of bytes to accumulate before messages are sent to the server.

Default value: 16384
Example: 15000
Linger Time (milliseconds) (Integer/Expression)

Specify the linger time for the messages to be sent. Linger time is the time, in milliseconds, that the Producer waits before sending out the data; during this period, the Producer keeps accumulating data into batches. A send is triggered by either the Batch Size or the Linger Time, whichever limit is reached first.

Default value: 0
Example: 1
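The whichever-comes-first interaction between Batch Size and Linger Time can be sketched like this. This is a simplified, illustrative Python model (a real Kafka producer also flushes on a background timer when the linger interval elapses; this sketch only checks the limits when a message is added):

```python
class BatchSender:
    """Accumulate messages and flush a batch when either the byte
    limit or the linger limit is reached, whichever comes first."""

    def __init__(self, batch_size_bytes: int, linger_ms: int):
        self.batch_size_bytes = batch_size_bytes
        self.linger_ms = linger_ms
        self.buffer = []
        self.buffered_bytes = 0
        self.first_message_at = None  # simulated clock, in milliseconds

    def add(self, payload: bytes, now_ms: int):
        """Buffer one message; return the flushed batch, or None if still waiting."""
        if self.first_message_at is None:
            self.first_message_at = now_ms
        self.buffer.append(payload)
        self.buffered_bytes += len(payload)
        size_hit = self.buffered_bytes >= self.batch_size_bytes
        linger_hit = (now_ms - self.first_message_at) >= self.linger_ms
        if size_hit or linger_hit:
            batch, self.buffer = self.buffer, []
            self.buffered_bytes = 0
            self.first_message_at = None
            return batch
        return None

sender = BatchSender(batch_size_bytes=32, linger_ms=10)
print(sender.add(b"x" * 8, now_ms=0))   # None: neither limit reached yet
print(sender.add(b"x" * 30, now_ms=1))  # batch of 2: byte limit hit (38 >= 32)
print(sender.add(b"x" * 4, now_ms=2))   # None: new batch starts
print(sender.add(b"x" * 4, now_ms=15))  # batch of 2: linger limit hit (13 ms >= 10)
```

A Linger Time of 0 (the default) means every message is eligible to be sent immediately.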
Retries (Integer/Expression)

Set the total number of times to attempt message delivery in case of failure.

Note

Whenever the Snap needs to connect to an endpoint and the connection object is either null or closed, the Snap opens the connection and tries to connect three times before aborting.

Default value: 0
Example: 3
Preserve Message Order With Retry (Check box)

Select this check box to resend the messages in the same order each time.

Note

This field is applicable when Retries is greater than 0, in which case retried messages may arrive out of order; selecting this check box ensures that the message order is preserved.

Default value: Not selected
Example: Selected
Compression Type (Drop-down list)

Choose the compression type to apply before sending the messages. The available options are:

  • NONE
  • GZIP
  • SNAPPY

Default value: NONE
Example: GZIP
Send Synchronously (Check box)

Select this check box to send each message synchronously. The Snap blocks on each message until it receives the record's metadata from the broker.

Default value: Not selected
Example: Selected
Output Records (Check box)

Select this check box to include the record's data and metadata (key, value, headers, and timestamp) in each output document. If deselected, each document includes only the basic metadata (topic, partition, offset) for each record. In either case, a copy of each input document is stored in the output document under the key original.

Default value: Not selected
Example: Selected
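The two output shapes controlled by this check box can be sketched as follows. This is an illustrative Python model: the basic metadata field names follow the example output earlier in this article, while the exact names of the extra record fields (key, value, headers, timestamp) are assumptions for the sketch:

```python
def build_output_document(record, original, include_record_data: bool):
    """Model the Snap's output document for one published record.

    With include_record_data=False (the default), only the basic metadata
    is emitted; with True, the record's key, value, headers, and timestamp
    are included as well. The input document is always kept under 'original'.
    """
    doc = {
        "kafka_topic": record["topic"],
        "kafka_partition": record["partition"],
        "kafka_offset": record["offset"],
        "original": original,
    }
    if include_record_data:
        doc.update({
            "key": record.get("key"),             # hypothetical field name
            "value": record["value"],             # hypothetical field name
            "headers": record.get("headers", {}), # hypothetical field name
            "timestamp": record.get("timestamp"), # hypothetical field name
        })
    return doc

record = {"topic": "PC_Test", "partition": 0, "offset": 7139,
          "value": {"msgID": "id1100", "msgVal": "msg1100"}}
print(build_output_document(record, record["value"], include_record_data=False))
```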
Message Publish Timeout (milliseconds) (Integer/Expression)

Specify the timeout value, in milliseconds, for publishing a message. If the Snap fails to publish the message within the specified time, a timeout error appears.

Default value: 60000
Example: 60000
Snap Execution (Drop-down list)

Select one of the three modes in which the Snap executes. The available options are:

  • Validate & Execute: Performs limited execution of the Snap and generates a data preview during Pipeline validation. Subsequently performs full execution of the Snap (unlimited records) during Pipeline runtime.
  • Execute only: Performs full execution of the Snap during Pipeline execution without generating preview data.
  • Disabled: Disables the Snap and all Snaps downstream from it.

Default value: Execute only
Example: Validate & Execute

Example

Sending Documents to a Kafka Topic Using Kafka Producer Snap

This example Pipeline demonstrates how we can send documents to a Kafka topic using the Kafka Producer Snap. We use the Sequence Snap to enable the Pipeline to send documents in large numbers.

First, we configure the Sequence Snap to send 1000 documents. Hence, we set the Initial value as 1 and Number of documents as 1000.

Next, we configure the Kafka Producer Snap to send the documents to the Topic named SampleKafkaTopic. We leave the Partition Number field blank to let the broker decide which partition to use.

Upon successful validation, the Pipeline displays the output preview as shown below:

Download this Pipeline.

Downloads

Note
titleImportant Steps to Successfully Reuse Pipelines
  1. Download and import the Pipeline into SnapLogic.
  2. Configure Snap accounts as applicable.
  3. Provide Pipeline parameters as applicable.

