Ignite 3

Cross-cluster Replication with Kafka

CDC replication using Kafka

Replicating changes between clusters this way requires setting up two applications:

  1. ignite-cdc.sh with org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer, which captures changes from the source cluster and writes them to a Kafka topic.

  2. kafka-to-ignite.sh, which reads changes from the Kafka topic and writes them to the destination cluster.
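Assuming a standard Ignite binary distribution, the two applications are started roughly as follows. The configuration file names and paths below are placeholders, not fixed names:

```shell
# On every server node of the source cluster: start the CDC streamer,
# pointing it at a Spring XML file that declares the IgniteToKafkaCdcStreamer bean.
./bin/ignite-cdc.sh config/ignite-to-kafka-streamer-config.xml

# Near the destination cluster: start the applier that reads from Kafka
# and writes to the destination cluster.
./bin/kafka-to-ignite.sh config/kafka-to-ignite-streamer-config.xml
```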

Note
An instance of ignite-cdc.sh with the configured streamer should be started on each server node of the source cluster to capture all changes.
Important
CDC through Kafka requires a metadata topic with exactly one partition to guarantee sequential ordering.
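For example, the metadata topic can be created with a single partition using the standard Kafka CLI. The topic name, replication factor, and bootstrap address below are placeholders:

```shell
# Metadata topic: exactly one partition for sequential ordering guarantees.
./bin/kafka-topics.sh --create \
    --topic ignite-metadata \
    --partitions 1 \
    --replication-factor 1 \
    --bootstrap-server 127.0.0.1:9092
```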

Figure: CDC data flow from the source Ignite cluster through Kafka (ignite2kafka).

IgniteToKafkaCdcStreamer Configuration

| Name | Description | Default value |
|---|---|---|
| caches | Set of cache names to replicate. | null |
| kafkaProperties | Kafka producer properties. | null |
| topic | Name of the Kafka topic for CDC events. | null |
| kafkaParts | Number of Kafka partitions in the CDC events topic. | null |
| metadataTopic | Name of the topic for replication of BinaryTypes and TypeMappings. | null |
| onlyPrimary | Flag to handle changes only on the primary node. | false |
| maxBatchSize | Maximum number of concurrently produced Kafka records. When the streamer reaches this number, it waits for Kafka acknowledgements and then commits the CDC offset. | 1024 |
| kafkaRequestTimeout | Kafka request timeout in milliseconds. | 3000 |

  • The kafkaRequestTimeout property sets how long IgniteToKafkaCdcStreamer waits for KafkaProducer to finish a request.

Note
kafkaRequestTimeout should not be set too low. If the wait time exceeds kafkaRequestTimeout, IgniteToKafkaCdcStreamer fails with a timeout error.
  • To specify KafkaProducer settings, use the kafkaProperties property. We suggest storing all the necessary configuration properties in a separate file and referencing it from the IgniteToKafkaCdcStreamer configuration '.xml' file. See the examples below.

kafka.properties

bootstrap.servers=xxx.x.x.x:9092
request.timeout.ms=10000

IgniteToKafkaCdcStreamer bean declaration in ignite-to-kafka-streamer-config.xml

<bean id="cdc.streamer" class="org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer">
    <property name="topic" value="${send_data_kafka_topic_name}"/>
    <property name="metadataTopic" value="${send_metadata_kafka_topic_name}"/>
    <property name="kafkaParts" value="${send_kafka_partitions}"/>
    <property name="caches">
        <list>
            <value>terminator</value>
        </list>
    </property>
    <property name="onlyPrimary" value="false"/>
    <property name="kafkaProperties" ref="kafkaProperties"/>
</bean>

<util:properties id="kafkaProperties" location="file:kafka_properties_path/kafka.properties"/>
Note
The request.timeout.ms Kafka producer property is mandatory for the streamer configuration. For more details, refer to the configuration section of the official Kafka documentation.
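The streamer bean is typically wired into the CDC configuration as its consumer. A minimal sketch, assuming the cdc.streamer bean id from the example above:

```xml
<bean class="org.apache.ignite.cdc.CdcConfiguration">
    <!-- The Kafka streamer acts as the CDC consumer on the source cluster. -->
    <property name="consumer" ref="cdc.streamer"/>
</bean>
```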

IgniteToKafkaCdcStreamer Metrics

| Name | Description |
|---|---|
| EventsCount | Count of messages applied to Kafka. |
| LastEventTime | Timestamp of last applied event to Kafka. |
| TypesCount | Count of binary types events applied to Kafka. |
| MappingsCount | Count of mappings events applied to Kafka. |
| BytesSent | Count of bytes sent to Kafka. |
| MarkersCount | Count of metadata markers sent to Kafka. |

kafka-to-ignite.sh application

This application should be started near the destination cluster. kafka-to-ignite.sh reads CDC events from a Kafka topic and applies them to the destination cluster.

Important
kafka-to-ignite.sh implements the fail-fast approach: it stops on any error. A restart procedure should be configured with OS tools.

The number of application instances does not have to correlate with the number of destination server nodes; it should simply be sufficient to handle the source cluster load. Each application instance processes a configured subset of topic partitions to spread the load. A KafkaConsumer is created for each partition to ensure fair reads.

Configuration

The application is configured using POJO classes or a Spring XML file, like a regular Ignite node. The kafka-to-ignite configuration file should contain the following beans, which are loaded during startup:

  1. One of the configuration beans to define a client type that will connect to the destination cluster:

    • IgniteConfiguration bean: Configuration of a client node.

    • ClientConfiguration bean: Configuration of a Java Thin Client.

  2. java.util.Properties bean with the name kafkaProperties: Single Kafka consumer configuration.

  3. org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration bean: Options specific to kafka-to-ignite.sh application.

| Name | Description | Default value |
|---|---|---|
| caches | Set of cache names to replicate. | null |
| topic | Name of the Kafka topic for CDC events. | null |
| kafkaPartsFrom | Lower Kafka partition number (inclusive) for the CDC events topic. | -1 |
| kafkaPartsTo | Upper Kafka partition number (exclusive) for the CDC events topic. | -1 |
| metadataTopic | Name of the topic for replication of BinaryTypes and TypeMappings. | null |
| metadataConsumerGroup | Group for the KafkaConsumer that polls from the metadata topic. | ignite-metadata-update-<kafkaPartsFrom>-<kafkaPartsTo> |
| kafkaRequestTimeout | Kafka request timeout in milliseconds. | 3000 |
| kafkaConsumerPollTimeout | Kafka poll timeout in milliseconds. | 3000 |
| maxBatchSize | Maximum number of events sent to the destination cluster in a single batch. | 1024 |
| threadCount | Number of threads that process consumers. Each thread polls records from its dedicated partitions in a round-robin manner. | 16 |
| metricRegistryName | Name for the metric registry: org.apache.<metricRegistryName>.cdc.applier | cdc-kafka-to-ignite |

  • The kafkaRequestTimeout property is used as the timeout for KafkaConsumer methods (except KafkaConsumer#poll).

Note
kafkaRequestTimeout should not be set too low; otherwise you risk the application failing on method execution.
  • The kafkaConsumerPollTimeout property is used as the timeout for the KafkaConsumer#poll method.

Note
A high kafkaConsumerPollTimeout setting might greatly affect replication performance. Kafka topic partitions are distributed equally among threads (see threadCount). Each thread can poll only one partition at a time, so no other partition assigned to the same thread is polled while the current one is being handled.
  • To specify KafkaConsumer settings, use the kafkaProperties bean. Store all the necessary configuration properties in a separate file and reference it from the KafkaToIgniteCdcStreamer configuration '.xml' file. See the examples below.

kafka.properties

bootstrap.servers=127.0.0.1:9092
request.timeout.ms=10000
group.id=kafka-to-ignite-dc1
auto.offset.reset=earliest
enable.auto.commit=false

Kafka properties bean declaration in kafka-to-ignite-streamer-config.xml

<util:properties id="kafkaProperties" location="file:kafka_properties_path/kafka.properties"/>
Note
The request.timeout.ms Kafka consumer property is mandatory for the streamer configuration.
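Putting the pieces together, a kafka-to-ignite-streamer-config.xml might look like the sketch below. The bean ids, thin client address, cache name, and partition range are assumptions to adapt to your environment:

```xml
<!-- Java Thin Client connection to the destination cluster. -->
<bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
    <property name="addresses">
        <list>
            <value>127.0.0.1:10800</value>
        </list>
    </property>
</bean>

<!-- Kafka consumer properties, kept in a separate file. -->
<util:properties id="kafkaProperties" location="file:kafka_properties_path/kafka.properties"/>

<!-- Options specific to the kafka-to-ignite.sh application. -->
<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
    <property name="topic" value="${send_data_kafka_topic_name}"/>
    <property name="metadataTopic" value="${send_metadata_kafka_topic_name}"/>
    <!-- This instance handles partitions [0, 16). -->
    <property name="kafkaPartsFrom" value="0"/>
    <property name="kafkaPartsTo" value="16"/>
    <property name="caches">
        <list>
            <value>terminator</value>
        </list>
    </property>
</bean>
```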

Metrics

| Name | Description |
|---|---|
| EventsReceivedCount | Count of events received from Kafka. |
| LastEventReceivedTime | Timestamp of last received event from Kafka. |
| EventsSentCount | Count of events sent to destination cluster. |
| LastBatchSentTime | Timestamp of last sent batch to the destination cluster. |
| MarkersCount | Count of metadata markers received from Kafka. |

Logging

kafka-to-ignite.sh uses the same logging configuration as an Ignite node. The only difference is that the log is written to the "kafka-ignite-streamer.log" file.

Fault tolerance

In most real-world deployments, CDC streamers are expected to be configured with onlyPrimary=false to ensure fault tolerance. This means the streamer sends the same change multiple times: once per copy of the data, that is, CacheConfiguration#backups + 1 times.
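For example, with the (assumed) cache configuration below, backups is 1, so with onlyPrimary=false each change is sent backups + 1 = 2 times; the destination cluster applies duplicates idempotently:

```xml
<bean class="org.apache.ignite.configuration.CacheConfiguration">
    <property name="name" value="terminator"/>
    <!-- One backup copy: each CDC change is sent backups + 1 = 2 times. -->
    <property name="backups" value="1"/>
</bean>
```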