I would like to do some cleanup at the start of my Spark (PySpark) program. For example, I would like to delete data in HDFS left over from a previous run. In Pig this can be done using commands such as
fs -copyFromLocal ....
rmf /path/to-/hdfs
or locally using the sh command.
I was wondering how to do the same with Pyspark.
If using external libraries is not an issue, another way to interact with HDFS from PySpark is to use a plain Python library directly, for example the hdfs package or Spotify's snakebite:
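A minimal sketch with the hdfs package (a WebHDFS client) might look like the following; the namenode URL, port and user are placeholders for your cluster:

from hdfs import InsecureClient

# connect to the namenode's WebHDFS endpoint (URL, port and user are placeholders)
client = InsecureClient('http://namenode:9870', user='hadoop')

# recursively remove the output directory left by a previous run
client.delete('/path/to/hdfs', recursive=True)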
You can delete an HDFS path in PySpark without using third-party dependencies as follows:
from pyspark.sql import SparkSession
# example of preparing a spark session
spark = SparkSession.builder.appName('abc').getOrCreate()
sc = spark.sparkContext
# Prepare a FileSystem manager
fs = (sc._jvm.org
      .apache.hadoop
      .fs.FileSystem
      .get(sc._jsc.hadoopConfiguration())
      )
path = "Your/hdfs/path"
# use the FileSystem manager to remove the path
fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)
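The second argument to delete enables recursive deletion. If the path may not exist yet (for example on the very first run), you can guard the call with an existence check, along these lines:

Path = sc._jvm.org.apache.hadoop.fs.Path
# only attempt the delete when the path is actually there
if fs.exists(Path(path)):
    fs.delete(Path(path), True)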
To take this one step further, you can wrap the above into a helper function that you can re-use across jobs/packages:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()
def delete_path(spark, path):
    sc = spark.sparkContext
    # get a handle on the Hadoop FileSystem through the JVM gateway
    fs = (sc._jvm.org
          .apache.hadoop
          .fs.FileSystem
          .get(sc._jsc.hadoopConfiguration())
          )
    # recursively delete the given path
    fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)
delete_path(spark, "Your/hdfs/path")
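The same JVM gateway also covers the copyFromLocal part of the question: Hadoop's FileSystem exposes copyFromLocalFile(src, dst). A sketch along the same lines, with the local and HDFS paths as placeholders, would be:

sc = spark.sparkContext
Path = sc._jvm.org.apache.hadoop.fs.Path
fs = (sc._jvm.org.apache.hadoop.fs.FileSystem
      .get(sc._jsc.hadoopConfiguration()))

# copy a local file into HDFS, analogous to `fs -copyFromLocal` in Pig
fs.copyFromLocalFile(Path("file:///local/data.csv"), Path("Your/hdfs/path/data.csv"))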