Skip to main content
Version: 3.1.0 (Latest)

Compute API

The Compute API executes distributed jobs across cluster nodes. Jobs run colocated with data for maximum performance, accessing the full Ignite API through an execution context.

Key Concepts

Compute jobs are C# classes deployed to the cluster that implement IComputeJob. Submit jobs for execution using the Compute API, which routes them to appropriate nodes based on the specified target.

Job Targets

Job targets control where jobs execute. Target by specific node, node name, or colocated partition to run jobs near data. The client routes job submissions to the appropriate cluster node based on the target.

Broadcast Execution

Broadcast jobs run on multiple nodes simultaneously. Use this pattern for operations that need to execute across the entire cluster or a subset of nodes.

Execution Tracking

Job submission returns an execution handle for monitoring progress and retrieving results. Query job state, change priority, or wait for completion through the execution interface.

Usage Examples

Basic Job Execution

// Define job class (must be deployed to cluster)
public class HelloJob : IComputeJob<string, string>
{
public IMarshaller<string>? InputMarshaller => null;
public IMarshaller<string>? ResultMarshaller => null;

public async ValueTask<string> ExecuteAsync(
IJobExecutionContext context,
string arg,
CancellationToken cancellationToken)
{
return $"Hello, {arg}!";
}
}

// Submit job
var compute = client.Compute;
var jobDescriptor = new JobDescriptor<string, string>(typeof(HelloJob));

var nodes = await client.GetClusterNodesAsync();
var target = JobTarget.Node(nodes[0]);

var execution = await compute.SubmitAsync(target, jobDescriptor, "World");
var result = await execution.GetResultAsync();

Console.WriteLine(result); // "Hello, World!"

Job with Data Access

public class DataProcessingJob : IComputeJob<long, decimal>
{
public IMarshaller<long>? InputMarshaller => null;
public IMarshaller<decimal>? ResultMarshaller => null;

public async ValueTask<decimal> ExecuteAsync(
IJobExecutionContext context,
long customerId,
CancellationToken cancellationToken)
{
// Access Ignite APIs through context
var tables = context.Ignite.Tables;
var ordersTable = await tables.GetTableAsync("orders");
var view = ordersTable.GetRecordView<Order>();

// Execute SQL through context
var sql = context.Ignite.Sql;
var statement = new SqlStatement(
"SELECT SUM(amount) FROM orders WHERE customer_id = ?");
var resultSet = await sql.ExecuteAsync<SumResult>(
null, statement, customerId);

var sum = await resultSet.FirstOrDefaultAsync();
return sum?.Total ?? 0m;
}
}

// Submit job colocated with data
var compute = client.Compute;
var jobDescriptor = new JobDescriptor<long, decimal>(typeof(DataProcessingJob));

var target = JobTarget.Colocated("orders", 12345L);
var execution = await compute.SubmitAsync(target, jobDescriptor, 12345L);
var totalAmount = await execution.GetResultAsync();

Console.WriteLine($"Total orders: ${totalAmount}");

Broadcast Execution

public class DiagnosticsJob : IComputeJob<string, NodeInfo>
{
public IMarshaller<string>? InputMarshaller => null;
public IMarshaller<NodeInfo>? ResultMarshaller => null;

public async ValueTask<NodeInfo> ExecuteAsync(
IJobExecutionContext context,
string arg,
CancellationToken cancellationToken)
{
// Gather node information
return new NodeInfo
{
NodeName = Environment.MachineName,
Timestamp = DateTime.UtcNow,
MemoryUsed = GC.GetTotalMemory(false)
};
}
}

// Broadcast to all nodes
var compute = client.Compute;
var jobDescriptor = new JobDescriptor<string, NodeInfo>(typeof(DiagnosticsJob));

var nodes = await client.GetClusterNodesAsync();
var target = BroadcastTarget.Nodes(nodes);

var execution = await compute.SubmitBroadcastAsync(
target, jobDescriptor, "diagnostics");

