
Spark on Java - What is the right way to have a static object on all workers

I need to use a non-serialisable 3rd party class in my functions on all executors in Spark, for example:

JavaRDD<String> resRdd = origRdd
    .flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String t) throws Exception {

            // A DynamoDB mapper I don't want to initialise every time
            DynamoDBMapper mapper = new DynamoDBMapper(new AmazonDynamoDBClient(credentials));

            Set<String> userFav = mapper.load(userDataDocument.class, userId).getFav();

            return userFav;
        }
    });

I would like to have a static DynamoDBMapper mapper that is initialised once per executor and can then be reused over and over again.

Since it's not serialisable, I can't initialise it once in the driver and broadcast it.

Note: there is an answer here (What is the right way to have a static object on all workers), but it's only for Scala.

Roee Gavirel asked Jan 26 '16 15:01



1 Answer

You can use mapPartitions or foreachPartition. Here is a snippet taken from Learning Spark:

By using partition-based operations, we can share a connection pool to this database to avoid setting up many connections, and reuse our JSON parser. As Examples 6-10 through 6-12 show, we use the mapPartitions() function, which gives us an iterator of the elements in each partition of the input RDD and expects us to return an iterator of our results.

This allows us to initialize one connection per partition, rather than one per element, and then iterate over the elements in that partition however we like. This is very useful for saving data to an external database or for creating expensive reusable objects.

Here is a simple Scala example taken from the linked book. It can be translated to Java if needed; it is only here to show a simple use of foreachPartition, and mapPartitions follows the same pattern.

ipAddressRequestCount.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
        // Open connection to storage system (e.g. a database connection)
        partition.foreach { item =>
            // Use connection to push item to system
        }
        // Close connection
    }
}
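
Since the question asks for Java, the same per-partition pattern can be sketched roughly as follows. This is only an illustration under assumptions: ExpensiveClient is a hypothetical stand-in for a non-serialisable class such as DynamoDBMapper, and processPartition and lookup are made-up names. The Spark wiring is left out so the snippet compiles on its own.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for an expensive, non-serialisable client (e.g. DynamoDBMapper).
class ExpensiveClient {
    static int instancesCreated = 0;   // counts constructor calls, for illustration only

    ExpensiveClient() {
        instancesCreated++;
    }

    // Placeholder for a real lookup against an external system.
    String lookup(String key) {
        return key.toUpperCase();
    }
}

class PartitionWork {
    // Body of a per-partition function: build the client once, then reuse it
    // for every element of the partition.
    static Iterator<String> processPartition(Iterator<String> partition) {
        ExpensiveClient client = new ExpensiveClient();   // once per partition, not per element
        List<String> results = new ArrayList<>();
        while (partition.hasNext()) {
            results.add(client.lookup(partition.next()));
        }
        return results.iterator();
    }
}
```

On Spark 2.x and later, where FlatMapFunction returns an Iterator, a method like this should be usable as origRdd.mapPartitions(PartitionWork::processPartition). On Spark 1.x the function returns an Iterable, so you would return the list itself instead of its iterator.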

Here is a link to a Java example.
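
If the object really has to be created once per executor JVM rather than once per partition, a lazily initialised static holder is a common option, because static fields are not serialised with the function and so each JVM builds its own copy on first use. A minimal sketch, assuming a hypothetical SharedClient in place of DynamoDBMapper (ClientHolder and get are made-up names as well):

```java
// Hypothetical stand-in for a non-serialisable client such as DynamoDBMapper.
class SharedClient {
    // Placeholder for a real lookup against an external system.
    String lookup(String key) {
        return "fav-of-" + key;
    }
}

// Holder whose static field exists once per JVM, i.e. once per executor process.
class ClientHolder {
    private static SharedClient instance;   // never shipped from the driver

    // synchronized so that concurrent tasks in the same JVM create only one instance
    static synchronized SharedClient get() {
        if (instance == null) {
            instance = new SharedClient();   // runs on first use in each JVM
        }
        return instance;
    }
}
```

Inside call() you would then write something like ClientHolder.get().lookup(t) instead of constructing the client. Anything the constructor needs, such as credentials, has to be obtainable on the executors themselves.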

Alex Naspo answered Oct 17 '22 13:10
