
In Apache Spark, how to convert a slow RDD/dataset into a stream?

I'm investigating an interesting case that involves wide transformations (e.g. repartition & join) on a slow RDD or Dataset, such as the one defined by the following code:

val ds = sqlContext.createDataset(1 to 100)
  .repartition(1)
  .mapPartitions { itr =>
    itr.map { ii =>
      Thread.sleep(100)
      println(f"skewed - ${ii}")
      ii
    }
  }

The slow dataset is relevant because it resembles a view of a remote data source whose partition iterator is backed by a single-threaded network protocol (HTTP, JDBC, etc.). In this case the download is faster than single-threaded processing, but far slower than distributed processing.
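For a concrete (hypothetical) picture of what the sleep stands for, the real dataset would look roughly like the sketch below: a single partition whose iterator pulls pages over one blocking connection. The endpoint URL and page layout are made up for illustration, and only scala.io.Source from the standard library is used:

import scala.io.Source

// Hypothetical real-world version of the slow dataset above: one partition,
// one blocking connection, each element fetched by a single task thread.
val remoteView = sqlContext
  .createDataset(1 to 100)       // 100 hypothetical page numbers
  .repartition(1)                // a single connection / task thread
  .mapPartitions { pages =>
    pages.map { page =>
      // each request blocks the task thread, like Thread.sleep(100) above
      Source.fromURL(s"http://example.com/api?page=$page").mkString
    }
  }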

Unfortunately the conventional Spark computation model won't be efficient on a slow dataset, because we are confined to one of the following options:

  1. Use only narrow transformations (flatMap-like) to pipe the stream through the data processing end-to-end in a single thread (see the sketch after this list). Obviously the data processing becomes a bottleneck and resource utilisation stays low.

  2. Use a wide operation (repartitioning included) to balance the RDD/dataset. While this is essential for efficient parallel data processing, Spark's coarse-grained scheduler demands that the download be fully completed first, which becomes another bottleneck.
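As a rough sketch of option 1 (illustrative only, not part of the original question, and reusing the ds defined above), the processing step is simply chained onto the slow iterator with narrow transformations, so download and processing share a single task thread:

// Option 1 sketch: narrow transformations only, so everything stays in the one
// slow partition and the processing (the sleep stands in for real work)
// runs on a single thread.
val narrowOnly = ds.map { ii =>
  Thread.sleep(400)
  println(f"processed in-line - ${ii}")
  ii
}

narrowOnly.foreach { _ => }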

Experiment

The following program represents a simple simulation of such case:

val mapped = ds

val mapped2 = mapped
  .repartition(10)
  .map { ii =>
    println(f"repartitioned - ${ii}")
    ii
  }

mapped2.foreach { _ => }

When executing the above program, it can be observed that no "repartitioned - ${ii}" line is printed before all of the "skewed - ${ii}" lines have been printed: the repartition introduces a stage boundary in the RDD dependency graph, so the downstream stage cannot start until the slow upstream task has fully completed.

I'd like to instruct the Spark scheduler to start distributing/shipping the data entries generated by the partition iterator before its task completes (through mechanisms like micro-batching or streaming). Is there a simple way of doing this? E.g. converting the slow dataset into a structured stream would be nice (a sketch of what I mean follows), but there should be alternatives that are better integrated.
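For illustration only, the following sketch is roughly what I have in mind; it is not a working solution, since MemoryStream comes from the internal org.apache.spark.sql.execution.streaming package and is mainly meant for tests. A driver-side thread drains the slow source and pushes micro-batches into the stream, so the parallel stage can start before the download finishes:

import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger

object SlowSourceAsStreamSketch {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext

    // driver-side buffer that a streaming query reads micro-batches from
    val source = MemoryStream[Int]

    val query = source
      .toDS()
      .repartition(10) // parallel processing within each micro-batch
      .map { ii =>
        println(f"repartitioned - $ii")
        ii
      }
      .writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    // simulated slow, single-threaded download feeding the stream as it arrives
    new Thread(new Runnable {
      override def run(): Unit = {
        (1 to 100).grouped(10).foreach { batch =>
          batch.foreach(_ => Thread.sleep(100))
          source.addData(batch: _*)
        }
      }
    }).start()

    query.awaitTermination(30 * 1000)
    spark.stop()
  }
}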

Thanks a lot for your opinion

UPDATE: to make your experimentation easier, I have appended my Scala tests that can be run out of the box:

package com.tribbloids.spookystuff.spike

import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.scalatest.{FunSpec, Ignore}

@Ignore
class SlowRDDSpike extends FunSpec {

  lazy val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

  lazy val sc: SparkContext = spark.sparkContext
  lazy val sqlContext: SQLContext = spark.sqlContext

  import sqlContext.implicits._

  describe("is repartitioning non-blocking?") {

    it("dataset") {

      val ds = sqlContext
        .createDataset(1 to 100)
        .repartition(1)
        .mapPartitions { itr =>
          itr.map { ii =>
            Thread.sleep(100)
            println(f"skewed - $ii")
            ii
          }
        }

      val mapped = ds

      val mapped2 = mapped
        .repartition(10)
        .map { ii =>
          Thread.sleep(400)
          println(f"repartitioned - $ii")
          ii
        }

      mapped2.foreach { _ =>
        }
    }
  }

  it("RDD") {
    val ds = sc
      .parallelize(1 to 100)
      .repartition(1)
      .mapPartitions { itr =>
        itr.map { ii =>
          Thread.sleep(100)
          println(f"skewed - $ii")
          ii
        }
      }

    val mapped = ds

    val mapped2 = mapped
      .repartition(10)
      .map { ii =>
        Thread.sleep(400)
        println(f"repartitioned - $ii")
        ii
      }

    mapped2.foreach { _ =>
      }

  }
}

1 Answer

First, thanks for the experimentation code. This question is data-source dependent (see the "Why information about the data source is essential" section below).

That being said, the main problem here is creating more partitions while avoiding a shuffle. Unfortunately, repartition is one of the operations that requires a shuffle.

In your example, you can increase the number of partitions without a shuffle using union.

// assumes import org.apache.spark.sql.Dataset and sqlContext.implicits._ (for .toDS()),
// as in the full test at the end of this answer
var ds: Dataset[Int] = Seq[Int]().toDS()
// split the input into 10 slices of 10 elements each
val sequences = (1 to 100).grouped(10)
sequences.map(sequence => {
  println(sequence)
  sqlContext.createDataset(sequence)
}).foreach(sequenceDS => {
  // each union appends the slice's partitions without triggering a shuffle
  ds = ds.union(sequenceDS)
})

Results using the union dataset: elapsed time 24980 ms, number of partitions: 41.

Without union the overall time is 34493 ms, so we see a significant improvement on a local machine.

This avoids a shuffle, but it does create several connections to the given HTTP endpoint or database. This is a common practice for managing parallelism.
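As a hedged sketch of that practice (not code from the original answer; the endpoint URL is a placeholder and sqlContext.implicits._ is assumed to be imported as in the question's test), each unioned slice can carry its own page range, so every resulting partition downloads through its own connection:

import scala.io.Source

// one small dataset per page range, combined with union (no shuffle),
// so each partition fetches its own range over its own connection
val pageRanges = (1 to 100).grouped(10).toSeq   // 10 ranges of 10 pages each

val fetched = pageRanges
  .map(range => sqlContext.createDataset(range))  // one dataset per range
  .reduce(_ union _)                              // adds partitions without a shuffle
  .mapPartitions { itr =>
    itr.map { page =>
      Source.fromURL(s"http://example.com/api?page=$page").mkString  // placeholder endpoint
    }
  }

println(f"partitions: ${fetched.rdd.getNumPartitions}")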

There is no need to convert a Dataset to streaming, as streaming works with Datasets. If your data source supports streaming, you can use it to generate a Dataset without having to transition from batch to streaming. If your data source doesn't support streaming, you can consider using custom receivers.
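As a minimal sketch of the streaming route (assuming a Kafka source and reusing the spark session from the question's test; the topic name and bootstrap servers are placeholders, and the spark-sql-kafka connector must be on the classpath), a streaming read already yields a Dataset, so no batch-to-stream conversion is needed:

// streaming read: processing starts as soon as micro-batches arrive
val kafkaDs = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "slow-source-topic")            // placeholder
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

val query = kafkaDs
  .writeStream
  .format("console")
  .start()

query.awaitTermination()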


Why information about the data source is essential:

  1. Can you control the number of partitions of the initial Dataset when reading from a given data source?
  2. What is an acceptable request rate or number of connections to your data source?
  3. How much data is involved? Is a shuffle an option?
  4. Does your data source support Spark streaming? Some data sources (Kinesis, Kafka, file systems, Elasticsearch) support streaming and some do not.

Full logic:

  it("dataset_with_union") {
    val start = System.nanoTime()
    var ds: Dataset[Int] = Seq[Int]().toDS()
    val sequences = (1 to 100).grouped(10)
    sequences.map(sequence => {
      println(sequence)
      sqlContext.createDataset(sequence)
    }).foreach(sequenceDS =>  {
      ds = ds.union(sequenceDS)
    })

    // Number of partitions here is 41
    println(f"dataset number of partitions: ${ds.rdd.getNumPartitions}")

    // keep the result of mapPartitions; if it is dropped, the simulated slow
    // download step never runs and the timing comparison is meaningless
    val mapped = ds.mapPartitions { itr =>
      itr.map { ii =>
        Thread.sleep(100)
        ii
      }
    }

    val mapped2 = mapped
      .repartition(10)
      .map { ii =>
        Thread.sleep(400)
        println(f"repartitioned - $ii")
        ii
      }

    mapped2.foreach { _ =>
    }

    val end = System.nanoTime()
    println("Elapsed time: " + (end - start) + "ns")
  }