Change Data Capture Extension | Ignite Documentation

Change Data Capture Extension

Warning
CDC is an experimental feature. API or design architecture might be changed.

Overview

The Change Data Capture Extension module provides three ways to set up cross-cluster replication based on CDC:

  1. IgniteToIgniteClientCdcStreamer - streams changes to the destination cluster using a Java Thin Client.

  2. IgniteToIgniteCdcStreamer - streams changes to the destination cluster using a client node.

  3. IgniteToKafkaCdcStreamer combined with KafkaToIgniteCdcStreamer - streams changes to the destination cluster using Apache Kafka as a transport.

Note
A CacheVersionConflictResolver should be defined for each cache replicated between clusters.
Note
All implementations of CDC replication support replication of BinaryTypes and TypeMappings.
Note
To use SQL queries on the destination cluster over CDC-replicated data, set the same VALUE_TYPE in CREATE TABLE on both source and destination clusters for each table.

Ignite to Java Thin Client CDC streamer

This streamer starts a Java Thin Client that connects to the destination cluster. After the connection is established, all changes captured by CDC are replicated to the destination cluster.

Note
An instance of ignite-cdc.sh with the configured streamer should be started on each server node of the source cluster to capture all changes.

[Figure: CDC ignite2igniteClient]

Configuration

| Name | Description | Default value |
| --- | --- | --- |
| caches | Set of cache names to replicate. | null |
| destinationClientConfiguration | Client configuration of thin client that will connect to destination cluster to replicate changes. | null |
| onlyPrimary | Flag to handle changes only on primary node. | false |
| maxBatchSize | Maximum number of events to be sent to destination cluster in a single batch. | 1024 |

Metrics

| Name | Description |
| --- | --- |
| EventsCount | Count of messages applied to destination cluster. |
| LastEventTime | Timestamp of last applied event. |
| TypesCount | Count of received binary types events. |
| MappingsCount | Count of received mappings events. |

Ignite to Ignite CDC streamer

This streamer starts a client node that connects to the destination cluster. After the connection is established, all changes captured by CDC are replicated to the destination cluster.

Note
An instance of ignite-cdc.sh with the configured streamer should be started on each server node of the source cluster to capture all changes.

[Figure: CDC ignite2ignite]

Configuration

| Name | Description | Default value |
| --- | --- | --- |
| caches | Set of cache names to replicate. | null |
| destinationIgniteConfiguration | Ignite configuration of client nodes that will connect to destination cluster to replicate changes. | null |
| onlyPrimary | Flag to handle changes only on primary node. | false |
| maxBatchSize | Maximum number of events to be sent to destination cluster in a single batch. | 1024 |

Metrics

| Name | Description |
| --- | --- |
| EventsCount | Count of messages applied to destination cluster. |
| LastEventTime | Timestamp of last applied event. |
| TypesCount | Count of received binary types events. |
| MappingsCount | Count of received mappings events. |

CDC replication using Kafka

Replicating changes between clusters this way requires setting up two applications:

  1. ignite-cdc.sh with org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer, which captures changes from the source cluster and writes them to a Kafka topic.

  2. kafka-to-ignite.sh, which reads changes from the Kafka topic and then writes them to the destination cluster.

Note
An instance of ignite-cdc.sh with the configured streamer should be started on each server node of the source cluster to capture all changes.
Important
CDC through Kafka requires a metadata topic with exactly one partition to guarantee sequential ordering.

[Figure: CDC ignite2kafka]

IgniteToKafkaCdcStreamer Configuration

| Name | Description | Default value |
| --- | --- | --- |
| caches | Set of cache names to replicate. | null |
| kafkaProperties | Kafka producer properties. | null |
| topic | Name of the Kafka topic for CDC events. | null |
| kafkaParts | Number of Kafka partitions in the CDC events topic. | null |
| metadataTopic | Name of the topic for replication of BinaryTypes and TypeMappings. | null |
| onlyPrimary | Flag to handle changes only on primary node. | false |
| maxBatchSize | Maximum number of concurrently produced Kafka records. When the streamer reaches this number, it waits for Kafka acknowledgements and then commits the CDC offset. | 1024 |
| kafkaRequestTimeout | Kafka request timeout in milliseconds. | 3000 |
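The maxBatchSize behaviour described above (send records, wait for acknowledgements once the limit is reached, then commit the CDC offset) can be sketched as follows. This is a minimal, dependency-free illustration, not the cdc-ext code: the class BatchAckSketch, its methods, and the immediately completed future that stands in for a Kafka producer send are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Illustrative sketch only; class and method names are hypothetical, not part of cdc-ext. */
class BatchAckSketch {
    private final int maxBatchSize;

    /** Acknowledgement futures of records that were sent but not yet confirmed. */
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

    /** How many times the CDC offset would have been committed. */
    int commits;

    BatchAckSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /** Stand-in for an asynchronous producer send; it is acknowledged immediately here. */
    private CompletableFuture<Void> send(String record) {
        return CompletableFuture.completedFuture(null);
    }

    /** Sends one event and blocks for acknowledgements once the batch limit is reached. */
    void onEvent(String record) {
        inFlight.add(send(record));

        if (inFlight.size() >= maxBatchSize)
            flush();
    }

    /** Waits for all outstanding acknowledgements, then counts one offset commit. */
    void flush() {
        if (inFlight.isEmpty())
            return;

        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0])).join();
        inFlight.clear();
        commits++;
    }

    public static void main(String[] args) {
        BatchAckSketch streamer = new BatchAckSketch(3);

        for (int i = 0; i < 7; i++)
            streamer.onEvent("event-" + i);

        streamer.flush(); // Confirm the incomplete tail of the batch.

        System.out.println("commits = " + streamer.commits);
    }
}
```

In this sketch the offset only advances after every record of the batch has been acknowledged, so a crash in the middle of a batch would lead to the batch being re-sent rather than lost.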

IgniteToKafkaCdcStreamer Metrics

| Name | Description |
| --- | --- |
| EventsCount | Count of messages applied to destination cluster. |
| LastEventTime | Timestamp of last applied event. |
| BytesSent | Number of bytes sent to Kafka. |

kafka-to-ignite.sh application

This application should be started near the destination cluster. kafka-to-ignite.sh reads CDC events from the Kafka topic and then applies them to the destination cluster.

Important
kafka-to-ignite.sh implements the fail-fast approach: it simply fails on any error. The restart procedure should be configured with OS tools.

The number of application instances does not need to correlate with the number of destination server nodes. It only has to be sufficient to process the source cluster load. Each instance processes a configured subset of topic partitions to spread the load. A KafkaConsumer is created for each partition to ensure fair reads.

Installation

  1. Build the cdc-ext module with Maven:

      $~/src/ignite-extensions/> mvn clean package -DskipTests
      $~/src/ignite-extensions/> ls modules/cdc-ext/target | grep zip
      ignite-cdc-ext.zip

  2. Unpack the ignite-cdc-ext.zip archive into the $IGNITE_HOME folder.

You now have an additional binary, $IGNITE_HOME/bin/kafka-to-ignite.sh, and the $IGNITE_HOME/libs/optional/ignite-cdc-ext module.

Note
Enable the ignite-cdc-ext module to be able to run kafka-to-ignite.sh.

Configuration

The application is configured with POJO classes or a Spring XML file, like a regular Ignite node. The Kafka to Ignite configuration file should contain the following beans, which are loaded during startup:

  1. One of the configuration beans to define a client type that will connect to the destination cluster:

    • IgniteConfiguration bean: Configuration of a client node.

    • ClientConfiguration bean: Configuration of a Java Thin Client.

  2. java.util.Properties bean with the name kafkaProperties: Single Kafka consumer configuration.

  3. org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration bean: Options specific to kafka-to-ignite.sh application.

| Name | Description | Default value |
| --- | --- | --- |
| caches | Set of cache names to replicate. | null |
| topic | Name of the Kafka topic for CDC events. | null |
| kafkaPartsFrom | Lower Kafka partition number (inclusive) for the CDC events topic. | -1 |
| kafkaPartsTo | Upper Kafka partition number (exclusive) for the CDC events topic. | -1 |
| metadataTopic | Name of the topic for replication of BinaryTypes and TypeMappings. | null |
| metadataConsumerGroup | Group for the KafkaConsumer that polls from the metadata topic. | ignite-metadata-update-<kafkaPartsFrom>-<kafkaPartsTo> |
| kafkaRequestTimeout | Kafka request timeout in milliseconds. | 3000 |
| maxBatchSize | Maximum number of events to be sent to destination cluster in a single batch. | 1024 |
| threadCount | Number of threads that process consumers. Each thread polls records from its dedicated partitions in a round-robin manner. | 16 |
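When several kafka-to-ignite.sh instances share one topic, each instance needs its own kafkaPartsFrom (inclusive) and kafkaPartsTo (exclusive) range. The sketch below shows one possible way to derive contiguous, non-overlapping ranges for a given partition count and instance count. The PartitionRanges helper is hypothetical and not part of cdc-ext, and an even split is only an assumption; any assignment that covers every partition exactly once would serve.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits topic partitions into one contiguous range per instance. */
class PartitionRanges {
    /**
     * Returns one {from, to} pair per instance, where from is inclusive and to is exclusive,
     * mirroring the kafkaPartsFrom / kafkaPartsTo options.
     */
    static List<int[]> split(int kafkaParts, int instances) {
        if (kafkaParts <= 0 || instances <= 0 || instances > kafkaParts)
            throw new IllegalArgumentException("Need 1 <= instances <= kafkaParts");

        List<int[]> ranges = new ArrayList<>();

        int base = kafkaParts / instances; // Minimum partitions per instance.
        int rem = kafkaParts % instances;  // The first 'rem' instances take one extra partition.
        int from = 0;

        for (int i = 0; i < instances; i++) {
            int size = base + (i < rem ? 1 : 0);

            ranges.add(new int[] {from, from + size});

            from += size;
        }

        return ranges;
    }

    public static void main(String[] args) {
        // 16 partitions shared by 3 instances.
        for (int[] r : split(16, 3))
            System.out.println("kafkaPartsFrom=" + r[0] + ", kafkaPartsTo=" + r[1]);
    }
}
```

For 16 partitions and 3 instances this yields the ranges [0, 6), [6, 11) and [11, 16), so every partition is read by exactly one instance.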

Logging

kafka-to-ignite.sh uses the same logging configuration as an Ignite node does. The only difference is that the log is written to the "kafka-ignite-streamer.log" file.

CacheVersionConflictResolver implementation

It is expected that CDC streamers will be configured with onlyPrimary=false in most real-world deployments to ensure fault tolerance. That means the streamers will send the same change several times: CacheConfiguration#backups + 1 times in total. At the same time, concurrent updates of the same key can be made in the replicated clusters. CacheVersionConflictResolver is used by an Ignite node to select or merge the new (from the update request) and existing (stored in the cluster) entry versions. The selected entry version is the one actually stored in the cluster.

Note
The default implementation only selects the correct entry and never merges.

CacheVersionConflictResolver should be defined for each cache replicated between clusters.

The default implementation is available in cdc-ext.

Configuration

| Name | Description | Default value |
| --- | --- | --- |
| clusterId | Local cluster id. Can be any value from 1 to 31. | null |
| caches | Set of cache names to handle with this plugin instance. | null |
| conflictResolveField | Value field to resolve conflict with. Optional. Field values must implement java.lang.Comparable. | null |

Conflict resolve algorithm

Replicated changes carry some additional data: specifically, the entry version from the source cluster is supplied with the changed data. The default conflict resolution algorithm is based on the entry version and conflictResolveField. The conflict resolution field should contain a user-provided monotonically increasing value, such as a query id or a timestamp. The algorithm applies the following rules in order:

  1. Changes from the "local" cluster always win.

  2. If both the old and the new entry are from the same cluster, version comparison is used to determine the order.

  3. If conflictResolveField is provided, field value comparison is used to determine the order.

  4. Otherwise, conflict resolution fails and the update is ignored.
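The steps above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the cdc-ext implementation: the Entry class, the useNew method, the use of a single long as the entry version, the Long-typed resolve field, the strict "greater than" comparisons, and the handling of a missing stored entry are all choices made for the example.

```java
/** Illustrative sketch only; all names are hypothetical and not part of cdc-ext. */
class ConflictResolveSketch {
    /** Simplified entry: originating cluster id, version order, optional resolve field value. */
    static final class Entry {
        final int clusterId;
        final long order;
        final Long resolveField; // null when no conflictResolveField value is available

        Entry(int clusterId, long order, Long resolveField) {
            this.clusterId = clusterId;
            this.order = order;
            this.resolveField = resolveField;
        }
    }

    /** Returns true when the incoming entry should replace the stored one. */
    static boolean useNew(int localClusterId, Entry oldEntry, Entry newEntry) {
        // Step 1: changes from the local cluster always win.
        if (newEntry.clusterId == localClusterId)
            return true;

        // Assumption of this sketch: with nothing stored yet there is no conflict.
        if (oldEntry == null)
            return true;

        // Step 2: same originating cluster, so entry versions are directly comparable.
        if (oldEntry.clusterId == newEntry.clusterId)
            return newEntry.order > oldEntry.order;

        // Step 3: different clusters, fall back to the user-provided field if present.
        if (oldEntry.resolveField != null && newEntry.resolveField != null)
            return newEntry.resolveField.compareTo(oldEntry.resolveField) > 0;

        // Step 4: resolution failed, the update is ignored.
        return false;
    }

    public static void main(String[] args) {
        Entry stored = new Entry(2, 10L, 100L);

        System.out.println(useNew(1, stored, new Entry(2, 10L, 100L))); // redelivered duplicate
        System.out.println(useNew(1, stored, new Entry(2, 11L, 100L))); // newer version, same cluster
        System.out.println(useNew(1, stored, new Entry(3, 1L, null)));  // other cluster, no field
    }
}
```

With the strict comparison assumed in step 2, a change that is delivered more than once (as happens with onlyPrimary=false) is accepted the first time and ignored on redelivery.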

Important
The current implementation does not support deleting data from the cache in the destination cluster. The data must be deleted in the source cluster.

Configuration example

Configuration is done via Ignite node plugin:

<property name="pluginProviders">
    <bean class="org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider">
        <!-- Id of the local cluster; any value from 1 to 31. -->
        <property name="clusterId" value="1" />
        <!-- Names of the caches handled by this plugin instance. -->
        <property name="caches">
            <util:list>
                <bean class="java.lang.String">
                    <constructor-arg type="String" value="queryId" />
                </bean>
            </util:list>
        </property>
    </bean>
</property>