Annotation Type IgniteAsyncCallback


  • @Retention(RUNTIME)
    @Target(TYPE)
    public @interface IgniteAsyncCallback
    If callback has this annotation then it will be executing in another thread.

    Currently this annotation is supported for:

    1. ContinuousQuery - CacheEntryUpdatedListener and CacheEntryEventFilter.

    For example, if filter or CacheEntryListener has the annotation then callbacks will be executing to asyncCallback thread pool. It allows to use cache API in a callbacks. This thread pool can be configured by IgniteConfiguration.setAsyncCallbackPoolSize(int).

    Example

    As an example, suppose we have cache with 'Person' objects and we need to query all persons with salary above then 1000. Also remote filter will update some entries.

    Here is the Person class:

     public class Person {
         // Name.
         private String name;
    
         // Salary.
         private double salary;
    
         ...
     }
     

    Here is the ExampleCacheEntryFilter class:

     @IgniteAsyncCallback
     public class ExampleCacheEntryFilter implements CacheEntryEventFilter<Integer, Person> {
         @IgniteInstanceResource
         private Ignite ignite;
    
         // Continuous listener will be notified for persons with salary above 1000.
         // Filter increases salary for some person on 100. Without @IgniteAsyncCallback annotation
         // this operation is not safe.
         public boolean evaluate(CacheEntryEvent<? extends K, ? extends V> evt) throws CacheEntryListenerException {
             Person p = evt.getValue();
    
             if (p.getSalary() > 1000)
                 return true;
    
             ignite.cache("Person").put(evt.getKey(), new Person(p.getName(), p.getSalary() + 100));
    
             return false;
         }
     }
     

    Query with asynchronous callback execute as usually:

     // Create new continuous query.
     ContinuousQuery<Long, Person> qry = new ContinuousQuery<>();
    
     // Callback that is called locally when update notifications are received.
     // It simply prints out information about all created persons.
     qry.setLocalListener((evts) -> {
         for (CacheEntryEvent<? extends Long, ? extends Person> e : evts) {
             Person p = e.getValue();
    
             System.out.println(p.getFirstName() + " " + p.getLastName() + "'s salary is " + p.getSalary());
         }
     });
    
     // Sets remote filter.
     qry.setRemoteFilterFactory(() -> new ExampleCacheEntryFilter());
    
     // Execute query.
     QueryCursor<Cache.Entry<Long, Person>> cur = cache.query(qry);
     
    See Also:
    IgniteConfiguration.getAsyncCallbackPoolSize(), AbstractContinuousQuery.getRemoteFilterFactory(), ContinuousQuery.getLocalListener()