I'm investigating an interesting case that involves wide transformations (e.g. repartition & join) on a slow RDD or dataset, for example the dataset defined by the following code:
val ds = sqlContext.createDataset(1 to 100)
  .repartition(1)
  .mapPartitions { itr =>
    itr.map { ii =>
      Thread.sleep(100)
      println(f"skewed - ${ii}")
      ii
    }
  }
The slow dataset is relevant because it resembles a view of a remote data source whose partition iterator is backed by a single-threaded network protocol (HTTP, JDBC, etc.). In this case the download is faster than single-threaded processing, but much slower than distributed processing.
Unfortunately the conventional Spark computation model is not efficient on a slow dataset because we are confined to one of the following options:
Use only narrow transformations (flatMap-ish) to pipe the stream through the data processing end-to-end in a single thread; obviously the data processing becomes a bottleneck and resource utilisation is low.
Use a wide operation (repartitioning included) to balance the RDD/dataset; while this is essential for parallel processing efficiency, the Spark coarse-grained scheduler demands that the download be fully completed first, which becomes another bottleneck.
Experiment
The following program represents a simple simulation of such a case:
val mapped = ds
val mapped2 = mapped
  .repartition(10)
  .map { ii =>
    println(f"repartitioned - ${ii}")
    ii
  }
mapped2.foreach { _ => }
When executing the above program it can be observed that the line println(f"repartitioned - ${ii}") never runs before all upstream println(f"skewed - ${ii}") calls in the RDD dependency have finished: the shuffle introduced by repartition forms a stage boundary, so the downstream stage cannot start until the slow upstream stage has completed.
I'd like to instruct the Spark scheduler to start distributing/shipping data entries generated by the partition iterator before the task completes (through mechanisms like micro-batching or streaming). Is there a simple way of doing this? E.g. converting the slow dataset into a structured stream would be nice, but there should be alternatives that are better integrated.
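Something along these lines is what I have in mind; a minimal sketch, assuming the slow source could be exposed as a streaming source. The built-in rate source below is only a stand-in for a real wrapper around the HTTP/JDBC protocol, and processBatch is a hypothetical handler:

import org.apache.spark.sql.{Dataset, Row}

// Stand-in streaming source; a real implementation would wrap the slow
// single-threaded download instead of generating synthetic rows.
val stream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()

// Hypothetical per-micro-batch handler: each batch is repartitioned and
// processed in parallel while the source keeps producing the next batch.
def processBatch(batch: Dataset[Row], batchId: Long): Unit = {
  batch
    .repartition(10)
    .foreach(row => println(f"repartitioned - ${row.getAs[Long]("value")}"))
}

val query = stream.writeStream
  .foreachBatch(processBatch _)
  .start()

query.awaitTermination()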
Thanks a lot for your opinion
UPDATE: to make your experimentation easier I have appended my Scala tests, which can be run out of the box:
package com.tribbloids.spookystuff.spike

import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.scalatest.{FunSpec, Ignore}

@Ignore
class SlowRDDSpike extends FunSpec {

  lazy val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
  lazy val sc: SparkContext = spark.sparkContext
  lazy val sqlContext: SQLContext = spark.sqlContext

  import sqlContext.implicits._

  describe("is repartitioning non-blocking?") {

    it("dataset") {
      val ds = sqlContext
        .createDataset(1 to 100)
        .repartition(1)
        .mapPartitions { itr =>
          itr.map { ii =>
            Thread.sleep(100)
            println(f"skewed - $ii")
            ii
          }
        }

      val mapped = ds
      val mapped2 = mapped
        .repartition(10)
        .map { ii =>
          Thread.sleep(400)
          println(f"repartitioned - $ii")
          ii
        }

      mapped2.foreach { _ => }
    }

    it("RDD") {
      val ds = sc
        .parallelize(1 to 100)
        .repartition(1)
        .mapPartitions { itr =>
          itr.map { ii =>
            Thread.sleep(100)
            println(f"skewed - $ii")
            ii
          }
        }

      val mapped = ds
      val mapped2 = mapped
        .repartition(10)
        .map { ii =>
          Thread.sleep(400)
          println(f"repartitioned - $ii")
          ii
        }

      mapped2.foreach { _ => }
    }
  }
}
First, thanks for the experimentation code. This question is data-source dependent (see the "Why information about the data source is essential" section below).
That being said, the main problem here is creating more partitions while avoiding a shuffle. Unfortunately, repartition is one of the operations that requires a shuffle.
In your example, you can increase the number of partitions without a shuffle using union.
var ds: Dataset[Int] = Seq[Int]().toDS()

val sequences = (1 to 100).grouped(10)
sequences.map { sequence =>
  println(sequence)
  sqlContext.createDataset(sequence)
}.foreach { sequenceDS =>
  ds = ds.union(sequenceDS)
}
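As a side note, the same construction can be written without the mutable var by reducing the per-chunk datasets with union; this is just an equivalent formulation of the snippet above:

// Equivalent to the var/union loop above: one small Dataset per chunk,
// unioned together, which yields many input partitions without a shuffle.
val unionDS: Dataset[Int] = (1 to 100)
  .grouped(10)
  .map(sequence => sqlContext.createDataset(sequence))
  .reduce(_ union _)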
Results using the union dataset: elapsed time 24980 ms, number of partitions 41.
Without union the overall time is 34493 ms, so we are seeing a significant improvement on the local machine.
This avoids a shuffle, yet creates several connections to the given HTTP endpoint or database, which is a common practice for managing parallelism.
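For example, Spark's built-in JDBC source follows the same practice when partitioning options are supplied: it opens one connection per partition, each reading a slice of the key range. A minimal sketch (the connection string, table and column names are placeholders):

// Partitioned JDBC read: Spark opens numPartitions connections, each scanning
// a slice of the id range, instead of a single slow single-threaded read.
// The url, table and column names below are placeholders.
val jdbcDF = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://example.host:5432/mydb")
  .option("dbtable", "my_table")
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "100")
  .option("numPartitions", "10")
  .load()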
There is no need to convert a Dataset to streaming, as streaming already works with datasets. If your data source supports streaming, you can use it to generate a Dataset without having to transition from batch to streaming. If your data source doesn't support streaming, you can consider using custom receivers.
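If you go the custom-receiver route, a receiver wrapping the slow protocol could look roughly like the sketch below, using the legacy DStream API; the fetchNext helper standing in for the actual HTTP/JDBC pull is hypothetical. It would be plugged in with ssc.receiverStream(new SlowSourceReceiver) on a StreamingContext.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Custom receiver sketch: pull records one by one over the slow
// single-threaded protocol and hand each record to Spark as soon as it
// arrives, so downstream stages can start before the download finishes.
class SlowSourceReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {

  // Hypothetical stand-in for the real single-threaded HTTP/JDBC pull.
  private def fetchNext(i: Int): Int = {
    Thread.sleep(100)
    i
  }

  override def onStart(): Unit = {
    // Receive on a separate thread so that onStart returns immediately.
    new Thread("slow-source-receiver") {
      override def run(): Unit = {
        var i = 1
        while (!isStopped() && i <= 100) {
          store(fetchNext(i)) // ship each record as soon as it is available
          i += 1
        }
      }
    }.start()
  }

  override def onStop(): Unit = {} // nothing to clean up in this sketch
}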
Why information about the data source is essential:
Full logic:
it("dataset_with_union") {
val start = System.nanoTime()
var ds: Dataset[Int] = Seq[Int]().toDS()
val sequences = (1 to 100).grouped(10)
sequences.map(sequence => {
println(sequence)
sqlContext.createDataset(sequence)
}).foreach(sequenceDS => {
ds = ds.union(sequenceDS)
})
ds.mapPartitions { itr =>
itr.map { ii =>
Thread.sleep(100)
ii
}
}
// Number of partitions here is 41
println(f"dataset number or partitions: ${ds.rdd.getNumPartitions}")
val mapped = ds
val mapped2 = mapped
.repartition(10)
.map { ii =>
Thread.sleep(400)
println(f"repartitioned - $ii")
ii
}
mapped2.foreach { _ =>
}
val end = System.nanoTime()
println("Elapsed time: " + (end - start) + "ns")
}