
Use Spring together with Spark

I'm developing a Spark application, and I'm used to Spring as a dependency injection framework. Now I'm stuck with the problem that the processing part uses Spring's @Autowired functionality, but the processor object is serialized and deserialized by Spark.

So the following code gets me into trouble:

Processor processor = ...; // This is a Spring-constructed object
                           // and makes all the trouble
JavaRDD<Txn> rdd = ...;    // some data for Spark
rdd.foreachPartition(processor);

The Processor looks like this:

public class Processor implements VoidFunction<Iterator<Txn>>, Serializable {
    private static final long serialVersionUID = 1L;

    @Autowired // This will not work if the object is deserialized
    private transient DatabaseConnection db;

    @Override
    public void call(Iterator<Txn> txns) {
        ... // do some fancy stuff
        db.store(txns);
    }
}

So my question is: Is it even possible to use something like Spring in combination with Spark? If not, what is the most elegant way to do something like that? Any help is appreciated!

asked May 05 '15 by itsme

1 Answer

FROM THE QUESTION ASKER: Added: To hook into the deserialization step directly, without modifying your own classes, use the spring-spark project by parapluplu. That project re-autowires your bean through Spring whenever it is deserialized on a Spark worker.
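The core idea behind such a project is a hook into Java serialization. Here is a minimal sketch of that approach (this is not the actual spring-spark code; SpringContextHolder is a hypothetical helper that exposes the worker-side ApplicationContext):

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;

// Sketch only: re-inject @Autowired fields after Java deserialization.
// Assumes each Spark worker JVM has bootstrapped a Spring context that is
// reachable through a hypothetical static holder (SpringContextHolder).
public abstract class AutowiredSerializable implements Serializable {
    private static final long serialVersionUID = 1L;

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject(); // restore the regular (non-transient) fields
        // ask Spring to populate @Autowired fields on this fresh instance
        SpringContextHolder.getContext()
                .getAutowireCapableBeanFactory()
                .autowireBean(this);
    }
}

With something like this in place, Processor would extend AutowiredSerializable instead of implementing Serializable itself, and its transient db field would be re-populated on every worker after deserialization.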


EDIT:

In order to use Spark, you need the following setup (also seen in this repository):

  • Spring Boot + Spark:


<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

...

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

    <!-- fix java.lang.ClassNotFoundException: org.codehaus.commons.compiler.UncheckedCompileException -->
    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>commons-compiler</artifactId>
        <version>2.7.8</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.slf4j/log4j-over-slf4j -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>log4j-over-slf4j</artifactId>
        <version>1.7.25</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.6.4</version>
    </dependency>
</dependencies>

Then you need the application class, as usual with Spring Boot:

@SpringBootApplication
public class SparkExperimentApplication {

    public static void main(String[] args) {
        SpringApplication.run(SparkExperimentApplication.class, args);
    }
}

And then a configuration that binds it all together:

@Configuration
@PropertySource("classpath:application.properties")
public class ApplicationConfig {

    @Autowired
    private Environment env;

    @Value("${app.name:jigsaw}")
    private String appName;

    @Value("${spark.home}")
    private String sparkHome;

    @Value("${master.uri:local}")
    private String masterUri;

    @Bean
    public SparkConf sparkConf() {
        SparkConf sparkConf = new SparkConf()
                .setAppName(appName)
                .setSparkHome(sparkHome)
                .setMaster(masterUri);

        return sparkConf;
    }

    @Bean
    public JavaSparkContext javaSparkContext() {
        return new JavaSparkContext(sparkConf());
    }

    @Bean
    public SparkSession sparkSession() {
        return SparkSession
                .builder()
                .sparkContext(javaSparkContext().sc())
                .appName("Java Spark SQL basic example")
                .getOrCreate();
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }
}
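This configuration reads its values from application.properties. A minimal example might look like the following (these values are assumptions for a local setup, not from the original answer):

# assumed values for running Spark locally
app.name=jigsaw
spark.home=/path/to/spark
master.uri=local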

Then you can use the SparkSession class to communicate with Spark SQL:

/**
 * Created by achat1 on 9/23/15.
 * Just an example to see if it works.
 */
@Component
public class WordCount {
    @Autowired
    private SparkSession sparkSession;

    public List<Count> count() {
        String input = "hello world hello hello hello";
        String[] _words = input.split(" ");
        List<Word> words = Arrays.stream(_words).map(Word::new).collect(Collectors.toList());
        Dataset<Row> dataFrame = sparkSession.createDataFrame(words, Word.class);
        dataFrame.show();
        //StructType structType = dataFrame.schema();

        RelationalGroupedDataset groupedDataset = dataFrame.groupBy(col("word"));
        groupedDataset.count().show();
        List<Row> rows = groupedDataset.count().collectAsList();
        return rows.stream().map(new Function<Row, Count>() {
            @Override
            public Count apply(Row row) {
                return new Count(row.getString(0), row.getLong(1));
            }
        }).collect(Collectors.toList());
    }
}

Referring to these two classes:

public class Word {
    private String word;

    public Word() {
    }

    public Word(String word) {
        this.word = word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public String getWord() {
        return word;
    }
}

public class Count {
    private String word;
    private long count;

    public Count() {
    }

    public Count(String word, long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }
}

Then you can run it and see that it returns the right data:

@RequestMapping("api")
@Controller
public class ApiController {
    @Autowired
    WordCount wordCount;

    @RequestMapping("wordcount")
    public ResponseEntity<List<Count>> words() {
        return new ResponseEntity<>(wordCount.count(), HttpStatus.OK);
    }
}

Requesting /api/wordcount then returns:

[{"word":"hello","count":4},{"word":"world","count":1}] 
answered Oct 16 '22 by EpicPandaForce