GridGain Developers Hub

Continuous Queries

Continuous queries monitors data modifications in a cache. All update events are propagated to the local subscriber. Continuous query implementation guarantees exactly once delivery of an event to the subscriber.

Creating a Subscriber

When you modify a cache (insert, update, or delete an entry), an event is sent to the continuous query’s local listener so that your application can react accordingly. The local listener is executed on the node that initiated the query.

Note that the continuous query throws an exception if started without a local listener.

public static void UseMySubscriber(Table table) throws InterruptedException {
    RecordView < Tuple > view = table.recordView();
    var subscriber = new MySubscriber();

    view.queryContinuously(subscriber, null);

    view.upsert(null, Tuple.create().set("id", 3).set("name", "John Doe"));
    view.upsert(null, Tuple.create().set("id", 3).set("name", "Jane Doe"));
    view.delete(null, Tuple.create().set("id", 3));

    // Wait for some events.
    Thread.sleep(3000);
}

private static class MySubscriber implements Flow.Subscriber < TableRowEventBatch < Tuple >> {
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(TableRowEventBatch < Tuple > batch) {
        List < TableRowEvent < Tuple >> items = batch.rows();
        for (TableRowEvent < Tuple > item: items) {
            System.out.println("onNext: " + item.type() + ", old=" + item.oldEntry() + ", new=" + item.entry());
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError: " + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
}

Continuous Query Parameters

You can configure the following properties of continuous queries:

  • pageSize - The number of entries returned from a single partition in one network call. Default value: 1000.

  • pollIntervalMs - Poll interval, in milliseconds. Default value: 1000.

  • startTimestampMillis - Start timestamp in epoch time.

var subscriber = new TestSubscriber<Tuple>(count);
var options = ContinuousQueryOptions.builder()
    .pollIntervalMs(10)
    .pageSize(pageSize)
    .build();

view.queryContinuously(subscriber, options);