Ignite DataFrame
Overview
The Apache Spark DataFrame API introduced the concept of a schema to describe the data, allowing Spark to manage the schema and organize the data into a tabular format. To put it simply, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database and allows Spark to leverage the Catalyst query optimizer to produce much more efficient query execution plans in comparison to RDDs, which are just collections of elements partitioned across the nodes of the cluster.
Ignite expands DataFrame, simplifying development and improving data access times whenever Ignite is used as memory-centric storage for Spark. Benefits include:
-
Ability to share data and state across Spark jobs by writing and reading DataFrames to/from Ignite.
-
Faster SparkSQL queries by optimizing Spark query execution plans with Ignite SQL engine which include advanced indexing and avoid data movement across the network from Ignite to Spark.
Integration
IgniteRelationProvider
is an implementation of the Spark RelationProvider
and CreatableRelationProvider
interfaces. The IgniteRelationProvider
can talk directly to Ignite tables through the Spark SQL interface. The data are loaded and exchanged via IgniteSQLRelation
that executes filtering operations on the Ignite side. For now, grouping, joining or ordering operations are fulfilled on the Spark side. These operations will be optimized and processed on the Ignite side in upcoming releases. IgniteSQLRelation
utilizes the partitioned nature of Ignite’s architecture and provides partitioning information to Spark.
Spark Session
To use the Apache Spark DataFrame API, it is necessary to create an entry point for programming with Spark. This is achieved through the use of a SparkSession
object, as shown in the following example:
// Creating spark session.
SparkSession spark = SparkSession.builder()
.appName("Example Program")
.master("local")
.config("spark.executor.instances", "2")
.getOrCreate();
// Creating spark session.
implicit val spark = SparkSession.builder()
.appName("Example Program")
.master("local")
.config("spark.executor.instances", "2")
.getOrCreate()
Reading DataFrames
In order to read data from Ignite, you need to specify its format and the path to the Ignite configuration file. For example, assume an Ignite table named ‘person’ is created and deployed in Ignite, as follows:
CREATE TABLE person (
id LONG,
name VARCHAR,
city_id LONG,
PRIMARY KEY (id, city_id)
) WITH "backups=1, affinityKey=city_id”;
The following Spark code can find all the rows from the 'person' table where the name is ‘Mary Major’:
SparkSession spark = ...
String cfgPath = "path/to/config/file";
Dataset<Row> df = spark.read()
.format(IgniteDataFrameSettings.FORMAT_IGNITE()) //Data source
.option(IgniteDataFrameSettings.OPTION_TABLE(), "person") //Table to read.
.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG) //Ignite config.
.load();
df.createOrReplaceTempView("person");
Dataset<Row> igniteDF = spark.sql(
"SELECT * FROM person WHERE name = 'Mary Major'");
val spark: SparkSession = …
val cfgPath: String = "path/to/config/file"
val df = spark.read
.format(FORMAT_IGNITE) // Data source type.
.option(OPTION_TABLE, "person") // Table to read.
.option(OPTION_CONFIG_FILE, cfgPath) // Ignite config.
.load()
df.createOrReplaceTempView("person")
val igniteDF = spark.sql("SELECT * FROM person WHERE name = 'Mary Major'")
Saving DataFrames
Note
|
Implementation notesInternally all inserts are done through |
Ignite can serve as a storage for DataFrames created or updated in Spark. The following save modes determine how a DataFrame is processed in Ignite:
-
Append
- the DataFrame will be appended to an existing table. SetOPTION_STREAMER_ALLOW_OVERWRITE=true
if you want to update existing entries with the data of the DataFrame. -
Overwrite
- the following steps will be executed: -
If the table already exists in Ignite, it will be dropped.
-
A new table will be created using the schema of the DataFrame and provided options.
-
DataFrame content will be inserted into the new table.
-
ErrorIfExists
(default) - an exception is thrown if the table already exists in Ignite. If a table does not exist: -
A new table will be created using the schema of the DataFrame and provided options.
-
DataFrame content will be inserted into the new table.
-
Ignore
- the operation is ignored if the table already exists in Ignite. If a table does not exist: -
A new table will be created using the schema of the DataFrame and provided options.
-
DataFrame content will be inserted into the new table.
Save mode can be specified using the mode(SaveMode mode)
method. For more information, please see the Spark Documentation). Here is a code example that shows this method:
SparkSession spark = ...
String cfgPath = "path/to/config/file";
Dataset<Row> jsonDataFrame = spark.read().json("path/to/file.json");
jsonDataFrame.write()
.format(IgniteDataFrameSettings.FORMAT_IGNITE())
.mode(SaveMode.Append) // SaveMode.
//... other options
.save();
val spark: SparkSession = …
val cfgPath: String = "path/to/config/file"
val jsonDataFrame = spark.read.json("path/to/file.json")
jsonDataFrame.write
.format(FORMAT_IGNITE)
.mode(SaveMode.Append) // SaveMode.
//... other options
.save()
You must define the following Ignite specific options if a new table will be created by a DataFrame’s save routines:
-
OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS
- a primary key is required for every Ignite table. This option has to contain a comma-separated list of fields/columns that represent a primary key. -
OPTION_CREATE_TABLE_PARAMETERS
- additional parameters to use upon Ignite table creation. The parameters are those that are supported by the CREATE TABLE command.
The following example shows how to write the content of a JSON file into Ignite:
SparkSession spark = ...
String cfgPath = "path/to/config/file";
Dataset<Row> jsonDataFrame = spark.read().json("path/to/file.json");
jsonDataFrame.write()
.format(IgniteDataFrameSettings.FORMAT_IGNITE())
.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), TEST_CONFIG_FILE)
.option(IgniteDataFrameSettings.OPTION_TABLE(), "json_table")
.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id")
.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated")
.save();
val spark: SparkSession = …
val cfgPath: String = "path/to/config/file"
val jsonDataFrame = spark.read.json("path/to/file.json")
jsonDataFrame.write
.format(FORMAT_IGNITE)
.option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
.option(OPTION_TABLE, "json_table")
.option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
.option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
.save()
IgniteSparkSession and IgniteExternalCatalog
Spark introduces the entity called catalog
to read and store meta-information about known data sources, such as tables and views. Ignite provides its own implementation of this catalog, called IgniteExternalCatalog
.
IgniteExternalCatalog
can read information about all existing SQL tables deployed in the Ignite cluster. IgniteExternalCatalog
is also required to build an IgniteSparkSession
object.
IgniteSparkSession
is an extension of the regular SparkSession
that stores IgniteContext
and injects the IgniteExternalCatalog
instance into Spark objects.
IgniteSparkSession.builder()
must be used to create IgniteSparkSession
. For example, if the following two tables are created in Ignite:
CREATE TABLE city (
id LONG PRIMARY KEY,
name VARCHAR
) WITH "template=replicated";
CREATE TABLE person (
id LONG,
name VARCHAR,
city_id LONG,
PRIMARY KEY (id, city_id)
) WITH "backups=1, affinityKey=city_id";
Then executing the following code provides table meta-information:
// Using SparkBuilder provided by Ignite.
IgniteSparkSession igniteSession = IgniteSparkSession.builder()
.appName("Spark Ignite catalog example")
.master("local")
.config("spark.executor.instances", "2")
//Only additional option to refer to Ignite cluster.
.igniteConfig("/path/to/ignite/config.xml")
.getOrCreate();
// This will print out info about all SQL tables existed in Ignite.
igniteSession.catalog().listTables().show();
// This will print out schema of PERSON table.
igniteSession.catalog().listColumns("person").show();
// This will print out schema of CITY table.
igniteSession.catalog().listColumns("city").show();
// Using SparkBuilder provided by Ignite.
val igniteSession = IgniteSparkSession.builder()
.appName("Spark Ignite catalog example")
.master("local")
.config("spark.executor.instances", "2")
//Only additional option to refer to Ignite cluster.
.igniteConfig("/path/to/ignite/config.xml")
.getOrCreate()
// This will print out info about all SQL tables existed in Ignite.
igniteSession.catalog.listTables().show()
// This will print out schema of PERSON table.
igniteSession.catalog.listColumns("person").show()
// This will print out schema of CITY table.
igniteSession.catalog.listColumns("city").show()
And the code output should be similar to the following:
+------+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+------+--------+-----------+---------+-----------+
| CITY| | null| EXTERNAL| false|
|PERSON| | null| EXTERNAL| false|
+------+--------+-----------+---------+-----------+
PERSON table description:
+-------+-----------+--------+--------+-----------+--------+
| name|description|dataType|nullable|isPartition|isBucket|
+-------+-----------+--------+--------+-----------+--------+
| NAME| null| string| true| false| false|
| ID| null| bigint| false| true| false|
|CITY_ID| null| bigint| false| true| false|
+-------+-----------+--------+--------+-----------+--------+
CITY table description:
+----+-----------+--------+--------+-----------+--------+
|name|description|dataType|nullable|isPartition|isBucket|
+----+-----------+--------+--------+-----------+--------+
|NAME| null| string| true| false| false|
| ID| null| bigint| false| true| false|
+----+-----------+--------+--------+-----------+--------+
Ignite DataFrame Options
Name | Description |
---|---|
|
Name of the Ignite Data Source |
|
Path to the config file |
|
Table name |
|
Additional parameters for a newly created table. The value of this option is used for the |
|
Comma separated list of primary key fields. |
|
If |
|
Automatic flush frequency. This is the time after which the streamer will make an attempt to submit all data added so far to remote nodes See Data Streaming |
|
Per node buffer size. See also. The size of the per node key-value pairs buffer. |
|
Per node buffer size. The maximum number of parallel stream operations for a single node. |
|
The Ignite SQL schema name in which the specified table exists. When OPTION_SCHEMA is not specified, all schemas will be scanned to find a table with a matching name. This option can be used to differentiate two tables of the same name in different Ignite SQL schemas. When creating new tables, |
Examples
There are several examples available on GitHub that demonstrate how to use Spark DataFrames with Ignite:
Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation.