Apache Sqoop

Sqoop ("SQL-to-Hadoop") is a tool designed to transfer data between Hadoop and relational databases. Sqoop lets you import data from a relational database management system (RDBMS) such as MySQL into the Hadoop Distributed File System (HDFS), process it there with MapReduce, and later export the processed data back into an RDBMS.

Sqoop is helpful when you analyse certain behaviour (for example, by reading server logs) and want to view the results of that analysis frequently. Triggering a MapReduce job each time is not a feasible way to get a quick view of the data, especially since Hadoop is not good at fast reads of small chunks.

To work around this, we can capture a subset of the data, import it into HDFS, process it, and export the results to Hive or another data system for ad-hoc queries.

In this short tutorial, we will see how Sqoop can be used to import data from a relational table into HDFS and vice versa.

Environment
Hadoop: hadoop-2.4.0.tar (assuming Hadoop is already installed)
Sqoop: sqoop-1.4.5.bin__hadoop-2.0.4-alpha.tar
(http://www.apache.org/dist/sqoop/1.4.5/)
MySQL JAR: mysql-connector-java-5.1.34.jar
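
Before running any Sqoop commands, the shell needs to know where these pieces live. A minimal sketch of the environment setup — the install paths below are assumptions, so adjust them to your own machine:

```shell
# Illustrative install locations -- adjust to your own setup
export HADOOP_HOME=/usr/local/hadoop240
export SQOOP_HOME=/usr/local/sqoop-1.4.5.bin__hadoop-2.0.4-alpha
export PATH=$PATH:$SQOOP_HOME/bin

# Print where the sqoop launcher is expected to be
echo "$SQOOP_HOME/bin/sqoop"
```

The MySQL connector JAR then goes into $SQOOP_HOME/lib, as described in Step 2 of the import walkthrough.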

Importing data from MySQL table to HDFS
Step 1: Creating a database and table in MySQL

mysql> create database sqoopdb;
mysql> use sqoopdb;
mysql> create table employee (name varchar(255), salary double(7,2));
mysql> insert into employee values ('John', 54887.00);
mysql> insert into employee values ('Tim', 98544);

mysql> select * from employee;
+------+----------+
| name | salary   |
+------+----------+
| John | 54887.00 |
| Tim  | 98544.00 |
+------+----------+
2 rows in set (0.00 sec)

Step 2: Copying 'mysql-connector-java-5.1.34.jar' to the $SQOOP_HOME/lib directory

Note: Initially I had copied 'mysql-connector-java-5.0.5.jar', and because of that older driver I was getting the following exception (trimmed for brevity):

INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employee` AS t LIMIT 1
ERROR manager.SqlManager: Error reading from database: java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@4d3c7378 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.

ERROR tool.ImportTool: Encountered IOException running import job: java.io.IOException: No columns to generate for ClassWriter

Step 3: Importing the table into HDFS
$ sqoop import --connect jdbc:mysql://localhost:3306/sqoopdb --username root --password root --table employee -m 1

INFO sqoop.Sqoop: Running Sqoop version: 1.4.5
WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
INFO tool.CodeGenTool: Beginning code generation
INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employee` AS t LIMIT 1
INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employee` AS t LIMIT 1
INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/local/hadoop240
Note: /tmp/sqoop-hduser/compile/8b0c322a9f8c2420e9bbd2dd079dea4d/employee.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-hduser/compile/8b0c322a9f8c2420e9bbd2dd079dea4d/employee.jar
WARN manager.MySQLManager: It looks like you are importing from mysql.
WARN manager.MySQLManager: This transfer can be faster! Use the --direct
WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
INFO mapreduce.ImportJobBase: Beginning import of employee
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
INFO db.DBInputFormat: Using read commited transaction isolation
INFO mapreduce.JobSubmitter: number of splits:1

INFO mapreduce.Job: Job job_1427265867092_0001 running in uber mode : false
INFO mapreduce.Job:  map 0% reduce 0%
INFO mapreduce.Job:  map 100% reduce 0%
INFO mapreduce.Job: Job job_1427265867092_0001 completed successfully
INFO mapreduce.Job: Counters: 30

INFO mapreduce.ImportJobBase: Transferred 25 bytes in 47.9508 seconds (0.5214 bytes/sec)
INFO mapreduce.ImportJobBase: Retrieved 2 records.

Step 4: Listing the data file content
$ hadoop fs -ls -R employee
$ hadoop fs -cat /user/hduser/employee/part-m-00000
John,54887.0
Tim,98544.0
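
The job counters above ("Transferred 25 bytes", "Retrieved 2 records") can be sanity-checked against this file: Sqoop's default output writes one comma-separated line per row. A quick local reproduction:

```shell
# Recreate the imported file content locally and count its bytes:
# the two CSV lines plus their newlines account for the 25 bytes
# reported by the import job.
printf 'John,54887.0\nTim,98544.0\n' > part-m-00000
wc -c < part-m-00000
```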

Exporting data from HDFS to a MySQL table
Step 1: Creating an empty table in MySQL
mysql> use sqoopdb;
mysql> create table employee_export (name varchar(255), salary double(7,2));
Query OK, 0 rows affected (0.06 sec)

mysql> desc employee_export;
+--------+--------------+------+-----+---------+-------+
| Field  | Type         | Null | Key | Default | Extra |
+--------+--------------+------+-----+---------+-------+
| name   | varchar(255) | YES  |     | NULL    |       |
| salary | double(7,2)  | YES  |     | NULL    |       |
+--------+--------------+------+-----+---------+-------+
2 rows in set (0.00 sec)

mysql> select * from employee_export;
Empty set (0.00 sec)

Step 2: Creating a directory and copying the CSV to HDFS
$ cat employee.csv
Jack,9878.21
Mark,8754.65

$ hadoop fs -mkdir -p /user/hduser/export
$ hadoop fs -copyFromLocal employee.csv /user/hduser/export/employee.csv
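
If you are following along, the employee.csv shown above can be generated locally before copying it to HDFS:

```shell
# Create the two-row sample CSV that the export step will push to MySQL
printf 'Jack,9878.21\nMark,8754.65\n' > employee.csv
cat employee.csv
```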

Step 3: Running the export command
$ sqoop export --connect jdbc:mysql://localhost/sqoopdb --username root --password root --table employee_export --export-dir '/user/hduser/export' -m 1

INFO sqoop.Sqoop: Running Sqoop version: 1.4.5
WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
15/03/25 18:55:19 INFO tool.CodeGenTool: Beginning code generation
15/03/25 18:55:19 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employee_export` AS t LIMIT 1
15/03/25 18:55:19 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employee_export` AS t LIMIT 1
15/03/25 18:55:19 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/local/hadoop240
Note: /tmp/sqoop-hduser/compile/6e14d90fb8e22995a61e9be9af177f1a/employee_export.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
15/03/25 18:55:21 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-hduser/compile/6e14d90fb8e22995a61e9be9af177f1a/employee_export.jar
15/03/25 18:55:21 INFO mapreduce.ExportJobBase: Beginning export of employee_export

15/03/25 18:55:23 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/03/25 18:55:25 INFO input.FileInputFormat: Total input paths to process : 1
15/03/25 18:55:25 INFO input.FileInputFormat: Total input paths to process : 1
15/03/25 18:55:25 INFO mapreduce.JobSubmitter: number of splits:1
15/03/25 18:55:25 INFO Configuration.deprecation: mapred.map.tasks.speculative.execution is deprecated. Instead, use mapreduce.map.speculative
15/03/25 18:55:25 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1427265867092_0006
15/03/25 18:55:26 INFO impl.YarnClientImpl: Submitted application application_1427265867092_0006

15/03/25 18:55:34 INFO mapreduce.Job:  map 0% reduce 0%
15/03/25 18:55:41 INFO mapreduce.Job:  map 100% reduce 0%
15/03/25 18:55:41 INFO mapreduce.Job: Job job_1427265867092_0006 completed successfully
15/03/25 18:55:42 INFO mapreduce.Job: Counters: 30
15/03/25 18:55:42 INFO mapreduce.ExportJobBase: Transferred 163 bytes in 18.4814 seconds (8.8197 bytes/sec)
15/03/25 18:55:42 INFO mapreduce.ExportJobBase: Exported 2 records.
mysql> select * from sqoopdb.employee_export;
+------+---------+
| name | salary  |
+------+---------+
| Jack | 9878.21 |
| Mark | 8754.65 |
+------+---------+
2 rows in set (0.00 sec)

Apache Storm

What is Storm
Apache Storm is a distributed realtime computation system that processes unbounded streams of data, doing for realtime processing what Hadoop did for batch processing. Storm is scalable and fault-tolerant, guarantees your data will be processed, and is easy to set up and operate. Realtime analytics, online machine learning, continuous computation, distributed RPC, and ETL are a few of the primary use cases that can be addressed with Storm.

Storm's logical overview
At the highest level, Storm is made up of topologies. A topology is a graph of computation: each node contains processing logic, and each edge between nodes indicates how data should be passed between them. A topology consists of a network of streams, where a stream is an unbounded sequence of tuples. In short:

  1. Tuple: an ordered list of elements, e.g. ("orange", "tweet-123", ..., ...).
    Valid types: String, Integer, byte arrays; you can also define your own serializers so that custom types can be used natively within tuples.
  2. Stream: an unbounded sequence of tuples: Tuple1, Tuple2, Tuple3, ...

Storm uses spouts, which consume a continuous input stream from a source such as Twitter and emit that data as a stream to other components, called bolts, to consume. An emitted tuple can go from a spout to a bolt and/or from one bolt to another bolt. A Storm topology may have one or more spouts and bolts, and as an implementer you can configure multiple spouts and bolts to suit your business logic.

(The above image is from Apache Storm's website.)

Storm's architectural overview
Storm runs in a clustered environment. Similar to Hadoop, it has two types of nodes:

  1. Master node: this node runs a daemon process called 'Nimbus'. Nimbus is responsible for distributing the code, i.e. the topology (spouts + bolts), across the cluster, assigning tasks to worker nodes, and monitoring the success and failure of units of work.
  2. Worker node: each worker node runs a daemon process called the 'Supervisor'. A Supervisor is responsible for starting and stopping worker processes. Each worker process executes a subset of a topology, so that the execution of a topology is spread across many worker processes running on the cluster.

Storm leverages ZooKeeper for the communication between Nimbus and the Supervisors: Nimbus communicates with the Supervisors by passing messages through ZooKeeper. ZooKeeper maintains the complete state of the topology, which allows Nimbus and the Supervisors to be fail-fast and stateless.

Storm modes
1. Local mode: Storm executes topologies completely in-process by simulating worker nodes with threads; this is useful for development and testing.
2. Distributed mode: the topology runs across a cluster of machines.