Rosetta Stone: MySQL, Pig and Spark (Basics)

Posted by

In a world where new data processing languages appear every day, it can be helpful to have tutorials explaining language characteristics in detail from the ground up.  This blog post is not such a tutorial.   It also isn’t a tutorial on getting started with MySQL or Hadoop, nor is it a list of best practices for the various languages I’ll reference here – there are bound to be better ways to accomplish certain tasks, and where a choice was required, I’ve emphasized clarity and readability over performance.  Finally, this isn’t meant to be a quickstart for SQL experts to access Hadoop – there are a number of SQL interfaces to Hadoop such as Impala or Hive that make Hadoop incredibly accessible to those with existing SQL skills.

Instead, this post is a pale equivalent of the Rosetta Stone – examples of identical concepts expressed in three different languages:  SQL (for MySQL), Pig and Spark.  These are the exercises I’ve worked through in order to help think in Pig and Spark as fluently as I think in SQL, and I’m recording this experience in a blog post for my own benefit.  I expect to reference it periodically in my own future work in Pig and Spark, and if it benefits anybody else, great.

Setting up

To make life easy, I’m using the Cloudera Quickstart VM to execute all of these examples.  This is easy to download and run using Oracle VirtualBox, and the image provided also conveniently includes MySQL.  For the most accessible Hadoop experience, I’m using Cloudera Manager, though my examples will be from the command-line clients, and you can stand up CDH without Cloudera Manager if RAM is limited.

MySQL Data

I’m using the employees sample database, which is available from Launchpad here.  This database was cultivated by Giuseppe Maxia (thanks!) and is familiar to many MySQL users.  To download and install it in the Cloudera VM, you can use the following commands:

wget https://launchpadlibrarian.net/24493586/employees_db-full-1.0.6.tar.bz2
bzip2 -d employees_db-full-1.0.6.tar.bz2
cd employees_db
mysql -uroot -pcloudera < employees.sql

Hadoop Data

Once the data is in MySQL, getting it copied into Hadoop is simple with Sqoop:

sqoop import-all-tables \
  --connect jdbc:mysql://localhost/employees \
  --username root \
  --password cloudera \
  --warehouse-dir /example/employees \
  --fields-terminated-by "\t"

The above Sqoop command imports all the tables from the employees database into HDFS using tab delimiters.  With the data in both MySQL and HDFS, we’re ready to get started.

Example Scripts

The examples shown below can be downloaded here.  This .ZIP file contains the SQL, Pig and Spark examples in separate text files for easy copy-and-paste.

Basic data retrieval

This is the most fundamental data processing funtion:  retrieving data from storage and printing it out in full:

MySQL

mysql> select * from departments;
+---------+--------------------+
| dept_no | dept_name          |
+---------+--------------------+
| d009    | Customer Service   |
| d005    | Development        |
| d002    | Finance            |
| d003    | Human Resources    |
| d001    | Marketing          |
| d004    | Production         |
| d006    | Quality Management |
| d008    | Research           |
| d007    | Sales              |
+---------+--------------------+

Pig

To do the same thing in Pig, you need to issue multiple statements.  Pig is procedural – while SQL describes the output you’d like to see, a Pig script walks through the steps required to produce it.  While the underlying engines are naturally different, I like having to think through the steps that a query engine has to execute in order to process a query.  Pig makes you do that to a large extent, while MySQL hides those steps.

It’s worth noting that in both Pig and Spark examples shown, I’ve left out extensive logging messages which get displayed during execution.  I originally used ellipses to indicate what I cut, but that’s also the continuation prompt for multi-line Spark statements in the pyspark shell, so I’ve just left a blank line instead.

grunt> A = LOAD '/example/employees/departments'
>> USING PigStorage('\t');
grunt> DUMP A;

(d007	Sales)
(d008	Research)
(d009	Customer Service)
(d005	Development)
(d006	Quality Management)
(d003	Human Resources)
(d004	Production)
(d001	Marketing)
(d002	Finance)

Spark

In my examples, I’ll be using Python, though Scala is an equally viable alternative. Those new to Python may need an explanation of lambda functions, but a simple working definition is that they are anonymous functions where the argument is the identifier after “lambda”.  Like Pig, multiple statements are used to execute the operation:

