Apache Hive Tutorial Part V

In this post we will look at the practical usage of SerDes and User Defined Functions (UDFs) in Hive.

So far we have seen how to create tables where the input data has properly delimited columns, for instance tab- or comma-separated values for each column.

However, in the Big Data world this is not always the case; we often have complex data, like server logs. So in this post we will first see how Hive lets us ingest such complex data.

To process complex data, Hive uses a SerDe.

SerDe stands for Serializer/Deserializer.

Hive uses a SerDe both to read and to write data.

Even when we only specify the default delimiters, Hive still uses a SerDe, the default one.

The default SerDe is chosen when we specify “ROW FORMAT DELIMITED FIELDS TERMINATED BY …”.
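For example, a simple tab-delimited table like the one below (the table and column names are just for illustration) picks up the built-in LazySimpleSerDe without us naming it explicitly:

CREATE TABLE IF NOT EXISTS demo_tab
(id INT,
name STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n';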

Now, suppose our input data is as below:
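For illustration, assume rows like the ones below (the values are made up, but the separators match the regex we will use shortly):

1::John--50000,25
2::Maria--60000,32
3::Ahmed--55000,41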

As you can see, we have a different separator between each pair of columns, so we can't use the default SerDe.

For data like this we can use the RegexSerDe.

We create the table in the usual way. One thing to note: the built-in RegexSerDe reads every column as a string, so we declare all the columns as STRING and cast them later when we need numbers (see the CAST example at the end of this post):

CREATE TABLE IF NOT EXISTS employees
(id STRING,
name STRING,
salary STRING,
age STRING)

Then we choose Regex SerDe:

ROW FORMAT SERDE  'org.apache.hadoop.hive.serde2.RegexSerDe'

Then we specify how the fields are separated:

WITH SERDEPROPERTIES
(
"input.regex" = "(\S+)::(\S+)--(\S+),(\S+)",
"output.format.string" = "%1$s %2$s %3$s %4$s"
);

In this way we can use the RegexSerDe.
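To check that the regex splits the rows correctly, we can run a quick query once some data has been loaded into the table:

SELECT id, name, salary, age FROM employees LIMIT 5;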

Now, let's take an example data set and use joins together with the RegexSerDe to see the effect.

I am going to use two data sets, movies.dat and ratings.dat, from GitHub.

First we create the movies table. Movie titles can contain spaces, so the regex captures the title and genre fields with (.+) rather than (\S+):

CREATE TABLE IF NOT EXISTS hivemovies (id STRING, name STRING, genre STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
 "input.regex" = "(\S+)::(.+)::(.+)",
 "output.format.string" = "%1$s %2$s %3$s");

Then we move movies.dat to HDFS and load it into the table above.

$ hdfs dfs -put movies.dat 

hive> LOAD DATA INPATH 'movies.dat' OVERWRITE INTO TABLE hivemovies;
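To sanity-check the parse, we can look at a few rows (the exact output depends on your copy of movies.dat):

hive> SELECT * FROM hivemovies LIMIT 3;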

Similarly, we create and load the ratings table (after copying ratings.dat.gz to HDFS as before; Hive reads gzip-compressed text files transparently):

hive> CREATE TABLE IF NOT EXISTS hiveratings (userr STRING, id STRING, 
      rate STRING, time STRING )
      ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
      WITH SERDEPROPERTIES (
        "input.regex" = "(\S+)::(\S+)::(\S+)::(\S+)",
        "output.format.string" = "%1$s %2$s %3$s %4$s");


hive> LOAD DATA INPATH 'ratings.dat.gz' OVERWRITE INTO TABLE 
      hiveratings;

Q) Let's find 10 movies that have more than 200 ratings.

SELECT m.name, COUNT(r.rate) AS cnt
FROM hivemovies m LEFT OUTER JOIN hiveratings r
ON (m.id = r.id)
GROUP BY m.name
HAVING cnt > 200
LIMIT 10;

The output is displayed once the MapReduce job has run.

We can also find the movies with the highest average ratings.

SELECT m.name, AVG(r.rate) aver
FROM hivemovies m LEFT OUTER JOIN hiveratings r
ON (m.id = r.id)
GROUP BY m.name
ORDER BY aver DESC
LIMIT 10;

Now let's work with Hive UDFs.

Suppose we have a large text file and we need to do a simple word count. The tricky part is removing the unnecessary punctuation from the text and converting everything to lower case. This we can do easily with a Hive UDF.

We shall use Eclipse IDE for this.

1. Create a new Java Project in Eclipse; let's call it “HiveUDFProject”.

2. Add Hadoop and Hive Jars

Click on the project and press Alt+Enter, which opens Properties. Then select Java Build Path and click Add External JARs.

Then go to the Hive installation directory and select all the jars under the lib folder.

Similarly, add the Hadoop jars: select all the jars under the Hadoop home directory in share > hadoop > common, and all the jars under share > hadoop > common > lib.


3. Now we create a new package; let's call it “HiveUDFPackage”.

4. Next, we create a new Java class; let's call it “HivePunc”. It needs to extend Hive's UDF class.

For this we will import:

org.apache.hadoop.hive.ql.exec.UDF;

5. Next we need to write the “evaluate” method. First we need to know what the input to this UDF will be and what output we expect. Here we will pass in the strings read from the text file and return a string as well.

However, since we are working with Hadoop, the data types here are Writables; for strings we use “Text”.

“Text” is a mutable Writable, so rather than creating a new object for every value we can use its “set” method to assign the value to it.

package HiveUDFPackage;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.Text;

public class HivePunc extends UDF {

    // Called by Hive once per input value.
    public Text evaluate(Text inputdata) throws HiveException {

        Text newtext = new Text("");

        if (inputdata != null) {
            try {
                // Drop everything except letters, digits and spaces,
                // then convert the remaining text to lower case.
                newtext.set(inputdata.toString().replaceAll("[^a-zA-Z0-9 ]", "").toLowerCase());
            }
            catch (Exception e) {
                throw new HiveException(e.getMessage());
            }
        }
        // A NULL input comes back as an empty string.
        return newtext;
    }
}

Once the logic is ready with no obvious errors, we can export it as a jar file; let's name it “HivePuncUDF.jar”.

First we load the text file into a Hive table; the data is not manipulated at this stage. As before, the file needs to be copied to HDFS with hdfs dfs -put before the LOAD DATA statement.

CREATE TABLE IF NOT EXISTS SHAKEHIVE (line STRING)
ROW FORMAT DELIMITED
LINES TERMINATED BY '\n';

LOAD DATA INPATH 'shakespeare.txt' OVERWRITE INTO TABLE SHAKEHIVE;

Now, we want to get the word counts into a new file in a local directory.

First, from the Hive shell, we need to add the newly created jar.

hive> ADD JAR /home/sumit/Desktop/workspace/HivePuncUDF.jar;
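To confirm that the jar is available in the session, we can list the jars Hive currently has added:

hive> LIST JARS;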


After that we register the UDF as a function with a suitable name, pointing Hive at the class through its fully qualified “package.class” name, as shown:

hive> CREATE FUNCTION HivePunc AS 'HiveUDFPackage.HivePunc';
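Alternatively, if we only need the UDF for the current session, we can register it as a temporary function instead, and give it a quick test:

hive> CREATE TEMPORARY FUNCTION HivePunc AS 'HiveUDFPackage.HivePunc';
hive> SELECT HivePunc(line) FROM SHAKEHIVE LIMIT 5;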

Now, we can call the UDF and do the word count. Here we write the output from Hive to a local directory; if we remove the LOCAL keyword, the output goes to HDFS instead.

INSERT OVERWRITE LOCAL DIRECTORY '/home/sumit/Desktop/Wordcounts'
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY '\t'
 LINES TERMINATED BY '\n'
 STORED AS TEXTFILE
SELECT a.word, count(1) AS cnt
FROM
 (SELECT explode(split(HivePunc(line), ' ')) AS word FROM SHAKEHIVE) a
GROUP BY a.word
ORDER BY cnt DESC;

Once the MapReduce job completes, we can check the output at the specified location.

If NULLs appear in the output, we can use the NVL function to substitute a default value for them and take appropriate action.
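For example, a small sketch that substitutes a placeholder for NULL words before counting (the placeholder string is arbitrary):

SELECT NVL(a.word, 'UNKNOWN') AS word, count(1) AS cnt
FROM
 (SELECT explode(split(HivePunc(line), ' ')) AS word FROM SHAKEHIVE) a
GROUP BY NVL(a.word, 'UNKNOWN');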

Further, since we read all of the data as “Text”/strings, we can use the CAST function to convert values to integers or other types, for example: CAST(salary AS INT).
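For instance, against the employees table defined earlier (assuming it has been populated), we could cast while querying:

SELECT name, CAST(salary AS INT) AS salary_int
FROM employees
WHERE CAST(age AS INT) > 30;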