// Get results from all nodes
foreach (var jobExecution in execution.JobExecutions)
{
var nodeInfo = await jobExecution.GetResultAsync();
Console.WriteLine($"{nodeInfo.NodeName}: {nodeInfo.MemoryUsed} bytes");
}

Job with Custom Marshallers

public class ComplexDataJob : IComputeJob<CustomInput, CustomOutput>
{
// Provide custom serialization
public IMarshaller<CustomInput>? InputMarshaller => new CustomInputMarshaller();
public IMarshaller<CustomOutput>? ResultMarshaller => new CustomOutputMarshaller();

public async ValueTask<CustomOutput> ExecuteAsync(
IJobExecutionContext context,
CustomInput input,
CancellationToken cancellationToken)
{
// Process complex input
return new CustomOutput
{
ProcessedData = input.RawData.Select(x => x * 2).ToList()
};
}
}

Monitoring Job Execution

var compute = client.Compute;
var jobDescriptor = new JobDescriptor<string, string>(typeof(LongRunningJob));

var target = JobTarget.Node(nodes[0]);
var execution = await compute.SubmitAsync(target, jobDescriptor, "input");

// Monitor job state
while (true)
{
var state = await execution.GetStateAsync();
if (state == null)
{
Console.WriteLine("Job information expired");
break;
}

Console.WriteLine($"Job state: {state.Status}");

if (state.Status == JobStatus.Completed ||
state.Status == JobStatus.Failed ||
state.Status == JobStatus.Canceled)
{
break;
}

await Task.Delay(1000);
}

// Get final result
try
{
var result = await execution.GetResultAsync();
Console.WriteLine($"Result: {result}");
}
catch (Exception ex)
{
Console.WriteLine($"Job failed: {ex.Message}");
}

Changing Job Priority

var execution = await compute.SubmitAsync(target, jobDescriptor, "input");

// Increase priority
var changed = await execution.ChangePriorityAsync(10);
if (changed == true)
{
Console.WriteLine("Priority changed");
}
else if (changed == false)
{
Console.WriteLine("Job already executing or completed");
}
else
{
Console.WriteLine("Job not found (retention expired)");
}

Colocated Execution

// Execute job on node that owns the data
var compute = client.Compute;
var jobDescriptor = new JobDescriptor<long, ProcessingResult>(typeof(ColocatedProcessor));

// Target partition that contains the key
var target = JobTarget.Colocated("customers", customerId);
var execution = await compute.SubmitAsync(target, jobDescriptor, customerId);
var result = await execution.GetResultAsync();

Colocated execution minimizes network traffic by running jobs on the node that stores the data.

Exception Handling

try
{
var execution = await compute.SubmitAsync(target, jobDescriptor, "input");
var result = await execution.GetResultAsync();
}
catch (IgniteException ex)
{
Console.WriteLine($"Job execution failed: {ex.Message}");
}
catch (TimeoutException ex)
{
Console.WriteLine($"Job timed out: {ex.Message}");
}

Cancellation Support

using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(30));

try
{
var execution = await compute.SubmitAsync(
target, jobDescriptor, "input", cts.Token);
var result = await execution.GetResultAsync();
}
catch (OperationCanceledException)
{
Console.WriteLine("Job submission cancelled");
}

Reference

ICompute Interface

Job submission methods:

  • SubmitAsync<TTarget, TArg, TResult>(IJobTarget<TTarget> target, JobDescriptor<TArg, TResult> jobDescriptor, TArg arg, CancellationToken cancellationToken) - Submit job to target with cancellation
  • SubmitAsync<TTarget, TArg, TResult>(IJobTarget<TTarget> target, JobDescriptor<TArg, TResult> jobDescriptor, TArg arg) - Submit job to target

Broadcast methods:

  • SubmitBroadcastAsync<TTarget, TArg, TResult>(IBroadcastJobTarget<TTarget> target, JobDescriptor<TArg, TResult> jobDescriptor, TArg arg, CancellationToken cancellationToken) - Broadcast job with cancellation
  • SubmitBroadcastAsync<TTarget, TArg, TResult>(IBroadcastJobTarget<TTarget> target, JobDescriptor<TArg, TResult> jobDescriptor, TArg arg) - Broadcast job

