RDD - Use RDDs when you want low-level transformations and actions and fine-grained control over your dataset, or when the data is unstructured, such as media streams or streams of text. DataFrame - Use DataFrames when you need a higher level of abstraction over structured or semi-structured data.
Conceptually, consider DataFrame as an alias for a collection of generic objects Dataset[Row], where a Row is a generic untyped JVM object. Dataset, by contrast, is a collection of strongly-typed JVM objects, dictated by a case class you define in Scala or a class in Java.
A DataFrame is defined well by a Google search for "DataFrame definition": a data frame is a table, or two-dimensional array-like structure, in which each column contains measurements on one variable, and each row contains one case.
So, a DataFrame has additional metadata due to its tabular format, which allows Spark to run certain optimizations on the finalized query.
An RDD, on the other hand, is merely a Resilient Distributed Dataset that is more of a black box of data that cannot be optimized, because the operations that can be performed against it are not as constrained.
However, you can go from a DataFrame to an RDD via its rdd method, and you can go from an RDD to a DataFrame (if the RDD is in a tabular format) via the toDF method. In general, it is recommended to use a DataFrame where possible due to the built-in query optimization.
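As a minimal sketch of that round trip (assuming a Spark shell where spark is the SparkSession; the Person case class is illustrative):
case class Person(name: String, age: Int)
import spark.implicits._                                  // brings in toDF and the needed encoders
val df = Seq(Person("Ann", 30), Person("Bob", 25)).toDF() // DataFrame with a schema inferred from the case class
val asRdd = df.rdd                                        // DataFrame -> RDD[Row]
val backToDf = asRdd
  .map(r => Person(r.getString(0), r.getInt(1)))          // rebuild the domain objects
  .toDF()                                                 // RDD -> DataFrame again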
First, DataFrame evolved from SchemaRDD.
Yes, conversion between a DataFrame and an RDD is absolutely possible. Below are some sample code snippets.
df.rdd is an RDD[Row].
Below are some options to create a DataFrame.
1) yourRddOfRow.toDF converts it to a DataFrame.
2) Using createDataFrame of the SQL context (or SparkSession):
val df = spark.createDataFrame(rddOfRow, schema)
where the schema can come from some of the options below, as described in a nice SO post:
From a Scala case class and the Scala reflection API:
import org.apache.spark.sql.catalyst.ScalaReflection
val schema = ScalaReflection.schemaFor[YourScalaCaseClass].dataType.asInstanceOf[StructType]
Or using Encoders:
import org.apache.spark.sql.Encoders
val mySchema = Encoders.product[MyCaseClass].schema
The schema can also be created using StructType and StructField:
val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("col1", DoubleType, true))
  .add(StructField("col2", DoubleType, true))
  // etc.
In fact, there are now three Apache Spark APIs.
RDD API
The RDD (Resilient Distributed Dataset) API has been in Spark since the 1.0 release. The RDD API provides many transformation methods, such as map(), filter(), and reduce(), for performing computations on the data. Each of these methods results in a new RDD representing the transformed data. However, these methods are only defining the operations to be performed, and the transformations are not performed until an action method is called. Examples of action methods are collect() and saveAsObjectFile().
RDD example:
rdd.filter(_.age > 21)               // transformation
   .map(_.last)                      // transformation
   .saveAsObjectFile("under21.bin")  // action
Example: Filter by attribute with RDD
rdd.filter(_.age > 21)
DataFrame API
Spark 1.3 introduced a new DataFrame API as part of the Project Tungsten initiative, which seeks to improve the performance and scalability of Spark. The DataFrame API introduces the concept of a schema to describe the data, allowing Spark to manage the schema and only pass data between nodes, in a much more efficient way than using Java serialization. The DataFrame API is radically different from the RDD API because it is an API for building a relational query plan that Spark's Catalyst optimizer can then execute. The API is natural for developers who are familiar with building query plans.
Example SQL style :
df.filter("age > 21");
Limitations: Because the code refers to data attributes by name, it is not possible for the compiler to catch errors. If an attribute name is incorrect, the error will only be detected at runtime, when the query plan is created.
Another downside of the DataFrame API is that it is very Scala-centric, and while it does support Java, the support is limited. For example, when creating a DataFrame from an existing RDD of Java objects, Spark's Catalyst optimizer cannot infer the schema and assumes that any objects in the DataFrame implement the scala.Product interface. Scala case classes work out of the box because they implement this interface.
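A minimal sketch of the case-class path (names are illustrative; assumes a Spark shell with sc and sqlContext available):
case class Employee(name: String, deptId: Int)   // case classes implement scala.Product automatically
val employeeRdd = sc.parallelize(Seq(Employee("Ann", 1), Employee("Bob", 2)))
import sqlContext.implicits._
val employeeDF = employeeRdd.toDF()              // Catalyst infers the schema from the case class fields
employeeDF.printSchema()                         // name: string, deptId: integer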
Dataset API
The Dataset API, released as an API preview in Spark 1.6, aims to provide the best of both worlds: the familiar object-oriented programming style and compile-time type safety of the RDD API, together with the performance benefits of the Catalyst query optimizer. Datasets also use the same efficient off-heap storage mechanism as the DataFrame API. When it comes to serializing data, the Dataset API has the concept of encoders which translate between JVM representations (objects) and Spark's internal binary format. Spark has built-in encoders which are very advanced in that they generate bytecode to interact with off-heap data and provide on-demand access to individual attributes without having to deserialize an entire object. Spark does not yet provide an API for implementing custom encoders, but that is planned for a future release. Additionally, the Dataset API is designed to work equally well with both Java and Scala. When working with Java objects, it is important that they are fully bean-compliant.
Example, Dataset API (lambda) style:
dataset.filter(_.age < 21);
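For context, here is a hedged sketch of how such a Dataset could be created (Spark 2.x style, assuming a SparkSession named spark; the Person class is illustrative):
case class Person(last: String, age: Int)
import spark.implicits._                                          // supplies the Encoder[Person]
val dataset = Seq(Person("Smith", 19), Person("Jones", 30)).toDS()
dataset.filter(_.age < 21).show()                                 // typed lambda, checked at compile time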
Evaluation differences between DataFrame and Dataset: see the Catalyst-level flow in the "Demystifying DataFrame and Dataset" presentation from Spark Summit.
Further reading: the Databricks article "A Tale of Three Apache Spark APIs: RDDs vs DataFrames and Datasets".
Apache Spark provides three types of APIs. Here is a comparison of the RDD, DataFrame, and Dataset APIs.
The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.
Distributed collection: RDDs use MapReduce operations, which are widely adopted for processing and generating large datasets with a parallel, distributed algorithm on a cluster. They allow users to write parallel computations using a set of high-level operators, without having to worry about work distribution and fault tolerance.
Immutable: RDDs are composed of a collection of records which are partitioned. A partition is the basic unit of parallelism in an RDD, and each partition is one logical division of data which is immutable and created through transformations on existing partitions. Immutability helps to achieve consistency in computations.
Fault tolerant: In case we lose some partition of an RDD, we can replay the transformation on that partition using its lineage to achieve the same computation, rather than replicating data across multiple nodes. This characteristic is the biggest benefit of RDDs, because it saves a lot of effort in data management and replication and thus achieves faster computations.
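A small sketch of inspecting that lineage (assuming a Spark shell with sc):
val nums  = sc.parallelize(1 to 100, 4)          // 4 partitions
val evens = nums.filter(_ % 2 == 0).map(_ * 10)  // transformations recorded in the lineage
println(evens.toDebugString)                     // prints the chain of parent RDDs Spark would replay on failure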
Lazy evaluations: All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset. The transformations are only computed when an action requires a result to be returned to the driver program.
Functional transformations: RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.
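A minimal sketch of the two kinds of operations (assuming a Spark shell with sc):
val words   = sc.parallelize(Seq("spark", "rdd", "dataset"))
val lengths = words.map(_.length)   // transformation: nothing is computed yet
val total   = lengths.reduce(_ + _) // action: triggers the computation and returns 15 to the driver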
Data processing formats: RDDs can easily and efficiently process both structured and unstructured data.
Programming languages supported: The RDD API is available in Java, Scala, Python, and R.
No built-in optimization engine: When working with structured data, RDDs cannot take advantage of Spark's advanced optimizers, including the Catalyst optimizer and the Tungsten execution engine. Developers need to optimize each RDD based on its attributes.
Handling structured data: Unlike DataFrames and Datasets, RDDs don't infer the schema of the ingested data; the user is required to specify it.
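A hedged sketch of what that means in practice (the data and field layout are illustrative):
val lines  = sc.parallelize(Seq("Ann,30", "Bob,25"))               // raw text, no schema known to Spark
val people = lines.map(_.split(",")).map(a => (a(0), a(1).toInt))  // the "schema" exists only in user code
people.collect()                                                   // Array((Ann,30), (Bob,25))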
Spark introduced DataFrames in the Spark 1.3 release. DataFrames overcome the key challenges that RDDs had.
A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or an R/Python data frame. Along with DataFrames, Spark also introduced the Catalyst optimizer, which leverages advanced programming features to build an extensible query optimizer.
Distributed collection of Row objects: A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database, but with richer optimizations under the hood.
Data processing: DataFrames can process structured and unstructured data formats (Avro, CSV, Elasticsearch, Cassandra) and storage systems (HDFS, Hive tables, MySQL, etc.). They can read from and write to all these various data sources.
Optimization using the Catalyst optimizer: It powers both SQL queries and the DataFrame API. DataFrames use the Catalyst tree transformation framework in four phases (see the explain() sketch below):
1. Analyzing a logical plan to resolve references
2. Logical plan optimization
3. Physical planning
4. Code generation to compile parts of the query to Java bytecode
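To peek at what Catalyst produces, you can ask Spark to print the plans; a minimal sketch (reusing people.json as in the example below; exact output varies by version):
val df = sqlContext.read.json("people.json")
df.filter("age > 21").explain(true)   // prints the parsed, analyzed and optimized logical plans plus the physical plan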
Hive compatibility: Using Spark SQL, you can run unmodified Hive queries on your existing Hive warehouses. It reuses the Hive frontend and metastore and gives you full compatibility with existing Hive data, queries, and UDFs.
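A hedged Spark 1.x-style sketch (assumes a configured Hive metastore and an existing Hive table named employees):
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)   // reuses the Hive frontend and metastore
val top = hiveContext.sql("SELECT name, salary FROM employees ORDER BY salary DESC LIMIT 10")
top.show()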
Tungsten: Tungsten provides a physical execution backend which explicitly manages memory and dynamically generates bytecode for expression evaluation.
Programming languages supported: The DataFrame API is available in Java, Scala, Python, and R.
Example (no compile-time type safety; a mistyped column is only caught at runtime):
case class Person(name : String , age : Int)
val dataframe = sqlContext.read.json("people.json")
dataframe.filter("salary > 10000").show
=> throws Exception : cannot resolve 'salary' given input age , name
This is challenging, especially when you are working with several transformation and aggregation steps.
Example (the original domain object cannot be recovered from the DataFrame):
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataFrame(personRDD)
personDF.rdd // returns RDD[Row], does not return RDD[Person]
Dataset API is an extension to DataFrames that provides a type-safe, object-oriented programming interface. It is a strongly-typed, immutable collection of objects that are mapped to a relational schema.
At the core of the Dataset API is a new concept called an encoder, which is responsible for converting between JVM objects and the tabular representation. The tabular representation is stored using Spark's internal Tungsten binary format, allowing for operations on serialized data and improved memory utilization. Spark 1.6 comes with support for automatically generating encoders for a wide variety of types, including primitive types (e.g. String, Integer, Long), Scala case classes, and Java beans.
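A small sketch of asking for encoders explicitly (Spark 1.6+; the Person class is illustrative):
import org.apache.spark.sql.{Encoder, Encoders}
case class Person(name: String, age: Int)
val personEncoder: Encoder[Person] = Encoders.product[Person]  // generated from the case class
val longEncoder = Encoders.LONG                                // built-in encoder for a primitive type
personEncoder.schema.printTreeString()                         // name: string, age: integer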
Provides the best of both RDD and DataFrame: RDD (functional programming, type safety) and DataFrame (relational model, query optimization, Tungsten execution, sorting and shuffling).
Encoders: With the use of encoders, it is easy to convert any JVM object into a Dataset, allowing users to work with both structured and unstructured data, unlike DataFrames.
Programming languages supported: The Dataset API is currently only available in Scala and Java. Python and R are not supported in version 1.6. Python support is slated for version 2.0.
Type safety: The Dataset API provides compile-time safety which was not available in DataFrames. In the example below, we can see how a Dataset can operate on domain objects with compiled lambda functions.
Example:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataFrame(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
// error: value salary is not a member of Person
ds.rdd // returns RDD[Person]
Example:
ds.select(col("name").as[String], $"age".as[Int]).collect()
No support for Python and R: As of release 1.6, Datasets only support Scala and Java. Python support will be introduced in Spark 2.0.
The Dataset API brings several advantages over the existing RDD and DataFrame APIs, with better type safety and functional programming. With the type-casting requirements of the API, however, you would still not get the required type safety everywhere, and that can make your code brittle.
RDD
An RDD is a fault-tolerant collection of elements that can be operated on in parallel.
DataFrame
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
Dataset
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL's optimized execution engine.
Note: A Dataset of Rows (Dataset[Row]) in Scala/Java is often referred to as a DataFrame.
(source: Spark documentation)
Q: Can you convert one to the other like RDD to DataFrame or vice-versa?
1. RDD to DataFrame with .toDF()
val rowsRdd = sc.parallelize(
  Seq(
    ("first", 2.0, 7.0),
    ("second", 3.5, 2.5),
    ("third", 7.0, 5.9)
  )
)
import spark.implicits._   // needed for .toDF on an RDD of tuples
val df = rowsRdd.toDF("id", "val1", "val2")
df.show()
+------+----+----+
| id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+
more ways: Convert an RDD object to Dataframe in Spark
2. DataFrame/Dataset to RDD with the .rdd method
val rowsRdd: RDD[Row] = df.rdd // DataFrame to RDD
Because a DataFrame is weakly typed, developers aren't getting the benefits of the type system. For example, let's say you want to read something from SQL and run some aggregation on it:
val people = sqlContext.read.parquet("...")
val department = sqlContext.read.parquet("...")
people.filter("age > 30")
.join(department, people("deptId") === department("id"))
.groupBy(department("name"), "gender")
.agg(avg(people("salary")), max(people("age")))
When you say people("deptId"), you're not getting back an Int or a Long; you're getting back a Column object which you need to operate on. In languages with a rich type system such as Scala, you end up losing all the type safety, which increases the number of run-time errors for things that could be discovered at compile time.
On the contrary, Dataset[T] is typed. When you do:
val people: Dataset[People] = sqlContext.read.parquet("...").as[People]
you're actually getting back People objects, where deptId is an actual integral type and not a column type, thus taking advantage of the type system.
As of Spark 2.0, the DataFrame and Dataset APIs will be unified, where DataFrame will be a type alias for Dataset[Row].
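A brief sketch of what that unification looks like in practice (Spark 2.x, assuming a SparkSession named spark and the People case class from above):
import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._
val df: DataFrame = spark.read.parquet("...")   // a DataFrame is just a Dataset[Row]
val ds: Dataset[People] = df.as[People]         // typed view over the same data
val rows: DataFrame = ds.toDF()                 // back to the untyped Dataset[Row]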
Simply put, RDD is the core component, while DataFrame is an API introduced in Spark 1.3.
An RDD is a collection of data partitions. These RDDs must satisfy a few properties, such as being immutable and fault tolerant, as described above. An RDD can hold either structured or unstructured data.
DataFrame is an API available in Scala, Java, Python, and R. It allows you to process any type of structured and semi-structured data. To define it: a DataFrame is a distributed collection of data organized into named columns. Spark can easily optimize the underlying RDDs of a DataFrame.
You can process JSON data, Parquet data, and HiveQL data at the same time by using DataFrames.
val sampleRDD = sqlContext.jsonFile("hdfs://localhost:9000/jsondata.json")
val sample_DF = sampleRDD.toDF()
Here sample_DF is considered a DataFrame, and sampleRDD is the raw data, i.e. an RDD.
Most of the answers are correct; I only want to add one point here.
In Spark 2.0, the two APIs (DataFrame + Dataset) will be unified into a single API.
"Unifying DataFrame and Dataset: In Scala and Java, DataFrame and Dataset have been unified, i.e. DataFrame is just a type alias for Dataset of Row. In Python and R, given the lack of type safety, DataFrame is the main programming interface."
Datasets are similar to RDDs; however, instead of using Java serialization or Kryo, they use a specialized Encoder to serialize the objects for processing or transmitting over the network.
Spark SQL supports two different methods for converting existing RDDs into Datasets. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection based approach leads to more concise code and works well when you already know the schema while writing your Spark application.
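A minimal sketch of the reflection-based approach (assuming a Spark shell where sc and spark.implicits._ are available; the Record class is illustrative):
case class Record(key: Int, value: String)
import spark.implicits._
val recordRdd = sc.parallelize(Seq(Record(1, "a"), Record(2, "b")))
val recordDs  = recordRdd.toDS()   // schema (key: int, value: string) inferred by reflection on the case class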
The second method for creating Datasets is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct Datasets when the columns and their types are not known until runtime.
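And a hedged sketch of the programmatic approach, where the schema is only known at runtime (column names and types are illustrative):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
val rowRdd = sc.parallelize(Seq(Row("Ann", 30), Row("Bob", 25)))
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age",  IntegerType, nullable = true)))
val df = spark.createDataFrame(rowRdd, schema)   // apply the constructed schema to the RDD[Row]
df.printSchema()                                 // name: string, age: integer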
Here you can find an answer on RDD to DataFrame conversion:
How to convert rdd object to dataframe in spark