DELIVERBI Blog – OBIEE, OBIA, ETL & Big Data


Tuesday, 31 October 2017

Scripting Hive Commands with Python

 

In the previous posts we touched upon basic data processing with Hive, but it was all interactive. In this post we will discuss how to script those Hive commands using Python.

We will start with a very basic Python script and add more functionality to it as we work through this post.

First, we will ensure the environment is set up correctly before getting into the scripting.

If you are running a Python version below 3, install the following packages:

pip install pyhs2
pip install thrift_sasl==0.2.1
pip install sasl==0.2.1
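
For reference (this example is not from the original post), a minimal pyhs2 connection looks something like the sketch below; the host, port, user and database are placeholders, and the authMechanism must match how your HiveServer2 is configured.

import pyhs2  # Hive client typically used with Python 2

# Placeholder connection details - adjust host, port, credentials and database
with pyhs2.connect(host='43.32.0.85',
                   port=10000,
                   authMechanism='PLAIN',
                   user='hive',
                   password='',
                   database='poc_hadoop') as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM table_test")
        for row in cur.fetch():
            print row  # Python 2 print statement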


If you are running Python 3 or above you might face SASL errors (a TSocket error, "Could not start SASL", or a TProtocol error related to SASL). In this case, turn SASL off in Hive as described below and install the following packages instead:

pip3 install sasl 
pip3 install thrift 
pip3 install thrift-sasl 
pip3 install PyHive


Turn SASL off in Hive and use the PyHive Python library. Edit the hive-site.xml file (typically at /etc/hive/conf.dist/hive-site.xml), add the following entry, and save:

<property>
   <name>hive.server2.authentication</name>
   <value>NOSASL</value>
</property>


Restart HiveServer2 so that the new setting can take effect.

To start HiveServer2:
$ sudo service hive-server2 start


To stop HiveServer2:
$ sudo service hive-server2 stop



We are now ready to start scripting.
For this post, we have created a Hive table called table_test with two columns and loaded it with a handful of records.
The first script we will create will connect to Hive, execute a Hive command – a select statement in this case – and show the output on screen.
We are using Python 3.4.2 and will be utilising PyHive.

Hive Output to Screen
 
I have saved the following code as hive_data_print.py on Linux:


from pyhive import hive
conn = hive.Connection(host="43.32.0.85", auth='NOSASL',database='poc_hadoop')
cursor = conn.cursor()
cursor.execute("SELECT * FROM table_test")
for result in cursor.fetchall():
  print(result)


The first line of the code imports the Python library that we are going to be using for this post and going forward.
 
We define a connection to the Hive database poc_hadoop and create a cursor on it (lines 2 & 3).

Using this cursor, we then execute a simple Hive command (a select query in this case) – line 4.
 
We loop and print row by row (we can do this because the data set is small) – lines 5 & 6. Note that the print syntax can differ depending on the version of Python you are using.
 
I run the script as python3 hive_data_print.py
 
And you should see the rows displayed on the screen.

That is our very first Python script. Of course, it is very simple and only shows the results on screen, which in real-life situations is of limited use. We typically want to store the output of the query in a file, so let us add that functionality to our code.

 

Hive Output to Text File
 
Save the following lines of code as hive_data_to_file_simple.py on Linux:


from pyhive import hive
conn = hive.Connection(host="43.32.0.85", auth='NOSASL',database='poc_hadoop')
cursor = conn.cursor()
cursor.execute("SELECT * FROM table_test")
with open('test.txt', 'w') as f:
  for results in cursor.fetchall():
    f.write("%s\n" % str(results))



Notice there is no change to the way we import the library, create the connection and cursor, and execute the query (lines 1 – 4).

This time we open a file (in the current directory, though if you specify a full path you can create the file in any location you wish) and loop through the results, saving them line by line – lines 5 – 7.
 
When I execute this code as python3 hive_data_to_file_simple.py, I see a file created in my current directory with the data.
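
Note that writing str(results) stores each row in Python tuple notation (brackets, quotes and all). If you would rather have a plain delimited file, one option – not covered in the original script – is the standard csv module. A minimal sketch, assuming the same connection details as above:

import csv
from pyhive import hive

conn = hive.Connection(host="43.32.0.85", auth='NOSASL', database='poc_hadoop')
cursor = conn.cursor()
cursor.execute("SELECT * FROM table_test")

# Write each row as comma-separated values rather than a Python tuple string
with open('test_delimited.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    for row in cursor.fetchall():
        writer.writerow(row)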

 
Hive Output to csv File using pandas
 
Now we will try to enhance our script and make it more versatile using pandas data frames. You need to install pandas for this, which Shahed has done using:

pip3 install pandas


Save the following lines of code as hive_data_to_file_pandas.py on Linux:

from pyhive import hive
import pandas as pd 
conn = hive.Connection(host="43.32.0.85", auth='NOSASL',database='poc_hadoop')
df = pd.read_sql("SELECT * FROM table_test", conn)
df.to_csv('test.csv', index=False, header=False)

When you run this script, the output will be stored as a csv file in the current directory.
 
Pandas is one of the most widely used Python libraries in data science. It is mainly used for data wrangling as it is very powerful and flexible, among many other things.
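
As a small illustration of that flexibility (not part of the original example), once the result set is in a data frame you can inspect and reshape it before writing it out; the wrangling steps below are arbitrary:

from pyhive import hive
import pandas as pd

conn = hive.Connection(host="43.32.0.85", auth='NOSASL', database='poc_hadoop')
df = pd.read_sql("SELECT * FROM table_test", conn)

# Quick look at the shape and types of the result set
print(df.head())
print(df.dtypes)

# Example wrangling: drop duplicate rows and sort by the first column
clean = df.drop_duplicates().sort_values(by=df.columns[0])
clean.to_csv('test_clean.csv', index=False)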

So now we know how to script Hive commands. You can export the results to the file system, a Google Cloud Storage bucket (we will cover this in more depth in future posts) or any other destination if you have the right libraries.
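
As a taster for the Google Bucket option (we will cover this properly in a future post), the sketch below shows one way to push the csv produced above into a bucket using the google-cloud-storage client library; the bucket name and object path are placeholders, and the library must be installed (pip3 install google-cloud-storage) with suitable credentials in place.

from google.cloud import storage

# Placeholder bucket name - replace with your own bucket
client = storage.Client()
bucket = client.bucket('deliverbi-example-bucket')

# Upload the csv file produced by the pandas script above
blob = bucket.blob('hive_exports/test.csv')
blob.upload_from_filename('test.csv')
print('Uploaded test.csv to the bucket')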

 
Bye for now from Krishna and Shahed


 

Sunday, 29 October 2017

Hive Big Data Commands Reference




DELIVERBI Big Data Hive Command Reference

 
 

Today we are sharing some of the frequently used Hive commands and settings that will come in handy.

To see the status of HiveServer2 and also view the logs, you can use the following URL:

http://<IPAddress>:10002/hiveserver2.jsp

This web UI allows you to view the HiveServer2 configuration, active sessions and logs, among other things.

To List Databases use show databases;

To create a new database in the default location use create database <db_name>;

To create a new database in a specified location – create database db_final location '/storage/db_final';

To drop a database use drop database <db_name>;

To start using a database –  use <db_name>;

To list the tables available in the current database – show tables;

For gathering table statistics analyze table deliverbi_part_t1_f partition (cost_centre) COMPUTE STATISTICS;

To locate the storage directory – set hive.metastore.warehouse.dir;

To locate the storage directory along with other information for a specific table – describe formatted deliverbi_part_t1_f;

To show the column headers – set hive.cli.print.header=true;

To stop showing column headers, set the above property with value false

You can see the explain plan for a specific query using the following

explain select count(*) from deliverbi_t1_f;

You can adjust the number of reducers using SET mapreduce.job.reduces=5;

We will talk about a sizing approach for working out the optimum number of mappers and reducers in future posts.

To enable dynamic inserts into a partitioned table set hive.exec.dynamic.partition.mode=nonstrict;

To increase the maximum number of dynamic partitions beyond the default limits, use the following set statements:

set hive.exec.max.dynamic.partitions.pernode=5000;

set hive.exec.max.dynamic.partitions=5000;

You can enable the Cost Based Optimiser using set hive.cbo.enable=true;

Below are some of the performance-influencing settings. You will need to experiment a little to work out the combination that best fits your specific setup.

set hive.compute.query.using.stats=true; 

set hive.stats.fetch.column.stats=true;

set hive.stats.fetch.partition.stats=true;

set hive.vectorized.execution.enabled=true;

set hive.vectorized.execution.reduce.enabled=true;

set hive.vectorized.execution.reduce.groupby.enabled=true;

set hive.exec.parallel=true;

set hive.exec.parallel.thread.number=16;
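
These settings can also be applied per session from Python, which is handy once you start scripting Hive as described in the scripting post above; a minimal sketch using PyHive (host, database and query are placeholders):

from pyhive import hive

# Placeholder connection details - adjust for your own HiveServer2
conn = hive.Connection(host="43.32.0.85", auth='NOSASL', database='poc_hadoop')
cursor = conn.cursor()

# Apply session-level tuning settings (a subset of the ones listed above)
for setting in ["set hive.compute.query.using.stats=true",
                "set hive.vectorized.execution.enabled=true",
                "set hive.exec.parallel=true",
                "set hive.exec.parallel.thread.number=16"]:
    cursor.execute(setting)

# Any query run on this cursor now uses those settings
cursor.execute("select count(*) from deliverbi_t1_f")
print(cursor.fetchone())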

 

Having covered the basics of Hive, in future posts we will touch upon scripting with Python to execute Hive commands.

Bye for now – Krishna and Shahed

Tuesday, 24 October 2017

Big Data Processing with Hadoop and Hive on top of the Google DataProc Service


Big Data Processing with Hadoop and Hive on top of the Google DataProc Service Offering

 


 

Business Scenario

One of our clients asked us to evaluate Hadoop for data processing across about 5,000 shops. The objective of this exercise was to establish whether the current processing time (using legacy RDBMS technology) could be reduced using Hadoop, while still being able to leverage the data from OBIEE / Looker / Power BI for reporting purposes. We have to say we are impressed with Google's offerings, as everything can be spun up in minutes with little or no configuration of hardware or software.

The technologies utilised during this exercise are
  • Linux on Google Storage
  • Hadoop / HDFS – Map Reduce / Yarn
  • Hive

High level Test Scenario


Process 20 million records into Hive on Hadoop HDFS and make the data available for queries.

We created the data in CSV format, moved the CSV file(s) to a file server (Shahed has fused a Google Bucket onto the local Hadoop file system – we will detail this in another blog shortly), created an external Hive table to access this data, and created a Hive-local ORC table for subsequent processing.

Overview of Hadoop Server – We were using one Resource Master with two worker nodes in a clustered environment within Google Dataproc Services.

We preferred Hive over Spark as the volumes are not very high and the client's IT department is quite familiar with RDBMS technologies.

Detailed Steps


The real-world scenario consists of a denormalised reporting table with 110 columns, but for this test we took a cut-down version. The CSV file layout contains the following columns:

COST_CENTRE              int,
SUB_COST_CENTRE          int,
VERSION_NUMBER           int,
TRANSACTION_NUMBER       int,
TRANSACTION_SUB_SECTION  int,
TRANSACTION_LINE         int,
GROSS_PROFIT             float,
PROCESS_DATE             int
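
For reference only (the real file came from the client's systems), a file with this layout could be mocked up with a short Python script; all values below are made up.

import csv
import random

# Generate a handful of made-up rows matching the layout above
with open('test.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    for i in range(10):
        writer.writerow([
            random.randint(1, 1500),            # COST_CENTRE
            random.randint(1, 50),              # SUB_COST_CENTRE
            1,                                  # VERSION_NUMBER
            i + 1,                              # TRANSACTION_NUMBER
            random.randint(1, 5),               # TRANSACTION_SUB_SECTION
            random.randint(1, 20),              # TRANSACTION_LINE
            round(random.uniform(0, 500), 2),   # GROSS_PROFIT
            20171024,                           # PROCESS_DATE as yyyymmdd
        ])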

 

Step 1: Transfer the source file

Move the test.csv file to the file system. The full path I used is /deliverbi/gcs_ftp/FTP_IN/test.csv

Step 2: Invoke the Hive shell by typing hive at the command line

You will see the hive prompt. Type in
show databases;
and press Enter – you should see all the databases available.

I am going to use the poc_hadoop database that was created for this evaluation. You can create a database with the following command:

create database poc_hadoop;

Now I want to make poc_hadoop the default working database. We can achieve this with:

use poc_hadoop;

Now create an external table referring to the directory where the test CSV file is available. Notice that we give the directory and not the file name: data from any files dropped into this directory (provided the CSV columns are identical) will be available in Hive as soon as you query this external table.

You should see OK after the create statement executes successfully.
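
The screenshot with the exact create statement is not reproduced here, but based on the column layout above it would have looked roughly like the sketch below, shown here driven through PyHive as in the scripting post; the table name comes from the describe command further down, while the STORED AS clause and the LOCATION path are assumptions that must match where your csv files actually live.

from pyhive import hive

# Connection details as used in the scripting post (adjust for your cluster)
conn = hive.Connection(host="43.32.0.85", auth='NOSASL', database='poc_hadoop')
cursor = conn.cursor()

# Hypothetical reconstruction of the external table DDL - the storage format
# and location are assumptions, not the exact statement from the original post
cursor.execute("""
CREATE EXTERNAL TABLE DELIVERBI_EXTERNAL_T1_F
(COST_CENTRE int, SUB_COST_CENTRE int, VERSION_NUMBER int,
 TRANSACTION_NUMBER int, TRANSACTION_SUB_SECTION int, TRANSACTION_LINE int,
 GROSS_PROFIT float, PROCESS_DATE int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/deliverbi/gcs_ftp/FTP_IN'
""")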

You can get more details about any table in hive with the following command

describe formatted DELIVERBI_EXTERNAL_T1_F;

Issue the following command to see how many records this external table contains – you should see just over 20 million records:

Select count(*) from DELIVERBI_EXTERNAL_T1_F;


So we can now access, from Hive, the CSV data that is stored on a Linux drive.

We will create a Hive internal table and populate it from the above external table

Issue the following command at Hive prompt

CREATE TABLE DELIVERBI_T1_F
(COST_CENTRE int, SUB_COST_CENTRE int, VERSION_NUMBER int,
TRANSACTION_NUMBER int, TRANSACTION_SUB_SECTION int, TRANSACTION_LINE int,
GROSS_PROFIT float, PROCESS_DATE int)
row format delimited fields terminated by ',' stored as ORC;

 
For simplicity's sake, I kept the columns of the external table and the new internal table exactly the same.

You will also have noticed that I used ORC in the stored as clause. This is a highly efficient columnar format internal to Hive. Other formats you can use when creating Hive tables include textfile, sequencefile and rcfile; we will discuss these in later blog posts.
 
Before we copy the data from the external table into this new internal table, let us first build the select statement:
 
select cost_centre,sub_cost_centre,version_number,transaction_number,transaction_sub_section,transaction_line,gross_profit,process_date from deliverbi_external_t1_f limit 5;

 
Run the above statement. Notice the limit 5, which retrieves only 5 rows.

Notice that there are no column headings in the output. Execute the following set statement to show the column headers:

set hive.cli.print.header=true;
 
Then run the query again and you will see the column headers this time.

Now that we know we are retrieving the required data from the external table, execute the following to insert the data. Make sure the limit clause is removed; otherwise we would end up with only 5 rows in the internal table.

insert into table DELIVERBI_T1_F select cost_centre,sub_cost_centre,version_number,transaction_number,transaction_sub_section,transaction_line,gross_profit,process_date from deliverbi_external_t1_f;

The records are inserted. You can check that the record count is about 20 million by issuing:

select count(*) from DELIVERBI_T1_F;

We will now create a Hive internal partitioned table and populate it from the non-partitioned internal table above:

CREATE TABLE DELIVERBI_PART_T1_F
(SUB_COST_CENTRE int, VERSION_NUMBER int, TRANSACTION_NUMBER int, TRANSACTION_SUB_SECTION int, TRANSACTION_LINE int, GROSS_PROFIT float,PROCESS_DATE int)
PARTITIONED BY (COST_CENTRE int)
row format delimited fields terminated by ',' stored as ORC;

 

For those of you who are familiar with Oracle partitioned tables: Oracle expects you to pick the partition column from the list of columns you have already defined, whereas Hive expects the partition column to be mentioned only once in the create statement, inside the partitioned by clause.

 

Like Oracle, you can partition a Hive table by multiple columns – just add the additional partition column within the partitioned by clause. As simple as that (a sketch with a second partition column follows the insert example below).

 

You should see OK after the table creation.

Run the following command to insert the data into the new partitioned table
 

insert into table DELIVERBI_PART_T1_F partition(cost_centre) select sub_cost_centre,version_number,transaction_number,transaction_sub_section,transaction_line,gross_profit,process_date,cost_centre from deliverbi_t1_f;

 

Notice that the cost_centre column is the last one in the select statement. When inserting into a Hive partitioned table, the partition column must be specified as the last column in the select. If two columns are used for partitioning, then those two columns must come last in the select, in the same order as they appear in the partitioned by clause – see the sketch below.
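
For illustration (this table is hypothetical and not part of the original test), partitioning by cost_centre and then sub_cost_centre would look roughly like this, with both partition columns last in the select:

from pyhive import hive

# Placeholder connection details - adjust for your cluster
conn = hive.Connection(host="43.32.0.85", auth='NOSASL', database='poc_hadoop')
cursor = conn.cursor()

# Hypothetical table partitioned by two columns
cursor.execute("""
CREATE TABLE DELIVERBI_PART2_T1_F
(VERSION_NUMBER int, TRANSACTION_NUMBER int, TRANSACTION_SUB_SECTION int,
 TRANSACTION_LINE int, GROSS_PROFIT float, PROCESS_DATE int)
PARTITIONED BY (COST_CENTRE int, SUB_COST_CENTRE int)
STORED AS ORC
""")

# Dynamic partition settings, as discussed below
cursor.execute("set hive.exec.dynamic.partition.mode=nonstrict")
cursor.execute("set hive.exec.max.dynamic.partitions.pernode=5000")
cursor.execute("set hive.exec.max.dynamic.partitions=5000")

# Both partition columns come last in the select, in partitioned-by order
cursor.execute("""
INSERT INTO TABLE DELIVERBI_PART2_T1_F PARTITION (cost_centre, sub_cost_centre)
SELECT version_number, transaction_number, transaction_sub_section,
       transaction_line, gross_profit, process_date,
       cost_centre, sub_cost_centre
FROM deliverbi_t1_f
""")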

 

Back to our insert statement above. Oh no – we got the following error: (FAILED: SemanticException [Error 10096]: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict)

For now, you can fix this by issuing the following set statement and then running the insert statement again:

set hive.exec.dynamic.partition.mode=nonstrict;
 
You would expect to see OK, but instead another error appears (Caused by: org.apache.hadoop.hive.ql.metadata.HiveFatalException: [Error 20004]: Fatal error occurred when node tried to create too many dynamic partitions.)

We partitioned this table by cost centre, and my test data set has more than 1,500 distinct cost centres, so a default limit on the number of dynamic partitions has been hit. Run the following two set statements to raise the limits before running the insert statement again:
 
set hive.exec.max.dynamic.partitions.pernode=5000;
set hive.exec.max.dynamic.partitions=5000;

One last step before we start retrieving the data: analyse the partitioned table for quicker retrieval of results.

ANALYZE TABLE DELIVERBI_PART_T1_F partition (cost_centre) COMPUTE STATISTICS;

 

Now we are ready to query the data

 
Issue an SQL statement at the Hive prompt against the non-partitioned table:

select count(*) from deliverbi_t1_f where cost_centre = 'y134';

Now issue the same statement at the Hive prompt against the partitioned table:

 
select count(*) from deliverbi_part_t1_f where cost_centre = 'y134';

We can see that retrieval from the partitioned table is quicker. Can we do anything to improve this further? Well, let's set something up. Run the following command at the Hive prompt and then run the same query again:

 

set hive.compute.query.using.stats=true;

That is a million times faster (metaphorically speaking). You may think the results were simply cached in memory, which is why the query response was lightning fast, so I queried a few more cost centres to check that the query had indeed sped up – and it had.

It took under 5 minutes to load 20 million records into the Hive internal table, and that includes analysing the table data. I remember that on one project it took over 20 minutes to load approximately 20 million records into an Oracle database using SQL*Loader (with direct path insert for performance), so I am very pleased with what we have seen on Hadoop with Hive.

 

Watch this space for more blogs on Big Data, Google BigQuery (Dremel), Hadoop, Hive and more. DeliverBI is moving towards Big Data and Oracle Fusion solutions as well as traditional RDBMS technologies.

 

 

 
