Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to CREATE TABLE USING delta with Spark 2.4.4?

This is Spark 2.4.4 and Delta Lake 0.5.0.

I'm trying to create a table using delta data source and seems I'm missing something. Although the CREATE TABLE USING delta command worked fine neither the table directory is created nor insertInto works.

The following CREATE TABLE USING delta worked fine, but insertInto failed.

scala> sql("""
create table t5
USING delta
LOCATION '/tmp/delta'
""").show

scala> spark.catalog.listTables.where('name === "t5").show
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
|  t5| default|       null| EXTERNAL|      false|
+----+--------+-----------+---------+-----------+

scala> spark.range(5).write.option("mergeSchema", true).insertInto("t5")
org.apache.spark.sql.AnalysisException: `default`.`t5` requires that the data to be inserted have the same number of columns as the target table: target table has 0 column(s) but the inserted data has 1 column(s), including 0 partition column(s) having constant value(s).;
  at org.apache.spark.sql.execution.datasources.PreprocessTableInsertion.org$apache$spark$sql$execution$datasources$PreprocessTableInsertion$$preprocess(rules.scala:341)
  ...

I thought I'd create with columns defined, but that didn't work either.

scala> sql("""
create table t6
(id LONG, name STRING)
USING delta
LOCATION '/tmp/delta'
""").show
org.apache.spark.sql.AnalysisException: delta does not allow user-specified schemas.;
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
  at org.apache.spark.sql.execution.command.CreateDataSourceTableCommand.run(createDataSourceTables.scala:78)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:194)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3370)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3370)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
  ... 54 elided
like image 309
Jacek Laskowski Avatar asked Dec 31 '19 16:12

Jacek Laskowski


2 Answers

tl;dr CREATE TABLE USING delta is not supported by Spark before 3.0.0 and Delta Lake before 0.7.0.


Delta Lake 0.7.0 with Spark 3.0.0 (both just released) do support CREATE TABLE SQL command.

Be sure to "install" Delta SQL using spark.sql.catalog.spark_catalog configuration property with org.apache.spark.sql.delta.catalog.DeltaCatalog.

$ ./bin/spark-submit \
  --packages io.delta:delta-core_2.12:0.7.0 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

scala> spark.version
res0: String = 3.0.0

scala> sql("CREATE TABLE delta_101 (id LONG) USING delta").show
++
||
++
++

scala> spark.table("delta_101").show
+---+
| id|
+---+
+---+

scala> sql("DESCRIBE EXTENDED delta_101").show(truncate = false)
+----------------------------+---------------------------------------------------------+-------+
|col_name                    |data_type                                                |comment|
+----------------------------+---------------------------------------------------------+-------+
|id                          |bigint                                                   |       |
|                            |                                                         |       |
|# Partitioning              |                                                         |       |
|Not partitioned             |                                                         |       |
|                            |                                                         |       |
|# Detailed Table Information|                                                         |       |
|Name                        |default.delta_101                                        |       |
|Location                    |file:/Users/jacek/dev/oss/spark/spark-warehouse/delta_101|       |
|Provider                    |delta                                                    |       |
|Table Properties            |[]                                                       |       |
+----------------------------+---------------------------------------------------------+-------+
like image 105
2 revs Avatar answered Oct 07 '22 04:10

2 revs


An example with pyspark 3.0.0 & delta 0.7.0

print(f"LOCATION '{location}")
spark.sql(f"""
CREATE OR REPLACE TABLE  {TABLE_NAME} (
  CD_DEVICE INT, 
  FC_LOCAL_TIME TIMESTAMP,  
  CD_TYPE_DEVICE STRING,
  CONSUMTION DOUBLE,
  YEAR INT,
  MONTH INT, 
  DAY INT )
USING DELTA
PARTITIONED BY (YEAR , MONTH , DAY, FC_LOCAL_TIME)
LOCATION '{location}'
""")

Where "location" is a dir HDFS for spark cluster mode save de delta table.

like image 26
Cristián Vargas Acevedo Avatar answered Oct 07 '22 02:10

Cristián Vargas Acevedo