
How to execute a stored procedure in Azure Databricks PySpark?

I am able to execute a simple SQL statement using PySpark in Azure Databricks, but I want to execute a stored procedure instead. Below is the PySpark code I tried.

#initialize pyspark
import findspark
findspark.init(r'C:\Spark\spark-2.4.5-bin-hadoop2.7')
#import required modules
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
import pandas as pd

#Create spark configuration object
conf = SparkConf()
conf.setMaster("local").setAppName("My app")
#Create spark context and sparksession
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

table = "dbo.test"
#read table data into a spark dataframe
jdbcDF = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://localhost:1433;databaseName=Demo;integratedSecurity=true;") \
    .option("dbtable", table) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

#show the data loaded into dataframe
#jdbcDF.show()
sqlQueries = "execute testJoin"
resultDF = spark.sql(sqlQueries)
resultDF.show(resultDF.count(), False)

This doesn't work. How do I execute a stored procedure from PySpark in Azure Databricks?

Ajay asked Oct 15 '22 05:10

2 Answers

In case someone is still looking for a way to do this, it's possible to use the built-in JDBC connector of your Spark session. The following code sample will do the trick:

import msal

# Set url & credentials
jdbc_url = ...
tenant_id = ...
sp_client_id = ...
sp_client_secret = ...

# Write your SQL statement as a string
name = "Some passed value"

statement = f"""
EXEC Staging.SPR_InsertDummy
  @Name = '{name}'
"""

# Generate an OAuth2 access token for service principal
authority = f"https://login.windows.net/{tenant_id}"
app = msal.ConfidentialClientApplication(sp_client_id, sp_client_secret, authority)
token = app.acquire_token_for_client(scopes=["https://database.windows.net/.default"])["access_token"]

# Create a spark properties object and pass the access token
properties = spark._sc._gateway.jvm.java.util.Properties()
properties.setProperty("accessToken", token)

# Fetch the driver manager from your spark context
driver_manager = spark._sc._gateway.jvm.java.sql.DriverManager

# Create a connection object and pass the properties object
con = driver_manager.getConnection(jdbc_url, properties)

# Create callable statement and execute it
exec_statement = con.prepareCall(statement)
exec_statement.execute()

# Close connections
exec_statement.close()
con.close()
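
If you'd rather not interpolate the parameter value into the SQL string, the same connection can bind it through the JDBC CallableStatement instead. This is a small variation, not part of the original answer; it assumes the con connection and the name variable defined above and would run before con.close().

# Bind the parameter via JDBC instead of building the statement with an f-string
exec_statement = con.prepareCall("{call Staging.SPR_InsertDummy(?)}")
exec_statement.setString(1, name)
exec_statement.execute()
exec_statement.close()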

For more information, a similar method that uses SQL user credentials to connect over JDBC, and how to retrieve return parameters, I'd suggest you take a look at this blog post:

https://medium.com/delaware-pro/executing-ddl-statements-stored-procedures-on-sql-server-using-pyspark-in-databricks-2b31d9276811
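
The blog post also describes return parameters. As a rough sketch of that JDBC pattern (not taken from the post, and assuming a procedure that RETURNs an integer status and takes one @Name parameter), the {? = call ...} escape syntax registers slot 1 as an output parameter:

# java.sql.Types is needed to register the output parameter's SQL type
jtypes = spark._sc._gateway.jvm.java.sql.Types

ret_statement = con.prepareCall("{? = call Staging.SPR_InsertDummy(?)}")
ret_statement.registerOutParameter(1, jtypes.INTEGER)
ret_statement.setString(2, name)
ret_statement.execute()
return_code = ret_statement.getInt(1)
ret_statement.close()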

Dr. Casual answered Nov 15 '22 05:11


Running a stored procedure through a JDBC connection from Azure Databricks is not supported as of now, but you have these options:

  1. Use the pyodbc library to connect and execute your procedure. Keep in mind that with this approach your code runs only on the driver node while all your workers sit idle (a minimal sketch follows after this list). See this article for details: https://datathirst.net/blog/2018/10/12/executing-sql-server-stored-procedures-on-databricks-pyspark

  2. Use a SQL table-valued function rather than a procedure. In a sense, you can use anything that is valid in the FROM clause of a SQL query (see the second sketch after this list).

  3. Since you are in an Azure environment, combining Azure Data Factory (to execute your procedure) with Azure Databricks can help you build pretty powerful pipelines.
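
As a rough illustration of option 1, here is a minimal pyodbc sketch. It is not taken from the linked article; the ODBC driver name, server, database, credentials, and procedure name are placeholders to replace with your own, and the whole thing runs on the driver node only.

import pyodbc

# Connect with SQL authentication; every value below is a placeholder
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=your-server.database.windows.net;"
    "DATABASE=Demo;"
    "UID=your_sql_user;"
    "PWD=your_sql_password"
)
cursor = conn.cursor()

# Pass parameters with ? placeholders instead of string interpolation
cursor.execute("EXEC Staging.SPR_InsertDummy @Name = ?", "Some passed value")
conn.commit()

cursor.close()
conn.close()

For option 2, anything that is valid in a FROM clause can be pushed down as a JDBC subquery. This sketch assumes a hypothetical table-valued function dbo.fn_TestJoin and a jdbc_url connection string like the one in the first answer:

# Read the result of a table-valued function through the built-in JDBC reader
resultDF = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "(SELECT * FROM dbo.fn_TestJoin()) AS t")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .load()
)
resultDF.show()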

BICube answered Nov 15 '22 07:11