Java Thin Client
Overview
The Java thin client is a lightweight client that connects to the cluster via a standard socket connection. It does not become a part of the cluster topology, never holds any data, and is not used as a destination for compute calculations. The thin client simply establishes a socket connection to a standard node and performs all operations through that node.
Setting Up
If you use maven or gradle, add the ignite-core
dependency to your application:
<properties>
<ignite.version>2.16.0</ignite.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>${ignite.version}</version>
</dependency>
</dependencies>
def igniteVersion = '2.16.0'
dependencies {
compile group: 'org.apache.ignite', name: 'ignite-core', version: igniteVersion
}
Alternatively, you can use the ignite-core-2.16.0.jar
library from the Ignite distribution package.
Connecting to Cluster
To initialize a thin client, use the Ignition.startClient(ClientConfiguration)
method. The method accepts a ClientConfiguration
object, which defines client connection parameters.
The method returns the IgniteClient
interface, which provides various methods for accessing data. IgniteClient
is an auto-closable resource. Use the try-with-resources statement to close the thin client and release the resources associated with the connection.
ClientConfiguration cfg = new ClientConfiguration().setAddresses("127.0.0.1:10800");
try (IgniteClient client = Ignition.startClient(cfg)) {
ClientCache<Integer, String> cache = client.cache("myCache");
// Get data from the cache
}
You can provide addresses of multiple nodes. In this case, the thin client randomly tries all the servers in the list and throws ClientConnectionException
if none is available.
try (IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses("node1_address:10800",
"node2_address:10800", "node3_address:10800"))) {
} catch (ClientConnectionException ex) {
// All the servers are unavailable
}
Note that the code above provides a failover mechanism in case of server node failures. Refer to the Handling Node Failures section for more information.
Partition Awareness [partition_awareness]
Partition awareness allows the thin client to send query requests directly to the node that owns the queried data.
Without partition awareness, an application that is connected to the cluster via a thin client executes all queries and operations via a single server node that acts as a proxy for the incoming requests. These operations are then re-routed to the node that stores the data that is being requested. This results in a bottleneck that could prevent the application from scaling linearly.
Notice how queries must pass through the proxy server node, where they are routed to the correct node.
With partition awareness in place, the thin client can directly route queries and operations to the primary nodes that own the data required for the queries. This eliminates the bottleneck, allowing the application to scale more easily.
Partition awareness functionality helps to avoid an additional network hop in the following scenarios:
-
Single-key operations API, like put(), get(), etc. However, the functionality has no effect on those operations within explicit transactions (initiated via ClientTransaction#txStart() described in Transactions section).
-
ScanQuery and IndexQuery accept a partition number as a parameter with which the query is routed to a particular server node that stores the requested data. Refer to Executing Scan Queries and Executing Index Queries sections for more information.
The following code sample illustrates how to use the partition awareness feature with the java thin client.
ClientConfiguration cfg = new ClientConfiguration()
.setAddresses("node1_address:10800", "node2_address:10800", "node3_address:10800")
.setPartitionAwarenessEnabled(true);
try (IgniteClient client = Ignition.startClient(cfg)) {
ClientCache<Integer, String> cache = client.cache("myCache");
// Put, get or remove data from the cache...
cache.put(0, "Hello, world!");
// The partition number can be specified with IndexQuery#setPartition(Integer) as well.
ScanQuery scanQuery = new ScanQuery().setPartition(part);
} catch (ClientException e) {
System.err.println(e.getMessage());
}
The code sample below shows how to use a custom cache key to partition mapping function to enable affinity awareness on a thin client side if the cache already exists in a cluster or/and was created with custom AffinityFunction or AffinityKeyMapper.
// Partition awarenes is enabled by default since Apache Ignite 2.11 release.
ClientConfiguration cfg = new ClientConfiguration()
.setAddresses("node1_address:10800", "node2_address:10800", "node3_address:10800")
.setPartitionAwarenessMapperFactory(new ClientPartitionAwarenessMapperFactory() {
/** {@inheritDoc} */
@Override public ClientPartitionAwarenessMapper create(String cacheName, int partitions) {
AffinityFunction aff = new RendezvousAffinityFunction(false, partitions);
return aff::partition;
}
})
try (IgniteClient client = Ignition.startClient(cfg)) {
ClientCache<Integer, String> cache = client.cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
// Put, get or remove data from the cache, partition awarenes will be enabled.
}
catch (ClientException e) {
System.err.println(e.getMessage());
}
If a list of server nodes is dynamically changing or scaling, then it is possible to configure the connection with custom implementation of ClientAddressFinder
. It should provide a number of current server addresses every time a client asks for them.
The following code sample illustrates how to use it.
ClientAddressFinder finder = () -> {
String[] dynamicServerAddresses = fetchServerAddresses();
return dynamicServerAddresses;
};
ClientConfiguration cfg = new ClientConfiguration()
.setAddressesFinder(finder)
.setPartitionAwarenessEnabled(true);
try (IgniteClient client = Ignition.startClient(cfg)) {
ClientCache<Integer, String> cache = client.cache("myCache");
// Put, get, or remove data from the cache...
} catch (ClientException e) {
System.err.println(e.getMessage());
}
The code snippet shows how an example implementation might look like if you want clients to retrieve server addresses dynamically.
-
The
ClientAddressFinder
is a functional interface that provides the only methodgetAddresses()
. -
The
fetchServerAddress()
is a custom function that dynamically provides server addresses. -
Configure client with
ClientConfiguration.setAddressFinder(finder)
.
Also, you can check a real example of the interface implementation. ThinClientKubernetesAddressFinder
is created to handle scalable Kubernetes environment.
Note
|
Partition Awareness also enables Service Awareness |
Using Key-Value API
The Java thin client supports most of the key-value operations available in the thick client. To execute key-value operations on a specific cache, you need to get an instance of the cache and use one of its methods.
Getting a Cache Instance
The ClientCache
interface provides the key-value API. You can use the following methods to obtain an instance of ClientCache
:
-
IgniteClient.cache(String)
: assumes a cache with the specified name exists. The method does not communicate with the cluster to check if the cache really exists. Subsequent cache operations fail if the cache does not exist. -
IgniteClient.getOrCreateCache(String)
,IgniteClient.getOrCreateCache(ClientCacheConfiguration)
: get existing cache with the specified name or create the cache if it does not exist. The former operation creates a cache with default configuration. -
IgniteClient.createCache(String)
,IgniteClient.createCache(ClientCacheConfiguration)
: create a cache with the specified name and fail if the cache already exists.
Use IgniteClient.cacheNames()
to list all existing caches.
ClientCacheConfiguration cacheCfg = new ClientCacheConfiguration().setName("References")
.setCacheMode(CacheMode.REPLICATED)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
ClientCache<Integer, String> cache = client.getOrCreateCache(cacheCfg);
Basic Cache Operations
The following code snippet demonstrates how to execute basic cache operations from the thin client.
Map<Integer, String> data = IntStream.rangeClosed(1, 100).boxed()
.collect(Collectors.toMap(i -> i, Object::toString));
cache.putAll(data);
assert !cache.replace(1, "2", "3");
assert "1".equals(cache.get(1));
assert cache.replace(1, "1", "3");
assert "3".equals(cache.get(1));
cache.put(101, "101");
cache.removeAll(data.keySet());
assert cache.size() == 1;
assert "101".equals(cache.get(101));
cache.removeAll();
assert 0 == cache.size();
Executing Scan Queries
Use the ScanQuery<K, V>
class to get a set of entries that satisfy a given condition. The thin client sends the query to the cluster node where it is executed as a regular scan query.
The query condition is specified by an IgniteBiPredicate<K, V>
object that is passed to the query constructor as an argument. The predicate is applied on the server side. If there is no predicate provided, the query returns all cache entries.
Note
|
The classes of the predicates must be available on the server nodes of the cluster. |
The results of the query are transferred to the client page by page. Each page contains a specific number of entries and is fetched to the client only when the entries from that page are requested. To change the number of entries in a page, use the ScanQuery.setPageSize(int pageSize)
method (default value is 1024).
ClientCache<Integer, Person> personCache = client.getOrCreateCache("personCache");
Query<Cache.Entry<Integer, Person>> qry = new ScanQuery<Integer, Person>(
(i, p) -> p.getName().contains("Smith"));
try (QueryCursor<Cache.Entry<Integer, Person>> cur = personCache.query(qry)) {
for (Cache.Entry<Integer, Person> entry : cur) {
// Process the entry ...
}
}
The IgniteClient.query(…)
method returns an instance of FieldsQueryCursor
. Make sure to always close the cursor after you obtain all results.
Transactions
Client transactions are supported for caches with AtomicityMode.TRANSACTIONAL
mode.
Executing Transactions
To start a transaction, obtain the ClientTransactions
object from IgniteClient
.
ClientTransactions
has a number of txStart(…)
methods, each of which starts a new transaction and returns an object (ClientTransaction
) that represents the transaction.
Use this object to commit or rollback the transaction.
ClientCache<Integer, String> cache = client.cache("my_transactional_cache");
ClientTransactions tx = client.transactions();
try (ClientTransaction t = tx.txStart()) {
cache.put(1, "new value");
t.commit();
}
Transaction Configuration
Client transactions can have different concurrency modes, isolation levels, and execution timeout, which can be set for all transactions or on a per transaction basis.
The ClientConfiguration
object supports setting the default concurrency mode, isolation level, and timeout for all transactions started with this client interface.
ClientConfiguration cfg = new ClientConfiguration();
cfg.setAddresses("localhost:10800");
cfg.setTransactionConfiguration(new ClientTransactionConfiguration().setDefaultTxTimeout(10000)
.setDefaultTxConcurrency(TransactionConcurrency.OPTIMISTIC)
.setDefaultTxIsolation(TransactionIsolation.REPEATABLE_READ));
IgniteClient client = Ignition.startClient(cfg);
You can specify the concurrency mode, isolation level, and timeout when starting an individual transaction. In this case, the provided values override the default settings.
ClientTransactions tx = client.transactions();
try (ClientTransaction t = tx.txStart(TransactionConcurrency.OPTIMISTIC,
TransactionIsolation.REPEATABLE_READ)) {
cache.put(1, "new value");
t.commit();
}
Working with Binary Objects
The thin client fully supports Binary Object API described in the Working with Binary Objects section.
Use CacheClient.withKeepBinary()
to switch the cache to binary mode and start working directly with binary objects to avoid serialization/deserialization.
Use IgniteClient.binary()
to get an instance of IgniteBinary
and build an object from scratch.
IgniteBinary binary = client.binary();
BinaryObject val = binary.builder("Person").setField("id", 1, int.class).setField("name", "Joe", String.class)
.build();
ClientCache<Integer, BinaryObject> cache = client.cache("persons").withKeepBinary();
cache.put(1, val);
BinaryObject value = cache.get(1);
Refer to the Working with Binary Objects page for detailed information.
Cache Entry Listening
When a cache is modified (an entry is inserted, updated, deleted, or expired), an event can be sent to notify the client. To listen to these events, you can use one of the following approaches:
-
Continuous queries
-
Cache
registerCacheEntryListener
methods
Both approaches require a local listener to be provided, which is triggered on every cache modification event.
For both approaches you can also specify a remote filter to narrow down the range of entries that are monitored for updates. This filter is executed for each updated entry on the server-side and evaluates whether the event should be propagated to the client’s local listener.
Note
|
The classes of the remote filter factory must be available on the server nodes of the cluster. |
Refer to the thick client continuous queries page for more information about continuous queries.
In case of connection to server failure, a thin client cannot silently reconnect with guarantees that no events are lost, so continuous queries and registered cache event listeners are closed after the server disconnection. There are also several methods with the additional parameter: disconnect listener. This listener allows to catch server disconnection events and react appropriately.
ClientCache<Integer, String> cache = client.getOrCreateCache("myCache");
ContinuousQuery<Integer, String> query = new ContinuousQuery<>();
query.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
@Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> events)
throws CacheEntryListenerException {
// react to the update events here
}
});
ClientDisconnectListener disconnectListener = new ClientDisconnectListener() {
@Override public void onDisconnected(Exception reason) {
// react to the disconnect event here
}
};
cache.query(query, disconnectListener);
Executing SQL Statements
The Java thin client provides a SQL API to execute SQL statements. SQL statements are declared using the SqlFieldsQuery
objects and executed through the IgniteClient.query(SqlFieldsQuery)
method.
client.query(new SqlFieldsQuery(String.format(
"CREATE TABLE IF NOT EXISTS Person (id INT PRIMARY KEY, name VARCHAR) WITH \"VALUE_TYPE=%s\"",
Person.class.getName())).setSchema("PUBLIC")).getAll();
int key = 1;
Person val = new Person(key, "Person 1");
client.query(new SqlFieldsQuery("INSERT INTO Person(id, name) VALUES(?, ?)").setArgs(val.getId(), val.getName())
.setSchema("PUBLIC")).getAll();
FieldsQueryCursor<List<?>> cursor = client
.query(new SqlFieldsQuery("SELECT name from Person WHERE id=?").setArgs(key).setSchema("PUBLIC"));
// Get the results; the `getAll()` methods closes the cursor; you do not have to
// call cursor.close();
List<List<?>> results = cursor.getAll();
results.stream().findFirst().ifPresent(columns -> {
System.out.println("name = " + columns.get(0));
});
The query(SqlFieldsQuery)
method returns an instance of FieldsQueryCursor
, which can be used to iterate over the results. After getting the results, the cursor must be closed to release the resources associated with it.
Note
|
The getAll() method retrieves the results from the cursor and closes it.
|
Read more about using SqlFieldsQuery
and SQL API in the Using SQL API section.
Using Cluster APIs
The cluster APIs let you create a group of cluster nodes and run various operations against the group. The ClientCluster
interface is the entry-point to the APIs that can be used as follows:
-
Get or change the state of a cluster
-
Get a list of all cluster nodes
-
Create logical groups of cluster nodes and use other Ignite APIs to perform certain operations on the group
Use the instance of IgniteClient
to obtain a reference to the ClientCluster
interface:
try (IgniteClient client = Ignition.startClient(clientCfg)) {
ClientCluster clientCluster = client.cluster();
clientCluster.state(ClusterState.ACTIVE);
}
Logical Nodes Grouping
You can use the ClientClusterGroup
interface of the cluster APIs to create various groups of cluster nodes. For instance,
one group can comprise all servers nodes, while the other group can include only those nodes that match a specific
TCP/IP address format. The example below shows how to create a group of server nodes located in the dc1
data center:
try (IgniteClient client = Ignition.startClient(clientCfg)) {
ClientClusterGroup serversInDc1 = client.cluster().forServers().forAttribute("dc", "dc1");
serversInDc1.nodes().forEach(n -> System.out.println("Node ID: " + n.id()));
}
Refer to the main cluster groups documentation page for more details on the capability.
Executing Compute Tasks
Presently, the Java thin client supports basic compute capabilities by letting you execute those compute tasks that are already deployed in the cluster. You can either run a task across all cluster nodes or a specific cluster group. The deployment assumes that you create a JAR file with the compute tasks and add the JAR to the cluster nodes' classpath.
By default, the execution of tasks, triggered by the thin client, is disabled on the cluster side. You need to set the
ThinClientConfiguration.maxActiveComputeTasksPerConnection
parameter to a non-zero value in the configuration of your
server nodes and thick clients:
<bean class="org.apache.ignite.configuration.IgniteConfiguration" id="ignite.cfg">
<property name="clientConnectorConfiguration">
<bean class="org.apache.ignite.configuration.ClientConnectorConfiguration">
<property name="thinClientConfiguration">
<bean class="org.apache.ignite.configuration.ThinClientConfiguration">
<property name="maxActiveComputeTasksPerConnection" value="100" />
</bean>
</property>
</bean>
</property>
</bean>
ThinClientConfiguration thinClientCfg = new ThinClientConfiguration()
.setMaxActiveComputeTasksPerConnection(100);
ClientConnectorConfiguration clientConnectorCfg = new ClientConnectorConfiguration()
.setThinClientConfiguration(thinClientCfg);
IgniteConfiguration igniteCfg = new IgniteConfiguration()
.setClientConnectorConfiguration(clientConnectorCfg);
Ignite ignite = Ignition.start(igniteCfg);
The example below shows how to get access to the compute APIs via the ClientCompute
interface and execute the compute
task named MyTask
:
try (IgniteClient client = Ignition.startClient(clientCfg)) {
// Suppose that the MyTask class is already deployed in the cluster
client.compute().execute(
MyTask.class.getName(), "argument");
}
Executing Ignite Services
You can use the ClientServices
APIs of the Java thin client to invoke an Ignite Service that
is already deployed in the cluster.
The example below shows how to invoke the service named MyService
:
try (IgniteClient client = Ignition.startClient(clientCfg)) {
// Executing the service named MyService
// that is already deployed in the cluster.
client.services().serviceProxy(
"MyService", MyService.class).myServiceMethod();
}
The deployed service can be implemented using Java or .NET.
Handling Exceptions
Handling Node Failures
When you provide the addresses of multiple nodes in the client configuration, the client automatically switches to the next node if the current connection fails and retries any ongoing operation.
In the case of atomic operations, failover to another node is transparent to the user. However, if you execute a scan query or a SELECT query, iteration over query cursor may throw an ClientConnectionException
. This can happen because queries return data in pages, and if the node that the client is connected to goes down while the client retrieves the pages, to keep query result consistent exception is thrown.
If explicit transaction is started, cache operations binded to this transaction also can throw an ClientException
in case of failed connection to server node.
User code should handle these exceptions and implement retry logic accordingly.
Security
SSL/TLS
To use encrypted communication between the thin client and the cluster, you have to enable SSL/TLS in both the cluster configuration and the client configuration. Refer to the Enabling SSL/TLS for Thin Clients section for the instruction on the cluster configuration.
To enable encrypted communication in the thin client, provide a keystore that contains the encryption key and a truststore with the trusted certificates in the thin client configuration.
ClientConfiguration clientCfg = new ClientConfiguration().setAddresses("127.0.0.1:10800");
clientCfg.setSslMode(SslMode.REQUIRED).setSslClientCertificateKeyStorePath(KEYSTORE)
.setSslClientCertificateKeyStoreType("JKS").setSslClientCertificateKeyStorePassword("123456")
.setSslTrustCertificateKeyStorePath(TRUSTSTORE).setSslTrustCertificateKeyStorePassword("123456")
.setSslTrustCertificateKeyStoreType("JKS").setSslKeyAlgorithm("SunX509").setSslTrustAll(false)
.setSslProtocol(SslProtocol.TLS);
try (IgniteClient client = Ignition.startClient(clientCfg)) {
// ...
}
The following table explains encryption parameters of the client configuration:
Parameter | Description | Default Value |
---|---|---|
sslMode |
Either |
|
sslClientCertificateKeyStorePath |
The path to the keystore file with the private key. |
N/A |
sslClientCertificateKeyStoreType |
The type of the keystore. |
|
sslClientCertificateKeyStorePassword |
The password to the keystore. |
N/A |
sslTrustCertificateKeyStorePath |
The path to the truststore file. |
N/A |
sslTrustCertificateKeyStoreType |
The type of the truststore. |
|
sslTrustCertificateKeyStorePassword |
The password to the truststore. |
N/A |
sslKeyAlgorithm |
Sets the key manager algorithm that is used to create a key manager. |
|
sslTrustAll |
If this parameter is set to |
N/A |
sslProtocol |
The name of the protocol that is used for data encryption. |
|
Authentication
Configure authentication on the cluster side and provide the user name and password in the client configuration.
ClientConfiguration clientCfg = new ClientConfiguration().setAddresses("127.0.0.1:10800").setUserName("joe")
.setUserPassword("passw0rd!");
try (IgniteClient client = Ignition.startClient(clientCfg)) {
// ...
} catch (ClientAuthenticationException e) {
// Handle authentication failure
}
Async APIs
Most network-bound thin client APIs have an async counterpart, for example, ClientCache.get
and ClientCache.getAsync
.
IgniteClient client = Ignition.startClient(clientCfg);
ClientCache<Integer, String> cache = client.getOrCreateCache("cache");
IgniteClientFuture<Void> putFut = cache.putAsync(1, "hello");
putFut.get(); // Blocking wait.
IgniteClientFuture<String> getFut = cache.getAsync(1);
getFut.thenAccept(val -> System.out.println(val)); // Non-blocking continuation.
-
Async methods do not block the calling thread
-
Async methods return
IgniteClientFuture<T>
which is a combination ofFuture<T>
andCompletionStage<T>
. -
Async continuations are executed using
ClientConfiguration.AsyncContinuationExecutor
, which defaults toForkJoinPool#commonPool()
. For example,cache.getAsync(1).thenAccept(val → System.out.println(val))
will execute theprintln
call using a thread from thecommonPool
.
Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation.