Kafka Sink Connector
Kafka Sink Connector integrates Kafka with GridGain 9, exporting topic data into GridGain tables.
Guarantees
GridGain Kafka sink provides the following guarantees:
- At-least-once delivery.
- Automatic failover and reconnection to the GridGain cluster.
- Message order within the same GridGain partition is preserved. Message order across different partitions is not defined.
Kafka Sink Configuration
Installation
Install the Kafka Sink connector following the recommended procedure.
Kafka Sink Connector Configuration
The GridGain Kafka sink connector uses the standard Kafka configuration file to define its properties. Below is an example configuration file:
```
name=gridgain-kafka-connect-sink
topics=topic1,topic2,topic3
connector.class=org.gridgain.kafka.sink.GridGainSinkConnector
tasks.max=1
ignite.addresses=host1:10800,host2:10800
ignite.operationTimeout=30000
ignite.table.name.regex=^events$
ignite.table.name.regex.replacement=Events
ignite.table.name.regex.1=.*
ignite.table.name.regex.replacement.1=UserEvents
```
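Assuming the properties above are saved as `gridgain-sink.properties`, the connector can be launched with a Kafka Connect worker. For example, with the standalone worker that ships with Kafka (file paths here are illustrative and depend on your installation):

```
# Standalone worker; connect-standalone.properties is the stock worker
# configuration shipped with Kafka. Adjust paths to your installation.
bin/connect-standalone.sh config/connect-standalone.properties gridgain-sink.properties
```

In production, the distributed worker and the Connect REST API are typically used instead of standalone mode.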
| Parameter | Description | Default value |
|---|---|---|
| name | The name of the GridGain Kafka connector. | |
| topics | The list of topics to copy to GridGain. | |
| connector.class | The class name of the GridGain sink connector. | |
| tasks.max | The maximum number of sink tasks running in parallel. | |
| retry.backoff | The period of time before retrying a request, in milliseconds. | 3000 |
| nested.struct.mode | How nested STRUCT fields are handled. Possible values include CONCAT (nested field names are concatenated into a column name using the separator defined in nested.struct.concat.separator). | |
| nested.struct.concat.separator | The separator used to concatenate nested field names into an Ignite column name when nested.struct.mode is CONCAT. | Underscore symbol (_) |
| flush.mode | Flush mode. | |
| ignite.addresses | Required. Addresses of the GridGain nodes the data is sent to. | |
| ignite.authenticator.basic.password | Password for basic authentication to the GridGain cluster. | |
| ignite.authenticator.basic.username | Username for basic authentication to the GridGain cluster. | |
| ignite.connect.timeout | Socket connection timeout, in milliseconds. | 5000 |
| ignite.error.handling.policy | Error handling policy. | |
| ignite.heartbeat.interval | The interval at which the client sends heartbeat messages to the cluster, in milliseconds. 0 disables heartbeats. | 30000 |
| ignite.heartbeat.timeout | Heartbeat response timeout, in milliseconds. The connection is closed if the response is not received before the timeout expires. | 5000 |
| ignite.reconnect.interval | Reconnect interval, in milliseconds. 0 disables background reconnects. | 30000 |
| ignite.ssl.client.authentication.mode | Client authentication mode. | |
| ignite.ssl.ciphers | Comma-separated list of ciphers used to set up the SSL connection. | |
| ignite.ssl.enabled | If true, an SSL/TLS connection is established. | |
| ignite.ssl.key.store.password | Keystore password used to set up the SSL connection. | |
| ignite.ssl.key.store.path | Keystore path used to set up the SSL connection. | |
| ignite.ssl.trust.store.password | Truststore password used to set up the SSL connection. | |
| ignite.ssl.trust.store.path | Truststore path used to set up the SSL connection. | |
| ignite.streamer.auto.flush.interval | The data streamer's auto-flush interval, in milliseconds. After this interval, the data streamer automatically flushes the data to the cluster. | 5000 |
| ignite.streamer.page.size | The data streamer's page size: the number of entries sent to the cluster per network call. | 1000 |
| ignite.streamer.parallel.ops | The data streamer's parallel operations: how many in-flight requests can be active per partition. | 1 |
| ignite.streamer.retry.limit | The data streamer's retry limit: the number of retries in case of a connection issue. | 16 |
| ignite.table.name.regex | The regular expression pattern matched against the topic name. See the examples below for how to use regular expressions. | |
| ignite.table.name.regex.replacement | The value the regular expression match is replaced with. | |
| ignite.table.name.regex.1 | Optional numbered regular expression. Expressions with a lower number are applied first. | |
| ignite.table.name.regex.replacement.1 | The replacement value for the correspondingly numbered regular expression. | |
Kafka Topic Mapping
When applying regular expressions to map Kafka topics to GridGain tables, Kafka Sink applies only the first regular expression that matches. In the example below, any topic name is replaced by MyTable:

```
ignite.table.name.regex=.*
ignite.table.name.regex.replacement=MyTable
```
You can use standard regular expression substitution syntax. The example below maps any topic whose name matches topic-(\d*) to a table named table-(number), preserving the number:

```
ignite.table.name.regex=topic-(\d*)
ignite.table.name.regex.replacement=table-$1
```
Multiple regular expressions can be used to handle different topic names. The example below first tries to match topic-(\d*) and, if the topic name does not match, tries the customer-(\d) expression:

```
ignite.table.name.regex=topic-(\d*)
ignite.table.name.regex.replacement=table-$1
ignite.table.name.regex.1=customer-(\d)
ignite.table.name.regex.replacement.1=from-$1
```
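Kafka Connect runs on the JVM, so the first-match-wins behavior described above can be illustrated with java.util.regex. The mapTopic helper below is a hypothetical sketch of the documented rule ordering, not the connector's actual code:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TopicMappingSketch {
    // Rules are tried in order; only the first regex that matches
    // the topic name is applied, as the documentation describes.
    static String mapTopic(String topic, String[][] rules) {
        for (String[] rule : rules) {
            Matcher m = Pattern.compile(rule[0]).matcher(topic);
            if (m.find()) {
                return m.replaceAll(rule[1]);
            }
        }
        return topic; // no rule matched: topic name is used as-is
    }

    public static void main(String[] args) {
        String[][] rules = {
            { "topic-(\\d*)", "table-$1" },    // ignite.table.name.regex / .replacement
            { "customer-(\\d)", "from-$1" }    // ignite.table.name.regex.1 / .replacement.1
        };
        System.out.println(mapTopic("topic-42", rules));    // table-42
        System.out.println(mapTopic("customer-7", rules));  // from-7
    }
}
```

Note that an unmatched topic name passes through unchanged here; in a real deployment a table matching that name must exist on the cluster.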
GridGain Configuration
On the receiving GridGain cluster, create tables and schemas that match the data in your Kafka topics. Tables are not created automatically; writing to a missing or mismatched table causes an error.
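For example, if the mapping above sends an events topic to a table named Events, a matching table could be created up front. The column names and types here are assumptions for illustration; they must match the schema of your actual Kafka messages:

```sql
CREATE TABLE Events (
    id INT PRIMARY KEY,
    payload VARCHAR
);
```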
Type Conversion
The GridGain Kafka sink connector uses the schema of the GridGain table to convert Kafka topic messages. For example, when a GridGain column has the INT32 type, the Kafka data is expected to have the INT8, INT16, INT32, or INT64 data type.
The full list of supported conversions is provided below:

| GridGain Type | Kafka Type |
|---|---|
| INT8 | INT8, INT16, INT32, INT64 |
| INT16 | INT8, INT16, INT32, INT64 |
| INT32 | INT8, INT16, INT32, INT64 |
| INT64 | INT8, INT16, INT32, INT64 |
| FLOAT | FLOAT, DOUBLE |
| DOUBLE | FLOAT, DOUBLE |
| DATE | STRING, ARRAY |
| TIME | STRING, ARRAY |
| DATETIME | STRING, ARRAY, LONG |
| TIMESTAMP | STRING, ARRAY, DOUBLE |
| UUID | STRING |
| BYTE_ARRAY | STRING, ARRAY, BYTES |
| DECIMAL | STRING, LONG, FLOAT, DOUBLE |
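As an illustration of the STRING-based rows above, standard string representations map naturally onto date, UUID, and decimal columns via the JDK's own types. This is a sketch of plausible input formats (assuming ISO-8601 for dates), not the connector's actual conversion code:

```java
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.UUID;

public class StringConversionSketch {
    public static void main(String[] args) {
        // STRING -> DATE (assuming an ISO-8601 string; the exact accepted
        // format is determined by the connector, not by this sketch)
        LocalDate date = LocalDate.parse("2024-05-01");

        // STRING -> UUID: the canonical 8-4-4-4-12 hex representation
        UUID id = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");

        // STRING -> DECIMAL: exact decimal value, no floating-point rounding
        BigDecimal price = new BigDecimal("19.99");

        System.out.println(date + " " + id + " " + price);
    }
}
```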
These conversions have been tested with all standard serializers and deserializers:
- JSON
- JSON with schema
- Avro
- Protobuf