Ignite to PostgreSQL replication

CDC replication to PostgreSQL

IgniteToPostgreSqlCdcConsumer is a CDC consumer that asynchronously replicates data from Apache Ignite to PostgreSQL. It uses Apache Ignite’s Change Data Capture (CDC) mechanism to track data changes (insert, update, delete) in specified caches and apply them to PostgreSQL.

Key Features

  • Per-cache replication (only selected caches are replicated)

  • onlyPrimary support (replicates only from primary nodes)

  • Auto table creation in PostgreSQL if needed (createTables=true)

  • Batch replication (batchSize)

  • User-defined DataSource — user configures reliability and transactional guarantees

Configuration

Spring XML configuration example (ignite-to-postgres.xml):

<bean id="cdc.cfg" class="org.apache.ignite.cdc.CdcConfiguration">
    <property name="consumer">
        <bean class="org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer">
            <property name="caches">
                <list>
                    <value>T1</value>
                    <value>T2</value>
                </list>
            </property>
            <property name="batchSize" value="1024" />
            <property name="onlyPrimary" value="true" />
            <property name="createTables" value="true" />
            <property name="dataSource" ref="dataSource" />
        </bean>
    </property>
</bean>

Configuration Options

The following settings can be used to configure the behavior of IgniteToPostgreSqlCdcConsumer:

| Setting | Description | Default |
| --- | --- | --- |
| dataSource | JDBC DataSource used to connect to the target PostgreSQL database. Must be provided by the user. | Required |
| caches | Set of Ignite cache names to replicate. Must be provided by the user. | Required |
| onlyPrimary | If true, replicates only events originating from the primary node. Useful to avoid duplicate updates in replicated clusters. | true |
| batchSize | Maximum number of statements per batch submitted to PostgreSQL. Affects how many rows are sent in a single executeBatch() call and committed together. | 1024 |
| createTables | If true, missing target tables in PostgreSQL will be created automatically during startup. | false |

The consumer batches statements with PreparedStatement and sets autoCommit to false, committing manually after each batch execution.
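
The batching pattern described above can be sketched with plain JDBC. This is a minimal illustration, not the consumer's actual code: the BatchWriter class, the Row type, and the INSERT statement are hypothetical, and only standard java.sql calls are used.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

/** Illustrative sketch only; names here are hypothetical, not part of the consumer. */
class BatchWriter {
    /** Hypothetical row shape matching the test_table examples in this document. */
    static final class Row {
        final int id;
        final String name;
        final int val;

        Row(int id, String name, int val) {
            this.id = id;
            this.name = name;
            this.val = val;
        }
    }

    private static final String SQL =
        "INSERT INTO test_table (id, name, val) VALUES (?, ?, ?)";

    /**
     * Writes rows in batches of at most batchSize statements and commits
     * manually after each executeBatch(). Returns the number of batches executed.
     */
    static int write(Connection conn, List<Row> rows, int batchSize) throws SQLException {
        conn.setAutoCommit(false); // commits are issued explicitly below
        int batches = 0;

        try (PreparedStatement ps = conn.prepareStatement(SQL)) {
            int pending = 0;

            for (Row r : rows) {
                ps.setInt(1, r.id);
                ps.setString(2, r.name);
                ps.setInt(3, r.val);
                ps.addBatch();

                if (++pending == batchSize) {
                    ps.executeBatch();
                    conn.commit();
                    batches++;
                    pending = 0;
                }
            }

            // Flush the final, possibly smaller, batch.
            if (pending > 0) {
                ps.executeBatch();
                conn.commit();
                batches++;
            }
        }
        catch (SQLException e) {
            conn.rollback(); // discard the uncommitted part of the failed batch
            throw e;
        }

        return batches;
    }
}
```

With five rows and a batch size of 2, this sketch executes three batches and issues three commits. A failure rolls back only the batch in flight; earlier batches stay committed, which is why retry behavior depends on the chosen DataSource and on how the caller handles the exception.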

Warning

Choosing the dataSource is the user’s responsibility. Consider:

  • Required delivery guarantees (e.g., retry logic)

  • High-availability PostgreSQL setup (replicas, failover, etc.)

Example dataSource

<bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="org.postgresql.Driver"/>
    <property name="url" value="jdbc:postgresql://localhost:5432/ignite_replica"/>
    <property name="username" value="ignite_user"/>
    <property name="password" value="secret"/>
    <property name="initialSize" value="3"/>
    <property name="maxTotal" value="10"/>
    <property name="validationQuery" value="SELECT 1"/>
    <property name="testOnBorrow" value="true"/>
</bean>

Schema Conversion

The table schema in PostgreSQL is generated from the QueryEntity configured for the Ignite cache. Only one QueryEntity is supported per cache, and it is used to generate both DDL and DML statements.

Schema creation occurs once on the first CdcCacheEvent if createTables=true.

Example: Schema from Ignite to PostgreSQL

class TestVal {
    private final String name;
    private final int val;
}

QueryEntity qryEntity = new QueryEntity()
    .setTableName("test_table")
    .setKeyFieldName("id")
    .setValueType("demo.TestVal")
    .addQueryField("id", Integer.class.getName(), null)
    .addQueryField("name", String.class.getName(), null)
    .addQueryField("val", Integer.class.getName(), null);

ignite.getOrCreateCache(new CacheConfiguration<Integer, TestVal>("test_table")
    .setQueryEntities(List.of(qryEntity)));

→ PostgreSQL:

CREATE TABLE test_table (
    id INT PRIMARY KEY,
    name VARCHAR,
    val INT
);

Composite Key Example

class TestKey {
    private final int id;
    private final String subId;
}

class TestVal {
    private final String name;
    private final int val;
}

QueryEntity qryEntity = new QueryEntity()
    .setTableName("test_table")
    .setKeyFields(Set.of("id", "subId"))
    .setValueType("demo.TestVal")
    .addQueryField("id", Integer.class.getName(), null)
    .addQueryField("subId", String.class.getName(), null)
    .addQueryField("name", String.class.getName(), null)
    .addQueryField("val", Integer.class.getName(), null);

ignite.getOrCreateCache(new CacheConfiguration<TestKey, TestVal>("test_table")
    .setQueryEntities(List.of(qryEntity)));

→ PostgreSQL:

CREATE TABLE test_table (
    id INT,
    subId VARCHAR,
    name VARCHAR,
    val INT,
    PRIMARY KEY (id, subId)
);
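
The two mappings above follow the same pattern: one column per query field, plus a primary key built from the key fields. The sketch below illustrates that pattern under stated assumptions. The DdlSketch class and its createTable method are hypothetical, the SQL types are passed in already resolved, and the primary key is always emitted as a trailing clause, so the text may differ from what the consumer actually generates.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; not the consumer's DDL generator. */
class DdlSketch {
    /**
     * Builds a CREATE TABLE statement from an ordered column-to-SQL-type map
     * and the list of key columns.
     */
    static String createTable(String table, Map<String, String> columns, List<String> keyFields) {
        List<String> defs = new ArrayList<>();

        // One "name TYPE" definition per column, in insertion order.
        for (Map.Entry<String, String> col : columns.entrySet())
            defs.add(col.getKey() + " " + col.getValue());

        // A trailing PRIMARY KEY clause covers single and composite keys alike.
        defs.add("PRIMARY KEY (" + String.join(", ", keyFields) + ")");

        return "CREATE TABLE " + table + " (" + String.join(", ", defs) + ")";
    }

    public static void main(String[] args) {
        Map<String, String> cols = new LinkedHashMap<>();
        cols.put("id", "INT");
        cols.put("subId", "VARCHAR");
        cols.put("name", "VARCHAR");
        cols.put("val", "INT");

        System.out.println(createTable("test_table", cols, List.of("id", "subId")));
    }
}
```

A LinkedHashMap is used so that columns appear in the order the fields were added, mirroring the order of the addQueryField calls.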

Insert / Update / Delete Events

Insert, update, and delete operations are handled via CdcEvent.

Upsert with Version Conflict Resolution

Each insert/update is translated into an INSERT ... ON CONFLICT DO UPDATE query, with version-based conflict resolution.

Note

A version column is automatically added and stored as BYTEA.

This version is a 16-byte array based on CacheEntryVersion encoded in big-endian order:

  • 4 bytes — topologyVersion (int)

  • 8 bytes — order (long)

  • 4 bytes — nodeOrder (int)

This allows PostgreSQL to compare versions lexicographically:

INSERT INTO test_table (id, name, val, version)
VALUES (1, 'value', 5, '\x...')
ON CONFLICT (id) DO UPDATE SET
    name = EXCLUDED.name,
    val = EXCLUDED.val,
    version = EXCLUDED.version
WHERE test_table.version < EXCLUDED.version;
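
The 16-byte layout described above can be reproduced with java.nio.ByteBuffer, which writes in big-endian order by default. The sketch below is illustrative: the VersionBytes class and its methods are hypothetical, and the three components are passed as plain values rather than read from a CacheEntryVersion. It also checks that unsigned byte-wise comparison, which is how PostgreSQL orders BYTEA values, agrees with the numeric order of the components, assuming they are non-negative.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Illustrative sketch only; not the consumer's encoding code. */
class VersionBytes {
    /** Encodes the version as 4 + 8 + 4 bytes in big-endian order. */
    static byte[] encode(int topologyVersion, long order, int nodeOrder) {
        return ByteBuffer.allocate(16) // a new ByteBuffer is big-endian by default
            .putInt(topologyVersion)
            .putLong(order)
            .putInt(nodeOrder)
            .array();
    }

    /** Returns true if version a sorts strictly before version b byte by byte. */
    static boolean isOlder(byte[] a, byte[] b) {
        // Unsigned lexicographic comparison of the two arrays.
        return Arrays.compareUnsigned(a, b) < 0;
    }

    public static void main(String[] args) {
        byte[] v1 = encode(1, 255L, 0);
        byte[] v2 = encode(1, 256L, 0);

        // Big-endian puts the most significant byte first, so 255 sorts before 256.
        System.out.println(isOlder(v1, v2));
    }
}
```

With a little-endian encoding, 256 would start with a zero byte and sort before 255, which is why the byte order matters for the comparison in the WHERE clause.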

Delete Example

DELETE FROM test_table WHERE id = 1;
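
The upsert and delete statements above can be generalized to any key and value column set. The following is a rough sketch of such a statement builder, assuming parameter placeholders instead of literal values. The SqlSketch class and its methods are hypothetical. The sketch also writes the new version on update, which is an assumption made so that later comparisons run against the latest stored version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Illustrative sketch only; not the consumer's statement generator. */
class SqlSketch {
    /** Builds a version-guarded upsert with one placeholder per column. */
    static String upsert(String table, List<String> keyCols, List<String> valCols) {
        // Insert column order: key columns, value columns, then the version column.
        List<String> all = new ArrayList<>(keyCols);
        all.addAll(valCols);
        all.add("version");

        String placeholders = all.stream().map(c -> "?").collect(Collectors.joining(", "));

        // On conflict, overwrite value columns and the version (assumption, see lead-in).
        List<String> updated = new ArrayList<>(valCols);
        updated.add("version");

        String sets = updated.stream()
            .map(c -> c + " = EXCLUDED." + c)
            .collect(Collectors.joining(", "));

        return "INSERT INTO " + table + " (" + String.join(", ", all) + ")"
            + " VALUES (" + placeholders + ")"
            + " ON CONFLICT (" + String.join(", ", keyCols) + ") DO UPDATE SET " + sets
            + " WHERE " + table + ".version < EXCLUDED.version";
    }

    /** Builds a delete that matches on every key column. */
    static String delete(String table, List<String> keyCols) {
        String where = keyCols.stream()
            .map(c -> c + " = ?")
            .collect(Collectors.joining(" AND "));

        return "DELETE FROM " + table + " WHERE " + where;
    }
}
```

For a composite key such as (id, subId), the same builder produces ON CONFLICT (id, subId) and a delete with both columns in the WHERE clause.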

Java → PostgreSQL Type Mapping

| Java Type | PostgreSQL Type | Precision/Scale |
| --- | --- | --- |
| java.lang.String | VARCHAR(precision) | Precision only |
| java.lang.Integer / int | INT | None |
| java.lang.Long / long | BIGINT | None |
| java.lang.Boolean / boolean | BOOL | None |
| java.lang.Double / double | NUMERIC(precision, scale) | Precision & scale |
| java.lang.Float / float | NUMERIC(precision, scale) | Precision & scale |
| java.math.BigDecimal | NUMERIC(precision, scale) | Precision & scale |
| java.lang.Short / short | SMALLINT | None |
| java.lang.Byte / byte | SMALLINT | None |
| java.sql.Date | DATE | None |
| java.sql.Time | TIME(precision) | Precision only |
| java.sql.Timestamp | TIMESTAMP(precision) | Precision only |
| java.util.Date | TIMESTAMP(precision) | Precision only |
| java.util.UUID | UUID | None |
| java.time.LocalDate | DATE | None |
| java.time.LocalTime | TIME(precision) | Precision only |
| java.time.LocalDateTime | TIMESTAMP(precision) | Precision only |
| java.time.OffsetTime | VARCHAR(precision) | Precision only |
| java.time.OffsetDateTime | TIMESTAMP WITH TIME ZONE | None |
| byte[] | BYTEA | None |

Note
  • Precision and scale values provided in the mapping configuration will be processed and applied to the generated SQL types where supported.

  • If the Java type is not recognized in the predefined mapping, an exception will be thrown.
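
The lookup behavior described in the notes above can be sketched as a small map-based resolver. This is an illustration under stated assumptions: the TypeMappingSketch class is hypothetical, it covers only a few rows of the table, and IllegalArgumentException stands in for whatever exception the consumer actually throws.

```java
import java.util.Map;

/** Illustrative sketch only; covers a subset of the mapping table. */
class TypeMappingSketch {
    /** Base SQL type per Java class name (subset of the table above). */
    private static final Map<String, String> BASE = Map.of(
        "java.lang.String", "VARCHAR",
        "java.lang.Integer", "INT",
        "java.lang.Long", "BIGINT",
        "java.lang.Boolean", "BOOL",
        "java.math.BigDecimal", "NUMERIC",
        "java.util.UUID", "UUID");

    /**
     * Resolves the SQL type for a Java class name. Precision and scale may be null
     * and are applied only to types that accept them.
     */
    static String sqlType(String javaType, Integer precision, Integer scale) {
        String base = BASE.get(javaType);

        if (base == null)
            throw new IllegalArgumentException("Unsupported Java type: " + javaType);

        boolean takesPrecision = base.equals("VARCHAR") || base.equals("NUMERIC");

        if (!takesPrecision || precision == null)
            return base;

        // Only NUMERIC takes a scale in this subset.
        if (base.equals("NUMERIC") && scale != null)
            return base + "(" + precision + ", " + scale + ")";

        return base + "(" + precision + ")";
    }
}
```

When no precision is configured, the sketch falls back to the bare type name, which matches the VARCHAR columns without a length in the schema examples earlier in this document.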

Limitations

  • Only BinaryObject and primitive fields are supported

  • keepBinary must be set to true

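  • Schema evolution is not supported. Tables are generated once, when the consumer runs with createTables=true at startup; later changes to the QueryEntity are not applied to existing PostgreSQL tables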