
Spark SQL/Hive Query Takes Forever With Join

So I'm doing something that should be simple, but apparently it's not in Spark SQL.

If I run the following query in MySQL, the query finishes in a fraction of a second:

SELECT ua.address_id
FROM user u
inner join user_address ua on ua.address_id = u.user_address_id
WHERE u.user_id = 123;

However, running the same query in a HiveContext under Spark (1.5.1) takes more than 13 seconds. Adding more joins makes the query run for a very, very long time (over 10 minutes). I'm not sure what I'm doing wrong here or how I can speed things up.

The tables are MySQL tables that are loaded into the Hive Context as temporary tables (see the snippet below). This is running on a single instance, with the database on a remote machine.

  • The user table has about 4.8 million rows.
  • The user_address table has 350,000 rows.

The tables have foreign key fields, but no explicit foreign key relationships are defined in the database. I'm using InnoDB.
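For reference, the tables are loaded and registered roughly like this (a sketch; connection details are omitted, and sqlContext is the HiveContext):

val user = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:mysql://...",
  "dbtable" -> "user")).load()
user.registerTempTable("user")

val user_address = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:mysql://...",
  "dbtable" -> "user_address")).load()
user_address.registerTempTable("user_address")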

The execution plan in Spark:

== Physical Plan ==
TungstenProject [address_id#0L]
 SortMergeJoin [user_address_id#27L], [address_id#52L]
  TungstenSort [user_address_id#27L ASC], false, 0
   TungstenExchange hashpartitioning(user_address_id#27L)
    ConvertToUnsafe
     Filter (user_id#0L = 123)
      Scan JDBCRelation(jdbc:mysql://.user,[Lorg.apache.spark.Partition;@596f5dfc,{user=, password=, url=jdbc:mysql://, dbtable=user})[address_id#0L,user_address_id#27L]
  TungstenSort [address_id#52L ASC], false, 0
   TungstenExchange hashpartitioning(address_id#52L)
    ConvertToUnsafe
     Scan JDBCRelation(jdbc:mysql://.user_address,[Lorg.apache.spark.Partition;@2ce558f3,{user=, password=, url=jdbc:mysql://, dbtable=user_address})[address_id#52L]

asked by Ali B

2 Answers

First of all, the type of query you perform is extremely inefficient. As of now (Spark 1.5.0*), to perform a join like this, both tables have to be shuffled / hash-partitioned each time the query is executed. This shouldn't be a problem for the user table, where the user_id = 123 predicate is most likely pushed down, but it still requires a full shuffle of user_address.

Moreover, if the tables are only registered and not cached, then every execution of this query will fetch the whole user_address table from MySQL to Spark.
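A minimal sketch of what caching would look like, assuming the temporary table is registered as user_address:

// Mark the table for caching; Spark SQL caches lazily
sqlContext.cacheTable("user_address")
// Any action materializes the cache, so later queries read from memory
sqlContext.sql("SELECT COUNT(*) FROM user_address").collect()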

"I'm not sure what I'm doing wrong here or how I can speed things up."

It is not exactly clear why you want to use Spark for this application, but the single-machine setup, small data, and the type of queries suggest that Spark is not a good fit here.

Generally speaking, if the application logic requires single-record access, then Spark SQL won't perform well. It is designed for analytical queries, not as an OLTP database replacement.

If a single table / data frame is much smaller, you could try broadcasting:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast
import sqlContext.implicits._ // for the $"..." column syntax

val user: DataFrame = ???
val user_address: DataFrame = ???

// Filter first so only the matching user rows are broadcast
val userFiltered = user.where(???)

// Broadcast the small, filtered side instead of shuffling user_address
user_address.join(
  broadcast(userFiltered), $"address_id" === $"user_address_id")
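With the broadcast hint, Spark can use a broadcast hash join instead of the sort-merge join above, so neither table has to be shuffled across the network.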

* This should change in Spark 1.6.0 with SPARK-11410, which should enable persistent table partitioning.

answered by zero323

I have had the same problem in a similar situation (Spark 1.5.1, PostgreSQL 9.4).

Given two tables like

val t1 = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:db",
    "dbtable" -> "t1")).load()

val t2 = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:db",
    "dbtable" -> "t2")).load()

then the join in HQL over the registered temporary tables results in a full table scan over one of the tables (in my case it was the child).
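For example, the problem showed up with a plain HQL join along these lines (the join columns here are illustrative placeholders, not the real schema):

// Register both frames and join them in HQL; this is the pattern
// that produced the full table scan on the child table
t1.registerTempTable("t1")
t2.registerTempTable("t2")
val joined = sqlContext.sql(
  "SELECT t1.id, t2.child_col FROM t1 INNER JOIN t2 ON t2.t1_id = t1.id")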

Anyway, a workaround is to push the join down to the underlying RDBMS:

// Wrap the join in a derived table: the database executes the join,
// and Spark only sees the already-joined result
val joined = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:db",
    "dbtable" -> "(select t1.*, t2.* from t1 inner join t2 on ...) as t")).load()

This way the query optimizer of the underlying RDBMS kicks in, and in my case it switched to index scans. Spark, on the other hand, pushed down two independent queries, and the RDBMS couldn't really optimize across them.

answered by Beryllium