
Pass additional arguments to foreachBatch in pyspark

I am using foreachBatch in PySpark Structured Streaming to write each micro-batch to SQL Server over JDBC. I need to run the same process for several tables, and I'd like to reuse the same writer function by adding a table name argument, but I'm not sure how to pass that argument in.

The example here is pretty helpful, but in the Python example the table name is hardcoded, and in the Scala example it looks like they're referencing a global variable(?). I would like to pass the name of the table into the function.

The function given in the Python example at the link above is:

def writeToSQLWarehose(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

I'd like to use something like this:

def writeToSQLWarehose(df, epochId, tableName):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", tableName) \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

But I'm not sure how to pass the additional argument through foreachBatch.

asked May 03 '19 by nstudenski


2 Answers

Something like this should work.

streamingDF.writeStream.foreachBatch(lambda df, epochId: writeToSQLWarehose(df, epochId, tableName)).start()
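
If you are wiring up several tables, a minimal sketch (assuming the same writeToSQLWarehose function and hypothetical table names) is to bind the name per stream with functools.partial instead of relying on the lambda's closure, so each stream keeps its own value:

from functools import partial

# hypothetical table names; substitute your own
for tableName in ["dbo.orders", "dbo.customers"]:
    streamingDF.writeStream \
        .foreachBatch(partial(writeToSQLWarehose, tableName=tableName)) \
        .start()

partial fixes tableName at the moment the stream is created, which sidesteps any late-binding surprises from the closure.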
answered Oct 22 '22 by Samellas


Samellas' solution does not work if you need to run multiple streams. The foreachBatch function gets serialised and sent to the Spark worker. The parameter seems to still be a shared variable within the worker and may change during execution.
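
As an illustration of how that can happen (a hedged sketch assuming the streams are started in a loop, which may not be the exact setup here): a plain lambda captures the loop variable by reference, so by the time a micro-batch actually runs, every stream may see the last table name.

# illustration only: the lambda is late-bound, so all three streams
# can end up writing to "table_c"
for tableName in ["table_a", "table_b", "table_c"]:
    streamingDF.writeStream \
        .foreachBatch(lambda df, epochId: writeToSQLWarehose(df, epochId, tableName)) \
        .start()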

My solution is to add the parameter as a literal column in the batch DataFrame (passing a silver data lake table path to the merge operation):

# assumes "import pyspark.sql.functions as func" and a source streaming DataFrame named streamingDF
streamingDF \
  .withColumn("dl_tablePath", func.lit(silverPath)) \
  .writeStream.format("delta") \
  .foreachBatch(insertIfNotExisting) \
  .start()

In the batch function insertIfNotExisting, I pick up the parameter and drop the parameter column:

def insertIfNotExisting(batchDf, batchId):
  # read the table path back from the literal column, then drop the helper column
  tablePath = batchDf.select("dl_tablePath").limit(1).collect()[0][0]
  realDf = batchDf.drop("dl_tablePath")
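
The answer stops there; a minimal sketch of how the collected tablePath might feed the merge it mentions, assuming the Delta Lake Python API (delta.tables.DeltaTable), an ambient SparkSession named spark, and a hypothetical key column id:

  # continuing inside insertIfNotExisting (sketch): insert only rows
  # that don't already exist in the Delta table at tablePath
  from delta.tables import DeltaTable  # assumes the Delta Lake package is available
  target = DeltaTable.forPath(spark, tablePath)  # "spark" is the ambient SparkSession
  (target.alias("t")
     .merge(realDf.alias("s"), "t.id = s.id")  # "id" is a hypothetical join key
     .whenNotMatchedInsertAll()
     .execute())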
answered Oct 22 '22 by Teemu Koivisto