I have the following structure:
mylist = [{"key1": "val1"}, {"key2": "val2"}]
myrdd = value_counts.map(lambda item: ('key', {
    'field': mylist
}))
I get the following error:
15/02/10 15:54:08 INFO scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 6) on executor ip-10-80-15-145.ec2.internal: org.apache.spark.SparkException (Data of type java.util.ArrayList cannot be used) [duplicate 1]
myrdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.nodes": "localhost",
        "es.port": "9200",
        "es.resource": "mboyd/mboydtype"
    })
When written to ES, I want the document to end up like this:
{
    "field": [{"key1": "val1"}, {"key2": "val2"}]
}
A bit late to the game, but this is the solution we came up with after running into this yesterday. Add 'es.input.json': 'true' to your conf, and then run json.dumps() on your data.
Modifying your example, this would look like:
import json

rdd = sc.parallelize([{"key1": ["val1", "val2"]}])
# Serialize each document to a JSON string; with es.input.json enabled, the
# connector indexes the string as-is instead of converting Python objects to
# Writables (which is what fails on nested lists).
# saveAsNewAPIHadoopFile expects a pair RDD, so keep a (key, value) shape;
# the key is ignored by EsOutputFormat.
json_rdd = rdd.map(lambda doc: ('key', json.dumps(doc)))
json_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.nodes": "localhost",
        "es.port": "9200",
        "es.resource": "mboyd/mboydtype",
        "es.input.json": "true"
    }
)
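To sanity-check the result, here is a minimal verification sketch, not part of the original answer, assuming Python 3 and Elasticsearch reachable on localhost:9200 with the mboyd index used above. It fetches the indexed documents back and prints their sources; the nested list should come back intact:

# Hypothetical verification snippet (assumes ES on localhost:9200).
import json
import urllib.request

# Default _search returns the first batch of hits with a match_all query.
with urllib.request.urlopen("http://localhost:9200/mboyd/_search") as resp:
    hits = json.loads(resp.read().decode("utf-8"))["hits"]["hits"]

for hit in hits:
    # Each _source should contain the nested list, e.g. {"key1": ["val1", "val2"]}
    print(hit["_source"])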