Introduction to SQOOP in Hadoop

Data ingestion is a crucial step in the data lifecycle, and when the source is a relational database, Sqoop is a simple tool for the job.

What is Sqoop?

Sqoop is a tool for transferring bulk data between relational database management systems (RDBMS) and the Hadoop Distributed File System (HDFS), in either direction.

The most commonly used feature is Sqoop import, which copies data from an RDBMS into HDFS. The import normally runs in parallel, with multiple mappers each importing a different part of the data at the same time.

example -
sqoop-import \
  --connect "jdbc:mysql://hostname:3306/db_name" \
  --username dba \
  --password dba \
  --table employees \
  --target-dir /data/employees

In the above example,
--connect sets the JDBC connection string of the source database
--username and --password set the credentials for authentication (the password can instead be kept in an encrypted keystore created with the hadoop credential command and referenced with --password-alias)
--table sets the name of the source table
--target-dir sets the HDFS directory where the results are stored
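
The alias-based option mentioned above could look roughly like the sketch below. The alias name mysql.dba.password and the keystore path are hypothetical placeholders, and the commands assume a host with both Hadoop and Sqoop installed.

```shell
# Store the password once in a Hadoop credential keystore.
# The command prompts for the password value.
hadoop credential create mysql.dba.password \
  -provider jceks://hdfs/user/dba/mysql.password.jceks

# Reference the alias instead of passing --password on the command line.
# The -D option has to come before the Sqoop-specific options.
sqoop-import \
  -Dhadoop.security.credential.provider.path=jceks://hdfs/user/dba/mysql.password.jceks \
  --connect "jdbc:mysql://hostname:3306/db_name" \
  --username dba \
  --password-alias mysql.dba.password \
  --table employees \
  --target-dir /data/employees
```

This keeps the plain-text password out of the shell history and the process list.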

How Sqoop import works

Steps

1. A dummy query is executed to get the schema details
select * from table_name limit 1;

2. A Java class with getters and setters is generated for the schema obtained in step 1

3. The generated class is compiled and packaged into a JAR file

4. A boundary query (which can be customized with --boundary-query) is executed to find the min and max values of the primary key
select min(emp_id), max(emp_id) from employees;
If the source table has no primary key, we need to choose a column for computing the splits with the --split-by option, or do a sequential import with a single mapper (-m 1).

5. With the boundary query result, the split size is calculated as
(max - min) / number of mappers

6. The Sqoop MapReduce job is submitted to the Hadoop cluster with 4 mappers by default. No reducer is used because no aggregation is needed.

7. Each mapper executes the select query for its own part of the data, a range of key values roughly one split size wide.
For example, if there are 4000 records with ids from 1 to 4000, then
Mapper 1 - select * from employees where emp_id between 1 and 1000;
Mapper 2 - select * from employees where emp_id between 1001 and 2000;
Mapper 3 - select * from employees where emp_id between 2001 and 3000;
Mapper 4 - select * from employees where emp_id between 3001 and 4000;

8. The fetched records are stored in HDFS. By default, fields are terminated by ',' and records are terminated by '\n'.
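
The split calculation in steps 4 to 7 can be sketched with a small shell function. This is a simplified illustration, not Sqoop's actual splitter code: the function name split_ranges is hypothetical, it assumes an integer key whose range is at least as wide as the number of mappers, and it simply hands any remainder to the last mapper. Because (4000 - 1) / 4 is not a whole number, the boundaries it prints differ slightly from the rounded figures in the example above.

```shell
# Print one key range per mapper for the inclusive range [min, max].
split_ranges() {
  local column=$1 min=$2 max=$3 mappers=$4
  # Integer split size, as in (max - min) / number of mappers.
  local size=$(( (max - min) / mappers ))
  local lo=$min hi i=1

  # Every mapper except the last gets a half-open range [lo, hi).
  while [ "$i" -lt "$mappers" ]; do
    hi=$(( lo + size ))
    echo "mapper $i: $column >= $lo AND $column < $hi"
    lo=$hi
    i=$(( i + 1 ))
  done

  # The last mapper takes the rest, including the max value itself.
  echo "mapper $mappers: $column >= $lo AND $column <= $max"
}

split_ranges emp_id 1 4000 4
# mapper 1: emp_id >= 1 AND emp_id < 1000
# mapper 2: emp_id >= 1000 AND emp_id < 1999
# mapper 3: emp_id >= 1999 AND emp_id < 2998
# mapper 4: emp_id >= 2998 AND emp_id <= 4000
```

Note that the ranges are defined on key values, not on row counts, so a column with large gaps or skew gives the mappers uneven amounts of work.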

Sqoop incremental import & Sqoop JOB

There are situations where you don't want to import the whole table, only the recent inserts (append mode) or updates (lastmodified mode) made to it. In these scenarios, incremental import comes into play: it performs a partial import based on a condition.

There are two types of incremental import
1. Append
Used when there are no updates and you want to import new inserts only.

example -
sqoop-import \
  --connect "jdbc:mysql://hostname:3306/db_name" \
  --username dba \
  --password dba \
  --table employees \
  --target-dir /data/employees \
  --incremental append \
  --check-column emp_id \
  --last-value 987
In the above example, --incremental selects the mode (append or lastmodified), --check-column names the column that the condition is applied to, and --last-value gives the value from the previous import; only rows whose check-column value is greater than it are imported.

2. Lastmodified
Used where there are updates as well as new inserts in the source table.

sqoop-import \
  --connect "jdbc:mysql://hostname:3306/db_name" \
  --username dba \
  --password dba \
  --table employees \
  --target-dir /data/employees \
  --incremental lastmodified \
  --check-column dt_updated \
  --last-value "2022-12-27 00:00:00"

In the above example, --incremental selects the lastmodified mode, --check-column names the column that the condition is applied to, and --last-value gives the timestamp of the previous import; only rows modified more recently than it are imported. The timestamp is quoted because it contains a space. In lastmodified mode, the check column should be a date or timestamp column in the source table.

Incremental imports usually run on a regular schedule to bring recent updates and inserts into HDFS, which means the last value has to be remembered from one run to the next.
To solve this problem, Sqoop provides saved jobs. A Sqoop job stores the --incremental, --check-column, and --last-value parameters in the Sqoop metastore, so each run picks up the last value recorded by the previous run and imports only the required data.

example -
sqoop job \
  --create job_emp \
  -- import \
  --connect "jdbc:mysql://hostname:3306/db_name" \
  --username dba \
  --password dba \
  --table employees \
  --target-dir /data/employees \
  --incremental lastmodified \
  --check-column dt_updated \
  --last-value "2022-12-27 00:00:00"

sqoop job --exec job_emp
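
To see what a saved job takes over, here is a sketch of the same bookkeeping done by hand in a wrapper script. Everything in it is hypothetical: the state file, the function names, and the append-mode options are illustrative only, and a real Sqoop job keeps this state in its metastore rather than in a plain file. The script only prints the options it would pass; it does not run Sqoop.

```shell
# File that remembers the last imported value between runs.
STATE_FILE=${STATE_FILE:-$(mktemp)}

# Print the stored last value, or the supplied default on the first run.
read_last_value() {
  if [ -s "$STATE_FILE" ]; then
    cat "$STATE_FILE"
  else
    echo "$1"
  fi
}

# Record the value that the next run should start from.
save_last_value() {
  echo "$1" > "$STATE_FILE"
}

# Print (not run) the incremental options for the next import.
incremental_options() {
  echo "--incremental append --check-column emp_id --last-value $(read_last_value "$1")"
}

incremental_options 0    # first run: falls back to the default, 0
save_last_value 987      # after a run whose highest imported emp_id was 987
incremental_options 0    # next run: starts from 987
```

With a saved job, none of this is needed: each sqoop job --exec updates the stored last value on its own.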