I have a UDF that processes JSON and returns dynamic results per row. In my case I need it to validate data and return the validated data.

The schema is flexible for each row, which means I cannot create a case class for every case (some of my data can be nested).

I've tried to return a tuple from my UDF function, but I had no luck with that either (because I needed to convert from a list to a tuple), and I didn't find an elegant solution for it. The data types I'm returning are String, Integer, Double and DateTime, in different orders.

I've tried to use map on the DataFrame, but I'm having issues with my schema.
import org.apache.spark.sql.functions.udf
import spark.implicits._

def processData(row_type: String) = {
  /*
    Completely random output here: a Tuple/List/Array of
    elements with types Integer, String, Double, DateType.
  */
  // pseudo-code starts here
  if (row_type == "A")
    (1, "second", 3)
  else
    (1, "second", 3, 4)
}

val processDataUDF = udf((row_type: String) => processData(row_type))

val df = Seq((0, 1), (1, 2)).toDF("a", "b")
val df2 = df.select(processDataUDF($"a"))
df2.show(5)
df2.printSchema()
Results
+------------+
| UDF(a)|
+------------+
|[1,second,3]|
|[1,second,3]|
+------------+
How should I approach this problem? We have different processing results per row_type, and all the row_types are set dynamically. I can create a Schema for each row_type, but I cannot make the same UDF return results with different schemas.

Is using map the only approach here?
Spark Dataset is a columnar data structure and there is really no place for a flexible schema here. The schema has to be homogeneous (all rows have to have the same general structure) and known upfront (if you use a UDF, it has to return a well-defined SQL type).
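Applied to the processData example from the question, that constraint means both branches have to produce the same shape. A minimal sketch (the fourth Int field and the padding with None are just assumptions about the asker's data) always returns the wider tuple and leaves the missing value empty:

import org.apache.spark.sql.functions.udf

// One fixed SQL type for every row: struct<_1: int, _2: string, _3: int, _4: int>
// with a nullable fourth field. Row type "A" simply has no fourth value,
// instead of returning a shorter, different schema.
val processDataUDF = udf((row_type: String) =>
  if (row_type == "A") (1, "second", 3, None: Option[Int])
  else                 (1, "second", 3, Some(4): Option[Int])
)

// Hypothetical usage (spark-shell style, with spark.implicits._ in scope):
// Seq("A", "B").toDF("row_type").select(processDataUDF($"row_type")).printSchema()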
You can achieve some flexibility by:

- Defining a schema that is a superset of all possible fields and marking individual columns as nullable. This is possible only if there are no type conflicts (if a Row contains field foo, it is always represented using the same SQL type).
- Using collection types (MapType, ArrayType) to represent fields with variable size. All values and / or keys have to be of the same type (see the sketch after the get_json_object example below).
- Reshaping the raw data into a form that can be represented with a fixed schema, for example with a JSON library such as json4s, which provides a set of tools for merging, diffing and querying JSON data. It can be used to apply relatively complex transformations if needed.

If this is not practical, I would recommend keeping the JSON field "as is" and parsing it only on demand to extract specific values. You can use get_json_object and explicit type casting. This allows for testing different scenarios:
coalesce(Seq("$.bar", "$.foo.bar", "$.foobar.foo.bar")
.map(get_json_object($"json_col", _)): _*).cast(DoubleType)
without assuming a single document structure.
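For the MapType option mentioned in the list above, here is a minimal sketch (the column name, keys and stringified values are made up for illustration). Every row may carry a different set of keys, but the column itself keeps one homogeneous type, map<string,string>, and individual values are extracted and cast on demand:

import org.apache.spark.sql.functions.udf

// Assumes an active SparkSession named `spark` (e.g. spark-shell).
import spark.implicits._

// All validated values are stringified so they fit a single MapType(StringType, StringType).
val validateUDF = udf((row_type: String) =>
  if (row_type == "A") Map("id" -> "1", "name" -> "second", "score" -> "3")
  else                 Map("id" -> "1", "name" -> "second", "score" -> "3", "extra" -> "4")
)

val validated = Seq("A", "B").toDF("row_type")
  .withColumn("validated", validateUDF($"row_type"))

validated.printSchema()   // validated: map<string,string>
validated
  .select($"validated".getItem("score").cast("double").as("score"))
  .show()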
You can get a bit more flexibility with binary Encoders (Encoders.kryo, Encoders.java) or the RDD API, which can be used to store union types (or even Any), but if you really expect completely random output, it suggests some serious design or data modeling problem. Even if you can store parsed data it will be really hard to work with it.
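For completeness, a rough sketch of the binary Encoders route (the payloads here are invented): the whole value is stored as an opaque Kryo-serialized blob, so Spark SQL cannot optimize or query inside it, and every access needs a deserializing map, which is why this is a last resort:

import org.apache.spark.sql.{Encoder, Encoders}

// Assumes an active SparkSession named `spark` (e.g. spark-shell).
import spark.implicits._

// Everything is stored as a Kryo-serialized binary blob - flexible, but opaque to Spark SQL.
implicit val anyEncoder: Encoder[Any] = Encoders.kryo[Any]

val mixed = Seq[Any](
  (1, "second", 3.0),
  (1, "second", 3.0, java.sql.Timestamp.valueOf("2017-01-01 00:00:00"))
).toDS()

mixed.printSchema()                                     // a single `value: binary` column
mixed.map { case p: Product => p.productArity }.show()  // every access deserializes the blob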