Synchronizing Data Using Apache Sqoop
Load data to and from Hadoop and GridGain clusters using Apache Sqoop, a tool for transferring data between Hadoop and structured datastores. Sqoop connects to a GridGain cluster using the JDBC driver and connection manager provided by GridGain.
This page explains how to:
- Import data from GridGain to Hadoop.
- Export data from Hadoop to GridGain.
- Perform an incremental import by loading only the data that has changed since the last synchronization (Change Data Capture).
All operations are performed by running the Sqoop command with relevant parameters.
Setup
Download the GridGain Hadoop Connector from https://www.gridgain.com/resources/download and unpack the archive into a directory. We will refer to this directory as GG_HADOOP_CONNECTOR_HOME. The GRIDGAIN_HOME variable refers to the GridGain distribution folder.
Set the following environment variables in ${GG_HADOOP_CONNECTOR_HOME}/examples/gridgain-sqoop-examples/setenv.sh.
#Apache Ignite core library with the JDBC thin driver, to be added to the Hadoop classpath.
export IGNITE_CORE_LIB="<GRIDGAIN_HOME>/libs/ignite-core-<GRIDGAIN_VERSION>.jar"
#GridGain Sqoop integration library
export GRIDGAIN_SQOOP_LIB="<GG_HADOOP_CONNECTOR_HOME>/libs/gridgain-sqoop/gridgain-sqoop-<VERSION>.jar"
#Apache Hadoop home
#Example: /home/user/hadoop-2.9.2
export HADOOP_HOME="<HADOOP_HOME>"
#Apache Sqoop Home
#Example: /home/user/sqoop-1.4.7
export SQOOP_HOME="<APACHE_SQOOP_HOME>"
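For example, a setenv.sh using the installation paths shown elsewhere on this page might look like the following. These are hypothetical values; replace the <version> placeholders with the versions you actually installed.
#Hypothetical values, matching the example paths used elsewhere on this page.
export IGNITE_CORE_LIB="/home/user/gridgain-enterprise-<version>/libs/ignite-core-<version>.jar"
export GRIDGAIN_SQOOP_LIB="/home/user/gridgain-hadoop-connector/libs/gridgain-sqoop/gridgain-sqoop-<version>.jar"
export HADOOP_HOME="/home/user/hadoop-2.9.2"
export SQOOP_HOME="/home/user/sqoop-1.4.7"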
Start a GridGain server node using the following configuration file. This configuration defines a cache named Person with three fields: id, name, and last_updated.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
  <bean class="org.apache.ignite.configuration.IgniteConfiguration">
    <property name="discoverySpi">
      <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
        <property name="ipFinder">
          <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
            <property name="addresses">
              <list>
                <value>127.0.0.1:47500..47510</value>
              </list>
            </property>
          </bean>
        </property>
      </bean>
    </property>
    <property name="cacheConfiguration">
      <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          <property name="name" value="Person"/>
          <property name="cacheMode" value="PARTITIONED"/>
          <property name="atomicityMode" value="ATOMIC"/>
          <property name="sqlSchema" value="PUBLIC"/>
          <property name="queryEntities">
            <list>
              <bean class="org.apache.ignite.cache.QueryEntity">
                <property name="keyType" value="PersonKey"/>
                <property name="valueType" value="PersonValue"/>
                <property name="tableName" value="Person"/>
                <property name="keyFields">
                  <list>
                    <value>id</value>
                  </list>
                </property>
                <property name="fields">
                  <map>
                    <entry key="id" value="java.lang.Long"/>
                    <entry key="name" value="java.lang.String"/>
                    <entry key="lastUpdated" value="java.sql.Timestamp"/>
                  </map>
                </property>
                <property name="aliases">
                  <map>
                    <entry key="id" value="id"/>
                    <entry key="name" value="name"/>
                    <entry key="lastUpdated" value="last_updated"/>
                  </map>
                </property>
              </bean>
            </list>
          </property>
        </bean>
      </list>
    </property>
  </bean>
</beans>
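For reference, the query entity above exposes the cache to SQL roughly as if the table had been created with the following DDL. This is a sketch using Ignite's CREATE TABLE WITH parameters; the examples on this page use the XML configuration above, not this statement.
--Roughly equivalent SQL view of the Person cache defined in the XML configuration.
CREATE TABLE Person (
  id BIGINT PRIMARY KEY,
  name VARCHAR,
  last_updated TIMESTAMP
) WITH "CACHE_NAME=Person,KEY_TYPE=PersonKey,VALUE_TYPE=PersonValue,ATOMICITY=ATOMIC,TEMPLATE=PARTITIONED";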
To start a node, run one of the following commands (Unix and Windows, respectively):
${GRIDGAIN_HOME}/bin/ignite.sh path/to/server.xml
${GRIDGAIN_HOME}\bin\ignite.bat path\to\server.xml
Populate the Person cache using the sqlline tool as follows:
${GRIDGAIN_HOME}/bin/sqlline.sh -u jdbc:ignite:thin://127.0.0.1/ -f <GG_HADOOP_CONNECTOR_HOME>/examples/gridgain-sqoop-examples/config/init.sql
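The init.sql script populates the Person table with sample rows. Its exact contents ship with the connector; the statements look roughly like this (a sketch, not the actual file):
--Sample rows for the Person table; last_updated is used later for incremental imports.
INSERT INTO Person (id, name, last_updated) VALUES (1, 'John', '2019-01-01 00:00:00');
INSERT INTO Person (id, name, last_updated) VALUES (2, 'Jane', '2019-01-02 00:00:00');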
Make sure that JAVA_HOME points to JDK 11 or later, and copy the GridGain and Sqoop jars to HDFS:
hdfs dfs -mkdir -p /home/user/sqoop-1.4.7/lib
hdfs dfs -put /home/user/sqoop-1.4.7/lib/* /home/user/sqoop-1.4.7/lib
hdfs dfs -put /home/user/sqoop-1.4.7/sqoop-1.4.7.jar /home/user/sqoop-1.4.7
hdfs dfs -mkdir -p /home/user/gridgain-enterprise-{version}/libs
hdfs dfs -put /home/user/gridgain-enterprise-{version}/libs/ignite-core-{version}.jar /home/user/gridgain-enterprise-{version}/libs/
hdfs dfs -mkdir -p /home/user/gridgain-hadoop-connector/libs/gridgain-sqoop
hdfs dfs -put /home/user/gridgain-hadoop-connector/libs/gridgain-sqoop/gridgain-sqoop-<version>.jar /home/user/gridgain-hadoop-connector/libs/gridgain-sqoop
Now we have a running GridGain cluster (consisting of one node) that contains a cache with data.
Let’s import the data into HDFS.
Import from GridGain to HDFS
Go to ${GG_HADOOP_CONNECTOR_HOME}/examples/gridgain-sqoop-examples/ and execute import.sh (or import.bat on Windows). The script launches Sqoop's import command to load data from the Person table into HDFS as text files. You can customize the import options in the import.txt file.
| Parameter | Description |
|---|---|
| --connect | The JDBC driver connection URL. To connect to a local cluster, use jdbc:ignite:thin://127.0.0.1/. |
| --driver, --connection-manager | Leave unchanged. These classes are provided by the GridGain and Hadoop Connector libraries. |
| --target-dir | The destination path in HDFS. |
| --outdir | Output directory for generated code. |
| --split-by | The column that will be used to split the data into work units. |
| --as-textfile | Indicates that the data will be imported as plain text. Each table row is stored as one record per line in a file located in the target-dir directory. |
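Put together, the Sqoop invocation produced by import.sh looks roughly like the following. This is a sketch: the driver and connection-manager class names and the paths are placeholders; use the values supplied in import.txt.
#Sketch of a Sqoop import from GridGain into HDFS; see import.txt for the real values.
${SQOOP_HOME}/bin/sqoop import \
  --connect jdbc:ignite:thin://127.0.0.1/ \
  --driver <GRIDGAIN_JDBC_DRIVER_CLASS> \
  --connection-manager <GRIDGAIN_CONNECTION_MANAGER_CLASS> \
  --table Person \
  --target-dir /person \
  --outdir /tmp/sqoop-gen \
  --split-by id \
  --as-textfile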
Incremental import from GridGain to HDFS
You can use an incremental import to load only the rows that have been modified since the last import. To use this feature, the table must contain a column that stores the timestamp of when each row was last inserted or updated. In our example, we use the last_updated column. The value in this column must be set to the current timestamp every time the Person table is updated.
Read more about incremental imports in the Sqoop documentation.
The create-incremental-import-job.sh script creates a saved job that performs an incremental import. The created job remembers the parameters and can be executed by its name. The parameters for the import job are specified in the create-incremental-import-job.txt file.
| Parameter | Description |
|---|---|
| --check-column | Specifies the column to examine when determining which rows to import. |
| --last-value | Specifies the maximum value of the check column from the previous import. |
The run-import-job.sh script launches the saved job. Change the value of the --last-value parameter in create-incremental-import-job.txt and run the script to import the data.
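Under the hood, the saved job is created and executed with Sqoop's job tool. Here is a minimal sketch, assuming a job named person_incremental; the actual job name, driver, and connection-manager options come from create-incremental-import-job.txt.
#Create a saved job that imports only rows whose last_updated exceeds --last-value.
#The real job definition also sets the GridGain driver and connection-manager classes.
${SQOOP_HOME}/bin/sqoop job --create person_incremental -- import \
  --connect jdbc:ignite:thin://127.0.0.1/ \
  --table Person \
  --target-dir /person \
  --incremental lastmodified \
  --check-column last_updated \
  --last-value "2019-01-01 00:00:00.0" \
  --as-textfile
#Run the saved job by name.
${SQOOP_HOME}/bin/sqoop job --exec person_incremental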
Export from HDFS to GridGain
The export.sh script launches the export command, which loads the data from HDFS into the GridGain cluster. The script parses the input files (specified in export.txt) and loads the data into the Person table in the GridGain cluster using DML statements.
| Parameter | Description |
|---|---|
| --export-dir | The source directory for the export. |
| --table | The name of the cache (table) in GridGain to populate. |
See the full list of parameters in the Sqoop documentation.
Here is an example of a source file:
1,John,2019-01-01 00:00:00.0
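An export invocation that loads such files back into the Person table looks roughly like this. It is a sketch; the actual options, including the GridGain driver and connection-manager classes, are defined in export.txt.
#Sketch of a Sqoop export from HDFS into the GridGain Person table.
${SQOOP_HOME}/bin/sqoop export \
  --connect jdbc:ignite:thin://127.0.0.1/ \
  --table Person \
  --export-dir /person \
  --input-fields-terminated-by ','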