I am creating a DataFrame with PySpark, like this:
+----+------+
|   k|     v|
+----+------+
|key1|value1|
|key1|value1|
|key1|value1|
|key2|value1|
|key2|value1|
|key2|value1|
+----+------+
I want to add a 'rowNum' column using the 'withColumn' method, so that the DataFrame becomes:
+----+------+------+
|   k|     v|rowNum|
+----+------+------+
|key1|value1|     1|
|key1|value1|     2|
|key1|value1|     3|
|key2|value1|     4|
|key2|value1|     5|
|key2|value1|     6|
+----+------+------+
The range of rowNum is from 1 to n, where n is the number of rows. I modified my code like this:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
w = Window().partitionBy("v").orderBy('k')
my_df= my_df.withColumn("rowNum", F.rowNumber().over(w))
But I got this error message:
'module' object has no attribute 'rowNumber'
I replaced the rowNumber() method with row_number, and the code ran. But when I run:
my_df.show()
I got an error message again:
Py4JJavaError: An error occurred while calling o898.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: row_number()
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)
at org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate.doGenCode(interfaces.scala:342)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:101)
at scala.Option.getOrElse(Option.scala:121)
Solution in Spark 2.2:
from pyspark.sql.functions import row_number, lit
from pyspark.sql.window import Window

# Order by a constant literal so the whole DataFrame forms a single window;
# row_number() then assigns 1..n across all rows.
w = Window().orderBy(lit('A'))
df = df.withColumn("rowNum", row_number().over(w))
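Because this window has no partitionBy, Spark moves all rows to a single partition to compute row_number() (and logs a warning to that effect), so it is fine for small data but will not scale well. A minimal usage sketch on the question's DataFrame, assuming it is bound to my_df:

my_df = my_df.withColumn("rowNum", row_number().over(w))
my_df.show()  # each row gets a rowNum from 1 to n; the row order is arbitrary since all rows share the same constant sort key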
If you require a sequential rowNum value from 1 to n, rather than a monotonically_increasing_id, you can use zipWithIndex().
Recreating your example data as follows:
rdd = sc.parallelize([('key1','value1'),
                      ('key1','value1'),
                      ('key1','value1'),
                      ('key2','value1'),
                      ('key2','value1'),
                      ('key2','value1')])
You can then use zipWithIndex() to add an index to each row. The map is used to reformat the data and to add 1 to the index so it starts at 1.
rdd_indexed = rdd.zipWithIndex().map(lambda x: (x[0][0], x[0][1], x[1] + 1))
df = rdd_indexed.toDF(['k', 'v', 'rowNum'])
df.show()
+----+------+------+
|   k|     v|rowNum|
+----+------+------+
|key1|value1|     1|
|key1|value1|     2|
|key1|value1|     3|
|key2|value1|     4|
|key2|value1|     5|
|key2|value1|     6|
+----+------+------+
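If you already have a DataFrame rather than the raw RDD, the same idea works through df.rdd. A minimal sketch, assuming the question's DataFrame is bound to my_df with columns k and v (the name df_with_rownum is just illustrative):

# zipWithIndex() pairs each Row with a 0-based index; shift it to start at 1
rdd_indexed = my_df.rdd.zipWithIndex().map(lambda x: (x[0][0], x[0][1], x[1] + 1))
df_with_rownum = rdd_indexed.toDF(['k', 'v', 'rowNum'])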
You can do this with a window function. Note that rowNumber() was removed in Spark 2.x in favor of row_number(), and orderBy() requires at least one ordering expression:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, lit

# Ordering by a constant puts every row into one window;
# the resulting order is arbitrary, but the numbering runs 1..n.
w = Window().orderBy(lit(1))
your_df = your_df.withColumn("rowNum", row_number().over(w))

Here your_df is the DataFrame to which you need to add this column.
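If you instead want the numbering to restart within each group, add a partitionBy to the window. A minimal sketch under the question's schema (columns k and v; the name rowNumInGroup is just illustrative):

w_grouped = Window().partitionBy('k').orderBy('v')
your_df = your_df.withColumn("rowNumInGroup", row_number().over(w_grouped))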