How to load a CSV directly into a Spark Dataset?

I have a CSV file [1] that I want to load directly into a Dataset. The problem is that I always get errors like:

org.apache.spark.sql.AnalysisException: Cannot up cast `probability` from string to float as it may truncate
The type path of the target object is:
- field (class: "scala.Float", name: "probability")
- root class: "TFPredictionFormat"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;

Moreover, specifically for the phrases field (see case class [2]), I get:

org.apache.spark.sql.AnalysisException: cannot resolve '`phrases`' due to data type mismatch: cannot cast StringType to ArrayType(StringType,true);

If I define all the fields in my case class [2] as type String then everything works fine, but this is not what I want. Is there a simple way to do it [3]?


References

[1] An example row

B017NX63A2,Merrell,"['merrell_for_men', 'merrell_mens_shoes', 'merrel']",merrell_shoes,0.0806054356579781

[2] My code snippet is as follows:

import spark.implicits._

val INPUT_TF = "<SOME_URI>/my_file.csv"

final case class TFFormat (
    doc_id: String,
    brand: String,
    phrases: Seq[String],
    prediction: String,
    probability: Float
)

val ds = sqlContext.read
  .option("header", "true")
  .option("charset", "UTF8")
  .csv(INPUT_TF)
  .as[TFFormat]

ds.take(1).map(println)

[3] I have found ways to do it by first defining columns at the DataFrame level and then converting things to a Dataset (like here or here or here), but I am almost sure this is not the way things are supposed to be done. I am also pretty sure that Encoders are probably the answer, but I don't have a clue how to use them.

Asked Mar 08 '17 by Vassilis Moustakas


1 Answer

TL;DR With CSV input, transforming with standard DataFrame operations is the way to go. If you want to avoid that, you should use an input format that is more expressive (Parquet or even JSON).
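For example, if the same records were stored as JSON lines instead, the typed read would work directly, because the JSON source can produce array columns. A minimal sketch, assuming a hypothetical my_file.json with one JSON object per line:

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

// Each line of the hypothetical input would look like:
// {"doc_id": "B017NX63A2", "brand": "Merrell",
//  "phrases": ["merrell_for_men", "merrell_mens_shoes", "merrel"],
//  "prediction": "merrell_shoes", "probability": 0.0806054356579781}

// Pass the case class schema explicitly: with plain inference the JSON
// reader would type probability as double, which .as[TFFormat] refuses
// to down-cast to Float.
val schema = ScalaReflection.schemaFor[TFFormat].dataType.asInstanceOf[StructType]

val ds = spark.read
  .schema(schema)
  .json("<SOME_URI>/my_file.json")
  .as[TFFormat]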

In general, data to be converted to a statically typed Dataset must already be of the correct type. The most efficient way to do that is to provide a schema argument to the csv reader:

val schema: StructType = ???
val ds = spark.read
  .option("header", "true")
  .schema(schema)
  .csv(path)
  .as[T]
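
To make the ??? concrete for the file in the question, a hand-written schema might look like the following sketch. phrases is deliberately kept as StringType at read time and parsed into an array afterwards, for the reason explained below:

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("doc_id", StringType),
  StructField("brand", StringType),
  StructField("phrases", StringType),    // parsed into array<string> later
  StructField("prediction", StringType),
  StructField("probability", FloatType)
))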

Alternatively, the schema could be inferred by reflection:

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]
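
For the TFFormat class from the question, the reflected schema would print roughly as follows:

ScalaReflection.schemaFor[TFFormat].dataType.asInstanceOf[StructType].printTreeString()
// root
//  |-- doc_id: string (nullable = true)
//  |-- brand: string (nullable = true)
//  |-- phrases: array (nullable = true)
//  |    |-- element: string (containsNull = true)
//  |-- prediction: string (nullable = true)
//  |-- probability: float (nullable = false)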

Unfortunately, it won't work with your data and class, because the csv reader doesn't support ArrayType (it would work if all fields were atomic types like FloatType), so you have to do it the hard way. A naive solution could be expressed as below:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

val df: DataFrame = ???  // Raw data, all columns read as strings

df
  .withColumn("probability", $"probability".cast("float"))
  .withColumn("phrases",
    // strip the brackets and quotes, then split on the commas;
    // ",\\s*" also swallows the space after each comma
    split(regexp_replace($"phrases", "[\\['\\]]", ""), ",\\s*"))
  .as[TFFormat]

but you may need something more sophisticated depending on the content of phrases.
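
For instance, a UDF leaves more room for edge cases than a single regex. A minimal sketch, assuming the values are always Python-style list literals whose elements contain no commas or quotes (parsePhrases is a hypothetical helper, not part of any API):

import org.apache.spark.sql.functions.udf

// Hypothetical helper: turn "['a', 'b']" into Seq("a", "b").
// Assumes list elements never contain commas or quotes themselves.
val parsePhrases = udf { raw: String =>
  raw.stripPrefix("[").stripSuffix("]")
    .split(",")
    .map(_.trim.stripPrefix("'").stripSuffix("'"))
    .toSeq
}

// As above, assumes import spark.implicits._ is in scope.
val ds = df
  .withColumn("probability", $"probability".cast("float"))
  .withColumn("phrases", parsePhrases($"phrases"))
  .as[TFFormat]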

Answered Sep 18 '22 by zero323