I have the following case class:
case class User(userId: String)
and the following schema:
+--------------------+------------------+
| col_name| data_type|
+--------------------+------------------+
| user_id| string|
+--------------------+------------------+
When I try to convert a DataFrame to a typed Dataset[User] with spark.read.table("MyTable").as[User], I get an error that the field names do not match:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
cannot resolve '`userId`' given input columns: [user_id];;
Is there any simple way to solve this without breaking Scala idioms and naming my fields user_id? Of course, my real table has a lot more fields, and I have many more case classes / tables, so it's not feasible to manually define an Encoder for each case class (and I don't know macros well enough, so that's out of the question; though I'm happy to use one if such a macro exists!).
I feel like I'm missing a very obvious "convert snake_case to camelCase=true" option, since one exists in practically any ORM I've worked with.
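For reference, here is a minimal, self-contained sketch of what I'm doing (the SparkSession setup and the Example wrapper are just illustrative; MyTable is the table with the schema above):

import org.apache.spark.sql.SparkSession

case class User(userId: String)

object Example {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("snake-case-example").getOrCreate()
    import spark.implicits._

    // The table's only column is user_id, but the encoder derived for User
    // expects userId, so this .as[User] call throws the AnalysisException above.
    val users = spark.read.table("MyTable").as[User]
    users.show()
  }
}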
scala> val df = Seq(("Eric" ,"Theodore", "Cartman"), ("Butters", "Leopold", "Stotch")).toDF.select(concat($"_1", lit(" "), ($"_2")) as "first_and_middle_name", $"_3" as "last_name")
df: org.apache.spark.sql.DataFrame = [first_and_middle_name: string, last_name: string]
scala> df.show
+---------------------+---------+
|first_and_middle_name|last_name|
+---------------------+---------+
| Eric Theodore| Cartman|
| Butters Leopold| Stotch|
+---------------------+---------+
scala> val ccnames = df.columns.map(sc => {val ccn = sc.split("_")
| (ccn.head +: ccn.tail.map(_.capitalize)).mkString
| })
ccnames: Array[String] = Array(firstAndMiddleName, lastName)
scala> df.toDF(ccnames: _*).show
+------------------+--------+
|firstAndMiddleName|lastName|
+------------------+--------+
| Eric Theodore| Cartman|
| Butters Leopold| Stotch|
+------------------+--------+
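With the columns renamed, the typed conversion works; a small sketch, assuming a hypothetical Person case class whose fields match the new column names and that spark.implicits._ is in scope:

case class Person(firstAndMiddleName: String, lastName: String)

// The camelCase columns now line up with the case class fields,
// so the encoder resolves them and .as succeeds.
val people = df.toDF(ccnames: _*).as[Person]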
EDIT: Would this help? Defining a single function that takes loader: String => DataFrame and path: String.
scala> val parquetloader = spark.read.parquet _
parquetloader: String => org.apache.spark.sql.DataFrame = <function1>
scala> val tableloader = spark.read.table _
tableloader: String => org.apache.spark.sql.DataFrame = <function1>
scala> val textloader = spark.read.text _
textloader: String => org.apache.spark.sql.DataFrame = <function1>
// csv loader and others
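A csv loader (or any other single-path reader) can be set up the same way; the lambda form below is just a sketch that sidesteps the overloaded csv signatures:

val csvloader: String => org.apache.spark.sql.DataFrame = path => spark.read.csv(path)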
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.DataFrame  // needed for the DataFrame type in the signature

def snakeCaseToCamelCaseDataFrameColumns(path: String, loader: String => DataFrame): DataFrame = {
  // Load once, rename every snake_case column to camelCase, and return the renamed frame.
  val df = loader(path)
  val ccnames = df.columns.map { sc =>
    val ccn = sc.split("_")
    (ccn.head +: ccn.tail.map(_.capitalize)).mkString
  }
  df.toDF(ccnames: _*)
}
// Exiting paste mode, now interpreting.
snakeCaseToCamelCaseDataFrameColumns: (path: String, loader: String => org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
val oneDF = snakeCaseToCamelCaseDataFrameColumns("/path/to/table", tableloader)
val twoDF = snakeCaseToCamelCaseDataFrameColumns("/path/to/parquet/file", parquetloader)
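Tying this back to the question, the helper composes directly with .as; a sketch assuming the User case class and MyTable from the question, with spark.implicits._ in scope:

// Rename user_id -> userId, then bind the result to the case class.
val users = snakeCaseToCamelCaseDataFrameColumns("MyTable", tableloader).as[User]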