
Compute differences between successive records in Hadoop with Hive queries

Tags:

hadoop

hive

I have a Hive table that holds customer call data. For simplicity, assume it has two columns: the first holds the customer ID and the second holds the timestamp of the call (a unix timestamp).

I can query this table to find all the calls for each customer:

SELECT * FROM mytable SORT BY customer_id, call_time;

The result is:

Customer1    timestamp11
Customer1    timestamp12
Customer1    timestamp13
Customer2    timestamp21
Customer3    timestamp31
Customer3    timestamp32
...

Is it possible to write a Hive query that returns, for each customer, starting from the second call, the time interval between two successive calls? For the above example, the query should return:

Customer1    timestamp12-timestamp11
Customer1    timestamp13-timestamp12
Customer3    timestamp32-timestamp31
...

I have tried to adapt the SQL solution, but I'm stuck on Hive's limitations: it accepts subqueries only in the FROM clause, and join conditions may contain only equalities.

Thank you.

EDIT1:

I have tried to use a Hive UDF function:

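import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;

// The result depends on the previously processed row, so the function
// is marked as stateful and non-deterministic.
@UDFType(deterministic = false, stateful = true)
public class DeltaComputerUDF extends UDF {
    private String previousCustomerId;
    private long previousCallTime;

    // Returns the time elapsed since the same customer's previous call,
    // or NULL for the first call of each customer.
    public String evaluate(String customerId, LongWritable callTime) {
        if (customerId == null || callTime == null) {
            return null;
        }
        long callTimeValue = callTime.get();
        String timeDifference = null;

        if (customerId.equals(previousCustomerId)) {
            timeDifference = Long.toString(callTimeValue - previousCallTime);
        }

        // Remember this row for the next invocation.
        previousCustomerId = customerId;
        previousCallTime = callTimeValue;

        return timeDifference;
    }
}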

and registered it under the name "delta".

But it seems (from the logs and the results) that it is executed at MAP time. Two problems arise from this:

First: the table data must be sorted by customer ID and timestamp BEFORE this function is applied. The query:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;

does not work, because the sorting is performed at REDUCE time, long after my function has been applied.

I could sort the table data before calling the function, but I'm not happy with that, because it is an overhead I hope to avoid.

Second: in a distributed Hadoop configuration, the input data is split among several mappers. So I believe there will be multiple instances of this function, one per mapper, and the same customer's data may be split between two mappers. In that case I would lose customer calls, which is not acceptable.
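Both problems can be reproduced without a cluster. The sketch below is a hypothetical, Hadoop-free model of the UDF's logic (the class name, the `Call` holder and the sample timestamps are made up for illustration): each call to `runInstance` plays the role of one mapper holding one UDF instance. Fed sorted rows, it finds all three deltas; fed the same rows unsorted, or split across two instances, it drops deltas or computes wrong ones.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Hadoop-free model of the stateful delta UDF from EDIT1.
public class StatefulDeltaModel {

    // One row of the call table: customer ID and unix timestamp.
    static final class Call {
        final String customerId;
        final long callTime;

        Call(String customerId, long callTime) {
            this.customerId = customerId;
            this.callTime = callTime;
        }
    }

    // Made-up sample rows, already ordered by customer_id, call_time.
    static final List<Call> SORTED = List.of(
        new Call("Customer1", 100), new Call("Customer1", 160),
        new Call("Customer1", 200), new Call("Customer3", 500),
        new Call("Customer3", 530));

    // The same rows in an arbitrary order, as a mapper might read them.
    static final List<Call> UNSORTED = List.of(
        new Call("Customer1", 160), new Call("Customer3", 500),
        new Call("Customer1", 100), new Call("Customer1", 200),
        new Call("Customer3", 530));

    // Feeds rows through ONE stateful instance (one mapper) in the given
    // order and collects each non-null delta as "customer:delta".
    static List<String> runInstance(List<Call> rows) {
        List<String> out = new ArrayList<>();
        String previousCustomerId = null;
        long previousCallTime = 0;
        for (Call c : rows) {
            if (c.customerId.equals(previousCustomerId)) {
                out.add(c.customerId + ":" + (c.callTime - previousCallTime));
            }
            previousCustomerId = c.customerId;
            previousCallTime = c.callTime;
        }
        return out;
    }

    // Splits the rows between two independent instances (two mappers).
    static List<String> runSplit(List<Call> rows, int splitAt) {
        List<String> out = new ArrayList<>(runInstance(rows.subList(0, splitAt)));
        out.addAll(runInstance(rows.subList(splitAt, rows.size())));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runInstance(SORTED));   // [Customer1:60, Customer1:40, Customer3:30]
        System.out.println(runInstance(UNSORTED)); // [Customer1:100]
        System.out.println(runSplit(SORTED, 2));   // [Customer1:60, Customer3:30]
    }
}
```

With unsorted input only one (wrong) delta survives, and with the split input the 200 - 160 interval is lost because the second instance starts with no memory of Customer1's call at 160.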

I don't know how to solve this issue. I know that DISTRIBUTE BY ensures that all rows with a given value are sent to the same reducer (which is what makes SORT BY work as expected). Does anybody know if there is something similar for the mapper?

Next, I plan to follow libjack's suggestion to use a reduce script. This "computation" is needed between some other Hive queries, so I want to try everything Hive offers before moving to another tool, as Balaswamy vaddeman suggested.

EDIT2:

I started to investigate the custom scripts solution. But on the first page of Chapter 14 of the Programming Hive book (the chapter that presents custom scripts), I found the following paragraph:

Streaming is usually less efficient than coding the comparable UDFs or InputFormat objects. Serializing and deserializing data to pass it in and out of the pipe is relatively inefficient. It is also harder to debug the whole program in a unified manner. However, it is useful for fast prototyping and for leveraging existing code that is not written in Java. For Hive users who don’t want to write Java code, it can be a very effective approach.

So custom scripts are clearly not the best solution in terms of efficiency.

But how could I keep my UDF and still make sure it works as expected in a distributed Hadoop configuration? I found the answer in the UDF Internals section of the Language Manual UDF wiki page. If I write my query as:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;

the UDF is executed at REDUCE time, and the DISTRIBUTE BY and SORT BY constructs guarantee that all the records of the same customer are processed by the same reducer, in call order.

So the UDF above, combined with this query construct, solves my problem.
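A rough model of why the subquery version works, under stated assumptions: rows are routed to reducers by a function of `customer_id` alone (here `Math.floorMod(customerId.hashCode(), n)`, an assumption and not Hive's actual hash), each reducer sorts only its own rows by `customer_id, call_time`, and each reducer gets its own fresh UDF state. The class name and sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of: SELECT delta(...) FROM (... DISTRIBUTE BY customer_id
// SORT BY customer_id, call_time) t, with one stateful UDF instance per reducer.
public class DistributeSortModel {

    // One row of the call table: customer ID and unix timestamp.
    static final class Call {
        final String customerId;
        final long callTime;

        Call(String customerId, long callTime) {
            this.customerId = customerId;
            this.callTime = callTime;
        }
    }

    // Made-up sample rows in arbitrary input order.
    static final List<Call> UNSORTED = List.of(
        new Call("Customer1", 160), new Call("Customer3", 500),
        new Call("Customer1", 100), new Call("Customer1", 200),
        new Call("Customer3", 530));

    // Assumed partitioner: depends on customer_id only (not Hive's real hash).
    static int partition(String customerId, int numReducers) {
        return Math.floorMod(customerId.hashCode(), numReducers);
    }

    static List<String> run(List<Call> rows, int numReducers) {
        // DISTRIBUTE BY customer_id: route every row to one reducer bucket.
        List<List<Call>> reducers = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            reducers.add(new ArrayList<>());
        }
        for (Call c : rows) {
            reducers.get(partition(c.customerId, numReducers)).add(c);
        }

        List<String> out = new ArrayList<>();
        for (List<Call> bucket : reducers) {
            // SORT BY customer_id, call_time: ordering within this reducer only.
            bucket.sort(Comparator.comparing((Call c) -> c.customerId)
                                  .thenComparingLong(c -> c.callTime));
            // Fresh UDF state for each reducer.
            String previousCustomerId = null;
            long previousCallTime = 0;
            for (Call c : bucket) {
                if (c.customerId.equals(previousCustomerId)) {
                    out.add(c.customerId + ":" + (c.callTime - previousCallTime));
                }
                previousCustomerId = c.customerId;
                previousCallTime = c.callTime;
            }
        }
        // Sort the combined output so it does not depend on bucket order.
        out.sort(Comparator.naturalOrder());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(UNSORTED, 3)); // [Customer1:40, Customer1:60, Customer3:30]
    }
}
```

Because a customer's rows never leave their reducer and arrive there in time order, the result is the same for any number of reducers and any input order.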

(Sorry for not adding the links, but I'm not allowed to do it because I don't have enough reputation points)

Cipi asked Feb 01 '13 14:02




2 Answers

It's an old question, but for future reference, here is another approach:

Hive windowing functions (available since Hive 0.11) let you access values from the previous or next row in a query.

A query along these lines could be:

SELECT customer_id, delta FROM (SELECT customer_id, call_time - LAG(call_time) OVER (PARTITION BY customer_id ORDER BY call_time) AS delta FROM mytable) t WHERE delta IS NOT NULL;
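In Hive, `LAG(col, offset, default)` returns `default` for a row that has no predecessor within its partition, and NULL when no default is given. The hypothetical sketch below models that for a single partition (one customer) whose rows are already ordered by `call_time`: with a NULL default the first call yields NULL and can be filtered out, while with a default of 0 the first row's "delta" is just the raw `call_time`. The class name and timestamps are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of
//   call_time - LAG(call_time, 1, lagDefault) OVER (PARTITION BY customer_id ORDER BY call_time)
// for one partition whose call times are already in ascending order.
public class LagModel {

    // One result per row; null where the lagged value is a null default.
    static List<Long> deltas(long[] orderedCallTimes, Long lagDefault) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < orderedCallTimes.length; i++) {
            // Boxed explicitly so a null default is never unboxed.
            Long previous = (i >= 1) ? Long.valueOf(orderedCallTimes[i - 1]) : lagDefault;
            if (previous == null) {
                out.add(null);
            } else {
                out.add(orderedCallTimes[i] - previous);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] customer1 = {100, 160, 200};
        System.out.println(deltas(customer1, null)); // [null, 60, 40]
        System.out.println(deltas(customer1, 0L));   // [100, 60, 40]
    }
}
```

Since the question asks for output starting from each customer's second call, the NULL-default form plus a `delta IS NOT NULL` filter matches it, while a default of 0 would emit one spurious row per customer.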
jbaptiste answered Sep 19 '22 16:09


You can write an explicit MapReduce job in another programming language, such as Java or Python. The mapper emits {customer_id, call_time}; the reducer then receives {customer_id, list{call_time}} for each customer, so it can sort these timestamps and compute the differences.
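The data flow described above can be sketched in plain Java. This is a single-process model of map, shuffle and reduce, not the Hadoop MapReduce API; the tab-separated input format, the class name and the sample values are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the map -> shuffle -> reduce flow (not the Hadoop API).
public class MapReduceDeltaModel {

    // Map phase: parse an assumed "customer_id<TAB>call_time" line into a key/value pair.
    static Map.Entry<String, Long> map(String line) {
        String[] fields = line.split("\t");
        return Map.entry(fields[0], Long.parseLong(fields[1]));
    }

    // Reduce phase: one customer and all of its call times, in any order.
    static List<String> reduce(String customerId, List<Long> callTimes) {
        List<Long> sorted = new ArrayList<>(callTimes);
        Collections.sort(sorted);
        List<String> out = new ArrayList<>();
        // Start from the second call: one interval per pair of successive calls.
        for (int i = 1; i < sorted.size(); i++) {
            out.add(customerId + "\t" + (sorted.get(i) - sorted.get(i - 1)));
        }
        return out;
    }

    static List<String> run(List<String> lines) {
        // Shuffle: group values by key; TreeMap mimics keys arriving sorted.
        Map<String, List<Long>> groups = new TreeMap<>();
        for (String line : lines) {
            Map.Entry<String, Long> kv = map(line);
            groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Long>> group : groups.entrySet()) {
            out.addAll(reduce(group.getKey(), group.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = List.of(
            "Customer1\t160", "Customer3\t500", "Customer1\t100",
            "Customer2\t300", "Customer1\t200", "Customer3\t530");
        run(input).forEach(System.out::println);
    }
}
```

Customer2 has a single call and therefore produces no output, matching the expected result in the question. In a real job the grouping is done by the framework, and only the bodies of `map` and `reduce` would move into a `Mapper` and a `Reducer`; note that sorting in the reducer keeps all of one customer's timestamps in memory.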

mat_vee answered Sep 18 '22 16:09