Continuous Queries
Continuous queries monitor data modifications in a cache. All update events are propagated to the local subscriber. The continuous query implementation guarantees exactly-once delivery of each event to the subscriber.
Creating a Subscriber
When you modify a cache (insert, update, or delete an entry), an event is sent to the continuous query’s local listener so that your application can react accordingly. The local listener is executed on the node that initiated the query.
Note that the continuous query throws an exception if started without a local listener.
public static void UseMySubscriber(Table table) throws InterruptedException {
    RecordView<Tuple> view = table.recordView();
    var subscriber = new MySubscriber();
    view.queryContinuously(subscriber, null);
    view.upsert(null, Tuple.create().set("id", 3).set("name", "John Doe"));
    view.upsert(null, Tuple.create().set("id", 3).set("name", "Jane Doe"));
    view.delete(null, Tuple.create().set("id", 3));
    // Wait for some events.
    Thread.sleep(3000);
}
private static class MySubscriber implements Flow.Subscriber<TableRowEventBatch<Tuple>> {
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(TableRowEventBatch<Tuple> batch) {
        List<TableRowEvent<Tuple>> items = batch.rows();
        for (TableRowEvent<Tuple> item : items) {
            System.out.println("onNext: " + item.type() + ", old=" + item.oldEntry() + ", new=" + item.entry());
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError: " + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
}
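The subscriber above requests Long.MAX_VALUE items as soon as it is subscribed, so it never applies backpressure. The following is a minimal sketch of the alternative: a subscriber that requests one item at a time and cancels its subscription after a fixed number of items. It is written against the JDK Flow API only, with SubmissionPublisher standing in for the table view so that the example runs without a cluster. The names BoundedSubscriberSketch, BoundedSubscriber, and run are hypothetical, and whether a continuous query reacts to demand and cancellation in exactly the same way as SubmissionPublisher is an assumption.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BoundedSubscriberSketch {
    /** Hypothetical subscriber: requests one item at a time, cancels after maxItems. */
    static class BoundedSubscriber<T> implements Flow.Subscriber<T> {
        private final int maxItems;
        private final List<T> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        BoundedSubscriber(int maxItems) {
            this.maxItems = maxItems;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // Ask for the first item only.
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            if (received.size() >= maxItems) {
                subscription.cancel(); // Enough items seen: stop the stream.
                done.countDown();
            } else {
                subscription.request(1); // Ready for the next item.
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes items through a stand-in publisher and returns what the subscriber saw. */
    static List<String> run(int maxItems, String... items) {
        BoundedSubscriber<String> subscriber = new BoundedSubscriber<>(maxItems);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String item : items) {
                publisher.submit(item);
            }
        } // close() signals onComplete once buffered items are delivered.
        try {
            // Wait until the subscriber cancels, completes, or fails.
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(2, "event-1", "event-2", "event-3"));
    }
}
```

With real continuous queries the item type would be TableRowEventBatch<Tuple> rather than String, and the subscriber would be passed to view.queryContinuously instead of a SubmissionPublisher.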
Continuous Query Parameters
You can configure the following properties of continuous queries:
- pageSize: The number of entries returned from a single partition in one network call. Default value: 1000.
- pollIntervalMs: Poll interval, in milliseconds. Default value: 1000.
- startTimestampMillis: Start timestamp in epoch time.
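// TestSubscriber, count, and pageSize are not defined in this snippet;
// substitute your own subscriber implementation and values.
var subscriber = new TestSubscriber<Tuple>(count);
var options = ContinuousQueryOptions.builder()
    .pollIntervalMs(10)
    .pageSize(pageSize)
    .build();
view.queryContinuously(subscriber, options);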
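The startTimestampMillis property listed above is, judging by its name, expressed in milliseconds since the Unix epoch. As a small sketch of how such a value might be computed with java.time, the helper below returns the timestamp for a point a given duration in the past. The class and method names are hypothetical, and the builder call shown in the comment assumes that the builder exposes a setter named after the property, as it does for pollIntervalMs and pageSize.

```java
import java.time.Duration;
import java.time.Instant;

class StartTimestampSketch {
    /** Returns epoch milliseconds for the moment that lies lookBack before now. */
    static long startTimestampMillis(Instant now, Duration lookBack) {
        return now.minus(lookBack).toEpochMilli();
    }

    public static void main(String[] args) {
        // Example: a start point five minutes before the current time.
        long startTs = startTimestampMillis(Instant.now(), Duration.ofMinutes(5));
        System.out.println("startTimestampMillis = " + startTs);

        // Assumption: the options builder has a matching setter, for example:
        // ContinuousQueryOptions.builder().startTimestampMillis(startTs).build();
    }
}
```

Passing the clock value in as a parameter, rather than calling Instant.now() inside the helper, keeps the calculation easy to test.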