
How to use foreach or foreachBatch in PySpark to write to database?

I want to do Spark Structured Streaming (Spark 2.4.x) from a Kafka source to a MariaDB with Python (PySpark).

I want to work with the streaming Spark DataFrame, not a static DataFrame or a pandas DataFrame.

It seems that one has to use foreach or foreachBatch, since there is no built-in database sink for streaming DataFrames according to https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks.

Here is my attempt:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType
from pyspark.sql import DataFrameWriter
# configuration of target db
db_target_url = "jdbc:mysql://localhost/database"
db_target_properties = {"user":"writer", "password":"1234"}
# schema
schema_simple = StructType([StructField("Signal", StringType()),StructField("Value", DoubleType())])

# create spark session
spark = SparkSession.builder.appName("streamer").getOrCreate()

# create DataFrame representing the stream
df = spark.readStream \
  .format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "mytopic") \
  .load() \
  .selectExpr("Timestamp", "cast (value as string) as json") \
  .select("Timestamp", F.from_json("json", schema_simple).alias('json_wrapper')) \
  .selectExpr("Timestamp", "json_wrapper.Signal", "json_wrapper.Value")
df.printSchema()
# Do some dummy processing
df2 = df.filter("Value < 11111111111")
print("df2: ", df2.isStreaming)

def process_row(row):
    # Process row
    row.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
    pass
query = df2.writeStream.foreach(process_row).start()

I get an error:

AttributeError: write

Why?

asked Nov 08 '19 12:11 by tardis


2 Answers

tl;dr Replace foreach with foreachBatch.


Quoting the official documentation:

The foreach and foreachBatch operations allow you to apply arbitrary operations and writing logic on the output of a streaming query. They have slightly different use cases - while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.

In other words, writeStream.foreach(process_row) calls process_row once per row. Each row is a single record (a pyspark.sql.Row), not a DataFrame, so it has no write attribute and therefore no write.jdbc. Hence the AttributeError.

Think of the row as a piece of data that you can save anywhere you want using any API you want.
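
To make the row-level route concrete, here is a minimal sketch of a writer object with the open/process/close methods that writeStream.foreach accepts. It uses Python's built-in sqlite3 purely as a stand-in so that it runs without a database server. The class name SqliteRowWriter, the table name signals and the database path are assumptions made for illustration; for MariaDB you would swap in a client library of your choice. The rows are assumed to carry the Signal and Value fields from the question.

```python
import sqlite3


class SqliteRowWriter:
    """Hypothetical row-level writer with the open/process/close shape
    that writeStream.foreach accepts. sqlite3 is only a stand-in here."""

    def __init__(self, db_path):
        # Keep only plain configuration here. The connection is created in
        # open(), because the writer object itself is shipped to executors.
        self.db_path = db_path
        self.conn = None

    def open(self, partition_id, epoch_id):
        self.conn = sqlite3.connect(self.db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS signals (signal TEXT, value REAL)"
        )
        return True  # True means: go on and process this partition's rows

    def process(self, row):
        # row is a single record (a pyspark.sql.Row in Spark); it has no .write
        self.conn.execute(
            "INSERT INTO signals (signal, value) VALUES (?, ?)",
            (row["Signal"], row["Value"]),
        )

    def close(self, error):
        if self.conn is not None:
            if error is None:
                self.conn.commit()  # keep the rows only if nothing failed
            self.conn.close()
            self.conn = None
```

In the streaming job this would be attached with something like df2.writeStream.foreach(SqliteRowWriter("signals.db")).start(). Opening the connection in open rather than in __init__ is a deliberate choice: Spark serializes the writer object, and a live database connection cannot be pickled.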

If you really want Spark's own writer (that is, write.jdbc), use foreachBatch instead: the function you pass receives each micro-batch as a regular, non-streaming DataFrame, which does have write.

answered Nov 15 '22 10:11 by Jacek Laskowski


With Jacek's help, I was able to fix my example:

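def process_row(df, epoch_id):
    # df is the current micro-batch as a regular (non-streaming) DataFrame,
    # so write it here rather than the streaming df2
    df.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
query = df2.writeStream.foreachBatch(process_row).start()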

The function must also accept epoch_id as its second parameter. Otherwise you get errors in the Spark log file that are not shown in the Jupyter notebook.
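
The signature requirement can be illustrated without a cluster. The sketch below relies on the documented behaviour that foreachBatch calls the function with two positional arguments, the micro-batch and its identifier. call_like_foreach_batch is a hypothetical stand-in for that call, not a Spark function, and a plain list plays the role of the batch DataFrame.

```python
def write_batch(batch, epoch_id):
    # Two parameters: matches how the callback is invoked.
    return f"epoch {epoch_id}: {len(batch)} rows"


def write_batch_missing_epoch(batch):
    # One parameter only: the call fails before any row is written.
    return f"{len(batch)} rows"


def call_like_foreach_batch(func, batch, epoch_id):
    """Hypothetical stand-in: invoke func with (batch, epoch_id)."""
    return func(batch, epoch_id)


batch = [("a", 1.0), ("b", 2.0)]  # plain list standing in for the batch DataFrame
print(call_like_foreach_batch(write_batch, batch, 0))

try:
    call_like_foreach_batch(write_batch_missing_epoch, batch, 0)
except TypeError as exc:
    # The one-parameter function cannot take the extra epoch_id argument.
    print("TypeError:", exc)
```

In the real job the equivalent TypeError would be raised inside the running streaming query rather than in the notebook cell that called start(), which is consistent with the error only showing up in the Spark log.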

answered Nov 15 '22 11:11 by tardis