Apache Kafka Streamer
Overview
The Apache Ignite Kafka Streamer module provides streaming from Kafka to an Ignite cache. Either of the following two methods can be used to achieve such streaming:
- using Kafka Connect functionality with Ignite sink
- importing the Kafka Streamer module in your Maven project and instantiating KafkaStreamer for data streaming
Streaming Data via Kafka Connect
IgniteSinkConnector will help you export data from Kafka to Ignite cache by polling data from Kafka topics and writing it to your specified cache. The connector can be found in the optional/ignite-kafka module. It and its dependencies have to be on the classpath of a running Kafka instance, as described in the following subsection. For more information on Kafka Connect, see the Kafka Documentation.
Setting up and Running
- Add the IGNITE_HOME/libs/optional/ignite-kafka module to the application classpath.
- Prepare worker configurations, e.g.,
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
- Prepare connector configurations, e.g.,
# connector
name=my-ignite-connector
connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
tasks.max=2
topics=someTopic1,someTopic2

# cache
cacheName=myCache
cacheAllowOverwrite=true
igniteCfg=/some-path/ignite.xml
singleTupleExtractorCls=my.company.MyExtractor
where
- cacheName is the name of the cache you specify in /some-path/ignite.xml, into which the data from someTopic1,someTopic2 will be pulled and stored.
- cacheAllowOverwrite can be set to true if you want to enable overwriting existing values in the cache.
- If you need to parse the incoming data and decide on your new key and value, you can implement it as StreamSingleTupleExtractor and specify it as singleTupleExtractorCls (see the extractor sketch after these steps).
- You can also set cachePerNodeDataSize and cachePerNodeParOps to adjust the per-node buffer size and the maximum number of parallel stream operations for a single node.
- Start the connector, for instance, in standalone mode as follows,
bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
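If you set singleTupleExtractorCls, a minimal sketch of such an extractor could look like the following. It assumes the sink hands Kafka Connect SinkRecord instances to the extractor and that keys and values are (convertible to) strings; my.company.MyExtractor is just the illustrative class name used in the connector configuration above.

package my.company;

import java.util.AbstractMap;
import java.util.Map;

import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.kafka.connect.sink.SinkRecord;

// Illustrative extractor: derives the cache key and value from each incoming record.
public class MyExtractor implements StreamSingleTupleExtractor<SinkRecord, String, String> {
    @Override public Map.Entry<String, String> extract(SinkRecord record) {
        // Assumption: the record key and value can be represented as strings.
        return new AbstractMap.SimpleEntry<>(String.valueOf(record.key()), String.valueOf(record.value()));
    }
}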
Checking the Flow
To perform a very basic functionality check, you can do the following:
- Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
- Start the Kafka server
bin/kafka-server-start.sh config/server.properties
- Provide some data input to the Kafka server
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
k1,v1
- Start the connector
bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
- Check that the value is in the cache, for example, via the REST API:
http://node1:8080/ignite?cmd=size&cacheName=cache1
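If you prefer checking from code rather than the REST API, a minimal sketch along these lines can be used; it assumes you start a node (or thick client) with the same /some-path/ignite.xml configuration and uses the cache name cache1 from the example above.

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;

public class CacheSizeCheck {
    public static void main(String[] args) {
        // Assumption: /some-path/ignite.xml is the same configuration the connector uses.
        try (Ignite ignite = Ignition.start("/some-path/ignite.xml")) {
            // Prints the number of entries currently stored in the cache.
            System.out.println("cache1 size: " + ignite.cache("cache1").size());
        }
    }
}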
Streaming Data with Ignite Kafka Streamer Module
If you are using Maven to manage the dependencies of your project, you first have to add the Kafka Streamer module dependency like this (replace ${ignite-kafka-ext.version} with the actual Ignite Kafka Extension version you are interested in):
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
...
<dependencies>
...
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-kafka-ext</artifactId>
<version>${ignite-kafka-ext.version}</version>
</dependency>
...
</dependencies>
...
</project>
Assuming a cache with String keys and String values, the streamer can be started as follows:
KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();
IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache");
// allow overwriting cache data
stmr.allowOverwrite(true);
kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);
// set the topic
kafkaStreamer.setTopic(someKafkaTopic);
// set the number of threads to process Kafka streams
kafkaStreamer.setThreads(4);
// set Kafka consumer configurations
kafkaStreamer.setConsumerConfig(kafkaConsumerConfig);
// set extractor
kafkaStreamer.setSingleTupleExtractor(strExtractor);
kafkaStreamer.start();
...
// stop on shutdown
kafkaStreamer.stop();
stmr.close();
For detailed information on Kafka consumer properties, refer to http://kafka.apache.org/documentation.html.
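The kafkaConsumerConfig object passed above is a standard set of Kafka consumer properties. As a minimal sketch, it could be assembled like this; the broker address, group id, and deserializer choices are assumptions to adapt to your setup:

import java.util.Properties;

// Illustrative consumer settings for the streamer; all values below are assumptions.
Properties kafkaConsumerConfig = new Properties();
kafkaConsumerConfig.put("bootstrap.servers", "localhost:9092");
kafkaConsumerConfig.put("group.id", "ignite-kafka-streamer");
kafkaConsumerConfig.put("enable.auto.commit", "true");
kafkaConsumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaConsumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");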