
Difference between df.SaveAsTable and spark.sql(Create table..)

Referring to here on the difference between saveAsTable and insertInto:

What is the difference between the following two approaches:

df.saveAsTable("mytable");

and

df.createOrReplaceTempView("my_temp_table");
spark.sql("drop table if exists " + my_temp_table);
spark.sql("create table mytable as select * from 
my_temp_table");

In which case is the table stored in memory, and in which case physically on disk?

Also, as per my understanding, createOrReplaceTempView only registers the dataframe (already in memory) to be accessible through Hive queries, without actually persisting it. Is that correct?

I have to join hundreds of tables and hit an OutOfMemory issue. In terms of efficiency, what would be the best way?

  • df.persist() and df.join(..).join(..).join(..).... #hundred joins

  • createOrReplaceTempView then join with spark.sql(),

  • SaveAsTable (? not sure the next step)

  • Write to disk with Create Table then join with spark.sql()?

Asked Apr 15 '19 by Kenny

People also ask

What is the difference between DataFrame and Spark SQL?

A Spark DataFrame is basically a distributed collection of rows (Row types) with the same schema; it is a Spark Dataset organized into named columns. A point to note here is that Datasets are an extension of the DataFrame API that provides a type-safe, object-oriented programming interface.

What is Spark saveAsTable?

saveAsTable("t") . When the table is dropped, the custom table path will not be removed and the table data is still there. If no custom table path is specified, Spark will write data to a default table path under the warehouse directory. When the table is dropped, the default table path will be removed too.

Does Spark SQL create a DataFrame?

With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.

What is the difference between saveAsTable and insert into?

saveAsTable uses column-name-based resolution while insertInto uses position-based resolution. In Append mode, saveAsTable pays more attention to the underlying schema of the existing table to make certain resolutions.


1 Answer

Let's go step-by-step.

In the case of df.saveAsTable("mytable"), the table is actually written to storage (HDFS/S3). It is a Spark action.

On the other hand: df.createOrReplaceTempView("my_temp_table") is a transformation. It is just an identifier to be used for the DAG of df. Nothing is actually stored in memory or on disk.
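
For illustration, here is a minimal sketch of the two calls, assuming Spark 2.x (where saveAsTable is reached through df.write) and a Hive-enabled session; the DataFrame itself is a stand-in:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("saveAsTable-vs-tempView")
  .enableHiveSupport()                      // needed for persistent metastore tables
  .getOrCreate()

val df = spark.range(0, 1000).toDF("id")    // stand-in for any DataFrame

// Eager: computes df and writes its files under the warehouse directory,
// registering "mytable" in the metastore.
df.write.mode("overwrite").saveAsTable("mytable")

// Lazy: only registers a name for df's DAG; nothing is computed or persisted yet.
df.createOrReplaceTempView("my_temp_table")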

spark.sql("drop table if exists " + my_temp_table) drops the table.

spark.sql("create table mytable as select * from my_temp_table") creates mytable on storage. createOrReplaceTempView creates tables in global_temp database.

It would be best to modify the query to:

create table mytable as select * from global_temp.my_temp_table
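
A short sketch of both scopes, using the spark and df from above (the view and table names are illustrative):

// Session-scoped view: referenced directly by name within the same session.
df.createOrReplaceTempView("my_temp_table")
spark.sql("create table mytable as select * from my_temp_table")

// Global view: lives in the global_temp database and must be qualified.
df.createOrReplaceGlobalTempView("my_global_view")
spark.sql("create table mytable_from_global as select * from global_temp.my_global_view")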

createOrReplaceTempView only registers the dataframe (already in memory) to be accessible through Hive queries, without actually persisting it, is it correct?

Yes. For large DAGs, Spark will automatically cache data depending on the spark.memory.fraction setting; see the Spark memory management documentation for details.
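
If you do want to tune that, note that these are launch-time settings; a sketch, assuming Spark 2.x defaults:

import org.apache.spark.sql.SparkSession

// Pass these on the builder or via --conf with spark-submit;
// they are not meant to be changed at runtime with spark.conf.set.
val spark = SparkSession.builder()
  .appName("memory-tuning")
  .config("spark.memory.fraction", "0.6")         // heap share for execution + storage (default 0.6)
  .config("spark.memory.storageFraction", "0.5")  // part of that protected from eviction (default 0.5)
  .getOrCreate()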

I have to join hundreds of tables and hit an OutOfMemory issue. In terms of efficiency, what would be the best way?

df.persist() and df.join(..).join(..).join(..).... #hundred joins

createOrReplaceTempView then join with spark.sql(),

SaveAsTable (? not sure the next step)

Write to disk with Create Table then join with spark.sql()?

persist would store some data in cached format, depending on available memory, but for an end table that is generated by joining hundreds of tables this is probably not the best approach.
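
For completeness, a minimal sketch of an explicit persist with a spill-to-disk storage level (df and the count trigger are illustrative):

import org.apache.spark.storage.StorageLevel

// df is any DataFrame; MEMORY_AND_DISK spills partitions that don't fit in
// memory to local disk instead of failing or recomputing them.
val cached = df.persist(StorageLevel.MEMORY_AND_DISK)
cached.count()   // persist is lazy; an action is needed before the cache is populated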

It is hard to suggest the one approach that will work for you, but here are some general patterns:

If writes fail with OOM and the default spark.sql.shuffle.partitions value is used, then the starting point is to increase the shuffle partition count so that each partition is sized appropriately for the memory available to its executor.

The spark.sql.shuffle.partitions setting can be changed between joins; it doesn't need to be constant across the Spark job.
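
As a sketch (the DataFrames a, b and the join column "key" are assumptions for illustration), the value in effect when an action actually runs is the one that is used:

spark.conf.set("spark.sql.shuffle.partitions", "2000")   // more, smaller shuffle partitions
val ab = a.join(b, Seq("key"))
ab.count()   // this shuffle runs with 2000 partitions (the value at action time)

spark.conf.set("spark.sql.shuffle.partitions", "400")    // later, lighter queries
// only queries executed after this point pick up the new value

Because evaluation is lazy, joins that run inside the same action share a single value; giving different joins different values means materializing in between, which is what the next point is about.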

Calculating partition sizes becomes difficult when multiple tables are involved. In that case, writing intermediate results to disk and reading them back before joining the large tables is a good idea.
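
A sketch of that pattern (a, b, c, d, "key" and the Parquet path are illustrative); reading the intermediate result back gives the next join a short lineage and a known on-disk size:

val partial = a.join(b, Seq("key")).join(c, Seq("key"))
partial.write.mode("overwrite").parquet("/tmp/partial_join")

val partialFromDisk = spark.read.parquet("/tmp/partial_join")   // fresh plan, short lineage
val result = partialFromDisk.join(d, Seq("key"))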

For small tables (less than 2 GB), broadcasting is a possibility. The default limit is 10 MB, but it can be changed.
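
For example (bigDf, smallDf and the 200 MB threshold are illustrative): the limit is spark.sql.autoBroadcastJoinThreshold, and a broadcast can also be forced per join with a hint:

import org.apache.spark.sql.functions.broadcast

// Raise the automatic broadcast threshold from the 10 MB default (value is in bytes).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 200L * 1024 * 1024)

// Or force a broadcast for one join regardless of the threshold.
val joined = bigDf.join(broadcast(smallDf), Seq("key"))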

It would be best if the final table were stored on disk rather than serving Thrift clients through temp tables.

Good luck!

Answered Oct 03 '22 by Sai