
Convert a Spark Structured Streaming dataframe into JSON

I am reading a stream with Spark Structured Streaming. The data has the following columns:

col1
col2
col3

After some transformations I want to write the dataframe to the console in JSON format. I am trying the following approach:

df.select(to_json($"*"))
      .writeStream
      .outputMode("append")
      .format("console")
      .start()

But I get the error: Invalid usage of '*' in expression 'structstojson';

Is there a way to combine all columns into a single column so that I can use to_json?

The expected output is a dataframe with a single column that holds one JSON document per row:

{"col1":"val11","col2":"val12","col3":"val13"}
{"col1":"val21","col2":"val22","col3":"val23"}
djWann asked Feb 12 '26 12:02



1 Answer

to_json has the following definitions:

def to_json(e: org.apache.spark.sql.Column): org.apache.spark.sql.Column
def to_json(e: org.apache.spark.sql.Column,options: java.util.Map[String,String]): org.apache.spark.sql.Column
def to_json(e: org.apache.spark.sql.Column,options: Map[String,String]): org.apache.spark.sql.Column

Here's an example dataframe:

df.show
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   b|   c|
|   d|   e|   f|
+----+----+----+

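Every overload of to_json takes a single Column, so '*' cannot be expanded inside it. You need to pack the columns into a struct first and then call to_json on that struct. Something like:

import org.apache.spark.sql.functions.{col, struct, to_json}

df.select(to_json(struct(df.columns.map(col(_)): _*)) as "json").show(false)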
+----------------------------------+
|json                              |
+----------------------------------+
|{"col1":"a","col2":"b","col3":"c"}|
|{"col1":"d","col2":"e","col3":"f"}|
+----------------------------------+
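
Since the question is about a streaming dataframe, here is a minimal sketch of how the same struct + to_json expression might be plugged into writeStream. The helper name asJson, the object name ToJsonSketch, the local[1] master, and the use of the built-in rate source as a stand-in for the real input stream are all assumptions made for illustration; they are not part of the original question or answer.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}

object ToJsonSketch {
  // Hypothetical helper: wraps every column of `df` in one struct and
  // serializes that struct to a JSON string column named "json".
  // Note: col(name) treats dots in a name as nested-field access.
  def asJson(df: DataFrame): DataFrame =
    df.select(to_json(struct(df.columns.map(col(_)): _*)).as("json"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")            // assumption: local run for the demo
      .appName("to-json-sketch")
      .getOrCreate()

    // Assumption: the built-in "rate" source stands in for the real stream.
    // It emits rows with the columns `timestamp` and `value`.
    val stream = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()

    // Same expression as in the batch case, applied to a streaming dataframe.
    val query = asJson(stream).writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", "false")   // print the full JSON string
      .start()

    query.awaitTermination(5000)     // let the demo run for about 5 seconds
    query.stop()
    spark.stop()
  }
}
```

The expression is built only from column references, so nothing in it is specific to batch execution; in the original question, df.select(to_json($"*")) would be replaced by the select used in asJson.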
philantrovert answered Feb 16 '26 05:02



