GridGain Developers Hub

GridGain DataFrame

Overview

The Apache Spark DataFrame API introduced the concept of a schema to describe the data, allowing Spark to manage the schema and organize the data into a tabular format. To put it simply, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database and allows Spark to leverage the Catalyst query optimizer to produce much more efficient query execution plans in comparison to RDDs, which are just collections of elements partitioned across the nodes of the cluster.

GridGain expands DataFrame, simplifying development and improving data access times whenever GridGain is used as memory-centric storage for Spark. Benefits include:

  • Ability to share data and state across Spark jobs by writing and reading DataFrames to/from Ignite.

  • Faster SparkSQL queries by optimizing Spark query execution plans with Ignite SQL engine which include​ advanced indexing and avoid data movement across the network from Ignite to Spark.

Integration

IgniteRelationProvider is an implementation of the Spark RelationProvider and CreatableRelationProvider interfaces. The IgniteRelationProvider can talk directly to Ignite tables through the Spark SQL interface. The data are loaded and exchanged via IgniteSQLRelation that executes filtering operations on the Ignite side. For now, grouping, joining or ordering operations are fulfilled on the Spark side. These operations will be optimized and processed on the GridGain side in upcoming releases. IgniteSQLRelation utilizes the partitioned nature of Ignite’s architecture and provides partitioning information to Spark.

Spark Session

To use the Apache Spark DataFrame API, it is necessary to create an entry point for programming with Spark. This is achieved through the use of a SparkSession object, as shown in the following example:

// Creating spark session.
SparkSession spark = SparkSession.builder()
  .appName("Example Program")
  .master("local")
  .config("spark.executor.instances", "2")
  .getOrCreate();
// Creating spark session.
implicit val spark = SparkSession.builder()
  .appName("Example Program")
  .master("local")
  .config("spark.executor.instances", "2")
  .getOrCreate()

Reading DataFrames

In order to read data from GridGain, you need to specify its format and the path to the GridGain configuration file. For example, assume an GridGain table named ‘person’ is created and deployed in GridGain, as follows:

CREATE TABLE person (
    id LONG,
    name VARCHAR,
    city_id LONG,
    PRIMARY KEY (id, city_id)
) WITH "backups=1, affinityKey=city_id;

The following Spark code can find all the rows from the 'person' table where the name is ‘Mary Major’:

SparkSession spark = ...
String cfgPath = "path/to/config/file";

Dataset<Row> df = spark.read()
  .format(IgniteDataFrameSettings.FORMAT_IGNITE())              //Data source
  .option(IgniteDataFrameSettings.OPTION_TABLE(), "person")     //Table to read.
  .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG) //Ignite config.
  .load();

df.createOrReplaceTempView("person");

Dataset<Row> igniteDF = spark.sql(
  "SELECT * FROM person WHERE name = 'Mary Major'");
val spark: SparkSession = 
val cfgPath: String = "path/to/config/file"

val df = spark.read
  .format(FORMAT_IGNITE)               // Data source type.
  .option(OPTION_TABLE, "person")      // Table to read.
  .option(OPTION_CONFIG_FILE, cfgPath) // Ignite config.
  .load()

df.createOrReplaceTempView("person")

val igniteDF = spark.sql("SELECT * FROM person WHERE name = 'Mary Major'")

Saving DataFrames

GridGain can serve as a storage for DataFrames created or updated in Spark. The following save modes determine how a DataFrame is processed in GridGain:

  • Append - the DataFrame will be appended to an existing table. Set OPTION_STREAMER_ALLOW_OVERWRITE=true if you want to update existing entries with the data of the DataFrame.

  • Overwrite - the following steps will be executed:

  • If the table already exists in GridGain, it will be dropped.

  • A new table will be created using the schema of the DataFrame and provided options.

  • DataFrame content will be inserted into the new table.

  • ErrorIfExists (default) - an exception is thrown if the table already exists in GridGain. If a table does not exist:

  • A new table will be created using the schema of the DataFrame and provided options.

  • DataFrame content will be inserted into the new table.

  • Ignore - the operation is ignored if the table already exists in GridGain. If a table does not exist:

  • A new table will be created using the schema of the DataFrame and provided options.

  • DataFrame content will be inserted into the new table.

Save mode can be specified using the mode(SaveMode mode) method. For more information, please see the Spark Documentation). Here is a code example that shows this method:

SparkSession spark = ...

String cfgPath = "path/to/config/file";

Dataset<Row> jsonDataFrame = spark.read().json("path/to/file.json");

jsonDataFrame.write()
  .format(IgniteDataFrameSettings.FORMAT_IGNITE())
  .mode(SaveMode.Append) // SaveMode.
//... other options
   .save();
val spark: SparkSession = 

val cfgPath: String = "path/to/config/file"

val jsonDataFrame = spark.read.json("path/to/file.json")

jsonDataFrame.write
  .format(FORMAT_IGNITE)
  .mode(SaveMode.Append) // SaveMode.
//... other options
  .save()

You must define the following GridGain specific options if a new table will be created by a DataFrame’s save routines:

  • OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS - a primary key is required for every GridGain table. This option has to contain a comma-separated list of fields/columns that represent a primary key.

  • OPTION_CREATE_TABLE_PARAMETERS - additional parameters to use upon GridGain table creation. The parameters are those that are supported by the CREATE TABLE command.

The following example shows how to write the content of a JSON file into GridGain:

SparkSession spark = ...

String cfgPath = "path/to/config/file";

