JMS Streamer
Overview
Ignite offers a JMS Data Streamer to consume messages from JMS brokers, convert them into cache tuples and insert them in Ignite.
This data streamer supports the following features:
- Consumes from queues or topics.
- Supports durable subscriptions from topics.
- Concurrent consumers are supported via the threads parameter.
  - When consuming from queues, this component starts as many Session objects as configured threads, each with its own MessageListener instance, therefore achieving natural concurrency.
  - When consuming from topics, starting multiple consumers would lead to duplicate messages being consumed. Concurrency is therefore achieved in a virtualized manner through an internal thread pool.
- Transacted sessions are supported through the transacted parameter.
- Batched consumption is possible via the batched parameter, which groups message reception within the scope of a local JMS transaction (XA is not used). Depending on the broker, this technique can provide higher throughput because it reduces the number of message acknowledgment round trips, albeit at the expense of possible duplicate messages (especially if an incident occurs in the middle of a transaction).
  - Batches are committed when the batchClosureMillis time has elapsed, or when a Session has received at least batchClosureSize messages.
  - Time-based closure fires with the specified frequency and applies to all Sessions in parallel.
  - Size-based closure applies to each individual Session (as transactions are Session-bound in JMS), so it fires when that Session has processed that many messages.
  - Both options are compatible with each other. You can disable either, but not both if batching is enabled.
- Supports specifying the destination with implementation-specific Destination objects or with names.
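The two batch closure criteria can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions, not Ignite's implementation: SessionBatch and BatchClosureSketch are hypothetical names, a commit is modelled as resetting a counter, the timer is invoked by hand rather than by a scheduler, sessions are committed one after another rather than in parallel, and a non-positive size is assumed to disable size-based closure.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one transacted JMS Session; not an Ignite or JMS class. */
final class SessionBatch {
    private final int closureSize; // plays the role of batchClosureSize; <= 0 disables it (assumption)
    private int pending;           // messages received since the last commit
    private int commits;           // number of commits performed so far

    SessionBatch(int closureSize) {
        this.closureSize = closureSize;
    }

    /** Size-based closure: counts one message and commits once the threshold is reached. */
    boolean onMessage() {
        pending++;
        if (closureSize > 0 && pending >= closureSize) {
            commit();
            return true;
        }
        return false;
    }

    /** Models committing the local transaction by resetting the pending counter. */
    void commit() {
        pending = 0;
        commits++;
    }

    int pending() {
        return pending;
    }

    int commits() {
        return commits;
    }
}

/** Hypothetical model of the time-based closure, which spans every session. */
final class BatchClosureSketch {
    private final List<SessionBatch> sessions = new ArrayList<>();

    SessionBatch openSession(int closureSize) {
        SessionBatch session = new SessionBatch(closureSize);
        sessions.add(session);
        return session;
    }

    /** Time-based closure: meant to be called once every batchClosureMillis; commits all sessions. */
    void onTimer() {
        for (SessionBatch session : sessions)
            session.commit();
    }

    public static void main(String[] args) {
        BatchClosureSketch model = new BatchClosureSketch();
        SessionBatch a = model.openSession(3);
        SessionBatch b = model.openSession(3);

        a.onMessage();
        a.onMessage();
        b.onMessage();

        System.out.println(a.onMessage()); // true: the third message on session a reaches the threshold
        model.onTimer();                   // the timer commits whatever each session has pending
        System.out.println(b.pending());   // 0: session b was committed by the timer
    }
}
```

In this model a session that reaches the size threshold commits on its own, while the timer commits whatever each session has accumulated, which is why both criteria can be active at the same time.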
We have tested our implementation against Apache ActiveMQ, but any JMS broker is supported as long as its client library implements the JMS 1.1 specification.
Instantiating JMS Streamer
When you instantiate the JMS Streamer, you need to specify the following generic types:
- T extends Message => the type of JMS Message this streamer will receive. If it can receive multiple types, use the generic Message type.
- K => the type of the cache key.
- V => the type of the cache value.
To configure the JMS streamer, you will need to provide the following compulsory properties:
- connectionFactory => an instance of your ConnectionFactory duly configured as required by the broker. It can be a pooled ConnectionFactory.
- destination or (destinationName and destinationType) => a Destination object (normally a broker-specific implementation of the JMS Queue or Topic interfaces), or the combination of a destination name (queue or topic name) and the type as a Class reference to either Queue or Topic. In the latter case, the streamer will use either Session.createQueue(String) or Session.createTopic(String) to get a hold of the destination.
- transformer => an implementation of MessageTransformer<T, K, V> that digests a JMS message of type T and produces a Map<K, V> of cache entries to add. It can also return null or an empty Map to ignore the incoming message.
Example
The example in this section populates a cache with String keys and String values, consuming TextMessages with this format:
raulk,Raul Kripalani
dsetrakyan,Dmitriy Setrakyan
sv,Sergi Vladykin
gm,Gianfranco Murador
Here is the code:
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.stream.jms11.JmsStreamer;
import org.apache.ignite.stream.jms11.MessageTransformer;

// The snippet assumes that ignite, connectionFactory, destination and LOG already exist.

// create a data streamer
IgniteDataStreamer<String, String> dataStreamer = ignite.dataStreamer("mycache");
dataStreamer.allowOverwrite(true);

// create a JMS streamer and plug the data streamer into it
JmsStreamer<TextMessage, String, String> jmsStreamer = new JmsStreamer<>();
jmsStreamer.setIgnite(ignite);
jmsStreamer.setStreamer(dataStreamer);
jmsStreamer.setConnectionFactory(connectionFactory);
jmsStreamer.setDestination(destination);
jmsStreamer.setTransacted(true);
jmsStreamer.setTransformer(new MessageTransformer<TextMessage, String, String>() {
    @Override
    public Map<String, String> apply(TextMessage message) {
        final Map<String, String> answer = new HashMap<>();
        String text;
        try {
            text = message.getText();
        }
        catch (JMSException e) {
            LOG.warn("Could not parse message.", e);
            // an empty Map tells the streamer to ignore this message
            return Collections.emptyMap();
        }
        // each line of the message body holds one "key,value" pair
        for (String s : text.split("\n")) {
            String[] tokens = s.split(",");
            answer.put(tokens[0], tokens[1]);
        }
        return answer;
    }
});

jmsStreamer.start();

// on application shutdown
jmsStreamer.stop();
dataStreamer.close();
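The parsing step inside the transformer can be tried on its own, without a broker. The following is a minimal sketch, not part of Ignite: KeyValueLines and its parse method are hypothetical names, and skipping blank or malformed lines is an assumed policy (the example above would instead fail on a line without a comma).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that turns "key,value" lines into cache entries. */
final class KeyValueLines {
    private KeyValueLines() {
    }

    /** Parses one "key,value" pair per line; blank or malformed lines are skipped. */
    static Map<String, String> parse(String text) {
        Map<String, String> entries = new LinkedHashMap<>();
        if (text == null)
            return entries;

        // \R matches any line break, so both "\n" and "\r\n" bodies are handled
        for (String line : text.split("\\R")) {
            // limit 2 keeps any further commas inside the value
            String[] tokens = line.split(",", 2);
            if (tokens.length < 2 || tokens[0].trim().isEmpty())
                continue;
            entries.put(tokens[0].trim(), tokens[1].trim());
        }
        return entries;
    }

    public static void main(String[] args) {
        String body = "raulk,Raul Kripalani\ndsetrakyan,Dmitriy Setrakyan\n\nbroken-line";
        // prints {raulk=Raul Kripalani, dsetrakyan=Dmitriy Setrakyan}
        System.out.println(parse(body));
    }
}
```

A transformer could delegate to such a helper and return its result directly, since an empty Map is already treated as "ignore this message".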
To use this component, you have to import the following module through your build system (Maven, Ivy, Gradle, sbt, etc.):
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-jms11-ext</artifactId>
    <version>${ignite-jms11-ext.version}</version>
</dependency>