Ignite Cassandra Integration Usage Examples | Ignite Documentation


Ignite Cassandra Integration Usage Examples

Overview

As described in the configuration section, to use Cassandra as a cache store you need to set the CacheStoreFactory of your Ignite caches to org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory.

Below is an example of a typical Ignite cache configuration that uses Cassandra as a cache store. We go through the configuration items step by step further down. The example is taken from the unit test resource file store/src/test/resources/org/apache/ignite/tests/persistence/blob/ignite-config.xml in the Cassandra module source code.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Cassandra connection settings -->
    <import resource="classpath:org/apache/ignite/tests/cassandra/connection-settings.xml" />

    <!-- Persistence settings for 'cache1' -->
    <bean id="cache1_persistence_settings" class="org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings">
        <constructor-arg type="org.springframework.core.io.Resource" value="classpath:org/apache/ignite/tests/persistence/blob/persistence-settings-1.xml" />
    </bean>

    <!-- Persistence settings for 'cache2' -->
    <bean id="cache2_persistence_settings" class="org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings">
        <constructor-arg type="org.springframework.core.io.Resource" value="classpath:org/apache/ignite/tests/persistence/blob/persistence-settings-3.xml" />
    </bean>

    <!-- Ignite configuration -->
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="cacheConfiguration">
            <list>
                <!-- Configuring persistence for "cache1" cache -->
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="cache1"/>
                    <property name="readThrough" value="true"/>
                    <property name="writeThrough" value="true"/>
                    <property name="cacheStoreFactory">
                        <bean class="org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory">
                            <property name="dataSourceBean" value="cassandraAdminDataSource"/>
                            <property name="persistenceSettingsBean" value="cache1_persistence_settings"/>
                        </bean>
                    </property>
                </bean>

                <!-- Configuring persistence for "cache2" cache -->
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="cache2"/>
                    <property name="readThrough" value="true"/>
                    <property name="writeThrough" value="true"/>
                    <property name="cacheStoreFactory">
                        <bean class="org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory">
                            <property name="dataSourceBean" value="cassandraAdminDataSource"/>
                            <property name="persistenceSettingsBean" value="cache2_persistence_settings"/>
                        </bean>
                    </property>
                </bean>
            </list>
        </property>

        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!--
                        Ignite provides several options for automatic discovery that can be used
                        instead of static IP based discovery. For information on all options refer
                        to our documentation: https://ignite.apache.org/docs/latest/clustering/clustering
                    -->
                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace with actual host IP address. -->
                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

This example configures two Ignite caches: cache1 and cache2. Let's look at the configuration details.

Let's start with the cache configuration. It is nearly identical for both caches (cache1 and cache2) and looks like this:

<bean class="org.apache.ignite.configuration.CacheConfiguration">
    <property name="name" value="cache1"/>
    <property name="readThrough" value="true"/>
    <property name="writeThrough" value="true"/>
    <property name="cacheStoreFactory">
        <bean class="org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory">
            <property name="dataSourceBean" value="cassandraAdminDataSource"/>
            <property name="persistenceSettingsBean" value="cache1_persistence_settings"/>
        </bean>
    </property>
</bean>

First of all, we can see that the read-through and write-through options are enabled:

<property name="readThrough" value="true"/>
<property name="writeThrough" value="true"/>

These options are required if you want the Ignite cache to use a persistent store for cache entries that have expired.
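As a rough illustration of what the two flags mean, here is a minimal conceptual sketch in plain Java. It is not Ignite code: the class ThroughCacheSketch is hypothetical, and an ordinary map stands in for the Cassandra store. With write-through, every put also reaches the store synchronously; with read-through, a miss in memory is answered by loading from the store.

```java
import java.util.HashMap;
import java.util.Map;

/** Conceptual model only: an in-memory cache in front of a backing store (a plain map standing in for Cassandra). */
final class ThroughCacheSketch {
    private final Map<Integer, String> memory = new HashMap<>();
    private final Map<Integer, String> store;
    private final boolean readThrough;
    private final boolean writeThrough;

    ThroughCacheSketch(Map<Integer, String> store, boolean readThrough, boolean writeThrough) {
        this.store = store;
        this.readThrough = readThrough;
        this.writeThrough = writeThrough;
    }

    /** Write-through: the store is updated synchronously as part of the put. */
    void put(Integer key, String value) {
        memory.put(key, value);
        if (writeThrough)
            store.put(key, value);
    }

    /** Read-through: on a miss, load the value from the store and keep it in memory. */
    String get(Integer key) {
        String value = memory.get(key);
        if (value == null && readThrough) {
            value = store.get(key);
            if (value != null)
                memory.put(key, value);
        }
        return value;
    }

    /** Simulates an entry leaving memory, for example through expiry or eviction. */
    void evict(Integer key) {
        memory.remove(key);
    }
}
```

In this model, an entry that was written with write-through enabled and later evicted from memory can still be served by get, because read-through reloads it from the store.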

Optionally, enable the write-behind setting if you prefer the persistent store to be updated asynchronously:

<property name="readThrough" value="true"/>
<property name="writeThrough" value="true"/>
<property name="writeBehindEnabled" value="true"/>
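To make the difference from synchronous write-through concrete, the following plain Java sketch models write-behind under simplifying assumptions. The class WriteBehindSketch is hypothetical and is not part of Ignite; updates are buffered and only reach the store when flush is called. A real write-behind store flushes from a background thread, whereas this sketch uses an explicit flush to stay deterministic.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Conceptual model only: puts are buffered and written to the store later, in batches. */
final class WriteBehindSketch {
    private final Map<Integer, String> memory = new HashMap<>();
    private final Map<Integer, String> pending = new LinkedHashMap<>();
    private final Map<Integer, String> store;

    WriteBehindSketch(Map<Integer, String> store) {
        this.store = store;
    }

    /** The put returns without touching the store; the update is only queued. */
    void put(Integer key, String value) {
        memory.put(key, value);
        pending.put(key, value); // repeated updates to the same key coalesce into one write
    }

    /** Writes all queued updates to the store and returns how many entries were written. */
    int flush() {
        int written = pending.size();
        store.putAll(pending);
        pending.clear();
        return written;
    }
}
```

The trade-off this illustrates is that puts become cheaper and repeated updates to one key collapse into a single store write, at the cost of a window during which the store lags behind the cache.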

The next important thing is the CacheStoreFactory configuration:

<property name="cacheStoreFactory">
    <bean class="org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory">
        <property name="dataSourceBean" value="cassandraAdminDataSource"/>
        <property name="persistenceSettingsBean" value="cache1_persistence_settings"/>
    </bean>
</property>

To use Cassandra as a persistent store, set org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory as the CacheStoreFactory for your Ignite caches. CassandraCacheStoreFactory has two required properties:

  • dataSourceBean - name of the Spring bean, which specifies all the details about Cassandra database connection.

  • persistenceSettingsBean - name of the Spring bean, which specifies all the details about how objects should be persisted into Cassandra database.

In this example, cassandraAdminDataSource is a data source bean that is imported into the Ignite cache config file with this directive:

<import resource="classpath:org/apache/ignite/tests/cassandra/connection-settings.xml" />

and cache1_persistence_settings is a persistence settings bean that is defined in the Ignite cache config file as follows:

<bean id="cache1_persistence_settings" class="org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings">
    <constructor-arg type="org.springframework.core.io.Resource" value="classpath:org/apache/ignite/tests/persistence/blob/persistence-settings-1.xml" />
</bean>

Now let's look at the definition of cassandraAdminDataSource in the store/src/test/resources/org/apache/ignite/tests/cassandra/connection-settings.xml test resource.

Note that CassandraAdminCredentials and CassandraRegularCredentials are classes that extend org.apache.ignite.cache.store.cassandra.datasource.Credentials. You can implement such classes yourself and then reference them in the configuration.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="cassandraAdminCredentials" class="org.my.project.CassandraAdminCredentials"/>
    <bean id="cassandraRegularCredentials" class="org.my.project.CassandraRegularCredentials"/>

    <bean id="loadBalancingPolicy" class="com.datastax.driver.core.policies.TokenAwarePolicy">
        <constructor-arg type="com.datastax.driver.core.policies.LoadBalancingPolicy">
            <bean class="com.datastax.driver.core.policies.RoundRobinPolicy"/>
        </constructor-arg>
    </bean>

    <bean id="contactPoints" class="org.apache.ignite.tests.utils.CassandraHelper" factory-method="getContactPointsArray"/>

    <bean id="cassandraAdminDataSource" class="org.apache.ignite.cache.store.cassandra.datasource.DataSource">
        <property name="credentials" ref="cassandraAdminCredentials"/>
        <property name="contactPoints" ref="contactPoints"/>
        <property name="readConsistency" value="ONE"/>
        <property name="writeConsistency" value="ONE"/>
        <property name="loadBalancingPolicy" ref="loadBalancingPolicy"/>
    </bean>

    <bean id="cassandraRegularDataSource" class="org.apache.ignite.cache.store.cassandra.datasource.DataSource">
        <property name="credentials" ref="cassandraRegularCredentials"/>
        <property name="contactPoints" ref="contactPoints"/>
        <property name="readConsistency" value="ONE"/>
        <property name="writeConsistency" value="ONE"/>
        <property name="loadBalancingPolicy" ref="loadBalancingPolicy"/>
    </bean>
</beans>

For more details about Cassandra data source connection configuration, see the integration configuration page.

Finally, the last piece not yet described is the persistence settings configuration. Let's look at cache1_persistence_settings, defined in the org/apache/ignite/tests/persistence/blob/persistence-settings-1.xml test resource.

<persistence keyspace="test1" table="blob_test1">
    <keyPersistence class="java.lang.Integer" strategy="PRIMITIVE" />
    <valuePersistence strategy="BLOB"/>
</persistence>

In the configuration above, the Cassandra table test1.blob_test1 is used to store key/value objects for the cache1 cache. Key objects of the cache are stored as int in the key column. Value objects of the cache are stored as blob in the value column. For more information about persistence settings configuration, see the integration configuration page.

The next sections provide examples of persistence settings for different kinds of persistence strategies (for more details about persistence strategies, see the integration configuration page).

Example 1

Persistence settings for an Ignite cache with keys of Integer type persisted as int in Cassandra and values of String type persisted as text in Cassandra.

<persistence keyspace="test1" table="my_table">
    <keyPersistence class="java.lang.Integer" strategy="PRIMITIVE" column="my_key"/>
    <valuePersistence class="java.lang.String" strategy="PRIMITIVE" />
</persistence>

Keys are stored in the my_key column. Values are stored in the value column (the default when the column attribute is not specified).

Example 2

Persistence settings for an Ignite cache with keys of Integer type persisted as int in Cassandra and values of any type persisted as blob in Cassandra (you don't need to specify the type for the BLOB persistence strategy). Because the values can be of any type, the only option is to store them as a BLOB in the Cassandra table.

<persistence keyspace="test1" table="my_table">
    <keyPersistence class="java.lang.Integer" strategy="PRIMITIVE" />
    <valuePersistence strategy="BLOB"/>
</persistence>

Keys are stored in the key column (the default when the column attribute is not specified). Values are stored in the value column.

Example 3

Persistence settings for an Ignite cache with keys of Integer type and values of any type (here org.apache.ignite.tests.pojos.Person), both persisted as blob in Cassandra.

<persistence keyspace="test1" table="my_table">
    <!-- By default Java standard serialization is going to be used -->
    <keyPersistence class="java.lang.Integer"
                    strategy="BLOB"/>

    <!-- Kryo serialization specified to be used -->
    <valuePersistence class="org.apache.ignite.tests.pojos.Person"
                      strategy="BLOB"
                      serializer="org.apache.ignite.cache.store.cassandra.serializer.KryoSerializer"/>
</persistence>

Keys are stored in the key column of blob type using standard Java serialization. Values are stored in the value column of blob type using Kryo serialization.
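For readers unfamiliar with what "standard Java serialization" produces, here is a minimal, self-contained sketch of turning a key into bytes suitable for a blob column and back again. The class JavaBlobSketch and its method names are hypothetical; this is not the serializer class shipped with the Cassandra module, only an illustration of the underlying JDK mechanism.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;

/** Illustration only: round-trips a Serializable object through a byte buffer. */
final class JavaBlobSketch {
    /** Serializes the object with java.io serialization into a buffer that could be bound to a blob column. */
    static ByteBuffer toBlob(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return ByteBuffer.wrap(bytes.toByteArray());
    }

    /** Restores the object from the buffer without changing the buffer's position. */
    static Object fromBlob(ByteBuffer blob) throws IOException, ClassNotFoundException {
        byte[] raw = new byte[blob.remaining()];
        blob.duplicate().get(raw);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        }
    }
}
```

Note that this mechanism only works for classes that implement java.io.Serializable, which Integer does.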

Example 4

Persistence settings for an Ignite cache with keys of Integer type persisted as int in Cassandra and values of the custom POJO type org.apache.ignite.tests.pojos.Person, which is dynamically analyzed and persisted into a set of table columns so that each POJO field is mapped to an appropriate table column. For more details about dynamic POJO field discovery, refer to the PersistenceSettingsBean documentation section.

<persistence keyspace="test1" table="my_table">
    <keyPersistence class="java.lang.Integer" strategy="PRIMITIVE"/>
    <valuePersistence class="org.apache.ignite.tests.pojos.Person" strategy="POJO"/>
</persistence>

Keys are stored in the key column of int type.

Now let's imagine that the org.apache.ignite.tests.pojos.Person class is implemented as follows:

import java.util.Date;
import java.util.List;

public class Person {
    private String firstName;
    private String lastName;
    private int age;
    private boolean married;
    private long height;
    private float weight;
    private Date birthDate;
    private List<String> phones;

    public void setFirstName(String name) {
        firstName = name;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setLastName(String name) {
        lastName = name;
    }

    public String getLastName() {
        return lastName;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public int getAge() {
        return age;
    }

    public void setMarried(boolean married) {
        this.married = married;
    }

    public boolean getMarried() {
        return married;
    }

    public void setHeight(long height) {
        this.height = height;
    }

    public long getHeight() {
        return height;
    }

    public void setWeight(float weight) {
        this.weight = weight;
    }

    public float getWeight() {
        return weight;
    }

    public void setBirthDate(Date date) {
        birthDate = date;
    }

    public Date getBirthDate() {
        return birthDate;
    }

    public void setPhones(List<String> phones) {
        this.phones = phones;
    }

    public List<String> getPhones() {
        return phones;
    }
}

In this case, Ignite cache values of the org.apache.ignite.tests.pojos.Person type are persisted into a set of Cassandra table columns using this dynamically configured mapping rule:

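| POJO field | Table column | Column type |
|------------|--------------|-------------|
| firstName  | firstname    | text        |
| lastName   | lastname     | text        |
| age        | age          | int         |
| married    | married      | boolean     |
| height     | height       | bigint      |
| weight     | weight       | float       |
| birthDate  | birthdate    | timestamp   |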

As the table above shows, the phones field is not persisted into the table. That is because it is not of a simple Java type that can be mapped directly to an appropriate Cassandra type. Such fields can be persisted into Cassandra only if you manually specify all mapping details for the object type and the field type itself implements the java.io.Serializable interface. In that case, the field is persisted into a separate table column as a blob. See the next example for details.
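The mapping rule shown in the table can be approximated with a few lines of reflection. The sketch below is an assumption-laden illustration, not the module's actual discovery code: the class PojoMappingSketch, its type lookup table, and the trimmed Person stand-in are all hypothetical. It lowercases field names to get column names, maps the simple Java types from the table to Cassandra types, and skips any field whose type is not in the lookup table.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Illustration only: derives column names and types the way the table above describes. */
final class PojoMappingSketch {
    /** Simple Java types from the table above and their Cassandra column types. */
    private static final Map<Class<?>, String> CQL_TYPES = Map.of(
        String.class, "text",
        int.class, "int",
        boolean.class, "boolean",
        long.class, "bigint",
        float.class, "float",
        Date.class, "timestamp");

    /** Returns column name to column type for every field with a directly mappable type. */
    static Map<String, String> columnsFor(Class<?> pojo) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (Field field : pojo.getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic())
                continue;
            String cqlType = CQL_TYPES.get(field.getType());
            if (cqlType != null) // fields such as List<String> have no direct mapping and are skipped
                columns.put(field.getName().toLowerCase(Locale.ROOT), cqlType);
        }
        return columns;
    }

    /** Trimmed stand-in for the Person class shown above (fields only, accessors omitted). */
    static final class Person {
        String firstName;
        String lastName;
        int age;
        boolean married;
        long height;
        float weight;
        Date birthDate;
        List<String> phones;
    }
}
```

Calling PojoMappingSketch.columnsFor(PojoMappingSketch.Person.class) yields seven columns and omits phones, matching the table.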

Example 5

Persistence settings for an Ignite cache with keys of the custom POJO type org.apache.ignite.tests.pojos.PersonId and values of the custom POJO type org.apache.ignite.tests.pojos.Person, both persisted into a set of table columns based on manually specified mapping rules.

<persistence keyspace="test1" table="my_table" ttl="86400">
    <!-- Cassandra keyspace options which should be used to create provided keyspace if it doesn't exist -->
    <keyspaceOptions>
        REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3}
        AND DURABLE_WRITES = true
    </keyspaceOptions>

    <!-- Cassandra table options which should be used to create provided table if it doesn't exist -->
    <tableOptions>
        comment = 'A most excellent and useful table'
        AND read_repair_chance = 0.2
    </tableOptions>

    <!-- Persistent settings for Ignite cache keys -->
    <keyPersistence class="org.apache.ignite.tests.pojos.PersonId" strategy="POJO">
        <!-- Partition key fields if POJO strategy used -->
        <partitionKey>
            <!-- Mapping from POJO field to Cassandra table column -->
            <field name="companyCode" column="company" />
            <field name="departmentCode" column="department" />
        </partitionKey>

        <!-- Cluster key fields if POJO strategy used -->
        <clusterKey>
            <!-- Mapping from POJO field to Cassandra table column -->
            <field name="personNumber" column="number" sort="desc"/>
        </clusterKey>
    </keyPersistence>

    <!-- Persistent settings for Ignite cache values -->
    <valuePersistence class="org.apache.ignite.tests.pojos.Person"
                      strategy="POJO"
                      serializer="org.apache.ignite.cache.store.cassandra.serializer.KryoSerializer">
        <!-- Mapping from POJO field to Cassandra table column -->
        <field name="firstName" column="first_name" />
        <field name="lastName" column="last_name" />
        <field name="age" />
        <field name="married" index="true"/>
        <field name="height" />
        <field name="weight" />
        <field name="birthDate" column="birth_date" />
        <field name="phones" />
    </valuePersistence>
</persistence>

These persistence settings look rather complicated. Let's go through them step by step.

Let's first look at the root tag:

<persistence keyspace="test1" table="my_table" ttl="86400">

It specifies that Ignite cache keys and values are stored in the test1.my_table table and that the data in each row expires after 86400 seconds (24 hours).

Next come the advanced settings for the Cassandra keyspace. They are used to create the keyspace if it does not exist.

<keyspaceOptions>
    REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3}
    AND DURABLE_WRITES = true
</keyspaceOptions>

Then, by analogy with the keyspace settings, come the advanced table settings, which are used only for table creation.

<tableOptions>
    comment = 'A most excellent and useful table'
    AND read_repair_chance = 0.2
</tableOptions>

The next section specifies how Ignite cache keys should be persisted:

<keyPersistence class="org.apache.ignite.tests.pojos.PersonId" strategy="POJO">
    <!-- Partition key fields if POJO strategy used -->
    <partitionKey>
        <!-- Mapping from POJO field to Cassandra table column -->
        <field name="companyCode" column="company" />
        <field name="departmentCode" column="department" />
    </partitionKey>

    <!-- Cluster key fields if POJO strategy used -->
    <clusterKey>
        <!-- Mapping from POJO field to Cassandra table column -->
        <field name="personNumber" column="number" sort="desc"/>
    </clusterKey>
</keyPersistence>

Let's assume that org.apache.ignite.tests.pojos.PersonId is implemented as follows:

public class PersonId {
    private String companyCode;
    private String departmentCode;
    private int personNumber;

    public void setCompanyCode(String code) {
        companyCode = code;
    }

    public String getCompanyCode() {
        return companyCode;
    }

    public void setDepartmentCode(String code) {
        departmentCode = code;
    }

    public String getDepartmentCode() {
        return departmentCode;
    }

    public void setPersonNumber(int number) {
        personNumber = number;
    }

    public int getPersonNumber() {
        return personNumber;
    }
}

In this case, Ignite cache keys of org.apache.ignite.tests.pojos.PersonId type are persisted into a set of Cassandra table columns representing the PARTITION and CLUSTER keys using this mapping rule:

| POJO field     | Table column | Column type |
|----------------|--------------|-------------|
| companyCode    | company      | text        |
| departmentCode | department   | text        |
| personNumber   | number       | int         |

In addition, the combination of columns (company, department) is used as the Cassandra PARTITION key, and the number column is used as a CLUSTER key sorted in descending order.
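To show what this key layout means on the Cassandra side, the sketch below assembles a CQL table definition with a composite partition key and a descending clustering column. It is a hand-written illustration: the class KeyLayoutSketch and its methods are hypothetical, only the key columns are included, and the statement is not claimed to be the exact DDL the module generates.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Illustration only: builds a CQL CREATE TABLE statement for a given key layout. */
final class KeyLayoutSketch {
    /**
     * @param columnTypes     column name to CQL type, in declaration order
     * @param partitionKey    columns forming the composite partition key
     * @param clusterKeyOrder clustering column name to sort order (ASC or DESC)
     */
    static String createTable(String table,
                              Map<String, String> columnTypes,
                              List<String> partitionKey,
                              Map<String, String> clusterKeyOrder) {
        StringJoiner columns = new StringJoiner(", ");
        columnTypes.forEach((name, type) -> columns.add(name + " " + type));

        // The partition key columns are wrapped in their own parentheses inside PRIMARY KEY.
        StringBuilder primaryKey = new StringBuilder("(" + String.join(", ", partitionKey) + ")");
        StringJoiner order = new StringJoiner(", ");
        for (Map.Entry<String, String> entry : clusterKeyOrder.entrySet()) {
            primaryKey.append(", ").append(entry.getKey());
            order.add(entry.getKey() + " " + entry.getValue());
        }

        String ddl = "CREATE TABLE IF NOT EXISTS " + table
            + " (" + columns + ", PRIMARY KEY (" + primaryKey + "))";
        return clusterKeyOrder.isEmpty() ? ddl : ddl + " WITH CLUSTERING ORDER BY (" + order + ")";
    }

    /** Key layout from the mapping above: (company, department) partition key, number clustering key. */
    static String personIdExample() {
        Map<String, String> columnTypes = new LinkedHashMap<>();
        columnTypes.put("company", "text");
        columnTypes.put("department", "text");
        columnTypes.put("number", "int");

        Map<String, String> clusterKeyOrder = new LinkedHashMap<>();
        clusterKeyOrder.put("number", "DESC");

        return createTable("test1.my_table", columnTypes, List.of("company", "department"), clusterKeyOrder);
    }
}
```

All rows that share the same company and department land in the same partition, and within that partition rows are ordered by number from highest to lowest.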

Finally, let's move to the last section, which specifies persistence settings for Ignite cache values:

<valuePersistence class="org.apache.ignite.tests.pojos.Person"
                  strategy="POJO"
                  serializer="org.apache.ignite.cache.store.cassandra.serializer.KryoSerializer">
    <!-- Mapping from POJO field to Cassandra table column -->
    <field name="firstName" column="first_name" />
    <field name="lastName" column="last_name" />
    <field name="age" />
    <field name="married" index="true"/>
    <field name="height" />
    <field name="weight" />
    <field name="birthDate" column="birth_date" />
    <field name="phones" />
</valuePersistence>

Let's assume that the org.apache.ignite.tests.pojos.Person class has the same implementation as in Example 4. In this case, Ignite cache values of org.apache.ignite.tests.pojos.Person type are persisted into a set of Cassandra table columns using this mapping rule:

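| POJO field | Table column | Column type |
|------------|--------------|-------------|
| firstName  | first_name   | text        |
| lastName   | last_name    | text        |
| age        | age          | int         |
| married    | married      | boolean     |
| height     | height       | bigint      |
| weight     | weight       | float       |
| birthDate  | birth_date   | timestamp   |
| phones     | phones       | blob        |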

Compared to Example 4, the phones field is now serialized to the phones column of blob type using the Kryo serializer. In addition, a Cassandra secondary index is created for the married column.