How do I convert an array (i.e. list) column to Vector

Short version of the question!

Consider the following snippet (assuming spark is already set to some SparkSession):

from pyspark.sql import Row

source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)

Notice that the temperatures field is a list of floats. I would like to convert these lists of floats to the MLlib type Vector, and I'd like this conversion to be expressed using the basic DataFrame API rather than going via RDDs (which is inefficient because it sends all data from the JVM to Python, the processing is done in Python, we don't get the benefits of Spark's Catalyst optimizer, yada yada). How do I do this? Specifically:

  1. Is there a way to get a straight cast working? See below for details (and a failed attempt at a workaround). Or is there any other operation that has the effect I am after?
  2. Which is more efficient out of the two alternative solutions I suggest below (UDF vs exploding/reassembling the items in the list)? Or are there any other almost-but-not-quite-right alternatives that are better than either of them?

A straight cast doesn't work

This is what I would expect to be the "proper" solution. I want to convert the type of a column from one type to another, so I should use a cast. As a bit of context, let me remind you of the normal way to cast it to another type:

from pyspark.sql import types

df_with_strings = df.select(
    df["city"],
    df["temperatures"].cast(types.ArrayType(types.StringType())),
)

Now e.g. df_with_strings.collect()[0]["temperatures"][1] is '-2.0'. But if I cast to an ml Vector then things do not go so well:

from pyspark.ml.linalg import VectorUDT

df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

This gives an error:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"

Yikes! Any ideas how to fix this?

Possible alternatives

Alternative 1: Using VectorAssembler

There is a Transformer that seems almost ideal for this job: the VectorAssembler. It takes one or more columns and concatenates them into a single vector. Unfortunately it only takes Vector and Float columns, not Array columns, so the following doesn't work:

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)

It gives this error:

pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.' 

The best workaround I can think of is to explode the list into multiple columns and then use the VectorAssembler to collect them all back up again:

from pyspark.ml.feature import VectorAssembler

TEMPERATURE_COUNT = 3

assembler_exploded = VectorAssembler(
    inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)],
    outputCol="temperature_vector"
)
df_exploded = df.select(
    df["city"],
    *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")

This seems like it would be ideal, except that TEMPERATURE_COUNT can be more than 100, and sometimes more than 1000. (Another problem is that the code would be more complicated if you don't know the size of the array in advance, although that is not the case for my data.) Does Spark actually generate an intermediate data set with that many columns, or does it just consider this an intermediate step that individual items pass through transiently (or indeed does it optimise this step away entirely when it sees that the only use of these columns is to be assembled into a vector)?

Alternative 2: use a UDF

A rather simpler alternative is to use a UDF to do the conversion. This lets me express quite directly what I want to do in one line of code, and doesn't require making a data set with a crazy number of columns. But all that data has to be exchanged between Python and the JVM, and every individual number has to be handled by Python (which is notoriously slow for iterating over individual data items). Here is how that looks:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
    df["city"],
    list_to_vector_udf(df["temperatures"]).alias("temperatures")
)

Ignorable remarks

The remaining sections of this rambling question are some extra things I came up with while trying to find an answer. They can probably be skipped by most people reading this.

Not a solution: use Vector to begin with

In this trivial example it's possible to create the data using the vector type to begin with, but of course my data isn't really a Python list that I'm parallelizing, but instead is being read from a data source. But for the record, here is how that would look:

from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

source_data = [
    Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
    Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)

Inefficient solution: use map()

One possibility is to use the RDD map() method to transform the list to a Vector. This is similar to the UDF idea, except that it's even worse because the cost of serialisation etc. is incurred for all the fields in each row, not just the one being operated on. For the record, here's what that solution would look like:

df_with_vectors = df.rdd.map(lambda row: Row(
    city=row["city"],
    temperatures=Vectors.dense(row["temperatures"])
)).toDF()

Failed attempt at a workaround for cast

In desperation, I noticed that Vector is represented internally by a struct with four fields, but using a traditional cast from that type of struct doesn't work either. Here is an illustration (where I built the struct using a udf but the udf isn't the important part):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
    df["city"],
    list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
    df_almost_vector["city"],
    df_almost_vector["temperatures"].cast(VectorUDT())
)

This gives the error:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
   +- LogicalRDD [city#0, temperatures#1]
"
Asked by Arthur Tacca, Feb 09 '17 at 13:02




1 Answer

Personally I would go with Python UDF and wouldn't bother with anything else:

  • Vectors are not native SQL types so there will be performance overhead one way or another. In particular this process requires two steps where data is first converted from external type to row, and then from row to internal representation using generic RowEncoder.
  • Any downstream ML Pipeline will be much more expensive than a simple conversion. Moreover, it requires a process which is the opposite of the one described above.

But if you really want other options here you are:

  • Scala UDF with Python wrapper:

    Install sbt following the instructions on the project site.

    Create Scala package with following structure:

    .
    ├── build.sbt
    └── udfs.scala

    Edit build.sbt (adjust to reflect Scala and Spark version):

    scalaVersion := "2.11.8"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "2.4.4",
      "org.apache.spark" %% "spark-mllib" % "2.4.4"
    )

    Edit udfs.scala:

    package com.example.spark.udfs

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.ml.linalg.DenseVector

    object udfs {
      val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
    }

    Package:

    sbt package 

    and include (or equivalent depending on Scala version):

    $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar 

    as an argument for --driver-class-path when starting shell / submitting application.

    In PySpark define a wrapper:

    from pyspark.sql.column import _to_java_column, _to_seq, Column
    from pyspark import SparkContext

    def as_vector(col):
        sc = SparkContext.getOrCreate()
        f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
        return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

    Test:

    with_vec = df.withColumn("vector", as_vector("temperatures"))
    with_vec.show()

    +--------+------------------+----------------+
    |    city|      temperatures|          vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+

    with_vec.printSchema()

    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- vector: vector (nullable = true)

  • Dump data to a JSON format reflecting DenseVector schema and read it back:

    from pyspark.sql.functions import to_json, from_json, col, struct, lit
    from pyspark.sql.types import StructType, StructField
    from pyspark.ml.linalg import VectorUDT

    json_vec = to_json(struct(struct(
        lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
        col("temperatures").alias("values")
    ).alias("v")))

    schema = StructType([StructField("v", VectorUDT())])

    with_parsed_vector = df.withColumn(
        "parsed_vector", from_json(json_vec, schema).getItem("v")
    )

    with_parsed_vector.show()

    +--------+------------------+----------------+
    |    city|      temperatures|   parsed_vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+

    with_parsed_vector.printSchema()

    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- parsed_vector: vector (nullable = true)
Answered by zero323, Oct 15 '22 at 10:10