This is Spark 2.4.4 and Delta Lake 0.5.0.
I'm trying to create a table using the delta data source, and it seems I'm missing something. Although the CREATE TABLE USING delta command ran without errors, the table directory was not created and insertInto does not work.
The following CREATE TABLE USING delta statement succeeded, but insertInto failed.
scala> sql("""
create table t5
USING delta
LOCATION '/tmp/delta'
""").show
scala> spark.catalog.listTables.where('name === "t5").show
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
|  t5| default|       null| EXTERNAL|      false|
+----+--------+-----------+---------+-----------+
scala> spark.range(5).write.option("mergeSchema", true).insertInto("t5")
org.apache.spark.sql.AnalysisException: `default`.`t5` requires that the data to be inserted have the same number of columns as the target table: target table has 0 column(s) but the inserted data has 1 column(s), including 0 partition column(s) having constant value(s).;
at org.apache.spark.sql.execution.datasources.PreprocessTableInsertion.org$apache$spark$sql$execution$datasources$PreprocessTableInsertion$$preprocess(rules.scala:341)
...
I then tried creating the table with the columns defined, but that didn't work either.
scala> sql("""
create table t6
(id LONG, name STRING)
USING delta
LOCATION '/tmp/delta'
""").show
org.apache.spark.sql.AnalysisException: delta does not allow user-specified schemas.;
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
at org.apache.spark.sql.execution.command.CreateDataSourceTableCommand.run(createDataSourceTables.scala:78)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:194)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3370)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3370)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
... 54 elided
tl;dr CREATE TABLE USING delta is not supported by Spark before 3.0.0 and Delta Lake before 0.7.0.
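On Spark 2.4.x with Delta Lake 0.5.0, writing and reading by path through the DataFrame API is one way around the missing SQL support. What follows is a minimal PySpark sketch rather than a definitive recipe: the helper name `append_and_read` is hypothetical, and it assumes an existing `spark` session that was started with the Delta Lake package available, plus the `/tmp/delta` path from the question.

```python
def append_and_read(spark, path="/tmp/delta", rows=5):
    """Append `rows` generated ids to the Delta table at `path`, then read it back.

    `spark` is assumed to be a SparkSession started with the Delta Lake
    package available (for example via --packages). Illustrative helper only.
    """
    # Write by path instead of by table name; the first write creates the
    # table directory and its _delta_log under `path`.
    spark.range(rows).write.format("delta").mode("append").save(path)
    # Read the same directory back as a DataFrame.
    return spark.read.format("delta").load(path)
```

In a `pyspark` shell, `append_and_read(spark).show()` would then display the ids written so far.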
Delta Lake 0.7.0 with Spark 3.0.0 (both just released) supports the CREATE TABLE SQL command.
Be sure to "install" Delta SQL by setting the spark.sql.catalog.spark_catalog configuration property to org.apache.spark.sql.delta.catalog.DeltaCatalog.
$ ./bin/spark-submit \
--packages io.delta:delta-core_2.12:0.7.0 \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
scala> spark.version
res0: String = 3.0.0
scala> sql("CREATE TABLE delta_101 (id LONG) USING delta").show
++
||
++
++
scala> spark.table("delta_101").show
+---+
| id|
+---+
+---+
scala> sql("DESCRIBE EXTENDED delta_101").show(truncate = false)
+----------------------------+---------------------------------------------------------+-------+
|col_name                    |data_type                                                |comment|
+----------------------------+---------------------------------------------------------+-------+
|id                          |bigint                                                   |       |
|                            |                                                         |       |
|# Partitioning              |                                                         |       |
|Not partitioned             |                                                         |       |
|                            |                                                         |       |
|# Detailed Table Information|                                                         |       |
|Name                        |default.delta_101                                        |       |
|Location                    |file:/Users/jacek/dev/oss/spark/spark-warehouse/delta_101|       |
|Provider                    |delta                                                    |       |
|Table Properties            |[]                                                       |       |
+----------------------------+---------------------------------------------------------+-------+
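The same settings can also be applied when the session is built in code rather than on the command line. A hedged PySpark sketch: the keys and values are the ones from the command above, `spark.jars.packages` is Spark's configuration-property counterpart of `--packages`, and the names `DELTA_SESSION_CONF` and `configure_delta` are made up for illustration.

```python
# Settings taken from the command line above, expressed as Spark properties.
DELTA_SESSION_CONF = {
    "spark.jars.packages": "io.delta:delta-core_2.12:0.7.0",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def configure_delta(builder, conf=DELTA_SESSION_CONF):
    """Apply each Delta setting to a SparkSession builder and return the builder."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder


# Usage (requires pyspark to be installed):
#   from pyspark.sql import SparkSession
#   spark = configure_delta(SparkSession.builder.appName("delta-demo")).getOrCreate()
#   spark.sql("CREATE TABLE delta_101 (id LONG) USING delta")
```

Keeping the settings in one dictionary makes it easy to see that the extension and the catalog are configured together.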
An example with PySpark 3.0.0 and Delta Lake 0.7.0:
print(f"LOCATION '{location}'")
spark.sql(f"""
CREATE OR REPLACE TABLE {TABLE_NAME} (
CD_DEVICE INT,
FC_LOCAL_TIME TIMESTAMP,
CD_TYPE_DEVICE STRING,
CONSUMTION DOUBLE,
YEAR INT,
MONTH INT,
DAY INT )
USING DELTA
PARTITIONED BY (YEAR , MONTH , DAY, FC_LOCAL_TIME)
LOCATION '{location}'
""")
Here, "location" is an HDFS directory where Spark, running in cluster mode, saves the Delta table.
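The statement above relies on `TABLE_NAME` and `location` being defined beforehand. As a rough sketch of how the pieces fit together, the statement can be rendered by a small helper and inspected before it is passed to `spark.sql`. The helper `create_table_sql` and the values given to `TABLE_NAME` and `location` are hypothetical; the column names, types, and partition columns are copied from the statement above.

```python
def create_table_sql(table_name, columns, partition_by, location):
    """Render a CREATE OR REPLACE TABLE ... USING DELTA statement as a string."""
    column_defs = ",\n    ".join(f"{name} {dtype}" for name, dtype in columns)
    return (
        f"CREATE OR REPLACE TABLE {table_name} (\n"
        f"    {column_defs}\n"
        f")\n"
        f"USING DELTA\n"
        f"PARTITIONED BY ({', '.join(partition_by)})\n"
        f"LOCATION '{location}'"
    )


# Hypothetical values; the original post does not show them.
TABLE_NAME = "device_consumption"
location = "hdfs:///data/delta/device_consumption"

# Column names and types copied from the statement above.
COLUMNS = [
    ("CD_DEVICE", "INT"),
    ("FC_LOCAL_TIME", "TIMESTAMP"),
    ("CD_TYPE_DEVICE", "STRING"),
    ("CONSUMTION", "DOUBLE"),
    ("YEAR", "INT"),
    ("MONTH", "INT"),
    ("DAY", "INT"),
]

statement = create_table_sql(
    TABLE_NAME, COLUMNS, ["YEAR", "MONTH", "DAY", "FC_LOCAL_TIME"], location
)
print(statement)
# With a Delta-enabled session: spark.sql(statement)
```

Printing the rendered statement first makes quoting mistakes around the location easy to spot.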