Big Data Processing with Hadoop and Hive on top of the Google Dataproc Service Offering
Business Scenario
One of our clients asked us to evaluate Hadoop for data processing across about 5,000 shops. The objective of this exercise is to establish whether the current processing time (using legacy RDBMS technology) can be reduced using Hadoop. The resulting data can then be consumed by OBIEE / Looker / PowerBI for reporting purposes. We have to say we are impressed with Google's offerings, as everything can be spun up in minutes with little or no configuration of hardware or software.
The technologies utilised during this exercise are
- Linux on Google Storage
- Hadoop / HDFS – MapReduce / YARN
- Hive
High-level Test Scenario
Process 20 million records of data into Hive on Hadoop HDFS and make the data available for queries
We created the data in csv format, moved the csv file(s) to a file server (Sha has fused a Google Bucket to the local Hadoop file system; we will detail this in another blog shortly), created an external Hive table to access this data, and created a Hive-local ORC table for subsequent processing.
Overview of the Hadoop cluster – we used one master node (Resource Manager) with two worker nodes in a clustered environment within the Google Dataproc service.
We preferred Hive over Spark as the volumes are not very high and the client's IT department is quite familiar with RDBMS technologies.
Detailed Steps
The real-world scenario consists of a denormalised reporting table with 110 columns, but for this test scenario we took a subset of columns. The csv file layout contains the following columns:
COST_CENTRE int,
SUB_COST_CENTRE int,
VERSION_NUMBER int,
TRANSACTION_NUMBER int,
TRANSACTION_SUB_SECTION int,
TRANSACTION_LINE int,
GROSS_PROFIT float,
PROCESS_DATE int
Step 1: Transfer the source file
Move the test.csv file to the file system. The directory I used is /deliverbi/gcs_ftp/FTP_IN/test.csv
Step 2: Invoke the Hive shell by typing hive at the command line
You will see the Hive prompt. Type in
Show databases;
and press Enter; you should see all the available databases
I am going to use the poc_hadoop database, which was created for this evaluation. You can create a database with the following command
create database poc_hadoop;
Now, I want to make poc_hadoop the default working database. We can achieve this with
Use poc_hadoop;
Now create an external table referring to the location where the test csv file is available. Notice we gave the file directory and not the file name. Data from any files dropped into this directory (provided, of course, the csv columns are identical) will be available in Hive directly when you query this external table.
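The exact statement depends on your environment, but a minimal sketch of such an external table, assuming the column layout above and the /deliverbi/gcs_ftp/FTP_IN directory from Step 1, would look like this (on Dataproc the location may instead need to be the gs:// path of the fused bucket):

-- External table over the csv drop directory (the location shown is an assumption; adjust to your setup)
-- Dropping an external table does not delete the underlying files
CREATE EXTERNAL TABLE DELIVERBI_EXTERNAL_T1_F
(COST_CENTRE int, SUB_COST_CENTRE int, VERSION_NUMBER int,
TRANSACTION_NUMBER int, TRANSACTION_SUB_SECTION int, TRANSACTION_LINE int, GROSS_PROFIT float, PROCESS_DATE int)
row format delimited fields terminated by ','
stored as textfile
location '/deliverbi/gcs_ftp/FTP_IN';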
You should see the OK after the command is executed successfully
You can get more details about any table in hive with the following command
describe formatted DELIVERBI_EXTERNAL_T1_F;
Issue the following command to see how many records this external table contains; you should see just over 20 million records
Select count(*) from DELIVERBI_EXTERNAL_T1_F;
So we can now access, from Hive, the csv file data which is stored on a Linux drive
We will create a Hive internal table and populate it from the above external table
Issue the following command at Hive prompt
CREATE TABLE DELIVERBI_T1_F
(COST_CENTRE int, SUB_COST_CENTRE int, VERSION_NUMBER int,
TRANSACTION_NUMBER int, TRANSACTION_SUB_SECTION int, TRANSACTION_LINE int, GROSS_PROFIT float, PROCESS_DATE int)
row format delimited fields terminated by ',' stored as ORC;
You will also have noticed that I used ‘ORC’ in the stored as clause. This is a Hive-internal columnar format which is highly efficient. Other storage formats you can use in Hive include textfile, sequencefile, rcfile, etc. We will discuss these in later blog posts.
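Before loading the ORC table it is worth previewing the external table at the Hive prompt; a quick check along these lines keeps the output to a handful of rows:

-- Preview a handful of rows from the external table
select * from deliverbi_external_t1_f limit 5;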
You will notice that there are no column headings in the output. Execute the following set statement to show the column headers
set hive.cli.print.header=true;
Then run the query again and you can see the column headers this time
Now that we know we are retrieving the required data from the external table, execute the following to insert the data. Ensure the limit clause is removed; otherwise we end up with only 5 rows in the internal table
insert into table DELIVERBI_T1_F
select cost_centre, sub_cost_centre, version_number, transaction_number,
transaction_sub_section, transaction_line, gross_profit, process_date
from deliverbi_external_t1_f;
The records are inserted. You can check the record count is about 20 million by issuing
select count(*) from DELIVERBI_T1_F;
We will now create a Hive internal partitioned table and populate it from the above non partitioned internal table
CREATE TABLE DELIVERBI_PART_T1_F
(SUB_COST_CENTRE int, VERSION_NUMBER int, TRANSACTION_NUMBER int, TRANSACTION_SUB_SECTION int, TRANSACTION_LINE int, GROSS_PROFIT float, PROCESS_DATE int)
PARTITIONED BY (COST_CENTRE int)
row format delimited fields terminated by ',' stored as ORC;
For those of you who are familiar with Oracle partitioned tables, you will notice a difference: Oracle expects you to pick the partition column from the columns already listed in the table definition, whereas Hive expects the partition column to appear only once in the create statement, in the PARTITIONED BY clause rather than in the column list.
Like Oracle, you can partition a Hive table by multiple columns. Just add the sub-partition column within the PARTITIONED BY clause. As simple as that.
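As a sketch only (the table name and the choice of PROCESS_DATE as the sub-partition column are purely illustrative), a two-level partitioned table would be declared like this:

-- Hypothetical example: partition by cost centre, then by process date
CREATE TABLE DELIVERBI_PART2_T1_F
(SUB_COST_CENTRE int, VERSION_NUMBER int, TRANSACTION_NUMBER int,
TRANSACTION_SUB_SECTION int, TRANSACTION_LINE int, GROSS_PROFIT float)
PARTITIONED BY (COST_CENTRE int, PROCESS_DATE int)
stored as ORC;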
You should see the OK after the table creation
Run the following command to insert the data into the new partitioned table
insert into table DELIVERBI_PART_T1_F partition(cost_centre)
select sub_cost_centre, version_number, transaction_number,
transaction_sub_section, transaction_line, gross_profit, process_date, cost_centre
from deliverbi_t1_f;
Notice that the cost_centre column is the last one in the select statement. When inserting into a Hive partitioned table, the partition column must be specified as the last column in the select. If you partition by two columns, then those two columns must come last in the select, in the same order as in the PARTITIONED BY clause.
Oh no. We got the following error:
FAILED: SemanticException [Error 10096]: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
We partitioned this table by Cost Centre and are letting Hive create the partitions dynamically, which by default runs in strict mode. On top of that, my test data set has more than 1,500 distinct Cost Centres, which would also exceed the default limits on the number of dynamic partitions. Run the following set statements to switch to nonstrict mode and raise the limits before running the insert statement again
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=5000;
set hive.exec.max.dynamic.partitions=5000;
One last step before we start retrieving the data. Analyse the partitioned table for quicker retrieval of results
ANALYZE TABLE DELIVERBI_PART_T1_F partition (cost_centre) COMPUTE STATISTICS;
Now we are ready to query the data
Issue an SQL statement at the Hive prompt against the non-partitioned table
select count(*) from deliverbi_t1_f where cost_centre = 'y134';
Issue an SQL statement at the Hive prompt against the partitioned table
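Mirroring the query above, this time against the partitioned table:

-- Same count, now served by the partitioned table
select count(*) from deliverbi_part_t1_f where cost_centre = 'y134';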
We can see that the retrieval from the partitioned table is quick. Can we do anything to improve this further? Well, let’s set something up. Run the following command at the Hive prompt and then run the same query again
set hive.compute.query.using.stats=true;
That is a million times faster (metaphorically speaking): with this setting Hive can answer the count from the statistics we computed earlier instead of scanning the data. You may think the results were simply sitting in memory, which is why the response was lightning fast, so let me query a few more cost centres to see if the query has indeed sped up.
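For illustration (these cost centre values are hypothetical), queries of this shape touch different partitions:

-- Hypothetical cost centre values, just to check several partitions
select count(*) from deliverbi_part_t1_f where cost_centre = 'y135';
select count(*) from deliverbi_part_t1_f where cost_centre = 'y201';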
It took under 5 minutes to load 20 million records into the Hive internal table, and that includes analysing the table data. I remember that on one of our projects it took over 20 minutes to load approximately 20 million records into an Oracle database using SQL*Loader (utilising direct path insert for performance). So we are very pleased with what we have seen of Hadoop with Hive.
Watch this space for more blogs on Big Data, Google BigQuery (Dremel), Hadoop, Hive, etc. DeliverBI is moving towards Big Data and Oracle Fusion solutions as well as traditional RDBMS technologies.