I'm using Spark 2.0 while working with tab-separated value (TSV) and comma-separated value (CSV) files. I want to load the data into Spark-SQL dataframes, where I would like to control the schema completely when the files are read. I don't want Spark to guess the schema from the data in the file.
How would I load TSV or CSV files into Spark SQL Dataframes and apply a schema to them?
Below is a complete Spark 2.0 example of loading a tab-separated value (TSV) file and applying a schema.
I'm using the Iris data set in TSV format from UAH.edu as an example. Here are the first few rows from that file:
Type PW PL SW SL
0 2 14 33 50
1 24 56 31 67
1 23 51 31 69
0 2 10 36 46
1 20 52 30 65
To enforce a schema, you can programmatically build it using one of two methods:
A. Create the schema with StructType
:
import org.apache.spark.sql.types._
var irisSchema = StructType(Array(
StructField("Type", IntegerType, true),
StructField("PetalWidth", IntegerType, true),
StructField("PetalLength", IntegerType, true),
StructField("SepalWidth", IntegerType, true),
StructField("SepalLength", IntegerType, true)
))
B. Alternatively, create the schema with a case class
and Encoders
(this approach is less verbose):
import org.apache.spark.sql.Encoders
case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int,
SepalWidth: Int, SepalLength: Int)
var irisSchema = Encoders.product[IrisSchema].schema
Once you have created your schema, you can use spark.read
to read in the TSV file. Note that you can actually also read comma-separated value (CSV) files as well, or any delimited files, as long as you set the option("delimiter", d)
option correctly. Further, if you have a data file that has a header line, be sure to set option("header", "true")
.
Below is the complete final code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoders
val spark = SparkSession.builder().getOrCreate()
case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int,
SepalWidth: Int, SepalLength: Int)
var irisSchema = Encoders.product[IrisSchema].schema
var irisDf = spark.read.format("csv"). // Use "csv" regardless of TSV or CSV.
option("header", "true"). // Does the file have a header line?
option("delimiter", "\t"). // Set delimiter to tab or comma.
schema(irisSchema). // Schema that was built above.
load("iris.tsv")
irisDf.show(5)
And here is the output:
scala> irisDf.show(5)
+----+----------+-----------+----------+-----------+
|Type|PetalWidth|PetalLength|SepalWidth|SepalLength|
+----+----------+-----------+----------+-----------+
| 0| 2| 14| 33| 50|
| 1| 24| 56| 31| 67|
| 1| 23| 51| 31| 69|
| 0| 2| 10| 36| 46|
| 1| 20| 52| 30| 65|
+----+----------+-----------+----------+-----------+
only showing top 5 rows
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With