Monday, 9 April 2018

Google Big Query Clone / Copy DataSets / Tables - PROD --> TEST --> DEV Projects

Google Big Query Cloning of Datasets & Tables across GCS Projects.

We searched the internet and could not find a simple script for cloning / copying tables and datasets from PROD --> TEST --> DEV in Google Big Query. We needed a utility that could copy complete datasets across projects within Google Big Query. The Big Query UI offers the option to copy one table at a time, but that would take us forever. Our client follows an SDLC and works through three environments, and we needed to clone data from Production to our Test environment for shakedown testing of Airflow deliverables (DAGs etc.). There are probably a lot of customers out there who started loading their data into an initial environment and now need to copy the datasets to other projects. Whatever the reason may be, this script can be run to leverage some of that work, and it can easily be amended and scheduled to run on evenings or weekends.

Technologies Used
  • Linux
  • Python3
  • Pip library: google-cloud-bigquery (from google.cloud import bigquery)

Please make sure: pip install --upgrade google-cloud-bigquery google-cloud-storage

Notes on API.

Make sure the GOOGLE_APPLICATION_CREDENTIALS is set for the service key you have downloaded from your Production project.

export GOOGLE_APPLICATION_CREDENTIALS=/u01/yourservicekey.json

If you don’t have a service key, one can be generated from the Google Cloud console web GUI. A service key is an alternative to a user id.
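The client libraries pick this variable up automatically; a quick sanity-check sketch (the function name is ours, not part of any Google API):

```python
import os

def credentials_configured():
    """True if GOOGLE_APPLICATION_CREDENTIALS points at an existing key file."""
    path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    return bool(path) and os.path.exists(path)
```

Running this check before the main script starts gives a clearer error than a failed API call later on.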


Make sure your service key has all the permissions required to run tasks on Big Query, i.e. querying tables, creating tables and deleting tables in the DEV & TEST projects. You can add the service account's email address to the DEV & TEST projects under IAM permissions to allow the script to copy datasets. Here you can see we have given the service account access to DEV with the role BigQuery Admin.


Check that the bq command is working on your GCS Linux instance

Try: bq ls -p to list the projects visible from the GCS Linux VM instance

The .py script has a lock so that it won't clone anything back to the Production environment, and we suggest you keep it in place.

As best practice, PRODUCTION --> TEST --> DEV is the best approach for cloning. One-off tables and structures are also catered for, and then DEV --> TEST --> PRODUCTION makes sense, but we recommend CI/CD pipelines or alternatives for this.

Run the script using the following command
python3 <your_script>.py source_project source_dataset target_project target_dataset
1 - Source Project
2 - Source Data Set
3 - Target Project
4 - Target Data Set
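Such a script is straightforward to build with the google-cloud-bigquery client library; a minimal sketch (the guard list, function names and project ids below are ours, not taken from the original script):

```python
import sys

# Hypothetical guard list - replace with your real Production project id(s).
PROTECTED_PROJECTS = {"my-prod-project"}

def check_target(target_project):
    """Refuse to clone anything back into a protected (Production) project."""
    if target_project in PROTECTED_PROJECTS:
        raise SystemExit("Refusing to copy into protected project: " + target_project)

def clone_dataset(src_project, src_dataset, dst_project, dst_dataset):
    """Copy every table in the source dataset to the target dataset."""
    # Imported here so the guard above stays testable without GCP dependencies.
    from google.cloud import bigquery
    client = bigquery.Client(project=src_project)
    for table in client.list_tables(f"{src_project}.{src_dataset}"):
        src = f"{src_project}.{src_dataset}.{table.table_id}"
        dst = f"{dst_project}.{dst_dataset}.{table.table_id}"
        client.copy_table(src, dst).result()  # wait for each copy job

if __name__ == "__main__":
    src_project, src_dataset, dst_project, dst_dataset = sys.argv[1:5]
    check_target(dst_project)
    clone_dataset(src_project, src_dataset, dst_project, dst_dataset)
```

Note that Big Query copy jobs require the target dataset to already exist and to be in the same location as the source.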

Tuesday, 31 October 2017

Scripting Hive Commands with Python




In the previous posts, we touched upon basic data processing using Hive. But it is all interactive. We will discuss how to script these Hive commands using Python.

We will start with a very basic python script and add more functionality to it by the time we reach the end of this post.

We will ensure the environment is setup correctly before getting into the scripting.

If running Python 2 then install the following packages

pip install pyhs2
pip install thrift_sasl==0.2.1
pip install sasl==0.2.1

If running Python 3 and above you might face SASL errors; in this case turn SASL off in Hive and follow the method below :)

On errors such as a TSocket error with Python 3+, "Could not start SASL", or a TProtocol error related to SASL, install:

pip3 install sasl 
pip3 install thrift 
pip3 install thrift-sasl 
pip3 install PyHive

Turn SASL off in Hive and use the pyhive Python libraries.
Edit the hive-site.xml file (typical path: /etc/hive/conf.dist/hive-site.xml).
Add the following entry in the file and save.
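The entry in question is presumably the HiveServer2 authentication property; a sketch of the hive-site.xml fragment, assuming NOSASL to match the connection code used below:

```xml
<property>
  <name>hive.server2.authentication</name>
  <value>NOSASL</value>
</property>
```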


Restart the HiveServer2 so that the new setting can take effect.

To start HiveServer2:
$ sudo service hive-server2 start

To stop HiveServer2:
$ sudo service hive-server2 stop

We are good to start the scripting part.
For this post, we have created a Hive table table_test with two columns and loaded it with a handful of records.
The first script we will create will connect to Hive, execute a Hive command (a select statement in this case) and show the output on screen.
We are using Python version 3.4.2 and will be utilising pyhive.

Hive Output to Screen
I have saved the following code as file on Linux

from pyhive import hive
conn = hive.Connection(host="", auth='NOSASL',database='poc_hadoop')
cursor = conn.cursor()
cursor.execute("SELECT * FROM table_test")
for result in cursor.fetchall():
    print(result)

The first line of the code imports the python library that we are going to be using for this post and going forward.
We define a connection handler to the Hive Database poc_hadoop and initiate an instance of this connection (lines 2 & 3)

Using this connection handle we then execute a simple Hive Command (Select Query in this case)
We loop and print row by row (We could do this as the data set is small) – Lines 5 & 6. Note the Print syntax could be different depending on the version of Python you are using
I run the script with python3 followed by the file name
And you should see the rows displayed on the screen

That is our very first Python script. Of course it is very simple and shows the results on the screen, which in real-life situations is of limited use. We typically want to store the output of the query to a file, so we will add functionality to our code to achieve this.


Hive Output to Text File
Save the following lines of code as file on Linux

from pyhive import hive
conn = hive.Connection(host="", auth='NOSASL',database='poc_hadoop')
cursor = conn.cursor()
cursor.execute("SELECT * FROM table_test")
with open('test.txt', 'w') as f:
  for results in cursor.fetchall():
    f.write("%s\n" % str(results))

Notice there is no change to the import of the library or the creation of the connection handler (lines 1 – 4).

This time we will open a file (in the current directory, but if you specify the full path, you can create the file in any location you wish) and loop through the results and save line by line – Lines 5-7
When I execute this code as python3 I see a file created in my current directory with the data
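As a small variation (ours, not from the original post), the rows can be written as proper comma-separated values with Python's built-in csv module instead of calling str() on each tuple; sketched here with a plain list standing in for cursor.fetchall():

```python
import csv

def write_rows(rows, path):
    """Write an iterable of row tuples to a CSV file, one row per line."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerows(rows)

# Example with literal rows standing in for cursor.fetchall()
write_rows([(1, "shop_a"), (2, "shop_b")], "test.txt")
```

This quotes fields correctly if the data itself contains commas, which the plain str() approach does not.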

Hive Output to csv File using pandas
Now we will try to enhance our script to be more versatile using panda data frames. You need to install pandas for this, which Shahed has done using

pip3 install pandas

Save the following lines of code as file on Linux

from pyhive import hive
import pandas as pd 
conn = hive.Connection(host="", auth='NOSASL',database='poc_hadoop')
df = pd.read_sql("SELECT * FROM table_test", conn)
df.to_csv('test.csv', index=False, header=False)

When you run this script the output will be stored as csv in the current directory
Pandas is one of the most widely used Python libraries in data science. It is mainly used for data wrangling as it is very powerful and flexible, among many other things.

So now we know how to script Hive commands. You can export the results to file system, Google Bucket (we will cover this more in depth in future posts) or any other data source if you have the right libraries.

Bye for now from Krishna and Shahed


Sunday, 29 October 2017

Hive Big Data Commands Reference

DELIVERBI Big Data Hive Command Reference


Today we are sharing some of the frequently used Hive commands and settings that will come in handy

To see the status of HiveServer2 and also view the logs etc. you can use the HiveServer2 web UI URL


As you can see here, this URL allows you to view the configuration among other things.

To List Databases use show databases;

To create a new database in the default location use create database <db_name>;

To create a new database in a specified location – create database <db_name> location '/storage/<db_name>';

To drop a database use drop database <db_name>;

To start using a database –  use <db_name>;

show tables; for listing the tables available in the current database

For gathering table statistics analyze table deliverbi_part_t1_f partition (cost_centre) COMPUTE STATISTICS;

To locate the storage directory – set hive.metastore.warehouse.dir;

To locate the storage directory along with other information for a specific table – describe formatted deliverbi_part_t1_f;

To show the column headers – set hive.cli.print.header=true;

To stop showing column headers, set the above property with value false

You can see the explain plan for a specific query using the following

explain select count(*) from deliverbi_t1_f;

You can adjust the number of reducers using SET mapreduce.job.reduces=5;

We will talk about sizing approach to work out optimum number of mappers & reducers in future posts.

To enable dynamic inserts into a partitioned table set hive.exec.dynamic.partition.mode=nonstrict;

For increasing the number of dynamic partitions beyond the default limits use the following set statements

set hive.exec.max.dynamic.partitions.pernode=5000;

set hive.exec.max.dynamic.partitions=5000;

You can enable the Cost Based Optimiser using set hive.cbo.enable=true;

Below are some of the performance-influencing settings. You need to experiment a bit to work out the best combination of these settings for your specific setup

set hive.compute.query.using.stats=true; 

set hive.stats.fetch.column.stats=true;

set hive.stats.fetch.partition.stats=true;

set hive.vectorized.execution.enabled=true;

set hive.vectorized.execution.reduce.enabled = true;

set hive.vectorized.execution.reduce.groupby.enabled = true;

set hive.exec.parallel=true;

set hive.exec.parallel.thread.number=16;
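When driving Hive from Python (as in our scripting post), these per-session properties can be applied by executing SET statements on the cursor before the query; a small helper sketch (the function name is ours):

```python
def session_set_statements(settings):
    """Render a dict of Hive properties as SET statements for cursor.execute()."""
    return ["set {}={}".format(key, value) for key, value in settings.items()]

# Usage with a pyhive cursor (connection as in the earlier post):
# for statement in session_set_statements({"hive.exec.parallel": "true"}):
#     cursor.execute(statement)
```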


Having covered the basics of Hive now, in future posts we will touch upon a bit of scripting using Python to execute Hive commands.

Bye for now – Krishna and Shahed

Tuesday, 24 October 2017

Big Data Processing with Hadoop and Hive on top of the Google DataProc Service

Big Data Processing with Hadoop and Hive on top of the Google DataProc Service Offering



Business Scenario

One of our clients wanted us to evaluate Hadoop for data processing for about 5000 shops. The objective of this exercise was to establish whether the current processing time (using legacy RDBMS technology) can be reduced using Hadoop. The data can then be leveraged by OBIEE / Looker / PowerBI for reporting purposes. We must say we are impressed with Google's offerings, as everything can be spun up in minutes with little or no configuration of hardware or software.

The technologies utilised during this exercise are
  • Linux on Google Storage
  • Hadoop / HDFS – Map Reduce / Yarn
  • Hive

High level Test Scenario

Process 20 million records of data into Hive Hadoop HDFS and make the data available for queries

We created the data in csv format, moved the csv file(s) to a file server (Sha has fused a Google Bucket to the local Hadoop file system; we will detail this in another blog shortly), created an external Hive table to access this data, and created a Hive local ORC table for subsequent processing

Overview of Hadoop Server – We were using one Resource Master with two worker nodes in a clustered environment within Google Dataproc Services.

We preferred Hive over Spark as the volumes are not very high and the client’s IT department is quite familiar with RDBMS technologies.



Detailed Steps

The real world scenario consists of a denormalised reporting table with 110 columns, but for this test scenario we took a cut-down version. The csv file layout contains the following columns

COST_CENTRE         int,
SUB_COST_CENTRE     int,
VERSION_NUMBER      int,
TRANSACTION_NUMBER  int,
GROSS_PROFIT        float,
PROCESS_DATE        int
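For anyone wanting to reproduce the test, sample data matching this layout can be generated with a short Python sketch (ours; the original data came from the client's systems, and the value ranges below are made up):

```python
import csv
import random

def generate_sample(path, rows):
    """Write random rows matching the test.csv layout above."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        for i in range(rows):
            writer.writerow([
                random.randint(1, 1500),              # COST_CENTRE
                random.randint(1, 50),                # SUB_COST_CENTRE
                1,                                    # VERSION_NUMBER
                i,                                    # TRANSACTION_NUMBER
                round(random.uniform(-100, 100), 2),  # GROSS_PROFIT
                20171024,                             # PROCESS_DATE
            ])

generate_sample("test.csv", 1000)
```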


Step 1: Transfer the source file

Move the test.csv file to the file system. The directory I used is /deliverbi/gcs_ftp/FTP_IN/test.csv

Step 2: Invoke hive shell by typing in hive at the command line

You will see the hive prompt. Type in
Show databases;
and press enter and you should see all the databases available

I am going to use poc_hadoop database that is created for this evaluation purpose. You can create a database with the following command

create database poc_hadoop;

Now, I want to make the poc_hadoop database as the default working database. We can achieve this by

Use poc_hadoop;

Now create an external table referring to the location where the test csv file is available. Notice we give the file directory and not the file name. Data from any files dropped into this directory (provided the csv file columns are identical) will be available in Hive directly when you query this external table
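A reconstruction of the external table DDL, based on the file layout and directory above (the transaction_* columns and their types are inferred from the insert statements used later; treat this as indicative):

```sql
create external table DELIVERBI_EXTERNAL_T1_F (
  cost_centre             int,
  sub_cost_centre         int,
  version_number          int,
  transaction_number      int,
  transaction_sub_section int,
  transaction_line        int,
  gross_profit            float,
  process_date            int
)
row format delimited fields terminated by ','
location '/deliverbi/gcs_ftp/FTP_IN/';
```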

You should see the OK after the command is executed successfully

You can get more details about any table in hive with the following command

describe formatted DELIVERBI_EXTERNAL_T1_F;

Issue the following command to see how many records this external table contains and you can see just over 20 million records

Select count(*) from DELIVERBI_EXTERNAL_T1_F;

So from Hive we can now access the csv file data stored on a Linux drive

We will create a Hive internal table and populate it from the above external table

Issue the following command at the Hive prompt (the column list here is indicative, matching the insert statement used later)

create table DELIVERBI_T1_F (
cost_centre int,
sub_cost_centre int,
version_number int,
transaction_number int,
transaction_sub_section int,
transaction_line int,
gross_profit float,
process_date int
)
row format delimited fields terminated by ',' stored as ORC;

For simplicity sake, I kept the external table and the new internal table exactly the same.


You would have also noticed that I used ‘ORC’ in the stored as clause. This is a Hive internal format which is highly efficient. The various types of tables that you can create in Hive are textfile, sequencefile, rcfile etc. We will discuss about these in later blog posts
Before we copy the data from external table to this new internal table, let us first build the sql statement
select cost_centre,sub_cost_centre,version_number,transaction_number,transaction_sub_section,transaction_line,gross_profit,process_date from deliverbi_external_t1_f limit 5;

Run the above statement. Notice the limit 5 which is to retrieve only 5 rows.


You can notice that there are no headings in the output. Execute the following set statement to show the column headers

set hive.cli.print.header=true;
Then run the query again and you can see the column headers this time



Now that we know we are retrieving the required data from external table, execute the following to insert the data. Ensure the limit is removed. If not, we end up having only 5 rows in the internal table

insert into table DELIVERBI_T1_F select cost_centre,sub_cost_centre,version_number,transaction_number,transaction_sub_section,transaction_line,gross_profit,process_date from deliverbi_external_t1_f;


The records are inserted. You can check the record count is about 20 million by issuing

select count(*) from DELIVERBI_T1_F;


We will now create a Hive internal partitioned table and populate it from the above non-partitioned internal table. Issue the following at the Hive prompt (again the column list is indicative; cost_centre becomes the partition column)

create table DELIVERBI_PART_T1_F (
sub_cost_centre int,
version_number int,
transaction_number int,
transaction_sub_section int,
transaction_line int,
gross_profit float,
process_date int
)
partitioned by (cost_centre string)
row format delimited fields terminated by ',' stored as ORC;


For those of you who are familiar with Oracle partitioned tables, you would notice that Oracle expects you to specify the partitioned column from the list of available columns, whereas Hive expects the partitioned column to be mentioned only once, in the partitioned by clause of the create statement.


Like Oracle you can partition Hive table by multiple columns. Just add the sub partition column within the partitioned by clause. As simple as that


You should see the OK after the table creation



Run the following command to insert the data into the new partitioned table

insert into table DELIVERBI_PART_T1_F partition(cost_centre) select sub_cost_centre,version_number,transaction_number,transaction_sub_section,transaction_line,gross_profit,process_date,cost_centre from deliverbi_t1_f;


Notice that the cost_centre column is the last one in the select statement. When inserting into a Hive partitioned table, the partitioned column should be specified as the last column in the select. If you have two columns used for partitioning then these two columns should come last (in the same order) in the select


Oh no. Got the following error (FAILED: SemanticException [Error 10096]: Dynamic partition strict mode requires at least one static partition column.
To turn this off set hive.exec.dynamic.partition.mode=nonstrict)


For now, you can fix this by issuing the following set statement and then run the insert statement again
set hive.exec.dynamic.partition.mode=nonstrict;
You expect to see OK but another error (Caused by: org.apache.hadoop.hive.ql.metadata.HiveFatalException: [Error 20004]: Fatal error occurred when node tried to create too many dynamic partitions.)




We partitioned this table by Cost Centre. In my test data set there are more than 1500 distinct cost centres, and a default limit has been hit, resulting in the error. Run the two set statements below to raise the limits before running the insert statement again
set hive.exec.max.dynamic.partitions.pernode=5000;
set hive.exec.max.dynamic.partitions=5000;




One last step before we start retrieving the data. Analyse the partitioned table for quicker retrieval of results

analyze table deliverbi_part_t1_f partition (cost_centre) COMPUTE STATISTICS;



Now we are ready to query the data

Issue an SQL statement at the Hive prompt for a non-partitioned table

select count(*) from deliverbi_t1_f where cost_centre = 'y134';



Issue an SQL statement at the Hive prompt for a partitioned table

select count(*) from deliverbi_part_t1_f where cost_centre = 'y134';


We can see that the retrieval from the partitioned table is quicker. Can we do anything to improve this further? Well, let’s set something up. Run the following command at the Hive prompt and then run the same query again


set hive.compute.query.using.stats=true;


That is a million times faster (metaphorically). You may think the results are in memory, so the query response was lightning fast. Let me query a few more cost centres to see if the query has indeed sped up. Here is the result




It took under 5 minutes to load 20 million records into the Hive internal table, and that includes analysing the table data. I remember on one project it took over 20 minutes to load approx. 20 million records into an Oracle database using SQL*Loader (utilising direct path insert for performance). So we are very pleased with what we have seen on Hadoop with Hive.


Watch this space for more blogs on Big Data, Google Big Query (Dremel), Hadoop, Hive etc. DeliverBI is moving towards Big Data and Oracle Fusion solutions as well as traditional RDBMS technologies.



