How to Hive on GCP Using Google DataProc and Cloud Storage: Part 1

Google Cloud Dataproc is a managed Spark and Hadoop service that lets you take advantage of open-source data tools for batch processing, querying, streaming, and machine learning. This includes the Hadoop ecosystem: HDFS, the MapReduce processing framework, and applications built on top of Hadoop such as Hive, Mahout, Pig, Spark, and Hue. Hive provides an SQL-like interface for querying data stored in the various databases and file systems that integrate with Hadoop. Queries submitted to Hive are compiled into MapReduce jobs (or Tez or Spark jobs, depending on the configured execution engine) that read the stored data; the results are then aggregated and returned to the user or application.

For this exercise, we will use New York City's yellow and green taxi trip data for the year 2019. Yellow taxis are the only vehicles licensed to pick up street-hailing passengers anywhere in NYC, while green taxis provide street-hail and prearranged service in northern Manhattan (above E 96th St and W 110th St) and in the outer boroughs. The dataset is available on the city portal.

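The major steps are as follows:

•    Step 1: Upload the TLC raw data (Green and Yellow Taxi data for 2019) into Cloud Storage

•    Step 2: Create a Hadoop cluster environment using Google Dataproc

•    Step 3: Create Hive external tables for EDA (staging environment)

•    Step 4: Data ingestion and exploratory data analysis (EDA)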

Yellow and Green Taxi data.

Step 1: Upload the TLC Raw Data (Green and Yellow Taxi Data for Y2019) Into Cloud Storage  

First, create a Cloud Storage bucket in GCP and, inside it, create folders to hold the 2019 Green Taxi and Yellow Taxi datasets, as shown below:

Step 1 screenshot.

Note the Cloud Storage locations of the data files. They will be used to create the external tables in the following steps:

  1. Yellow Taxi: gs://2019ah04024/TLC/data/YellowTaxi/
  2. Green Taxi: gs://2019ah04024/TLC/data/GreenTaxi/

Next, download the 2019 datasets from the city portal and upload them into the bucket using Upload files. All the 2019 Yellow Taxi data goes into the “YellowTaxi/” folder and all the Green Taxi data into the “GreenTaxi/” folder.
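If you would rather script the uploads than click through the console, a small helper can keep every file in the right folder. The sketch below is a hypothetical helper, not part of the original walkthrough. It assumes the downloaded files are named like yellow_tripdata_2019-01.csv and green_tripdata_2019-01.csv, and maps each one to its destination under the bucket noted above. The commented-out lines show how the google-cloud-storage client library could perform the actual upload once credentials are configured.

```python
from pathlib import PurePosixPath

# Bucket and folder prefixes noted in Step 1.
BUCKET = "2019ah04024"
PREFIXES = {
    "yellow": "TLC/data/YellowTaxi/",
    "green": "TLC/data/GreenTaxi/",
}


def destination_blob(local_path: str) -> str:
    """Return the object name inside the bucket for a downloaded TLC file.

    Assumption (hypothetical naming): file names start with "yellow_" or
    "green_", e.g. "yellow_tripdata_2019-01.csv".
    """
    name = PurePosixPath(local_path).name
    taxi_type = name.split("_", 1)[0].lower()
    if taxi_type not in PREFIXES:
        raise ValueError(f"cannot tell taxi type from file name: {name}")
    return PREFIXES[taxi_type] + name


def destination_uri(local_path: str) -> str:
    """Full gs:// URI, in the same form as the LOCATION paths above."""
    return f"gs://{BUCKET}/{destination_blob(local_path)}"


# Optional upload (requires google-cloud-storage and GCP credentials):
# from google.cloud import storage
# bucket = storage.Client().bucket(BUCKET)
# bucket.blob(destination_blob(path)).upload_from_filename(path)

print(destination_uri("downloads/yellow_tripdata_2019-01.csv"))
```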

Step 2: Create Hadoop Cluster Environment Using Google DataProc

Create a Hadoop cluster in GCP using Dataproc; we will then access its master node through the command line. You can create clusters with multiple master and worker nodes, but for this exercise I created a single-node cluster in which one node acts as both the master and the worker.
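The same single-node setup can also be created programmatically. The sketch below assumes the google-cloud-dataproc Python client and builds a cluster definition with one master VM and no workers; the dataproc:dataproc.allow.zero.workers property is what permits a zero-worker cluster (the gcloud CLI exposes the same setup through the --single-node flag of gcloud dataproc clusters create). The machine type, region, and project ID are placeholders. The cluster name cluster-tlc is inferred from the master hostname cluster-tlc-m used below, since Dataproc names the master node <cluster-name>-m.

```python
def single_node_cluster(project_id: str, cluster_name: str = "cluster-tlc",
                        machine_type: str = "n1-standard-4") -> dict:
    """Build a Dataproc cluster definition with one master VM and no workers."""
    return {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            # The single VM hosts both the master and the worker daemons.
            "master_config": {"num_instances": 1, "machine_type_uri": machine_type},
            # No worker_config at all: zero workers, allowed by the property below.
            "software_config": {
                "properties": {"dataproc:dataproc.allow.zero.workers": "true"},
            },
        },
    }


# Hypothetical usage (requires google-cloud-dataproc and GCP credentials):
# from google.cloud import dataproc_v1
# region = "us-central1"  # placeholder
# client = dataproc_v1.ClusterControllerClient(
#     client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"})
# operation = client.create_cluster(request={
#     "project_id": "my-project", "region": region,
#     "cluster": single_node_cluster("my-project")})
# operation.result()  # blocks until the cluster is running

print(single_node_cluster("my-project")["cluster_name"])
```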

Step 2 screenshot.

Once the cluster is created and ready, go to the "VM INSTANCES" tab to view the master and worker VM details.

Cluster details screenshot.

Select the master node. Click on the down arrow next to the SSH icon and select Open in a browser window from the drop-down menu. A new browser window will open and an icon will appear in the center alerting you to the SSH connection being set up. Once the SSH connection is established, the shell prompt will appear. 

Start the Beeline shell, which connects to Hive through its JDBC interface. HiveServer2 runs on localhost at port 10000.
For the user name, use your Google Cloud user name at the master node hostname (cluster-tlc-m in this example). The command line is the following:

 
beeline -u jdbc:hive2://localhost:10000/default -n myusername@cluster-tlc-m -d org.apache.hive.jdbc.HiveDriver
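When the connection is scripted, building the command as an argument list avoids shell-quoting mistakes. This hypothetical helper reproduces the Beeline command above using only the options already shown: -u (JDBC URL), -n (user name), and -d (driver class). myusername remains a placeholder for your own user name.

```python
import shlex


def beeline_args(user: str, host: str = "cluster-tlc-m",
                 hiveserver: str = "localhost", port: int = 10000) -> list[str]:
    """Assemble the Beeline command from Step 2 as an argument list."""
    return [
        "beeline",
        "-u", f"jdbc:hive2://{hiveserver}:{port}/default",  # HiveServer2 JDBC URL
        "-n", f"{user}@{host}",                             # user name sent to HiveServer2
        "-d", "org.apache.hive.jdbc.HiveDriver",            # JDBC driver class
    ]


args = beeline_args("myusername")
print(shlex.join(args))
# On the master node, subprocess.run(args) would start the interactive shell.
```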


Step 3: Create Hive External Tables for EDA (Staging Environment)

An external table is a table for which Hive does not manage storage. If you drop an external table, only its definition in Hive is deleted. In our case, even if the Hive tables are dropped, the data remains in Cloud Storage.

The data dictionaries for the Yellow and Green Taxis list the fields, their descriptions, and the values each field may hold. From the data dictionary files, we can see that there are 20 columns in the Green Taxi dataset and 18 columns in the Yellow Taxi dataset. So, we will create the two external tables below (the staging environment) and point them to the Yellow and Green Taxi datasets by specifying the respective LOCATION in Cloud Storage.

  1. stg_ny_yellow_taxi
  2. stg_ny_green_taxi
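Before pointing Hive at the files, it can be worth confirming that every row really has the column count the data dictionary promises: with ROW FORMAT DELIMITED, a short or long row is not rejected; Hive fills missing trailing columns with NULL and ignores extra ones. The sketch below is a hypothetical local pre-check using Python's csv module on a sample of a file, and it assumes each file starts with a header row. If the files do carry a header row, note that Hive reads it as an ordinary data row unless the table sets the skip.header.line.count table property.

```python
import csv
import io

# Column counts taken from the data dictionaries.
EXPECTED_COLUMNS = {"yellow": 18, "green": 20}


def column_count_mismatches(csv_text: str, taxi_type: str,
                            has_header: bool = True) -> list[int]:
    """Return 1-based record numbers whose field count differs from the schema."""
    expected = EXPECTED_COLUMNS[taxi_type]
    bad = []
    for record_no, row in enumerate(csv.reader(io.StringIO(csv_text)), start=1):
        if has_header and record_no == 1:
            continue  # assumed header row: skip it
        if row and len(row) != expected:  # empty lines yield [] and are ignored
            bad.append(record_no)
    return bad


header = ",".join(f"col{i}" for i in range(18))
good_row = ",".join(["1"] * 18)
short_row = ",".join(["1"] * 17)
sample = "\n".join([header, good_row, short_row, "", good_row]) + "\n"
print(column_count_mismatches(sample, "yellow"))
```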
 
/*Create External table for yellow taxi*/
CREATE EXTERNAL TABLE stg_ny_yellow_taxi (
              vendor_id int, 
              tpep_pickup_datetime string,
              tpep_dropoff_datetime string,
              passenger_count int,
              trip_distance double,
              rate_code_id smallint,
              store_and_fwd_flag string,
              pu_location_id int,
              do_location_id int,
              payment_type smallint,
              fare_amount double,
              extra_charge double,
              mta_tax double,
              tip_amount double,
              tolls_amount double,
              improvement_surcharge double,
              total_amount double,
              congestion_surcharge double
       )
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 'gs://2019ah04024/TLC/data/YellowTaxi/';

/*Create External table for green taxi*/
CREATE EXTERNAL TABLE stg_ny_green_taxi (
              vendor_id int, 
              lpep_pickup_datetime string,
              lpep_dropoff_datetime string,
              store_and_fwd_flag string,
              rate_code_id smallint,
              pu_location_id int,
              do_location_id int,
              passenger_count int,
              trip_distance double,
              fare_amount double,
              extra_charge double,
              mta_tax double,
              tip_amount double,
              tolls_amount double,
              ehail_fee double,
              improvement_surcharge double,
              total_amount double,
              payment_type smallint,
              trip_type smallint,
              congestion_surcharge double
       )
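ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 'gs://2019ah04024/TLC/data/GreenTaxi/';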


Step 4: Data Ingestion and Exploratory Data Analysis (EDA)

We will perform exploratory data analysis to get a general understanding of the Yellow and Green Taxi datasets, extract basic statistics, check data validity, and highlight any nonconforming values.

EDA1: According to the data dictionary, there are two vendors (1 = Creative Mobile Technologies, LLC; 2 = VeriFone Inc.). We will check whether any records have a null vendor_id or a vendor_id other than 1 or 2.
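The CASE/SUM pattern in the queries below puts each record into exactly one bucket, so the vendor 1, vendor 2, other, and NULL counts should add up to the record count. A comparison with NULL is never true, so a null vendor_id lands only in the NULL bucket and not in the "others" bucket. To see this on a handful of rows, the sketch below runs the same pattern against an in-memory SQLite table; SQLite stands in for Hive purely for illustration, and the sample rows are made up.

```python
import sqlite3

# Hypothetical sample: two valid vendors, one unknown vendor, one NULL.
rows = [(1,), (2,), (2,), (4,), (None,)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg_ny_green_taxi (vendor_id INTEGER)")
conn.executemany("INSERT INTO stg_ny_green_taxi VALUES (?)", rows)

# Same CASE/SUM bucketing as the HiveQL queries for EDA1.
query = """
SELECT sum(CASE WHEN vendor_id = 1 THEN 1 ELSE 0 END) AS creative_mobile_tech,
       sum(CASE WHEN vendor_id = 2 THEN 1 ELSE 0 END) AS verifone_inc,
       sum(CASE WHEN vendor_id < 1 OR vendor_id > 2 THEN 1 ELSE 0 END) AS others,
       sum(CASE WHEN vendor_id IS NULL THEN 1 ELSE 0 END) AS null_values,
       count(*) AS total_records
FROM stg_ny_green_taxi
"""
result = conn.execute(query).fetchone()
print(result)  # (1, 2, 1, 1, 5)
```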

 
/*Green Taxi*/
SELECT sum(CASE WHEN vendor_id = 1 THEN 1 ELSE 0 END) AS Creative_Mobile_Tech,
       sum(CASE WHEN vendor_id = 2 THEN 1 ELSE 0 END) AS Verifone_Inc,
       sum(CASE WHEN vendor_id < 1 OR vendor_id > 2 THEN 1 ELSE 0 END) AS Others,
       sum(CASE WHEN vendor_id IS NULL THEN 1 ELSE 0 END) AS NULL_Values,
       count(*) AS Total_Records
FROM stg_ny_green_taxi;


The output of the query for the Green Taxi is as below:

Creative Mobile Tech screenshot.

 
/*Yellow Taxi*/
SELECT sum(CASE WHEN vendor_id = 1 THEN 1 ELSE 0 END) AS Creative_Mobile_Tech,
       sum(CASE WHEN vendor_id = 2 THEN 1 ELSE 0 END) AS Verifone_Inc,
       sum(CASE WHEN vendor_id < 1 OR vendor_id > 2 THEN 1 ELSE 0 END) AS Others,
       sum(CASE WHEN vendor_id IS NULL THEN 1 ELSE 0 END) AS NULL_Values,
       count(*) AS Total_Records
FROM stg_ny_yellow_taxi;


The output of the query for the Yellow Taxi is as below:

Creative Mobile Tech screenshot 2.

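EDA2: Next, we explore basic trip statistics: record counts, pickup and drop-off date ranges, passenger counts, trip distances, and the number of distinct rate codes, store-and-forward flags, pickup and drop-off zones, and payment types.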
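The queries below rely on standard aggregate behaviour: min, max, and avg skip NULL values, and Hive's to_date keeps only the date part of a timestamp string. A hypothetical Python equivalent for spot-checking a few parsed rows might look like this; it assumes the timestamps are formatted as YYYY-MM-DD HH:MM:SS. A date range that reaches outside 2019 would point to mis-dated records.

```python
from datetime import datetime
from statistics import mean

TS_FORMAT = "%Y-%m-%d %H:%M:%S"  # assumed timestamp format


def profile(values):
    """Return (min, max, avg) of a column, skipping None the way SQL skips NULL."""
    present = [v for v in values if v is not None]
    if not present:
        return None, None, None  # all-NULL column: SQL returns NULL for each
    return min(present), max(present), mean(present)


def date_range(timestamps):
    """Oldest and most recent dates, like min/max(to_date(col)) in Hive."""
    dates = [datetime.strptime(ts, TS_FORMAT).date() for ts in timestamps if ts]
    return (min(dates), max(dates)) if dates else (None, None)


print(profile([1, None, 3, 2]))
print(date_range(["2019-01-01 00:46:40", "2018-12-31 23:59:00", None]))
```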

 
/*Green Taxi*/
select count(*) as number_of_records, 
       count(distinct vendor_id) as number_of_lpep_vendors, 
       min(to_date(lpep_pickup_datetime)) as oldest_pickup_timestamp,
       max(to_date(lpep_pickup_datetime)) as recent_pickup_timestamp, 
       min(to_date(lpep_dropoff_datetime)) as oldest_dropoff_timestamp, 
       max(to_date(lpep_dropoff_datetime)) as recent_dropoff_timestamp,  
       min(passenger_count) as min_passengers_pertrip, 
       max(passenger_count) as max_passengers_pertrip, 
       avg(passenger_count) as average_passengers_pertrip, 
       min(trip_distance) as min_trip_distance,
       max(trip_distance) as max_trip_distance, 
       avg(trip_distance) as average_trip_distance, 
       count(distinct rate_code_id) as number_of_rate_codes,
       count(distinct store_and_fwd_flag) as types_of_store_forward_flag, 
       count(distinct pu_location_id) as num_of_pickup_zones,
       count(distinct do_location_id) as num_of_dropoff_zones, 
       count(distinct payment_type) as number_of_payment_types
from stg_ny_green_taxi;


The output of the query for the Green Taxi is as below:

 
/*Yellow Taxi*/
select count(*) as number_of_records, 
       count(distinct vendor_id) as number_of_tpep_vendors, 
       min(to_date(tpep_pickup_datetime)) as oldest_pickup_timestamp,
       max(to_date(tpep_pickup_datetime)) as recent_pickup_timestamp, 
       min(to_date(tpep_dropoff_datetime)) as oldest_dropoff_timestamp, 
       max(to_date(tpep_dropoff_datetime)) as recent_dropoff_timestamp,  
       min(passenger_count) as min_passengers_pertrip, 
       max(passenger_count) as max_passengers_pertrip, 
       avg(passenger_count) as average_passengers_pertrip, 
       min(trip_distance) as min_trip_distance,
       max(trip_distance) as max_trip_distance, 
       avg(trip_distance) as average_trip_distance, 
       count(distinct rate_code_id) as number_of_rate_codes,
       count(distinct store_and_fwd_flag) as types_of_store_forward_flag, 
       count(distinct pu_location_id) as num_of_pickup_zones,
       count(distinct do_location_id) as num_of_dropoff_zones, 
       count(distinct payment_type) as number_of_payment_types
from stg_ny_yellow_taxi;


The output of the query for the Yellow Taxi is as below:

EDA3: Next, we explore the components of the fare details, such as the following:

•    Min, Max, and Average fare charge

•    Min, Max, and Average extra charge

•    Min, Max, and Average MTA charge

•    Distinct count of MTA types

•    Min, Max, and Average tip amount

•    Min, Max, and Average toll charge

•    Min, Max, and Average improvement surcharge

•    Distinct count of improvement surcharge types

•    Min, Max, and Average total amount
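The statistics above follow one pattern per column, so the select list can be generated instead of typed by hand. The hypothetical helper below builds such a HiveQL query for either staging table. Its aliases (for example min_fare_amount) are derived from the column names and therefore differ from the hand-written aliases in the queries below.

```python
# Fare columns shared by both staging tables.
FARE_COLUMNS = ["fare_amount", "extra_charge", "mta_tax", "tip_amount",
                "tolls_amount", "improvement_surcharge", "total_amount"]
# Columns whose number of distinct values is also reported.
DISTINCT_COLUMNS = {"mta_tax", "improvement_surcharge"}


def fare_profile_query(table: str) -> str:
    """Build a HiveQL SELECT with min/max/avg (plus distinct count) per column."""
    exprs = []
    for col in FARE_COLUMNS:
        if col in DISTINCT_COLUMNS:
            exprs.append(f"count(distinct {col}) as distinct_{col}")
        exprs += [f"min({col}) as min_{col}",
                  f"max({col}) as max_{col}",
                  f"avg({col}) as avg_{col}"]
    return "select " + ",\n       ".join(exprs) + f"\nfrom {table};"


print(fare_profile_query("stg_ny_green_taxi"))
```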

 
/*Green Taxi*/
select min(fare_amount) as min_fare_charge, 
       max(fare_amount) as max_fare_charge, 
       avg(fare_amount) as average_fare_charge,
       min(extra_charge) as min_extra_charge, 
       max(extra_charge) as max_extra_charge, 
       avg(extra_charge) as average_extra_charge,
       count(distinct mta_tax) as types_of_mta_tax_charge, 
       min(mta_tax) as min_mta_tax_charge, 
       max(mta_tax) as max_mta_tax_charge, 
       avg(mta_tax) as average_mta_tax_charge,
       min(tip_amount) as min_tip_amount, 
       max(tip_amount) as max_tip_amount, 
       avg(tip_amount) as average_tip_amount,
       min(tolls_amount) as min_toll_charge, 
       max(tolls_amount) as max_toll_charge, 
       avg(tolls_amount) as average_toll_charge,
       count(distinct improvement_surcharge) as types_of_surcharge, 
       min(improvement_surcharge) as min_surcharge, 
       max(improvement_surcharge) as max_surcharge, 
       avg(improvement_surcharge) as average_surcharge,
       min(total_amount) as min_total_charge, 
       max(total_amount) as max_total_charge, 
       avg(total_amount) as average_total_charge
from stg_ny_green_taxi;


The output for the Green Taxi is below:

Green Taxi output screenshot.

 
/*Yellow Taxi*/
select min(fare_amount) as min_fare_charge, 
       max(fare_amount) as max_fare_charge, 
       avg(fare_amount) as average_fare_charge,
       min(extra_charge) as min_extra_charge, 
       max(extra_charge) as max_extra_charge, 
       avg(extra_charge) as average_extra_charge,
       count(distinct mta_tax) as types_of_mta_tax_charge, 
       min(mta_tax) as min_mta_tax_charge, 
       max(mta_tax) as max_mta_tax_charge, 
       avg(mta_tax) as average_mta_tax_charge,
       min(tip_amount) as min_tip_amount, 
       max(tip_amount) as max_tip_amount, 
       avg(tip_amount) as average_tip_amount,
       min(tolls_amount) as min_toll_charge, 
       max(tolls_amount) as max_toll_charge, 
       avg(tolls_amount) as average_toll_charge,
       count(distinct improvement_surcharge) as types_of_surcharge, 
       min(improvement_surcharge) as min_surcharge, 
       max(improvement_surcharge) as max_surcharge, 
       avg(improvement_surcharge) as average_surcharge,
       min(total_amount) as min_total_charge, 
       max(total_amount) as max_total_charge, 
       avg(total_amount) as average_total_charge
from stg_ny_yellow_taxi;


The output for the Yellow Taxi is as below:
Yellow Taxi output.

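In a similar manner, EDA can be performed on passenger count, rate code, and payment type, and to find any records in which the pickup timestamp is later than the drop-off timestamp. Based on all these checks, a record is treated as conforming only if it meets every one of the following conditions, and as non-conforming otherwise:

•    Pickup and drop-off timestamps both fall in 2019, with the pickup before the drop-off

•    Vendor ID is not null

•    Passenger count is between 1 and 9

•    Trip distance is greater than 0

•    Rate code ID and payment type are each between 1 and 6

•    Fare amount and total amount are greater than 0

•    Extra charge is 0, 0.5, or 1; MTA tax is 0 or 0.5; improvement surcharge is 0 or 0.3

•    Tip amount is greater than 0 and tolls amount is not negative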

The following query counts, per vendor, the Green Taxi records that pass all of these checks; the non-conforming records are the remainder of the total record count:

 
/*Green Taxi*/
select vendor_id as Vendor_Id_1CreativeMob_2VeriFone, 
       count(*) as Conforming_Records
from stg_ny_green_taxi
where year(lpep_pickup_datetime) =2019 and year(lpep_dropoff_datetime) =2019 and
       unix_timestamp(lpep_pickup_datetime) < unix_timestamp(lpep_dropoff_datetime) and
       passenger_count in (1,2,3,4,5,6,7,8,9) and
       trip_distance > 0.0 and rate_code_id in (1,2,3,4,5,6) and
       payment_type in (1,2,3,4,5,6) and fare_amount > 0 and
       extra_charge in (0,0.5,1) and mta_tax in (0,0.5) and 
       tip_amount > 0.0 and tolls_amount >= 0.0 and 
       improvement_surcharge in (0,0.3) and 
       total_amount > 0 and vendor_id is not null
group by vendor_id
order by vendor_id;


Green Taxi records screenshot.

There are almost 692,856 non-conforming records for the Green Taxi, which accounts for 11.4% of the total Green Taxi 2019 dataset. The corresponding query for the Yellow Taxi records is:

 
/*Yellow Taxi*/
select vendor_id as Vendor_Id_1CreativeMob_2VeriFone, 
       count(*) as Conforming_Records
from stg_ny_yellow_taxi
where year(tpep_pickup_datetime) =2019 and year(tpep_dropoff_datetime) =2019 and
       unix_timestamp(tpep_pickup_datetime) < unix_timestamp(tpep_dropoff_datetime) and
       passenger_count in (1,2,3,4,5,6,7,8,9) and
       trip_distance > 0.0 and rate_code_id in (1,2,3,4,5,6) and
       payment_type in (1,2,3,4,5,6) and fare_amount > 0 and
       extra_charge in (0,0.5,1) and mta_tax in (0,0.5) and 
       tip_amount > 0.0 and tolls_amount >= 0.0 and 
       improvement_surcharge in (0,0.3) and 
       total_amount > 0 and vendor_id is not null
group by vendor_id
order by vendor_id;

Yellow Taxi records.

There are almost 27,019,826 non-conforming records for the Yellow Taxi, which accounts for 32% of the total Yellow Taxi 2019 dataset.
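The validity conditions in the WHERE clauses above can also be written as a row-level predicate, which is handy for unit-testing the rules on a few hand-made trips before running them over millions of rows. The sketch below is a hypothetical Python translation of that WHERE clause. The keys pickup and dropoff are stand-ins for the tpep_/lpep_ timestamp columns, the timestamp format is assumed, and any missing (None) value makes the record fail, just as a NULL keeps the SQL condition from being true. A record is non-conforming exactly when is_conforming returns False.

```python
from datetime import datetime

TS_FORMAT = "%Y-%m-%d %H:%M:%S"  # assumed format of the pickup/drop-off strings


def is_conforming(trip: dict) -> bool:
    """Mirror the WHERE clause of the conformance queries for one trip."""
    # A NULL in any checked column keeps the SQL condition from being true.
    if any(value is None for value in trip.values()):
        return False
    try:
        pickup = datetime.strptime(trip["pickup"], TS_FORMAT)
        dropoff = datetime.strptime(trip["dropoff"], TS_FORMAT)
    except ValueError:
        return False  # unparseable timestamp: treated as failing, like a NULL
    return (
        pickup.year == 2019 and dropoff.year == 2019
        and pickup < dropoff
        and trip["passenger_count"] in range(1, 10)
        and trip["trip_distance"] > 0
        and trip["rate_code_id"] in range(1, 7)
        and trip["payment_type"] in range(1, 7)
        and trip["fare_amount"] > 0
        and trip["extra_charge"] in (0, 0.5, 1)
        and trip["mta_tax"] in (0, 0.5)
        and trip["tip_amount"] > 0
        and trip["tolls_amount"] >= 0
        and trip["improvement_surcharge"] in (0, 0.3)
        and trip["total_amount"] > 0
    )


sample_trip = {
    "vendor_id": 2,
    "pickup": "2019-03-01 10:00:00",
    "dropoff": "2019-03-01 10:20:00",
    "passenger_count": 1,
    "trip_distance": 3.2,
    "rate_code_id": 1,
    "payment_type": 1,
    "fare_amount": 13.0,
    "extra_charge": 0.5,
    "mta_tax": 0.5,
    "tip_amount": 2.0,
    "tolls_amount": 0.0,
    "improvement_surcharge": 0.3,
    "total_amount": 16.3,
}
print(is_conforming(sample_trip))  # True
```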

In this part, we have seen how to create a Google Dataproc cluster, create external tables in Hive that point to data stored in Google Cloud Storage, perform exploratory data analysis, and find non-conforming records. In the next part, we will cover the remaining steps: creating the main tables with valid records for detailed, deep-dive analysis.

 

 

 

 
