 

Spark elasticsearch connector: how to select _id field?

I'm using the Spark Elasticsearch connector, and I want to extract some fields plus the _id from ES.

myquery = """{"query":..., """
val df = spark.read.format("org.elasticsearch.spark.sql")
                 .option("query", myquery)
                 .option("pushdown", "true")
                 .load("myindex/mytype")
                 .limit(10) 
                 .select("myfield","_id") 

Unfortunately, the _id field is not recognized:

AnalysisException: u'cannot resolve \'`_id`\' given input columns: 
[query, size, @version, @timestamp, 
 sourceinfo, signaletic, document, metadata, fields, aggs]

With this mapping I can select, for example, document.{fieldA} or sourceinfo.{fieldB}, but not _id. It's as if it sits at a level too high in the mapping.

Any idea how to get this particular field?

Patrick asked Nov 19 '22


1 Answer

You can access _id and the other metadata fields by setting the es.read.metadata option to true.

For example in your code:

myquery = """{"query":..., """
val df = spark.read.format("org.elasticsearch.spark.sql")
                 .option("query", myquery)
                 .option("pushdown", "true")
                 .option("es.read.metadata", "true") // expose the _metadata column
                 .load("myindex/mytype")
                 .limit(10) 
                 .select("myfield","_id") 

When you do

df.printSchema()

it will print something like this (in my case):

 |-- user_id: string (nullable = true)
 |-- user_rt: string (nullable = true)
 |-- user_rt_id: string (nullable = true)
 |-- username: string (nullable = true)
 |-- video: long (nullable = true)
 |-- _metadata: map (nullable = true) # this map column will be added
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

You can then extract the individual properties such as _index, _type, _id, _score, and sort from the _metadata column.
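
If you'd rather stay in the DataFrame API than register a temp view, the map column can be read directly with getItem. A minimal sketch in Scala, assuming the _metadata column produced above (the column name doc_id is just illustrative):

import org.apache.spark.sql.functions.col

// Pull _id out of the _metadata map column into its own string column.
val withId = df.withColumn("doc_id", col("_metadata").getItem("_id"))

// The new column can then be selected like any other field.
withId.select("myfield", "doc_id").show()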

To access the _id key from the _metadata map column, I used Spark SQL; use whichever approach you think best:

df.createOrReplaceTempView("temp_table_name")
spark.sql("""
    SELECT _metadata._id AS reference_id
    FROM temp_table_name
""")

Hope this solves your problem.

Danial Shabbir answered Nov 21 '22