Appending data to an empty dataframe

I am creating an empty DataFrame and later trying to append another DataFrame to it. In fact, I want to append many DataFrames to the initially empty DataFrame dynamically, depending on the number of RDDs coming in.

The union() function works fine if I assign the result to a third DataFrame:

val df3 = df1.union(df2)

But I want to keep appending to the initial (empty) DataFrame I created, because I want to store all the RDDs in one DataFrame. The code below, however, does not show the right counts. It seems that it simply did not append:

df1.union(df2)

df1.count() // this shows 0, although df2 has some data, which is shown if I assign the result to a third dataframe

If I do the below, I get a reassignment error since df1 is a val. And if I change it to a var, I get a "kafka multithreading not safe" error:

df1 = df1.union(df2)

Any idea how to add all the dynamically created DataFrames to one initially created DataFrame?

asked May 02 '26 by omer

2 Answers

Not sure if this is what you are looking for!

# Import the pyspark.sql type classes
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define your schema
field = [StructField("Col1", StringType(), True), StructField("Col2", IntegerType(), True)]
schema = StructType(field)

# Your empty data frame
df = spark.createDataFrame(sc.emptyRDD(), schema)

for i in range(5):
    # Create a one-row data frame matching the original schema
    temp_df = spark.createDataFrame([(str(i), i)], schema)

    # Do the union with the accumulated data frame and reassign the result
    df = df.union(temp_df)

df.show()
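One aside on the loop above: each union adds a step to the DataFrame's logical plan, so when the rows are known up front it can be simpler to collect them into a list first and call createDataFrame once. A minimal sketch of the row-building part (pure Python; the final createDataFrame call is assumed to run in the same Spark session as the code above and is shown only as a comment):

```python
# Build all the rows first, then create the data frame in a single call
rows = [(str(i), i) for i in range(5)]
print(rows)  # [('0', 0), ('1', 1), ('2', 2), ('3', 3), ('4', 4)]

# With a live SparkSession, the one-shot equivalent of the loop above
# would be (assumed, not run here):
# df = spark.createDataFrame(rows, schema)
```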
answered May 04 '26 by Rudr


DataFrames and other distributed data structures are immutable, so methods that operate on them always return a new object. There is no appending, no modification in place, and no ALTER TABLE equivalent.
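The same principle can be illustrated without Spark: with any immutable value, "appending" means binding a new object, and combining many pieces amounts to capturing each result, most compactly as a reduce. A minimal pure-Python sketch, using tuples as stand-ins for DataFrames (the functools.reduce shape mirrors how one would reduce a list of DataFrames with DataFrame.union):

```python
from functools import reduce

# Immutable values: "appending" returns a new object instead of
# modifying the original in place.
t1 = (1, 2)
t2 = t1 + (3,)       # t1 is untouched; t2 is a new tuple
assert t1 == (1, 2)  # the original is unchanged, like df1 after df1.union(df2)

# Combining many pieces therefore means capturing each result,
# most compactly as a reduce over the whole collection.
parts = [(i,) for i in range(5)]
combined = reduce(lambda acc, part: acc + part, parts, ())
print(combined)  # (0, 1, 2, 3, 4)
```

With Spark the shape is the same: reassign the union result at every step (df = df.union(other)) or reduce a list of DataFrames, rather than expecting union to mutate its receiver.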

And if I change it to var type, I get kafka multithreading not safe error.

Without the actual code it is impossible to give you a definitive answer, but it is unlikely to be related to the union code.

There are a number of known Spark bugs caused by incorrect internal implementation (SPARK-19185, SPARK-23623, to enumerate just a few).

answered May 04 '26 by user9735649


