
Compute and Events

Ignite 3 provides distributed compute for executing jobs across cluster nodes and an event system for monitoring cluster activity. The compute API uses an asynchronous, priority-based execution model built on CompletableFuture.

Distributed Compute Architecture

Key characteristics:

  • Asynchronous execution returning CompletableFuture<R>
  • Job placement based on target type (any node, colocated, broadcast)
  • Priority-based queue with configurable thread pool
  • Automatic failover on node departure
  • Map-reduce support for split/aggregate patterns
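
A minimal sketch of this flow, assuming a connected IgniteClient named client and a hypothetical MyJob class; each step is covered in detail in the sections below:

// Submit a job and react to its result without blocking the calling thread.
JobDescriptor<String, String> descriptor = JobDescriptor
        .<String, String>builder(MyJob.class)
        .build();

JobExecution<String> execution = client.compute()
        .submit(JobTarget.anyNode(client.clusterNodes()), descriptor, "input");

execution.resultAsync() // CompletableFuture<String>
        .thenAccept(result -> System.out.println("Result: " + result));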

Job Execution Model

ComputeJob Interface

Jobs implement the ComputeJob<T, R> interface:

public interface ComputeJob<T, R> {
    CompletableFuture<R> executeAsync(
        JobExecutionContext context,
        T arg
    );
}

The JobExecutionContext provides:

Property      | Description
ignite()      | Ignite instance for cluster operations
isCancelled() | Check if cancellation requested
partition()   | Partition info for colocated jobs
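
As an illustration of isCancelled(), here is a hedged sketch of a job that checks for cancellation between units of work (the loop body is a placeholder):

// Hypothetical long-running job that cooperates with cancellation requests.
public class BatchJob implements ComputeJob<Integer, Integer> {
    @Override
    public CompletableFuture<Integer> executeAsync(JobExecutionContext context, Integer batches) {
        int processed = 0;
        for (int i = 0; i < batches; i++) {
            if (context.isCancelled()) {
                break; // stop early in response to a cancellation request
            }
            processed++; // placeholder for real per-batch work
        }
        return CompletableFuture.completedFuture(processed);
    }
}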

Example job implementation:

public class AccountBalanceJob implements ComputeJob<Long, Double> {
    @Override
    public CompletableFuture<Double> executeAsync(
            JobExecutionContext context,
            Long accountId) {

        Table accounts = context.ignite().tables().table("accounts");
        RecordView<Tuple> view = accounts.recordView();

        Tuple key = Tuple.create().set("id", accountId);
        Tuple record = view.get(null, key);

        return CompletableFuture.completedFuture(
            record.doubleValue("balance")
        );
    }
}

Job Targets

Job targets determine where jobs execute:

Target    | Use Case                | Return Type
Any node  | Stateless computation   | JobExecution<R>
Colocated | Data-local processing   | JobExecution<R>
Broadcast | Cluster-wide operations | BroadcastExecution<R>

Any Node Execution

Execute on any available node:

JobDescriptor<Long, Double> descriptor = JobDescriptor
        .<Long, Double>builder(AccountBalanceJob.class)
        .build();

JobExecution<Double> execution = client.compute()
        .submit(JobTarget.anyNode(client.clusterNodes()), descriptor, accountId);

Double balance = execution.resultAsync().join();

Colocated Execution

Execute on the node holding specific data:

// Execute where account 42's data lives
JobExecution<Double> execution = client.compute().submit(
        JobTarget.colocated("accounts", Tuple.create().set("id", 42L)),
        descriptor,
        42L
);

This eliminates network transfer for data-intensive operations.

Broadcast Execution

Execute on all specified nodes:

BroadcastExecution<String> execution = client.compute().submitBroadcast(
        client.clusterNodes(),
        JobDescriptor.builder(NodeInfoJob.class).build(),
        null
);

// Get results from all nodes
Map<ClusterNode, String> results = execution.resultsAsync().join();
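
Each map entry pairs a node with that node's result; for example (assuming the ClusterNode handle exposes name()):

// Print per-node results from the broadcast.
results.forEach((node, info) ->
        System.out.println(node.name() + ": " + info));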

Job Scheduling

Jobs execute through a priority-based queue system.

Configuration options:

Setting              | Default            | Description
threadPoolSize       | max(CPU cores, 8)  | Worker thread count
queueMaxSize         | Integer.MAX_VALUE  | Maximum queued jobs
statesLifetimeMillis | 60,000             | Job state retention

Job Priority

Set priority when submitting:

JobDescriptor<String, String> descriptor = JobDescriptor
        .<String, String>builder(MyJob.class)
        .priority(5) // Higher number = lower priority
        .build();

Change priority during execution:

JobExecution<String> execution = client.compute().submit(target, descriptor, arg);
execution.changePriorityAsync(1); // Move to higher priority

Jobs with the same priority execute in FIFO order.

Job Failover

Ignite automatically handles node failures during job execution:

Failover behavior:

  • Triggered only on node departure (not job exceptions)
  • Selects next worker from remaining candidates
  • Continues until candidates exhausted
  • Application exceptions propagate to caller without retry
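
Because a failed job completes its result future exceptionally, the caller can recover with standard CompletableFuture operators; a minimal sketch, using the same target/descriptor/arg shorthand as the other examples:

// Hypothetical fallback when the job itself throws (no automatic retry occurs here).
String result = client.compute()
        .submit(target, descriptor, arg)
        .resultAsync()
        .exceptionally(ex -> {
            System.err.println("Job failed: " + ex.getMessage());
            return "fallback"; // substitute value on failure
        })
        .join();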

For application-level retries:

JobDescriptor<String, String> descriptor = JobDescriptor
        .<String, String>builder(MyJob.class)
        .maxRetries(3) // Retry on job failure
        .build();

Job State Management

Track job execution through states.

Query job state:

JobExecution<String> execution = client.compute().submit(target, descriptor, arg);

JobState state = execution.stateAsync().join();
System.out.println("Status: " + state.status());
System.out.println("Created: " + state.createTime());
System.out.println("Started: " + state.startTime());

State     | Description
QUEUED    | Waiting in priority queue
EXECUTING | Running on worker thread
COMPLETED | Finished successfully
FAILED    | Terminated with exception
CANCELING | Cancellation in progress
CANCELED  | Cancelled by request
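
A hedged sketch that branches on the reported state; it assumes the status values above are exposed as a JobStatus enum and that an expired state (older than statesLifetimeMillis) is returned as null:

// Inspect the state without waiting for the result future.
JobState state = execution.stateAsync().join();
if (state == null) {
    System.out.println("State expired (older than statesLifetimeMillis)");
} else if (state.status() == JobStatus.QUEUED || state.status() == JobStatus.EXECUTING) {
    System.out.println("Still in progress, created at " + state.createTime());
} else {
    System.out.println("Terminal status: " + state.status());
}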

Map-Reduce Tasks

For split/aggregate computation patterns, use MapReduceTask:

Implement the task interface:

public class WordCountTask implements MapReduceTask<String, String, Map<String, Long>, Map<String, Long>> {

    @Override
    public CompletableFuture<List<MapReduceJob<String, Map<String, Long>>>> splitAsync(
            TaskExecutionContext context,
            String input) {

        // Split input into chunks for parallel processing
        List<MapReduceJob<String, Map<String, Long>>> jobs = Arrays.stream(input.split("\n\n"))
                .map(chunk -> MapReduceJob.<String, Map<String, Long>>builder()
                        .jobDescriptor(JobDescriptor.builder(CountWordsJob.class).build())
                        .args(chunk)
                        .build())
                .toList();

        return CompletableFuture.completedFuture(jobs);
    }

    @Override
    public CompletableFuture<Map<String, Long>> reduceAsync(
            TaskExecutionContext context,
            Map<UUID, Map<String, Long>> results) {

        // Aggregate word counts from all jobs
        Map<String, Long> totals = new HashMap<>();
        for (Map<String, Long> partial : results.values()) {
            partial.forEach((word, count) ->
                totals.merge(word, count, Long::sum));
        }
        return CompletableFuture.completedFuture(totals);
    }
}

Submit the task:

TaskDescriptor<String, Map<String, Long>> descriptor = TaskDescriptor
        .<String, Map<String, Long>>builder(WordCountTask.class)
        .build();

TaskExecution<Map<String, Long>> execution = client.compute()
        .submitMapReduce(descriptor, document);

Map<String, Long> wordCounts = execution.resultAsync().join();

Event System

Ignite provides an event system for monitoring cluster and compute activity.

Event Architecture

Compute Events

Event                 | Trigger
COMPUTE_JOB_QUEUED    | Job added to queue
COMPUTE_JOB_EXECUTING | Job started execution
COMPUTE_JOB_COMPLETED | Job finished successfully
COMPUTE_JOB_FAILED    | Job terminated with error
COMPUTE_JOB_CANCELING | Cancellation requested
COMPUTE_JOB_CANCELED  | Job cancelled

Event Listeners

Register listeners for specific events:

EventListener<ComputeEventParameters> listener = params -> {
    System.out.println("Job " + params.jobId() + " status: " + params.status());
    return CompletableFuture.completedFuture(false); // Keep listening
};

client.compute().listen(IgniteEventType.COMPUTE_JOB_COMPLETED, listener);

Listener return values:

Return | Behavior
false  | Keep listener active
true   | Remove listener after this event
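
For example, a one-shot listener that removes itself after the first failure event (a sketch reusing the listen() registration shown above):

// Returns true so the listener is unsubscribed after the first COMPUTE_JOB_FAILED event.
EventListener<ComputeEventParameters> oneShot = params -> {
    System.err.println("Job " + params.jobId() + " failed");
    return CompletableFuture.completedFuture(true);
};

client.compute().listen(IgniteEventType.COMPUTE_JOB_FAILED, oneShot);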

For synchronous handlers:

EventListener<ComputeEventParameters> listener = EventListener.fromConsumer(params -> {
    log.info("Job completed: {}", params.jobId());
});

Code Deployment

Jobs require their classes to be available on executing nodes. Deploy code using deployment units:

// Create deployment unit from JAR
DeploymentUnit unit = DeploymentUnit.fromPath(
        "my-jobs",
        "1.0.0",
        Path.of("my-jobs.jar")
);

// Deploy to cluster
client.deployment().deployAsync(unit).join();

// Reference in job descriptor
JobDescriptor<String, String> descriptor = JobDescriptor
        .<String, String>builder("com.example.MyJob")
        .deploymentUnits(List.of(new DeploymentUnit("my-jobs", "1.0.0")))
        .build();

Design Constraints

  1. Stateless jobs: Jobs should not maintain state between executions. Store state in tables if needed (see the sketch after this list).

  2. Serializable arguments: Job arguments and results must be serializable for network transfer.

  3. Failover scope: Automatic failover handles infrastructure failures only. Application exceptions propagate without retry unless maxRetries is configured.

  4. Event ordering: Listeners execute in registration order per event, but no global ordering across nodes.

  5. One-shot listeners: Return true to auto-unsubscribe. Useful for waiting on specific events.

  6. Thread pool bounds: The executor thread pool is bounded. Long-running jobs can block other jobs.
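
As a sketch of constraint 1, here is a hedged example of a job that persists its output to a table instead of keeping instance fields; the job_results table and its columns are hypothetical:

// Hypothetical job that keeps no state on the job instance; results go to a table.
public class PersistResultJob implements ComputeJob<Long, Long> {
    @Override
    public CompletableFuture<Long> executeAsync(JobExecutionContext context, Long input) {
        long value = input * 2; // placeholder computation

        RecordView<Tuple> results = context.ignite().tables()
                .table("job_results") // hypothetical table
                .recordView();
        results.upsert(null, Tuple.create()
                .set("id", input)
                .set("value", value)); // implicit transaction, as in the read example above

        return CompletableFuture.completedFuture(value);
    }
}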