Java Thin Client | Ignite Documentation

Ignite Summit 2024 — Call For Speakers Now Open — Learn more

Edit

Java Thin Client

Overview

The Java thin client is a lightweight client that connects to the cluster via a standard socket connection. It does not become a part of the cluster topology, never holds any data, and is not used as a destination for compute calculations. The thin client simply establishes a socket connection to a standard node​ and performs all operations through that node.

Setting Up

If you use maven or gradle, add the ignite-core dependency to your application:

<properties>
    <ignite.version>2.16.0</ignite.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.ignite</groupId>
        <artifactId>ignite-core</artifactId>
        <version>${ignite.version}</version>
    </dependency>
</dependencies>
def igniteVersion = '2.16.0'

dependencies {
    compile group: 'org.apache.ignite', name: 'ignite-core', version: igniteVersion
}

Alternatively, you can use the ignite-core-2.16.0.jar library from the Ignite distribution package.

Connecting to Cluster

To initialize a thin client, use the Ignition.startClient(ClientConfiguration) method. The method accepts a ClientConfiguration object, which defines client connection parameters.

The method returns the IgniteClient interface, which provides various methods for accessing data. IgniteClient is an auto-closable resource. Use the try-with-resources statement to close the thin client and release the resources associated with the connection.

ClientConfiguration cfg = new ClientConfiguration().setAddresses("127.0.0.1:10800");
try (IgniteClient client = Ignition.startClient(cfg)) {
    ClientCache<Integer, String> cache = client.cache("myCache");
    // Get data from the cache
}

You can provide addresses of multiple nodes. In this case, the thin client randomly tries all the servers in the list and throws ClientConnectionException if none is available.

try (IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses("node1_address:10800",
        "node2_address:10800", "node3_address:10800"))) {
} catch (ClientConnectionException ex) {
    // All the servers are unavailable
}

Note that the code above provides a failover mechanism in case of server node failures. Refer to the Handling Node Failures section for more information.

Partition Awareness [partition_awareness]

Partition awareness allows the thin client to send query requests directly to the node that owns the queried data.

Without partition awareness, an application that is connected to the cluster via a thin client executes all queries and operations via a single server node that acts as a proxy for the incoming requests. These operations are then re-routed to the node that stores the data that is being requested. This results in a bottleneck that could prevent the application from scaling linearly.

Without Partition Awareness

Notice how queries must pass through the proxy server node, where they are routed to the correct node.

With partition awareness in place, the thin client can directly route queries and operations to the primary nodes that own the data required for the queries. This eliminates the bottleneck, allowing the application to scale more easily.

With Partition Awareness

