I'm using Spark 2.2 and I'm trying to read JSON messages from Kafka, transform them to a DataFrame, and access the fields as a Row:
spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic")
.load()
.select(col("value").cast(DataTypes.StringType).as("col"))
.writeStream()
.format("console")
.start();
With this I get:
+--------------------+
| col|
+--------------------+
|{"myField":"somet...|
+--------------------+
I wanted something more like this:
+--------------------+
| myField|
+--------------------+
|"something" |
+--------------------+
I wanted to parse the JSON, so I tried the from_json function with a struct schema:
DataTypes.createStructType(
    new StructField[] {
        DataTypes.createStructField("myField", DataTypes.StringType, true)
    }
)
but I only got:
+--------------------+
| jsontostructs(col)|
+--------------------+
|[something] |
+--------------------+
Then I tried to use explode, but I only got an exception saying:
cannot resolve 'explode(`col`)' due to data type mismatch:
input to function explode should be array or map type, not
StructType(StructField(...
Any idea how to make this work?
You're almost there, you just need to select the right thing. from_json returns a struct column matching the schema. If the schema (in its JSON representation) looks like this:
{"type":"struct","fields":[{"name":"myField","type":"string","nullable":false,"metadata":{}}]}
you'll get a nested object equivalent to:
root
|-- jsontostructs(col): struct (nullable = true)
| |-- myField: string (nullable = false)
You can use the getField (or getItem) method to select a specific field:
df.select(from_json(col("col"), schema).getField("myField").alias("myField"));
or use .* to select all top-level fields in the struct:
df.select(from_json(col("col"), schema).alias("tmp")).select("tmp.*");
although for a single string column, get_json_object should be more than enough:
df.select(get_json_object(col("col"), "$.myField"));
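Putting the pieces together, the whole streaming query could look like the sketch below. It reuses the broker address, topic name, and schema from your question; the class name is arbitrary, and I haven't run this against a live Kafka cluster, so treat it as a starting point rather than a verified job:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

public class KafkaJsonFlatten {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
            .appName("kafka-json-flatten")
            .getOrCreate();

        // Schema matching the incoming JSON payload: {"myField":"something"}
        StructType schema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("myField", DataTypes.StringType, true)
        });

        Dataset<Row> flattened = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "topic")
            .load()
            // Kafka delivers the payload as binary, so cast it to a string first
            .select(col("value").cast(DataTypes.StringType).as("col"))
            // Parse the JSON into a struct column, then flatten it with "tmp.*"
            .select(from_json(col("col"), schema).alias("tmp"))
            .select("tmp.*");

        flattened
            .writeStream()
            .format("console")
            .start()
            .awaitTermination();
    }
}
```

With this, the console sink prints a `myField` column directly instead of the wrapped `jsontostructs(col)` struct.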