Is it possible to save a DataFrame in Spark directly to Hive?
I have tried converting the DataFrame to an RDD, saving it as a text file, and then loading it into Hive. But I am wondering if I can save the DataFrame directly to Hive.
You can create an in-memory temporary view and store its contents in a Hive table using sqlContext.
Let's say your DataFrame is myDf. You can create a temporary view using:
myDf.createOrReplaceTempView("mytempTable")
Then you can use a simple Hive statement to create the table and copy the data from your temporary view:
sqlContext.sql("create table mytable as select * from mytempTable");
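If the statement is built programmatically, for example to choose a storage format or a target database, a small helper can assemble it. This is a hypothetical Python sketch (`ctas_statement` is not a Spark API). It assumes Spark SQL's Hive-format CTAS syntax, where `STORED AS` comes before `AS SELECT`. The resulting string would then be passed to `spark.sql(...)` or `sqlContext.sql(...)`.

```python
def ctas_statement(target_table, temp_view, file_format=None):
    """Build a CREATE TABLE ... AS SELECT statement that copies a temp view.

    target_table may be qualified ("mydb.mytable"); file_format is an optional
    Hive storage format such as "PARQUET" or "ORC" (hypothetical helper).
    """
    stored_as = " STORED AS {}".format(file_format) if file_format else ""
    return "CREATE TABLE {}{} AS SELECT * FROM {}".format(target_table, stored_as, temp_view)


print(ctas_statement("mytable", "mytempTable"))
print(ctas_statement("mydb.mytable", "mytempTable", "PARQUET"))
```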
Use DataFrameWriter.saveAsTable (df.write.saveAsTable(...)). See the Spark SQL and DataFrame Guide.
I don't see df.write.saveAsTable(...) marked as deprecated in the Spark 2.0 documentation. It has worked for us on Amazon EMR: we were able to read data from S3 into a DataFrame, process it, create a table from the result, and read it with MicroStrategy.
Vinay's answer has also worked, though.
You need to have/create a HiveContext:
import org.apache.spark.sql.hive.HiveContext;
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
Then save the DataFrame directly, or select the columns to store, as a Hive table (df is the DataFrame):
df.write().mode("overwrite").saveAsTable("schemaName.tableName");
or
df.select(df.col("col1"), df.col("col2"), df.col("col3")).write().mode("overwrite").saveAsTable("schemaName.tableName");
or
import org.apache.spark.sql.SaveMode;
df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");
The available SaveMode values are Append, Ignore, Overwrite, and ErrorIfExists (the default).
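The four modes only behave differently when the target table already exists. The toy model below is plain Python with rows represented as a list. `Mode` and `apply_save_mode` are hypothetical names, not Spark APIs. It captures the documented behavior of each mode.

```python
from enum import Enum


class Mode(Enum):
    """Hypothetical stand-in for Spark's SaveMode values."""
    APPEND = "append"
    OVERWRITE = "overwrite"
    IGNORE = "ignore"
    ERROR_IF_EXISTS = "errorifexists"


def apply_save_mode(existing_rows, new_rows, mode):
    """Toy model of what each save mode does to a table.

    existing_rows is None when the table does not exist yet; the return
    value is the list of rows the table holds afterwards.
    """
    if existing_rows is None:
        return list(new_rows)  # no table yet: every mode simply creates it
    if mode is Mode.APPEND:
        return list(existing_rows) + list(new_rows)
    if mode is Mode.OVERWRITE:
        return list(new_rows)
    if mode is Mode.IGNORE:
        return list(existing_rows)  # silently keep the existing data
    # Mode.ERROR_IF_EXISTS (the default) refuses to touch an existing table
    raise ValueError("table already exists")


print(apply_save_mode([1, 2], [3], Mode.APPEND))  # [1, 2, 3]
```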
Here is the definition of HiveContext from the Spark documentation:
In addition to the basic SQLContext, you can also create a HiveContext, which provides a superset of the functionality provided by the basic SQLContext. Additional features include the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the ability to read data from Hive tables. To use a HiveContext, you do not need to have an existing Hive setup, and all of the data sources available to a SQLContext are still available. HiveContext is only packaged separately to avoid including all of Hive’s dependencies in the default Spark build.
On Spark 1.6.2, using "dbName.tableName" gives this error:
org.apache.spark.sql.AnalysisException: Specifying database name or other qualifiers are not allowed for temporary tables. If the table name has dots (.) in it, please quote the table name with backticks (`).
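When a name really does contain dots, quote each part separately. Wrapping the whole "dbName.tableName" string in a single pair of backticks would name one table that literally contains a dot. Below is a sketch of per-part quoting. It assumes Spark SQL's rule that a backtick inside a quoted identifier is escaped by doubling it. `quote_identifier` and `quote_table_name` are hypothetical helpers.

```python
def quote_identifier(name):
    """Wrap one identifier in backticks, doubling any backtick inside it."""
    return "`" + name.replace("`", "``") + "`"


def quote_table_name(database, table):
    """Quote the database and table parts separately, so a dot inside either
    name is not mistaken for the database/table separator."""
    return quote_identifier(database) + "." + quote_identifier(table)


print(quote_table_name("dbName", "tableName"))  # `dbName`.`tableName`
```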
Sorry for writing late to the post, but I see no accepted answer.
df.write().saveAsTable can throw an AnalysisException and is not compatible with Hive tables.
Storing the DataFrame with df.write().format("hive") should do the trick!
However, if that doesn't work, then, going by the previous comments and answers, this is the best solution in my opinion (open to suggestions, though).
The best approach is to explicitly create the Hive table (including a PARTITIONED table):
// hive_table_name, fields, partition_column and StorageType are placeholders for your own values
def createHiveTable(): Unit = {
  spark.sql(s"CREATE TABLE $hive_table_name($fields) " +
    s"PARTITIONED BY ($partition_column String) STORED AS $StorageType")
}
Save the DataFrame as a temporary view:
df.createOrReplaceTempView(tempTableName)
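Then insert into the PARTITIONED Hive table (a fully dynamic partition insert typically needs nonstrict mode):
spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
spark.sql(s"insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName")
spark.sql(s"select * from default.$hive_table_name").show(1000, false)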
Of course, the LAST COLUMN in the DataFrame will be used as the PARTITION COLUMN, so create the Hive table accordingly!
Please comment on whether it works or not.
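You don't have to rely on the partition column happening to be last in the DataFrame: the column order can be fixed in the statements themselves. The sketch below is plain Python that only builds the SQL strings. All function names, the `default.events` table, and the column list are hypothetical. You would run each string with `spark.sql(...)`, starting with the settings, then the CREATE, then the INSERT.

```python
# Session settings usually needed before a fully dynamic partition insert
DYNAMIC_PARTITION_SETTINGS = [
    "SET hive.exec.dynamic.partition=true",
    "SET hive.exec.dynamic.partition.mode=nonstrict",
]


def create_partitioned_table(table, columns, partition_column, storage="PARQUET"):
    """columns is a list of (name, hive_type) pairs that includes the partition column."""
    types = dict(columns)
    # Hive rejects a partition column that is also listed among the data columns
    data_columns = ", ".join(
        "{} {}".format(name, dtype) for name, dtype in columns if name != partition_column
    )
    return "CREATE TABLE {} ({}) PARTITIONED BY ({} {}) STORED AS {}".format(
        table, data_columns, partition_column, types[partition_column], storage
    )


def insert_partitioned(table, columns, partition_column, source_view):
    """Select columns by name with the partition column moved to the end,
    since a dynamic partition insert reads partition values from the last column."""
    ordered = [name for name, _ in columns if name != partition_column] + [partition_column]
    return "INSERT INTO TABLE {} PARTITION ({}) SELECT {} FROM {}".format(
        table, partition_column, ", ".join(ordered), source_view
    )


cols = [("dt", "STRING"), ("id", "BIGINT"), ("name", "STRING")]
print(create_partitioned_table("default.events", cols, "dt"))
print(insert_partitioned("default.events", cols, "dt", "events_tmp"))
```

Selecting columns by name is the key choice here. The INSERT no longer depends on the DataFrame's column order, even though `dt` comes first in the source.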
--UPDATE--
import org.apache.spark.sql.SaveMode
df.write
  .partitionBy(partition_column)
  .format("hive")
  .mode(SaveMode.Append)
  .saveAsTable(new_table_name_to_be_created_in_hive) // The table should not exist yet, OR should already be a PARTITIONED table in Hive
Saving to Hive is just a matter of using the write() method of your DataFrame:
df.write.saveAsTable(tableName)
See https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameWriter.html#saveAsTable(java.lang.String)
From Spark 2.2, use Dataset instead of DataFrame (in Scala, DataFrame is just an alias for Dataset[Row], so the same write API applies).
For Hive external tables I use this function in PySpark:
def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"):
    print("Saving result in {}.{}".format(database, table_name))
    # simpleString() gives Hive-compatible type names (string, int, bigint,
    # double, timestamp, decimal(10,2), ...), so no lossy manual mapping is needed
    output_schema = ",".join(
        "`{}` {}".format(field.name.lower(), field.dataType.simpleString())
        for field in dataframe.schema
    )
    sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name))
    query = ("CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} "
             "LOCATION '/user/hive/{}/{}'").format(
        database, table_name, output_schema, save_format, database, table_name)
    sparkSession.sql(query)
    # insertInto matches columns by position, not by name
    dataframe.write.insertInto("{}.{}".format(database, table_name), overwrite=True)
You could use the Hortonworks spark-llap library (Hive Warehouse Connector) like this:
import com.hortonworks.hwc.HiveWarehouseSession
df.write
.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
.mode("append")
.option("table", "myDatabase.myTable")
.save()
If you want to create a Hive table (which does not exist yet) from a DataFrame (sometimes DataFrameWriter.saveAsTable fails to create it), StructType.toDDL helps by listing the columns as a string.
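val df = ...
val schemaStr = df.schema.toDDL // This gives the columns
spark.sql(s"""create table hive_table ( ${schemaStr})""")
// Now write the DataFrame into the existing table. insertInto matches columns by position;
// saveAsTable with the default ErrorIfExists mode would fail because the table now exists
df.write.insertInto("hive_table")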
hive_table will be created in the default database, since we did not provide any database in spark.sql(). Use stg.hive_table to create hive_table in the stg database.
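From PySpark, the same idea can be sketched with `df.dtypes`, which returns (name, type) pairs such as ("id", "bigint"). The helpers below are hypothetical. `to_ddl` only approximates the shape of Scala's `toDDL` output (backtick-quoted names, comma-separated). The optional `database` argument shows how a qualified name such as stg.hive_table is formed.

```python
def to_ddl(columns):
    """Rough stand-in for Scala's StructType.toDDL: turn (name, type) pairs,
    e.g. PySpark's df.dtypes, into a "`name` type,..." column list."""
    return ",".join("`{}` {}".format(name.replace("`", "``"), dtype) for name, dtype in columns)


def create_table_sql(table, columns, database=None):
    """Without a database, the table lands in the current database (normally default)."""
    qualified = "{}.{}".format(database, table) if database else table
    return "CREATE TABLE {} ({})".format(qualified, to_ddl(columns))


dtypes = [("id", "bigint"), ("name", "string")]
print(create_table_sql("hive_table", dtypes))
print(create_table_sql("hive_table", dtypes, database="stg"))
```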
Here is a PySpark version that creates a Hive table from Parquet files. You may have generated the Parquet files using an inferred schema and now want to push the table definition to the Hive metastore. You can also push the definition to systems like AWS Glue or AWS Athena, not just to the Hive metastore. Here I am using spark.sql to create a permanent table.
# Location where my Parquet files are present.
location = "s3://my-location/data/"
df = spark.read.parquet(location)

# df.dtypes is a list of (column_name, type_name) pairs, e.g. ("id", "bigint")
columns = ",".join("{} {}".format(name, dtype) for name, dtype in df.dtypes)
print("size----------", len(df.dtypes))

## partition by pt
tabledef = ("CREATE EXTERNAL TABLE test123 ({}) "
            "STORED AS parquet "
            "LOCATION '{}'").format(columns, location)
print("---------print definition ---------")
print(tabledef)

## create a table using spark.sql. Assuming you are using Spark 2.1+
spark.sql(tabledef)
In my case this works fine:
from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("DatabaseName")
df = spark.read.format("csv").option("Header",True).load("/user/csvlocation.csv")
df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table",<tablename>).save()
Done!!
You can then read the data back; for example, for a table named "Employee":
hive.executeQuery("select * from Employee").show()
For more details, see https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive-read-write-operations.html