Using Python version 2.6.6 (r266:84292, Feb 22 2013 00:00:18)
SparkContext available as sc, HiveContext available as sqlContext.
>>> dept_files = "/example/employees/departments/"
>>> results = sc.textFile(dept_files) \
... .map(lambda line: line.split('\t')) \
... .collect()

>>> for dept in results: print dept
... 
[u'd001', u'Marketing']
[u'd002', u'Finance']
[u'd003', u'Human Resources']
[u'd004', u'Production']
[u'd005', u'Development']
[u'd006', u'Quality Management']
[u'd007', u'Sales']
[u'd008', u'Research']
[u'd009', u'Customer Service']

It’s certainly possible to express the above Spark script as a single chained command – I think it’s a bit more readable when broken up.

Projections

We often need to limit the columns returned from a command, and this is done as a projection.  The following example collection returns the same rows as the first, but only the department name (not ID):

MySQL

mysql> select dept_name from departments;
+--------------------+
| dept_name          |
+--------------------+
| Customer Service   |
| Development        |
| Finance            |
| Human Resources    |
| Marketing          |
| Production         |
| Quality Management |
| Research           |
| Sales              |
+--------------------+

Pig

This is a good example of Hadoop schema-on-read – we define the data columns when we do the LOAD statement.  The previous example simply broke the columns apart by the tab separator; this example defines the column name and type, making subsequent reference easier.

grunt> A = LOAD '/example/employees/departments'
>> USING PigStorage('\t')
>> AS (deptid:chararray, dept:chararray);
grunt> B = FOREACH A GENERATE dept;
grunt> DUMP B;

(Sales)
(Research)
(Customer Service)
(Development)
(Quality Management)
(Human Resources)
(Production)
(Marketing)
(Finance)

Spark

Unlike the Pig script above, there are no column aliases.  This results in lists, which can get nested with operations like joins (shown later).

>>> dept_files = "/example/employees/departments/"
>>> results = sc.textFile(dept_files) \
... .map(lambda line: line.split('\t')) \
... .map(lambda depts: depts[1]) \
... .collect()
>>> for dept in results: print dept
...

Marketing
Finance
Human Resources
Production
Development
Quality Management
Sales
Research
Customer Service

Filtering

Just like projections eliminate columns we don’t need, filtering eliminates rows that don’t match specific criteria.

MySQL

mysql> SELECT * FROM departments
    -> WHERE dept_name NOT LIKE '%a%';
+---------+------------------+
| dept_no | dept_name        |
+---------+------------------+
| d009    | Customer Service |
| d005    | Development      |
| d004    | Production       |
+---------+------------------+

Pig

grunt> A = LOAD '/example/employees/departments'
>> USING PigStorage('\t')
>> AS (deptid:chararray, dept:chararray);
grunt> B = FILTER A BY NOT (dept MATCHES '.*a.*');
grunt> DUMP B;
(d009,Customer Service)
(d005,Development)
(d004,Production)

Spark

>>> dept_files = "/example/employees/departments/"
>>> results = sc.textFile(dept_files) \
... .map(lambda line: line.split('\t')) \
... .filter(lambda depts: "a" not in depts[1]) \
... .collect()

>>> for dept in results: print dept
... 
[u'd004', u'Production']
[u'd005', u'Development']
[u'd009', u'Customer Service']

Ordering and Limiting

Two concepts in one! Ordering and limiting result sets are often paired together to get answers to questions like, “what are the top 5 selling products?” This example set is a bit more contrived, asking the first five departments, alphabetically ranked by name.

MySQL

mysql> SELECT * FROM departments
    -> ORDER BY dept_name LIMIT 5;
+---------+------------------+
| dept_no | dept_name        |
+---------+------------------+
| d009    | Customer Service |
| d005    | Development      |
| d002    | Finance          |
| d003    | Human Resources  |
| d001    | Marketing        |
+---------+------------------+

Pig

grunt> A = LOAD '/example/employees/departments'
>> USING PigStorage('\t')
>> AS (deptid:chararray, dept:chararray);
grunt> B = ORDER A BY dept;
grunt> C = LIMIT B 5;
grunt> DUMP C;

(d009,Customer Service)
(d005,Development)
(d002,Finance)
(d003,Human Resources)
(d001,Marketing)

Spark

>>> dept_files = "/example/employees/departments/"
>>> results = sc.textFile(dept_files) \
... .map(lambda line: line.split('\t')) \
... .takeOrdered(5, key=lambda dept: dept[1])

