
Apache Spark UDF that returns dynamic data types

I have a UDF that processes JSON and returns dynamic results per row. In my case I need it to validate the data and return the validated data.

The schema is flexible for each row, which means I cannot create a case class for every case (some of my data can be nested).

I've tried to return a tuple from my UDF, but I had no luck with that either (because I needed to convert from a list to a tuple), and I didn't find an elegant solution for it.

The data types that I'm returning are String, Integer, Double, and DateTime, in varying order.

I've also tried using map on the DataFrame, but I'm having issues with my schema.

import spark.implicits._

def processData(row_type: String) = {
  /*
  Completely random output here: a Tuple/List/Array of
  elements with types Integer, String, Double, DateType.
  */

  // pseudo-code starts here

  if (row_type == "A")
     (1, "second", 3)
  else
     (1, "second", 3, 4)
}

val processDataUDF = udf((row_type: String) => processData(row_type))

val df = Seq((0, 1), (1, 2)).toDF("a", "b")
val df2 = df.select(processDataUDF($"a"))
df2.show(5)
df2.printSchema()

Results

+------------+
|      UDF(a)|
+------------+
|[1,second,3]|
|[1,second,3]|
+------------+

How should I approach this problem? We have different processing results per row_type, and all the row_types are set dynamically. I can create a schema for each row_type, but I cannot make the same UDF return results with different schemas.

Is using map the only approach here?

asked Jan 23 '17 by artyomboyko


1 Answer

A Spark Dataset is a columnar data structure and there is really no place for a flexible schema here. The schema has to be homogeneous (all rows have to have the same general structure) and known up front (if you use a UDF, it has to return a well-defined SQL type).

You can achieve some flexibility by:

  • Defining a schema which represents a superset of all possible fields and marking individual columns as nullable. This is possible only if there are no type conflicts (if a Row contains a field foo, it is always represented using the same SQL type); see the sketch after this list.
  • Using collection types (MapType, ArrayType) to represent fields of variable size. All values and/or keys have to be of the same type.
  • Reshaping the raw data to the point where it is actually representable with a fixed schema. Spark includes json4s as a dependency, which provides a set of tools for merging, diffing and querying JSON data. It can be used to apply relatively complex transformations if needed.
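
A minimal sketch of the first option, assuming a hypothetical Parsed case class that covers the superset of fields any row_type can emit (fields a given row_type does not produce stay None and surface as NULLs):

import org.apache.spark.sql.functions.udf

// Hypothetical superset of every field any row_type can produce.
case class Parsed(v1: Option[Int], v2: Option[String], v3: Option[Int], v4: Option[Int])

def processData(row_type: String): Parsed = row_type match {
  case "A" => Parsed(Some(1), Some("second"), Some(3), None)    // field missing for "A" stays None
  case _   => Parsed(Some(1), Some("second"), Some(3), Some(4))
}

val processDataUDF = udf((row_type: String) => processData(row_type))
// df.select(processDataUDF($"a")) now yields a struct column with a fixed, nullable schema.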

If this is not practical, I would recommend keeping the JSON field as-is and parsing it only on demand to extract specific values. You can use get_json_object and explicit type casting. This allows for testing different scenarios:

coalesce(Seq("$.bar", "$.foo.bar", "$.foobar.foo.bar")
  .map(get_json_object($"json_col", _)): _*).cast(DoubleType)

without assuming a single document structure.
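
For example, a small end-to-end sketch (the json_col column and the sample documents are hypothetical):

import org.apache.spark.sql.functions.{coalesce, get_json_object}
import org.apache.spark.sql.types.DoubleType

// Hypothetical documents that keep "bar" at different depths.
val raw = Seq(
  """{"bar": 1.5}""",
  """{"foo": {"bar": 2.5}}""",
  """{"foobar": {"foo": {"bar": 3.5}}}"""
).toDF("json_col")

val extracted = raw.withColumn("bar",
  coalesce(Seq("$.bar", "$.foo.bar", "$.foobar.foo.bar")
    .map(get_json_object($"json_col", _)): _*)
    .cast(DoubleType))
// Each row ends up with its bar value as a Double, regardless of where it was nested.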

You can get a bit more flexibility with binary Encoders (Encoders.kryo, Encoders.java) or the RDD API, which can be used to store union types (or even Any), but if you really expect completely random output, it suggests a serious design or data-modeling problem. Even if you can store the parsed data, it will be really hard to work with.
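
A sketch of the Kryo option, reusing processData from above (the type ascription to Any is what forces the binary encoder; the result is an opaque blob Spark SQL cannot look inside):

import org.apache.spark.sql.{Encoder, Encoders}

// Kryo-serialize the heterogeneous result into a single binary column.
implicit val anyEnc: Encoder[Any] = Encoders.kryo[Any]

val parsed = df.map { row =>
  processData(row.getAs[Int]("a").toString): Any
}
// parsed is a Dataset[Any]; no Catalyst optimizations or column pruning apply to it.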

answered Sep 19 '22 by zero323