I have the following structure:
mylist = [{"key1":"val1"}, {"key2":"val2"}]
myrdd = value_counts.map(lambda item: ('key', {
'field': somelist
}))
When I save the RDD to Elasticsearch with the call below, I get the error:
15/02/10 15:54:08 INFO scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 6) on executor ip-10-80-15-145.ec2.internal: org.apache.spark.SparkException (Data of type java.util.ArrayList cannot be used) [duplicate 1]
myrdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.nodes": "localhost",
        "es.port": "9200",
        "es.resource": "mboyd/mboydtype"
    })
What I want the document to end up like when written to ES is:
{
    "field": [{"key1": "val1"}, {"key2": "val2"}]
}
A bit late to the game, but this is the solution we came up with after running into this yesterday. Add 'es.input.json': 'true' to your conf, and then run json.dumps() on your data.
Modifying your example, this would look like:
import json

rdd = sc.parallelize([{"key1": ["val1", "val2"]}])

# Serialize each document to a JSON string. The RDD must hold (key, value)
# pairs for saveAsNewAPIHadoopFile; EsOutputFormat ignores the key when
# es.input.json is true, so a dummy key is fine.
json_rdd = rdd.map(lambda doc: ('key', json.dumps(doc)))

json_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.nodes": "localhost",
        "es.port": "9200",
        "es.resource": "mboyd/mboydtype",
        "es.input.json": "true"
    }
)
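Applied back to the structure in your question, a minimal sketch (assuming value_counts and mylist are defined as you show above) would serialize the whole document inside the map before saving:

import json

mylist = [{"key1": "val1"}, {"key2": "val2"}]

# Build the document and dump it to a JSON string, so the nested list
# is parsed by Elasticsearch instead of being passed as an ArrayList.
myrdd = value_counts.map(lambda item: ('key', json.dumps({'field': mylist})))

With es.input.json set to "true" in the conf, the indexed document should come out as {"field": [{"key1": "val1"}, {"key2": "val2"}]}.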