Consider the following snippet (assuming spark
is already set to some SparkSession
):
from pyspark.sql import Row source_data = [ Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]), Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), ] df = spark.createDataFrame(source_data)
Notice that the temperatures field is a list of floats. I would like to convert these lists of floats to the MLlib type Vector
, and I'd like this conversion to be expressed using the basic DataFrame
API rather than going via RDDs (which is inefficient because it sends all data from the JVM to Python, the processing is done in Python, we don't get the benefits of Spark's Catalyst optimizer, yada yada). How do I do this? Specifically:
This is what I would expect to be the "proper" solution. I want to convert the type of a column from one type to another, so I should use a cast. As a bit of context, let me remind you of the normal way to cast it to another type:
from pyspark.sql import types df_with_strings = df.select( df["city"], df["temperatures"].cast(types.ArrayType(types.StringType()))), )
Now e.g. df_with_strings.collect()[0]["temperatures"][1]
is '-7.0'
. But if I cast to an ml Vector then things do not go so well:
from pyspark.ml.linalg import VectorUDT df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
This gives an error:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;; 'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)] +- LogicalRDD [city#0, temperatures#1] "
Yikes! Any ideas how to fix this?
VectorAssembler
There is a Transformer
that seems almost ideal for this job: the VectorAssembler
. It takes one or more columns and concatenates them into a single vector. Unfortunately it only takes Vector
and Float
columns, not Array
columns, so the follow doesn't work:
from pyspark.ml.feature import VectorAssembler assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector") df_fail = assembler.transform(df)
It gives this error:
pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'
The best work around I can think of is to explode the list into multiple columns and then use the VectorAssembler
to collect them all back up again:
from pyspark.ml.feature import VectorAssembler TEMPERATURE_COUNT = 3 assembler_exploded = VectorAssembler( inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)], outputCol="temperature_vector" ) df_exploded = df.select( df["city"], *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)] ) converted_df = assembler_exploded.transform(df_exploded) final_df = converted_df.select("city", "temperature_vector")
This seems like it would be ideal, except that TEMPERATURE_COUNT
be more than 100, and sometimes more than 1000. (Another problem is that the code would be more complicated if you don't know the size of the array in advance, although that is not the case for my data.) Does Spark actually generate an intermediate data set with that many columns, or does it just consider this an intermediate step that individual items pass through transiently (or indeed does it optimise this away step entirely when it sees that the only use of these columns is to be assembled into a vector)?
A rather simpler alternative is to use a UDF to do the conversion. This lets me express quite directly what I want to do in one line of code, and doesn't require making a data set with a crazy number of columns. But all that data has to be exchanged between Python and the JVM, and every individual number has to be handled by Python (which is notoriously slow for iterating over individual data items). Here is how that looks:
from pyspark.ml.linalg import Vectors, VectorUDT from pyspark.sql.functions import udf list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT()) df_with_vectors = df.select( df["city"], list_to_vector_udf(df["temperatures"]).alias("temperatures") )
The remaining sections of this rambling question are some extra things I came up with while trying to find an answer. They can probably be skipped by most people reading this.
Vector
to begin withIn this trivial example it's possible to create the data using the vector type to begin with, but of course my data isn't really a Python list that I'm parallelizing, but instead is being read from a data source. But for the record, here is how that would look:
from pyspark.ml.linalg import Vectors from pyspark.sql import Row source_data = [ Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])), Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])), ] df = spark.createDataFrame(source_data)
map()
One possibility is to use the RDD map()
method to transform the list to a Vector
. This is similar to the UDF idea, except that its even worse because the cost of serialisation etc. is incurred for all the fields in each row, not just the one being operated on. For the record, here's what that solution would look like:
df_with_vectors = df.rdd.map(lambda row: Row( city=row["city"], temperatures=Vectors.dense(row["temperatures"]) )).toDF()
In desperation, I noticed that Vector
is represented internally by a struct with four fields, but using a traditional cast from that type of struct doesn't work either. Here is an illustration (where I built the struct using a udf but the udf isn't the important part):
from pyspark.ml.linalg import Vectors, VectorUDT from pyspark.sql.functions import udf list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType()) df_almost_vector = df.select( df["city"], list_to_almost_vector_udf(df["temperatures"]).alias("temperatures") ) df_with_vectors = df_almost_vector.select( df_almost_vector["city"], df_almost_vector["temperatures"].cast(VectorUDT()) )
This gives the error:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;; 'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)] +- Project [city#0, <lambda>(temperatures#1) AS temperatures#5] +- LogicalRDD [city#0, temperatures#1] "
Here, the user has to call the pull() function from the dplyr library of R language and pass the column name which is needed to be converted into the vector form and the name of the datafile read variable. pull function: This function pull selects a column in a data frame and transforms it into a vector.
For the Conversion of dataframe into a vector, we can simply pass the dataframe column name as [[index]]. Approach: We are taking a column in the dataframe and passing it into another variable by the selection method. Selection method can be defined as choosing a column from a data frame using ” [[]]”.
To create a vector of data frame values by rows we can use c function after transposing the data frame with t. For example, if we have a data frame df that contains many columns then the df values can be transformed into a vector by using c(t(df)), this will print the values of the data frame row by row.
User-defined type for Vector which allows easy interaction with SQL via DataFrame .
Personally I would go with Python UDF and wouldn't bother with anything else:
Vectors
are not native SQL types so there will be performance overhead one way or another. In particular this process requires two steps where data is first converted from external type to row, and then from row to internal representation using generic RowEncoder
.Pipeline
will be much more expensive than a simple conversion. Moreover it requires a process which opposite to the one described above But if you really want other options here you are:
Scala UDF with Python wrapper:
Install sbt following the instructions on the project site.
Create Scala package with following structure:
. ├── build.sbt └── udfs.scala
Edit build.sbt
(adjust to reflect Scala and Spark version):
scalaVersion := "2.11.8" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % "2.4.4", "org.apache.spark" %% "spark-mllib" % "2.4.4" )
Edit udfs.scala
:
package com.example.spark.udfs import org.apache.spark.sql.functions.udf import org.apache.spark.ml.linalg.DenseVector object udfs { val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray)) }
Package:
sbt package
and include (or equivalent depending on Scala version):
$PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
as an argument for --driver-class-path
when starting shell / submitting application.
In PySpark define a wrapper:
from pyspark.sql.column import _to_java_column, _to_seq, Column from pyspark import SparkContext def as_vector(col): sc = SparkContext.getOrCreate() f = sc._jvm.com.example.spark.udfs.udfs.as_vector() return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
Test:
with_vec = df.withColumn("vector", as_vector("temperatures")) with_vec.show()
+--------+------------------+----------------+ | city| temperatures| vector| +--------+------------------+----------------+ | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+ with_vec.printSchema()
root |-- city: string (nullable = true) |-- temperatures: array (nullable = true) | |-- element: double (containsNull = true) |-- vector: vector (nullable = true)
Dump data to a JSON format reflecting DenseVector
schema and read it back:
from pyspark.sql.functions import to_json, from_json, col, struct, lit from pyspark.sql.types import StructType, StructField from pyspark.ml.linalg import VectorUDT json_vec = to_json(struct(struct( lit(1).alias("type"), # type 1 is dense, type 0 is sparse col("temperatures").alias("values") ).alias("v"))) schema = StructType([StructField("v", VectorUDT())]) with_parsed_vector = df.withColumn( "parsed_vector", from_json(json_vec, schema).getItem("v") ) with_parsed_vector.show()
+--------+------------------+----------------+ | city| temperatures| parsed_vector| +--------+------------------+----------------+ | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+
with_parsed_vector.printSchema()
root |-- city: string (nullable = true) |-- temperatures: array (nullable = true) | |-- element: double (containsNull = true) |-- parsed_vector: vector (nullable = true)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With