
How to select last row and also how to access PySpark dataframe by index?

From a PySpark SQL dataframe like

name age city
abc   20  A
def   30  B

How do I get the last row? (With df.limit(1) I can get the first row of the dataframe into a new dataframe.)

And how can I access the dataframe rows by index, like row no. 12 or 200?

In pandas I can do

df.tail(1) # for last row
df.ix[rowno or index] # by index
df.loc[] or df.iloc[]  # by label or by position

I am just curious how to access a PySpark dataframe in such ways, or in alternative ways.

Thanks

asked Sep 17 '16 by Satya

People also ask

How do you get the last row in Pyspark DataFrame?

Use tail() action to get the Last N rows from a DataFrame, this returns a list of class Row for PySpark and Array[Row] for Spark with Scala.
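For example (a minimal sketch; DataFrame.tail() was added in Spark 3.0 and collects the rows to the driver):

last_rows = df.tail(1)   # a list of Row objects on the driver
last_rows[0]             # e.g. Row(name='def', age=30, city='B') for the example dataframe above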

How do I select a specific row in Pyspark?

Using select() with collect(): this method is used to pick a particular row out of the dataframe, where dataframe is the PySpark dataframe and the columns passed to select() are the columns to be displayed in each row; collect() then returns those rows to the driver.
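A rough sketch against the example dataframe above (collect() pulls every selected row back to the driver):

rows = df.select("name", "age").collect()   # list of Row objects on the driver
rows[0]["name"]                             # field of one particular collected row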

How do you get the last value in a column in Pyspark DataFrame?

To pick an arbitrary value from the column, take(1) is fine. If you have some kind of order, you can use the sort function with the ascending parameter to reverse the sorting and then take the first row.
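A quick sketch of both options, using the "age" column from the question's example purely as an illustration:

df.select("age").take(1)                                        # an arbitrary single row

from pyspark.sql.functions import col
df.sort(col("age"), ascending=False).select("age").first()[0]   # "last" value under that ordering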

How do you index the last column in Python?

Use iloc[], [], or tail() to select the last column of a pandas dataframe, or get the last column of a pandas dataframe as a Python list.


3 Answers

How to get the last row.

Long and ugly way which assumes that all columns are orderable:

from pyspark.sql.functions import (
    col, max as max_, struct, monotonically_increasing_id
)

last_row = (df
    .withColumn("_id", monotonically_increasing_id())
    .select(max_(struct("_id", *df.columns)).alias("tmp"))
    .select(col("tmp.*"))
    .drop("_id"))

If not all columns can be ordered, you can try:

with_id = df.withColumn("_id", monotonically_increasing_id())
i = with_id.select(max_("_id")).first()[0]

with_id.where(col("_id") == i).drop("_id")

Note: there is a last function in pyspark.sql.functions / o.a.s.sql.functions, but considering the description of the corresponding expressions it is not a good choice here.

how can I access the dataframe rows by index?

You cannot. A Spark DataFrame is not accessible by index. You can add indices using zipWithIndex and filter later; just keep in mind that this is an O(N) operation.
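A minimal sketch of the zipWithIndex route (the "_index" column name and the 0-based position are illustrative, not part of the answer above):

indexed = (df.rdd
    .zipWithIndex()
    .map(lambda pair: pair[0] + (pair[1],))   # Row is a tuple, so append the generated index
    .toDF(df.columns + ["_index"]))

indexed.where(indexed["_index"] == 12).drop("_index").show()   # "row no. 12"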

answered Oct 29 '22 by zero323


How to get the last row.

If you have a column that you can use to order the dataframe, for example "index", then one easy way to get the last record is to use SQL: 1) order your table in descending order and 2) take the first value from that order.

df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY index DESC LIMIT 1"""
latest_rec = spark.sql(query_latest_rec)
latest_rec.show()
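The same query with the DataFrame API instead of raw SQL (a sketch, still assuming an orderable "index" column):

from pyspark.sql.functions import col

latest_rec = df.orderBy(col("index").desc()).limit(1)
latest_rec.show()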

And how can I access the dataframe rows by index, like row no. 12 or 200?

In a similar way you can get the record on any line:

row_number = 12
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM (SELECT * FROM table_df ORDER BY index ASC LIMIT {0}) ord_lim ORDER BY index DESC LIMIT 1"""
latest_rec = spark.sql(query_latest_rec.format(row_number))
latest_rec.show()
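An alternative sketch using a window function instead of the nested query (row_number() is 1-based; ordering a window over the whole dataframe moves all rows to a single partition, so this is only reasonable for small data):

from pyspark.sql import Window
from pyspark.sql.functions import row_number, col

w = Window.orderBy(col("index"))
(df.withColumn("_rn", row_number().over(w))
   .where(col("_rn") == 12)
   .drop("_rn")
   .show())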

If you do not have an "index" column, you can create one using

from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("index", monotonically_increasing_id())

answered Oct 29 '22 by Danylo Zherebetskyy


from pyspark.sql import functions as F

# take the "last" value of every column in a single aggregation
expr = [F.last(col).alias(col) for col in df.columns]

df.agg(*expr)
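A usage sketch with the question's example columns (note that without an explicit ordering, last() is not deterministic across partitions):

last_vals = df.agg(*expr).first()   # a single Row holding the "last" value of every column
last_vals["name"], last_vals["age"]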

Just a tip: it looks like you still have the mindset of someone working with pandas or R. Spark is a different paradigm in the way we work with data. You don't access data inside individual cells anymore; you work with whole chunks of it. If you keep collecting stuff and doing actions, like you just did, you lose the whole concept of parallelism that Spark provides. Take a look at the concept of transformations vs. actions in Spark.

answered Oct 29 '22 by Henrique Florencio