>>> for dept in results: print dept
... 
[u'd009', u'Customer Service']
[u'd005', u'Development']
[u'd002', u'Finance']
[u'd003', u'Human Resources']
[u'd001', u'Marketing']

Aggregation

Counting stuff!  At its most basic, finding the distinct values for a given field and the number of times each value appears.

MySQL

mysql> SELECT COUNT(*), gender 
    -> FROM employees 
    -> GROUP BY gender;
+----------+--------+
| COUNT(*) | gender |
+----------+--------+
|   179973 | M      |
|   120051 | F      |
+----------+--------+

Pig

I sometimes lose track of how Pig transforms bags during aggregation or join operations – for example, it was originally difficult for me to remember that GROUP BY produces a bag with a key named “group”, and a value which is a list of bags matching the group value.  Doing DESCRIBE B shows the structure of the B bag.  I’ve left this useful debugging step in to demonstrate.

grunt> A = LOAD '/example/employees/employees'
>> USING PigStorage('\t')
>> AS ( 
>>   empid:int, 
>>   birth_date:chararray,
>>   first_name:chararray,
>>   last_name:chararray,
>>   gender:chararray,
>>   hire_date:chararray
>>   );
grunt> B = GROUP A BY gender;
grunt> DESCRIBE B;
B: {group: chararray,A: {(empid: int,birth_date: chararray,first_name: chararray,last_name: chararray,gender: chararray,hire_date: chararray)}}
grunt> C = FOREACH B GENERATE group, COUNT(A);
grunt> DUMP C;

(F,120051)
(M,179973) 

Spark

>>> emp_files = "/example/employees/employees/"
>>> results = sc.textFile(emp_files) \
... .map(lambda line: (line.split('\t')[4])) \
... .countByKey().items() 

>>> for gender in results: print gender
... 
(u'M', 179973)
(u'F', 120051)

Joining Data Sets

We’re using data stored in relational formats here, but the same principles apply to less-structured, bigger data.  Whether joining account data to web logs, or working from RDBMS-sourced data as shown here, combining data sets makes data powerful.  In the example below, we’re looking for the 5 highest-paid current employees, and the department in which they work.

MySQL

mysql> SELECT e.first_name, e.last_name, d.dept_name, s.salary
    -> FROM employees e
    -> JOIN dept_emp de ON (e.emp_no = de.emp_no)
    -> JOIN departments d ON (de.dept_no = d.dept_no)
    -> JOIN salaries s ON (s.emp_no = e.emp_no)
    -> WHERE s.to_date = '9999-01-01'
    -> AND de.to_date = '9999-01-01'
    -> ORDER BY s.salary DESC
    -> LIMIT 5;
+------------+-----------+-----------+--------+
| first_name | last_name | dept_name | salary |
+------------+-----------+-----------+--------+
| Tokuyasu   | Pesch     | Sales     | 158220 |
| Honesty    | Mukaidono | Sales     | 156286 |
| Xiahua     | Whitcomb  | Sales     | 155709 |
| Sanjai     | Luders    | Sales     | 155513 |
| Tsutomu    | Alameldin | Sales     | 155190 |
+------------+-----------+-----------+--------+

Pig

