Using Continuous Queries | Ignite Documentation

Using Continuous Queries

A continuous query is a query that monitors data modifications occurring in a cache. Once a continuous query is started, you get notified of all the data changes that fall into your query filter.

All update events are propagated to the local listener, which must be registered in the query. The continuous query implementation guarantees exactly-once delivery of each event to the local listener.

You can also specify a remote filter to narrow down the range of entries that are monitored for updates.

Caution

Continuous Queries and MVCC

Continuous queries have a number of functional limitations when used with MVCC-enabled caches.

Local Listener

When a cache gets modified (an entry is inserted, updated, or deleted), an event is sent to the continuous query’s local listener so that your application can react accordingly. The local listener is executed on the node that initiated the query.

Note that the continuous query throws an exception if started without a local listener.

IgniteCache<Integer, String> cache = ignite.getOrCreateCache("myCache");

ContinuousQuery<Integer, String> query = new ContinuousQuery<>();

query.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {

    @Override
    public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> events)
        throws CacheEntryListenerException {
        // react to the update events here
    }
});

cache.query(query);
class LocalListener : ICacheEntryEventListener<int, string>
{
    public void OnEvent(IEnumerable<ICacheEntryEvent<int, string>> evts)
    {
        foreach (var cacheEntryEvent in evts)
        {
            //react to update events here
        }
    }
}
public static void ContinuousQueryListenerDemo()
{
    var ignite = Ignition.Start(new IgniteConfiguration
    {
        DiscoverySpi = new TcpDiscoverySpi
        {
            LocalPort = 48500,
            LocalPortRange = 20,
            IpFinder = new TcpDiscoveryStaticIpFinder
            {
                Endpoints = new[]
                {
                    "127.0.0.1:48500..48520"
                }
            }
        }
    });
    var cache = ignite.GetOrCreateCache<int, string>("myCache");

    var query = new ContinuousQuery<int, string>(new LocalListener());

    var handle = cache.QueryContinuous(query);

    cache.Put(1, "1");
    cache.Put(2, "2");
}
/**
 * Listener class.
 */
template<typename K, typename V>
class Listener : public event::CacheEntryEventListener<K, V>
{
public:
    /**
     * Default constructor.
     */
    Listener()
    {
        // No-op.
    }

    /**
     * Event callback.
     *
     * @param evts Events.
     * @param num Events number.
     */
    virtual void OnEvent(const CacheEntryEvent<K, V>* evts, uint32_t num)
    {
        for (uint32_t i = 0; i < num; ++i)
        {
            std::cout << "Queried entry [key=" << (evts[i].HasValue() ? evts[i].GetKey() : K())
                << ", val=" << (evts[i].HasValue() ? evts[i].GetValue() : V()) << ']'
                << std::endl;
        }
    }
};

int main()
{
    IgniteConfiguration cfg;
    cfg.springCfgPath = "/path/to/configuration.xml";

    Ignite ignite = Ignition::Start(cfg);

    Cache<int32_t, std::string> cache = ignite.GetOrCreateCache<int32_t, std::string>("myCache");

    // Declaring custom listener.
    Listener<int32_t, std::string> listener;

    // Declaring continuous query.
    continuous::ContinuousQuery<int32_t, std::string> query(MakeReference(listener));

    continuous::ContinuousQueryHandle<int32_t, std::string> handle = cache.QueryContinuous(query);
}

Initial Query

You can specify an initial query that is executed before the continuous query gets registered in the cluster and before you start to receive updates. To specify an initial query, use the ContinuousQuery.setInitialQuery(…​) method.

Just like scan queries, a continuous query is executed via the query() method that returns a cursor. When an initial query is set, you can use that cursor to iterate over the results of the initial query.

IgniteCache<Integer, String> cache = ignite.getOrCreateCache("myCache");

ContinuousQuery<Integer, String> query = new ContinuousQuery<>();

// Setting an optional initial query.
// The query will return entries for the keys greater than 10.
query.setInitialQuery(new ScanQuery<>((k, v) -> k > 10));

// A local listener is mandatory, even if it does nothing with the events.
query.setLocalListener(events -> {
});

try (QueryCursor<Cache.Entry<Integer, String>> cursor = cache.query(query)) {
    // Iterating over the entries returned by the initial query 
    for (Cache.Entry<Integer, String> e : cursor)
        System.out.println("key=" + e.getKey() + ", val=" + e.getValue());
}
This API is not presently available for C#/.NET.
Cache<int32_t, std::string> cache = ignite.GetOrCreateCache<int32_t, std::string>("myCache");

// Custom listener
Listener<int32_t, std::string> listener;

// Declaring continuous query.
continuous::ContinuousQuery<int32_t, std::string> query(MakeReference(listener));

// Declaring optional initial query
ScanQuery initialQuery = ScanQuery();

continuous::ContinuousQueryHandle<int32_t, std::string> handle = cache.QueryContinuous(query, initialQuery);

// Iterating over existing data stored in the cache.
QueryCursor<int32_t, std::string> cursor = handle.GetInitialQueryCursor();

while (cursor.HasNext())
{
    std::cout << cursor.GetNext().GetKey() << std::endl;
}

Remote Filter

The remote filter is executed for each updated key and decides whether the update should be propagated to the query’s local listener. If the filter returns true, the local listener is notified about the update.

For redundancy reasons, the filter is executed for both primary and backup versions (if backups are configured) of the key. Because of this, a remote filter can be used as a remote listener for update events.

ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

qry.setLocalListener(events ->
    events.forEach(event -> System.out.format("Entry: key=[%s] value=[%s]\n", event.getKey(), event.getValue()))
);

qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
    @Override
    public CacheEntryEventFilter<Integer, String> create() {
        return new CacheEntryEventFilter<Integer, String>() {
            @Override
            public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
                System.out.format("the value for key [%s] was updated from [%s] to [%s]\n", e.getKey(), e.getOldValue(), e.getValue());
                return true;
            }
        };
    }
});
class LocalListener : ICacheEntryEventListener<int, string>
{
    public void OnEvent(IEnumerable<ICacheEntryEvent<int, string>> evts)
    {
        foreach (var cacheEntryEvent in evts)
        {
            //react to update events here
        }
    }
}
class RemoteFilter : ICacheEntryEventFilter<int, string>
{
    public bool Evaluate(ICacheEntryEvent<int, string> e)
    {
        if (e.Key == 1)
        {
            return false;
        }
        Console.WriteLine("the value for key {0} was updated from {1} to {2}", e.Key, e.OldValue, e.Value);
        return true;
    }
}
public static void ContinuousQueryFilterDemo()
{
    var ignite = Ignition.Start(new IgniteConfiguration
    {
        DiscoverySpi = new TcpDiscoverySpi
        {
            LocalPort = 48500,
            LocalPortRange = 20,
            IpFinder = new TcpDiscoveryStaticIpFinder
            {
                Endpoints = new[]
                {
                    "127.0.0.1:48500..48520"
                }
            }
        }
    });
    var cache = ignite.GetOrCreateCache<int, string>("myCache");

    var query = new ContinuousQuery<int, string>(new LocalListener(), new RemoteFilter());

    var handle = cache.QueryContinuous(query);

    cache.Put(1, "1");
    cache.Put(2, "2");
}
template<typename K, typename V>
struct RemoteFilter : event::CacheEntryEventFilter<K, V>
{
    /**
     * Default constructor.
     */
    RemoteFilter()
    {
        // No-op.
    }

    /**
     * Destructor.
     */
    virtual ~RemoteFilter()
    {
        // No-op.
    }

    /**
     * Event callback.
     *
     * @param event Event.
     * @return True if the event passes filter.
     */
    virtual bool Process(const CacheEntryEvent<K, V>& event)
    {
        std::cout << "The value for key " << event.GetKey() <<
            " was updated from " << event.GetOldValue() << " to " << event.GetValue() << std::endl;
        return true;
    }
};

namespace ignite
{
    namespace binary
    {
        template<>
        struct BinaryType< RemoteFilter<int32_t, std::string> >
        {
            static int32_t GetTypeId()
            {
                return GetBinaryStringHashCode("RemoteFilter<int32_t,std::string>");
            }

            static void GetTypeName(std::string& dst)
            {
                dst = "RemoteFilter<int32_t,std::string>";

            }

            static int32_t GetFieldId(const char* name)
            {
                return GetBinaryStringHashCode(name);
            }

            static bool IsNull(const RemoteFilter<int32_t, std::string>&)
            {
                return false;
            }

            static void GetNull(RemoteFilter<int32_t, std::string>& dst)
            {
                dst = RemoteFilter<int32_t, std::string>();
            }

            static void Write(BinaryWriter& writer, const RemoteFilter<int32_t, std::string>& obj)
            {
                // No-op.
            }

            static void Read(BinaryReader& reader, RemoteFilter<int32_t, std::string>& dst)
            {
                // No-op.
            }
        };
    }
}

int main()
{
    IgniteConfiguration cfg;
    cfg.springCfgPath = "/path/to/configuration.xml";

    // Start a node.
    Ignite ignite = Ignition::Start(cfg);

    // Get binding.
    IgniteBinding binding = ignite.GetBinding();

    // Registering remote filter.
    binding.RegisterCacheEntryEventFilter<RemoteFilter<int32_t, std::string>>();

    // Get cache instance.
    Cache<int32_t, std::string> cache = ignite.GetOrCreateCache<int32_t, std::string>("myCache");

    // Declaring custom listener.
    Listener<int32_t, std::string> listener;

    // Declaring filter.
    RemoteFilter<int32_t, std::string> filter;

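    // Declaring continuous query.
    continuous::ContinuousQuery<int32_t, std::string> qry(MakeReference(listener), MakeReference(filter));

    // Starting the query.
    continuous::ContinuousQueryHandle<int32_t, std::string> handle = cache.QueryContinuous(qry);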
}
Note

In order to use remote filters, make sure the class definitions of the filters are available on the server nodes. You can do this in two ways: add the classes to the classpath of every server node, or enable peer class loading.

Remote Transformer

By default, continuous queries send the whole updated object to the local listener. This can lead to excessive network usage, especially if the object is very large. Moreover, applications often need only a subset of fields of the object.

To address these cases, you can use a continuous query with a transformer. A transformer is a function that is executed on remote nodes for every updated object and sends back only the results of the transformation.

IgniteCache<Integer, Person> cache = ignite.getOrCreateCache("myCache");

// Create a new continuous query with a transformer.
ContinuousQueryWithTransformer<Integer, Person, String> qry = new ContinuousQueryWithTransformer<>();

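// Factory to create transformers (typed, to avoid raw-type warnings).
Factory<IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Person>, String>> factory =
    FactoryBuilder.factoryOf(
        // Return one field of a complex object.
        // Only this field will be sent over to the local listener.
        (IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Person>, String>)
            event -> event.getValue().getName()
    );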

qry.setRemoteTransformerFactory(factory);

// Listener that will receive transformed data.
qry.setLocalListener(names -> {
    for (String name : names)
        System.out.println("New person name: " + name);
});

// Start the query.
cache.query(qry);
This API is not presently available for C#/.NET.
This API is not presently available for C++.
Note

In order to use transformers, make sure the class definitions of the transformers are available on the server nodes. You can do this in two ways: add the classes to the classpath of every server node, or enable peer class loading.

Events Delivery Guarantees

Continuous queries guarantee exactly-once semantics for the delivery of events to the clients' local listeners.

Both primary and backup nodes maintain an update queue that holds events that have been processed by continuous queries on the server side but have not yet been delivered to the clients. If a primary node crashes or the cluster topology changes for any other reason, every backup node flushes the contents of its update queue to the client, making sure that every event is delivered to the client’s local listener.
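
The flush behavior can be pictured with a small, self-contained Java model. This is only a sketch and does not use Ignite's internal classes: `BackupQueueSketch`, `PendingEvent`, `record`, and `flushTo` are hypothetical names chosen for illustration, and emptying the queue during the flush is an assumption of this model rather than a documented detail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the queue kept on a backup node; not Ignite's real implementation.
final class BackupQueueSketch {
    // An event processed on the server side that may not have reached the client yet.
    static final class PendingEvent {
        final int partition;
        final long updateCounter;
        final String value;

        PendingEvent(int partition, long updateCounter, String value) {
            this.partition = partition;
            this.updateCounter = updateCounter;
            this.value = value;
        }
    }

    private final Deque<PendingEvent> pending = new ArrayDeque<>();

    // Called for every update that the continuous query processed on this node.
    void record(PendingEvent evt) {
        pending.addLast(evt);
    }

    // Called when the primary node fails or the topology changes:
    // sends everything still held to the client, oldest first.
    // Assumption: the queue is emptied as events are sent.
    int flushTo(Consumer<PendingEvent> client) {
        int sent = 0;
        while (!pending.isEmpty()) {
            client.accept(pending.pollFirst());
            sent++;
        }
        return sent;
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        BackupQueueSketch backup = new BackupQueueSketch();
        backup.record(new PendingEvent(0, 1L, "a"));
        backup.record(new PendingEvent(0, 2L, "b"));

        // Simulate a primary failure: the backup pushes its pending events to the client.
        List<String> received = new ArrayList<>();
        int sent = backup.flushTo(evt -> received.add(evt.value));

        System.out.println("flushed " + sent + " events: " + received);
    }
}
```

Because the primary may already have delivered some of these events before it failed, a flush like this can hand the client events it has seen before, which is why a deduplication step is needed on the receiving side.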

Ignite maintains a per-partition update counter that helps to avoid duplicate notifications. Whenever an entry in a partition is updated, the counter for that partition is incremented on both the primary and backup nodes. The counter value is sent to the client along with the event notification, so the client can skip events it has already processed. Once the client confirms that an event has been received, the primary and backup nodes remove the record for that event from their update queues.
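
As a rough illustration of how a per-partition counter lets a client skip already-processed events, consider the following self-contained Java sketch. It is a simplified model under stated assumptions, not Ignite's actual code: the class `UpdateCounterDedupSketch` and its `onEvent` method are hypothetical, counters are assumed to start at 1, and events within a partition are assumed to arrive in counter order. Acknowledgements back to the server are not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side model of duplicate suppression; not Ignite's real implementation.
final class UpdateCounterDedupSketch {
    // Highest update counter already handed to the local listener, per partition.
    private final Map<Integer, Long> lastSeen = new HashMap<>();

    // Values that reached the (simulated) local listener, in delivery order.
    private final List<String> delivered = new ArrayList<>();

    // Returns true if the event was passed to the listener,
    // false if it was skipped because its counter was already processed.
    boolean onEvent(int partition, long updateCounter, String value) {
        long last = lastSeen.getOrDefault(partition, 0L);
        if (updateCounter <= last)
            return false; // Duplicate, for example a replay from a backup node.

        lastSeen.put(partition, updateCounter);
        delivered.add(value);
        return true;
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        UpdateCounterDedupSketch client = new UpdateCounterDedupSketch();

        client.onEvent(0, 1L, "a"); // Sent by the primary node.
        client.onEvent(0, 2L, "b"); // Sent by the primary node.
        client.onEvent(0, 2L, "b"); // Same event replayed by a backup node: skipped.
        client.onEvent(1, 1L, "c"); // Different partition, tracked by its own counter.

        System.out.println(client.delivered()); // prints [a, b, c]
    }
}
```

Tracking the counter per partition rather than globally matters here: counter value 1 in partition 1 is a new event even though counter value 1 was already seen in partition 0.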

Examples

The following application examples show typical usage of continuous queries.