Partition awareness functionality helps to avoid an additional network hop in the following scenarios:

  1. Single-key operations API, like put(), get(), etc. However, the functionality has no effect on those operations within explicit transactions (initiated via ClientTransaction#txStart() described in Transactions section).

  2. ScanQuery and IndexQuery accept a partition number as a parameter with which the query is routed to a particular server node that stores the requested data. Refer to Executing Scan Queries and Executing Index Queries sections for more information.

The following code sample illustrates how to use the partition awareness feature with the java thin client.

ClientConfiguration cfg = new ClientConfiguration()
        .setAddresses("node1_address:10800", "node2_address:10800", "node3_address:10800")
        .setPartitionAwarenessEnabled(true);

try (IgniteClient client = Ignition.startClient(cfg)) {
    ClientCache<Integer, String> cache = client.cache("myCache");
    // Put, get or remove data from the cache...
    cache.put(0, "Hello, world!");
    // The partition number can be specified with IndexQuery#setPartition(Integer) as well.
    ScanQuery scanQuery = new ScanQuery().setPartition(part);
} catch (ClientException e) {
    System.err.println(e.getMessage());
}

The code sample below shows how to use a custom cache key to partition mapping function to enable affinity awareness on a thin client side if the cache already exists in a cluster or/and was created with custom AffinityFunction or AffinityKeyMapper.

// Partition awarenes is enabled by default since Apache Ignite 2.11 release.
ClientConfiguration cfg = new ClientConfiguration()
    .setAddresses("node1_address:10800", "node2_address:10800", "node3_address:10800")
    .setPartitionAwarenessMapperFactory(new ClientPartitionAwarenessMapperFactory() {
        /** {@inheritDoc} */
        @Override public ClientPartitionAwarenessMapper create(String cacheName, int partitions) {
            AffinityFunction aff = new RendezvousAffinityFunction(false, partitions);

            return aff::partition;
        }
    })

try (IgniteClient client = Ignition.startClient(cfg)) {
    ClientCache<Integer, String> cache = client.cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
    // Put, get or remove data from the cache, partition awarenes will be enabled.
}
catch (ClientException e) {
    System.err.println(e.getMessage());
}

If a list of server nodes is dynamically changing or scaling, then it is possible to configure the connection with custom implementation of ClientAddressFinder. It should provide a number of current server addresses every time a client asks for them. The following code sample illustrates how to use it.

ClientAddressFinder finder = () -> {
    String[] dynamicServerAddresses = fetchServerAddresses();

    return dynamicServerAddresses;
};

ClientConfiguration cfg = new ClientConfiguration()
    .setAddressesFinder(finder)
    .setPartitionAwarenessEnabled(true);

try (IgniteClient client = Ignition.startClient(cfg)) {
    ClientCache<Integer, String> cache = client.cache("myCache");
    // Put, get, or remove data from the cache...
} catch (ClientException e) {
    System.err.println(e.getMessage());
}

The code snippet shows how an example implementation might look like if you want clients to retrieve server addresses dynamically.

  • The ClientAddressFinder is a functional interface that provides the only method getAddresses().

  • The fetchServerAddress() is a custom function that dynamically provides server addresses.

  • Configure client with ClientConfiguration.setAddressFinder(finder).

Also, you can check a real example of the interface implementation. ThinClientKubernetesAddressFinder is created to handle scalable Kubernetes environment.

Note

Partition Awareness also enables Service Awareness

Using Key-Value API

The Java thin client supports most of the key-value operations available in the thick client. To execute key-value operations on a specific cache, you need to get an instance of the cache and use one of its methods.

Getting a Cache Instance

The ClientCache interface provides the key-value API. You can use the following methods to obtain an instance of ClientCache:

  • IgniteClient.cache(String): assumes a cache with the specified name exists. The method does not communicate with the cluster to check if the cache really exists. Subsequent cache operations fail if the cache does not exist.

  • IgniteClient.getOrCreateCache(String), IgniteClient.getOrCreateCache(ClientCacheConfiguration): get existing cache with the specified name or create the cache if it does not exist. The former operation creates a cache with default configuration.

  • IgniteClient.createCache(String), IgniteClient.createCache(ClientCacheConfiguration): create a cache with the specified name and fail if the cache already exists.

Use IgniteClient.cacheNames() to list all existing caches.

ClientCacheConfiguration cacheCfg = new ClientCacheConfiguration().setName("References")
        .setCacheMode(CacheMode.REPLICATED)
        .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);

ClientCache<Integer, String> cache = client.getOrCreateCache(cacheCfg);

Basic Cache Operations

The following code snippet demonstrates how to execute basic cache operations from the thin client.

Map<Integer, String> data = IntStream.rangeClosed(1, 100).boxed()
        .collect(Collectors.toMap(i -> i, Object::toString));

cache.putAll(data);

assert !cache.replace(1, "2", "3");
assert "1".equals(cache.get(1));
assert cache.replace(1, "1", "3");
assert "3".equals(cache.get(1));

cache.put(101, "101");

cache.removeAll(data.keySet());
assert cache.size() == 1;
assert "101".equals(cache.get(101));

cache.removeAll();
assert 0 == cache.size();

Executing Scan Queries

Use the ScanQuery<K, V> class to get a set of entries that satisfy a given condition. The thin client sends the query to the cluster node where it is executed as a regular scan query.

The query condition is specified by an IgniteBiPredicate<K, V> object that is passed to the query constructor as an argument. The predicate is applied on the server side. If there is no predicate provided, the query returns all cache entries.

Note
The classes of the predicates must be available on the server nodes of the cluster.

The results of the query are transferred to the client page by page. Each page contains a specific number of entries and is fetched to the client only when the entries from that page are requested. To change the number of entries in a page, use the ScanQuery.setPageSize(int pageSize) method (default value is 1024).

ClientCache<Integer, Person> personCache = client.getOrCreateCache("personCache");

Query<Cache.Entry<Integer, Person>> qry = new ScanQuery<Integer, Person>(
        (i, p) -> p.getName().contains("Smith"));

