
Scala Spark: how to use a Dataset for a case class when the schema has snake_case column names?

I have the following case class:

case class User(userId: String)

and the following schema:

+--------------------+------------------+
|            col_name|         data_type|
+--------------------+------------------+
|             user_id|            string|
+--------------------+------------------+

When I try to convert a DataFrame to a typed Dataset[User] with spark.read.table("MyTable").as[User], I get an error because the field names don't match:

Exception in thread "main" org.apache.spark.sql.AnalysisException:
    cannot resolve '`userId`' given input columns: [user_id];;

Is there any simple way to solve this without breaking Scala idioms and naming my fields user_id? Of course, my real table has a lot more fields, and I have a lot more case classes / tables, so it's not feasible to manually define an Encoder for each case class (and I don't know macros well enough, so that's out of the question; though I'm happy to use one if such a thing exists!).

I feel like I'm missing a very obvious "convert snake_case to camelCase=true" option, since one exists in practically any ORM I've worked with.
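
For reference, here's a minimal sketch of the failing setup (in a standalone application you'd also need the implicits import; in spark-shell it's already in scope):

import spark.implicits._

case class User(userId: String)

// The table's column is user_id but the case class field is userId,
// so the encoder cannot resolve userId and .as[User] throws the
// AnalysisException above.
val users = spark.read.table("MyTable").as[User]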

Gal asked Apr 16 '18



1 Answer

scala> val df = Seq(("Eric", "Theodore", "Cartman"), ("Butters", "Leopold", "Stotch")).toDF.select(concat($"_1", lit(" "), $"_2") as "first_and_middle_name", $"_3" as "last_name")
df: org.apache.spark.sql.DataFrame = [first_and_middle_name: string, last_name: string]

scala> df.show
+---------------------+---------+
|first_and_middle_name|last_name|
+---------------------+---------+
|        Eric Theodore|  Cartman|
|      Butters Leopold|   Stotch|
+---------------------+---------+


scala> val ccnames = df.columns.map { sc =>
    |   val ccn = sc.split("_")
    |   (ccn.head +: ccn.tail.map(_.capitalize)).mkString
    | }
ccnames: Array[String] = Array(firstAndMiddleName, lastName)

scala> df.toDF(ccnames: _*).show
+------------------+--------+
|firstAndMiddleName|lastName|
+------------------+--------+
|     Eric Theodore| Cartman|
|   Butters Leopold|  Stotch|
+------------------+--------+
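
To tie this back to the question: once the columns are camelCased, the usual .as[...] conversion works, because the encoder can now resolve every field. A minimal sketch, with a hypothetical case class FullName matching the renamed columns:

case class FullName(firstAndMiddleName: String, lastName: String)

// Field names now line up with the case class, so the encoder
// resolves them and no AnalysisException is thrown.
val ds = df.toDF(ccnames: _*).as[FullName]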

EDIT: Would this help? Define a single function that takes a path: String and a loader: String => DataFrame:

scala> val parquetloader = spark.read.parquet _
parquetloader: String => org.apache.spark.sql.DataFrame = <function1>

scala> val tableloader = spark.read.table _
tableloader: String => org.apache.spark.sql.DataFrame = <function1>

scala> val textloader = spark.read.text _
textloader: String => org.apache.spark.sql.DataFrame = <function1>

// csv loader and others

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.DataFrame

def snakeCaseToCamelCaseDataFrameColumns(path: String, loader: String => DataFrame): DataFrame = {
  val df = loader(path)
  val ccnames = df.columns.map { sc =>
    val ccn = sc.split("_")
    (ccn.head +: ccn.tail.map(_.capitalize)).mkString
  }
  df.toDF(ccnames: _*)
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.DataFrame
snakeCaseToCamelCaseDataFrameColumns: (path: String, loader: String => org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame

val oneDF = snakeCaseToCamelCaseDataFrameColumns("/path/to/table", tableloader)
val twoDF = snakeCaseToCamelCaseDataFrameColumns("/path/to/parquet/file", parquetloader)
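
And applied to the original question's User case class (assuming the table is registered as MyTable, as in the question):

// user_id becomes userId, so .as[User] now resolves the field.
val users = snakeCaseToCamelCaseDataFrameColumns("MyTable", tableloader).as[User]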
C.S.Reddy Gadipally answered Sep 17 '22