I have a very large PySpark DataFrame. I need to convert each row into a JSON-formatted string and then publish that string to a Kafka topic. I originally used the following code:
for message in df.toJSON().collect():
    kafkaClient.send(message)
However, the DataFrame is very large, so it fails on collect(), which pulls every row into the driver's memory.
I was thinking of using a UDF, since it processes the data row by row:
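import pyspark.sql.functions as F
from pyspark.sql.functions import struct
from pyspark.sql.types import StringType

def get_row(row):
    json = row.toJSON()
    kafkaClient.send(json)
    return "Sent"

send_row_udf = F.udf(get_row, StringType())
# note: this calls the plain Python function get_row, not send_row_udf
df_json = df.withColumn("Sent", get_row(struct([df[x] for x in df.columns])))
df_json.select("Sent").show()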
But I am getting an error because the column, not the row, is passed to the function.
For illustrative purposes, we can use the df below, where we can assume Col1 and Col2 must be sent.
df = spark.createDataFrame([("A", 1), ("B", 2), ("D", 3)], ["Col1", "Col2"])
The expected JSON string for each row:
'{"Col1":"A","Col2":1}'
'{"Col1":"B","Col2":2}'
'{"Col1":"D","Col2":3}'
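Spark's own toJSON() and to_json() already emit this compact form. If you build the strings yourself with Python's json.dumps (for example from row.asDict()), note that it inserts a space after every ':' and ',' by default; passing separators reproduces the strings above exactly. A minimal sketch, where row_to_json is just an illustrative helper name:

```python
import json

def row_to_json(record: dict) -> str:
    # separators=(",", ":") drops the spaces json.dumps adds by default
    return json.dumps(record, separators=(",", ":"))

rows = [{"Col1": "A", "Col2": 1}, {"Col1": "B", "Col2": 2}, {"Col1": "D", "Col2": 3}]
for r in rows:
    print(row_to_json(r))
```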
The to_json() function in PySpark converts a MapType or StructType column to a JSON string. The json_tuple() function extracts fields from a JSON string column and returns them as new columns.
Use the json.loads() function. json.loads() accepts a valid JSON string and converts it to a Python object (a dict when the string holds a JSON object). This process is called deserialization: the act of converting a string to an object.
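For example, deserializing one of the messages above gives back a dict, and the JSON number comes back as a Python int:

```python
import json

message = '{"Col1":"A","Col2":1}'
# json.loads parses the JSON object into a Python dict
record = json.loads(message)
print(record["Col1"], record["Col2"])
```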
pyspark.sql.functions.explode(col): Returns a new row for each element in the given array or map. Uses the default column name col for elements in the array and key and value for elements in the map unless specified otherwise.
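You cannot use select like this: select only defines a lazy transformation, and Spark may invoke a UDF more or fewer times than you expect, so it is not a reliable place for side effects such as sending messages. Use foreach / foreachPartition instead: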
import json

def send(part):
    kafkaClient = ...  # create the client inside the function, i.e. on the executor
    for r in part:
        kafkaClient.send(json.dumps(r.asDict()))

df.foreachPartition(send)
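A slightly more complete sketch of the per-partition function, written so it can be tried without a cluster: the client factory is passed in instead of being hard-coded. make_client and my_client_factory are hypothetical names, and the client is assumed to expose send(str) as in the question. With a real producer such as kafka-python's KafkaProducer, send() takes the topic first, expects bytes unless a value_serializer is configured, and is asynchronous, so you would adapt the call and flush() before returning.

```python
import json
from typing import Any, Callable, Iterable


def send_partition(rows: Iterable[Any], make_client: Callable[[], Any]) -> int:
    """Send every row of one partition as a compact JSON string.

    make_client is a hypothetical zero-argument factory returning an object
    with a send(str) method. It is called once per partition, on the
    executor, so no client object has to be pickled on the driver.
    """
    client = make_client()
    sent = 0
    for row in rows:
        # Row.asDict() maps column names to values
        client.send(json.dumps(row.asDict(), separators=(",", ":")))
        sent += 1
    return sent


# Hypothetical wiring: foreachPartition passes only the partition iterator,
# so bind the factory in advance, e.g. with functools.partial:
# df.foreachPartition(functools.partial(send_partition, make_client=my_client_factory))
```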
If you need diagnostic information (for example, a count of sent messages), just use an Accumulator.
In current releases (Spark 2.2 and later) I would write to Kafka directly with the Kafka data source:
from pyspark.sql.functions import to_json, struct

# bootstrap_servers (e.g. "host1:9092") and topic are strings you define
(df.select(to_json(struct([df[x] for x in df.columns])).alias("value"))
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("topic", topic)
    .save())
You'll need the Kafka SQL package matching your Scala and Spark versions, for example:
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.1
Here is an approach that should work for you.
For each row, collect the column names (keys) and the column values (values) into lists. Then rearrange these into a list of key-value-pair tuples to pass into the dict constructor. Finally, convert the dict to a string using json.dumps().
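The three steps can be tried in plain Python on a single row's worth of data (the names and values below are the first row of the example df):

```python
import json

keys = ["Col1", "Col2"]          # column names
values = ["A", 1]                # one row's values
pairs = list(zip(keys, values))  # key-value-pair tuples
record = dict(pairs)             # dict constructor accepts the tuples
print(json.dumps(record))
```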
Collect Keys and Values into Lists
Collect the column names and the values into a single list, but interleave the keys and values.
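import pyspark.sql.functions as f
from pyspark.sql.types import ArrayType, StringType

def kvp(cols, *args):
    # interleave column names and stringified values: [k1, v1, k2, v2, ...]
    a = list(cols)
    b = [str(x) for x in args]  # a list, so a + b also works on Python 3
    c = a + b   # list of the right length
    c[::2] = a  # keys in even positions
    c[1::2] = b  # values in odd positions
    return c

# build a UDF bound to the column names; it receives the row's values as *args
kvp_udf = lambda cols: f.udf(lambda *args: kvp(cols, *args), ArrayType(StringType()))
df.withColumn('kvp', kvp_udf(df.columns)(*df.columns)).show()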
#+----+----+------------------+
#|Col1|Col2|               kvp|
#+----+----+------------------+
#|   A|   1|[Col1, A, Col2, 1]|
#|   B|   2|[Col1, B, Col2, 2]|
#|   D|   3|[Col1, D, Col2, 3]|
#+----+----+------------------+
Pass the Key-Value-Pair column into the dict constructor
Use json.dumps() to convert the dict into a JSON string.
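import json
from pyspark.sql.types import StringType

df.withColumn('kvp', kvp_udf(df.columns)(*df.columns))\
    .select(
        # rebuild the dict from the interleaved [k1, v1, k2, v2, ...] list and serialize it
        f.udf(lambda x: json.dumps(dict(zip(x[::2], x[1::2]))), StringType())(f.col('kvp'))
        .alias('json')
    )\
    .show(truncate=False)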
#+--------------------------+
#|json                      |
#+--------------------------+
#|{"Col2": "1", "Col1": "A"}|
#|{"Col2": "2", "Col1": "B"}|
#|{"Col2": "3", "Col1": "D"}|
#+--------------------------+
Note: Unfortunately, this will convert all datatypes to strings, because kvp() calls str() on every value and the UDF returns ArrayType(StringType()).
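To see the effect in isolation, compare serializing stringified values (what kvp() does) with serializing the original values. This is a plain-Python illustration only; with a real Row, json.dumps(row.asDict()) or to_json(struct(...)) keeps the numeric type.

```python
import json

cols = ["Col1", "Col2"]
vals = ["A", 1]

# what kvp() does: every value passes through str()
as_strings = json.dumps(dict(zip(cols, map(str, vals))))
# keeping the original Python values preserves the JSON number
typed = json.dumps(dict(zip(cols, vals)))
print(as_strings)  # {"Col1": "A", "Col2": "1"}
print(typed)       # {"Col1": "A", "Col2": 1}
```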