Is it possible to subclass DataFrame in Pyspark?

The documentation for PySpark shows DataFrames being constructed from sqlContext, sqlContext.read(), and a variety of other methods.

(See https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html)

Is it possible to subclass DataFrame and instantiate it independently? I would like to add methods and functionality to the base DataFrame class.

Asked Jan 11 '17 by jerzy

People also ask

How do you change data types in PySpark?

In PySpark, you can cast or change a DataFrame column's data type using the cast() function of the Column class. In this article, I will use withColumn(), selectExpr(), and SQL expressions to cast from String to Int (IntegerType), String to Boolean, etc., with PySpark examples.
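
For illustration, a minimal sketch of both approaches (this assumes a SparkSession named `spark` is already available; the column names are made up for the example):

    from pyspark.sql.functions import col

    df = spark.createDataFrame([("1", "true"), ("2", "false")], ["age", "is_active"])

    # cast() on a Column, applied through withColumn()
    df = df.withColumn("age", col("age").cast("int"))

    # the same kind of cast expressed as a SQL expression via selectExpr()
    df = df.selectExpr("age", "CAST(is_active AS BOOLEAN) AS is_active")

    df.printSchema()
    # root
    #  |-- age: integer (nullable = true)
    #  |-- is_active: boolean (nullable = true)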

Are PySpark Dataframes immutable?

While PySpark derives its basic data types from Python, its own data structures are limited to RDDs, DataFrames, and GraphFrames. These DataFrames are immutable and offer reduced flexibility for row- and column-level manipulation compared to Python.
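
A short illustration of that immutability (again assuming a SparkSession named `spark`): transformations never modify a DataFrame in place, they return a new one.

    df = spark.createDataFrame([("a", 1)], ["foo", "bar"])

    # withColumn() returns a new DataFrame; df itself is untouched
    df2 = df.withColumn("bar", df["bar"] + 1)

    df.collect()    # [Row(foo='a', bar=1)]  <- original unchanged
    df2.collect()   # [Row(foo='a', bar=2)]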

Which is faster pandas or PySpark?

Due to parallel execution across all cores on multiple machines, PySpark runs operations faster than pandas, so we often need to convert a pandas DataFrame to a PySpark (Spark with Python) DataFrame for better performance. This is one of the major differences between pandas and PySpark DataFrames.
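
The conversion in both directions is a one-liner; a hedged sketch (assuming a SparkSession named `spark`, and that the data fits on the driver when collecting back):

    import pandas as pd

    # pandas -> Spark: the data gets distributed across the cluster
    pdf = pd.DataFrame({"foo": ["a", "b"], "bar": [1, 2]})
    sdf = spark.createDataFrame(pdf)

    # Spark -> pandas: collects everything to the driver,
    # so only do this when the result is small enough to fit in memory
    pdf_back = sdf.toPandas()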


1 Answer

It really depends on your goals.

  • Technically speaking it is possible. pyspark.sql.DataFrame is just a plain Python class. You can extend it or monkey-patch it if you need to (a monkey-patching sketch is included after this list).

    from pyspark.sql import DataFrame

    class DataFrameWithZipWithIndex(DataFrame):
        def __init__(self, df):
            # wrap an existing DataFrame, reusing its JVM handle and SQL context
            super(self.__class__, self).__init__(df._jdf, df.sql_ctx)

        def zipWithIndex(self):
            # go through the underlying RDD to attach a row index,
            # then rebuild a DataFrame with an extra _idx column
            return (self.rdd
                .zipWithIndex()
                .map(lambda row: (row[1], ) + row[0])
                .toDF(["_idx"] + self.columns))

    Example usage:

    df = sc.parallelize([("a", 1)]).toDF(["foo", "bar"])
    
    with_zipwithindex = DataFrameWithZipWithIndex(df)
    
    isinstance(with_zipwithindex, DataFrame)
    
    True
    
    with_zipwithindex.zipWithIndex().show()
    
    +----+---+---+
    |_idx|foo|bar|
    +----+---+---+
    |   0|  a|  1|
    +----+---+---+
    
  • Practically speaking you won't be able to do much here. DataFrame is a thin wrapper around a JVM object and doesn't do much beyond providing docstrings, converting arguments to the natively required form, calling JVM methods, and wrapping the results in Python adapters when necessary.

    With plain Python code you won't even be able to get near DataFrame / Dataset internals or modify their core behavior. If you're looking for a standalone, Python-only Spark DataFrame implementation, it is not possible.
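
As mentioned in the first bullet, the same zipWithIndex logic can also be monkey-patched onto the stock class instead of subclassing it. A minimal sketch (assuming the same SparkContext `sc` and an active SQLContext / SparkSession as above):

    from pyspark.sql import DataFrame

    def zip_with_index(self):
        # same logic as in the subclass above: attach a row index via the RDD
        return (self.rdd
            .zipWithIndex()
            .map(lambda row: (row[1], ) + row[0])
            .toDF(["_idx"] + self.columns))

    # attach the method to the existing DataFrame class
    DataFrame.zipWithIndex = zip_with_index

    df = sc.parallelize([("a", 1)]).toDF(["foo", "bar"])
    df.zipWithIndex().show()
    # +----+---+---+
    # |_idx|foo|bar|
    # +----+---+---+
    # |   0|  a|  1|
    # +----+---+---+

The trade-off is that monkey-patching affects every DataFrame in the session, while the subclass only wraps the instances you explicitly construct.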

Answered Oct 14 '22 by zero323