 

Using Futures within Spark

A Spark job makes a remote web service call for every element in an RDD. A simple implementation might look something like this:

def webServiceCall(url: String) = scala.io.Source.fromURL(url).mkString
val rdd2 = rdd1.map(x => webServiceCall(x.field1))

(The above example has been kept simple and does not handle timeouts).

There is no interdependency between any of the results for different elements of the RDD.

Would the above be improved by using Futures to optimise performance by making parallel calls to the web service for each element of the RDD? Or does Spark itself have that level of optimization built in, so that it will run the operations on each element in the RDD in parallel?

If the above can be optimized by using Futures, does anyone have code examples showing the correct way to use Futures within a function passed to a Spark RDD?

Thanks

asked May 27 '16 by user1052610


People also ask

What is Spark Future?

Futures are a means of performing asynchronous programming in Scala. A Future gives you a simple way to run a job inside your Spark application concurrently. Let's look at the usual way we write our Spark code and then see how a Future can help us.

How does Future work in Scala?

Future represents the result of an asynchronous computation that may or may not be available yet. When we create a new Future, Scala schedules its body to run on an ExecutionContext (typically a thread pool) rather than on the calling thread. Once the execution is finished, the result of the computation (a value or an exception) is assigned to the Future.
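
For illustration, a minimal sketch (the computation and the println are just placeholders):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val f: Future[Int] = Future {
  // executed asynchronously on a thread from the implicit ExecutionContext
  41 + 1
}
f.foreach(result => println("computed: " + result)) // runs once the value is available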

How do I run parallel jobs in Spark?

Scheduling Within an Application. Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save , collect ) and any tasks that need to run to evaluate that action.
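
A hedged sketch of that pattern, assuming two unrelated RDDs rddA and rddB and a hypothetical output path:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// each action is submitted from its own thread, so Spark can schedule
// the two jobs concurrently (subject to available executors)
val countF = Future { rddA.count() }
val saveF  = Future { rddB.saveAsTextFile("hdfs:///tmp/out") }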

What is Future and promise in Scala?

The Promise is a writable, single-assignment container that completes a Future. The Promise is similar to the Future. However, the Future is about the read-side of an asynchronous operation, while the Promise is about the write-side.
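
A small sketch showing the two sides:

import scala.concurrent.{Future, Promise}

val p: Promise[String] = Promise[String]()
val f: Future[String]  = p.future // read side: callers attach callbacks here
p.success("done")                 // write side: completes f, exactly once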


2 Answers

Or does Spark itself have that level of optimization built in, so that it will run the operations on each element in the RDD in parallel?

It doesn't. Spark parallelizes tasks at the partition level but by default every partition is processed sequentially in a single thread.
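
Because that parallelism comes from partitions, the built-in way to get more concurrent calls is simply more partitions. A rough sketch, reusing webServiceCall from the question (100 is an arbitrary number to tune against cluster size and what the web service tolerates):

val rdd2 = rdd1.repartition(100).map(x => webServiceCall(x.field1))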

Would the above be improved by using Futures

It could be an improvement, but it is quite hard to do right. In particular:

  • every Future has to complete within the same stage, before any shuffle takes place.
  • given the lazy nature of the Iterators used to expose partition data, you cannot do it with high-level primitives like map (see for example Spark job with Async HTTP call).
  • you can build your own logic using mapPartitions, but then you have to deal with all the consequences of non-lazy partition evaluation (a rough sketch follows below).
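
A minimal sketch of that mapPartitions approach, assuming the webServiceCall helper from the question and an arbitrary 60-second timeout:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val results = rdd1.mapPartitions { iter =>
  // toList forces the whole partition up front (the non-lazy evaluation
  // mentioned above), so every call is in flight before we block
  val futures = iter.map(x => Future(webServiceCall(x.field1))).toList
  Await.result(Future.sequence(futures), 60.seconds).iterator
}
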
answered Sep 20 '22 by zero323


I couldn't find an easy way to achieve this, but after several iterations of retries this is what I did, and it's working for a huge list of queries. Basically we used this to run a batch operation for a huge query, split into multiple subqueries.

import org.apache.spark.rdd.RDD
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global // or a dedicated pool sized for the http calls

// Break down your huge workload into smaller chunks; in this case a huge query
// string is broken down into a small set of subqueries
// If you need to optimize further, you can provide an explicit partition count when parallelizing
val queries = sqlContext.sparkContext.parallelize[String](subQueryList.toSeq)

// Then map each of those to a Spark task; here each task is a Future that returns a String
val tasks: RDD[Future[String]] = queries.map { query =>
    makeHttpCall(query) // Method returns http call response as a Future[String]
      .recover { case ex =>
        // recover on the Future itself, so a single failed call is logged
        logger.error("execution failed: " + ex.getMessage)
        "" // and replaced by an empty response instead of failing the whole partition
      }
}

// Note: the http call is still not invoked at this point; you are only including it as part of the lineage

// Then, in each partition, combine all the Futures (there may be several tasks per partition) into one
// and await the result; this blocks until every Future in that sequence has resolved

val contentRdd = tasks.mapPartitions[String] { f: Iterator[Future[String]] =>
   val searchFuture: Future[Iterator[String]] = Future sequence f
   Await.result(searchFuture, threadWaitTime.seconds) // threadWaitTime: a timeout you define to bound the wait
}

// Note: at this point you can do any transformations on this rdd and they will be appended to the lineage.
// When you finally perform an action on the rdd, the mapPartitions step above is evaluated:
// the subqueries fire as fully parallel http requests and the results are
// collected into a single rdd.

I'm reposting it from my original answer here

answered Sep 20 '22 by raksja