
Add a column with values from 1 to n to a DataFrame

Tags:

pyspark

I am creating a dataframe with pyspark, like this:

+----+------+
|   k|     v|
+----+------+
|key1|value1|
|key1|value1|
|key1|value1|
|key2|value1|
|key2|value1|
|key2|value1|
+----+------+

I want to add a 'rowNum' column using the 'withColumn' method, so that the resulting dataframe looks like this:

+----+------+------+
|   k|     v|rowNum|
+----+------+------+
|key1|value1|     1|
|key1|value1|     2|
|key1|value1|     3|
|key2|value1|     4|
|key2|value1|     5|
|key2|value1|     6|
+----+------+------+

The range of rowNum is from 1 to n, where n is the number of rows. I modified my code like this:

from pyspark.sql.window import Window
from pyspark.sql import functions as F
w = Window().partitionBy("v").orderBy('k')
my_df= my_df.withColumn("rowNum", F.rowNumber().over(w))

But I got this error message:

'module' object has no attribute 'rowNumber' 

I replaced rowNumber() with row_number(), and the code above then ran. But when I run:

my_df.show()

I got another error message:

Py4JJavaError: An error occurred while calling o898.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: row_number()
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)
    at org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate.doGenCode(interfaces.scala:342)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:101)
    at scala.Option.getOrElse(Option.scala:121)
Ivan Lee asked Mar 09 '17 08:03



3 Answers

Solution in Spark 2.2, ordering the window by a constant literal so that no real sort column is needed:

from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
w = Window().orderBy(lit('A'))
df = df.withColumn("rowNum", row_number().over(w))
cph_sto answered Oct 23 '22 01:10


If you require a sequential rowNum value from 1 to n, rather than a monotonically_increasing_id, you can use zipWithIndex().
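For context on why monotonically_increasing_id does not yield 1 to n: the Spark documentation describes the current implementation as putting the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The following is a pure-Python model of that layout, not Spark code; the function name and the two-partition split are hypothetical.

```python
# Pure-Python model (not Spark code) of the documented ID layout:
# partition index in the upper bits, per-partition record number in the lower 33 bits.
def model_monotonic_ids(partitions):
    return [
        (partition_id << 33) + offset
        for partition_id, rows in enumerate(partitions)
        for offset, _ in enumerate(rows)
    ]

# Hypothetical split of six rows across two partitions.
partitions = [["a", "b", "c"], ["d", "e", "f"]]
ids = model_monotonic_ids(partitions)
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

The ids increase and are unique, but they jump at every partition boundary, so they cannot serve as a consecutive row number.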

Recreating example data like yours (this sample uses key1 for every row):

rdd = sc.parallelize([('key1','value1'),
                      ('key1','value1'),
                      ('key1','value1'),
                      ('key1','value1'),
                      ('key1','value1'),
                      ('key1','value1')])

You can then use zipWithIndex() to add an index to each row. The map is used to reformat the data and to add 1 to the index so it starts at 1.

rdd_indexed = rdd.zipWithIndex().map(lambda x: (x[0][0],x[0][1],x[1]+1))
df = rdd_indexed.toDF(['id','score','rowNum'])
df.show()


+----+------+------+
|  id| score|rowNum|
+----+------+------+
|key1|value1|     1|
|key1|value1|     2|
|key1|value1|     3|
|key1|value1|     4|
|key1|value1|     5|
|key1|value1|     6|
+----+------+------+
Alex answered Oct 23 '22 01:10


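You can do this with a window function. Note that rowNumber() was the name in older Spark 1.x releases; from Spark 2.0 on the function is row_number(), and the window must have an ordering:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# row_number() requires an ordered window; "k" is the key column from the question
w = Window.orderBy("k")
your_df = your_df.withColumn("rowNum", row_number().over(w))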

Here your_df is the data frame to which you want to add this column.

Rakesh Kumar answered Oct 23 '22 00:10