grunt> EMP = LOAD '/example/employees/employees'
>> USING PigStorage('\t')
>> AS ( 
>>   empid:int, 
>>   birth_date:chararray,
>>   first_name:chararray,
>>   last_name:chararray,
>>   gender:chararray,
>>   hire_date:chararray
>>   );
grunt> DEPT = LOAD '/example/employees/departments'
>> USING PigStorage('\t')
>> AS (deptid:chararray, dept:chararray);
grunt> 
grunt> DE = LOAD '/example/employees/dept_emp'
>> USING PigStorage('\t')
>> AS ( 
>>   empid:int,
>>   deptid:chararray,
>>   from_date:chararray,
>>   to_date:chararray
>>   );
grunt> 
grunt> SAL = LOAD '/example/employees/salaries'
>> USING PigStorage('\t')
>> AS ( 
>>   empid:int,
>>   salary:int,
>>   from_date:chararray,
>>   to_date:chararray
>>   );
grunt> SAL_CUR = FILTER SAL BY (to_date == '9999-01-01');
grunt> EMP_CUR = FILTER DE BY (to_date == '9999-01-01');
grunt> SAL_CUR_P = FOREACH SAL_CUR GENERATE empid, salary;
grunt> EMP_CUR_P = FOREACH EMP_CUR GENERATE empid, deptid;
grunt> EMP_P = FOREACH EMP GENERATE 
>>   empid, 
>>   first_name, 
>>   last_name;
grunt> EMP_P = FOREACH EMP GENERATE 
>>   empid, 
>>   first_name, 
>>   last_name;
grunt> EMP_SAL = JOIN SAL_CUR_P BY empid, EMP_P BY empid;
grunt> DEPT_EMP = JOIN EMP_CUR_P BY deptid, DEPT BY deptid;
grunt> DEPT_EMP_P = FOREACH DEPT_EMP GENERATE 
>>   EMP_CUR_P::empid AS empid,
>>   DEPT::dept AS dept;
grunt> EMP_SAL_P = FOREACH EMP_SAL GENERATE 
>>   SAL_CUR_P::empid AS empid, 
>>   SAL_CUR_P::salary AS salary,
>>   EMP_P::first_name AS first_name,
>>   EMP_P::last_name AS last_name;
grunt> JOINED = JOIN EMP_SAL_P BY empid, DEPT_EMP_P BY empid;
grunt> ORDERED = ORDER JOINED BY EMP_SAL_P::salary DESC;
grunt> TOP_EARNERS = LIMIT ORDERED 5;
grunt> RESULTS = FOREACH TOP_EARNERS GENERATE
>>   EMP_SAL_P::first_name AS first_name,
>>   EMP_SAL_P::last_name AS last_name,
>>   DEPT_EMP_P::dept AS dept,
>>   EMP_SAL_P::salary AS salary;
grunt> DUMP RESULTS;

(Tokuyasu,Pesch,Sales,158220)
(Honesty,Mukaidono,Sales,156286)
(Xiahua,Whitcomb,Sales,155709)
(Sanjai,Luders,Sales,155513)
(Tsutomu,Alameldin,Sales,155190)

Spark

This is where the multi-dimensional arrays referenced earlier are demonstrated.  It can be a bit of a headache to remember how the RDDs are constructed from previous transformations – using RDD.take(1) to print out a sample can prove useful in determining the organization, in the same way DESCRIBE <bag> was used earlier for Pig.

 
>>> dept_files = "/example/employees/departments/"
>>> de_files = "/example/employees/dept_emp/"
>>> emp_files = "/example/employees/employees/"
>>> salary_files = "/example/employees/salaries/"
>>> 
>>> current_salaries = sc.textFile(salary_files) \
... .map(lambda line: line.split('\t')) \
... .filter(lambda row: row[3] == '9999-01-01') \
... .map(lambda row: (int(row[0]), int(row[1])))

>>> employees = sc.textFile(emp_files) \
...  .map(lambda line: line.split('\t')) \
...  .map(lambda row: (int(row[0]), (row[2], row[3]))) \
...  .join(current_salaries)

... departments =  sc.textFile(dept_files) \
...  .map(lambda line: line.split('\t'))

>>> dept_emps = sc.textFile(de_files) \
... .map(lambda line: line.split('\t')) \
... .filter(lambda row: row[3] == '9999-01-01') \
... .map(lambda row: (row[1], int(row[0]))) \
... .join(departments) \
... .values()

>>> results = dept_emps.join(employees) \
... .takeOrdered(5, key=lambda row: -row[1][1][1]) 

>>> for emp in results: 
...  print "%15s %15s %15s %15d" % \
...       (emp[1][1][0][0], \
...        emp[1][1][0][1], \
...        emp[1][0], \
...        emp[1][1][1])
... 
       Tokuyasu           Pesch           Sales          158220
        Honesty       Mukaidono           Sales          156286
         Xiahua        Whitcomb           Sales          155709
         Sanjai          Luders           Sales          155513
        Tsutomu       Alameldin           Sales          155190

Aggregated Joins

This is a little more involved than counting – we’re joining different data sources to compute the average salary of current employees by department.

MySQL

mysql> SELECT d.dept_name, AVG(s.salary)
    -> FROM departments d
    -> JOIN dept_emp de ON (d.dept_no = de.dept_no)
    -> JOIN salaries s ON (s.emp_no = de.emp_no)
    -> WHERE s.to_date = '9999-01-01'
    -> AND de.to_date = '9999-01-01'
    -> GROUP BY d.dept_name;
