I want to read the CSV files in a directory into PySpark DataFrames and then append them into a single DataFrame. I can't find the PySpark equivalent of what we do in pandas.
For example, in pandas we do:
files = glob.glob(path + '*.csv')
df = pd.DataFrame()
for f in files:
    dff = pd.read_csv(f, delimiter=',')
    df = df.append(dff)
In PySpark I have tried this, but without success:
schema = StructType([])
union_df = sqlContext.createDataFrame(sc.emptyRDD(), schema)
for f in files:
    dff = sqlContext.read.load(f, format='com.databricks.spark.csv', header='true', inferSchema='true', delimiter=',')
    union_df = union_df.unionAll(dff)
Would really appreciate any help.
Thanks
Define the schema first, and then you can use unionAll to concatenate new DataFrames to the empty one, and even run iterations to combine a bunch of DataFrames together.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

sc = SparkContext(conf=SparkConf())
spark = SparkSession(sc)  # need a SparkSession to call createDataFrame

# the schema must match the columns of the DataFrames you union onto it
schema = StructType([
    StructField("column1", StringType(), True),
    StructField("column2", StringType(), True)
])

empty = spark.createDataFrame(sc.emptyRDD(), schema)
empty = empty.unionAll(addOndata)  # addOndata is the DataFrame you want to append
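Applied to the original question, a minimal sketch of the loop (assuming every CSV file shares the column1/column2 schema above and that path points at your directory) might look like:
import glob

files = glob.glob(path + '*.csv')
union_df = spark.createDataFrame(sc.emptyRDD(), schema)
for f in files:
    dff = spark.read.csv(f, header=True, schema=schema)  # read each file with the same schema
    union_df = union_df.unionAll(dff)                    # append it to the accumulator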
You can do away with the empty DataFrame here. Create an empty list and keep adding the child DataFrames to it. Once you're done adding all the DataFrames that you want to combine, do a reduce with union on the list and it will combine all of them into one DataFrame.
from functools import reduce  # reduce lives in functools in Python 3
from pyspark.sql import DataFrame

list_of_dfs = []
for i in range(number_of_dfs):
    list_of_dfs.append(df_i)  # df_i: the i-th child DataFrame
combined_df = reduce(DataFrame.union, list_of_dfs)
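Tied back to the CSV use case, a hedged sketch (assuming path is the directory and all files share one schema) could be:
import glob
from functools import reduce
from pyspark.sql import DataFrame

files = glob.glob(path + '*.csv')
dfs = [spark.read.csv(f, header=True, inferSchema=True) for f in files]  # one DataFrame per file
combined_df = reduce(DataFrame.union, dfs)                               # fold them into one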
Here is how I do it. I don't create an empty DataFrame.
def concat_spark_iterator(iterator):
    """
    :param iterator: iterator(Spark DataFrame)
    :return: Concatenated Spark DataFrames
    """
    df = next(iterator)
    for _df in iterator:
        df = df.union(_df)
    return df
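One possible way to use it for the CSV case (assuming path and a shared schema across the files) is to pass a generator of reads:
import glob

files = glob.glob(path + '*.csv')
combined = concat_spark_iterator(
    spark.read.csv(f, header=True, inferSchema=True) for f in files
)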
One way of getting this done in Spark 2.1 is as below:
files = glob.glob(path + '*.csv')
for idx, f in enumerate(files):
    if idx == 0:
        df = spark.read.csv(f, header=True, inferSchema=True)
        dff = df
    else:
        df = spark.read.csv(f, header=True, inferSchema=True)
        dff = dff.unionAll(df)
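If all the files share a schema, it may also be possible to skip the loop entirely, since spark.read.csv accepts a list of paths; a sketch of that simpler alternative:
files = glob.glob(path + '*.csv')
dff = spark.read.csv(files, header=True, inferSchema=True)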
The schema should be the same when using unionAll on two DataFrames. Therefore, the schema of the empty DataFrame should match the schema of the CSV files.
For example:
from pyspark.sql.types import StructType, StructField, LongType, StringType

schema = StructType([
    StructField("v1", LongType(), True),
    StructField("v2", StringType(), False),
    StructField("v3", StringType(), False)
])
df = sqlContext.createDataFrame([], schema)
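With that in place, a hedged version of the original loop (assuming the CSV files really contain columns v1, v2 and v3) might be:
for f in files:
    dff = sqlContext.read.load(f, format='com.databricks.spark.csv',
                               header='true', schema=schema, delimiter=',')
    df = df.unionAll(dff)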
Or you can do it like this:
f = files.pop(0)
df = sqlContext.read.load(f, format='com.databricks.spark.csv', header='true', inferSchema='true', delimiter=',')
for f in files:
    dff = sqlContext.read.load(f, format='com.databricks.spark.csv', header='true', inferSchema='true', delimiter=',')
    df = df.unionAll(dff)