Spark Dataset select with TypedColumn

Looking at the select() function on the Spark Dataset, there are various generated function signatures:

(c1: TypedColumn[MyClass, U1], c2: TypedColumn[MyClass, U2], ...)

This seems to hint that I should be able to reference the members of MyClass directly and be type safe, but I'm not sure how...

ds.select("member") of course works .. seems like ds.select(_.member) might also work somehow?

asked Jul 28 '16 by Jeremy



2 Answers

In the Scala DSL for select, there are many ways to identify a Column:

  • From a symbol: 'name
  • From a string: $"name" or col("name")
  • From an expression: expr("nvl(name, 'unknown') as renamed")

To get a TypedColumn from Column you simply use myCol.as[T].

For example: ds.select(col("name").as[String])

answered Oct 17 '22 by Sim


If you want the equivalent of ds.select(_.member) just use map:

case class MyClass(member: MyMember, foo: A, bar: B)
val ds: Dataset[MyClass] = ???
val members: Dataset[MyMember] = ds.map(_.member)

Edit: The argument for not using map.

A more performant way of doing the same thing is through a projection, without using map at all. You lose the compile-time type checking, but in exchange you give the Catalyst query engine a chance to do something more optimized. As @Sim alludes to in his comment below, the primary optimization is that the whole contents of MyClass do not need to be deserialized from Tungsten memory space into JVM heap memory just to call the accessor, and the result of _.member does not need to be serialized back into Tungsten.

To make the example more concrete, let's redefine our data model like this:

  // Make sure these are not nested classes
  // (i.e. they are defined in a top-level compilation unit).
  case class MyMember(something: Double)
  case class MyClass(member: MyMember, foo: Int, bar: String)

These need to be case classes so that SQLImplicits.newProductEncoder[T <: Product] can provide us with an implicit Encoder[MyClass], required by the Dataset[T] API.
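As a small sketch of what that looks like in practice (assuming a SparkSession value named spark whose implicits are imported, which the toDS() call below also requires):

  import org.apache.spark.sql.Encoder
  import spark.implicits._  // brings SQLImplicits.newProductEncoder into scope

  // Resolves because MyClass is a top-level case class (a Product).
  val enc: Encoder[MyClass] = implicitly[Encoder[MyClass]]
  println(enc.schema.treeString)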

Now we can make the example above more concrete:

  val ds: Dataset[MyClass] = Seq(MyClass(MyMember(1.0), 2, "three")).toDS()
  val membersMapped: Dataset[Double] = ds.map(_.member.something)
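For reference, printing the schema shows that the case class fields become top-level columns, with member kept as a nested struct (the output below is approximate):

  ds.printSchema()
  // root
  //  |-- member: struct (nullable = true)
  //  |    |-- something: double (nullable = false)
  //  |-- foo: integer (nullable = false)
  //  |-- bar: string (nullable = true)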

To see what's going on behind the scenes we use the explain() method:

membersMapped.explain()

== Physical Plan ==
*(1) SerializeFromObject [input[0, double, false] AS value#19]
+- *(1) MapElements <function1>, obj#18: double
   +- *(1) DeserializeToObject newInstance(class MyClass), obj#17: MyClass
      +- LocalTableScan [member#12, foo#13, bar#14]

This makes the serialization to and from Tungsten explicit in the physical plan.

Let's get to the same value using a projection[^1]:

val ds2: Dataset[Double] = ds.select($"member.something".as[Double])
ds2.explain()

== Physical Plan ==
LocalTableScan [something#25]

That's it! A single step[^2]. No serialization other than the encoding of MyClass into the original Dataset.

[^1]: The reason the projection is defined as $"member.something" rather than $"value.member.something" has to do with Catalyst automatically projecting the members of a single column DataFrame.

[^2]: To be fair, the * next to the steps in the first physical plan indicates that they will be implemented by a WholeStageCodegenExec, whereby those steps become a single, on-the-fly-compiled JVM function that has its own set of runtime optimizations applied to it. So in practice you'd have to test the performance empirically to really assess the benefits of each approach.
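As a rough sketch of such an empirical test (the data size, the timing helper, and the reduce action are all assumptions for illustration, and it relies on the spark session and implicits from the snippets above; a real comparison should use warm-up runs and many iterations):

// Build a larger Dataset so any difference is measurable (the size is arbitrary).
val big: Dataset[MyClass] =
  spark.range(0L, 1000000L).as[Long]
    .map(i => MyClass(MyMember(i.toDouble), i.toInt, i.toString))

// Naive wall-clock timer, good enough for a ballpark comparison only.
def time[A](label: String)(block: => A): A = {
  val start = System.nanoTime()
  val result = block
  println(f"$label: ${(System.nanoTime() - start) / 1e6}%.1f ms")
  result
}

time("map accessor") { big.map(_.member.something).reduce(_ + _) }
time("projection")   { big.select($"member.something".as[Double]).reduce(_ + _) }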

answered Oct 17 '22 by metasim