Spark job with Async HTTP call

I build an RDD from a list of URLs and then try to fetch the data with asynchronous HTTP calls. I need all the results before doing any further computation. Ideally, the HTTP calls should run on different nodes so that the job scales.

I did something like this:

import org.apache.spark.SparkContext
import scala.concurrent.Await
import scala.concurrent.duration._

// init Spark
val sparkContext = new SparkContext(conf)
val datas = Seq[String]("url1", "url2")

// create the RDD
val rdd = sparkContext.parallelize[String](datas)

// httpCall returns a Future[String]
val requests = rdd.map((url: String) => httpCall(url))

// await all results (Future.sequence may be better)
val responses = requests.map(r => Await.result(r, 10.seconds))

// print responses
responses.collect().foreach((s: String) => println(s))

// stop Spark
sparkContext.stop()

This works, but the Spark job never finishes!

So I wonder what the best practices are for dealing with Futures in Spark (or with a Future[RDD]).

I think this use case looks pretty common, but I haven't found an answer yet.

Best regards

Vincent Spiewak asked Mar 09 '16 18:03


4 Answers

I finally got it working using scalaj-http instead of Dispatch. The calls are synchronous, but that matches my use case.

I think the Spark job never finished with Dispatch because the HTTP connection was not closed properly.
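
A rough sketch of the synchronous approach, for illustration only. It deliberately swaps scalaj-http for the JDK 11+ java.net.http.HttpClient so that the example needs no external dependency; BlockingFetch, fetchPartition and the 10-second timeout are hypothetical choices, not part of the original code.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.time.Duration

object BlockingFetch {
  // Fetches every URL of one partition with blocking calls.
  // One client is created per partition and reused for all of its URLs.
  def fetchPartition(urls: Iterator[String]): Iterator[String] = {
    val client = HttpClient.newHttpClient()
    urls.map { url =>
      val request = HttpRequest
        .newBuilder(URI.create(url))
        .timeout(Duration.ofSeconds(10)) // assumed per-request timeout
        .GET()
        .build()
      // send() blocks until the whole response body has been read
      client.send(request, HttpResponse.BodyHandlers.ofString()).body()
    }
  }
}
```

With Spark this would be plugged in as rdd.mapPartitions(BlockingFetch.fetchPartition). Parallelism then comes only from the number of partitions, since each task issues its requests one after another.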

Best Regards

Vincent Spiewak answered Nov 18 '22 10:11


I couldn't find an easy way to achieve this, but after several iterations this is what I did, and it works for a huge list of queries. We used it to run a batch operation by splitting one huge query into multiple subqueries.

// Break your huge workload down into smaller chunks; in this case a huge query string is
// broken down into a small set of subqueries.
// To optimize further, you can pass an explicit number of partitions when parallelizing
val queries = sqlContext.sparkContext.parallelize[String](subQueryList.toSeq)

// Then map each of those to a task; in this case a Future that returns a String
val tasks: RDD[Future[String]] = queries.map(query => {
    val task = makeHttpCall(query) // Method returns the HTTP response as a Future[String]
    // Log failures as a side effect (needs an implicit ExecutionContext in scope);
    // the Future itself is returned unchanged
    task.failed.foreach(t => logger.error("execution failed: " + t.getMessage))
    task
})

// Note: the HTTP call has not been invoked yet; the map is only recorded as part of the lineage

// Then, in each partition, combine all the Futures (there may be several tasks per partition) into one sequence
// and Await the result, which blocks until every Future in that sequence is resolved

val contentRdd = tasks.mapPartitions[String] { f: Iterator[Future[String]] =>
   val searchFuture: Future[Iterator[String]] = Future.sequence(f)
   Await.result(searchFuture, threadWaitTime.seconds)
}

// Note: at this point you can apply any transformation to this RDD and it will be appended to the lineage.
// Only when you perform an action on the RDD is the mapPartitions step evaluated:
// the subqueries are then sent as parallel HTTP requests and
// the responses are gathered in a single RDD.

If you don't need to transform the content afterwards (for example, by parsing the response payload), you can use foreachPartition instead of mapPartitions to perform all the HTTP calls immediately.

raksja answered Nov 18 '22 11:11


"this use case looks pretty common"

Not really, because it simply doesn't work as you (probably) expect. Since each task operates on standard Scala Iterators, these operations will be squashed together. This means that all operations will be blocking in practice. Assuming you have three URLs ["x", "y", "z"], your code will be executed in the following order:

Await.result(httpCall("x"), 10.seconds)
Await.result(httpCall("y"), 10.seconds)
Await.result(httpCall("z"), 10.seconds)
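
A minimal sketch of this interleaving with plain Scala iterators and no Spark involved. LazyIteratorDemo, its events log and the stubbed httpCall are hypothetical stand-ins for the code in the question.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LazyIteratorDemo {
  // Records when each request is started and when its result has been awaited
  private val events = ListBuffer.empty[String]

  // Stub for the question's httpCall: no network, just an immediate Future
  def httpCall(url: String): Future[String] = {
    events += s"start $url"
    Future(s"body-$url")
  }

  def run(): List[String] = {
    events.clear()
    Iterator("x", "y", "z")
      .map(httpCall)                        // lazy: nothing is started here
      .map { f =>
        val body = Await.result(f, 10.seconds)
        events += s"done $body"
        body
      }
      .foreach(_ => ())                     // pulling elements drives both maps
    events.toList
  }
}
```

Calling LazyIteratorDemo.run() shows each request being started only after the previous one has been awaited, because both map steps run element by element.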

You can easily reproduce the same behavior locally. If you want to execute your code asynchronously you should handle this explicitly using mapPartitions:

rdd.mapPartitions(iter => {
  ??? // Submit requests
  ??? // Wait until all requests completed and return Iterator of results
})

but this is relatively tricky. There is no guarantee all data for a given partition fits into memory so you'll probably need some batching mechanism as well.
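
One possible way to fill in the two placeholders with batching, sketched under assumptions: AsyncPartition, fetchAll, batchSize and timeout are hypothetical names, and httpCall is passed in as a plain function so the snippet runs without Spark.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AsyncPartition {
  // Processes one partition in fixed-size batches, so that at most
  // batchSize requests and responses are held in memory at a time.
  def fetchAll(
      urls: Iterator[String],
      httpCall: String => Future[String],
      batchSize: Int,
      timeout: FiniteDuration
  )(implicit ec: ExecutionContext): Iterator[String] =
    urls.grouped(batchSize).flatMap { batch =>
      // batch is a strict Seq, so every request in it is started here
      val inFlight = batch.map(httpCall)
      // Block until the whole batch has completed, keeping the input order
      Await.result(Future.sequence(inFlight), timeout)
    }
}
```

In a job this could be used as rdd.mapPartitions(iter => AsyncPartition.fetchAll(iter, httpCall, 50, 30.seconds)), with an ExecutionContext available on the executors. Requests within a batch run concurrently, while batches themselves are processed one after another as the resulting iterator is consumed.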

All of that being said, I couldn't reproduce the problem you've described, so it may be a configuration issue or a problem with httpCall itself.

On a side note, allowing a single timeout to kill the whole task doesn't look like a good idea.
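
One way to keep a single slow or failed request from failing everything is to capture each outcome as a Try. This is only a sketch; SafeAwait and awaitEach are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object SafeAwait {
  // Waits on each already-started Future separately. A timeout or a failed
  // request becomes a Failure for that element instead of an exception
  // that aborts the whole task.
  def awaitEach[A](futures: Seq[Future[A]], timeout: FiniteDuration): Seq[Try[A]] =
    futures.map(f => Try(Await.result(f, timeout)))
}
```

The caller can then decide per element whether to retry, log or drop it. Note that the timeout applies to each Future in turn, so in the worst case the total wait is the number of futures multiplied by the timeout.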

zero323 answered Nov 18 '22 10:11


This won't work.

You cannot expect the request objects to be distributed and the responses to be collected by other nodes in the cluster. If you do, the Spark calls waiting on the futures will never end. Futures will never work in this case.

If your map() makes synchronous HTTP requests, collect the responses within the same action/transformation call and then pass the results (responses) on to further map/reduce/other calls.

In your case, rewrite the logic to collect the response of each call synchronously and remove the notion of futures; then all should be fine.

user6044522 answered Nov 18 '22 11:11