try (QueryCursor<Cache.Entry<Integer, Person>> cur = personCache.query(qry)) {
    for (Cache.Entry<Integer, Person> entry : cur) {
        // Process the entry ...
    }
}

The IgniteClient.query(…​) method returns an instance of FieldsQueryCursor. Make sure to always close the cursor after you obtain all results.

Transactions

Client transactions are supported for caches with AtomicityMode.TRANSACTIONAL mode.

Executing Transactions

To start a transaction, obtain the ClientTransactions object from IgniteClient. ClientTransactions has a number of txStart(…​) methods, each of which starts a new transaction and returns an object (ClientTransaction) that represents the transaction. Use this object to commit or rollback the transaction.

ClientCache<Integer, String> cache = client.cache("my_transactional_cache");

ClientTransactions tx = client.transactions();

try (ClientTransaction t = tx.txStart()) {
    cache.put(1, "new value");

    t.commit();
}

Transaction Configuration

Client transactions can have different concurrency modes, isolation levels, and execution timeout, which can be set for all transactions or on a per transaction basis.

The ClientConfiguration object supports setting the default concurrency mode, isolation level, and timeout for all transactions started with this client interface.

ClientConfiguration cfg = new ClientConfiguration();
cfg.setAddresses("localhost:10800");

cfg.setTransactionConfiguration(new ClientTransactionConfiguration().setDefaultTxTimeout(10000)
        .setDefaultTxConcurrency(TransactionConcurrency.OPTIMISTIC)
        .setDefaultTxIsolation(TransactionIsolation.REPEATABLE_READ));

IgniteClient client = Ignition.startClient(cfg);

You can specify the concurrency mode, isolation level, and timeout when starting an individual transaction. In this case, the provided values override the default settings.

ClientTransactions tx = client.transactions();
try (ClientTransaction t = tx.txStart(TransactionConcurrency.OPTIMISTIC,
        TransactionIsolation.REPEATABLE_READ)) {
    cache.put(1, "new value");
    t.commit();
}

Working with Binary Objects

The thin client fully supports Binary Object API described in the Working with Binary Objects section. Use CacheClient.withKeepBinary() to switch the cache to binary mode and start working directly with binary objects to avoid serialization/deserialization. Use IgniteClient.binary() to get an instance of IgniteBinary and build an object from scratch.

IgniteBinary binary = client.binary();

BinaryObject val = binary.builder("Person").setField("id", 1, int.class).setField("name", "Joe", String.class)
        .build();

ClientCache<Integer, BinaryObject> cache = client.cache("persons").withKeepBinary();

cache.put(1, val);

BinaryObject value = cache.get(1);

Refer to the Working with Binary Objects page for detailed information.

Cache Entry Listening

When a cache is modified (an entry is inserted, updated, deleted, or expired), an event can be sent to notify the client. To listen to these events, you can use one of the following approaches:

  • Continuous queries

  • Cache registerCacheEntryListener methods

Both approaches require a local listener to be provided, which is triggered on every cache modification event.

For both approaches you can also specify a remote filter to narrow down the range of entries that are monitored for updates. This filter is executed for each updated entry on the server-side and evaluates whether the event should be propagated to the client’s local listener.

Note
The classes of the remote filter factory must be available on the server nodes of the cluster.

Refer to the thick client continuous queries page for more information about continuous queries.

In case of connection to server failure, a thin client cannot silently reconnect with guarantees that no events are lost, so continuous queries and registered cache event listeners are closed after the server disconnection. There are also several methods with the additional parameter: disconnect listener. This listener allows to catch server disconnection events and react appropriately.

ClientCache<Integer, String> cache = client.getOrCreateCache("myCache");

ContinuousQuery<Integer, String> query = new ContinuousQuery<>();

query.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
    @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> events)
        throws CacheEntryListenerException {
        // react to the update events here
    }
});

ClientDisconnectListener disconnectListener = new ClientDisconnectListener() {
    @Override public void onDisconnected(Exception reason) {
        // react to the disconnect event here
    }
};

cache.query(query, disconnectListener);

Executing SQL Statements

The Java thin client provides a SQL API to execute SQL statements. SQL statements are declared using the SqlFieldsQuery objects and executed through the IgniteClient.query(SqlFieldsQuery) method.