Map-reduce methods:

  • SubmitMapReduceAsync<TArg, TResult>(TaskDescriptor<TArg, TResult> taskDescriptor, TArg arg, CancellationToken cancellationToken) - Submit map-reduce task with cancellation
  • SubmitMapReduceAsync<TArg, TResult>(TaskDescriptor<TArg, TResult> taskDescriptor, TArg arg) - Submit map-reduce task

IComputeJob<TArg, TResult> Interface

Properties:

  • InputMarshaller - Optional custom marshaller for job arguments
  • ResultMarshaller - Optional custom marshaller for job results

Methods:

  • ExecuteAsync(IJobExecutionContext context, TArg arg, CancellationToken cancellationToken) - Execute the job on the server

Jobs must be deployed to cluster nodes before submission. The job implementation has full access to the Ignite API through the execution context.

IJobExecutionContext Interface

Properties:

  • Ignite - Full Ignite API access for the server environment

Use the context to access tables, execute SQL, start transactions, or perform other operations from within the job. All operations execute in the server context on the cluster node.

IJobExecution<T> Interface

Properties:

  • Id - Unique job identifier (Guid)
  • Node - Cluster node where job is executing

Methods:

  • GetResultAsync() - Wait for and retrieve job result
  • GetStateAsync() - Get current job state (returns null if retention expired)
  • ChangePriorityAsync(int priority) - Change job priority (returns true if changed, false if executing/completed, null if not found)

The execution handle tracks a submitted job. Use it to monitor progress, adjust priority, or wait for completion.

IJobTarget<T> Interface

Properties:

  • Data - Target data (node, partition, etc.)

Static factory methods (on JobTarget class):

  • JobTarget.Node(IClusterNode node) - Target specific node
  • JobTarget.AnyNode(IEnumerable<IClusterNode> nodes) - Target any node from collection
  • JobTarget.AnyNode(params IClusterNode[] nodes) - Target any node from array
  • JobTarget.Colocated(string tableName, TKey key) - Target partition containing key
  • JobTarget.Colocated(QualifiedName tableName, TKey key) - Target partition containing key with qualified table name

JobDescriptor<TArg, TResult> Class

Constructors:

  • JobDescriptor(Type type) - Create descriptor from job type implementing IComputeJob<TArg, TResult>
  • JobDescriptor(string jobClassName) - Create descriptor with Java job class name for server-side execution

The job type must implement IComputeJob<TArg, TResult>. Use the Type constructor for .NET jobs, or the string constructor for Java jobs on the server.

JobState Record

Job state information:

  • Id - Job identifier (Guid)
  • Status - Current job status (JobStatus enum)
  • CreateTime - Job creation timestamp
  • StartTime - Job start timestamp (null when not yet started)
  • FinishTime - Job completion timestamp (null when not yet finished)

State information may expire based on cluster configuration. GetStateAsync returns null when state information is no longer available.

JobStatus Enum

Possible job status values:

  • Queued - Job is submitted and waiting for execution start
  • Executing - Job is currently running
  • Failed - Job was unexpectedly terminated during execution
  • Completed - Job executed successfully and returned result
  • Canceling - Job received cancel command but is still running
  • Canceled - Job was successfully cancelled

Best Practices

Deploy jobs before submission. Jobs must exist on cluster nodes before the client can submit them.

Use colocated execution when jobs access specific data. This eliminates network transfers between compute and data nodes.

Keep jobs focused. Each job should perform a specific task. Use multiple jobs for complex workflows.

Handle exceptions in jobs. Unhandled exceptions fail the job and return errors to the client.

Consider job serialization. Job arguments and results cross network boundaries. Use efficient serialization or custom marshallers for large data.

Monitor long-running jobs. Use GetStateAsync to track progress and detect failures early.