 

How to pass Spring context to Spark worker node

Tags:

apache-spark

I am very new to Spark. I have a Spring context (loaded from XML into an application context using ClassPathXmlApplicationContext) that needs to be available on the worker nodes. I am able to create the Spark context, but when I try to use the Spring context inside my flatMap it is null. Is there any way I can pass it to the FlatMapFunction? I am calling another framework inside Spark that needs this Spring context.

Here is some code

ApplicationContext context = new ClassPathXmlApplicationContext("spring/rules-engine-spring.xml");

JavaRDD<Row> partitions = r.mapPartitions(
    new FlatMapFunction<Iterator<Row>, Row>() {
        public Iterable<Row> call(Iterator<Row> rowIterator) throws Exception {
            // Build the fact list from the incoming rows; this is where the
            // rules engine (and therefore the Spring context) is needed.
            List<Data> factList = new ArrayList<Data>();
            while (rowIterator.hasNext()) {
                Row rw = rowIterator.next();
                Data fact = new Data();
                fact.setGrain(rw.getString(0));
                fact.setRoute(rw.getString(1));
                factList.add(fact);
            }
            // Placeholder result; the real output would come from the rules engine.
            return new ArrayList<Row>();
        }
    });
List<Row> result = partitions.collect();

When I try to use the context inside the FlatMapFunction it is null. Outside of this method the context has a value. Any help would be appreciated.

asked Oct 19 '22 by Ram Kumar

1 Answer

When you make a variable transient, it is not serialized and therefore not available on the workers; that would explain the null inside the flatMap. You'll have to make sure the serializer picks up the class (and remove the transient). You might also be able to use Kryo to serialize the class even though it's not Serializable, as in the sketch below.
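
For example, a minimal sketch of turning Kryo on in the Spark configuration. The Data class comes from the question; the application name and everything else here are assumptions, and whether Kryo can actually cope with ClassPathXmlApplicationContext depends on what the context holds.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch: switch Spark to the Kryo serializer and register the question's Data class.
SparkConf conf = new SparkConf()
        .setAppName("rules-engine")   // hypothetical application name
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .registerKryoClasses(new Class<?>[]{ Data.class });
JavaSparkContext sc = new JavaSparkContext(conf);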

Via http://apache-spark-user-list.1001560.n3.nabble.com/Un-serializable-3rd-party-classes-Spark-Java-td7815.html:

There are a few options:

  • Kryo might be able to serialize these objects out of the box, depending on what’s inside them. Try turning it on as described at http://spark.apache.org/docs/latest/tuning.html.
  • If that doesn’t work, you can create your own “wrapper” objects that implement Serializable, or even a subclass of FlexCompRowMatrix; a sketch of the wrapper approach follows this list. No need to change the original library.
  • If the library has its own serialization functions, you could also use those inside a wrapper object. Take a look at https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SerializableWritable.scala for an example where we make Hadoop’s Writables serializable.
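
Here is a hedged sketch of the “wrapper” option from the second bullet, applied to the Spring context in the question: instead of shipping the live ApplicationContext (which is not Serializable), ship only the XML location and rebuild the context lazily on each executor. The class name SpringContextWrapper is made up for illustration.

import java.io.Serializable;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringContextWrapper implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String configLocation;                   // cheap to serialize
    private transient volatile ApplicationContext context; // rebuilt per executor JVM

    public SpringContextWrapper(String configLocation) {
        this.configLocation = configLocation;
    }

    public ApplicationContext get() {
        // Lazy, double-checked initialization so each executor builds the context once.
        if (context == null) {
            synchronized (this) {
                if (context == null) {
                    context = new ClassPathXmlApplicationContext(configLocation);
                }
            }
        }
        return context;
    }
}

On the driver you would create new SpringContextWrapper("spring/rules-engine-spring.xml") and reference it inside the FlatMapFunction; only the config location travels to the workers, and each executor builds its own ApplicationContext the first time get() is called.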
answered Oct 22 '22 by Reactormonk