Class ContinuousQuery<K,V>
- java.lang.Object
-
- org.apache.ignite.cache.query.Query<javax.cache.Cache.Entry<K,V>>
-
- org.apache.ignite.cache.query.AbstractContinuousQuery<K,V>
-
- org.apache.ignite.cache.query.ContinuousQuery<K,V>
-
- All Implemented Interfaces:
Serializable
public final class ContinuousQuery<K,V> extends AbstractContinuousQuery<K,V>
API for configuring continuous cache queries.
Continuous queries allow registering a remote filter and a local listener for cache updates. If an update event passes the filter, it is sent to the node that executed the query, and the local listener is notified.
Additionally, you can execute an initial query to get the data that currently exists. The query can be of any type (SQL, TEXT or SCAN) and is set via the setInitialQuery(Query) method.
The query can be executed either on all nodes in the topology using the IgniteCache.query(Query) method, or only on the local node if the Query.setLocal(boolean) parameter is set to true. Note that if the query is distributed and a new node joins, it gets the remote filter for the query during the discovery process, before it actually joins the topology, so no updates are missed.
Example
As an example, suppose we have a cache with 'Person' objects and we need to query for all people with a salary above 1000.
Here is the Person class:
    public class Person {
        // Name.
        private String name;

        // Salary.
        private double salary;

        ...
    }
You can create and execute a continuous query like so:
    // Create a new continuous query.
    ContinuousQuery<Long, Person> qry = new ContinuousQuery<>();

    // Initial iteration query will return all people with salary above 1000.
    qry.setInitialQuery(new ScanQuery<>((id, p) -> p.getSalary() > 1000));

    // Callback that is called locally when update notifications are received.
    // It simply prints out information about all created or modified records.
    qry.setLocalListener((evts) -> {
        for (CacheEntryEvent<? extends Long, ? extends Person> e : evts) {
            Person p = e.getValue();

            System.out.println(p.getName() + "'s salary is " + p.getSalary());
        }
    });

    // The continuous listener will be notified for people with salary above 1000.
    qry.setRemoteFilter(evt -> evt.getValue().getSalary() > 1000);

    // Execute the query and get a cursor that iterates through the initial data.
    QueryCursor<Cache.Entry<Long, Person>> cur = cache.query(qry);
This will execute the query on all nodes that have the cache you are working with, and the listener will start receiving notifications for cache updates.
To stop receiving updates, call the QueryCursor.close() method:
    cur.close();
Note that this works even if you didn't provide the initial query. The cursor will be empty in this case, but it will still unregister listeners when QueryCursor.close() is called.
The IgniteAsyncCallback annotation is supported for CacheEntryEventFilter (see AbstractContinuousQuery.setRemoteFilterFactory(Factory)) and CacheEntryUpdatedListener (see setLocalListener(CacheEntryUpdatedListener)). If a filter and/or listener is annotated with IgniteAsyncCallback, the annotated callback is executed in an async callback pool (see IgniteConfiguration.getAsyncCallbackPoolSize()), and the notification order is kept the same as the update order for a given cache key.
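The per-key ordering guarantee described above can be pictured with a small standalone model. The sketch below is not Ignite code and makes no claim about how Ignite's async callback pool is implemented. It assumes a hypothetical StripedCallbackPool in which every key is always routed to the same single-threaded executor, which is one common way to keep callbacks for one key in submission order while callbacks for different keys may run in parallel.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative model only (hypothetical class, not part of Ignite). */
final class StripedCallbackPool {
    /** One single-threaded executor per stripe. */
    private final ExecutorService[] stripes;

    StripedCallbackPool(int poolSize) {
        stripes = new ExecutorService[poolSize];

        for (int i = 0; i < poolSize; i++)
            stripes[i] = Executors.newSingleThreadExecutor();
    }

    /**
     * Routes the callback to the stripe that owns the key, so callbacks
     * submitted for the same key run one after another in submission order.
     */
    void submit(Object key, Runnable callback) {
        int idx = Math.floorMod(key.hashCode(), stripes.length);

        stripes[idx].execute(callback);
    }

    /** Stops accepting work and waits for the queued callbacks to finish. */
    boolean shutdownAndWait(long timeoutSec) {
        for (ExecutorService s : stripes)
            s.shutdown();

        boolean done = true;

        try {
            for (ExecutorService s : stripes)
                done &= s.awaitTermination(timeoutSec, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            return false;
        }

        return done;
    }
}
```

In this model, two updates for the same key submitted from one thread are always observed in that order, while nothing is promised about the relative order of updates for different keys.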
-
-
Field Summary
-
Fields inherited from class org.apache.ignite.cache.query.AbstractContinuousQuery
DFLT_AUTO_UNSUBSCRIBE, DFLT_PAGE_SIZE, DFLT_TIME_INTERVAL
-
-
Constructor Summary
Constructor and description:
ContinuousQuery()
Creates new continuous query.
-
Method Summary
Modifier and Type, Method, and Description
javax.cache.event.CacheEntryUpdatedListener<K,V> getLocalListener()
CacheEntryEventSerializableFilter<K,V> getRemoteFilter()
Gets remote filter.
ContinuousQuery<K,V> setAutoUnsubscribe(boolean autoUnsubscribe)
Sets automatic unsubscribe flag.
ContinuousQuery<K,V> setInitialQuery(Query<javax.cache.Cache.Entry<K,V>> initQry)
Sets initial query.
ContinuousQuery<K,V> setLocal(boolean loc)
Sets whether this query should be executed on a local node only.
ContinuousQuery<K,V> setLocalListener(javax.cache.event.CacheEntryUpdatedListener<K,V> locLsnr)
Sets a local callback.
ContinuousQuery<K,V> setPageSize(int pageSize)
Sets optional page size; if 0, the default is used.
ContinuousQuery<K,V> setRemoteFilter(CacheEntryEventSerializableFilter<K,V> rmtFilter)
Deprecated.
ContinuousQuery<K,V> setTimeInterval(long timeInterval)
Sets time interval.
-
Methods inherited from class org.apache.ignite.cache.query.AbstractContinuousQuery
getInitialQuery, getRemoteFilterFactory, getTimeInterval, isAutoUnsubscribe, isIncludeExpired, setIncludeExpired, setRemoteFilterFactory
-
Methods inherited from class org.apache.ignite.cache.query.Query
getPageSize, isLocal, prepare, toString
-
-
-
-
Method Detail
-
setInitialQuery
public ContinuousQuery<K,V> setInitialQuery(Query<javax.cache.Cache.Entry<K,V>> initQry)
Sets initial query.
This query is executed before the continuous listener is registered, which makes it possible to iterate through entries that already existed at the time the continuous query is executed.
- Overrides:
setInitialQuery in class AbstractContinuousQuery<K,V>
- Parameters:
initQry - Initial query.
- Returns:
this for chaining.
-
setLocalListener
public ContinuousQuery<K,V> setLocalListener(javax.cache.event.CacheEntryUpdatedListener<K,V> locLsnr)
Sets a local callback. This callback is called only on the local node when new updates are received.
The callback predicate accepts the ID of the node from which updates are received and a collection of the received entries. Note that for removed entries, values will be null.
If the predicate returns false, query execution will be cancelled.
WARNING: all operations that involve any kind of JVM-local or distributed locking (e.g., synchronization or transactional cache operations) should be executed asynchronously, without blocking the thread that called the callback. Otherwise, you can get deadlocks.
If the local listener is annotated with IgniteAsyncCallback, it is executed in an async callback pool (see IgniteConfiguration.getAsyncCallbackPoolSize()), which allows it to perform cache operations.
- Parameters:
locLsnr - Local callback.
- Returns:
this for chaining.
- See Also:
IgniteAsyncCallback, IgniteConfiguration.getAsyncCallbackPoolSize(), ContinuousQueryWithTransformer.setLocalListener(EventListener)
-
getLocalListener
public javax.cache.event.CacheEntryUpdatedListener<K,V> getLocalListener()
- Returns:
Local listener.
-
setRemoteFilter
@Deprecated public ContinuousQuery<K,V> setRemoteFilter(CacheEntryEventSerializableFilter<K,V> rmtFilter)
Deprecated.
Sets optional key-value filter. This filter is called before an entry is sent to the master node.
WARNING: all operations that involve any kind of JVM-local or distributed locking (e.g., synchronization or transactional cache operations) should be executed asynchronously, without blocking the thread that called the filter. Otherwise, you can get deadlocks.
If the remote filter is annotated with IgniteAsyncCallback, it is executed in an async callback pool (see IgniteConfiguration.getAsyncCallbackPoolSize()), which allows it to perform cache operations.
- Parameters:
rmtFilter - Key-value filter.
- Returns:
this for chaining.
- See Also:
IgniteAsyncCallback, IgniteConfiguration.getAsyncCallbackPoolSize()
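Because this setter is deprecated, a filter can instead be supplied through the inherited AbstractContinuousQuery.setRemoteFilterFactory(Factory) method listed in the method summary. The following is a minimal sketch, assuming Apache Ignite and the JCache API are on the classpath and that the cache maps Long keys to Double salaries. SalaryWatch and SalaryFilter are hypothetical names introduced only for illustration, and the filter class is assumed to be loadable on the nodes where the factory creates it.

```java
import javax.cache.configuration.FactoryBuilder;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.lang.IgniteAsyncCallback;

/** Hypothetical helper that configures (but does not execute) a continuous query. */
final class SalaryWatch {
    /** Hypothetical remote filter: passes updates whose value (a salary) is above 1000. */
    @IgniteAsyncCallback
    public static class SalaryFilter implements CacheEntryEventFilter<Long, Double> {
        @Override public boolean evaluate(CacheEntryEvent<? extends Long, ? extends Double> evt) {
            // Values are null for removed entries, so guard before comparing.
            Double salary = evt.getValue();

            return salary != null && salary > 1000;
        }
    }

    static ContinuousQuery<Long, Double> configure() {
        ContinuousQuery<Long, Double> qry = new ContinuousQuery<>();

        // The factory creates a SalaryFilter instance through its public no-arg constructor.
        qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(SalaryFilter.class));

        // Local callback: print every update that passed the remote filter.
        qry.setLocalListener(evts -> {
            for (CacheEntryEvent<? extends Long, ? extends Double> e : evts)
                System.out.println(e.getKey() + " -> " + e.getValue());
        });

        return qry;
    }
}
```

The returned query would then be passed to IgniteCache.query(Query) as in the class-level example. setRemoteFilterFactory is called as a separate statement here because it is inherited from AbstractContinuousQuery rather than redeclared on ContinuousQuery.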
-
getRemoteFilter
public CacheEntryEventSerializableFilter<K,V> getRemoteFilter()
Gets remote filter.
- Returns:
Remote filter.
-
setTimeInterval
public ContinuousQuery<K,V> setTimeInterval(long timeInterval)
Sets time interval.
When a cache update happens, the entry is first put into a buffer. Entries from the buffer are sent to the master node only if the buffer is full (its size can be provided via the Query.setPageSize(int) method) or the time provided via this method is exceeded.
The default time interval is 0, which means that the time check is disabled and entries are sent only when the buffer is full.
- Overrides:
setTimeInterval in class AbstractContinuousQuery<K,V>
- Parameters:
timeInterval - Time interval.
- Returns:
this for chaining.
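The interplay between page size and time interval described above can be sketched with a small standalone model. This is not Ignite's internal buffer. UpdateBuffer, its tick method, and the millisecond timestamps passed in by the caller are all assumptions made for illustration; the model only shows the rule "flush when the buffer is full, or when the interval has elapsed and the interval is not 0".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative model only (hypothetical class, not part of Ignite). */
final class UpdateBuffer<T> {
    private final int pageSize;
    private final long timeIntervalMs; // 0 disables the time check.
    private final Consumer<List<T>> sender;
    private final List<T> buf = new ArrayList<>();
    private long firstAddedAtMs;

    UpdateBuffer(int pageSize, long timeIntervalMs, Consumer<List<T>> sender) {
        this.pageSize = pageSize;
        this.timeIntervalMs = timeIntervalMs;
        this.sender = sender;
    }

    /** Buffers an update observed at nowMs and flushes if the buffer became full. */
    void add(T update, long nowMs) {
        if (buf.isEmpty())
            firstAddedAtMs = nowMs;

        buf.add(update);

        if (buf.size() >= pageSize)
            flush();
    }

    /** Simulates a timer check: flushes a non-empty buffer once the interval has elapsed. */
    void tick(long nowMs) {
        if (timeIntervalMs > 0 && !buf.isEmpty() && nowMs - firstAddedAtMs >= timeIntervalMs)
            flush();
    }

    /** Sends a copy of the buffered entries and clears the buffer. */
    private void flush() {
        sender.accept(new ArrayList<>(buf));
        buf.clear();
    }
}

final class UpdateBufferDemo {
    public static void main(String[] args) {
        UpdateBuffer<String> b = new UpdateBuffer<>(2, 100, System.out::println);

        b.add("a", 0);
        b.add("b", 1);  // Buffer is full: prints [a, b]
        b.add("c", 10);
        b.tick(200);    // Interval elapsed: prints [c]
    }
}
```

With a time interval of 0 in this model, tick never flushes, so a partially filled buffer is held until enough further updates arrive to fill it.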
-
setAutoUnsubscribe
public ContinuousQuery<K,V> setAutoUnsubscribe(boolean autoUnsubscribe)
Sets automatic unsubscribe flag.
This flag indicates that query filters on remote nodes should be automatically unregistered if the master node (the node that initiated the query) leaves the topology. If this flag is false, filters are unregistered only when the query is cancelled from the master node, and are never unregistered if the master node leaves the grid.
The default value for this flag is true.
- Overrides:
setAutoUnsubscribe in class AbstractContinuousQuery<K,V>
- Parameters:
autoUnsubscribe - Automatic unsubscription flag.
- Returns:
this for chaining.
-
setPageSize
public ContinuousQuery<K,V> setPageSize(int pageSize)
Sets optional page size; if 0, the default is used.
- Overrides:
setPageSize in class Query<javax.cache.Cache.Entry<K,V>>
- Parameters:
pageSize - Optional page size.
- Returns:
this for chaining.
-
setLocal
public ContinuousQuery<K,V> setLocal(boolean loc)
Sets whether this query should be executed on a local node only.
Note: backup event queues are not kept for local continuous queries, which may lead to loss of notifications in case of node failures. Use AbstractContinuousQuery.setRemoteFilterFactory(Factory) to register cache event listeners on all cache nodes if a delivery guarantee is required.
-
-