Data Streamer API
The Data Streamer API provides high-throughput data ingestion into Ignite tables. Applications stream data using reactive publishers that batch records for efficient network transmission and processing. This approach achieves higher performance than individual put operations.
Key Concepts
Data streaming uses the Java Flow API for backpressure-aware data delivery. Publishers produce items that contain operation types and payloads. The streamer batches items, sends them to appropriate nodes, and executes operations in parallel across partitions.
Both RecordView and KeyValueView implement DataStreamerTarget, enabling streaming to either view type. Configure streaming behavior through DataStreamerOptions to control batch sizes, parallelism, and retry behavior.
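For orientation, a minimal sketch (assuming a Table handle named table has already been obtained) showing that both views can serve as streaming targets:

// Both views implement DataStreamerTarget, so either can receive a stream.
// RecordView<Tuple> streams whole records; KeyValueView streams Map.Entry pairs.
DataStreamerTarget<Tuple> recordTarget = table.recordView();
DataStreamerTarget<Map.Entry<Tuple, Tuple>> kvTarget = table.keyValueView();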
Basic Streaming
Stream data using a Flow publisher:
RecordView<Tuple> view = table.recordView();

// Create the items to stream
List<DataStreamerItem<Tuple>> items = Arrays.asList(
    DataStreamerItem.of(Tuple.create().set("id", 1).set("name", "Alice")),
    DataStreamerItem.of(Tuple.create().set("id", 2).set("name", "Bob")),
    DataStreamerItem.of(Tuple.create().set("id", 3).set("name", "Carol"))
);

// Create the publisher
SubmissionPublisher<DataStreamerItem<Tuple>> publisher =
    new SubmissionPublisher<>();

// Start streaming
CompletableFuture<Void> future = view.streamData(
    publisher,
    DataStreamerOptions.DEFAULT
);

// Submit items, then close the publisher to flush remaining batches
items.forEach(publisher::submit);
publisher.close();

future.join();
The operation completes when all items are processed.
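A common variant of the same flow, not required by the API, wraps the publisher in try-with-resources so it is closed even if submission fails partway through:

CompletableFuture<Void> future;

// SubmissionPublisher is AutoCloseable; closing it signals the streamer to flush
// any remaining buffered items and complete the returned future.
try (SubmissionPublisher<DataStreamerItem<Tuple>> publisher = new SubmissionPublisher<>()) {
    future = view.streamData(publisher, DataStreamerOptions.DEFAULT);
    items.forEach(publisher::submit);
}

future.join();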
Stream Options
Configure streaming behavior with options:
DataStreamerOptions options = DataStreamerOptions.builder()
    .pageSize(1000)
    .perPartitionParallelOperations(4)
    .autoFlushInterval(1000)
    .retryLimit(3)
    .build();

CompletableFuture<Void> future = view.streamData(publisher, options);
The pageSize parameter controls batch size. Higher values increase throughput but consume more memory. The perPartitionParallelOperations setting determines concurrent operations per partition.
Operation Types
Specify operation types for each item:
List<DataStreamerItem<Tuple>> items = Arrays.asList(
    // Explicit PUT
    DataStreamerItem.of(
        Tuple.create().set("id", 1).set("name", "Alice"),
        DataStreamerOperationType.PUT
    ),
    DataStreamerItem.of(
        Tuple.create().set("id", 2).set("status", "active"),
        DataStreamerOperationType.PUT
    ),
    // REMOVE: only the key columns are needed
    DataStreamerItem.removed(
        Tuple.create().set("id", 3)
    ),
    // PUT is the default when no operation type is given
    DataStreamerItem.of(
        Tuple.create().set("id", 4).set("name", "David")
    )
);
Available operations:
- PUT: Insert or update records (default when using the of() method)
- REMOVE: Remove records (use the removed() method or explicit DataStreamerOperationType.REMOVE)
Custom Publishers
Implement custom publishers for streaming from external sources:
class FilePublisher implements Flow.Publisher<DataStreamerItem<Tuple>> {
    private final Path file;

    public FilePublisher(Path file) {
        this.file = file;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super DataStreamerItem<Tuple>> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private BufferedReader reader;

            @Override
            public void request(long n) {
                try {
                    if (reader == null) {
                        reader = Files.newBufferedReader(file);
                    }

                    // Emit at most n items, honoring the subscriber's demand.
                    for (long i = 0; i < n; i++) {
                        String line = reader.readLine();

                        if (line == null) {
                            reader.close();
                            subscriber.onComplete();
                            return;
                        }

                        String[] parts = line.split(",");
                        Tuple tuple = Tuple.create()
                            .set("id", Integer.parseInt(parts[0]))
                            .set("name", parts[1]);

                        subscriber.onNext(DataStreamerItem.of(tuple));
                    }
                } catch (IOException e) {
                    subscriber.onError(e);
                }
            }

            @Override
            public void cancel() {
                try {
                    if (reader != null) {
                        reader.close();
                    }
                } catch (IOException e) {
                    // Ignore
                }
            }
        });
    }
}
Publishers must respect backpressure signals to avoid overwhelming the system.
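SubmissionPublisher already honors these signals: submit() blocks when the subscriber's buffer is full, and offer() lets the producer react instead of blocking. A sketch of the non-blocking style, where the buffer capacity and timeout are illustrative choices rather than recommendations:

// Bounded buffer; offer() returns a negative lag estimate if the item was dropped
// because the streamer could not keep up within the timeout.
SubmissionPublisher<DataStreamerItem<Tuple>> publisher =
    new SubmissionPublisher<>(ForkJoinPool.commonPool(), 1024);

CompletableFuture<Void> future = view.streamData(publisher, DataStreamerOptions.DEFAULT);

for (DataStreamerItem<Tuple> item : items) {
    int lag = publisher.offer(item, 5, TimeUnit.SECONDS, (subscriber, dropped) -> false);
    if (lag < 0) {
        throw new IllegalStateException("Streamer did not accept item in time: " + item);
    }
}

publisher.close();
future.join();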
Receiver-Based Streaming
Execute custom processing logic on server nodes using receivers:
class AggregationReceiver implements DataStreamerReceiver<Tuple, String, Integer> {
    @Override
    public CompletableFuture<List<Integer>> receive(
            List<Tuple> items,
            DataStreamerReceiverContext context,
            String arg
    ) {
        Table table = context.ignite().tables().table("aggregates");
        RecordView<Tuple> view = table.recordView();

        // Count items per category in this batch
        Map<String, Integer> counts = new HashMap<>();
        for (Tuple item : items) {
            String category = item.stringValue("category");
            counts.merge(category, 1, Integer::sum);
        }

        // Write one aggregate record per category
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            Tuple record = Tuple.create()
                .set("category", entry.getKey())
                .set("count", entry.getValue());
            view.upsert(null, record);
        }

        return CompletableFuture.completedFuture(
            Collections.singletonList(counts.size())
        );
    }
}
Register and use the receiver:
DataStreamerReceiverDescriptor<Tuple, String, Integer> descriptor =
    DataStreamerReceiverDescriptor.<Tuple, String, Integer>builder(
        "com.example.AggregationReceiver"
    ).build();

SubmissionPublisher<Tuple> publisher = new SubmissionPublisher<>();

CompletableFuture<Void> future = view.streamData(
    publisher,
    descriptor,
    // Key function: builds the table key used to route each item to its partition
    tuple -> Tuple.create().set("id", tuple.intValue("id")),
    // Payload function: the value passed to the receiver
    tuple -> tuple,
    "aggregation-arg",
    null,              // no result subscriber; per-batch results are discarded
    DataStreamerOptions.DEFAULT
);
// Create and submit items
List<Tuple> items = Arrays.asList(
    Tuple.create().set("id", 1).set("category", "A"),
    Tuple.create().set("id", 2).set("category", "B")
);

items.forEach(publisher::submit);
publisher.close();
future.join();
Receivers process batches on server nodes, enabling custom logic like aggregations or complex transformations.
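The example above passes null for the result subscriber, discarding whatever the receiver returns. To consume per-batch results, supply a Flow.Subscriber in that position; the sketch below replaces the earlier streamData call, with logging as an illustrative action:

// A minimal result subscriber that logs each value the receiver returns.
Flow.Subscriber<Integer> results = new Flow.Subscriber<Integer>() {
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE); // small result stream; request everything
    }

    @Override
    public void onNext(Integer categoryCount) {
        System.out.println("Receiver reported " + categoryCount + " categories in a batch");
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Receiver failed: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("All receiver results delivered");
    }
};

CompletableFuture<Void> future = view.streamData(
    publisher,
    descriptor,
    tuple -> Tuple.create().set("id", tuple.intValue("id")),
    tuple -> tuple,
    "aggregation-arg",
    results,           // receive per-batch results instead of discarding them
    DataStreamerOptions.DEFAULT
);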
Error Handling
Handle streaming errors through the returned future:
CompletableFuture<Void> future = view.streamData(publisher, options);

future.exceptionally(ex -> {
    // The future may complete with a CompletionException; unwrap it to get the cause.
    Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
    if (cause instanceof DataStreamerException) {
        System.err.println("Streaming failed: " + cause.getMessage());
    }
    return null;
});
Configure retry behavior through DataStreamerOptions.retryLimit to automatically retry failed batches.
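When the retry limit is exhausted, the exception also identifies the items that could not be delivered. The sketch below assumes a DataStreamerException.failedItems() accessor (verify against your Ignite version) and routes the count to stderr; a real application might send the failed items to a dead-letter store:

CompletableFuture<Void> future = view.streamData(publisher, options);

future.whenComplete((unused, ex) -> {
    if (ex == null) {
        return;
    }
    Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
    if (cause instanceof DataStreamerException) {
        // failedItems() is assumed to expose the items not delivered after all retries.
        DataStreamerException streamerEx = (DataStreamerException) cause;
        System.err.println("Items not delivered: " + streamerEx.failedItems().size());
    }
});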
Auto-Flush Interval
Configure periodic flushing for low-volume streams:
DataStreamerOptions options = DataStreamerOptions.builder()
    .autoFlushInterval(500)
    .build();
The streamer flushes incomplete batches after the specified interval in milliseconds. This prevents data from remaining buffered indefinitely in low-throughput scenarios.
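For example, a long-lived stream fed by a slow external source (an Iterator stands in for the feed here, purely for illustration) still delivers items promptly because partial batches are flushed every 500 ms:

// Stand-in for a slow external feed (queue, socket, sensor).
Iterator<Tuple> events = List.of(
    Tuple.create().set("id", 10).set("name", "Eve"),
    Tuple.create().set("id", 11).set("name", "Frank")
).iterator();

DataStreamerOptions options = DataStreamerOptions.builder()
    .autoFlushInterval(500)   // flush partial batches every 500 ms
    .build();

SubmissionPublisher<DataStreamerItem<Tuple>> publisher = new SubmissionPublisher<>();
CompletableFuture<Void> future = view.streamData(publisher, options);

while (events.hasNext()) {
    publisher.submit(DataStreamerItem.of(events.next()));
}

publisher.close();
future.join();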
Key-Value View Streaming
Stream to key-value views using Entry payloads:
KeyValueView<Tuple, Tuple> kvView = table.keyValueView();

List<DataStreamerItem<Map.Entry<Tuple, Tuple>>> items = Arrays.asList(
    DataStreamerItem.of(Map.entry(
        Tuple.create().set("id", 1),
        Tuple.create().set("name", "Alice")
    )),
    DataStreamerItem.of(Map.entry(
        Tuple.create().set("id", 2),
        Tuple.create().set("name", "Bob")
    ))
);

SubmissionPublisher<DataStreamerItem<Map.Entry<Tuple, Tuple>>> publisher =
    new SubmissionPublisher<>();

CompletableFuture<Void> future = kvView.streamData(publisher, DataStreamerOptions.DEFAULT);

items.forEach(publisher::submit);
publisher.close();
future.join();
Performance Considerations
Optimize streaming throughput by tuning configuration:
DataStreamerOptions options = DataStreamerOptions.builder()
    .pageSize(10000)
    .perPartitionParallelOperations(8)
    .retryLimit(5)
    .build();
Larger page sizes reduce per-batch overhead but increase memory usage. Higher parallelism improves throughput on multi-core systems but may cause resource contention.
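These trade-offs are workload-dependent, so measure the effect of each change rather than assuming it. A simple timing sketch around a streaming run (reusing the view, items, and options from the earlier examples):

SubmissionPublisher<DataStreamerItem<Tuple>> publisher = new SubmissionPublisher<>();

long start = System.nanoTime();

CompletableFuture<Void> future = view.streamData(publisher, options);
items.forEach(publisher::submit);
publisher.close();
future.join();

double seconds = (System.nanoTime() - start) / 1_000_000_000.0;
System.out.printf("Streamed %d items in %.2f s (%.0f items/s)%n",
    items.size(), seconds, items.size() / seconds);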
Reference
- Streaming interface: org.apache.ignite.table.DataStreamerTarget<T>
- Configuration: org.apache.ignite.table.DataStreamerOptions
- Stream items: org.apache.ignite.table.DataStreamerItem<T>
- Custom processing: org.apache.ignite.table.DataStreamerReceiver<T, A, R>
- Receiver context: org.apache.ignite.table.DataStreamerReceiverContext
- Receiver descriptor: org.apache.ignite.table.DataStreamerReceiverDescriptor<T, A, R>
DataStreamerTarget Methods
- CompletableFuture<Void> streamData(Publisher<DataStreamerItem<T>>, DataStreamerOptions) - Stream data to table
- <E, V, A, R> CompletableFuture<Void> streamData(Publisher<E>, DataStreamerReceiverDescriptor<V, A, R>, Function<E, T>, Function<E, V>, A, Subscriber<R>, DataStreamerOptions) - Stream with custom receiver
DataStreamerOptions Configuration
- pageSize - Number of items per batch (default: 1000)
- perPartitionParallelOperations - Concurrent operations per partition (default: 1)
- autoFlushInterval - Auto-flush interval in milliseconds (default: 5000)
- retryLimit - Number of retry attempts for failed batches (default: 16)
DataStreamerItem Operations
- PUT - Insert or update record
- REMOVE - Remove record
DataStreamerReceiver Interface
- CompletableFuture<List<R>> receive(List<T>, DataStreamerReceiverContext, A) - Process batch on server
- Marshaller<T, byte[]> payloadMarshaller() - Custom payload serialization
- Marshaller<A, byte[]> argumentMarshaller() - Custom argument serialization
- Marshaller<R, byte[]> resultMarshaller() - Custom result serialization