Streaming Data
To stream a large amount of data, use the data streamer. Data streaming provides a quicker and more efficient way to load, organize and optimally distribute your data. Data streamer accepts a stream of data and distributes data entries across the cluster, where the processing takes place. Data streaming is available in all table views.
Data streaming provides at-least-once delivery guarantee.
Using Data Streamer API
RecordView<Tuple> view = defaultTable().recordView();
CompletableFuture<Void> streamerFut;
try (var publisher = new SubmissionPublisher<CustomData>()) {
streamerFut = view.<CustomData, String, Boolean>streamData(
publisher,
null,
item -> Tuple.create().set("id", item.getId()),
item -> item.serializeToString(),
null,
List.of(new DeploymentUnit("test", "1.0.0")),
"org.foo.bar.StreamReceiver",
"receiverArg1");
publisher.submit(new CustomData(1, "x"));
}
public async Task TestBasicStreamingRecordBinaryView()
{
var options = DataStreamerOptions.Default with { BatchSize = 10 };
var data = Enumerable.Range(0, Count).Select(x => new IgniteTuple { ["id"] = 1L, ["name"] = "foo" }).ToList();
await TupleView.StreamDataAsync(data.ToAsyncEnumerable(), options);
}
Tracking Failed Entries
If data streamer fails to submit data, it stores all failed items, and returns them in the DataStreamerException#failedItems
. You can then catch the exception to keep track of the entries that were not sent to the cluster:
RecordView<Tuple> view = defaultTable().recordView(); CompletableFuture<Void> streamerFut; try (var publisher = new SubmissionPublisher<CustomData>()) { streamerFut = view.<CustomData, String, Boolean>streamData( publisher, null, item -> Tuple.create().set("id", item.getId()), item -> item.serializeToString(), null, List.of(new DeploymentUnit("test", "1.0.0")), "org.foo.bar.StreamReceiver", "receiverArg1") .exceptionally(e -> { // Handle entries that failed during background streaming System.out.println(((DataStreamerException)e.getCause()).failedItems()) } ); publisher.submit(new CustomData(1, "x")); } catch (DataStreamerException e){ Set<?> failedItems = e.failedItems() // Handle entries that failed during submission System.out.println(failedItems) }
Configuring Data Streamer Properties
All data streamer parameters can be configured by using the DataStreamerOptions
object. For example, the code snippet below sets the data streamer to have 3 retries:
RecordView<Tuple> view = client.tables().table("Person").recordView();
var publisher = new SubmissionPublisher<Tuple>();
var options = DataStreamerOptions.builder()
.retryLimit(3)
.build();
streamerFut = view.streamData(publisher, options);
Tuning Memory Usage
Data streamer may require significant amount of memory to handle the requests in orderly manner. Depending on your environment, you may want to increase or reduce the amount of memory reserved by the data streamer.
For every node in the cluster, the streamer reserves an amount of memory equal to batchSize
(1000 entries by default) multiplied by perNodeParallelOperations
(4 by default) setting. For example, a 10-node cluster with default parameters and average entry size of 1KB will reserve 40MB for operations.
You can change these options the same way you would work with any other options:
RecordView<Tuple> view = client.tables().table("Person").recordView();
var publisher = new SubmissionPublisher<Tuple>();
var options = DataStreamerOptions.builder()
.batchSize(10000)
.perNodeParallelOperations(10)
.build();
streamerFut = view.streamData(publisher, options);
Additionally, the data streamer periodically flushes incomplete buffers to avoid messages being stuck for a long time (a specific buffer can fill up slowly or never fill completely at all, depending on the data distribution). This is configured with the autoFlushInterval
(5000ms by default) property.
© 2024 GridGain Systems, Inc. All Rights Reserved. Privacy Policy | Legal Notices. GridGain® is a registered trademark of GridGain Systems, Inc.
Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation.