
Create Spark Dataset from a CSV file

I would like to create a Spark Dataset from a simple CSV file. Here are the contents of the CSV file:

name,state,number_of_people,coolness_index
trenton,nj,"10","4.5"
bedford,ny,"20","3.3"
patterson,nj,"30","2.2"
camden,nj,"40","8.8"

Here is the code to make the Dataset:

var location = "s3a://path_to_csv"

case class City(name: String, state: String, number_of_people: Long)

val cities = spark.read
  .option("header", "true")
  .option("charset", "UTF8")
  .option("delimiter",",")
  .csv(location)
  .as[City]

Here is the error message: "Cannot up cast number_of_people from string to bigint as it may truncate"

Databricks talks about creating Datasets and this particular error message in this blog post.

Encoders eagerly check that your data matches the expected schema, providing helpful error messages before you attempt to incorrectly process TBs of data. For example, if we try to use a datatype that is too small, such that conversion to an object would result in truncation (i.e. numStudents is larger than a byte, which holds a maximum value of 255) the Analyzer will emit an AnalysisException.

I am using the Long type, so I didn't expect to see this error message.

Powers asked Sep 16 '16 01:09




2 Answers

With your case class defined as case class City(name: String, state: String, number_of_people: Long), you just need one line:

private val cityEncoder = Seq(City("", "", 0)).toDS

Then your code

val cities = spark.read
  .option("header", "true")
  .option("charset", "UTF8")
  .option("delimiter",",")
  .csv(location)
  .as[City]

will just work.

This is the official source: http://spark.apache.org/docs/latest/sql-programming-guide.html#overview

mingzhao.pro answered Sep 28 '22 00:09


Use schema inference:

val cities = spark.read
  .option("inferSchema", "true")
  ...
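
For reference, here is a fuller sketch of the inference route, not a definitive implementation. It assumes a local SparkSession and a temporary copy of the sample rows from the question; the object name InferSchemaSketch and the helpers readCities and writeSampleCsv are hypothetical names made up for this illustration. With inferSchema enabled, Spark should pick a numeric type for number_of_people, which can then be safely widened to Long.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.{Dataset, SparkSession}

// Same case class as in the question. It is defined at top level
// (outside any method) so that Spark can derive an encoder for it.
case class City(name: String, state: String, number_of_people: Long)

object InferSchemaSketch {
  // Reads the CSV with type inference turned on, then maps rows to City.
  def readCities(spark: SparkSession, path: String): Dataset[City] = {
    import spark.implicits._
    spark.read
      .option("header", "true")
      .option("inferSchema", "true") // costs one extra pass over the data
      .csv(path)
      .as[City]
  }

  // Hypothetical helper: writes the sample rows from the question to a temp file.
  def writeSampleCsv(): String = {
    val csv =
      """name,state,number_of_people,coolness_index
        |trenton,nj,"10","4.5"
        |bedford,ny,"20","3.3"
        |patterson,nj,"30","2.2"
        |camden,nj,"40","8.8"
        |""".stripMargin
    val file = Files.createTempFile("cities", ".csv")
    Files.write(file, csv.getBytes(StandardCharsets.UTF_8))
    file.toString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("infer-schema-sketch")
      .getOrCreate()
    val cities = readCities(spark, writeSampleCsv())
    cities.printSchema() // number_of_people should no longer be a string column
    cities.show()
    spark.stop()
  }
}
```

The extra column coolness_index is simply ignored by the City objects, since as[City] only needs the fields the case class declares.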

or provide schema:

val cities = spark.read
  .schema(StructType(Array(StructField("name", StringType), ...)))
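
A spelled-out version of the explicit-schema route might look like the sketch below. The column types are assumptions based on the sample rows in the question (in particular DoubleType for coolness_index), and ExplicitSchemaSketch, citySchema and readCities are hypothetical names used only for illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}

// Same case class as in the question, defined at top level.
case class City(name: String, state: String, number_of_people: Long)

object ExplicitSchemaSketch {
  // One field per CSV column, in file order.
  // DoubleType for coolness_index is an assumption based on the sample data.
  val citySchema: StructType = StructType(Array(
    StructField("name", StringType),
    StructField("state", StringType),
    StructField("number_of_people", LongType),
    StructField("coolness_index", DoubleType)
  ))

  // Reads the CSV using the declared schema, then maps rows to City.
  def readCities(spark: SparkSession, path: String): Dataset[City] = {
    import spark.implicits._
    spark.read
      .option("header", "true") // still needed so the header row is not read as data
      .schema(citySchema)
      .csv(path)
      .as[City]
  }
}
```

Compared with inferSchema, declaring the schema avoids the extra pass over the file and makes the expected types explicit in the code.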

or cast:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.LongType

val cities = spark.read
  .option("header", "true")
  .csv(location)
  .withColumn("number_of_people", col("number_of_people").cast(LongType))
  .as[City]
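
The reason any of these is needed: without a schema or inferSchema, the CSV reader returns every column as a string, and Spark will not implicitly up cast a string to bigint. A small sketch to check this on a stand-in DataFrame (the object CastCheckSketch and its helpers rawType and castType are hypothetical names, and the in-memory rows stand in for the result of reading the file):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DataType, LongType}

object CastCheckSketch {
  // Data type of number_of_people exactly as it was read.
  def rawType(df: DataFrame): DataType =
    df.schema("number_of_people").dataType

  // Data type of the same column after the explicit cast.
  def castType(df: DataFrame): DataType =
    df.withColumn("number_of_people", col("number_of_people").cast(LongType))
      .schema("number_of_people")
      .dataType

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cast-check-sketch")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for spark.read.option("header", "true").csv(location):
    // all four columns are strings, as they are when no schema is given.
    val raw = Seq(("trenton", "nj", "10", "4.5"))
      .toDF("name", "state", "number_of_people", "coolness_index")

    println(rawType(raw))  // the string type
    println(castType(raw)) // the long type
    spark.stop()
  }
}
```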

user6022341 (2 revs) answered Sep 28 '22 01:09