client.query(new SqlFieldsQuery(String.format(
        "CREATE TABLE IF NOT EXISTS Person (id INT PRIMARY KEY, name VARCHAR) WITH \"VALUE_TYPE=%s\"",
        Person.class.getName())).setSchema("PUBLIC")).getAll();

int key = 1;
Person val = new Person(key, "Person 1");

client.query(new SqlFieldsQuery("INSERT INTO Person(id, name) VALUES(?, ?)").setArgs(val.getId(), val.getName())
        .setSchema("PUBLIC")).getAll();

FieldsQueryCursor<List<?>> cursor = client
        .query(new SqlFieldsQuery("SELECT name from Person WHERE id=?").setArgs(key).setSchema("PUBLIC"));

// Get the results; the `getAll()` methods closes the cursor; you do not have to
// call cursor.close();
List<List<?>> results = cursor.getAll();

results.stream().findFirst().ifPresent(columns -> {
    System.out.println("name = " + columns.get(0));
});

The query(SqlFieldsQuery) method returns an instance of FieldsQueryCursor, which can be used to iterate over the results. After getting the results, the cursor must be closed to release the resources associated with it.

Note
The getAll() method retrieves the results from the cursor and closes it.

Read more about using SqlFieldsQuery and SQL API in the Using SQL API section.

Using Cluster APIs

The cluster APIs let you create a group of cluster nodes and run various operations against the group. The ClientCluster interface is the entry-point to the APIs that can be used as follows:

  • Get or change the state of a cluster

  • Get a list of all cluster nodes

  • Create logical groups of cluster nodes and use other Ignite APIs to perform certain operations on the group

Use the instance of IgniteClient to obtain a reference to the ClientCluster interface:

try (IgniteClient client = Ignition.startClient(clientCfg)) {
    ClientCluster clientCluster = client.cluster();
    clientCluster.state(ClusterState.ACTIVE);
}

Logical Nodes Grouping

You can use the ClientClusterGroup interface of the cluster APIs to create various groups of cluster nodes. For instance, one group can comprise all servers nodes, while the other group can include only those nodes that match a specific TCP/IP address format. The example below shows how to create a group of server nodes located in the dc1 data center:

try (IgniteClient client = Ignition.startClient(clientCfg)) {
    ClientClusterGroup serversInDc1 = client.cluster().forServers().forAttribute("dc", "dc1");
    serversInDc1.nodes().forEach(n -> System.out.println("Node ID: " + n.id()));
}

Refer to the main cluster groups documentation page for more details on the capability.

Executing Compute Tasks

Presently, the Java thin client supports basic compute capabilities by letting you execute those compute tasks that are already deployed in the cluster. You can either run a task across all cluster nodes or a specific cluster group. The deployment assumes that you create a JAR file with the compute tasks and add the JAR to the cluster nodes' classpath.

By default, the execution of tasks, triggered by the thin client, is disabled on the cluster side. You need to set the ThinClientConfiguration.maxActiveComputeTasksPerConnection parameter to a non-zero value in the configuration of your server nodes and thick clients:

<bean class="org.apache.ignite.configuration.IgniteConfiguration" id="ignite.cfg">
    <property name="clientConnectorConfiguration">
        <bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
            <property name="thinClientConfiguration">
                <bean class="org.apache.ignite.configuration.ThinClientConfiguration">
                    <property name="maxActiveComputeTasksPerConnection" value="100" />
                </bean>
            </property>
        </bean>
    </property>
</bean>
ThinClientConfiguration thinClientCfg = new ThinClientConfiguration()
        .setMaxActiveComputeTasksPerConnection(100);

ClientConnectorConfiguration clientConnectorCfg = new ClientConnectorConfiguration()
        .setThinClientConfiguration(thinClientCfg);

IgniteConfiguration igniteCfg = new IgniteConfiguration()
        .setClientConnectorConfiguration(clientConnectorCfg);

Ignite ignite = Ignition.start(igniteCfg);

The example below shows how to get access to the compute APIs via the ClientCompute interface and execute the compute task named MyTask:

try (IgniteClient client = Ignition.startClient(clientCfg)) {
    // Suppose that the MyTask class is already deployed in the cluster
    client.compute().execute(
        MyTask.class.getName(), "argument");
}

Executing Ignite Services

You can use the ClientServices APIs of the Java thin client to invoke an Ignite Service that is already deployed in the cluster.

