Co-located Processing - Apache Ignite

Co-located Processing to Minimize Network Utilization

Apache Ignite® supports a co-located processing technique for compute-intensive and data-intensive calculations, as well as for machine learning algorithms. The technique improves performance by running calculations on the nodes that store the data, which eliminates the impact of network latency.

Co-located Processing diagram

In traditional disk-based systems, such as relational or NoSQL databases, client applications usually fetch data from the servers, use the records for local calculations, and discard the data as soon as the business task is complete. This approach does not scale well when a significant volume of data is transferred over the network.

To overcome this issue, Ignite runs complex calculations and SQL with JOINs directly on the cluster nodes. In co-located processing, each node performs calculations on its local data set. This avoids shuffling records over the network and eliminates the impact of network latency on the performance of your applications.

Data Co-location

To use co-located processing in practice, you first need to co-locate data sets by storing related records on the same cluster node. In Ignite, this is known as affinity co-location.

For example, let's introduce Country and City tables and co-locate all City records that have the same Country identifier on a single node. To achieve this, set CountryCode as the affinityKey in the City table:

                            CREATE TABLE Country (
                                Code CHAR(3),
                                Name CHAR(52),
                                Continent CHAR(50),
                                Region CHAR(26),
                                SurfaceArea DECIMAL(10,2),
                                Population INT(11),
                                Capital INT(11),
                                PRIMARY KEY (Code)
                            ) WITH "template=partitioned, backups=1";

                            CREATE TABLE City (
                                ID INT(11),
                                Name CHAR(35),
                                CountryCode CHAR(3),
                                District CHAR(20),
                                Population INT(11),
                                PRIMARY KEY (ID, CountryCode)
                            ) WITH "template=partitioned, backups=1, affinityKey=CountryCode";

This way, you instruct Ignite to store all the Cities with the same CountryCode on a single cluster node. Once the data is co-located, Ignite can execute compute-intensive and data-intensive logic, as well as SQL with JOINs, directly on the cluster nodes, minimizing or even eliminating network utilization.
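To make the placement rule concrete, here is a minimal sketch in plain Java that does not use the Ignite API. It assumes a hypothetical `nodeFor` function that maps an affinity key to a node index with a simple hash-modulo rule. Ignite's real affinity function is more sophisticated, so treat this only as an illustration of why records that share a CountryCode end up on the same node.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class AffinitySketch {
    /**
     * Hypothetical stand-in for an affinity function: the node is derived
     * from the affinity key only, never from the rest of the primary key.
     */
    static int nodeFor(String affinityKey, int nodeCount) {
        return Math.floorMod(affinityKey.hashCode(), nodeCount);
    }

    /** Places each city on the node chosen by its country code. */
    static Map<Integer, List<String>> place(String[][] cities, int nodeCount) {
        Map<Integer, List<String>> nodes = new TreeMap<>();
        for (String[] city : cities) {
            // city[0] = city name, city[1] = country code (the affinity key).
            int node = nodeFor(city[1], nodeCount);
            nodes.computeIfAbsent(node, n -> new ArrayList<>()).add(city[0]);
        }
        return nodes;
    }

    public static void main(String[] args) {
        String[][] cities = {
            {"New York", "USA"}, {"Moscow", "RUS"}, {"Chicago", "USA"},
            {"Shanghai", "CHN"}, {"Kazan", "RUS"}
        };
        // Cities with the same country code are always listed under the same node.
        place(cities, 4).forEach((node, names) ->
            System.out.println("node " + node + " -> " + names));
    }
}
```

Because the node is computed from the country code alone, two cities from the same country can never be assigned to different nodes, whatever their IDs are.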

SQL and Distributed JOINs

The Ignite SQL engine performs much faster when a query is executed against co-located records. This is especially important for SQL with JOINs that can span many cluster nodes.

Using the previous example with the Country and City tables, let's join the two tables and return the most populated cities in the given countries:

                            SELECT country.name, city.name, MAX(city.population) as max_pop
                            FROM country
                            JOIN city ON city.countrycode = country.code
                            WHERE country.code IN ('USA','RUS','CHN')
                            GROUP BY country.name, city.name
                            ORDER BY max_pop DESC;

This query is executed only on the nodes that store the records for China, Russia, and the USA. In addition, no records are shuffled between nodes during the JOIN, because all the Cities with the same city.countrycode are stored on a single node.
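The following plain-Java sketch illustrates why such a JOIN needs no shuffling. It is not Ignite code: the `Node` class, the hash-modulo `nodeFor` placement, and the population values are all hypothetical and chosen only for illustration. Each simulated node joins its own City rows with its own Country rows, and the caller merges and sorts the partial results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LocalJoinSketch {
    /** Hypothetical in-memory stand-in for the data held by one cluster node. */
    static class Node {
        final Map<String, String> countryNameByCode = new HashMap<>();
        final List<String[]> cities = new ArrayList<>(); // {name, countryCode, population}
    }

    /** Simplified placement rule (an assumption, not Ignite's affinity function). */
    static int nodeFor(String countryCode, int nodeCount) {
        return Math.floorMod(countryCode.hashCode(), nodeCount);
    }

    /** Distributes both tables by country code, so related rows share a node. */
    static Node[] load(String[][] countries, String[][] cities, int nodeCount) {
        Node[] cluster = new Node[nodeCount];
        for (int i = 0; i < nodeCount; i++) cluster[i] = new Node();
        for (String[] c : countries) cluster[nodeFor(c[0], nodeCount)].countryNameByCode.put(c[0], c[1]);
        for (String[] c : cities) cluster[nodeFor(c[1], nodeCount)].cities.add(c);
        return cluster;
    }

    /** Join on a single node, reading only that node's local data. */
    static List<String[]> joinLocally(Node node, List<String> wantedCodes) {
        List<String[]> rows = new ArrayList<>();
        for (String[] city : node.cities) {
            String countryName = node.countryNameByCode.get(city[1]);
            if (countryName != null && wantedCodes.contains(city[1]))
                rows.add(new String[] {countryName, city[0], city[2]});
        }
        return rows;
    }

    /** Merges the per-node results and orders them by population, descending. */
    static List<String[]> query(Node[] cluster, List<String> wantedCodes) {
        List<String[]> result = new ArrayList<>();
        for (Node node : cluster) result.addAll(joinLocally(node, wantedCodes));
        result.sort((a, b) -> Integer.compare(Integer.parseInt(b[2]), Integer.parseInt(a[2])));
        return result;
    }

    public static void main(String[] args) {
        String[][] countries = {{"USA", "United States"}, {"RUS", "Russia"}, {"CHN", "China"}};
        // Population values are made up for the example.
        String[][] cities = {{"New York", "USA", "300"}, {"Moscow", "RUS", "200"}, {"Shanghai", "CHN", "400"}};
        Node[] cluster = load(countries, cities, 4);
        for (String[] row : query(cluster, Arrays.asList("USA", "RUS", "CHN")))
            System.out.println(String.join(", ", row));
    }
}
```

In `joinLocally`, the country lookup always succeeds on the same node that holds the city, which is the property that affinity co-location provides. Without co-location, that lookup would require fetching Country rows from other nodes.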

Distributed Co-located Computations

Apache Ignite compute and machine learning APIs allow you to perform computations and execute machine learning algorithms in parallel to achieve high performance, low latency, and linear scalability. Furthermore, both components work best with co-located data sets.

Let's take another example and imagine that a winter storm is about to hit a highly populated city. As a telecommunication company, you have to send a text message to 20 million residents to notify them about the blizzard. With the client-server approach, the company would read all 20 million records from a database into an application that executes some logic and eventually sends a message to each resident.

A much more efficient approach is to run the logic on, and send the text messages from, the cluster nodes that store the residents' records. With this technique, instead of pulling 20 million records over the network, you execute the logic in place and eliminate the network impact on the performance of the calculation.

Here is an example of what this logic might look like:

Ignite ignite = ...

// NewYork ID.
long newYorkId = 2;

// Send the logic to the cluster node that stores NewYork and all its inhabitants.
ignite.compute().affinityRun("City", newYorkId, new IgniteRunnable() {

  // Injected by Ignite on the node that executes this job.
  @IgniteInstanceResource
  Ignite ignite;

  public void run() {
    // Get access to the Person cache.
    IgniteCache<BinaryObject, BinaryObject> people = ignite.cache("Person").withKeepBinary();

    ScanQuery<BinaryObject, BinaryObject> query = new ScanQuery<>();

    // Restrict the scan to the data stored on this node.
    query.setLocal(true);

    try (QueryCursor<Cache.Entry<BinaryObject, BinaryObject>> cursor = people.query(query)) {
      // Iteration over the local cluster node data using the scan query.
      for (Cache.Entry<BinaryObject, BinaryObject> entry : cursor) {
        BinaryObject personKey = entry.getKey();

        // Pick NewYorkers only.
        if (personKey.<Long>field("CITY_ID") == newYorkId) {
          BinaryObject person = entry.getValue();

          // Send the warning message to the person.