 

Elasticsearch-Spark serialization not working with inner classes

Elasticsearch/Spark serialization does not appear to play well with nested types.

For example:

public class Foo implements Serializable {
   private List<Bar> bars = new ArrayList<Bar>();
   // getters and setters

   public static class Bar implements Serializable {
   }
}

List<Foo> foos = new ArrayList<Foo>();
foos.add( new Foo());
// Note: Foo object does not contain nested Bar instances

SparkConf sc = new SparkConf();
sc.setMaster("local");
sc.setAppName("spark.app.name");
sc.set("spark.serializer", KryoSerializer.class.getName());
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD<Foo> javaRDD = jsc.parallelize(ImmutableList.copyOf(foos));
JavaEsSpark.saveToEs(javaRDD, INDEX_NAME + "/" + TYPE_NAME);

The above code works, and documents of type Foo are indexed into Elasticsearch.

The issue arises when the bars list in a Foo object is not empty, for instance:

Foo foo = new Foo();
Foo.Bar bar = new Foo.Bar();
foo.getBars().add(bar);

Then, when indexing to Elasticsearch, the following exception is thrown:

org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: 
Cannot handle type [Bar] within type [class Foo], instance [Bar ...]] 
within instance [Foo@1cf628a] 
using writer [org.elasticsearch.spark.serialization.ScalaValueWriter@4e635d]
at org.elasticsearch.hadoop.serialization.builder.ContentBuilder.value(ContentBuilder.java:63)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.doWriteObject(TemplatedBulk.java:71)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:58)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:148)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:47)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:68)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:68)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

These are the relevant Maven dependencies:

<dependency>
   <groupId>com.sksamuel.elastic4s</groupId>
   <artifactId>elastic4s_2.11</artifactId>
   <version>1.5.5</version>
</dependency>

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.11</artifactId>
   <version>1.3.1</version>
</dependency>

<dependency>
   <groupId>org.elasticsearch</groupId>
   <artifactId>elasticsearch-hadoop-cascading</artifactId>
   <version>2.1.0.Beta4</version>
</dependency>

<dependency>
   <groupId>com.fasterxml.jackson.core</groupId>
   <artifactId>jackson-databind</artifactId>
   <version>2.1.3</version>
</dependency>

<dependency>
   <groupId>org.elasticsearch</groupId>
   <artifactId>elasticsearch-spark_2.10</artifactId>
   <version>2.1.0.Beta4</version>
</dependency>

<dependency>
   <groupId>org.scala-lang</groupId>
   <artifactId>scala-xml</artifactId>
   <version>2.11.0-M4</version>
</dependency>

What is the correct way to index nested types with Elasticsearch and Spark?

Thanks

asked by user1052610


1 Answer

A solution could be to build a JSON string from the object you are trying to save, using for example Json4s. In that case the RDD passed to JavaEsSpark would be an RDD of strings, and you simply have to call

JavaEsSpark.saveJsonToEs...

instead of

JavaEsSpark.saveToEs...

This workaround saved me countless hours of trying to figure out a way to serialize nested maps.
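For the Java code in the question, a minimal sketch of this approach could look like the following. It uses Jackson rather than Json4s (jackson-databind is already among the dependencies above) and reuses jsc, foos, INDEX_NAME and TYPE_NAME from the question; it also assumes Foo and Bar expose normal bean getters so Jackson can discover their properties.

import org.apache.spark.api.java.JavaRDD;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import com.fasterxml.jackson.databind.ObjectMapper;

// Convert each Foo (including its nested Bar instances) to a JSON string,
// so elasticsearch-spark only has to write plain strings.
JavaRDD<String> jsonRDD = jsc.parallelize(foos).map(foo -> {
    // ObjectMapper is not Serializable, so create it inside the task
    // instead of capturing it from the driver.
    ObjectMapper mapper = new ObjectMapper();
    return mapper.writeValueAsString(foo);
});

// saveJsonToEs expects each document to already be a valid JSON string.
JavaEsSpark.saveJsonToEs(jsonRDD, INDEX_NAME + "/" + TYPE_NAME);

Because the documents are serialized to JSON before they reach elasticsearch-spark, the ScalaValueWriter never has to inspect the nested Bar type, which is what triggered the exception above.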

answered by pbamba