Working with Events | Ignite Documentation

Ignite Summit 2024 — Call For Speakers Now Open — Learn more

Edit

Working with Events

Overview

Ignite can generate events for a variety of operations happening in the cluster and notify your application about those operations. There are many types of events, including cache events, node discovery events, distributed task execution events, and many more.

The list of events is available in the Events section.

Enabling Events

By default, events are disabled, and you have to enable each event type explicitly if you want to use it in your application. To enable specific event types, list them in the includeEventTypes property of IgniteConfiguration as shown below:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="         http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/util
    http://www.springframework.org/schema/util/spring-util.xsd">

    <bean class="org.apache.ignite.configuration.IgniteConfiguration">

        <property name="includeEventTypes">
            <list>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_NODE_LEFT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_NODE_JOINED"/>
            </list>
        </property>
    </bean>

</beans>
IgniteConfiguration cfg = new IgniteConfiguration();

// Enable cache events.
cfg.setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_PUT, EventType.EVT_CACHE_OBJECT_READ,
        EventType.EVT_CACHE_OBJECT_REMOVED, EventType.EVT_NODE_JOINED, EventType.EVT_NODE_LEFT);

// Start the node.
Ignite ignite = Ignition.start(cfg);
var cfg = new IgniteConfiguration
{
    IncludedEventTypes = new[]
    {
        EventType.CacheObjectPut,
        EventType.CacheObjectRead,
        EventType.CacheObjectRemoved,
        EventType.NodeJoined,
        EventType.NodeLeft
    }
};
var ignite = Ignition.Start(cfg);
This API is not presently available for C++. You can use XML configuration.

Getting the Events Interface

The events functionality is available through the events interface, which provides methods for listening to cluster events. The events interface can be obtained from an instance of Ignite as follows:

IgniteEvents events = ignite.events();
var ignite = Ignition.GetIgnite();
var events = ignite.GetEvents();
This API is not presently available for C++.

The events interface can be associated with a set of nodes. This means that you can access events that happen on a given set of nodes. In the following example, the events interface is obtained for the set of nodes that host the data for the Person cache.

Ignite ignite = Ignition.ignite();

IgniteEvents events = ignite.events(ignite.cluster().forCacheNodes("person"));
var ignite = Ignition.GetIgnite();
var events = ignite.GetCluster().ForCacheNodes("person").GetEvents();
This API is not presently available for C++.

Listening to Events

You can listen to either local or remote events. Local events are events that are generated on the node where the listener is registered. Remote events are events that happen on other nodes.

Note that some events may be fired on multiple nodes even if the corresponding real-world event happens only once. For example, when a node leaves the cluster, the EVT_NODE_LEFT event is generated on every remaining node.

Another example is when you put an object into a cache. In this case, the EVT_CACHE_OBJECT_PUT event occurs on the node that hosts the primary partition into which the object is actually written, which may be different from the node where the put(…​) method is called. In addition, the event is fired on all nodes that hold the backup partitions for the cache if they are configured.

The events interface provides methods for listening to local events only, and for listening to both local and remote events.

Listening to Local Events

To listen to local events, use the localListen(listener, eventTypes…​) method, as shown below. The method accepts an event listener that is called every time an event of the given type occurs on the local node.

To unregister the local listener, return false in its functional method.

IgniteEvents events = ignite.events();

// Local listener that listens to local events.
IgnitePredicate<CacheEvent> localListener = evt -> {
    System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() + ", oldVal=" + evt.oldValue()
            + ", newVal=" + evt.newValue());

    return true; // Continue listening.
};

// Subscribe to the cache events that are triggered on the local node.
events.localListen(localListener, EventType.EVT_CACHE_OBJECT_PUT, EventType.EVT_CACHE_OBJECT_READ,
        EventType.EVT_CACHE_OBJECT_REMOVED);

The event listener is an object of the IgnitePredicate<T> class with a type argument that matches the type of events the listener is going to process. For example, cache events (EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_READ, etc.) correspond to the CacheEvent class, discovery events (EVT_NODE_LEFT, EVT_NODE_JOINED, etc.) correspond to the DiscoveryEvent class, and so on. If you want to listen to events of different types, you can use the generic Event interface:

IgnitePredicate<Event> localListener = evt -> {
    // process the event
    return true;
};
class LocalListener : IEventListener<CacheEvent>
{
    public bool Invoke(CacheEvent evt)
    {
        Console.WriteLine("Received event [evt=" + evt.Name + ", key=" + evt.Key + ", oldVal=" + evt.OldValue
                          + ", newVal=" + evt.NewValue);
        return true;
    }
}