+--------------------+---------------+
| dept_name          | AVG(s.salary) |
+--------------------+---------------+
| Customer Service   |    67285.2302 |
| Development        |    67657.9196 |
| Finance            |    78559.9370 |
| Human Resources    |    63921.8998 |
| Marketing          |    80058.8488 |
| Production         |    67843.3020 |
| Quality Management |    65441.9934 |
| Research           |    67913.3750 |
| Sales              |    88852.9695 |
+--------------------+---------------+

Pig

grunt> DEPT = LOAD '/example/employees/departments'
>> USING PigStorage('\t')
>> AS (deptid:chararray, dept:chararray);
grunt> DE = LOAD '/example/employees/dept_emp'
>> USING PigStorage('\t')
>> AS ( 
>>   empid:int,
>>   deptid:chararray,
>>   from_date:chararray,
>>   to_date:chararray
>>   );
grunt> SAL = LOAD '/example/employees/salaries'
>> USING PigStorage('\t')
>> AS ( 
>>   empid:int,
>>   salary:int,
>>   from_date:chararray,
>>   to_date:chararray
>>   );
grunt> SAL_CUR = FILTER SAL BY (to_date == '9999-01-01');
grunt> EMP_CUR = FILTER DE BY (to_date == '9999-01-01');
grunt> SAL_CUR_P = FOREACH SAL_CUR GENERATE empid, salary;
grunt> EMP_CUR_P = FOREACH EMP_CUR GENERATE empid, deptid;
grunt> DEPT_EMP = JOIN EMP_CUR_P BY deptid, DEPT BY deptid;
grunt> DEPT_EMP_P = FOREACH DEPT_EMP GENERATE 
>>   EMP_CUR_P::empid AS empid,
>>   DEPT::dept AS dept;
grunt> JOINED = JOIN SAL_CUR_P BY empid, DEPT_EMP_P BY empid;
grunt> GROUPED = GROUP JOINED BY DEPT_EMP_P::dept;
grunt> GROUPED = GROUP JOINED BY DEPT_EMP_P::dept;
grunt> RESULTS = FOREACH GROUPED GENERATE
>>   group AS department,
>>   AVG(JOINED.SAL_CUR_P::salary) AS avg_salary;
grunt> DUMP RESULTS;
(Sales,88852.96947030583)
(Finance,78559.93696228994)
(Research,67913.374975714)
(Marketing,80058.84880743835)
(Production,67843.30198484166)
(Development,67657.91955820545)
(Human Resources,63921.89982943092)
(Customer Service,67285.2301781547)
(Quality Management,65441.99340024749)

Spark

In the following example, I defined a function avg(), which takes a list as an argument and calculated the average of the values in the list.  This highlights how easy it is to write transformation functions – it would be relatively easy to calculate the median value as well (something that’s far harder in MySQL).

>>> dept_files = "/example/employees/departments/"
>>> de_files = "/example/employees/dept_emp/"
>>> salary_files = "/example/employees/salaries/"
>>> current_salaries = sc.textFile(salary_files) \
... .map(lambda line: line.split('\t')) \
... .filter(lambda row: row[3] == '9999-01-01') \
... .map(lambda row: (int(row[0]), int(row[1])))
>>> departments =  sc.textFile(dept_files) \
... .map(lambda line: line.split('\t'))
>>> dept_emps = sc.textFile(de_files) \
... .map(lambda line: line.split('\t')) \
... .filter(lambda row: row[3] == '9999-01-01') \
... .map(lambda row: (row[1], int(row[0]))) \
... .join(departments) \
... .values()
>>> def avg(l):
...   return float(sum(l))/len(l)
... 
>>> result = dept_emps.join(current_salaries) \
... .map(lambda data: (data[1][0], data[1][1])) \
... .groupByKey() \
... .map(lambda r: (r[0], avg(r[1]))) \
... .sortBy(lambda row: row[0]) \
... .collect()

>>> for row in result:
...  print "%20s %5.4f" % row
... 
    Customer Service 67285.2302
         Development 67657.9196
             Finance 78559.9370
     Human Resources 63921.8998
           Marketing 80058.8488
          Production 67843.3020
  Quality Management 65441.9934
            Research 67913.3750
               Sales 88852.9695

Leave a Reply

Your email address will not be published. Required fields are marked *