The example below shows how to invoke the service named MyService:

try (IgniteClient client = Ignition.startClient(clientCfg)) {
    // Executing the service named MyService
    // that is already deployed in the cluster.
    client.services().serviceProxy(
        "MyService", MyService.class).myServiceMethod();
}

The deployed service can be implemented using Java or .NET.

Handling Exceptions

Handling Node Failures

When you provide the addresses of multiple nodes in the client configuration, the client automatically switches to the next node if the current connection fails and retries any ongoing operation.

In the case of atomic operations, failover to another node is transparent to the user. However, if you execute a scan query or a SELECT query, iteration over query cursor may throw an ClientConnectionException. This can happen because queries return data in pages, and if the node that the client is connected to goes down while the client retrieves the pages, to keep query result consistent exception is thrown.

If explicit transaction is started, cache operations binded to this transaction also can throw an ClientException in case of failed connection to server node.

User code should handle these exceptions and implement retry logic accordingly.

Security

SSL/TLS

To use encrypted communication between the thin client and the cluster, you have to enable SSL/TLS in both the cluster configuration and the client configuration. Refer to the Enabling SSL/TLS for Thin Clients section for the instruction on the cluster configuration.

To enable encrypted communication in the thin client, provide a keystore that contains the encryption key and a truststore with the trusted certificates in the thin client configuration.

ClientConfiguration clientCfg = new ClientConfiguration().setAddresses("127.0.0.1:10800");

clientCfg.setSslMode(SslMode.REQUIRED).setSslClientCertificateKeyStorePath(KEYSTORE)
        .setSslClientCertificateKeyStoreType("JKS").setSslClientCertificateKeyStorePassword("123456")
        .setSslTrustCertificateKeyStorePath(TRUSTSTORE).setSslTrustCertificateKeyStorePassword("123456")
        .setSslTrustCertificateKeyStoreType("JKS").setSslKeyAlgorithm("SunX509").setSslTrustAll(false)
        .setSslProtocol(SslProtocol.TLS);

try (IgniteClient client = Ignition.startClient(clientCfg)) {
    // ...
}

The following table explains encryption parameters of the client configuration:

Parameter Description Default Value

sslMode

Either REQURED or DISABLED.

DISABLED

sslClientCertificateKeyStorePath

The path to the keystore file with the private key.

N/A

sslClientCertificateKeyStoreType

The type of the keystore.

JKS

sslClientCertificateKeyStorePassword

The password to the keystore.

N/A

sslTrustCertificateKeyStorePath

The path to the truststore file.

N/A

sslTrustCertificateKeyStoreType

The type of the truststore.

JKS

sslTrustCertificateKeyStorePassword

The password to the truststore.

N/A

sslKeyAlgorithm

Sets the key manager algorithm that is used to create a key manager.

SunX509

sslTrustAll

If this parameter is set to true, the certificates are not validated.

N/A

sslProtocol

The name of the protocol that is used for data encryption.

TLS

Authentication

Configure authentication on the cluster side and provide the user name and password in the client configuration.

ClientConfiguration clientCfg = new ClientConfiguration().setAddresses("127.0.0.1:10800").setUserName("joe")
        .setUserPassword("passw0rd!");

try (IgniteClient client = Ignition.startClient(clientCfg)) {
    // ...
} catch (ClientAuthenticationException e) {
    // Handle authentication failure
}

Async APIs

Most network-bound thin client APIs have an async counterpart, for example, ClientCache.get and ClientCache.getAsync.

IgniteClient client = Ignition.startClient(clientCfg);
ClientCache<Integer, String> cache = client.getOrCreateCache("cache");

IgniteClientFuture<Void> putFut = cache.putAsync(1, "hello");
putFut.get(); // Blocking wait.

IgniteClientFuture<String> getFut = cache.getAsync(1);
getFut.thenAccept(val -> System.out.println(val)); // Non-blocking continuation.
  • Async methods do not block the calling thread

  • Async methods return IgniteClientFuture<T> which is a combination of Future<T> and CompletionStage<T>.

  • Async continuations are executed using ClientConfiguration.AsyncContinuationExecutor, which defaults to ForkJoinPool#commonPool(). For example, cache.getAsync(1).thenAccept(val → System.out.println(val)) will execute the println call using a thread from the commonPool.