
Converting Pandas DataFrame to Spark DataFrame

I previously asked how to convert a scipy sparse matrix to a pyspark.sql.dataframe.DataFrame, and made some progress after reading the answer provided, as well as this article. I eventually arrived at the following code for converting a scipy.sparse.csc_matrix to a pandas dataframe:

df = pd.DataFrame(csc_mat.todense()).to_sparse(fill_value=0)
df.columns = header

I then tried converting the pandas dataframe to a spark dataframe using the suggested syntax:

spark_df = sqlContext.createDataFrame(df)

However, I get back the following error:

ValueError: cannot create an RDD from type: <type 'list'>

I do not believe it has anything to do with the sqlContext as I was able to convert another pandas dataframe of roughly the same size to a spark dataframe, no problem. Any thoughts?
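
For illustration only (this is not the code used in this question), here is a minimal sketch of how the stored entries of a CSC matrix could be read out as plain (row, col, value) tuples without building a dense copy first. It assumes the three arrays that scipy.sparse.csc_matrix exposes as data, indices and indptr; the helper name csc_to_triplets and the sample values are hypothetical. A list of plain tuples like this is one kind of input that createDataFrame accepts.

```python
def csc_to_triplets(data, indices, indptr):
    """Yield (row, col, value) for every stored entry of a CSC matrix.

    data    -- stored values, column by column
    indices -- row index of each stored value
    indptr  -- indptr[j]:indptr[j + 1] is the slice of data for column j
    """
    for col in range(len(indptr) - 1):
        for pos in range(indptr[col], indptr[col + 1]):
            yield (indices[pos], col, data[pos])


# CSC arrays for the 3x3 matrix [[1, 0, 2], [0, 0, 3], [4, 5, 6]]
data = [1, 4, 5, 2, 3, 6]
indices = [0, 2, 2, 0, 1, 2]
indptr = [0, 2, 3, 6]

triplets = list(csc_to_triplets(data, indices, indptr))
print(triplets)
```

With a real csc_matrix, the same function could presumably be called as csc_to_triplets(csc_mat.data, csc_mat.indices, csc_mat.indptr). The result is a long (row, col, value) table rather than the wide one-column-per-feature layout the question is after.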

asked Nov 03 '16 21:11 by Dirigo




1 Answer

I am not sure if this question is still relevant to the current version of PySpark, but here is the solution I worked out a couple of weeks after posting this question. The code is rather ugly and possibly inefficient, but I am posting it here due to the continued interest in this question:

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark import SparkConf
from py4j.protocol import Py4JJavaError

myConf = SparkConf(loadDefaults=True)
sc = SparkContext(conf=myConf)
hc = HiveContext(sc)


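def chunks(lst, k):
    """Yield chunks of about len(lst) // k items (more than k chunks if k does not divide len(lst))"""
    n = max(1, len(lst) // k)   # integer chunk size; at least 1 so range() gets a valid step
    for i in range(0, len(lst), n):
        yield lst[i: i + n]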


def reconstruct_rdd(lst, num_parts):
    """Parallelize lst one chunk at a time and union the pieces into a single RDD"""
    prime_rdd = None
    # Iterate over every chunk so that trailing rows are not dropped
    for part, partition in enumerate(chunks(lst, num_parts)):
        print("Partition %d started..." % part)
        second_rdd = sc.parallelize(partition)    # partition is a list of lists
        if prime_rdd is None:
            prime_rdd = second_rdd
        else:
            prime_rdd = prime_rdd.union(second_rdd)
        print("Partition %d complete!" % part)
    return prime_rdd


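def build_col_name_list(len_cols):
    """Return Spark's default column names: _1, _2, ..., _<len_cols>"""
    name_lst = []
    for i in range(1, len_cols + 1):   # inclusive upper bound, so the last column is named too
        idx = "_" + str(i)
        name_lst.append(idx)
    return name_lst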


def set_spark_df_header(header, sdf):
    """Rename the default columns (_1, _2, ...) of sdf to the names in header"""
    oldColumns = build_col_name_list(len(sdf.columns))
    newColumns = header
    # Rename one column at a time; each call returns a new DataFrame
    for old, new in zip(oldColumns, newColumns):
        sdf = sdf.withColumnRenamed(old, new)
    return sdf


def convert_pdf_matrix_to_sdf(pdf, sdf_header, num_of_parts):
    try:
        sdf = hc.createDataFrame(pdf)
    except ValueError:
        lst = pdf.values.tolist()   # Need to convert to list of lists to parallelize
        try:
            rdd = sc.parallelize(lst)
        except Py4JJavaError:
            # Parallelizing the whole list at once failed; fall back to chunks
            rdd = reconstruct_rdd(lst, num_of_parts)
        # Build the DataFrame from whichever RDD was created above
        sdf = hc.createDataFrame(rdd)
        sdf = set_spark_df_header(sdf_header, sdf)
    return sdf
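
To see what the chunking and renaming steps do without starting Spark, here is a small pure-Python sketch. It uses plain lists in place of RDDs, so list concatenation stands in for rdd.union. The sample rows, the header names and the helper default_col_names are made up for illustration. It relies on the fact that Spark names unnamed columns _1, _2, and so on.

```python
def chunks(lst, k):
    """Yield consecutive slices of about len(lst) // k items each."""
    n = max(1, len(lst) // k)
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


def default_col_names(num_cols):
    """Default names for unnamed columns: _1, _2, ..., _<num_cols>."""
    return ["_" + str(i) for i in range(1, num_cols + 1)]


rows = [[r, r * 10, r * 100] for r in range(7)]   # 7 rows, 3 columns
header = ["a", "b", "c"]                            # hypothetical column names

parts = list(chunks(rows, 3))
# Concatenating the chunks stands in for unioning the per-chunk RDDs
rebuilt = [row for part in parts for row in part]
# Old-name -> new-name pairs, as used for withColumnRenamed
rename_map = dict(zip(default_col_names(len(header)), header))

print(len(parts), rebuilt == rows, rename_map)
```

Because 7 is not divisible by 3, the rows come back in four chunks rather than three, which is why every chunk has to be consumed when rebuilding the RDD.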

answered Sep 28 '22 01:09 by Dirigo