
How can I define an empty dataframe in PySpark and append other dataframes to it?

I want to read the CSV files from a directory as PySpark dataframes and then append them into a single dataframe. I can't find a PySpark equivalent of the way we do this in pandas.

For example, in pandas we do:

import glob
import pandas as pd

files = glob.glob(path + '*.csv')

df = pd.DataFrame()
for f in files:
    dff = pd.read_csv(f, delimiter=',')
    df = df.append(dff)   # append returns a new dataframe, so reassign it

In PySpark I have tried this, but it was not successful:

from pyspark.sql.types import StructType

schema = StructType([])
union_df = sqlContext.createDataFrame(sc.emptyRDD(), schema)

for f in files:
    dff = sqlContext.read.load(f, format='com.databricks.spark.csv', header='true', inferSchema='true', delimiter=',')
    union_df = union_df.unionAll(dff)

Would really appreciate any help.

Thanks

asked Apr 10 '17 by Gaurav Chawla



5 Answers

Define the schema first, then you can use unionAll to concatenate new dataframes to the empty one, and you can even run iterations to combine a bunch of dataframes together.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

sc = SparkContext(conf=SparkConf())
spark = SparkSession(sc)     # Need to use SparkSession(sc) to call createDataFrame

schema = StructType([
    StructField("column1", StringType(), True),
    StructField("column2", StringType(), True)
])
empty = spark.createDataFrame(sc.emptyRDD(), schema)

# addOndata is a dataframe with the same columns that you want to append
empty = empty.unionAll(addOndata)
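
As an illustrative sketch (not part of the original answer), this is how the empty dataframe above might be combined with the CSV loop from the question; the path variable and the assumption that every CSV has the same two columns come from the question, not from this answer:

import glob

# Assumes the spark, sc, schema and empty objects defined above.
# unionAll matches columns by position, so each CSV must have the same layout.
files = glob.glob(path + '*.csv')

union_df = empty
for f in files:
    dff = spark.read.csv(f, header=True)   # Spark 2.x built-in CSV reader
    union_df = union_df.unionAll(dff)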
answered by Rong Du

You can get away without an empty DataFrame here. Create an empty list and keep adding the child DataFrames to it. Once you have added all the DataFrames you want to combine, reduce the list with union and it will combine them all into one DataFrame.

from functools import reduce
from pyspark.sql import DataFrame

list_of_dfs = []
for i in range(number_of_dfs):
    # df_i is whichever child DataFrame you build or read in this iteration
    list_of_dfs.append(df_i)

combined_df = reduce(DataFrame.union, list_of_dfs)
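
For the original use case of combining CSV files from a directory, a minimal sketch (assuming an existing SparkSession named spark and the path variable from the question) might look like this:

import glob
from functools import reduce
from pyspark.sql import DataFrame

# Read each CSV into its own DataFrame, then fold them together with union
files = glob.glob(path + '*.csv')
list_of_dfs = [spark.read.csv(f, header=True, inferSchema=True) for f in files]
combined_df = reduce(DataFrame.union, list_of_dfs)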
answered by rishab137


Here is how I do it. I don't create an empty DataFrame.

def concat_spark_iterator(iterator):
    """
    :param iterator: an iterator of Spark DataFrames
    :return: the concatenation of all DataFrames in the iterator
    """
    df = next(iterator)

    for _df in iterator:
        df = df.union(_df)

    return df
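
A usage sketch (not from the answer itself) showing how this function could be fed the CSV files from the question; spark and path are assumed to exist already:

import glob

files = glob.glob(path + '*.csv')

# A generator of DataFrames, one per CSV file, consumed lazily by the function
df_iter = (spark.read.csv(f, header=True, inferSchema=True) for f in files)
combined = concat_spark_iterator(df_iter)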
answered by MathLal


One way of getting this done in Spark 2.1 is shown below:

import glob

files = glob.glob(path + '*.csv')

for idx, f in enumerate(files):
    df = spark.read.csv(f, header=True, inferSchema=True)
    if idx == 0:
        dff = df                  # first file initialises the combined dataframe
    else:
        dff = dff.unionAll(df)    # append every subsequent file
answered by Nim J


The schemas must be the same when using unionAll on two dataframes, so the schema of the empty dataframe has to match the schema of the CSV files.

For example:

from pyspark.sql.types import StructType, StructField, LongType, StringType

schema = StructType([
    StructField("v1", LongType(), True),
    StructField("v2", StringType(), False),
    StructField("v3", StringType(), False)
])
df = sqlContext.createDataFrame([], schema)

Or you can do it like this:

f = files.pop(0)
df = sqlContext.read.load(f, format='com.databricks.spark.csv', header='true', inferSchema='true', delimiter=',')

for f in files:
    dff = sqlContext.read.load(f, format='com.databricks.spark.csv', header='true', inferSchema='true', delimiter=',')
    df = df.unionAll(dff)   # the method is unionAll, and the result must be reassigned
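
As a related sketch (my own addition, not from the answer): with the Spark 2.x built-in CSV reader you can also pass the schema explicitly when reading each file, which guarantees every file lines up with the empty dataframe before the union; the v1/v2/v3 schema above is reused here and a SparkSession named spark is assumed:

df = spark.createDataFrame([], schema)
for f in files:
    # Explicit schema avoids inferSchema producing mismatched types per file
    dff = spark.read.csv(f, schema=schema, header=True)
    df = df.unionAll(dff)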
answered by Abhishek Bansal