
Is it possible to get the first n elements of every RDD in Spark Streaming?

When using Spark Streaming, is it possible to get the first n elements of every RDD in a DStream? In the real world, my stream consists of a number of geotagged events, and I want to take the 100 (or whatever) which are closest to a given point for further processing, but a simple example which shows what I'm trying to do is something like:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object take {
  def main(args: Array[String]) {

    val data = 1 to 10

    val sparkConf = new SparkConf().setAppName("Take")
    val streamingContext = new StreamingContext(sparkConf, Seconds(1))

    val rdd = streamingContext.sparkContext.makeRDD(data)
    val stream = new ConstantInputDStream(streamingContext, rdd)

    // In the real world, do a bunch of stuff which results in an ordered RDD

    // This obviously doesn't work
    // val filtered = stream.transform { _.take(5) }
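    // (RDD.take(n) returns an Array on the driver, not an RDD, so transform can't accept it)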

    // In the real world, do some more processing on the DStream

    stream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

I understand I could pull the top n results back to the driver fairly easily, but that isn't something I want to do in this case as I need to do further processing on the RDD after having filtered it down.

asked Jul 21 '15 09:07 by Philip Kendall


People also ask

How many RDDs can cogroup() work on at once?

cogroup() can be used for much more than just implementing joins. We can also use it to implement intersect by key. Additionally, cogroup() can work on three or more RDDs at once.
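A minimal sketch of that, assuming an existing SparkContext named sc; the RDDs a, b and c are made up for illustration:

// Assumed: sc is an existing SparkContext; a, b, c are illustrative pair RDDs.
val a = sc.parallelize(Seq(1 -> "a1", 2 -> "a2"))
val b = sc.parallelize(Seq(1 -> "b1", 3 -> "b3"))
val c = sc.parallelize(Seq(1 -> "c1", 2 -> "c2"))

// cogroup over three RDDs at once: for each key, the grouped values from a, b and c
val grouped = a.cogroup(b, c)

// "Intersect by key": keep only the keys that appear in all three RDDs
val commonKeys = grouped.filter { case (_, (as, bs, cs)) =>
  as.nonEmpty && bs.nonEmpty && cs.nonEmpty
}.keys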

Does Spark streaming use RDD?

DStreams are built on RDDs, Spark's core data abstraction. This allows Spark Streaming to seamlessly integrate with any other Spark components like MLlib and Spark SQL.

Which of the following are possible data sources in Spark streaming?

Different data sources that Spark supports are Parquet, CSV, Text, JDBC, AVRO, ORC, HIVE, Kafka, Azure Cosmos, Amazon S3, Redshift, etc. Parquet…


1 Answer

Why is it not working? I think your example is fine.

  1. Compute the distance to the given point for each event
  2. Sort the events by distance, with a number of partitions adapted to your amount of data
  3. Take the first 100 events from each partition (so you only shuffle a small part of the initial data), then turn the returned collection back into an RDD with sparkContext.parallelize(data)
  4. Sort again with a single partition so all the remaining data ends up in the same dataset
  5. Take the first 100 events; this is your top 100

The code for the sort is the same in steps 2 and 4; you just change the number of partitions.

Step 1 is executed on the DStream; steps 2 to 5 are executed on the RDDs inside a transform operation, as sketched below.
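
A minimal sketch of those five steps. Event, distance, refPoint and eventStream are illustrative names (not anything from the question or the answer), and the 100-event cutoff and 8 partitions are arbitrary:

import org.apache.spark.streaming.dstream.DStream

// Sketch only: Event, distance, refPoint and eventStream stand in for the
// question's real geotagged events and distance logic.
case class Event(lat: Double, lon: Double)

def distance(a: Event, b: Event): Double =
  math.hypot(a.lat - b.lat, a.lon - b.lon) // placeholder, not a true geographic distance

def closest100(eventStream: DStream[Event], refPoint: Event): DStream[(Double, Event)] =
  eventStream
    // Step 1: key each event by its distance to the reference point (on the DStream)
    .map(e => (distance(e, refPoint), e))
    // Steps 2 to 5 run on each batch's RDD inside transform
    .transform { rdd =>
      val sc = rdd.sparkContext
      // Step 2: sort by distance across several partitions
      val sorted = rdd.sortByKey(ascending = true, numPartitions = 8)
      // Step 3: first 100 per partition, collected and turned back into a small RDD
      val candidates = sc.parallelize(sorted.mapPartitions(_.take(100)).collect())
      // Step 4: sort again with a single partition
      val resorted = candidates.sortByKey(ascending = true, numPartitions = 1)
      // Step 5: the first 100 of the fully sorted data is the top 100
      sc.parallelize(resorted.take(100))
    }

The result of transform is still a DStream, so the further processing mentioned in the question can carry on from there.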

answered Sep 27 '22 22:09 by Fabien COMTE