Ignite to PostgreSQL replication
CDC replication to PostgreSQL
IgniteToPostgreSqlCdcConsumer is a CDC consumer that asynchronously replicates data from Apache Ignite to PostgreSQL.
It uses Apache Ignite’s Change Data Capture (CDC) mechanism to track data changes (insert, update, delete) in specified caches and apply them to PostgreSQL.
Key Features
- Per-cache replication (only selected caches are replicated)
- onlyPrimary support (replicates only from primary nodes)
- Auto table creation in PostgreSQL if needed (createTables=true)
- Batch replication (batchSize)
- User-defined DataSource: the user configures reliability and transactional guarantees
Configuration
Spring XML configuration example (ignite-to-postgres.xml):
<bean id="cdc.cfg" class="org.apache.ignite.cdc.CdcConfiguration">
    <property name="consumer">
        <bean class="org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer">
            <property name="caches">
                <list>
                    <value>T1</value>
                    <value>T2</value>
                </list>
            </property>
            <property name="batchSize" value="1024" />
            <property name="onlyPrimary" value="true" />
            <property name="createTables" value="true" />
            <property name="dataSource" ref="dataSource" />
        </bean>
    </property>
</bean>
Configuration Options
The following settings can be used to configure the behavior of IgniteToPostgreSqlCdcConsumer:
| Setting | Description | Default |
|---|---|---|
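| dataSource | JDBC DataSource for the target PostgreSQL database. | Required |
| caches | Set of Ignite cache names to replicate. Must be provided by the user. | Required |
| onlyPrimary | If true, replicates only from primary nodes. | |
| batchSize | Maximum number of statements per batch submitted to PostgreSQL. Affects how many rows are committed in a single batch. | |
| createTables | If true, tables are created automatically in PostgreSQL if needed. | |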
The consumer batches statements with PreparedStatement, sets autoCommit to false, and commits manually after each batch execution.
Warning: Choosing the …
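The batching described above (PreparedStatement, autoCommit set to false, a manual commit after each batch execution) might look roughly like the following in plain JDBC. This is a sketch under assumptions, not the consumer's source code: the class BatchWriterSketch, the method writeBatch, its parameters, and the rollback-on-failure handling are hypothetical names and choices made for illustration.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

/** Hypothetical helper for illustration; not part of the Ignite API. */
final class BatchWriterSketch {
    /**
     * Binds every row to the same parameterized statement and flushes
     * whenever batchSize statements have accumulated.
     *
     * @return the number of commits that were issued.
     */
    static int writeBatch(Connection conn, String sql, List<Object[]> rows, int batchSize)
        throws SQLException {
        if (batchSize <= 0)
            throw new IllegalArgumentException("batchSize must be positive");

        conn.setAutoCommit(false); // commits are issued manually below

        int commits = 0;
        int pending = 0;

        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            for (Object[] row : rows) {
                for (int i = 0; i < row.length; i++)
                    stmt.setObject(i + 1, row[i]); // JDBC parameters are 1-based

                stmt.addBatch();

                if (++pending == batchSize) {
                    stmt.executeBatch();
                    conn.commit();
                    commits++;
                    pending = 0;
                }
            }

            if (pending > 0) { // flush the final, partially filled batch
                stmt.executeBatch();
                conn.commit();
                commits++;
            }
        }
        catch (SQLException e) {
            conn.rollback(); // assumption: discard the uncommitted batch on failure
            throw e;
        }

        return commits;
    }
}
```

With five rows and a batchSize of 2, this sketch issues three commits: two full batches and one partial batch. A larger batchSize means fewer round-trips but more rows lost to a rollback if a statement in the batch fails.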
Example dataSource
<bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="org.postgresql.Driver"/>
    <property name="url" value="jdbc:postgresql://localhost:5432/ignite_replica"/>
    <property name="username" value="ignite_user"/>
    <property name="password" value="secret"/>
    <property name="initialSize" value="3"/>
    <property name="maxTotal" value="10"/>
    <property name="validationQuery" value="SELECT 1"/>
    <property name="testOnBorrow" value="true"/>
</bean>
Schema Conversion
The PostgreSQL table schema is generated from the QueryEntity configured for the Ignite cache.
Only one QueryEntity is supported per cache; it is used to generate both the DDL and the DML statements.
Schema creation occurs once on the first CdcCacheEvent if createTables=true.
Example: Schema from Ignite to PostgreSQL
// Value class stored in the cache.
class TestVal {
    private final String name;
    private final int val;

    TestVal(String name, int val) {
        this.name = name;
        this.val = val;
    }
}

// "id" is the cache key; "name" and "val" are fields of TestVal.
QueryEntity qryEntity = new QueryEntity()
    .setTableName("test_table")
    .setKeyFieldName("id")
    .setValueType("demo.TestVal")
    .addQueryField("id", Integer.class.getName(), null)
    .addQueryField("name", String.class.getName(), null)
    .addQueryField("val", Integer.class.getName(), null);

ignite.getOrCreateCache(new CacheConfiguration<Integer, TestVal>("test_table")
    .setQueryEntities(List.of(qryEntity)));
→ PostgreSQL:
CREATE TABLE test_table (
    id INT PRIMARY KEY,
    name VARCHAR,
    val INT
);
Composite Key Example
// Composite key: both fields become part of the primary key.
class TestKey {
    private final int id;
    private final String subId;

    TestKey(int id, String subId) {
        this.id = id;
        this.subId = subId;
    }
}

class TestVal {
    private final String name;
    private final int val;

    TestVal(String name, int val) {
        this.name = name;
        this.val = val;
    }
}

// setKeyFields lists every field that belongs to the key.
QueryEntity qryEntity = new QueryEntity()
    .setTableName("test_table")
    .setKeyFields(Set.of("id", "subId"))
    .setValueType("demo.TestVal")
    .addQueryField("id", Integer.class.getName(), null)
    .addQueryField("subId", String.class.getName(), null)
    .addQueryField("name", String.class.getName(), null)
    .addQueryField("val", Integer.class.getName(), null);

ignite.getOrCreateCache(new CacheConfiguration<TestKey, TestVal>("test_table")
    .setQueryEntities(List.of(qryEntity)));
→ PostgreSQL:
CREATE TABLE test_table (
    id INT,
    subId VARCHAR,
    name VARCHAR,
    val INT,
    PRIMARY KEY (id, subId)
);
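To make the two conversions above concrete, here is a minimal sketch of how such DDL could be derived from a list of fields and key fields. It is an illustration under assumptions, not the consumer's implementation: CreateTableSketch, sqlType, and createTable are hypothetical names, and the type mapping covers only the two cases visible in the examples (Integer to INT, String to VARCHAR).

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical DDL builder for illustration; not part of the Ignite API. */
final class CreateTableSketch {
    /** Maps only the two Java types that appear in the examples above. */
    static String sqlType(String javaType) {
        switch (javaType) {
            case "java.lang.Integer":
                return "INT";
            case "java.lang.String":
                return "VARCHAR";
            default:
                throw new IllegalArgumentException("Type not covered by this sketch: " + javaType);
        }
    }

    /**
     * @param fields    column name to Java type name, in declaration order.
     * @param keyFields key column names, in the order they should appear in the key.
     */
    static String createTable(String table, Map<String, String> fields, List<String> keyFields) {
        if (keyFields.isEmpty())
            throw new IllegalArgumentException("At least one key field is required");

        boolean singleKey = keyFields.size() == 1;
        StringJoiner cols = new StringJoiner(", ", "CREATE TABLE " + table + " (", ")");

        for (Map.Entry<String, String> field : fields.entrySet()) {
            String col = field.getKey() + " " + sqlType(field.getValue());

            // A single key column carries the constraint inline.
            if (singleKey && keyFields.contains(field.getKey()))
                col += " PRIMARY KEY";

            cols.add(col);
        }

        // A composite key is declared as a separate table constraint.
        if (!singleKey)
            cols.add("PRIMARY KEY (" + String.join(", ", keyFields) + ")");

        return cols.toString();
    }
}
```

The sketch takes the key fields as a List rather than a Set because Set.of has no defined iteration order, and the column order inside PRIMARY KEY (...) should be stable between runs.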
Insert / Update / Delete Events
Insert, update, and delete operations are handled via CdcEvent.
Upsert with Version Conflict Resolution
Each insert/update is translated into an INSERT … ON CONFLICT DO UPDATE query, with version-based conflict resolution.
Note: A … This version is a 16-byte array based on … This allows PostgreSQL to compare versions lexicographically: …
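As an illustration of the upsert described above, the sketch below builds an INSERT … ON CONFLICT DO UPDATE statement guarded by a version comparison and mirrors PostgreSQL's byte-wise ordering of binary values in Java. Several things here are assumptions rather than facts from this page: the column name version, the class and method names, and the exact shape of the WHERE clause. The layout of the 16-byte version itself is not reproduced.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical upsert builder for illustration; not part of the Ignite API. */
final class UpsertSketch {
    /** Assumed name of the column that stores the 16-byte version. */
    static final String VERSION_COL = "version";

    /** Builds a parameterized upsert that overwrites a row only if the incoming version is greater. */
    static String upsertSql(String table, List<String> keyCols, List<String> valCols) {
        List<String> allCols = new ArrayList<>(keyCols);
        allCols.addAll(valCols);
        allCols.add(VERSION_COL);

        String placeholders = allCols.stream().map(c -> "?").collect(Collectors.joining(", "));

        // Key columns identify the row; only value columns and the version are overwritten.
        List<String> updatedCols = new ArrayList<>(valCols);
        updatedCols.add(VERSION_COL);

        String assignments = updatedCols.stream()
            .map(c -> c + " = EXCLUDED." + c)
            .collect(Collectors.joining(", "));

        return "INSERT INTO " + table + " (" + String.join(", ", allCols) + ")"
            + " VALUES (" + placeholders + ")"
            + " ON CONFLICT (" + String.join(", ", keyCols) + ")"
            + " DO UPDATE SET " + assignments
            + " WHERE " + table + "." + VERSION_COL + " < EXCLUDED." + VERSION_COL;
    }

    /** Lexicographic comparison of unsigned bytes, the same ordering PostgreSQL applies to bytea. */
    static boolean isNewer(byte[] incoming, byte[] stored) {
        return Arrays.compareUnsigned(incoming, stored) > 0;
    }
}
```

For the single-key table from the schema example, upsertSql("test_table", List.of("id"), List.of("name", "val")) yields a statement with four placeholders: id, name, val, and the version. With this guard, an event that arrives late and carries an older version leaves the existing row untouched.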
Delete Example
DELETE FROM test_table WHERE id = 1;
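The statement above uses a literal key. Since the consumer batches with PreparedStatement, a parameterized form is the more likely shape in practice; the sketch below shows how one could be assembled for single and composite keys. DeleteSketch and deleteSql are hypothetical names, and this is an assumption about the statement's form rather than the consumer's actual code.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical DELETE builder for illustration; not part of the Ignite API. */
final class DeleteSketch {
    /** Builds a parameterized DELETE that matches on every key column. */
    static String deleteSql(String table, List<String> keyCols) {
        if (keyCols.isEmpty())
            throw new IllegalArgumentException("At least one key column is required");

        String predicate = keyCols.stream()
            .map(c -> c + " = ?")
            .collect(Collectors.joining(" AND "));

        return "DELETE FROM " + table + " WHERE " + predicate;
    }
}
```

For the composite key example, deleteSql("test_table", List.of("id", "subId")) produces a predicate that constrains both key columns, so a delete never removes rows that share only part of the key.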
Java → PostgreSQL Type Mapping
| Java Type | PostgreSQL Type | Precision/Scale |
|---|---|---|
| … | … | Precision only |
| … | … | None |
| … | … | None |
| … | … | None |
| … | … | Precision & scale |
| … | … | Precision & scale |
| … | … | Precision & scale |
| … | … | None |
| … | … | None |
| … | … | None |
| … | … | Precision only |
| … | … | Precision only |
| … | … | Precision only |
| … | … | None |
| … | … | None |
| … | … | Precision only |
| … | … | Precision only |
| … | … | Precision only |
| … | … | None |
| … | … | None |
Limitations
- Only BinaryObject and primitive fields are supported
- keepBinary must be set to true
- Schema evolution is not supported; run with createTables=true at startup
Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation.