Selecting a range of elements in an array in Spark SQL

I use spark-shell to do the operations below.

I recently loaded a table with an array column in spark-sql.

Here is the DDL:

create table test_emp_arr(
    dept_id string,
    dept_nm string,
    emp_details array<string>
)

The data looks something like this:

+-------+-------+-------------------------------+
|dept_id|dept_nm|                    emp_details|
+-------+-------+-------------------------------+
|     10|Finance|[Jon, Snow, Castle, Black, Ned]|
|     20|     IT|            [Ned, is, no, more]|
+-------+-------+-------------------------------+

I can query the emp_details column like this:

sqlContext.sql("select emp_details[0] from emp_details").show

Problem

I want to query a range of elements in the collection:

Queries I expected to work

sqlContext.sql("select emp_details[0-2] from emp_details").show

or

sqlContext.sql("select emp_details[0:2] from emp_details").show

Expected output

+-------------------+
|        emp_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+

In pure Scala, if I have an array such as:

val emp_details = Array("Jon","Snow","Castle","Black")

I can get the elements in the 0 to 2 range using

emp_details.slice(0, 3)

which returns

Array(Jon, Snow, Castle)

However, I am not able to apply this operation to the array in spark-sql.

Thanks

asked Oct 19 '16 by serious_black

2 Answers

Here is a solution using a user-defined function (UDF), which has the advantage of working for any slice size you want. It simply wraps the Scala built-in slice method in a UDF:

import sqlContext.implicits._
import org.apache.spark.sql.functions._

// Wraps Seq.slice: returns the elements at indices [from, to)
val slice = udf((array: Seq[String], from: Int, to: Int) => array.slice(from, to))

Example with a sample of your data:

val df = sqlContext.sql("select array('Jon', 'Snow', 'Castle', 'Black', 'Ned') as emp_details")
df.withColumn("slice", slice($"emp_details", lit(0), lit(3))).show

This produces the expected output:

+--------------------+-------------------+
|         emp_details|              slice|
+--------------------+-------------------+
|[Jon, Snow, Castl...|[Jon, Snow, Castle]|
+--------------------+-------------------+

You can also register the UDF in your sqlContext and use it like this:

sqlContext.udf.register("slice", (array: Seq[String], from: Int, to: Int) => array.slice(from, to))
sqlContext.sql("select array('Jon','Snow','Castle','Black','Ned'), slice(array('Jon','Snow','Castle','Black','Ned'), 0, 3)")

You won't need lit anymore with this solution.
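
With the UDF registered, you can also slice the array column of the original table directly in SQL. A minimal sketch, assuming the test_emp_arr table from the question is visible to the same sqlContext:

// Uses the "slice" UDF registered above; 0 to 3 takes the first three elements
sqlContext.sql("select dept_id, dept_nm, slice(emp_details, 0, 3) as emp_details from test_emp_arr").show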

answered Nov 15 '22 by cheseaux


Since Spark 2.4 you can use the slice function. In Python:

pyspark.sql.functions.slice(x, start, length)

Collection function: returns an array containing all the elements in x from index start (or starting from the end if start is negative) with the specified length.

...

New in version 2.4.

from pyspark.sql.functions import slice

df = spark.createDataFrame([
    (10, "Finance", ["Jon", "Snow", "Castle", "Black", "Ned"]),
    (20, "IT", ["Ned", "is", "no", "more"])
], ("dept_id", "dept_nm", "emp_details"))

df.select(slice("emp_details", 1, 3).alias("empt_details")).show()
+-------------------+
|       empt_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+

In Scala

def slice(x: Column, start: Int, length: Int): Column

Returns an array containing all the elements in x from index start (or starting from the end if start is negative) with the specified length.

import org.apache.spark.sql.functions.slice

val df = Seq(
    (10, "Finance", Seq("Jon", "Snow", "Castle", "Black", "Ned")),
    (20, "IT", Seq("Ned", "is", "no", "more"))
).toDF("dept_id", "dept_nm", "emp_details")

df.select(slice($"emp_details", 1, 3) as "empt_details").show
+-------------------+
|       empt_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+
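
Because start can be negative, the same function can also take elements from the end of the array. A small illustrative sketch using the same df as above, with the result inferred from the documented semantics:

df.select(slice($"emp_details", -2, 2) as "last_two").show

which should return [Black, Ned] for the first row and [no, more] for the second.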

The same thing can of course be done in SQL:

SELECT slice(emp_details, 1, 3) AS emp_details FROM df
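
This assumes the DataFrame is visible to SQL under the name df; a minimal sketch of registering it as a temporary view first:

df.createOrReplaceTempView("df")
spark.sql("SELECT slice(emp_details, 1, 3) AS emp_details FROM df").show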

Important:

Please note that, unlike Seq.slice, values are indexed from one, not zero, and the second argument is the length, not the end position.
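
To make the difference concrete, here is a small illustrative comparison using the values from this thread:

Seq("Jon", "Snow", "Castle", "Black", "Ned").slice(0, 3)  // Scala: indices [0, 3) => Jon, Snow, Castle
df.select(slice($"emp_details", 1, 3))                    // Spark: start at element 1, take 3 => Jon, Snow, Castle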

answered Nov 15 '22 by zero323