Scala & Spark: Recycling SQL statements

I spent quite some time coding multiple SQL queries that were formerly used to fetch the data for various R scripts. This is how it used to work:

sqlContent = readSQLFile("file1.sql")
sqlContent = setSQLVariables(sqlContent, variables)
results = executeSQL(sqlContent)

The catch is that some queries require the result of a prior query, which is why creating VIEWs in the database itself does not solve this problem. With Spark 2.0 I have already figured out a way to do just that:

import scala.io.Source

// create a DataFrame using a JDBC connection to the database
val tableDf = spark.read.jdbc(...)
// random, collision-free name for the temporary view
val tempTableName = "TEMP_TABLE" + java.util.UUID.randomUUID.toString.replace("-", "").toUpperCase
var sqlQuery = Source.fromURL(getClass.getResource("/sql/" + sqlFileName)).mkString
sqlQuery = setSQLVariables(sqlQuery, sqlVariables)
sqlQuery = sqlQuery.replace("OLD_TABLE_NAME", tempTableName)
tableDf.createOrReplaceTempView(tempTableName)
val data = spark.sql(sqlQuery)
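The `setSQLVariables` helper is not shown above; a minimal sketch of what such a helper could look like, assuming `${name}`-style placeholders in the SQL text (both the placeholder syntax and the signature are assumptions, not part of Spark):

```scala
// Hypothetical helper: substitutes ${name} placeholders in a SQL string.
// The ${...} placeholder syntax is an assumption; adapt it to whatever
// convention your SQL files actually use.
def setSQLVariables(sql: String, variables: Map[String, String]): String =
  variables.foldLeft(sql) { case (query, (name, value)) =>
    query.replace("${" + name + "}", value)
  }
```

For example, `setSQLVariables("SELECT * FROM people WHERE age > ${minAge}", Map("minAge" -> "20"))` yields `"SELECT * FROM people WHERE age > 20"`.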

But this is, in my humble opinion, very fiddly. Also, more complex queries, e.g. queries that incorporate subquery factoring (WITH clauses), currently don't work. Is there a more robust way, such as re-implementing the SQL code as Spark SQL code using .filter($"..."), .select($"..."), etc.?

The overall goal is to get multiple org.apache.spark.sql.DataFrames, each representing the result of one former SQL query (which always involves a few JOINs, WITH clauses, etc.). So n queries lead to n DataFrames.

Is there a better option than the provided two?

Setup: Hadoop v2.7.3, Spark 2.0.0, IntelliJ IDEA 2016.2, Scala 2.11.8, test cluster on a Win7 workstation

Boern asked Sep 23 '16 14:09


1 Answer

It's not especially clear what your requirement is, but I think you're saying you have queries something like:

SELECT * FROM people LEFT OUTER JOIN places ON ...
SELECT * FROM (SELECT * FROM people LEFT OUTER JOIN places ON ...) WHERE age>20

and you would want to declare and execute this efficiently as

SELECT * FROM people LEFT OUTER JOIN places ON ...
SELECT * FROM <cachedresult> WHERE age>20

To achieve that I would enhance the input file so each SQL statement has an associated table name into which the result will be stored.

e.g.

PEOPLEPLACES=SELECT * FROM people LEFT OUTER JOIN places ON ...
ADULTS=SELECT * FROM PEOPLEPLACES WHERE age>18

Then execute in a loop like

parseSqlFile().foreach { case (name, query) =>
  val data: DataFrame = execute(query)
  data.createOrReplaceTempView(name)
}
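`parseSqlFile` is left to the reader above; a minimal sketch, assuming one `NAME=SELECT ...` statement per line (with blank lines and `--` comment lines skipped) and preserving file order so earlier results can be referenced by later queries:

```scala
// Hypothetical parser for the enhanced SQL file sketched above:
// one "NAME=SELECT ..." statement per line; blank lines and
// "--" comment lines are ignored. File order is preserved.
def parseSqlLines(lines: Seq[String]): Seq[(String, String)] =
  lines
    .map(_.trim)
    .filter(l => l.nonEmpty && !l.startsWith("--"))
    .map { line =>
      val Array(name, query) = line.split("=", 2)
      (name.trim, query.trim)
    }
```

In practice you would feed it e.g. `Source.fromFile(path).getLines().toSeq` and then run the foreach loop above over the result.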

Make sure you declare the queries in order, so all required tables have been created before they are referenced. Otherwise, do a little more parsing and sort by dependencies.
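If you'd rather not rely on file order, a dependency sort is straightforward. A sketch, assuming a query depends on every defined table whose name appears as a whole word in its text (a heuristic, not a real SQL parser), and assuming there are no cycles:

```scala
// Hypothetical dependency sort for (name, query) pairs: a query depends
// on every defined table name that appears as a whole word in its text.
// Simple DFS-based topological ordering; cyclic dependencies are NOT
// detected and would loop forever.
def sortByDependencies(queries: Seq[(String, String)]): Seq[(String, String)] = {
  val names = queries.map(_._1).toSet
  val byName = queries.toMap
  // table names referenced by a query (whole-word, case-sensitive match)
  def deps(q: String): Set[String] =
    names.filter(n => ("""\b""" + n + """\b""").r.findFirstIn(q).isDefined)
  val visited = scala.collection.mutable.LinkedHashSet.empty[String]
  def visit(name: String): Unit =
    if (!visited.contains(name)) {
      deps(byName(name)).filterNot(_ == name).foreach(visit)
      visited += name // added only after all dependencies
    }
  queries.foreach { case (name, _) => visit(name) }
  visited.toSeq.map(n => (n, byName(n)))
}
```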

In an RDBMS I'd call these tables Materialised Views, i.e. a transform on other data, like a view, but with the result cached for later reuse.

barclar answered Nov 19 '22 08:11