Dataset<Row> jsonDataFrame = spark.read().json("path/to/file.json");

jsonDataFrame.write()
  .format(IgniteDataFrameSettings.FORMAT_IGNITE())
  .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), TEST_CONFIG_FILE)
  .option(IgniteDataFrameSettings.OPTION_TABLE(), "json_table")
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id")
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated")
  .save();
val spark: SparkSession = 

val cfgPath: String = "path/to/config/file"

val jsonDataFrame = spark.read.json("path/to/file.json")

jsonDataFrame.write
  .format(FORMAT_IGNITE)
  .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
  .option(OPTION_TABLE, "json_table")
  .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
  .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
  .save()

IgniteSparkSession and IgniteExternalCatalog

Spark introduces the entity called catalog to read and store meta-information about known data sources, such as tables and views. GridGain provides its own implementation of this catalog, called IgniteExternalCatalog.

IgniteExternalCatalog can read information about all existing SQL tables deployed in the Ignite cluster. IgniteExternalCatalog is also required to build an IgniteSparkSession object.

IgniteSparkSession is an extension of the regular SparkSession that stores IgniteContext and injects the IgniteExternalCatalog instance into Spark objects.

IgniteSparkSession.builder() must be used to create IgniteSparkSession. For example, if the following two tables are created in GridGain:

CREATE TABLE city (
    id LONG PRIMARY KEY,
    name VARCHAR
) WITH "template=replicated";

CREATE TABLE person (
    id LONG,
    name VARCHAR,
    city_id LONG,
    PRIMARY KEY (id, city_id)
) WITH "backups=1, affinityKey=city_id";

Then executing the following code provides table meta-information:

// Using SparkBuilder provided by Ignite.
IgniteSparkSession igniteSession = IgniteSparkSession.builder()
  .appName("Spark Ignite catalog example")
  .master("local")
  .config("spark.executor.instances", "2")
  //Only additional option to refer to Ignite cluster.
  .igniteConfig("/path/to/ignite/config.xml")
  .getOrCreate();

// This will print out info about all SQL tables existed in Ignite.
igniteSession.catalog().listTables().show();

// This will print out schema of PERSON table.
igniteSession.catalog().listColumns("person").show();

// This will print out schema of CITY table.
igniteSession.catalog().listColumns("city").show();
// Using SparkBuilder provided by Ignite.
val igniteSession = IgniteSparkSession.builder()
  .appName("Spark Ignite catalog example")
  .master("local")
  .config("spark.executor.instances", "2")
  //Only additional option to refer to Ignite cluster.
  .igniteConfig("/path/to/ignite/config.xml")
  .getOrCreate()

// This will print out info about all SQL tables existed in Ignite.
igniteSession.catalog.listTables().show()

// This will print out schema of PERSON table.
igniteSession.catalog.listColumns("person").show()

// This will print out schema of CITY table.
igniteSession.catalog.listColumns("city").show()

And the code output should be similar to the following:

+------+--------+-----------+---------+-----------+
|  name|database|description|tableType|isTemporary|
+------+--------+-----------+---------+-----------+
|  CITY|        |       null| EXTERNAL|      false|
|PERSON|        |       null| EXTERNAL|      false|
+------+--------+-----------+---------+-----------+

PERSON table description:

+-------+-----------+--------+--------+-----------+--------+
|   name|description|dataType|nullable|isPartition|isBucket|
+-------+-----------+--------+--------+-----------+--------+
|   NAME|       null|  string|    true|      false|   false|
|     ID|       null|  bigint|   false|       true|   false|
|CITY_ID|       null|  bigint|   false|       true|   false|
+-------+-----------+--------+--------+-----------+--------+

CITY table description:

+----+-----------+--------+--------+-----------+--------+
|name|description|dataType|nullable|isPartition|isBucket|
+----+-----------+--------+--------+-----------+--------+
|NAME|       null|  string|    true|      false|   false|
|  ID|       null|  bigint|   false|       true|   false|
+----+-----------+--------+--------+-----------+--------+

GridGain DataFrame Options

Name Description

FORMAT_IGNITE

Name of the GridGain Data Source

OPTION_CONFIG_FILE

Path to the config file

OPTION_TABLE

Table name

OPTION_CREATE_TABLE_PARAMETERS

Additional parameters for a newly created table. The value of this option is used for the WITH part of a CREATE TABLE query.

OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS

Comma separated list of primary key fields.

OPTION_STREAMER_ALLOW_OVERWRITE

If true, then an existing row will be overwritten with DataFrame content. If false, then the row will be skipped if the primary key already exists in the table.

OPTION_STREAMER_FLUSH_FREQUENCY

Automatic flush frequency. This is the time after which the streamer will make an attempt to submit all data added so far to remote nodes (see Data Streaming).

OPTION_STREAMER_PER_NODE_BUFFER_SIZE

Per node buffer size. See also. The size of the per node key-value pairs buffer.

OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS

Per node buffer size. The maximum number of parallel stream operations for a single node.

OPTION_SCHEMA

The GridGain SQL schema name in which the specified table exists. When OPTION_SCHEMA is not specified, all schemas will be scanned to find a table with a matching name. This option can be used to differentiate two tables of the same name in different GridGain SQL schemas.

When creating new tables, OPTION_SCHEMA must be specified as PUBLIC, otherwise an exception will be thrown because currently GridGain SQL can issue CREATE TABLE statements within the PUBLIC schema only.

Examples

There are several examples available on GitHub that demonstrate how to use Spark DataFrames with Ignite: