Tuesday, 24 October 2017

Big Data Processing with Hadoop and Hive on top of the Google DataProc Service

Big Data Processing with Hadoop and Hive on top of the Google DataProc Service Offering



Business Scenario

One of our clients wanted us to evaluate Hadoop for data processing of about 5000 Shops. The objective of this exercise is to establish if the current processing time (using legacy RDBMS technology) can be reduced using Hadoop. We can leverage the data from OBIEE / Looker / PowerBI for reporting purposes. We can say we are impressed with google's offerings as everything can be spun up in minutes with little or no configuration of hardware or software.

The technologies utilised during this exercise are
  • Linux on Google Storage
  • Hadoop / HDFS – Map Reduce / Yarn
  • Hive

High level Test Scenario

Process 20 million records of data into Hive Hadoop HDFS and make the data available for queries

We created the data in the csv format, move this csv file(s) to a file server (Sha has fused a Google Bucket to the local Hadoop file system, we will detail this in another blog shortly), created an external hive table to access this data, created a hive local ORC table for subsequent processing

Overview of Hadoop Server – We were using one Resource Master with two worker nodes in a clustered environment within Google Dataproc Services.

We preferred Hive over spark as the volumes are not very high and the client’s IT department is quite familiar with RDBMS technologies.



Detailed Steps

The real world scenario consists of a denormalised reporting table with 110 columns but for this test scenario we took about with The csv File Layout contains the following columns

COST_CENTRE                  int,
SUB_COST_CENTRE                        int,
VERSION_NUMBER                        int,
TRANSACTION_NUMBER              int,
GROSS_PROFIT                 float,
PROCESS_DATE                 int


Step 1: Transfer the source file

Move the test.csv file to the file system. The directory I used is /deliverbi/gcs_ftp/FTP_IN/test.csv

Step 2: Invoke hive shell by typing in hive at the command line

You will see the hive prompt. Type in
Show databases;
and press enter and you should see all the databases available

I am going to use poc_hadoop database that is created for this evaluation purpose. You can create a database with the following command

create database poc_hadoop;

Now, I want to make the poc_hadoop database as the default working database. We can achieve this by

Use poc_hadoop;

Now Create an external table referring to the location where the test csv file is available. Notice we gave the file directory and not the file name. Data from any files that are dropped into this directory (of course the csv file columns should be identical) will be available in Hive directly when you query this external table

You should see the OK after the command is executed successfully

You can get more details about any table in hive with the following command

describe formatted DELIVERBI_EXTERNAL_T1_F;

Issue the following command to see how many records this external table contains and you can see just over 20 million records

Select count(*) from DELIVERBI_EXTERNAL_T1_F;

So we now can access in Hive the csv file data which is stored on a Linux drive

We will create a Hive internal table and populate it from the above external table

Issue the following command at Hive prompt

row format delimited fields terminated by ',' stored as ORC;

For simplicity sake, I kept the external table and the new internal table exactly the same.


You would have also noticed that I used ‘ORC’ in the stored as clause. This is a Hive internal format which is highly efficient. The various types of tables that you can create in Hive are textfile, sequencefile, rcfile etc. We will discuss about these in later blog posts
Before we copy the data from external table to this new internal table, let us first build the sql statement
select cost_centre,sub_cost_centre,version_number,transaction_number,transaction_sub_section, transaction_line,gross_profit,process_date,cost_centre from deliverbi_external_t1_f limit 5;

Run the above statement. Notice the limit 5 which is to retrieve only 5 rows.


You can notice that there are no headings in the output. Execute the following set statement to show the column headers

set hive.cli.print.header=true;
Then run the query again and you can see the column headers this time



Now that we know we are retrieving the required data from external table, execute the following to insert the data. Ensure the limit is removed. If not, we end up having only 5 rows in the internal table

insert into table DELIVERBI_T1_F select cost_centre,sub_cost_centre,version_number,transaction_number,transaction_sub_section,transaction_line,gross_profit,process_date from deliverbi_external_t1_f;


The records are inserted. You can check the record count is about 20 million by issuing

select count(*) from DELIVERBI_T1_F;


We will now create a Hive internal partitioned table and populate it from the above non partitioned internal table

row format delimited fields terminated by ',' stored as ORC;


For those of you who are familiar with Oracle partitioned tables, you would notice that Oracle expects you to specify partitioned column from the list of available columns. Where as Hive expects the partitioned column mentioned only once in the create statement.


Like Oracle you can partition Hive table by multiple columns. Just add the sub partition column within the partitioned by clause. As simple as that


You should see the OK after the table creation



Run the following command to insert the data into the new partitioned table

insert into table DELIVERBI_PART_T1_F partition(cost_centre) select sub_cost_centre,version_number,transaction_number,transaction_sub_section,transaction_line,gross_profit,process_date,cost_centre from deliverbi_t1_f;


Notice that the cost_centre column is the last one in the select statement. When inserting into a Hive partitioned table, the partitioned column should be specified as the last column in the select. If you have two columns used for partitioning then these two columns should come last (in the same order) in the select


Oh no. Got the following error (FAILED: SemanticException [Error 10096]: Dynamic partition strict mode requires at least one static partition column.
To turn this off set hive.exec.dynamic.partition.mode=nonstrict)


For now, you can fix this by issuing the following set statement and then run the insert statement again
set hive.exec.dynamic.partition.mode=nonstrict
You expect to see OK but another error (Caused by: org.apache.hadoop.hive.ql.metadata.HiveFatalException: [Error 20004]: Fatal error occurred when node tried to create too many dynamic partitions.)




We partitioned this table by Cost Centre. In my test data set I have more than 1500 distinct Cost Centres and there is a default limit that has been hit resulting in error. Run the two set statements to up the limits before running the insert statement again
set hive.exec.max.dynamic.partitions.pernode=5000;
set hive.exec.max.dynamic.partitions=5000;




One last step before we start retrieving the data. Analyse the partitioned table for quicker retrieval of results



Now we are ready to query the data

Issue AN sql statement at the Hive prompt for a Non Partitioned Table

select count(*) from deliverbi_t1_f where cost_centre = 'y134';



 Issue AN sql statement at the Hive prompt for a Partitioned Table

select count(*) from deliverbi_part_t1_f where cost_centre = 'y134';


We can see that the retrieval from partitioned table is quick. Can we do anything to improve this? Well, let’s set something up. Run the following command at Hive prompt and then run the same query again


set hive.compute.query.using.stats=true;


That is a million times faster (metaphorical). You may think the results are in the memory so the query response was lightening. Let me query few more cost centres to see if the query has indeed sped up. Here is the result




It took under 5 minutes to load 20 million records into Hive internal table and that includes analysing the table data. I remember on one of the Projects it took over 20 minutes to load approx. 20 million records into Oracle database using SQL Loader (utilised the direct path insert for performance). So very pleased with what I have seen on Hadoop with Hive.


Watch this space for more blogs on Big Data, Google Big Query (Dremel), Hadoop, Hive etc. DeliverBI is moving towards Big Data and Fusion Oracle Solutions aswell as traditional RDMS technologies.





  1. I really appreciate the information shared above. It’s of great help. If someone want to learn Online (Virtual) instructor lead live training in #APACHE SPARK & SCALA, kindly Contact MaxMunus
    MaxMunus Offer World Class Virtual Instructor-led training on #APACHE SPARK & SCALA,. We have industry expert trainer. We provide Training Material and Software Support. MaxMunus has successfully conducted 1,00,000 + training in India, USA, UK, Australia, Switzerland, Qatar, Saudi Arabia, Bangladesh, Bahrain, and UAE etc.
    Avishek Priyadarshi
    Skype id: avishek_2.
    Ph:(0) 8553177744 / 080 - 41103383

  2. Really Good blog post.provided a helpful information.I hope that you will post more updates like thisBig data hadoop online Training Hyderabad

  3. Excellent article. Very interesting to read. I really love to read such a nice article. Thanks! keep rocking.
    Big data hadoop online training India


Google Big Query Clone / Copy DataSets / Tables - PROD --> TEST --> DEV Projects

Google Big Query Cloning of Datasets & Tables across GCS Projects. We searched the internet and could not find a simple cloning / ...