I'm developing a Spark application, and I'm used to Spring as a dependency injection framework. Now I'm stuck with the problem that the processing part uses Spring's @Autowired functionality, but the class is serialized and deserialized by Spark.
So the following code gets me into trouble:
Processor processor = ...; // This is a Spring-constructed object
                           // and makes all the trouble
JavaRDD<Txn> rdd = ...;    // some data for Spark
rdd.foreachPartition(processor);
The Processor looks like this:
public class Processor implements VoidFunction<Iterator<Txn>>, Serializable {
    private static final long serialVersionUID = 1L;

    @Autowired // This will not work if the object is deserialized
    private transient DatabaseConnection db;

    @Override
    public void call(Iterator<Txn> txns) {
        ... // do some fancy stuff
        db.store(txns);
    }
}
So my question is: Is it even possible to use something like Spring in combination with Spark? If not, what is the most elegant way to do something like that? Any help is appreciated!
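One common workaround, sketched below, is to keep the injected field transient and re-resolve the dependency lazily on the worker. This is only a sketch: DatabaseConnectionFactory is a hypothetical static holder (not part of the code above) that builds or caches a connection inside the worker JVM.

public class Processor implements VoidFunction<Iterator<Txn>>, Serializable {
    private static final long serialVersionUID = 1L;

    // Not serialized; re-created on each worker after deserialization.
    private transient DatabaseConnection db;

    @Override
    public void call(Iterator<Txn> txns) {
        if (db == null) {
            // DatabaseConnectionFactory is an assumed helper that provides
            // a connection local to the worker JVM.
            db = DatabaseConnectionFactory.get();
        }
        db.store(txns);
    }
}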
FROM THE QUESTION ASKER: Added: To hook into the deserialization step directly, without modifying your own classes, use the spring-spark project by parapluplu. This project autowires your bean after it has been deserialized on the Spark side.
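The underlying idea looks roughly like the sketch below. These are hypothetical names, not the library's actual code, and it assumes a Spring context has been started inside the worker JVM.

import java.io.Serializable;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

// Assumed helper: captures the ApplicationContext in a static field at startup.
@Component
class SpringContextHolder implements ApplicationContextAware {
    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        context = ctx;
    }

    static ApplicationContext getContext() {
        return context;
    }
}

// Base class for functions shipped to Spark: Java serialization calls
// readResolve() after reading the object back, which re-runs field injection.
// It is protected (not private) so that it also applies to subclasses.
abstract class AutowiredOnDeserialize implements Serializable {
    private static final long serialVersionUID = 1L;

    protected Object readResolve() {
        SpringContextHolder.getContext()
                .getAutowireCapableBeanFactory()
                .autowireBean(this);
        return this;
    }
}

Processor would then extend AutowiredOnDeserialize, and its @Autowired fields would be re-populated on the worker after deserialization.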
EDIT:
In order to use Spark, you need the following setup (also seen in this repository):
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
...
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- fix java.lang.ClassNotFoundException: org.codehaus.commons.compiler.UncheckedCompileException -->
    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>commons-compiler</artifactId>
        <version>2.7.8</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/log4j-over-slf4j -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>log4j-over-slf4j</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.6.4</version>
    </dependency>
</dependencies>
Then you need the application class, as usual with Spring Boot:
@SpringBootApplication
public class SparkExperimentApplication {

    public static void main(String[] args) {
        SpringApplication.run(SparkExperimentApplication.class, args);
    }
}
And then a configuration that binds it all together:
@Configuration
@PropertySource("classpath:application.properties")
public class ApplicationConfig {

    @Autowired
    private Environment env;

    @Value("${app.name:jigsaw}")
    private String appName;

    @Value("${spark.home}")
    private String sparkHome;

    @Value("${master.uri:local}")
    private String masterUri;

    @Bean
    public SparkConf sparkConf() {
        SparkConf sparkConf = new SparkConf()
                .setAppName(appName)
                .setSparkHome(sparkHome)
                .setMaster(masterUri);
        return sparkConf;
    }

    @Bean
    public JavaSparkContext javaSparkContext() {
        return new JavaSparkContext(sparkConf());
    }

    @Bean
    public SparkSession sparkSession() {
        return SparkSession
                .builder()
                .sparkContext(javaSparkContext().sc())
                .appName("Java Spark SQL basic example")
                .getOrCreate();
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }
}
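For reference, the @Value placeholders above map to entries like these in application.properties. The values here are examples only (the spark.home path is hypothetical); app.name and master.uri fall back to their declared defaults when omitted, but spark.home has no default and must be set.

# application.properties -- example values, adjust for your environment
app.name=spark-experiment
spark.home=/opt/spark
master.uri=local[*]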
Then you can use the SparkSession class to communicate with Spark SQL:
/**
 * Created by achat1 on 9/23/15.
 * Just an example to see if it works.
 */
@Component
public class WordCount {

    @Autowired
    private SparkSession sparkSession;

    public List<Count> count() {
        String input = "hello world hello hello hello";
        String[] _words = input.split(" ");
        List<Word> words = Arrays.stream(_words).map(Word::new).collect(Collectors.toList());
        Dataset<Row> dataFrame = sparkSession.createDataFrame(words, Word.class);
        dataFrame.show();
        //StructType structType = dataFrame.schema();
        RelationalGroupedDataset groupedDataset = dataFrame.groupBy(col("word"));
        groupedDataset.count().show();
        List<Row> rows = groupedDataset.count().collectAsList();
        return rows.stream().map(new Function<Row, Count>() {
            @Override
            public Count apply(Row row) {
                return new Count(row.getString(0), row.getLong(1));
            }
        }).collect(Collectors.toList());
    }
}
Referring to these two classes:
public class Word {
    private String word;

    public Word() {
    }

    public Word(String word) {
        this.word = word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public String getWord() {
        return word;
    }
}

public class Count {
    private String word;
    private long count;

    public Count() {
    }

    public Count(String word, long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }
}
Then you can run it and see that it returns the right data:
@RequestMapping("api") @Controller public class ApiController { @Autowired WordCount wordCount; @RequestMapping("wordcount") public ResponseEntity<List<Count>> words() { return new ResponseEntity<>(wordCount.count(), HttpStatus.OK); } }
It says:
[{"word":"hello","count":4},{"word":"world","count":1}]