Context: I have a DataFrame with two columns, word and vector, where the column type of "vector" is VectorUDT.
An example:

word   | vector
assert | [435, 323, 324, 212, ...]

And I want to get this:

word   | v1  | v2   | v3  | v4  | v5 | v6 | ...
assert | 435 | 5435 | 698 | 356 | ...
Question:
How can I split a column of vectors into several columns, one per dimension, using PySpark?
Thanks in advance
Spark >= 3.0.0
Since Spark 3.0.0 this can be done without using a UDF.
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

(df
    .withColumn("xs", vector_to_array("vector"))
    .select(["word"] + [col("xs")[i] for i in range(3)]))
## +-------+-----+-----+-----+
## | word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert| 1.0| 2.0| 3.0|
## |require| 0.0| 2.0| 0.0|
## +-------+-----+-----+-----+
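If the number of dimensions is not known in advance, it can be read from the first row, and alias can give each output column a friendlier name. A minimal sketch, assuming the example df defined below and that all vectors have the same length; the v1, v2, ... names just follow the question:

from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

# Infer the vector length from the first row
n = len(df.select("vector").first()["vector"])

(df
    .withColumn("xs", vector_to_array("vector"))
    .select(["word"] + [col("xs")[i].alias("v{}".format(i + 1)) for i in range(n)]))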
Spark < 3.0.0
One possible approach is to convert to and from RDD:
from pyspark.ml.linalg import Vectors
df = sc.parallelize([
    ("assert", Vectors.dense([1, 2, 3])),
    ("require", Vectors.sparse(3, {1: 2}))
]).toDF(["word", "vector"])
def extract(row):
    return (row.word, ) + tuple(row.vector.toArray().tolist())

df.rdd.map(extract).toDF(["word"])  # Vector values will be named _2, _3, ...
## +-------+---+---+---+
## | word| _2| _3| _4|
## +-------+---+---+---+
## | assert|1.0|2.0|3.0|
## |require|0.0|2.0|0.0|
## +-------+---+---+---+
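If the vector length is known, every column can also be named explicitly in toDF instead of relying on the default _2, _3, ... names. A small sketch reusing the extract function above:

# Same extract as above, but with explicit names for every dimension
df.rdd.map(extract).toDF(["word", "v1", "v2", "v3"])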
An alternative solution would be to create a UDF:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType
def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    # Important: asNondeterministic requires Spark 2.3 or later
    # It can be safely removed, i.e.
    # return udf(to_array_, ArrayType(DoubleType()))(col)
    # but at the cost of decreased performance
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)
(df
    .withColumn("xs", to_array(col("vector")))
    .select(["word"] + [col("xs")[i] for i in range(3)]))
## +-------+-----+-----+-----+
## | word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert| 1.0| 2.0| 3.0|
## |require| 0.0| 2.0| 0.0|
## +-------+-----+-----+-----+
For the Scala equivalent see Spark Scala: How to convert Dataframe[vector] to DataFrame[f1: Double, ..., fn: Double].
To split the rawPrediction or probability columns generated after training a PySpark ML model into pandas columns, you can do it like this:
your_pandas_df['probability'].apply(lambda x: pd.Series(x.toArray()))
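To keep the expanded values alongside the rest of the pandas DataFrame, the resulting columns can be renamed and concatenated back. A minimal sketch; the p0, p1, ... names are just an illustration:

import pandas as pd

# Expand the vector column into one pandas column per dimension,
# then join the new columns back onto the original DataFrame
prob = your_pandas_df['probability'].apply(lambda x: pd.Series(x.toArray()))
prob.columns = ['p{}'.format(i) for i in prob.columns]
your_pandas_df = pd.concat([your_pandas_df.drop(columns=['probability']), prob], axis=1)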
It is much faster to use the i_th UDF from how-to-access-element-of-a-vectorudt-column-in-a-spark-dataframe.
The extract function given in zero323's solution above uses tolist, which creates a Python list object, populates it with Python float objects, and finds the desired element by traversing the list; that element then has to be converted back to a Java double, and this is repeated for each row. Using the RDD is much slower than the to_array UDF, which also calls tolist, but both are much slower than a UDF that lets Spark SQL handle most of the work.
Timing code comparing the RDD extract and the to_array UDF proposed here to the i_th UDF from 3955864:
from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext, SparkSession
from pyspark.sql.functions import lit, udf, col
from pyspark.sql.types import ArrayType, DoubleType
import pyspark.sql.dataframe
from pyspark.sql.functions import pandas_udf, PandasUDFType
sc = SparkContext('local[4]', 'FlatTestTime')
spark = SparkSession(sc)
spark.conf.set("spark.sql.execution.arrow.enabled", True)
from pyspark.ml.linalg import Vectors
# copy the two rows in the test dataframe a bunch of times,
# make this small enough for testing, or go for "big data" and be prepared to wait
REPS = 20000
df = sc.parallelize([
    ("assert", Vectors.dense([1, 2, 3]), 1, Vectors.dense([4.1, 5.1])),
    ("require", Vectors.sparse(3, {1: 2}), 2, Vectors.dense([6.2, 7.2])),
] * REPS).toDF(["word", "vector", "more", "vorpal"])
def extract(row):
    return (row.word, ) + tuple(row.vector.toArray().tolist()) + (row.more,) + tuple(row.vorpal.toArray().tolist())

def test_extract():
    return df.rdd.map(extract).toDF(
        ['word', 'vector__0', 'vector__1', 'vector__2', 'more', 'vorpal__0', 'vorpal__1'])

def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType()))(col)

def test_to_array():
    df_to_array = df.withColumn("xs", to_array(col("vector"))) \
        .select(["word"] + [col("xs")[i] for i in range(3)] + ["more", "vorpal"]) \
        .withColumn("xx", to_array(col("vorpal"))) \
        .select(["word"] + ["xs[{}]".format(i) for i in range(3)] + ["more"] + [col("xx")[i] for i in range(2)])
    return df_to_array
# pack up to_array into a tidy function
def flatten(df, vector, vlen):
    fieldNames = df.schema.fieldNames()
    if vector in fieldNames:
        names = []
        for fieldname in fieldNames:
            if fieldname == vector:
                names.extend([col(vector)[i] for i in range(vlen)])
            else:
                names.append(col(fieldname))
        return df.withColumn(vector, to_array(col(vector))) \
            .select(names)
    else:
        return df

def test_flatten():
    dflat = flatten(df, "vector", 3)
    dflat2 = flatten(dflat, "vorpal", 2)
    return dflat2

def ith_(v, i):
    try:
        return float(v[i])
    except ValueError:
        return None
ith = udf(ith_, DoubleType())
select = ["word"]
select.extend([ith("vector", lit(i)) for i in range(3)])
select.append("more")
select.extend([ith("vorpal", lit(i)) for i in range(2)])
# %% timeit ...
def test_ith():
    return df.select(select)

if __name__ == '__main__':
    import timeit

    # make sure these work as intended
    test_ith().show(4)
    test_flatten().show(4)
    test_to_array().show(4)
    test_extract().show(4)

    print("i_th\t\t",
          timeit.timeit("test_ith()",
                        setup="from __main__ import test_ith",
                        number=7)
          )
    print("flatten\t\t",
          timeit.timeit("test_flatten()",
                        setup="from __main__ import test_flatten",
                        number=7)
          )
    print("to_array\t",
          timeit.timeit("test_to_array()",
                        setup="from __main__ import test_to_array",
                        number=7)
          )
    print("extract\t\t",
          timeit.timeit("test_extract()",
                        setup="from __main__ import test_extract",
                        number=7)
          )
Results:
i_th 0.05964796099999958
flatten 0.4842299350000001
to_array 0.42978780299999997
extract 2.9254476840000017