Sqoop Import Queries with Examples

We all know that for transferring data between an RDBMS and HDFS, we use Apache Sqoop. In this Sqoop Import article, we will discuss the Sqoop import tool, which is used for importing tables from an RDBMS into HDFS.

In this article, you will explore how to import tables to HDFS, Hive, HBase, and Accumulo. You will also learn the syntax as well as the different arguments.

Moreover, you will study the purpose of Sqoop import along with example import commands that will help you understand it well.

Let us first explore what Sqoop Import is.

 

Introduction to Sqoop Import

Sqoop import is a tool that imports an individual table from a relational database into the Hadoop Distributed File System. Each row of the table being imported is represented as a separate record in HDFS.

In HDFS, these records can be stored either as text files (one record per line) or in binary representation as Avro data files or SequenceFiles.

Sqoop Import Syntax

The syntax for Sqoop Import command is:

$ sqoop import (generic-args) (import-args)
$ sqoop-import (generic-args) (import-args)

We can pass import arguments in any order with respect to each other, but the Hadoop generic arguments must precede the import arguments.

Let us first see the common arguments.

Argument Description
--connect <jdbc-uri>: Specifies the JDBC connect string
--connection-manager <class-name>: Specifies the connection manager class to use
--driver <class-name>: Manually specifies the JDBC driver class to use
--hadoop-mapred-home <dir>: Overrides $HADOOP_MAPRED_HOME
--help: Prints usage instructions
--password-file: Sets the path to a file containing the authentication password
-P: Reads the password from the console
--password <password>: Sets the authentication password
--username <username>: Sets the authentication username
--verbose: Prints more information while working
--connection-param-file <filename>: Specifies an optional properties file that provides connection parameters
--relaxed-isolation: Sets the connection transaction isolation to read uncommitted for the mappers

These are the common arguments.

The article now lists the syntax and arguments for the various steps involved in importing data.

Connecting to the Database Server

Apache Sqoop is designed for importing tables from a database into HDFS. To do so, we must specify a connect string, which describes how to connect to the relational database.

This connect string looks like a URL and is communicated to Apache Sqoop via the --connect argument. This argument specifies the server, the port, and the database to connect to.

For example:

$ sqoop import --connect jdbc:mysql://localhost/demo_db

The above example will connect to the MySQL database named demo_db on localhost.

Also, we sometimes have to authenticate against the relational database before accessing it. We can use the --username argument to supply a username to the database. Sqoop provides several ways to supply a password, in both secure and non-secure modes.

Generally, we use the -P argument, which reads the password from the console.
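For example, assuming a hypothetical MySQL account named someuser, the connect string and credentials can be supplied together, and the password is then prompted for on the console:

$ sqoop import --connect jdbc:mysql://localhost/demo_db \
    --username someuser -P --table emp_info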

The Validation arguments are:

Argument Description
--validate: Enables validation of the copied data. Supports a single-table copy only.
--validator <class-name>: Specifies the validator class to use.
--validation-threshold <class-name>: Specifies the validation threshold class to use.
--validation-failurehandler <class-name>: Specifies the validation failure handler class to use.

Selecting the Data to Import

1. Apache Sqoop imports data in a table-centric fashion. We can use the --table argument to select the table to import.

For example,

--table emp_info

The --table argument can also identify a VIEW or another table-like entity in the database.
With this argument, by default, all the columns in the table are selected for import. The imported data is written to HDFS in its "natural order".

For example, a table containing the columns A, B, and C will result in an import of the data such as:
A1,B1,C1
A2,B2,C2

2. In Sqoop, we can also select a subset of columns and control their ordering by using the --columns argument. This argument takes a comma-delimited list of the columns to import.

For example:

--columns "emp_name,emp_id,emp_jobtitle"

3. We can also control which rows are imported by adding a SQL WHERE clause to the import statement. By default, Sqoop generates a statement of the form SELECT <column list> FROM <table name>. We can append a WHERE clause to this statement with the --where argument.

--where "emp_id > 400"

Only those rows whose emp_id column has a value greater than 400 will be imported. A combined example using --table, --columns, and --where follows below.
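Putting these arguments together, a sketch of a command that imports only the selected columns and rows from the emp_info table used above might look like:

$ sqoop import --connect jdbc:mysql://localhost/demo_db \
    --table emp_info \
    --columns "emp_id,emp_name,emp_jobtitle" \
    --where "emp_id > 400"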

The other Sqoop import control arguments are:

Argument Description
--append: Appends data to an existing dataset in HDFS
--as-avrodatafile: Imports the data to Avro data files
--as-sequencefile: Imports the data to SequenceFiles
--as-textfile: Imports the data as plain text (default)
--as-parquetfile: Imports the data to Parquet files
--boundary-query <statement>: Specifies the boundary query used for creating splits
--columns <col,col,col…>: Specifies the columns to import from the table
--delete-target-dir: Deletes the import target directory if it exists
--direct: Uses the direct connector if one exists for the database
--fetch-size <n>: Specifies the number of entries to read from the database at once
--inline-lob-limit <n>: Sets the maximum size for an inline LOB
-m,--num-mappers <n>: Specifies the number of mappers; uses n map tasks to import in parallel
-e,--query <statement>: Imports the results of the statement
--split-by <column-name>: Specifies the table column used to split the work units. Cannot be used with --autoreset-to-one-mapper.
--autoreset-to-one-mapper: Uses only one mapper if the table has no primary key and no split-by column is provided. Cannot be used with --split-by <col>.
--table <table-name>: Table to read
--target-dir <dir>: HDFS destination directory
--warehouse-dir <dir>: HDFS parent directory for the table destination
--where <where clause>: Specifies the WHERE clause to use during import
-z,--compress: Enables compression
--compression-codec <c>: Uses the given Hadoop codec (default gzip)
--null-string <null-string>: The string to write for a null value in string columns
--null-non-string <null-string>: The string to write for a null value in non-string columns

The --null-string and --null-non-string arguments are optional. If they are not specified, the string "null" will be used.

Free-form Query Imports

Apache Sqoop can import the result set of an arbitrary SQL query. Rather than using the --table, --columns, and --where arguments, we can use the --query argument to specify a SQL statement.

Note: When importing via a free-form query, we must specify the destination directory with the --target-dir argument.
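As a sketch (the table and column names here are illustrative), a free-form query import might look like the following. For a parallel free-form import, Sqoop requires the literal token $CONDITIONS in the WHERE clause, which it replaces with its own split conditions, along with a --split-by column (or -m 1):

$ sqoop import --connect jdbc:mysql://localhost/demo_db \
    --query 'SELECT e.*, d.dept_name FROM emp_info e JOIN dept d ON e.dept_id = d.dept_id WHERE $CONDITIONS' \
    --split-by e.emp_id \
    --target-dir /user/demo/emp_dept_data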

Controlling Parallelism in Sqoop

1. Apache Sqoop can import data in parallel from most database sources. We can specify the number of mappers to use for the import with the --num-mappers or -m argument.

This argument takes an integer value that corresponds to the degree of parallelism to employ.

2. Four tasks are used by default. We may be able to increase this value to 8 or 16 to improve performance.

Note: The degree of parallelism should not be increased beyond what is available in our MapReduce cluster, as this can increase the import time. It should also not be increased beyond what the database can reasonably support, as this may increase the load on the database server.

3. For performing parallel imports, Apache Sqoop needs a criterion by which it can split the workload. For this it uses a splitting column. By default, Sqoop uses the table's primary key column as the splitting column.

The high and low values of the splitting column are fetched from the database, and the map tasks operate on evenly-sized components of the total range.

For example, suppose we have a table with a primary key column emp_id whose minimum value is 0 and maximum value is 1000, and Sqoop is directed to use 4 map tasks.

Sqoop will run four processes, each executing a SQL statement of the form SELECT * FROM emp_info WHERE emp_id >= lo AND emp_id < hi, with (lo, hi) set to (0, 250), (250, 500), (500, 750), and (750, 1001) in the different tasks.

4. We can explicitly choose a different splitting column via the --split-by argument.

For example,

--split-by emp_id

5. At present, Sqoop cannot split on multi-column indices. If the table has no index column, or has a multi-column key, we must choose the splitting column manually.
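For instance, an import of the emp_info table spread across four map tasks that splits on emp_id instead of the default primary key could be written as:

$ sqoop import --connect jdbc:mysql://localhost/demo_db \
    --table emp_info -m 4 --split-by emp_id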

Controlling Distributed Cache in Sqoop

Apache Sqoop copies the jars in the $SQOOP_HOME/lib folder to the job cache every time a Sqoop job starts. When Sqoop is launched by Oozie, this is unnecessary, because Oozie uses its own Sqoop share lib, which keeps the Sqoop dependencies in the distributed cache.

By passing the --skip-dist-cache argument in the Sqoop command when it is launched by Oozie, Sqoop skips the step of copying all its dependencies to the job cache, saving significant I/O.
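For example (a sketch; in practice this command would appear inside the Oozie workflow action), the flag is simply appended to the normal import command:

$ sqoop import --connect jdbc:mysql://localhost/demo_db \
    --table emp_info --skip-dist-cache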

Controlling Import Process in Sqoop

By default, the Sqoop import process uses JDBC, which provides a reasonable cross-vendor import channel. However, some databases can perform imports in a higher-performance manner by using database-specific data movement tools.

For example, MySQL provides the mysqldump tool to export data from MySQL to other systems quickly.

We can pass the --direct argument to specify that Sqoop should use the direct import channel. The direct import channel can have higher performance than JDBC.
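For example, a direct-mode import from MySQL (which delegates to mysqldump) might be invoked as:

$ sqoop import --connect jdbc:mysql://localhost/demo_db \
    --table emp_info --direct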

Controlling Type Mapping in Sqoop

Apache Sqoop is preconfigured to map most SQL types to appropriate Java or Hive representations.

The default mapping may not be suitable for every use case and can be overridden via the --map-column-java argument (for changing the Java mapping) or --map-column-hive (for changing the Hive mapping).

Argument Description
--map-column-java <mapping>: Overrides the mapping from SQL to Java type for the configured columns.
--map-column-hive <mapping>: Overrides the mapping from SQL to Hive type for the configured columns.

Sqoop expects a comma-separated list of mappings in the form <column name>=<new type>.

For example:

$ sqoop import ... --map-column-java emp_id=String,value=Integer

Let us now talk about Incremental Import.

Incremental Imports in Sqoop

Sqoop provides an incremental import mode, which retrieves only those rows that are newer than a previously imported set of rows.
The following arguments control incremental imports:

Argument Description
--check-column (col): Specifies the column to examine when determining which rows to import. The column must not be of type CHAR/NCHAR/VARCHAR/VARNCHAR/LONGVARCHAR/LONGNVARCHAR.
--incremental (mode): Specifies how Sqoop determines which rows are new. Legal values for mode are append and lastmodified.
--last-value (value): Specifies the maximum value of the check column from the previous import.

Apache Sqoop supports two types of incremental imports: append and lastmodified. To specify the type of incremental import to perform, we use the --incremental argument.

We specify append mode when importing a table in which new rows are continually added with increasing row id values.

We specify lastmodified mode when rows of the table may be updated, and each such update sets the value of a last-modified column to the current timestamp.
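As a sketch based on the emp_info example (the check columns are illustrative), an append-mode import and a lastmodified-mode import might look like:

$ sqoop import --connect jdbc:mysql://localhost/demo_db --table emp_info \
    --incremental append --check-column emp_id --last-value 1000

$ sqoop import --connect jdbc:mysql://localhost/demo_db --table emp_info \
    --incremental lastmodified --check-column last_update_ts \
    --last-value "2020-06-11 00:00:00"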

Sqoop File Formats

We can import data in either of two file formats: delimited text or SequenceFiles.

1. Delimited text: This is the default import format. We can also specify it explicitly via the --as-textfile argument.
2. SequenceFiles: A binary format that stores individual records in custom record-specific data types.

Large Objects

Apache Sqoop handles large objects, such as BLOB and CLOB columns, in a particular way. These columns are not fully materialized in memory for manipulation like other columns; instead, large object data is handled in a streaming manner.

Large objects can be stored inline with the rest of the data, in which case they are fully materialized in memory on every access, or they can be stored in a secondary storage file linked to the primary data storage.

In Sqoop, by default, large objects smaller than 16 MB are stored inline with the rest of the data.
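This inline threshold is controlled by the --inline-lob-limit argument, which takes a size in bytes. As a sketch (the table name here is hypothetical), the following raises the inline limit to 32 MB:

$ sqoop import --connect jdbc:mysql://localhost/demo_db \
    --table emp_documents --inline-lob-limit 33554432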

The Output line formatting arguments are:

Argument Description
--enclosed-by <char>: Sets a required field-enclosing character
--escaped-by <char>: Sets the escape character
--fields-terminated-by <char>: Sets the field separator character
--lines-terminated-by <char>: Sets the end-of-line character
--mysql-delimiters: Uses MySQL's default delimiter set: fields , lines \n escaped-by \ optionally-enclosed-by '
--optionally-enclosed-by <char>: Sets a field-enclosing character

The Input parsing arguments are:

Argument Description
--input-enclosed-by <char>: Sets a required field encloser
--input-escaped-by <char>: Sets the input escape character
--input-fields-terminated-by <char>: Sets the input field separator
--input-lines-terminated-by <char>: Sets the input end-of-line character
--input-optionally-enclosed-by <char>: Sets a field-enclosing character

Importing Data Into Hive

Sqoop is used mainly for uploading table data into HDFS. But if we have a Hive metastore associated with our HDFS cluster, Sqoop can also import the data directly into Hive.

Sqoop imports data into Hive by generating and executing a CREATE TABLE statement that defines the data's layout in Hive. We can import data into Hive simply by adding the --hive-import option to our Sqoop command line.

If the Hive table already exists, we can specify the --hive-overwrite option to indicate that the existing table in Hive should be replaced.

After the data is imported into HDFS, Sqoop generates a Hive script that contains a CREATE TABLE operation defining the columns using Hive's types, and a LOAD DATA INPATH statement to move the data files into Hive's warehouse directory.

This script is executed by calling an installed copy of Hive on the machine where Sqoop is run.

If we have multiple Hive installations, or hive is not on our $PATH, we can use the --hive-home option to identify the Hive installation directory. Apache Sqoop will use $HIVE_HOME/bin/hive from there.

We can import data into a particular Hive partition by specifying the --hive-partition-key and --hive-partition-value arguments.

By using the --compress and --compression-codec options, we can import compressed tables into Hive, as sketched below.
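A sketch combining these Hive options (the Hive table name and the partition key/value are illustrative):

$ sqoop import --connect jdbc:mysql://localhost/demo_db --table emp_info \
    --hive-import --hive-table emp_info_hive \
    --hive-partition-key load_date --hive-partition-value "2020-06-11" \
    --compress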

The Hive arguments are:

Argument Description
--hive-home <dir>: Overrides $HIVE_HOME
--hive-import: Imports tables into Hive (uses Hive's default delimiters if none are set)
--hive-overwrite: Overwrites existing data in the Hive table
--create-hive-table: If set, the Sqoop job fails if the target Hive table already exists. By default, this property is false.
--hive-table <table-name>: Sets the table name to use when importing into Hive
--hive-drop-import-delims: Drops \n, \r, and \01 from string fields when importing into Hive
--hive-delims-replacement: Replaces \n, \r, and \01 in string fields with a user-defined string when importing into Hive
--hive-partition-key: Specifies the name of the Hive field on which the partition is sharded
--hive-partition-value <v>: Specifies the string value that serves as the partition key for this import in this job
--map-column-hive <map>: Overrides the default mapping from SQL type to Hive type for the configured columns

Importing Data Into HBase using Sqoop

Apache Sqoop can also import records into a table in HBase. To import a table into HBase instead of a directory in HDFS, we specify the --hbase-table option in the Sqoop command.

Apache Sqoop imports the data into the table specified as the argument to --hbase-table. Each row of the input table is transformed into an HBase Put operation on a row of the output table, and the key for each row is taken from a column of the input.

Sqoop, by default, uses the split-by column as the row key column.

If the split-by column is not specified, Sqoop tries to use the primary key column. We can also manually specify the row key column with --hbase-row-key. Every output column is placed in the same column family, which is specified with --column-family.

While importing data into HBase, if the target table and column family do not exist, the Sqoop job exits with an error. So if you are importing with the --hbase-table option, you must create the target table and column family before running the import.

If we specify the --hbase-create-table option, Sqoop will create the target table and column family itself if they do not exist.
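A sketch of an HBase import (the HBase table, column family, and row key column are illustrative):

$ sqoop import --connect jdbc:mysql://localhost/demo_db --table emp_info \
    --hbase-table emp_info_hbase --column-family emp_data \
    --hbase-row-key emp_id --hbase-create-table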

The HBase arguments are:

Argument Description
--column-family <family>: Sets the target column family for the import
--hbase-create-table: If specified, creates missing HBase tables
--hbase-row-key <col>: Specifies which input column to use as the row key. If the input table has a composite key, <col> must be a comma-separated list of the composite key attributes.
--hbase-table <table-name>: Specifies an HBase table to use as the target instead of HDFS
--hbase-bulkload: Enables bulk loading

Importing Data Into Accumulo

Apache Sqoop also supports importing records into a table in Accumulo. This is done by specifying the --accumulo-table option; Apache Sqoop imports the data into the table specified as its argument.

Each row of the input table is transformed into an Accumulo Mutation operation on a row of the output table, and the key for each row is taken from a column of the input.

Sqoop, by default, uses the split-by column as the row key column. If the split-by column is not specified, it tries to use the primary key column. We can also manually specify a row key column via --accumulo-row-key.

Each output column is placed in the same column family, which is specified with --accumulo-column-family.
Specify the --accumulo-create-table parameter if you have not already created the target table.
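A sketch of an Accumulo import (the instance name, ZooKeeper quorum, and credentials are placeholders):

$ sqoop import --connect jdbc:mysql://localhost/demo_db --table emp_info \
    --accumulo-table emp_info_acc --accumulo-column-family emp_data \
    --accumulo-row-key emp_id --accumulo-create-table \
    --accumulo-instance demo_instance \
    --accumulo-zookeepers zk1.example.com:2181,zk2.example.com:2181 \
    --accumulo-user sqoop_user --accumulo-password secret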

The Accumulo arguments are:

Argument Description
--accumulo-table <table-name>: Specifies the Accumulo table to use as the target instead of HDFS
--accumulo-column-family <family>: Sets the target column family for the import
--accumulo-create-table: If specified, creates missing Accumulo tables
--accumulo-row-key <col>: Specifies which input column to use as the row key
--accumulo-visibility <vis>: Optional. Specifies a visibility token to apply to all rows inserted into Accumulo. The default is the empty string.
--accumulo-batch-size <size>: Optional. Sets the size of Accumulo's write buffer in bytes. The default is 4 MB.
--accumulo-max-latency <ms>: Optional. Sets the maximum latency in milliseconds for the Accumulo batch writer. The default is 0.
--accumulo-zookeepers <host:port>: Specifies the comma-separated list of ZooKeeper servers used by the Accumulo instance
--accumulo-instance <instance-name>: Specifies the name of the target Accumulo instance
--accumulo-user <username>: Specifies the name of the Accumulo user to import as
--accumulo-password <password>: Specifies the password for the Accumulo user

Additional Import Configuration Properties

There are some additional properties which we can configure by modifying the conf/sqoop-site.xml file. We can specify the properties in the same manner as we do in Hadoop configuration files.

For example:

<property>
  <name>property.name</name>
  <value>property.value</value>
</property>

We can also specify these properties on the command line as generic arguments.
For example:

sqoop import -D property.name=property.value ...

The Additional import configuration properties are:

Property Description
sqoop.bigdecimal.format.string: Controls how BigDecimal columns are formatted when stored as a String. The default value is true, which uses toPlainString to store them without an exponent component (0.0000001). If set to false, toString is used, which may include an exponent (1E-7).
sqoop.hbase.add.row.key: Set to false by default, which means Sqoop does not add the column used as the row key into the row data in HBase. If set to true, the row key column is also added to the row data in HBase.
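For instance, to also include the row key column in the HBase row data as described above, the property can be passed as a generic argument; note that -D options must precede the tool-specific arguments (the HBase table and column family mirror the earlier sketch):

$ sqoop import -D sqoop.hbase.add.row.key=true \
    --connect jdbc:mysql://localhost/demo_db --table emp_info \
    --hbase-table emp_info_hbase --column-family emp_data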

Sqoop Import Example Invocations

The examples below illustrate how to use the Sqoop import tool in a variety of situations.

1: In this example, we simply import a table named emp_info from the demo_db database:

$ sqoop import --connect jdbc:mysql://localhost/demo_db --table emp_info

If the import requires a login, it can be supplied as:

$ sqoop import --connect jdbc:mysql://localhost/demo_db --table emp_info \
    --username SomeUser -P
Enter password: (hidden)

2: In this example, we import specific columns from the emp_info table:

$ sqoop import --connect jdbc:mysql://localhost/demo_db --table emp_info \
    --columns "emp_id,emp_name,emp_jobtitle"

3: Controlling the import parallelism by using 8 parallel tasks:

$ sqoop import --connect jdbc:mysql://localhost/demo_db --table emp_info \
    -m 8

4: Storing data in SequenceFiles, and setting the generated class name to example.Emp:

$ sqoop import --connect jdbc:mysql://localhost/demo_db --table emp_info \
    --class-name example.Emp --as-sequencefile

5: Specifying the delimiters to use in a text-mode import:

$ sqoop import --connect jdbc:mysql://localhost/demo_db --table emp_info \
    --fields-terminated-by '\t' --lines-terminated-by '\n' \
    --optionally-enclosed-by '\"'

6: Importing the data into Hive:

$ sqoop import --connect jdbc:mysql://localhost/demo_db --table emp_info \
    --hive-import

7: Importing only the new employees:

$ sqoop import --connect jdbc:mysql://localhost/demo_db --table emp_info \
    --where "start_date > '2020-06-11'"

8: Changing the splitting column from the default:

$ sqoop import --connect jdbc:mysql://localhost/demo_db --table emp_info \
    --split-by dept_id

9: We can verify that an import was successful by using:

$ hadoop fs -ls emp_info

10: In the example below, we perform an incremental import of new data, after having already imported the first 1000 rows of the table:

$ sqoop import --connect jdbc:mysql://localhost/demo_db --table emp_info \
    --where "id > 1000" --target-dir /incremental_dataset --append

11: Importing a table named emp_info from the demo_db database, with validation enabled to compare the source table row count against the number of rows copied into HDFS:

$ sqoop import --connect jdbc:mysql://localhost/demo_db \
    --table emp_info --validate

Summary

I hope that after reading this article, you clearly understand how to import tables from relational databases into HDFS, Hive, HBase, and Accumulo.

The article explained the different arguments as well as the syntax for the Sqoop import tool.

If you have any doubt related to Sqoop import, then please share it with us in the comment section.