Spark 2.0 Dataset vs DataFrame

Starting out with Spark 2.0.1, I have some questions. I have read a lot of documentation, but so far could not find sufficient answers:

  • What is the difference between
    • df.select("foo")
    • df.select($"foo")
  • Do I understand correctly that:
    • myDataSet.map(foo.someVal) is type-safe and will not convert into an RDD but stay in the Dataset representation, with no additional overhead (performance-wise, for 2.0.0)?
    • All the other commands, e.g. select, are just syntactic sugar? They are not type-safe, and a map could be used instead. How could I make df.select("foo") type-safe without a map statement?
    • Why should I use a UDF / UDAF instead of a map (assuming map stays in the Dataset representation)?
asked Nov 14 '16 by Georg Heiler



1 Answer

  1. The difference between df.select("foo") and df.select($"foo") is the signature. The former takes at least one String, the latter zero or more Columns. There is no practical difference beyond that.
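The signature distinction can be sketched in plain Scala (a toy model, not Spark's actual Dataset class; the names ToyDataset and Column here are illustrative stand-ins):

```scala
// Toy model of the two select signatures (illustrative, not Spark's API):
//   select(col: String, cols: String*)  -- requires at least one String
//   select(cols: Column*)               -- accepts zero or more Columns
case class Column(name: String)

object ToyDataset {
  def select(col: String, cols: String*): Seq[String] = col +: cols
  def select(cols: Column*): Seq[String] = cols.map(_.name)
}

// Both calls resolve to the same columns; only the overload differs.
val fromStrings = ToyDataset.select("foo", "bar")
val fromColumns = ToyDataset.select(Column("foo"), Column("bar"))
```

With zero arguments only the Column* overload applies, which is why `df.select()` is legal but `df.select("foo")` needs at least one column name.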
  2. myDataSet.map(foo.someVal) type-checks, but since any Dataset operation uses an RDD of objects, compared to DataFrame operations there is a significant overhead. Let's take a look at a simple example:

    case class FooBar(foo: Int, bar: String)
    val ds = Seq(FooBar(1, "x")).toDS
    ds.map(_.foo).explain

    == Physical Plan ==
    *SerializeFromObject [input[0, int, true] AS value#123]
    +- *MapElements <function1>, obj#122: int
       +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
          +- LocalTableScan [foo#117, bar#118]

    As you can see, this execution plan requires access to all fields and has to DeserializeToObject.
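To make the overhead concrete, here is a plain-Scala sketch (no Spark involved; the columnar arrays below only model how rows are kept internally) of the difference between deserializing whole objects and reading one column directly:

```scala
// Plain-Scala sketch of the plans above (no Spark involved).
case class FooBar(foo: Int, bar: String)

// Model of a columnar layout, roughly how rows are stored internally:
val fooCol = Array(1, 2, 3)
val barCol = Array("x", "y", "z")

// ds.map(_.foo): every row is first rebuilt as a FooBar object
// (DeserializeToObject), only to have one field projected out again.
val viaObjects = fooCol.indices.map(i => FooBar(fooCol(i), barCol(i))).map(_.foo)

// ds.select($"foo"): the needed column is read directly, no objects built.
val viaColumn = fooCol.toSeq
```

Both paths produce the same values; the object-based path simply pays for allocating and tearing down a FooBar per row.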

  3. No. In general, the other methods are not syntactic sugar and generate a significantly different execution plan. For example:

    ds.select($"foo").explain

    == Physical Plan ==
    LocalTableScan [foo#117]

    Compared to the plan shown before, it can access the column directly. This is not so much a limitation of the API as a result of the difference in operational semantics.

  4. How could I make df.select("foo") type-safe without a map statement?

    There is no such option. While typed columns allow you to transform a statically typed Dataset into another statically typed Dataset:

    ds.select($"bar".as[Int]) 

    they are not type-safe. There have been some other attempts to include type-safe optimized operations, like typed aggregations, but this API is experimental.
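Why the check is deferred can be modeled in plain Scala (a toy TypedColumn and select, not Spark's; due to type erasure the cast inside select is unchecked at compile time):

```scala
// Toy model (not Spark's API) of why $"bar".as[Int] is not statically safe:
// the column name and target type are plain values, so the compiler cannot
// verify that "bar" exists or actually holds Ints.
case class TypedColumn[T](name: String)

def select[T](row: Map[String, Any], col: TypedColumn[T]): T =
  row(col.name).asInstanceOf[T] // erased cast, only checked at runtime

val row = Map[String, Any]("bar" -> "not an int")

// Compiles, and even "succeeds", because the cast is erased:
val s = select(row, TypedColumn[String]("bar"))
// select(row, TypedColumn[Int]("bar")) also compiles; a ClassCastException
// only surfaces once the result is actually used as an Int.
```

This mirrors `as[Int]` on a column of the wrong type: the program compiles, and the mismatch only shows up when the query is executed.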

  5. Why should I use a UDF / UDAF instead of a map?

    It is completely up to you. Each distributed data structure in Spark provides its own advantages and disadvantages (see for example Spark UDAF with ArrayType as bufferSchema performance issues).

Personally, I find the statically typed Dataset to be the least useful:

  • They don't provide the same range of optimizations as Dataset[Row] (although they share the storage format and some execution plan optimizations, they don't fully benefit from code generation or off-heap storage), nor access to all the analytical capabilities of the DataFrame.

  • Typed transformations are black boxes and effectively create an analysis barrier for the optimizer. For example, selections (filters) cannot be pushed over a typed transformation:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar]
      .filter(x => true)
      .where($"foo" === 1)
      .explain

    == Physical Plan ==
    *Filter (foo#133 = 1)
    +- *Filter <function1>.apply
       +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
          +- Exchange hashpartitioning(foo#133, 200)
             +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
                +- LocalTableScan [foo#133, bar#134]

    Compared to:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar]
      .where($"foo" === 1)
      .explain

    == Physical Plan ==
    *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
    +- Exchange hashpartitioning(foo#133, 200)
       +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
          +- *Filter (foo#133 = 1)
             +- LocalTableScan [foo#133, bar#134]

    This impacts features like predicate pushdown or projection pushdown.

  • They are not as flexible as RDDs, with only a small subset of types supported natively.

  • "Type safety" with Encoders is disputable when a Dataset is converted using the as method. Because the data shape is not encoded in the signature, a compiler can only verify the existence of an Encoder.
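This last point can be illustrated with a toy model in plain Scala (the Enc trait and Tbl class are hypothetical stand-ins for Spark's Encoder and Dataset): the compiler only demands that an implicit encoder instance exists for T, and never compares T against the actual columns.

```scala
// Hypothetical stand-ins for Encoder / Dataset (not Spark's classes):
trait Enc[T]
object Enc {
  implicit val intEnc: Enc[Int] = new Enc[Int] {}
}

case class Tbl(columns: Map[String, Seq[Any]]) {
  // as[T] compiles whenever an Enc[T] is in scope; the data shape
  // (column names and value types) is never inspected.
  def as[T](implicit enc: Enc[T]): Tbl = this
}

val df = Tbl(Map("value" -> Seq("a", "b"))) // Strings, not Ints
val ds = df.as[Int]                         // compiles anyway
```

Just as here, `df.as[SomeCaseClass]` in Spark type-checks against the encoder alone, and a schema mismatch surfaces only at analysis or run time.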

Related questions:

  • Perform a typed join in Scala with Spark Datasets
  • Spark 2.0 DataSets groupByKey and divide operation and type safety
answered Sep 29 '22 by zero323