
Kafka Sink Connector

The Kafka Sink Connector integrates Kafka with GridGain 9 by exporting topic data into GridGain tables.

Guarantees

GridGain Kafka sink provides the following guarantees:

  • At-least-once delivery.

  • Automatic failover and reconnection to the GridGain cluster.

  • Message order within the same GridGain partition is preserved. Message order across different partitions is not defined.

Kafka Sink Configuration

Installation

Install the Kafka Sink connector following the recommended procedure.
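The exact steps depend on your Kafka distribution, but installation typically comes down to placing the connector package on the Kafka Connect worker's plugin path. A minimal sketch, assuming the connector archive was unpacked under /opt/kafka/connect (a hypothetical path):

# Kafka Connect worker configuration (connect-standalone.properties or connect-distributed.properties).
# plugin.path is a standard Kafka Connect worker setting; adjust the directory to your installation.
plugin.path=/opt/kafka/connect

After restarting the Connect worker, the connector class becomes available under the name used in connector.class below.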

Kafka Sink Connector Configuration

The GridGain Kafka Sink Connector uses a standard Kafka Connect properties file for its configuration. Below is an example configuration file:

name=gridgain-kafka-connect-sink
topics=topic1,topic2,topic3
connector.class=org.gridgain.kafka.sink.GridGainSinkConnector
tasks.max=1

ignite.addresses=host1:10800,host2:10800
ignite.operationTimeout=30000

ignite.table.name.regex=^events$
ignite.table.name.regex.replacement=Events
ignite.table.name.regex.1=.*
ignite.table.name.regex.replacement.1=UserEvents
The following parameters are supported. Default values, where defined, are listed with each parameter.

name
The name of the GridGain Kafka connector.

topics
The list of topics to be copied to GridGain.

connector.class
The class name of the GridGain Sink connector.

tasks.max
The maximum number of sink tasks running in parallel.

retry.backoff
The period of time to wait before retrying a request, in milliseconds. Default: 3000.

nested.struct.mode
How nested STRUCT fields are handled. Possible values: CONCAT, FLATTEN, IGNORE, DISALLOW. Default: CONCAT (with the separator defined in nested.struct.concat.separator).

nested.struct.concat.separator
The separator used to concatenate nested field names into a GridGain column name when nested.struct.mode is set to CONCAT. For example, if the separator is _ and the nested field is address.street, the column name will be address_street. Default: underscore (_).

flush.mode
Flush mode. Possible values: KAFKA (Kafka decides when to flush, based on the offset.flush.interval.ms setting) and ON_PUT (flush on every put, which causes more frequent flushes and may affect performance). Default: KAFKA.

ignite.addresses
Required. Addresses of the GridGain nodes the data will be sent to.

ignite.authenticator.basic.password
The password for basic authentication to the GridGain cluster.

ignite.authenticator.basic.username
The username for basic authentication to the GridGain cluster.

ignite.connect.timeout
Socket connection timeout, in milliseconds. Default: 5000.

ignite.error.handling.policy
Error handling policy. Supported values: LOG_ONLY (the entry is skipped and the error is written to the log), STOP_TASK (the task is stopped when an error happens), and DEAD_LETTER_QUEUE (problematic records are redirected to the configured dead letter queue topic without stopping processing). Default: LOG_ONLY.

ignite.heartbeat.interval
The interval at which the client sends heartbeat messages to the cluster, in milliseconds. 0 disables heartbeats. Default: 30000.

ignite.heartbeat.timeout
Heartbeat response timeout, in milliseconds. The connection is closed if the response is not received before the timeout occurs. Default: 5000.

ignite.reconnect.interval
Reconnect interval, in milliseconds. 0 disables background reconnects. Default: 30000.

ignite.ssl.client.authentication.mode
Client authentication mode: NONE, OPTIONAL, or REQUIRE. Default: NONE.

ignite.ssl.ciphers
A comma-separated list of ciphers to be used to set up the SSL connection.

ignite.ssl.enabled
If true, an SSL/TLS connection is established.

ignite.ssl.key.store.password
The keystore password to be used to set up the SSL connection.

ignite.ssl.key.store.path
The keystore path to be used to set up the SSL connection.

ignite.ssl.trust.store.password
The truststore password to be used to set up the SSL connection.

ignite.ssl.trust.store.path
The truststore path to be used to set up the SSL connection.

ignite.streamer.auto.flush.interval
The Ignite data streamer's auto-flush interval, in milliseconds, after which the streamer automatically flushes the data to the cluster. Default: 5000.

ignite.streamer.page.size
The Ignite data streamer's page size: the number of entries sent to the cluster per network call. Default: 1000.

ignite.streamer.parallel.ops
The Ignite data streamer's parallel operations: the number of in-flight requests that can be active per partition. Default: 1.

ignite.streamer.retry.limit
The Ignite data streamer's retry limit: the number of retries in case of a connection issue. Default: 16.

ignite.table.name.regex
The regular expression pattern that is matched against the topic name; the matched part is replaced to produce the table name. See the examples below for how to use regular expressions.

ignite.table.name.regex.replacement
The value the regular expression match is replaced with.

ignite.table.name.regex.1
Optional ordering suffix for regular expressions. Expressions with a lower number are applied first.

ignite.table.name.regex.replacement.1
The replacement for the regular expression with the same ordering suffix.
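For reference, the sketch below combines several of the options above into a single connector configuration. All values are illustrative and should be adjusted to your environment:

name=gridgain-kafka-connect-sink
topics=orders,payments
connector.class=org.gridgain.kafka.sink.GridGainSinkConnector
tasks.max=2

ignite.addresses=host1:10800,host2:10800
ignite.authenticator.basic.username=kafka
ignite.authenticator.basic.password=secret

retry.backoff=5000
flush.mode=KAFKA
nested.struct.mode=CONCAT
nested.struct.concat.separator=_
ignite.error.handling.policy=LOG_ONLY

ignite.ssl.enabled=true
ignite.ssl.trust.store.path=/path/to/truststore.jks
ignite.ssl.trust.store.password=changeit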

Kafka Topic Mapping

When mapping Kafka topics to GridGain tables, the Kafka Sink applies only the first regular expression that matches the topic name.

In the example below, any topic name is replaced by MyTable:

ignite.table.name.regex=.*
ignite.table.name.regex.replacement=MyTable
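With this configuration, messages from every topic, for example both orders and payments, are written to the MyTable table.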

You can use standard regular expression substitution syntax. The example below replaces the topic- prefix with table- while preserving the number:

ignite.table.name.regex=topic-(\d*)
ignite.table.name.regex.replacement=table-$1
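With this configuration, the topic topic-42 is written to the table-42 table.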

Multiple regular expressions can be used to handle different topic names. The example below first tries to match topic-(\d*) and, if the topic name does not match, applies the customer-(\d) expression:

ignite.table.name.regex=topic-(\d*)
ignite.table.name.regex.replacement=table-$1

ignite.table.name.regex.1=customer-(\d)
ignite.table.name.regex.replacement.1=from-$1
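With this configuration, topic-7 is written to the table-7 table, while customer-3, which does not match the first expression, is written to the from-3 table.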

GridGain Configuration

On the receiving GridGain cluster, create tables and schemas that match the data in your Kafka topics. Tables are not created automatically, and the connector reports an error if a matching table does not exist.
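For example, if a topic mapped to the Events table carries records with an integer id key and name and created fields (a hypothetical schema), a matching table could be created with a SQL statement along these lines:

CREATE TABLE Events (
    id INT PRIMARY KEY,
    name VARCHAR,
    created TIMESTAMP
);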

Type Conversion

The GridGain Kafka Sink Connector uses the schema of the GridGain table to convert Kafka topic messages. For example, when a GridGain column has the INT32 type, the corresponding Kafka field is expected to have the INT8, INT16, INT32, or INT64 type.

The full list of supported conversions is provided below:

GridGain Type: Supported Kafka Types

INT8: INT8, INT16, INT32, INT64
INT16: INT8, INT16, INT32, INT64
INT32: INT8, INT16, INT32, INT64
INT64: INT8, INT16, INT32, INT64
FLOAT: FLOAT, DOUBLE
DOUBLE: FLOAT, DOUBLE
DATE: STRING, ARRAY
TIME: STRING, ARRAY
DATETIME: STRING, ARRAY, LONG
TIMESTAMP: STRING, ARRAY, DOUBLE
UUID: STRING
BYTE_ARRAY: STRING, ARRAY, BYTES
DECIMAL: STRING, LONG, FLOAT, DOUBLE

These conversions have been tested with all standard serializers and deserializers (a converter configuration sketch follows the list):

  • JSON

  • JSON with schema

  • Avro

  • Protobuf
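The serialization format is selected through the standard Kafka Connect converter properties, set either on the Connect worker or per connector. A minimal sketch for JSON with schema, using the JsonConverter class that ships with Apache Kafka:

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true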