
jsontostructs to Row in spark structured streaming

I'm using Spark 2.2 and I'm trying to read JSON messages from Kafka, transform them to a DataFrame, and have them as a Row:

spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic")
    .load()
    .select(col("value").cast(DataTypes.StringType).as("col"))
    .writeStream()
    .format("console")
    .start();

With this I get:

+--------------------+
|                 col|
+--------------------+
|{"myField":"somet...|
+--------------------+

I wanted something more like this:

+--------------------+
|             myField|
+--------------------+
|"something"         |
+--------------------+

I tried to use the from_json function with a struct schema:

DataTypes.createStructType(
    new StructField[] {
            DataTypes.createStructField("myField", DataTypes.StringType, true)
    }
)
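
applied along these lines (a sketch, since the question doesn't show the exact call; here df stands for a DataFrame holding the string col column from above, and schema for the struct):

import static org.apache.spark.sql.functions.*;

// Sketch: from_json parses the string column against the schema, but without
// selecting into the struct, the result is one struct column, jsontostructs(col).
df.select(from_json(col("col"), schema));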

but I only got:

+--------------------+
|  jsontostructs(col)|
+--------------------+
|[something]         |
+--------------------+

Then I tried to use explode, but I only got an exception saying:

cannot resolve 'explode(`col`)' due to data type mismatch: 
input to function explode should be array or map type, not 
StructType(StructField(...

Any idea how to make this work?

asked Oct 12 '17 by Martin Brisiak

People also ask

How do you handle late data in structured streaming?

Watermarking is a feature in Spark Structured Streaming that is used to handle data that arrives late. Spark Structured Streaming can maintain the state of the data that arrives, store it in memory, and update it accurately by aggregating it with the data that arrived late.
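
As a minimal sketch in the question's Java API (the events dataset and eventTime column here are assumptions, not from this thread):

import static org.apache.spark.sql.functions.*;

// Sketch: accept events up to 10 minutes late before finalizing each window.
Dataset<Row> counts = events
    .withWatermark("eventTime", "10 minutes")
    .groupBy(window(col("eventTime"), "5 minutes"))
    .count();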

What is the difference between Spark streaming and structured streaming?

Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, all the APIs are optimized by the Spark Catalyst optimizer and translated into RDDs for execution under the hood.

How does Spark handle duplicates in streaming?

Spark doesn't have a distinct method that takes the columns to run the distinct on; however, Spark provides another signature of the dropDuplicates() function which takes multiple columns to eliminate duplicates. Note that calling dropDuplicates() on a DataFrame returns a new DataFrame with duplicate rows removed.
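
For example, a hedged sketch against a hypothetical df with userId and eventId columns:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch: deduplicate using only the listed columns as the key;
// dropDuplicates() returns a new DataFrame, the input is untouched.
Dataset<Row> deduped = df.dropDuplicates("userId", "eventId");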


1 Answer

You're almost there, you just have to select the right thing. from_json returns a struct column matching the schema. If the schema (JSON representation) looks like this:

{"type":"struct","fields":[{"name":"myField","type":"string","nullable":false,"metadata":{}}]}

you'll get a nested object equivalent to:

root
 |-- jsontostructs(col): struct (nullable = true)
 |    |-- myField: string (nullable = false)
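
In the question's Java API, the same schema can be built like this (equivalent to the JSON representation above):

import org.apache.spark.sql.types.*;

StructType schema = DataTypes.createStructType(
    new StructField[] {
        // nullable = false, matching the JSON representation above
        DataTypes.createStructField("myField", DataTypes.StringType, false)
    }
);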

You can use the getField (or getItem) method to select a specific field:

df.select(from_json(col("col"), schema).getField("myField").alias("myField"));

or .* to select all top level fields in the struct:

df.select(from_json(col("col"), schema).alias("tmp")).select("tmp.*");

although for a single string column, get_json_object should be more than enough:

df.select(get_json_object(col("col"), "$.myField"));

answered Sep 19 '22 by zero323