
How to merge pyspark and pandas dataframes

I have a very large pyspark dataframe and a smaller pandas dataframe which I read in as follows:

import pandas as pd

df1 = spark.read.csv("/user/me/data1/")
df2 = pd.read_csv("data2.csv")

Both dataframes include columns named "A" and "B". I would like to create another pyspark dataframe containing only those rows of df1 whose values in columns "A" and "B" also occur in the columns of the same name in df2; that is, to filter df1 using columns "A" and "B" of df2.

Normally I would think this is a join (implemented with merge in pandas), but how do you join a pandas dataframe with a pyspark one?

I can't afford to convert df1 to a pandas dataframe.
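For reference, here is the filter being asked for, demonstrated on small pandas frames (the inline data is illustrative, not the real df1/df2): keep only the rows of df1 whose (A, B) pair appears in df2. In Spark terms this is a semi-join.

```python
import pandas as pd

# Illustrative stand-ins for df1 and df2.
df1 = pd.DataFrame({"A": [1, 1, 2], "B": ["x", "y", "x"], "C": [10, 20, 30]})
df2 = pd.DataFrame({"A": [1, 2], "B": ["x", "z"]})

# An inner merge on the key columns keeps only rows of df1 whose
# (A, B) pair also occurs in df2 -- the semi-join semantics.
filtered = df1.merge(df2[["A", "B"]].drop_duplicates(), on=["A", "B"], how="inner")
print(filtered)
```

Only the row (A=1, B="x", C=10) survives, since (1, "y") and (2, "x") do not appear in df2.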

graffe asked Sep 19 '17

1 Answer

You can either pass the schema while converting the pandas dataframe to a pyspark dataframe, like this:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)])
df = sqlContext.createDataFrame(pandas_dataframe, schema)

Or you can use the hack I have used in this function:

import numpy as np
import pandas as pd

def create_spark_dataframe(file_name):
    """
    Return a spark dataframe built from the input csv file.
    Non-numeric columns have their NaNs replaced with empty strings
    so that createDataFrame can infer a type for them.
    """
    pandas_data_frame = pd.read_csv(file_name)
    for col in pandas_data_frame.columns:
        if ((pandas_data_frame[col].dtypes != np.int64)
                and (pandas_data_frame[col].dtypes != np.float64)):
            pandas_data_frame[col] = pandas_data_frame[col].fillna('')

    spark_data_frame = sqlContext.createDataFrame(pandas_data_frame)
    return spark_data_frame
Ankit Kumar Namdeo answered Oct 27 '22