Consider the following DataFrame:
#+------+---+
#|letter|rpt|
#+------+---+
#|     X|  3|
#|     Y|  1|
#|     Z|  2|
#+------+---+
which can be created using the following code:
df = spark.createDataFrame([("X", 3), ("Y", 1), ("Z", 2)], ["letter", "rpt"])
Suppose I wanted to repeat each row the number of times specified in the column rpt, just like in this question.
One way would be to replicate my solution to that question using the following pyspark-sql query:
query = """
SELECT *
FROM
(SELECT DISTINCT *,
posexplode(split(repeat(",", rpt), ",")) AS (index, col)
FROM df) AS a
WHERE index > 0
"""
query = query.replace("\n", " ") # replace newlines with spaces, avoid EOF error
spark.sql(query).drop("col").sort('letter', 'index').show()
#+------+---+-----+
#|letter|rpt|index|
#+------+---+-----+
#|     X|  3|    1|
#|     X|  3|    2|
#|     X|  3|    3|
#|     Y|  1|    1|
#|     Z|  2|    1|
#|     Z|  2|    2|
#+------+---+-----+
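To see why the WHERE index > 0 filter is needed: repeat(",", rpt) builds a string of rpt commas, and splitting that string on "," yields rpt + 1 empty strings, so posexplode emits indices 0 through rpt, one row too many. Running just the inner posexplode for a single letter (a quick sketch against the same temp view) should make this visible:
spark.sql(
    'SELECT letter, posexplode(split(repeat(",", rpt), ",")) AS (index, col) FROM df'
).where("letter = 'Y'").show()
#+------+-----+---+
#|letter|index|col|
#+------+-----+---+
#|     Y|    0|   |
#|     Y|    1|   |
#+------+-----+---+
Dropping the index = 0 row leaves exactly rpt copies of each input row.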
This works and produces the correct answer. However, I am unable to replicate this behavior using the DataFrame API functions.
I tried:
import pyspark.sql.functions as f
df.select(
    f.posexplode(f.split(f.repeat(",", f.col("rpt")), ",")).alias("index", "col")
).show()
But this results in:
TypeError: 'Column' object is not callable
Why am I able to pass the column as an input to repeat within the query, but not from the API? Is there a way to replicate this behavior using the Spark DataFrame functions?
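For reference, the failure is specific to the second argument: the DataFrame API's repeat wraps a Scala function whose count parameter is a plain integer, so (at least in the Spark version used here) it accepts a Column as the string to repeat, but not as the count. A minimal sketch, reusing the df defined above:
import pyspark.sql.functions as f

# Works: the count is a literal Python int
df.select(f.repeat(f.col("letter"), 3).alias("r")).show()
#+---+
#|  r|
#+---+
#|XXX|
#|YYY|
#|ZZZ|
#+---+

# Fails with TypeError: the count is a Column
df.select(f.repeat(",", f.col("rpt"))).show()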
One option is to use pyspark.sql.functions.expr, which allows you to use column values as inputs to Spark SQL functions: the string is handed to the SQL parser, which resolves rpt as a column reference rather than requiring a Python literal.
Based on @user8371915's comment, I have found that the following works:
from pyspark.sql.functions import expr

df.select(
    '*',
    expr('posexplode(split(repeat(",", rpt), ","))').alias("index", "col")
).where('index > 0').drop("col").sort('letter', 'index').show()
#+------+---+-----+
#|letter|rpt|index|
#+------+---+-----+
#|     X|  3|    1|
#|     X|  3|    2|
#|     X|  3|    3|
#|     Y|  1|    1|
#|     Z|  2|    1|
#|     Z|  2|    2|
#+------+---+-----+
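Note that expr is only needed for the one function that refuses a Column: split and posexplode both accept Column arguments directly, so the same query can be written almost entirely with API functions, wrapping just the repeat call in expr. A sketch, assuming the same df:
import pyspark.sql.functions as f

df.select(
    '*',
    f.posexplode(f.split(f.expr('repeat(",", rpt)'), ',')).alias('index', 'col')
).where(f.col('index') > 0).drop('col').sort('letter', 'index').show()
This should produce the same output as the expr-only version above.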