JMS Streamer | Ignite Documentation

Ignite Summit 2024 — Call For Speakers Now Open — Learn more

Edit

JMS Streamer

Overview

Ignite offers a JMS Data Streamer to consume messages from JMS brokers, convert them into cache tuples and insert them in Ignite.

This data streamer supports the following features:

  • Consumes from queues or topics.

  • Supports durable subscriptions from topics.

  • Concurrent consumers are supported via the threads parameter.

    • When consuming from queues, this component will start as many Session objects with separate MessageListener instances each, therefore achieving natural concurrency.

    • When consuming from topics, obviously we cannot start multiple threads as that would lead us to consume duplicate messages. Therefore, we achieve concurrency in a virtualized manner through an internal thread pool.

  • Transacted sessions are supported through the transacted parameter.

  • Batched consumption is possible via the batched parameter, which groups message reception within the scope of a local JMS transaction (XA not used supported). Depending on the broker, this technique can provide a higher throughput as it decreases the amount of message acknowledgment​ round trips that are necessary, albeit at the expense possible duplicate messages (especially if an incident occurs in the middle of a transaction).

    • Batches are committed when the batchClosureMillis time has elapsed, or when a Session has received at least batchClosureSize messages.

    • Time-based closure fires with the specified frequency and applies to all Sessions in parallel.

    • Size-based closure applies to each individual Session (as transactions are Session-bound in JMS), so it will fire when that Session has processed that many messages.

    • Both options are compatible with each other. You can disable either, but not both if batching is enabled.

  • Supports specifying the destination with implementation-specific Destination objects or with names.

We have tested our implementation against Apache ActiveMQ, but any JMS broker is supported as long as it client library implements the JMS 1.1 specification.

Instantiating JMS Streamer

When you instantiate the JMS Streamer, you will need to concretize​ the following generic types:

  • T extends Message => the type of JMS Message this streamer will receive. If it can receive multiple, use the generic Message type.

  • K => the type of the cache key.

  • V => the type of the cache value.

To configure the JMS streamer, you will need to provide the following compulsory properties:

  • connectionFactory => an instance of your ConnectionFactory duly configured as required by the broker. It can be a pooled ConnectionFactory.

  • destination or (destinationName and destinationType) => a Destination object (normally a broker-specific implementation of the JMS Queue or Topic interfaces), or the combination of a destination name (queue or topic name) and the type as a Class reference to either Queue or Topic. In the latter case, the streamer will use either Session.createQueue(String) or Session.createTopic(String) to get a hold of the destination.

  • transformer => an implementation of MessageTransformer<T, K, V> that digests a JMS message of type T and produces a Map<K, V> of cache entries to add. It can also return null or an empty Map to ignore the incoming message.

Example

The example in this section populates a cache with String keys and String values, consuming TextMessages with this format:

raulk,Raul Kripalani
dsetrakyan,Dmitriy Setrakyan
sv,Sergi Vladykin
gm,Gianfranco Murador

Here is the code:

// create a data streamer
IgniteDataStreamer<String, String> dataStreamer = ignite.dataStreamer("mycache"));
dataStreamer.allowOverwrite(true);

// create a JMS streamer and plug the data streamer into it
JmsStreamer<TextMessage, String, String> jmsStreamer = new JmsStreamer<>();
jmsStreamer.setIgnite(ignite);
jmsStreamer.setStreamer(dataStreamer);
jmsStreamer.setConnectionFactory(connectionFactory);
jmsStreamer.setDestination(destination);
jmsStreamer.setTransacted(true);
jmsStreamer.setTransformer(new MessageTransformer<TextMessage, String, String>() {
    @Override
    public Map<String, String> apply(TextMessage message) {
        final Map<String, String> answer = new HashMap<>();
        String text;
        try {
            text = message.getText();
        }
        catch (JMSException e) {
            LOG.warn("Could not parse message.", e);
            return Collections.emptyMap();
        }
        for (String s : text.split("\n")) {
            String[] tokens = s.split(",");
            answer.put(tokens[0], tokens[1]);
        }
        return answer;
    }
});

jmsStreamer.start();

// on application shutdown
jmsStreamer.stop();
dataStreamer.close();

To use this component, you have to import the following module through your build system (Maven, Ivy, Gradle, sbt, etc.):

<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-jms11-ext</artifactId>
    <version>${ignite-jms11-ext.version}</version>
</dependency>