How to specify multiple tables in Spark SQL?

I have code that needs to set up three tables. To do that, I call the jdbc function three times, once for each table. See the code below:

import java.util.Properties

val props = new Properties
props.setProperty("user", "root")
props.setProperty("password", "pass")

val df0 = sqlContext.read.jdbc(
  "jdbc:mysql://127.0.0.1:3306/Firm42", "company", props)

val df1 = sqlContext.read.jdbc(
  "jdbc:mysql://127.0.0.1:3306/Firm42", "employee", props)

val df2 = sqlContext.read.jdbc(
  "jdbc:mysql://127.0.0.1:3306/Firm42", "company_employee", props)

df0.registerTempTable("company")
df1.registerTempTable("employee")
df2.registerTempTable("company_employee")

val rdf = sqlContext.sql(
  """some_sql_query_with_joins_of_various_tables""".stripMargin)

rdf.show

Is it possible to simplify this code? Or is there some way to specify multiple tables somewhere in the SQL configuration?

asked Jul 06 '16 at 23:07 by Finkelson


People also ask

How do I join multiple tables in Spark?

To join multiple tables, chain several joins. The most common choice is the inner join, which is also Spark's default join type: it joins two DataFrames/Datasets on key columns, and rows whose keys don't match are dropped from both sides.
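A minimal sketch of chaining inner joins across the question's three tables. The column names (company_id, employee_id, company_name, employee_name) and the rows are hypothetical, since the question does not show the schemas, and the data is built in memory with a local SparkSession instead of being read from MySQL.

```scala
import org.apache.spark.sql.SparkSession

// Local session for a self-contained run; the question reads these tables from MySQL instead.
val spark = SparkSession.builder().master("local[*]").appName("multi-join").getOrCreate()
import spark.implicits._

// Hypothetical schemas and rows: the question does not show the real columns.
val company = Seq((1, "Acme"), (2, "Globex")).toDF("company_id", "company_name")
val employee = Seq((10, "Ann"), (11, "Bob"), (12, "Cid")).toDF("employee_id", "employee_name")
val companyEmployee = Seq((1, 10), (1, 11), (3, 12)).toDF("company_id", "employee_id")

// Chain two inner joins through the link table. "inner" is also what you get
// when the join type is omitted. The (3, 12) link has no matching company,
// so the first inner join drops it.
val joined = companyEmployee
  .join(company, Seq("company_id"), "inner")
  .join(employee, Seq("employee_id"), "inner")

joined.select("company_name", "employee_name").show()
```

Joining on a Seq of column names keeps a single copy of each key column in the result, which avoids ambiguous-column errors when the next join refers to it.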

How do I select multiple columns in Spark?

You can select one or more columns of a Spark DataFrame by passing the column names to the select() function. Because DataFrames are immutable, this returns a new DataFrame containing only the selected columns and leaves the original unchanged. Use show() to display the DataFrame contents.
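A short sketch of both ways to call select(): with column names as varargs, and with a list of names converted to Column objects. The employee table and its columns are hypothetical example data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("select-columns").getOrCreate()
import spark.implicits._

// Hypothetical employee table for illustration.
val employee = Seq((10, "Ann", 52000), (11, "Bob", 48000)).toDF("employee_id", "name", "salary")

// Pass column names directly as varargs...
val idAndName = employee.select("employee_id", "name")

// ...or build the selection from a list, e.g. one computed at runtime.
val wanted = Seq("name", "salary")
val fromList = employee.select(wanted.map(col): _*)

// select returns a new DataFrame; employee itself still has all three columns.
idAndName.show()
fromList.show()
```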

How do I join two large DataFrames in Spark?

For two large tables, Spark typically uses a sort-merge join. Both tables are hash-partitioned on the join key, so rows with the same key are shuffled into the same partition. Within each partition, both sides are sorted by the key and then merged. As far as I know, that's the best approach for joining two large tables.
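To make the "sort, then merge" step concrete, here is a simplified single-partition sketch in plain Scala. It is not Spark's implementation: it assumes the shuffle has already put matching keys in the same partition, and the function name sortMergeJoin is hypothetical.

```scala
// Simplified sort-merge inner join over two in-memory sequences of (key, value).
// Both sides are sorted by key, then walked with two cursors; for each key present
// on both sides, the matching runs are combined as a cross product.
def sortMergeJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)])
                          (implicit ord: Ordering[K]): Vector[(K, A, B)] = {
  val l = left.sortBy(_._1).toVector
  val r = right.sortBy(_._1).toVector
  val out = Vector.newBuilder[(K, A, B)]
  var i = 0
  var j = 0
  while (i < l.length && j < r.length) {
    val c = ord.compare(l(i)._1, r(j)._1)
    if (c < 0) i += 1        // left key is smaller: it has no match, advance left
    else if (c > 0) j += 1   // right key is smaller: it has no match, advance right
    else {
      val key = l(i)._1
      // Find the end of the run of equal keys on each side.
      var iEnd = i
      while (iEnd < l.length && ord.equiv(l(iEnd)._1, key)) iEnd += 1
      var jEnd = j
      while (jEnd < r.length && ord.equiv(r(jEnd)._1, key)) jEnd += 1
      for (a <- i until iEnd; b <- j until jEnd) out += ((key, l(a)._2, r(b)._2))
      i = iEnd
      j = jEnd
    }
  }
  out.result()
}

val companies = Seq(2 -> "Globex", 1 -> "Acme")
val links = Seq(3 -> "Cid", 1 -> "Ann", 1 -> "Bob")
println(sortMergeJoin(companies, links))
```

Because both inputs are sorted, each side is scanned only once after sorting, which is why this works well for inputs too large to hold in a hash table.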

What does take() do in Spark?

In Spark, take is an action: it receives an integer n as a parameter and returns an array containing the first n elements of the dataset to the driver.
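A small sketch of take on a DataFrame, using spark.range as stand-in data (an assumption for a self-contained run). The result is a local Array[Row] on the driver, not another DataFrame.

```scala
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("take-demo").getOrCreate()

// Rows 1..100 in a single bigint column named "n".
val df = spark.range(1, 101).toDF("n")

// take(n) is an action: it runs a job and brings the first n rows back to the
// driver as a local Array[Row]. Compare limit(n), a transformation that returns
// a new DataFrame and runs nothing until an action is called.
val firstThree: Array[Row] = df.take(3)
val asLongs: Seq[Long] = firstThree.map(_.getLong(0)).toSeq

println(asLongs)
```

Since the rows are copied into driver memory, take is meant for small n; for large results, keep working with the DataFrame instead.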


1 Answer

Keep it DRY by looping over the table names:

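val url = "jdbc:mysql://127.0.0.1:3306/Firm42"
val tables = List("company", "employee", "company_employee")

// Read each table over JDBC, keeping its name (props as defined in the question)
val dfs = for {
  table <- tables
} yield (table, sqlContext.read.jdbc(url, table, props))

// Register every DataFrame as a temporary table under its own name
for {
  (name, df) <- dfs
} df.registerTempTable(name)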

If you don't need the DataFrames themselves, skip the first loop:

for {
  table <- tables
} sqlContext.read.jdbc(url, table, props).registerTempTable(table) 
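On Spark 2.0 and later, SparkSession replaces SQLContext as the entry point, and createOrReplaceTempView replaces the deprecated registerTempTable. The sketch below wraps the same loop in a hypothetical helper, registerAll, that takes the loader as a function. The JDBC call from the question is shown in a comment; for a self-contained run, the sketch substitutes small in-memory tables with made-up columns.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: load each table with the supplied function, register it
// as a temp view under its own name, and return the DataFrames keyed by name.
def registerAll(tables: Seq[String])(load: String => DataFrame): Map[String, DataFrame] =
  tables.map { table =>
    val df = load(table)
    df.createOrReplaceTempView(table)
    table -> df
  }.toMap

val spark = SparkSession.builder().master("local[*]").appName("register-tables").getOrCreate()
import spark.implicits._

// Against MySQL (as in the question) the loader would be a JDBC read:
//   registerAll(Seq("company", "employee", "company_employee")) { t =>
//     spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/Firm42", t, props)
//   }
// Here, in-memory stand-ins with hypothetical columns are used instead.
val fake: Map[String, DataFrame] = Map(
  "company" -> Seq((1, "Acme")).toDF("company_id", "company_name"),
  "employee" -> Seq((10, "Ann")).toDF("employee_id", "employee_name"),
  "company_employee" -> Seq((1, 10)).toDF("company_id", "employee_id"))
val dfs = registerAll(fake.keys.toSeq)(fake)

val rdf = spark.sql("""SELECT c.company_name, e.employee_name
  FROM company_employee ce
  JOIN company c ON ce.company_id = c.company_id
  JOIN employee e ON ce.employee_id = e.employee_id""")
rdf.show()
```

Passing the loader in keeps the registration loop independent of where the data comes from, so the same helper works for JDBC, files, or test data.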
answered Nov 06 '22 at 01:11 by user6022341