public static void LocalListenDemo()
{
    var cfg = new IgniteConfiguration
    {
        IncludedEventTypes = new[]
        {
            EventType.CacheObjectPut,
            EventType.CacheObjectRead,
            EventType.CacheObjectRemoved,
        }
    };
    var ignite = Ignition.Start(cfg);
    var events = ignite.GetEvents();
    events.LocalListen(new LocalListener(), EventType.CacheObjectPut, EventType.CacheObjectRead,
        EventType.CacheObjectRemoved);

    var cache = ignite.GetOrCreateCache<int, int>("myCache");
    cache.Put(1, 1);
    cache.Put(2, 2);
}
This API is not presently available for C++.

Listening to Remote Events

The IgniteEvents.remoteListen(localListener, filter, types) method can be used to register a listener that listens for both remote and local events. It accepts a local listener, a filter, and a list of event types you want to listen to.

The filter is deployed to all the nodes associated with the events interface, including the local node. The events that pass the filter are sent to the local listener.

The method returns a unique identifier that can be used to unregister the listener and filters. To do this, call IgniteEvents.stopRemoteListen(uuid). Another way to unregister the listener is to return false in the apply() method.

IgniteEvents events = ignite.events();

IgnitePredicate<CacheEvent> filter = evt -> {
    System.out.println("remote event: " + evt.name());
    return true;
};

// Subscribe to the cache events on all nodes where the cache is hosted.
UUID uuid = events.remoteListen(new IgniteBiPredicate<UUID, CacheEvent>() {

    @Override
    public boolean apply(UUID uuid, CacheEvent e) {

        // process the event

        return true; //continue listening
    }
}, filter, EventType.EVT_CACHE_OBJECT_PUT);
This API is not presently available for C#/.NET.
This API is not presently available for C++.

Batching Events

Each activity in a cache can result in an event notification being generated and sent. For systems with high cache activity, getting notified for every event could be network intensive, possibly leading to a decreased performance of cache operations.

Event notifications can be grouped together and sent in batches or timely intervals to mitigate the impact on performance. Here is an example of how this can be done:

Ignite ignite = Ignition.ignite();

// Get an instance of the cache.
final IgniteCache<Integer, String> cache = ignite.cache("cacheName");

// Sample remote filter which only accepts events for the keys
// that are greater than or equal to 10.
IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
    @Override
    public boolean apply(CacheEvent evt) {
        System.out.println("Cache event: " + evt);

        int key = evt.key();

        return key >= 10;
    }
};

// Subscribe to the cache events that are triggered on all nodes
// that host the cache.
// Send notifications in batches of 10.
ignite.events(ignite.cluster().forCacheNodes("cacheName")).remoteListen(10 /* batch size */,
        0 /* time intervals */, false, null, rmtLsnr, EventType.EVTS_CACHE);

// Generate cache events.
for (int i = 0; i < 20; i++)
    cache.put(i, Integer.toString(i));
This API is not presently available for C#/.NET.
This API is not presently available for C++.

Storing and Querying Events

You can configure an event storage that will keep events on the nodes where they occur. You can then query events in your application.

The event storage can be configured to keep events for a specific period, keep only the most recent events, or keep the events that satisfy a specific filter. See the MemoryEventStorageSpi javadoc for details.

Below is an example of event storage configuration:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">

    <property name="eventStorageSpi" >
        <bean class="org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi">
            <property name="expireAgeMs" value="600000"/>
        </bean>
    </property>

</bean>
MemoryEventStorageSpi eventStorageSpi = new MemoryEventStorageSpi();
eventStorageSpi.setExpireAgeMs(600000);

IgniteConfiguration igniteCfg = new IgniteConfiguration();
igniteCfg.setEventStorageSpi(eventStorageSpi);

Ignite ignite = Ignition.start(igniteCfg);
var cfg = new IgniteConfiguration
{
    EventStorageSpi = new MemoryEventStorageSpi()
    {
        ExpirationTimeout = TimeSpan.FromMilliseconds(600000)
    },
    IncludedEventTypes = new[]
    {
        EventType.CacheObjectPut,
        EventType.CacheObjectRead,
        EventType.CacheObjectRemoved,
    }
};
var ignite = Ignition.Start(cfg);
This API is not presently available for C++. You can use XML configuration.

Querying Local Events

The following example shows how you can query local EVT_CACHE_OBJECT_PUT events stored in the event storage.

Collection<CacheEvent> cacheEvents = events.localQuery(e -> {
    // process the event
    return true;
}, EventType.EVT_CACHE_OBJECT_PUT);
var events = ignite.GetEvents();
var cacheEvents = events.LocalQuery(EventType.CacheObjectPut);
This API is not presently available for C++.

Querying Remote Events

Here is an example of querying remote events:

Collection<CacheEvent> storedEvents = events.remoteQuery(e -> {
    // process the event
    return true;
}, 0, EventType.EVT_CACHE_OBJECT_PUT);
class EventFilter : IEventFilter<CacheEvent>
{
    public bool Invoke(CacheEvent evt)
    {
        return true;
    }
}
// ....


    var events = ignite.GetEvents();
    var storedEvents = events.RemoteQuery(new EventFilter(), null, EventType.CacheObjectPut);
This API is not presently available for C++.