 

How do you perform blocking IO in an Apache Spark job?

What if, when I traverse an RDD, I need to calculate values in the dataset by calling an external (blocking) service? How do you think that could be achieved?

val values: Future[RDD[Double]] = Future sequence tasks

I've tried to create a list of Futures, but as RDD is not Traversable, Future.sequence is not suitable.

I just wonder if anyone has had such a problem, and how you solved it? What I'm trying to achieve is parallelism on a single worker node, so that I can call that external service 3000 times per second.

Probably there is another solution more suitable for Spark, like having multiple worker nodes on a single host.

I'm interested to know how you cope with such a challenge. Thanks.

Dr.Khu asked Sep 08 '14

People also ask

Which is one of the possible ways to optimize a Spark job?

Ideally, Spark runs one thread per task and one task per CPU core. Each task is tied to a single partition. Thus, a first intuition is to configure at least as many partitions as there are available CPU cores, so that all cores are occupied most of the time during the execution of the Spark job.
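
For example, a minimal sketch of two ways to control the partition count (the path and multiplier are placeholders, and sc is an existing SparkContext):

// read with at least one partition per available core; the path is a placeholder
val cores = sc.defaultParallelism
val lines = sc.textFile("hdfs:///data/input.log", cores)

// or rebalance an existing RDD afterwards
val rebalanced = lines.repartition(cores * 2)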

How can we reduce garbage collection in Spark?

To avoid full GC in G1 GC, a commonly used approach is to decrease the InitiatingHeapOccupancyPercent option's value (the default value is 45), so that G1 GC starts its initial concurrent marking at an earlier time and a full GC is more likely to be avoided.
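
As an illustration, such a flag can be passed to the executor JVMs through spark.executor.extraJavaOptions (the value 35 below is only an example, not a recommendation):

// illustrative only: lower the G1 initiating heap occupancy for executors
val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.extraJavaOptions",
       "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35")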

How will you do memory tuning in Spark?

In order to reduce memory usage, you might have to store Spark RDDs in serialized form. Data serialization also helps achieve good network performance. You can also obtain good Spark performance by terminating jobs that run too long.
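
For instance, a minimal sketch of switching to Kryo serialization and keeping an RDD in memory in serialized form (rdd here stands for any existing RDD):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel

// switch from Java serialization to Kryo
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// keep the (assumed pre-existing) RDD in memory in serialized form
val cached = rdd.persist(StorageLevel.MEMORY_ONLY_SER)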


2 Answers

Here is the answer to my own question:

import org.apache.spark.rdd.RDD
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val JOB_TIMEOUT = 10.minutes // placeholder timeout for the per-partition wait

// split the input into 100 partitions; each partition is the unit of work sent to a worker
val buckets = sc.textFile(logFile, 100)
val tasks: RDD[Future[Object]] = buckets map { item =>
  Future {
    // call native code
  }
}

// within each partition, wait for all outstanding futures to complete
val values = tasks.mapPartitions[Object] { f: Iterator[Future[Object]] =>
  val searchFuture: Future[Iterator[Object]] = Future sequence f
  Await.result(searchFuture, JOB_TIMEOUT)
}

The idea here is that we get a collection of partitions, where each partition is sent to a specific worker and is the smallest unit of work. Each unit of work contains the data, which can be processed by calling the native code and passing that data to it.

The 'values' collection contains the data returned from the native code, and that work is done across the cluster.

Dr.Khu answered Nov 15 '22


Based on your answer, that the blocking call is to compare the provided input with each individual item in the RDD, I would strongly consider rewriting the comparison in Java/Scala so that it can be run as part of your Spark process. If the comparison is a "pure" function (no side effects, depends only on its inputs), it should be straightforward to re-implement, and the decrease in complexity and increase in stability in your Spark process, due to not having to make remote calls, will probably make it worth it.

It seems unlikely that your remote service will be able to handle 3000 calls per second, so a local in-process version would be preferable.

If that is absolutely impossible for some reason, then you might be able to create an RDD transformation which turns your data into an RDD of futures, in pseudo-code:

def callRemote(data: Data): Future[Double] = ...

val inputData: RDD[Data] = ...

val transformed: RDD[Future[Double]] = inputData.map(callRemote)

And then carry on from there, computing on your Future[Double] objects.
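
For completeness, one way to then turn those futures back into plain values is to wait for them per partition, much like the accepted answer does (the 10.minutes timeout is just a placeholder):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// block once per partition rather than once per element
val results: RDD[Double] = transformed.mapPartitions { futures =>
  Await.result(Future.sequence(futures), 10.minutes)
}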

If you know how much parallelism your remote process can handle, it might be best to abandon the Future mode and accept that it is a bottleneck resource.

val remoteParallelism: Int = 100 // some constant

def callRemoteBlocking(data: Data): Double = ...

val inputData: RDD[Data] = ...

val transformed: RDD[Double] = inputData.
  coalesce(remoteParallelism).
  map(callRemoteBlocking)

Your job will probably take quite some time, but it shouldn't flood your remote service and die horribly.

A final option: if the inputs are reasonably predictable and the range of outcomes is consistent and limited to some reasonable number of outputs (millions or so), you could precompute them all as a data set using your remote service and look them up at Spark job time using a join.
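
A rough sketch of that approach, reusing inputData from the earlier snippets and assuming the precomputed (input, result) pairs were saved somewhere Spark can read them (the path is a placeholder):

// lookup table produced offline by calling the remote service once per distinct input
val precomputed: RDD[(Data, Double)] =
  sc.objectFile[(Data, Double)]("hdfs:///precomputed/results")

// join against the table instead of calling the service at job time
val withResults: RDD[(Data, Double)] =
  inputData.map(d => (d, ())).join(precomputed).mapValues { case (_, result) => result }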

DPM answered Nov 15 '22