I want to create a DataFrame with a specified schema in Scala. I have tried using a JSON read (reading an empty file), but I don't think that's the best practice.
We can create a DataFrame programmatically in three steps:
1. Create an RDD of Rows from the original RDD.
2. Create the schema, represented by a StructType, matching the structure of the Rows in the RDD created in step 1.
3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.
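As a minimal sketch of those three steps (assuming a SparkContext sc and a SparkSession spark are in scope; the column names k and v and the sample rows are placeholders matching the schema used further below):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Step 1: an RDD of Rows, built here from a small in-memory sequence
val rowRDD = sc.parallelize(Seq(Row("a", 1), Row("b", 2)))

// Step 2: a schema whose fields match the structure of those Rows
val schema = StructType(
  StructField("k", StringType, true) ::
  StructField("v", IntegerType, false) :: Nil)

// Step 3: apply the schema to the RDD of Rows
val df = spark.createDataFrame(rowRDD, schema)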
To create an empty PySpark DataFrame manually with a schema (column names and data types), first create the schema using StructType and StructField, then pass an empty RDD to the createDataFrame() method of SparkSession along with that schema.
Let's assume you want a data frame with the following schema:
root
 |-- k: string (nullable = true)
 |-- v: integer (nullable = false)
You simply define the schema for a data frame and use an empty RDD[Row]:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

val schema = StructType(
  StructField("k", StringType, true) ::
  StructField("v", IntegerType, false) :: Nil)

// Spark < 2.0
// sqlContext.createDataFrame(sc.emptyRDD[Row], schema)

spark.createDataFrame(sc.emptyRDD[Row], schema)
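As a quick sanity check (a hypothetical follow-up using the schema defined above), printing the schema of the result should reproduce the tree shown earlier:

val emptyDF = spark.createDataFrame(sc.emptyRDD[Row], schema)
emptyDF.printSchema()
// root
//  |-- k: string (nullable = true)
//  |-- v: integer (nullable = false)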
The PySpark equivalent is almost identical:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("k", StringType(), True),
    StructField("v", IntegerType(), False)
])

# or df = sc.parallelize([]).toDF(schema)

# Spark < 2.0
# sqlContext.createDataFrame([], schema)

df = spark.createDataFrame([], schema)
Using implicit encoders (Scala only) with Product types like Tuple:
import spark.implicits._

Seq.empty[(String, Int)].toDF("k", "v")
or a case class:
case class KV(k: String, v: Int)

Seq.empty[KV].toDF
or:
spark.emptyDataset[KV].toDF
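With the case-class route the schema is derived from the field types, so (as a hypothetical check) the primitive Int field comes out non-nullable, matching the target schema above:

Seq.empty[KV].toDF.printSchema()
// root
//  |-- k: string (nullable = true)
//  |-- v: integer (nullable = false)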
As of Spark 2.0.0, you can do the following.
Let's define a Person case class:
scala> case class Person(id: Int, name: String)
defined class Person
Import the implicit Encoders from the spark SparkSession:
scala> import spark.implicits._
import spark.implicits._
And use SparkSession to create an empty Dataset[Person]:
scala> spark.emptyDataset[Person]
res0: org.apache.spark.sql.Dataset[Person] = [id: int, name: string]
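A printSchema on that result (a hypothetical follow-up in the same REPL session) shows that the encoder also infers nullability from the field types, marking the primitive Int as non-nullable:

scala> spark.emptyDataset[Person].printSchema
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)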
You could also use a Schema "DSL" (see Support functions for DataFrames in org.apache.spark.sql.ColumnName).
scala> val id = $"id".int
id: org.apache.spark.sql.types.StructField = StructField(id,IntegerType,true)

scala> val name = $"name".string
name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true)

scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType

scala> val mySchema = StructType(id :: name :: Nil)
mySchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true))

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val emptyDF = spark.createDataFrame(sc.emptyRDD[Row], mySchema)
emptyDF: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> emptyDF.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)