 

How to add a row id in PySpark DataFrames [duplicate]


I have a CSV file which I convert to a DataFrame (df) in PySpark. After some transformations, I want to add a column to df that is a simple row id (starting from 0 or 1 and going up to N).

I converted df to an RDD and used zipWithIndex, then converted the resulting RDD back to a DataFrame. This approach works, but it generated 250k tasks and takes a long time to execute. I was wondering whether there is another way to do it that takes less runtime.

Following is a snippet of my code; the CSV file I am processing is big and contains billions of rows.

from pyspark.sql import Row  # needed for the Row(...) constructors below

debug_csv_rdd = (sc.textFile("debug.csv")
    .filter(lambda x: x.find('header') == -1)
    .map(lambda x: x.replace("NULL", "0"))
    .map(lambda p: p.split(','))
    .map(lambda x: Row(c1=int(x[0]), c2=int(x[1]), c3=int(x[2]), c4=int(x[3]))))

debug_csv_df = sqlContext.createDataFrame(debug_csv_rdd)
debug_csv_df.registerTempTable("debug_csv_table")
sqlContext.cacheTable("debug_csv_table")

r0 = sqlContext.sql("SELECT c2 FROM debug_csv_table WHERE c1 = 'str'")
r0.registerTempTable("r0_table")

r0_1 = (r0.flatMap(lambda x: x)
    .zipWithIndex()
    .map(lambda x: Row(c1=x[0], id=int(x[1]))))

r0_df = sqlContext.createDataFrame(r0_1)  # was r0_2, which is undefined
r0_df.show(10)
ankit patel asked Aug 19 '15


People also ask

How do I add a row ID in Pyspark?

In order to populate a row number in PySpark, use the row_number() function. row_number() along with partitionBy() on another column populates the row number per group.
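A minimal sketch of that pattern (not from the original post; the DataFrame df and the columns "dept" and "salary" are hypothetical):

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Number rows within each "dept" group, ordered by "salary"
w = Window.partitionBy("dept").orderBy("salary")
df_with_rn = df.withColumn("row_number", row_number().over(w))
df_with_rn.show()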

How do I get row index in Pyspark?

Method 1: Using collect(). This returns all of the DataFrame's rows as a list, after which a row can be picked by its index: dataframe.collect()[index_position], where dataframe is the PySpark DataFrame and index_position is the index of the row you want.
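For illustration, a minimal sketch (assuming a small DataFrame df; collect() pulls every row to the driver, so it only makes sense for small data):

rows = df.collect()   # list of Row objects
third_row = rows[2]   # index_position = 2, i.e. the third row
print(third_row)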

How do you add monotonically increasing ID Pyspark?

Use monotonically_increasing_id() for unique, but not consecutive numbers. The monotonically_increasing_id() function generates monotonically increasing 64-bit integers. The generated id numbers are guaranteed to be increasing and unique, but they are not guaranteed to be consecutive.


1 Answer

You can also use a function from the pyspark.sql.functions package. It will generate a unique id; however, it will not be sequential, as it depends on the number of partitions. I believe it is available in Spark 1.5+.

from pyspark.sql.functions import monotonicallyIncreasingId

# This will return a new DF with all the columns + id
res = df.withColumn("id", monotonicallyIncreasingId())

Edit: 19/1/2017

As commented by @Sean:

Use monotonically_increasing_id() instead, from Spark 1.6 onwards.
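A minimal sketch of the renamed function, plus (as an assumption beyond the original answer) one common way to turn the non-consecutive ids into a strictly sequential 0..N row id with row_number(); note the single global window forces the ordering through one partition:

from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number

# Unique but non-consecutive ids (Spark 1.6+)
res = df.withColumn("id", monotonically_increasing_id())

# Hypothetical follow-up if strictly consecutive 0..N ids are required:
# order by the monotonic id and number the rows (single-partition window).
w = Window.orderBy("id")
res_seq = res.withColumn("row_id", row_number().over(w) - 1)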

Arkadi T answered Oct 27 '22