Data Access Components – Hive and Pig - Hadoop Essentials (2015)

Chapter 4. Data Access Components – Hive and Pig

Hadoop typically holds terabytes or petabytes of data, so data access is an extremely important aspect of any Hadoop project or product. When processing Big Data, we often have to perform ad hoc processing to gain insights into the data and to design strategies. Hadoop's basic processing layer is MapReduce, which, as we discussed earlier, is a massively parallel processing framework that is scalable, fast, adaptable, and fault tolerant.

We will look at some limitations of MapReduce programming and, in detail, at programming abstraction layers such as Hive and Pig, which execute MapReduce jobs from a user-friendly language for faster development and easier management. Hive and Pig are handy for ad hoc analysis and for analysis that is not very complex.

Need of a data processing tool on Hadoop

MapReduce is the key to processing Big Data, but it is complex to understand, design, code, and optimize. MapReduce has a steep learning curve and requires good programming skills to master. Big Data users usually come from different backgrounds, such as programmers, database administrators, script writers, analysts, data scientists, and data managers, and not all of them can adapt to the MapReduce programming model. Hence, Hadoop has different abstractions for its data access components.

The data access components are very useful for developers, who do not need to learn MapReduce programming in detail and can still use the MapReduce framework through an interface that is more comfortable for them. This leads to faster development and better manageability of the code. Abstractions make ad hoc processing of data quicker and let users concentrate on the business logic.

The two widely used data access components in the Hadoop ecosystem are:

· Pig

· Hive

Let us discuss each of these in detail with some examples.

Pig

Pig is a component that provides Pig Latin, an abstraction language on top of MapReduce. Pig was developed by Yahoo! around 2006 and was contributed to Apache as an open source project. Pig Latin is a data flow language that is comfortable for developers and users of procedural languages. Pig helps manage data as a flow, which makes it ideal for data flow processes, ETL (Extract Transform Load) or ELT (Extract Load Transform) processes, and ad hoc data analysis.

Pig makes the analysis of structured and semi-structured data much easier. Pig was developed based on a philosophy: pigs can eat anything and live anywhere, Pig should be easy for the user to control and modify, and it is important to process data quickly.

Pig data types

Pig has a collection of primitive data types, as well as complex data types. Inputs and outputs to Pig's relational operators are specified using these data types:

· Primitive: int, long, float, double, chararray, and bytearray

· Map: Map is an associative array data type that stores a chararray key and its associated value. The data type of a value in a map can be a complex type. If the type of the value cannot be determined, Pig defaults to the bytearray data type. The key and value association is specified using the # symbol. The keys within a map have to be unique.

Syntax: [key#value, key1#value1…]

· Tuple: A tuple data type is a collection of data values. They are of fixed length and ordered. Tuple is similar to a record in a SQL table, without restrictions on the column types. Each data value is called a field. Ordering of values offers the capability to randomly access a value within a tuple.

Syntax: (value1, value2, value3…)

· Bag: A bag data type is a collection of tuples; because a tuple can itself contain bags, bags can be nested. Bags are unordered, that is, a tuple within a bag cannot be accessed by position. There are no constraints on the structure of the tuples contained in a bag. Duplicate tuples are allowed within a bag.

Syntax: {(tuple1), (tuple2)…}

Pig allows nesting of complex data structures where you can nest a tuple inside a tuple, a bag, and a Map. Pig Latin statements work with relations, which can be thought of as:

· A relation (similar to a database table) is a bag

· A bag is a collection of tuples

· A tuple (similar to a database row) is an ordered set of fields

· A field is a piece of data
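To make the nesting concrete, the following Python sketch models the Pig data model with built-in Python types. The mapping (dict for a map, tuple for a tuple, list of tuples for a bag) and the sample values are assumptions chosen for illustration only; this is not how Pig represents data internally.

```python
# Illustrative model only: Python stand-ins for Pig's complex types.
# map   -> dict with str (chararray) keys
# tuple -> Python tuple (ordered fields)
# bag   -> list of tuples (a Pig bag is unordered; list order carries no meaning)

movie = (1, "The Mummy", 1932, 3.5)            # tuple: ordered fields
attrs = {"genre": "horror", "length": 4388}    # map: key#value pairs, unique keys
movies_bag = [movie, movie]                    # bag: duplicate tuples are allowed

# A relation is a bag of tuples; a tuple may nest bags and maps.
relation = [(1932, movies_bag, attrs)]

year, bag, m = relation[0]                     # fields are accessed by position
print(year, len(bag), m["genre"])
```

Read top to bottom, this follows the list above: the relation is a bag, the bag holds tuples, and each tuple holds fields that may themselves be complex.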

The Pig architecture

The Pig data flow architecture is layered for transforming Pig Latin statements to MapReduce steps. There are three main phases in compiling and executing a Pig script, which are as follows:

· Logical plan

· Physical plan

· MapReduce plan

The logical plan

In the logical plan phase, the Pig statements are parsed for syntax errors, and the input files and input data structures are validated. A logical plan, which is a DAG (Directed Acyclic Graph) with operators as nodes and data flow as edges, is then prepared. Optimizations based on built-in rules happen at this stage. The logical plan has a one-to-one correspondence with the operators.

The physical plan

A translation of each operator into the physical form of execution happens during this stage. For the MapReduce platform, except for a few, most operators have a one-to-one correspondence with the physical plan. In addition to the logical operators, there are a few physical operators too. They are as follows:

· Local Rearrange (LR)

· Global Rearrange (GR)

· Package (P)

Logical operators like GROUP, COGROUP, or JOIN are translated into a sequence of LR, GR, and P operators. The LR operator corresponds to the shuffle preparation stage, where partitioning happens based on the key. The GR operator corresponds to the actual shuffle between the Map and Reduce tasks. The P operator is the packaging operator on the Reduce side; it assembles the tuples that share a key.
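The LR, GR, and P sequence can be sketched in plain Python. This is a simplified model under stated assumptions: LR tags each record with its key and input index, GR brings records with equal keys together, and P builds one bag per input for each key. The function names and sample records are hypothetical and do not correspond to Pig's internal classes.

```python
from collections import defaultdict

def local_rearrange(records, key_index, source):
    """Map side: tag each record with its grouping key and input index."""
    return [(rec[key_index], source, rec) for rec in records]

def global_rearrange(tagged):
    """Shuffle: bring all tagged records that share a key together."""
    by_key = defaultdict(list)
    for key, source, rec in tagged:
        by_key[key].append((source, rec))
    return by_key

def package(by_key, num_inputs):
    """Reduce side: build one bag per input for every key."""
    result = {}
    for key, items in by_key.items():
        bags = [[] for _ in range(num_inputs)]
        for source, rec in items:
            bags[source].append(rec)
        result[key] = bags
    return result

# Hypothetical inputs: (id, key) records from two relations.
left = [("a1", "k1"), ("a2", "k2")]
right = [("b1", "k1"), ("b2", "k2"), ("b3", "k2")]

tagged = local_rearrange(left, 1, 0) + local_rearrange(right, 1, 1)
packaged = package(global_rearrange(tagged), num_inputs=2)
print(packaged["k2"])
```

In a real job the first function would run in the Map tasks and the last in the Reduce tasks, with the framework's shuffle playing the role of the middle one.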

The MapReduce plan

The final stage of Pig compilation is to compile the physical plan to actual MapReduce jobs. A Reduce task is required wherever an LR, GR, and P sequence is present in the physical plan. The compiler also looks for opportunities to put in Combiners wherever possible. The MapReduce plan for the physical plan in the previous image has two MapReduce jobs, one corresponding to the JOIN and the other to the GROUP in the logical plan. The MapReduce task corresponding to the GROUP operator has a Combiner as well. It must be noted that the GROUP operation happens in the Map task.

Pig modes

Pig can run in two execution environments:

· Local mode: Pig runs on a single machine, and all files are installed and run using the local host and the local filesystem.

· MapReduce mode: This is the default mode, which requires access to a Hadoop cluster.

Pig programs can be executed in three ways:

· Interactive mode or grunt mode

· Batch mode or script mode

· Embedded mode: Embed Pig commands in a host language such as Python or JavaScript and run the program

Each of these can be executed either in the Local mode or in the MapReduce mode.

Grunt shell

Grunt is Pig's interactive shell. It can be used to enter Pig Latin interactively and provides a shell for users to interact with HDFS.

For Local mode:

Specify local mode using the -x flag:

$ pig -x local

For MapReduce mode:

Point Pig to a remote cluster by placing HADOOP_CONF_DIR on PIG_CLASSPATH.

Tip

HADOOP_CONF_DIR is the directory containing the hadoop-site.xml, hdfs-site.xml, and mapred-site.xml files.

Example: $ export PIG_CLASSPATH=<path_to_hadoop_conf_dir>

Then start Pig, which opens the Grunt shell:

$ pig

grunt>

Input data

We will be using the movies_data.csv file as a dataset for exploring Pig. The input file has the following fields and sample data:

ID | Name | Year | Rating | Duration in sec
40146 | Oscar's Oasis: Chicken Charmer Top Gun Lizard: Wanted Power of Love | 2011 | | 1601
40147 | Transformers: Rescue Bots: Season 1: Return of the Dino Bot | 2011 | | 1324
40148 | Plankton Invasion: Operation Winkle Zone Operation Cod-Tagion Operation Hardshell | 2012 | | 1262
40149 | Transformers: Rescue Bots: Season 1: Deep Trouble | 2011 | | 1324
40150 | Trailer: Lift the Veil | 2012 | 3.6 | 69
40151 | Trailer: Pain | 2012 | 3.6 | 52
40152 | Todd and the Book of Pure Evil | 2010 | 3.9 |
40153 | Trailer: House of Cards | 2012 | 3.7 | 148

Loading data

To load data in Pig, we use the LOAD command and assign the result to an alias for the relation (movies in this example). LOAD can read data from the local filesystem or HDFS and make it available for processing within Pig. Different storage handlers are available in Pig for handling different types of records; a handler is selected by writing USING followed by the storage handler function. A few of the frequently used storage handler functions are:

· PigStorage which is used for structured text files with a delimiter that can be specified and is the default storage handler

· HBaseStorage which is used for handling data from HBase tables

· BinStorage which is used for binary and machine readable formats

· JsonLoader and JsonStorage which are used for loading and storing JSON data and for which a schema should be specified

· TextLoader which is used for unstructured data in UTF-8

If we do not mention any handler, PigStorage is used by default. PigStorage and TextLoader support gzip and bzip compressed files.

Example:

grunt> movies = LOAD '/user/biadmin/shiva/movies_data.csv' USING PigStorage(',') as (id,name,year,rating,duration);

We can use schemas to assign types to fields:

A = LOAD 'data' AS (name, age, gpa); -- name, age, gpa default to bytearrays

A = LOAD 'data' AS (name:chararray, age:int, gpa:float); -- name is now a String (chararray), age is integer and gpa is float
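As a rough illustration of what a typed schema does, the Python sketch below splits delimited lines and casts each field to its declared type. It assumes that an empty field, or one that cannot be cast, becomes a null (None here); the load function and the sample lines are hypothetical and only mimic the idea, not PigStorage itself.

```python
def load(lines, delimiter, schema):
    """Split each line on the delimiter and cast fields per the schema.

    Fields that are missing, empty, or fail the cast become None,
    standing in for Pig's null.
    """
    rows = []
    for line in lines:
        raw = line.split(delimiter)
        fields = []
        for i, (_name, cast) in enumerate(schema):
            value = raw[i] if i < len(raw) else ""
            try:
                fields.append(cast(value) if value != "" else None)
            except ValueError:
                fields.append(None)
        rows.append(tuple(fields))
    return rows

# Schema equivalent to (name:chararray, age:int, gpa:float)
schema = [("name", str), ("age", int), ("gpa", float)]
rows = load(["john,21,3.5", "mary,abc,"], ",", schema)
print(rows)
```

Without declared types, every field would simply stay as the raw bytes read from the file, which is what the bytearray default means.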

Dump

The DUMP command is useful for interactively viewing the values stored in a relation. It writes the output to the console and does not save the data.

Example:

grunt> DUMP movies;

INFO [JobControl] org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1

INFO [main] org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1

(1,The Nightmare Before Christmas,1993,3.9,4568)

(2,The Mummy,1932,3.5,4388)

(3,Orphans of the Storm,1921,3.2,9062)

(4,The Object of Beauty,1991,2.8,6150)

(5,Night Tide,1963,2.8,5126)

(6,One Magic Christmas,1985,3.8,5333)

(7,Muriel's Wedding,1994,3.5,6323)

Store

The STORE command is used to write (persist) the data. Pig starts a job only when a DUMP or STORE is encountered. We can use the handlers mentioned in LOAD with STORE too.

Example:

grunt> STORE movies INTO '/temp' USING PigStorage(','); -- This will write the contents of movies to the /temp location in HDFS

FOREACH generate

A FOREACH operation applies column-level expressions to each record of a relation. It can also project a subset of the columns of the relation, and a UDF can be used as an expression in FOREACH.

Example:

grunt> movie_duration = FOREACH movies GENERATE name, (double)duration/60;

Filter

Filter is used to get rows matching the expression criteria.

Example:

grunt> movies_greater_than_four = FILTER movies BY (float)rating>4.0;

grunt> DUMP movies_greater_than_four;

We can use multiple conditions with filters and Boolean operators (AND, OR, NOT):

grunt> movies_greater_than_four_and_2012 = FILTER movies BY (float)rating>4.0 AND year > 2012;

grunt> DUMP movies_greater_than_four_and_2012;

INFO [JobControl] org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1

WARN [main] org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized

INFO [main] org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1

(22148,House of Cards: Season 1,2013,4.4,)

(22403,House of Cards,2013,4.4,)

(37138,Orange Is the New Black: Season 1,2013,4.5,)

(37141,Orange Is the New Black,2013,4.5,)

(37174,The Following: Season 1,2013,4.1,)

(37239,The Following,2013,4.1,)

(37318,The Carrie Diaries,2013,4.3,)

(37320,The Carrie Diaries: Season 1,2013,4.3,)

(37589,Safe Haven,2013,4.2,6936)
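For readers more familiar with general-purpose languages, FOREACH ... GENERATE and FILTER behave much like a map and a filter over a list of tuples. The Python sketch below uses three rows taken from the outputs shown in this chapter; the variable names are illustrative, and None stands in for a missing duration.

```python
# (id, name, year, rating, duration in seconds); None marks a missing value
movies = [
    (1, "The Nightmare Before Christmas", 1993, 3.9, 4568),
    (22148, "House of Cards: Season 1", 2013, 4.4, None),
    (37589, "Safe Haven", 2013, 4.2, 6936),
]

# FOREACH ... GENERATE: one output tuple per input tuple
movie_duration = [
    (name, duration / 60 if duration is not None else None)
    for _id, name, _year, _rating, duration in movies
]

# FILTER ... BY: keep only the tuples for which the condition holds
recent_hits = [m for m in movies if m[3] > 4.0 and m[2] > 2012]

print(movie_duration)
print([m[1] for m in recent_hits])
```

As in Pig, neither step changes the input relation; each produces a new one.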

Group By

The Group By command is used to create groups of records with a key. Group By relations are used to work with the aggregation functions on the grouped data.

The syntax for Group By is as follows:

alias = GROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [PARALLEL n];

For example:

· To group the movies by year:

grunt> grouped_by_year = GROUP movies BY year;

· Or to group by multiple fields:

B = GROUP A BY (age, employeesince);
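The result of a GROUP has two fields per tuple: the key, named group, and a bag holding every input tuple with that key. A minimal Python sketch of that shape follows; the group_by helper and the three sample tuples are hypothetical, and the output is sorted only to make the example deterministic.

```python
from collections import defaultdict

def group_by(relation, key):
    """Return (group, bag) pairs; each bag holds all tuples sharing the key."""
    bags = defaultdict(list)
    for tup in relation:
        bags[key(tup)].append(tup)
    return sorted(bags.items())

# (id, name, year)
movies = [
    (2, "The Mummy", 1932),
    (7, "Muriel's Wedding", 1994),
    (8, "Mother's Boys", 1994),
]

grouped_by_year = group_by(movies, key=lambda t: t[2])
for group, bag in grouped_by_year:
    print(group, bag)
```

Grouping on multiple fields works the same way, with a tuple such as (age, employeesince) as the key.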

Limit

The Limit command limits the number of output tuples in a relation, but which tuples are returned can change from one execution of the command to another. To get specific tuples, we have to use ORDER along with LIMIT, which returns the first tuples of the ordered set.

Example:

grunt> movies_limit_10 = LIMIT movies 10;

grunt> DUMP movies_limit_10;

INFO [JobControl] org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1

INFO [JobControl] org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1

WARN [main] org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized

INFO [main] org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1

(1,The Nightmare Before Christmas,1993,3.9,4568)

(2,The Mummy,1932,3.5,4388)

(3,Orphans of the Storm,1921,3.2,9062)

(4,The Object of Beauty,1991,2.8,6150)

(5,Night Tide,1963,2.8,5126)

(6,One Magic Christmas,1985,3.8,5333)

(7,Muriel's Wedding,1994,3.5,6323)

(8,Mother's Boys,1994,3.4,5733)

(9,Nosferatu: Original Version,1929,3.5,5651)

(10,Nick of Time,1995,3.4,5333)

Aggregation

Pig provides a bunch of aggregation functions such as:

· AVG

· COUNT

· COUNT_STAR

· SUM

· MAX

· MIN

Example:

grunt> count_by_year = FOREACH grouped_by_year GENERATE group, COUNT(movies);

grunt> DUMP count_by_year;

INFO [JobControl] org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1

INFO [main] org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1

(1913,3)

(1914,20)

.

.

(2009,4451)

(2010,5107)

(2011,5511)

(2012,4339)

(2013,981)

(2014,1)
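Aggregation functions operate on the bag produced by a GROUP. The Python sketch below mimics three of them on a hypothetical grouped relation. It assumes that COUNT skips tuples whose first field is null, that COUNT_STAR counts every tuple, and that AVG ignores nulls; the function names simply mirror the Pig ones.

```python
def count(bag):
    """Like COUNT: ignore tuples whose first field is null (None here)."""
    return sum(1 for t in bag if t[0] is not None)

def count_star(bag):
    """Like COUNT_STAR: count every tuple, nulls included."""
    return len(bag)

def avg(values):
    """Like AVG: average the non-null values; None if there are none."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present) if present else None

# Hypothetical grouped relation: (group, bag of (id, rating) tuples)
grouped_by_year = [
    (1994, [(7, 3.5), (8, 3.4), (None, None)]),
    (2013, [(37589, 4.2)]),
]

for year, bag in grouped_by_year:
    ratings = [rating for _id, rating in bag]
    print(year, count(bag), count_star(bag), avg(ratings))
```

The difference between the two counts only shows up when a bag contains tuples with a null first field, as in the first group here.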

Cogroup

Cogroup is a generalization of group. Instead of collecting records of one input based on a key, it collects records of n inputs based on a key. The result is a record with a key and a bag for each input. Each bag contains all records from that input that have the given value for the key:

$ cat > owners.csv

adam,cat

adam,dog

alex,fish

alice,cat

steve,dog

$ cat > pets.csv

nemo,fish

fido,dog

rex,dog

paws,cat

wiskers,cat

grunt> owners = LOAD 'owners.csv'

>> USING PigStorage(',')

>> AS (owner:chararray,animal:chararray);

grunt> pets = LOAD 'pets.csv'

>> USING PigStorage(',')

>> AS (name:chararray,animal:chararray);

grunt> grouped = COGROUP owners BY animal, pets by animal;

grunt> DUMP grouped;

This will group each table based on the animal column. For each animal, it will create a bag of matching rows from both tables. For this example, we get the results, as shown in the following table:

group | owners | pets
cat | {(adam,cat),(alice,cat)} | {(paws,cat),(wiskers,cat)}
dog | {(adam,dog),(steve,dog)} | {(fido,dog),(rex,dog)}
fish | {(alex,fish)} | {(nemo,fish)}
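The same result can be reproduced with a few lines of Python, which may help to see what COGROUP computes. The cogroup function is a hypothetical helper written for this illustration; the data is the owners.csv and pets.csv content shown above, and keys are sorted only to make the output deterministic.

```python
def cogroup(left, right, left_key, right_key):
    """Return (key, left_bag, right_bag) for every key found in either input."""
    keys = sorted({left_key(t) for t in left} | {right_key(t) for t in right})
    return [
        (k,
         [t for t in left if left_key(t) == k],
         [t for t in right if right_key(t) == k])
        for k in keys
    ]

owners = [("adam", "cat"), ("adam", "dog"), ("alex", "fish"),
          ("alice", "cat"), ("steve", "dog")]
pets = [("nemo", "fish"), ("fido", "dog"), ("rex", "dog"),
        ("paws", "cat"), ("wiskers", "cat")]

grouped = cogroup(owners, pets, lambda t: t[1], lambda t: t[1])
for row in grouped:
    print(row)
```

A key that appears in only one input would still produce a row, with an empty bag for the other input.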

DESCRIBE

The DESCRIBE command gives the schema of a relation, as shown here:

grunt> Describe grouped;

grouped: {group: chararray,owners: {(owner: chararray,animal: chararray)},pets: {(name: chararray,animal: chararray)}}

EXPLAIN

The EXPLAIN command on a relation shows how the Pig script is going to get executed. It shows the Logical plan, the Physical plan, and the MapReduce plan of the relation. We can use the EXPLAIN command to study the optimizations that have gone into the plans. This command can be used to optimize the script further:

grunt> explain grouped;

#-----------------------------------------------

# New Logical Plan:

#-----------------------------------------------

grouped: (Name: LOStore Schema: group#107:chararray,owners#108:bag{#118:tuple(owner#94:chararray,animal#95:chararray)},pets#110:bag{#119:tuple(name#96:chararray,animal#97:chararray)})

|

|---grouped: (Name: LOCogroup Schema: group#107:chararray,owners#108:bag{#118:tuple(owner#94:chararray,animal#95:chararray)},pets#110:bag{#119:tuple(name#96:chararray,animal#97:chararray)})

| |

| animal:(Name: Project Type: chararray Uid: 95 Input: 0 Column: 1)

| |

| animal:(Name: Project Type: chararray Uid: 97 Input: 1 Column: 1)

|

|---owners: (Name: LOForEach Schema: owner#94:chararray,animal#95:chararray)

| | |

| | (Name: LOGenerate[false,false] Schema: owner#94:chararray,animal#95:chararray)ColumnPrune:InputUids=[95, 94]ColumnPrune:OutputUids=[95, 94]

| | | |

| | | (Name: Cast Type: chararray Uid: 94)

| | | |

| | | |---owner:(Name: Project Type: bytearray Uid: 94 Input: 0 Column: (*))

| | | |

| | | (Name: Cast Type: chararray Uid: 95)

| | | |

| | | |---animal:(Name: Project Type: bytearray Uid: 95 Input: 1 Column: (*))

| | |

| | |---(Name: LOInnerLoad[0] Schema: owner#94:bytearray)

| | |

| | |---(Name: LOInnerLoad[1] Schema: animal#95:bytearray)

| |

| |---owners: (Name: LOLoad Schema: owner#94:bytearray,animal#95:bytearray)RequiredFields:null

|

|---pets: (Name: LOForEach Schema: name#96:chararray,animal#97:chararray)

| |

| (Name: LOGenerate[false,false] Schema: name#96:chararray,animal#97:chararray)ColumnPrune:InputUids=[96, 97]ColumnPrune:OutputUids=[96, 97]

| | |

| | (Name: Cast Type: chararray Uid: 96)

| | |

| | |---name:(Name: Project Type: bytearray Uid: 96 Input: 0 Column: (*))

| | |

| | (Name: Cast Type: chararray Uid: 97)

| | |

| | |---animal:(Name: Project Type: bytearray Uid: 97 Input: 1 Column: (*))

| |

| |---(Name: LOInnerLoad[0] Schema: name#96:bytearray)

| |

| |---(Name: LOInnerLoad[1] Schema: animal#97:bytearray)

|

|---pets: (Name: LOLoad Schema: name#96:bytearray,animal#97:bytearray)RequiredFields:null

#-----------------------------------------------

# Physical Plan:

#-----------------------------------------------

grouped: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-76

|

|---grouped: Package[tuple]{chararray} - scope-71

|

|---grouped: Global Rearrange[tuple] - scope-70

|

|---grouped: Local Rearrange[tuple]{chararray}(false) - scope-72

| | |

| | Project[chararray][1] - scope-73

| |

| |---owners: New For Each(false,false)[bag] - scope-61

| | |

| | Cast[chararray] - scope-56

| | |

| | |---Project[bytearray][0] - scope-55

| | |

| | Cast[chararray] - scope-59

| | |

| | |---Project[bytearray][1] - scope-58

| |

| |---owners: Load(file:///home/opt/pig/bin/owners.csv:PigStorage(',')) - scope-54

|

|---grouped: Local Rearrange[tuple]{chararray}(false) - scope-74

| |

| Project[chararray][1] - scope-75

|

|---pets: New For Each(false,false)[bag] - scope-69

| |

| Cast[chararray] - scope-64

| |

| |---Project[bytearray][0] - scope-63

| |

| Cast[chararray] - scope-67

| |

| |---Project[bytearray][1] - scope-66

|

|---pets: Load(file:///home/opt/pig/bin/pets.csv:PigStorage(',')) - scope-62

#--------------------------------------------------

# Map Reduce Plan

#--------------------------------------------------

MapReduce node scope-79

Map Plan

Union[tuple] - scope-80

|

|---grouped: Local Rearrange[tuple]{chararray}(false) - scope-72

| | |

| | Project[chararray][1] - scope-73

| |

| |---owners: New For Each(false,false)[bag] - scope-61

| | |

| | Cast[chararray] - scope-56

| | |

| | |---Project[bytearray][0] - scope-55

| | |

| | Cast[chararray] - scope-59

| | |

| | |---Project[bytearray][1] - scope-58

| |

| |---owners: Load(file:///home/opt/pig/bin/owners.csv:PigStorage(',')) - scope-54

|

|---grouped: Local Rearrange[tuple]{chararray}(false) - scope-74

| |

| Project[chararray][1] - scope-75

|

|---pets: New For Each(false,false)[bag] - scope-69

| |

| Cast[chararray] - scope-64

| |

| |---Project[bytearray][0] - scope-63

| |

| Cast[chararray] - scope-67

| |

| |---Project[bytearray][1] - scope-66

|

|---pets: Load(file:///home/opt/pig/bin/pets.csv:PigStorage(',')) - scope-62--------

Reduce Plan

grouped: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-76

|

|---grouped: Package[tuple]{chararray} - scope-71--------

Global sort: false

----------------

ILLUSTRATE

The ILLUSTRATE command is perhaps the most important development aid. ILLUSTRATE on a relation samples the data and applies the query to the sample. This can save a lot of time during the debugging process. The sample is significantly smaller than the data, making the code, test, and debug cycle very fast. In many situations, JOIN or FILTER operators may not yield any output on a sample of the data. In such cases, ILLUSTRATE manufactures records that pass through these operators and inserts them into the sample dataset:

grunt> illustrate grouped;

-----------------------------------------------------------

| owners | owner:chararray | animal:chararray |

-----------------------------------------------------------

| | steve | dog |

| | adam | dog |

-----------------------------------------------------------

--------------------------------------------------------

| pets | name:chararray | animal:chararray |

--------------------------------------------------------

| | fido | dog |

| | rex | dog |

--------------------------------------------------------

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

| grouped | group:chararray | owners:bag{:tuple(owner:chararray,animal:chararray)} | pets:bag{:tuple(name:chararray,animal:chararray)} |

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

| | dog | {(steve, dog), (adam, dog)} | {(fido, dog), (rex, dog)} |

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Pig is widely used in data flows and ETL, where a scripting language like Pig Latin makes it easy to design the flow.

Hive

Hive provides a data warehouse environment in Hadoop with a SQL-like wrapper and translates SQL commands into MapReduce jobs for processing. The SQL dialect of Hive is called HiveQL. It does not fully support the SQL-92 standard and should not be assumed to support all SQL keywords, as the whole idea is to hide the complexity of MapReduce programming while performing analysis on the data.

Hive can also act as an analytical interface to other systems, as most systems integrate well with Hive. Hive cannot be used for handling transactions, as it provides neither row-level updates nor real-time queries.

The Hive architecture

Hive architecture has different components such as:

· Driver: Driver manages the lifecycle of a HiveQL statement as it moves through Hive and also maintains a session handle for session statistics.

· Metastore: Metastore stores the system catalog and metadata about tables, columns, partitions, and so on.

· Query Compiler: It compiles HiveQL into a DAG of optimized map/reduce tasks.

· Execution Engine: It executes the tasks produced by the compiler in a proper dependency order. The execution engine interacts with the underlying Hadoop instance.

· HiveServer2: It provides a thrift interface and a JDBC/ODBC server and provides a way of integrating Hive with other applications and supports multi-client concurrency and authentication.

· Client components such as the Command Line Interface (CLI), the web UI, and drivers. The drivers are the JDBC/ODBC drivers provided by vendors and other appropriate drivers.

The process flow of HiveQL is described here:

· A HiveQL statement can be submitted from the CLI, the web UI, or an external client using interfaces such as thrift, ODBC, or JDBC.

· The driver first passes the query to the compiler where it goes through the typical parse, type check, and semantic analysis phases, using the metadata stored in the Metastore.

· The compiler generates a logical plan which is then optimized through a simple rule-based optimizer. Finally, an optimized plan in the form of a DAG of MapReduce tasks and HDFS tasks is generated. The execution engine then executes these tasks in the order of their dependencies by using Hadoop.

Let us check more details on the Metastore, the Query Compiler, and the Execution Engine.

Metastore

The Metastore stores all the details about the tables, partitions, schemas, columns, types, and so on. It acts as a system catalog for Hive. It can be called by clients written in different programming languages, as the details can be queried using Thrift. The Metastore is critical for Hive: without it, the table structure details cannot be retrieved and the data cannot be accessed. Hence, the Metastore should be backed up regularly.

The Metastore can become a bottleneck in Hive, so running it as an isolated JVM process backed by a local JDBC database such as MySQL is advised. Hive ensures that the Metastore is not directly accessed by the Mappers and Reducers of a job; instead, the information needed at runtime is passed to them through an XML plan that is generated by the compiler.

The Query compiler

The query compiler uses the metadata stored by Metastore to process the HiveQL statements to generate an execution plan. The query compiler performs the following steps:

· Parse: The query compiler parses the statement.

· Type checking and semantic analysis: In this phase, the compiler uses the metadata to check the type compatibility in expressions and semantics of the statement. After the checks are validated and no errors are found, the compiler builds a logical plan for the statement.

· Optimization: The compiler optimizes the logical plan by applying different rules where possible, and creates a DAG of tasks in which the result of one step is passed to the next.

The Execution engine

The execution engine executes the optimized plan step by step, running each task only after the tasks it depends on have completed. The results of tasks are stored in a temporary location, and in the final step the data is moved to the desired location.
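Executing a DAG of tasks in dependency order amounts to a topological sort. The sketch below shows the idea with Python's standard graphlib module (Python 3.9 or later); the task names and the plan itself are hypothetical and are not taken from an actual Hive plan.

```python
from graphlib import TopologicalSorter

# Hypothetical plan: each task maps to the set of tasks it depends on.
plan = {
    "scan_orders": set(),
    "scan_customers": set(),
    "join": {"scan_orders", "scan_customers"},
    "aggregate": {"join"},
    "move_to_final_location": {"aggregate"},
}

def run(plan):
    """Visit every task only after all of its dependencies have been visited."""
    executed = []
    for task in TopologicalSorter(plan).static_order():
        executed.append(task)   # a real engine would launch the job here
    return executed

order = run(plan)
print(order)
```

The two scan tasks have no dependency on each other, so their relative order is not fixed; an engine is free to run such tasks in parallel.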

Data types and schemas

Hive supports all the primitive numeric data types such as TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, and DECIMAL. In addition to these primitive data types, Hive also supports string types such as CHAR, VARCHAR, and STRING data types. Like SQL, time indicator data types such as TIMESTAMP and DATE are present. The BOOLEAN and BINARY miscellaneous types are available too.

A number of complex data types are also available. The complex types can be composed from other primitive or complex types. The complex types available are:

· STRUCT: These are groupings of data elements similar to a C-struct. The dot notation is used to dereference elements within a struct. A field within a column C defined as STRUCT {x INT, y STRING} can be accessed as C.x or C.y.

Syntax: STRUCT<field_name : data_type>

· MAP: These are key-value data types. A value is accessed by providing its key within square brackets: a value of a map column M that maps from key x to value y can be accessed by M[x]. There is no restriction on the type stored by the value, though the key needs to be of a primitive type.

Syntax: MAP<primitive_type, data_type>

· ARRAY: These are lists whose elements can be randomly accessed by position. The syntax to access an array element is the same as for a map, but what goes into the square brackets is the zero-based index of the element.

Syntax: ARRAY<data_type>

· UNION: There is a union type available in Hive. It can hold an element of one of the data types specified in the union.

Syntax: UNIONTYPE<data_type1, data_type2…>
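The access rules for the complex types can be mirrored with ordinary Python containers. In this sketch a dict stands in for a STRUCT and for a MAP, and a list for an ARRAY; the column names c, m, and a and their values are made up for illustration and say nothing about how Hive stores these types.

```python
# Illustrative stand-ins (an assumption, not Hive's internal representation):
# STRUCT -> dict with fixed field names, MAP -> dict, ARRAY -> list.
row = {
    "c": {"x": 7, "y": "seven"},       # STRUCT<x:INT, y:STRING>
    "m": {"home": "555-0100"},         # MAP<STRING, STRING>
    "a": ["red", "green", "blue"],     # ARRAY<STRING>
}

struct_field = row["c"]["x"]       # HiveQL: c.x
map_value = row["m"]["home"]       # HiveQL: m['home']
first_item = row["a"][0]           # HiveQL: a[0], zero-based index

print(struct_field, map_value, first_item)
```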

Installing Hive

Hive can be installed by downloading and unpacking a tarball, or you can download the source code and build Hive using Maven (release 0.13 and later) or Ant (release 0.12 and earlier).

The Hive installation process has these requirements:

· Java 1.7 (preferred) or Java 1.6

· Hadoop 2.x (preferred) or 1.x. Hive versions up to 0.13 also support Hadoop 0.20.x and 0.23.x

· Hive is commonly used in production in Linux and Windows environments

Start by downloading the most recent stable release of Hive from one of the Apache download mirrors (see Hive Releases).

Next, you need to unpack the tarball. This will result in the creation of a subdirectory named hive-x.y.z (where x.y.z is the release number):

$ tar -xzvf hive-x.y.z.tar.gz

Set the environment variable HIVE_HOME to point to the installation directory:

$ cd hive-x.y.z

$ export HIVE_HOME=$(pwd)

Finally, add $HIVE_HOME/bin to your Path:

$ export PATH=$HIVE_HOME/bin:$PATH

Starting Hive shell

For using Hive shell, we should follow these steps:

1. The user must create /tmp and /user/hive/warehouse and set them as chmod g+w in HDFS before a table can be created in Hive. The commands to perform this setup are these:

$HADOOP_HOME/bin$ ./hadoop dfs -mkdir /tmp

$HADOOP_HOME/bin$ ./hadoop dfs -mkdir /user/hive/warehouse

$HADOOP_HOME/bin$ ./hadoop dfs -chmod g+w /tmp

$HADOOP_HOME/bin$ ./hadoop dfs -chmod g+w /user/hive/warehouse

2. To use the Hive command-line interface (CLI) from the shell, use the following script:

$HIVE_HOME/bin$ ./hive

HiveQL

HiveQL has a wide range of built-in operators, built-in functions, and built-in aggregate functions, and also supports user-defined functions (UDF) and user-defined aggregate functions (UDAF).

DDL (Data Definition Language) operations

Let's start with the DDL operation commands which are:

· Create database: The Create database command is used for creating a database in Hive. Example:

hive> Create database shiva;

OK

Time taken: 0.764 seconds

· Show databases: The Show databases command is used to list all the existing databases in Hive. Example:

hive> show databases;

OK

default

shiva

Time taken: 4.458 seconds, Fetched: 2 row(s)

· Use database: The Use database command is used to select a database for the session. Example:

hive> use shiva;

· Create table: The Create table command is used to create a Hive table. In the create table command, we can specify whether a table is Managed or External, whether it requires partitioning or bucketing, and other important features of the Hive table. An example of a simple "create table" command is this:

hive> Create table person (name STRING , add STRING);

The preceding command is the simplest form of table creation in Hive. The Create table command has many more options for specific cases, which are explained after the syntax.

The full syntax of the Create table command is:

CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name

[(col_name data_type [COMMENT col_comment], ...)]

[COMMENT table_comment]

[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]

[CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]

[ROW FORMAT row_format] [STORED AS file_format]

[LOCATION hdfs_path]

[TBLPROPERTIES (property_name=property_value, ...)]

[AS select_statement]

· Create table: This command creates a table with the given table name. Its options are explained here:

o IF NOT EXISTS: This clause is used to skip the error if a table or view with the same name already exists.

o The EXTERNAL keyword: As we discussed earlier, this keyword allows us to create an external table, for which we have to provide a LOCATION.

o ROW FORMAT: We can use custom SerDe or native SerDe while creating a table. A native SerDe is used if ROW FORMAT is not specified or ROW FORMAT DELIMITED is specified. You can use the DELIMITED clause to read delimited files.

o STORED AS: We can use TEXTFILE if the data needs to be stored as plain text files. Use STORED AS SEQUENCEFILE if the data needs to be compressed.

o PARTITIONED BY: Partitioned tables can be created using the PARTITIONED BY clause.

o CLUSTERED BY: Further, tables or partitions can be bucketed using columns, and data can be sorted within each bucket via the SORTED BY clause. This can improve the performance of certain kinds of queries.

o TBLPROPERTIES: This clause allows you to tag the table definition with your own metadata key/value pairs.

· Show tables: The Show tables command is used to list all the tables present in the database:

· hive>Show tables;

· OK

· person

· Time taken: 0.057 seconds, Fetched: 1 row(s)

· hive>Show tables '.*n'; -- List all the tables that end with n.

· OK

· person

· Time taken: 0.057 seconds, Fetched: 1 row(s)

· Describe table: The describe table command is used to get useful information about the table and the columns.

· hive> describe person;

· OK

· name string None

· add string None

· Time taken: 0.181 seconds, Fetched: 2 row(s)

· Alter table: The Alter table command is used to change table metadata and to add partitioning or bucketing.

· hive> Alter table person ADD COLUMNS (PNO INT);

· OK

· Time taken: 0.334 seconds

· Drop table: The drop table command is used to remove the table from Hive metadata; if the table is Hive-managed, then this command will also remove the data, and if it's external then only the Hive metadata is removed.

· hive>drop table person;

DML (Data Manipulation Language) operations

Now, let's look at the DML operation commands:

Load data: A file can be loaded into a Hive table from the local filesystem as well as from HDFS; by default Hive will look in HDFS.

The input data we are using is simple personal data with the fields Name, add, and pno; a sample of this data looks like this:

Name                  add                        pno
Alvin Joyner          678-8957 Nisi Avenue       1
Jasper G. Robertson   8336 Tincidunt Av.         2
Deirdre Fulton        624-9370 Nisl. Street      3
Hillary Craig         Ap #198-3439 Id Av.        4
Blaze Carr            Ap #283-9985 Purus Road    5

Look at the following command:

hive>LOAD DATA INPATH 'hdfs://localhost:9000/user/hive/shiva/PersonData.csv' OVERWRITE INTO TABLE person;

Loading data to table shiva.person

OK

Time taken: 0.721 seconds

The preceding command will load data from an HDFS file/directory to the table, and the process of loading data from HDFS will result in moving the file/directory.

For Local Data load, use the following code:

hive>LOAD DATA LOCAL INPATH './examples/shiva/file1.txt' OVERWRITE INTO TABLE person;

We can also load the data with PARTITION:

hive>LOAD DATA LOCAL INPATH './examples/shiva/file2.txt'

OVERWRITE INTO TABLE person PARTITION (date='26-02-2014');

The SQL operation

Querying the data in Hive can be done as shown in the following sections:

SELECT: SELECT is the projection operator in SQL. The clauses used for this function are:

· SELECT scans the table specified by the FROM clause

· WHERE gives the condition of what to filter

· GROUP BY gives a list of columns which then specify how to aggregate the records

· CLUSTER BY, DISTRIBUTE BY, and SORT BY specify the sort order and algorithm

· LIMIT specifies the number of records to retrieve

The general syntax is:

SELECT [ALL | DISTINCT] select_expr, select_expr, ...

FROM table_reference

[WHERE where_condition]

[GROUP BY col_list]

[HAVING having_condition]

[CLUSTER BY col_list | [DISTRIBUTE BY col_list] [SORT BY col_list]]

[LIMIT number];

Example:

hive>select * from person where name = 'Alvin Joyner';

Total MapReduce jobs = 1

Launching Job 1 out of 1

Number of reduce tasks is set to 0 since there's no reduce operator

Starting Job = job_201503051113_2664, Tracking URL = http://machine76.bigdatadomain.com:50030/jobdetails.jsp?jobid=job_201503051113_2664

Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0

2015-03-24 14:52:54,541 Stage-1 map = 0%, reduce = 0%

2015-03-24 14:52:58,570 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 2.57 sec

2015-03-24 14:52:59,579 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.57 sec

MapReduce Total cumulative CPU time: 2 seconds 570 msec

Ended Job = job_201503051113_2664

MapReduce Jobs Launched:

Job 0: Map: 1 Cumulative CPU: 2.57 sec HDFS Read: 4502 HDFS Write: 0 SUCCESS

Total MapReduce CPU Time Spent: 2 seconds 570 msec

OK

Time taken: 12.53 seconds

Joins

HiveQL supports the following types of joins:

· JOIN

· LEFT OUTER JOIN

· RIGHT OUTER JOIN

· FULL OUTER JOIN

Only equi joins are supported in HiveQL; joins on non-equality conditions cannot be executed. A plain JOIN in HiveQL is an inner join on an equality condition. This chapter writes outer joins in their full form, LEFT OUTER JOIN and RIGHT OUTER JOIN, rather than the LEFT JOIN and RIGHT JOIN shorthand that is common in SQL.

HiveQL is converted into MapReduce jobs, so we have to design queries with the MapReduce paradigm in mind. Joins are executed as map-side joins or reduce-side joins, depending on the parser and the optimization plan. The rule of thumb is to join the smaller tables first and the largest table last, to avoid transferring and processing a huge amount of data. The reason is that in every MapReduce stage of the join, the last table is streamed through the reducers, whereas the others are buffered.

Example:

hive> SELECT a.val1, a.val2, b.val, c.val

> FROM a

> JOIN b ON (a.key = b.key)

> LEFT OUTER JOIN c ON (a.key = c.key);
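The rule of thumb about table order can be pictured with a small plain-Java model. This is only a sketch of the idea of buffering the smaller table and streaming the larger one, not Hive's actual join operator; the class StreamingJoinSketch and its methods are hypothetical names, and rows are simplified to two-element arrays of key and value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Illustrative only: a plain-Java model of "buffer the small table, stream the large one".
// StreamingJoinSketch is a hypothetical name, not part of Hive.
class StreamingJoinSketch {

    // Buffers every row of the smaller table in memory, grouped by join key (row[0]).
    static Map<String, List<String>> buffer(List<String[]> smallRows) {
        Map<String, List<String>> byKey = new HashMap<>();
        for (String[] row : smallRows) {
            byKey.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        return byKey;
    }

    // Streams the larger table one row at a time; only the current row is held in memory.
    static List<String> join(Map<String, List<String>> buffered, Iterator<String[]> largeRows) {
        List<String> out = new ArrayList<>();
        while (largeRows.hasNext()) {
            String[] row = largeRows.next();
            List<String> matches = buffered.get(row[0]);
            if (matches == null) {
                continue; // inner equi join: rows without a matching key are dropped
            }
            for (String smallValue : matches) {
                out.add(row[0] + "," + smallValue + "," + row[1]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> small = Arrays.asList(new String[] {"1", "a"}, new String[] {"2", "b"});
        List<String[]> large = Arrays.asList(
                new String[] {"1", "x"}, new String[] {"1", "y"}, new String[] {"3", "z"});
        System.out.println(join(buffer(small), large.iterator()));
    }
}
```

The memory cost is driven by the buffered side, which is why the largest table is best placed last.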

As mentioned in the Hive wiki, the following MapJoin combinations are not supported:

· Union followed by a MapJoin

· Lateral View followed by a MapJoin

· Reduce Sink (Group By/Join/Sort By/Cluster By/Distribute By) followed by MapJoin

· MapJoin followed by Union

· MapJoin followed by Join

· MapJoin followed by MapJoin

Aggregations

HiveQL supports aggregations and also allows for multiple aggregations to be done at the same time. The possible aggregators are:

· count(*), count(expr), count(DISTINCT expr[, expr...])

· sum(col), sum(DISTINCT col)

· avg(col), avg(DISTINCT col)

· min(col)

· max(col)

Example:

hive> SELECT a, sum(b) FROM t1

> GROUP BY a;

Hive also supports map-side aggregation for GROUP BY, which improves performance but requires more memory. If hive.map.aggr is set to true (the default in Hive 0.3 and later; earlier releases defaulted to false), Hive does the first-level aggregation directly in the map task.

hive> set hive.map.aggr=true;

hive> SELECT COUNT(*) FROM table2;
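To see why map-side aggregation trades memory for less data transfer, here is a minimal plain-Java model of a grouped sum. It is a sketch under simplifying assumptions, not Hive's implementation: MapSideAggregationSketch, Row, partialSums, and mergePartials are hypothetical names, and each list of rows stands in for the input split of one map task.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: models SELECT a, sum(b) FROM t1 GROUP BY a with partial sums per map task.
class MapSideAggregationSketch {

    // One input row: the grouping key (column a) and the value to sum (column b).
    static class Row {
        final String key;
        final long value;

        Row(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // What one map task does: keep an in-memory hash table of partial sums per key.
    // This hash table is the extra memory that map-side aggregation needs.
    static Map<String, Long> partialSums(List<Row> split) {
        Map<String, Long> partial = new HashMap<>();
        for (Row row : split) {
            partial.merge(row.key, row.value, Long::sum);
        }
        return partial;
    }

    // What the reduce side does: merge one small partial result per map task
    // instead of receiving every original row.
    static Map<String, Long> mergePartials(List<Map<String, Long>> partials) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, sum) -> total.merge(key, sum, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        List<Row> split1 = Arrays.asList(new Row("x", 1), new Row("x", 2), new Row("y", 5));
        List<Row> split2 = Arrays.asList(new Row("x", 10), new Row("z", 7));
        System.out.println(mergePartials(Arrays.asList(partialSums(split1), partialSums(split2))));
    }
}
```

In this model the first split sends two partial sums to the reduce side instead of three rows; the saving grows with the number of rows per key.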

Built-in functions

Hive has numerous built-in functions and some of its widely used functions are:

· concat(string A, string B,...)

· substr(string A, int start)

· round(double a)

· upper(string A), lower(string A)

· trim(string A)

· to_date(string timestamp)

· year(string date), month(string date), day(string date)

Custom UDF (User Defined Functions)

We can create our own custom UDFs and use them in Hive queries. Hive provides an interface for user-defined functions: a custom function is written in Java, deployed as a jar, and then called like any other function in HiveQL. The steps to create a custom UDF are these:

1. Create a new Java class that extends UDF with one or more methods named evaluate:

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class LowerUDF extends UDF {
    // Hive calls evaluate() once per row; a null input yields a null output.
    public Text evaluate(final Text s) {
        if (s == null) { return null; }
        return new Text(s.toString().toLowerCase());
    }
}

2. Compile the class and package it into a jar.

3. Deploy the jar for the user-defined function:

hive> add jar my_jar.jar;
Added my_jar.jar to class path

4. Once Hive has your jar in the classpath, register the function under its class name (fully qualified if the class is in a package):

hive> create temporary function my_lowerUDF as 'LowerUDF';

5. Now, you can start using it:

hive> select my_lowerUDF(title), sum(freq) from titles group by my_lowerUDF(title);

Managing tables – external versus managed

Hive has the flexibility to manage only metadata or metadata along with the data. In Hive the two types of data management are:

· Managed: Metadata along with data will be managed by Hive.

· External: Only metadata will be stored and managed by Hive.

A managed table should be used if we want Hive to manage the lifecycle of both the table and its data, for example in the case of a temporary table.

The advantages of using an external table are:

· We can use a custom location like HBase, Cassandra, and so on.

· Data can be processed by another system at the same time, which avoids locking during processing and can improve performance.

· In the DROP table command, only the metadata will be deleted and the data will not be deleted.

SerDe

One of the important benefits of using Hadoop is its flexibility to store semi-structured and unstructured data and to provide interfaces to process it. Hive can also process this data, thanks to its complex data types and SerDe properties. SerDe is a Serializer and Deserializer interface which allows marshalling and unmarshalling of string or binary data into Java objects, which Hive uses for reading from and writing to tables. Hive has some built-in SerDe libraries such as Avro, ORC, RegEx, Thrift, Parquet, and CSV; third-party SerDes are also available, such as the JSON SerDe provided by Amazon.

We can also write our custom SerDe. For writing a custom SerDe class, we have to override some methods:

· public void initialize (Configuration conf, Properties tbl) throws SerDeException: The initialize() method is called only once; in it we can get and set some commonly used information from the table properties, such as column types and names.

· public Writable serialize (Object obj, ObjectInspector oi) throws SerDeException: The serialize() method should contain the serialization logic. It takes a Java object representing a row of data and generates a Writable object which can be serialized.

· public Class<? extends Writable> getSerializedClass (): The getSerializedClass() method returns the class of the serialized object.

· public Object deserialize (Writable blob) throws SerDeException: The deserialize() method should contain the deserialization logic.

· public ObjectInspector getObjectInspector () throws SerDeException: ObjectInspectors are Hive objects that are used to describe and examine complex type hierarchies.

· public SerDeStats getSerDeStats(): Override this method to report some statistics.

Let's look at the code for implementing a custom SerDe:

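import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class CustomSerDe implements SerDe {

    private StructTypeInfo rowTypeInfo;
    private ObjectInspector rowOI;
    private List<String> colNames;
    Object[] outputFields;
    // Reused for every serialized row; initialized so that serialize() never returns null.
    private final Text outputRowText = new Text();
    private List<Object> row = new ArrayList<Object>();

    @Override
    public void initialize(Configuration conf, Properties tbl) throws SerDeException {
        // Get a list of the table's column names.
        // serdeConstants is the current name of the older serde Constants class.
        String colNamesStr = tbl.getProperty(serdeConstants.LIST_COLUMNS);
        colNames = Arrays.asList(colNamesStr.split(","));

        // Get a list of TypeInfos for the columns. This list lines up with
        // the list of column names.
        String colTypesStr = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
        List<TypeInfo> colTypes = TypeInfoUtils.getTypeInfosFromTypeString(colTypesStr);

        rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(colNames, colTypes);
        rowOI = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
    }

    @Override
    public Object deserialize(Writable blob) throws SerDeException {
        row.clear();
        // Implement the logic of deserialization: fill row with one object per column.
        return row;
    }

    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
        return rowOI;
    }

    @Override
    public SerDeStats getSerDeStats() {
        // No statistics are collected in this skeleton.
        return null;
    }

    @Override
    public Class<? extends Writable> getSerializedClass() {
        return Text.class;
    }

    @Override
    public Writable serialize(Object obj, ObjectInspector oi) throws SerDeException {
        // Implement the logic of serialization: write the row into outputRowText.
        return outputRowText;
    }
}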
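The deserialization placeholder in the class above is where one stored record is turned into a list of column values. As a rough illustration of that kind of logic, here is a plain-Java sketch with no Hive dependencies. DelimitedRowParser and its methods are hypothetical names, the handling of only the string, int, and double type names is an assumption made to keep the example short, and a real SerDe would work with the TypeInfo objects obtained in initialize().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Illustrative only: the kind of logic a deserialize() body might contain.
// DelimitedRowParser is a hypothetical helper, not a Hive class.
class DelimitedRowParser {

    // Splits one line on the delimiter and converts each field to its column's declared type.
    static List<Object> parse(String line, String delimiter, List<String> colTypes) {
        // Pattern.quote treats the delimiter literally; -1 keeps trailing empty fields.
        String[] fields = line.split(Pattern.quote(delimiter), -1);
        List<Object> row = new ArrayList<>();
        for (int i = 0; i < colTypes.size(); i++) {
            String raw = i < fields.length ? fields[i] : null;
            row.add(convert(raw, colTypes.get(i)));
        }
        return row;
    }

    // Missing or empty fields become null; unknown type names are kept as strings.
    static Object convert(String raw, String type) {
        if (raw == null || raw.isEmpty()) {
            return null;
        }
        switch (type) {
            case "int":
                return Integer.valueOf(raw);
            case "double":
                return Double.valueOf(raw);
            default:
                return raw;
        }
    }

    public static void main(String[] args) {
        List<String> types = Arrays.asList("string", "int", "string", "double");
        System.out.println(parse("abc|7|x|2.5", "|", types));
    }
}
```

A serialize() implementation would do the reverse: walk the row's fields and join their text forms with the same delimiter.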

We have to create a jar file of the class and put it in the Hive server. We can then use the SerDe while creating the table, as shown in the following code:

CREATE EXTERNAL TABLE IF NOT EXISTS my_table (field1 string, field2 int, field3 string, field4 double)

ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.CustomSerDe' LOCATION '/path-to/my_table/';

Partitioning

Hive supports partitioning of data, which can be used for distributing data horizontally. For example, if we have a large transaction table which is frequently queried by year or by a range of months, then we can partition the table with PARTITIONED BY (year INT, month INT) while creating it.

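Hive manages the data by creating one subdirectory per partition value, nested in the order of the partition columns and named column=value by default, such as:

/db/table/year=value/month=value/

/db/table/year=2014/month=11/

/db/table/year=2014/month=12/

/db/table/year=2015/month=1/

/db/table/year=2015/month=2/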

Partitioning works on both managed and external tables. It is advised for very large tables, because it limits the files that a query has to process and can therefore improve performance considerably.
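The way partitioning limits the files to be processed can be sketched in a few lines of plain Java. This is a simplified model, not Hive's planner: PartitionPruningSketch and its methods are hypothetical names, and the directories use the column=value naming that Hive applies to partition directories by default.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: models how a filter on a partition column narrows the directories to read.
class PartitionPruningSketch {

    // Builds the directory of one partition of a table partitioned by (year, month).
    static String partitionPath(String tableDir, int year, int month) {
        return tableDir + "/year=" + year + "/month=" + month;
    }

    // Keeps only the partition directories of the requested year; the others are never opened.
    static List<String> prune(List<String> partitionDirs, int year) {
        String wanted = "/year=" + year + "/";
        List<String> kept = new ArrayList<>();
        for (String dir : partitionDirs) {
            if (dir.contains(wanted)) {
                kept.add(dir);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> dirs = Arrays.asList(
                partitionPath("/db/table", 2014, 11),
                partitionPath("/db/table", 2014, 12),
                partitionPath("/db/table", 2015, 1),
                partitionPath("/db/table", 2015, 2));
        // A query with WHERE year = 2015 only needs the last two directories.
        System.out.println(prune(dirs, 2015));
    }
}
```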

Partitioning should be done carefully, as it can have the following downsides:

· If the partition columns are not selected properly, the data can be divided unevenly and query execution will not be optimized.

· If the partition hierarchy becomes too deep, recursively scanning the directories can be more expensive than a full data scan.

Bucketing

We just saw that partitioning can distribute the data unevenly; in practice, an even distribution across partitions is quite unlikely. With bucketing, however, we can achieve an almost even distribution of data for processing. Bucketing hashes the value of a column to choose a bucket, so records with the same value always land in the same bucket, and one bucket can hold several groups of values. Bucketing also gives control over the number of files, as we have to state the number of buckets in the create table command using CLUSTERED BY (month) INTO #noofBuckets BUCKETS.

To make Hive write the declared number of buckets when the table is populated, we should set hive.enforce.bucketing = true. Bucketing is ideal for aiding map-side joins: because rows with the same value sit in the same bucket, a sort-merge join between matching buckets is much faster and more efficient. Bucketing can be used with or without partitioning.
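The assignment of a value to a bucket can be pictured as a hash of the column value taken modulo the number of buckets. The sketch below is plain Java under a stated assumption: it uses Java's own hashCode() as a stand-in for Hive's hash function, which may differ by column type. BucketingSketch and its methods are hypothetical names.

```java
// Illustrative only: models bucket assignment as hash(value) mod numBuckets.
// Java's hashCode() is used here as a stand-in for Hive's own hash function (an assumption).
class BucketingSketch {

    // Masks the sign bit so that negative hash codes still map to a valid bucket index.
    static int bucketFor(Object value, int numBuckets) {
        return (value.hashCode() & Integer.MAX_VALUE) % numBuckets;
    }

    // Counts how many of the given values fall into each bucket.
    static int[] distribution(int[] values, int numBuckets) {
        int[] counts = new int[numBuckets];
        for (int value : values) {
            counts[bucketFor(value, numBuckets)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] months = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
        // With 4 buckets, each bucket receives several distinct month values.
        System.out.println(java.util.Arrays.toString(distribution(months, 4)));
    }
}
```

The same value always maps to the same bucket, and the number of buckets, not the number of distinct values, fixes the number of files.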

Summary

In this chapter, we have explored two wrappers of MapReduce programming–Pig and Hive.

MapReduce is very powerful, but it is complex and has a steep learning curve. The difficult part is managing MapReduce programs and the time taken for development and optimization. For easier and faster development on MapReduce, we have abstraction layers such as Pig, which wraps MapReduce in the procedural Pig Latin language, and Hive, which wraps it in the SQL-like HiveQL.

Pig follows a data flow model: it uses a DAG to transform Pig Latin statements into MapReduce jobs. Pig does the transformation through three plans, namely Logical to Physical to MapReduce, where each plan translates the statements and produces an optimized plan of execution. Pig also has the grunt mode for analyzing data interactively. Pig has very useful commands to filter, group, aggregate, cogroup, and so on, and it also supports user-defined functions.

Hive is used by users who are more comfortable with SQL-like development, as it has HiveQL. The Hive architecture contains the Driver, Metastore, Query compiler, Execution engine, and HiveServer. HiveQL has an exhaustive list of built-in functions and commands to analyze the data, and Hive also supports user-defined functions.

In the next chapter, we will cover one of the most important components to know in Hadoop: HBase, a non-relational distributed database which gives high throughput and performance.