
Access Spark broadcast variable in different classes

I am broadcasting a value in a Spark Streaming application, but I am not sure how to access that variable in a class different from the one where it was broadcast.

My code looks as follows:

object AppMain {
  def main(args: Array[String]): Unit = {
    //...
    val broadcastA = sc.broadcast(a)
    //..
    lines.foreachRDD(rdd => {
      val obj = AppObject1
      rdd.filter(p => obj.apply(p))
      rdd.count
    })
  }
}

object AppObject1 {
  def apply(str: String): Boolean = {
    AnotherObject.process(str)
  }
}

object AnotherObject {
  // I want to use the broadcast variable in this object
  val B = broadcastA.value // compilation error here
  def process(str: String): Boolean = {
    // need to use B inside this method
  }
}

Can anyone suggest how to access the broadcast variable in this case?

asked Apr 15 '16 by Alok

2 Answers

Ignoring possible serialization issues, there is nothing particularly Spark specific here. If you want to use some object, it has to be available in the current scope, and you can achieve this the same way as usual:

  • you can define your helpers in a scope where the broadcast variable is already defined:

    {
        ...
        val x = sc.broadcast(1)
        object Foo {
          def foo = x.value
        }
        ...
    }
    
  • you can use it as a constructor argument:

    case class Foo(x: org.apache.spark.broadcast.Broadcast[Int]) {
      def foo = x.value
    }
    
    ...
    
    Foo(sc.broadcast(1)).foo
    
  • you can pass it as a method argument:

    case class Foo() {
      def foo(x: org.apache.spark.broadcast.Broadcast[Int]) = x.value
    }
    
    ...
    
    Foo().foo(sc.broadcast(1))
    
  • or you can even mix in your helpers like this:

    trait Foo {
      val x: org.apache.spark.broadcast.Broadcast[Int]
      def foo = x.value
    }
    
    object Main extends Foo {
      val sc = new SparkContext("local", "test", new SparkConf())
      val x = sc.broadcast(1)
    
      def main(args: Array[String]) {
        sc.parallelize(Seq(None)).map(_ => foo).first
        sc.stop
      }
    }
    
answered Oct 03 '22 by zero323


Just a short take on the performance implications of the options introduced earlier.

The options proposed by zero323 are indeed a very elegant way of doing this kind of thing in Scala. At the same time, it is important to understand the implications of using certain patterns in a distributed system.

It is not the best idea to use the mixin approach, or any logic that relies on enclosing class state. Whenever you use state of the enclosing class within a lambda, Spark has to serialize the outer object. This is not always true, but you are better off writing safer code than one day accidentally blowing up the whole cluster.

Being aware of this, I would personally go for explicit argument passing to the methods, as it does not result in outer class serialization (the method argument approach); a sketch applying it to the question's code follows below.
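
For illustration only, here is a minimal sketch of that approach applied to the question's structure. It assumes broadcastA wraps a String, and the names lines and the contains check are placeholders standing in for the asker's actual code:

    import org.apache.spark.broadcast.Broadcast

    object AnotherObject {
      // The Broadcast handle is passed in explicitly, so the closure only
      // captures this small serializable handle, never an enclosing object.
      def process(str: String, b: Broadcast[String]): Boolean =
        str.contains(b.value) // hypothetical logic standing in for the real check
    }

    // inside the streaming job, where lines and broadcastA already exist:
    lines.foreachRDD { rdd =>
      rdd.filter(p => AnotherObject.process(p, broadcastA)).count
    }

Only b.value, called on the executors, dereferences the broadcast data; the handle itself is cheap to ship with each task.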

answered Oct 03 '22 by bottaio