
How to extract an element from an array in PySpark

I have a data frame of the following type:

col1|col2|col3|col4
xxxx|yyyy|zzzz|[1111],[2222]

I want my output to be of the following type:

col1|col2|col3|col4|col5
xxxx|yyyy|zzzz|1111|2222

My col4 is an array, and I want to split it into separate columns. How can I do that?

I have seen many answers that use flatMap, but they add rows; I just want the array elements placed in additional columns of the same row.

The following is my actual schema:

root
 |-- PRIVATE_IP: string (nullable = true)
 |-- PRIVATE_PORT: integer (nullable = true)
 |-- DESTINATION_IP: string (nullable = true)
 |-- DESTINATION_PORT: integer (nullable = true)
 |-- collect_set(TIMESTAMP): array (nullable = true)
 |    |-- element: string (containsNull = true)

Also, could someone please explain the difference between DataFrames and RDDs?




2 Answers

Create sample data:

from pyspark.sql import Row

# One-row sample DataFrame with an array column
x = [Row(col1="xx", col2="yy", col3="zz", col4=[123, 234])]
rdd = sc.parallelize(x)
df = spark.createDataFrame(rdd)
df.show()
#+----+----+----+----------+
#|col1|col2|col3|      col4|
#+----+----+----+----------+
#|  xx|  yy|  zz|[123, 234]|
#+----+----+----+----------+

Use getItem to extract an element from the array column by index; in your actual case, replace col4 with collect_set(TIMESTAMP):

# Put the second element in col5, then overwrite col4 with the first element
df = df.withColumn("col5", df["col4"].getItem(1)) \
       .withColumn("col4", df["col4"].getItem(0))
df.show()
#+----+----+----+----+----+
#|col1|col2|col3|col4|col5|
#+----+----+----+----+----+
#|  xx|  yy|  zz| 123| 234|
#+----+----+----+----+----+
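
For the schema in the question, the same pattern applies to the collect_set(TIMESTAMP) column. A minimal sketch, assuming the DataFrame is named df and every timestamp set holds at least two entries (the column names TIMESTAMP_1 and TIMESTAMP_2 are only illustrative):

# Extract the first two timestamps into their own columns,
# then drop the original array column
df2 = (df
       .withColumn("TIMESTAMP_1", df["collect_set(TIMESTAMP)"].getItem(0))
       .withColumn("TIMESTAMP_2", df["collect_set(TIMESTAMP)"].getItem(1))
       .drop("collect_set(TIMESTAMP)"))
df2.show()

getItem returns null when the index is past the end of the array, so rows whose set contains fewer timestamps simply get null in the extra column.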

There are four ways to extract a value from the array column:

df = spark.createDataFrame([[1, [10, 20, 30, 40]]], ['A', 'B'])
df.show()

+---+----------------+
|  A|               B|
+---+----------------+
|  1|[10, 20, 30, 40]|
+---+----------------+

from pyspark.sql import functions as F

df.select(
    "A",
    df.B[0].alias("B0"), # dot notation and index        
    F.col("B")[1].alias("B1"), # function col and index
    df.B.getItem(2).alias("B2"), # dot notation and method getItem
    F.col("B").getItem(3).alias("B3"), # function col and method getItem
).show()

+---+---+---+---+---+
|  A| B0| B1| B2| B3|
+---+---+---+---+---+
|  1| 10| 20| 30| 40|
+---+---+---+---+---+

If the array has many elements (and therefore many columns to create), use a list comprehension:

df.select(
    'A', *[F.col('B')[i].alias(f'B{i}') for i in range(4)]
).show()

+---+---+---+---+---+
|  A| B0| B1| B2| B3|
+---+---+---+---+---+
|  1| 10| 20| 30| 40|
+---+---+---+---+---+
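
The range(4) above is hard-coded to the array length. If the length varies from row to row (as it will with collect_set), one possible approach, sketched here, is to first compute the longest array and then generate a column for every index; shorter arrays just get null in the trailing columns:

# Find the longest array, then build one output column per index
n = df.select(F.max(F.size('B')).alias('n')).first()['n']
df.select('A', *[F.col('B')[i].alias(f'B{i}') for i in range(n)]).show()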