I'd like to perform principal component analysis (PCA), using PySpark (Spark 1.6.2), on numerical data stored in a Hive table. I'm able to load the Hive table into a Spark dataframe:
>>> from pyspark.sql import HiveContext
>>> hiveContext = HiveContext(sc)
>>> dataframe = hiveContext.sql("SELECT * FROM my_table")
>>> type(dataframe)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> dataframe.columns
['par001', 'par002', 'par003', etc...]
>>> dataframe.collect()
[Row(par001=1.1, par002=5.5, par003=8.2, etc...), Row(par001=0.0, par002=5.7, par003=4.2, etc...), etc...]
There's an excellent StackOverflow post that shows how to perform PCA in PySpark: https://stackoverflow.com/a/33481471/2626491
In the 'test' section of the post, @desertnaut creates a dataframe with just one column (called 'features'):
>>> from pyspark.ml.feature import *
>>> from pyspark.mllib.linalg import Vectors
>>> data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
... (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
... (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
>>> df = sqlContext.createDataFrame(data,["features"])
>>> type(df)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> df.columns
['features']
>>> df.collect()
[Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0])), Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0])), Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]))]
Each row in @desertnaut's example dataframe contains a DenseVector object, which is then used by the pca function.
Q) How can I convert the dataframe from Hive into a single-column dataframe ("features") where each row contains a DenseVector representing all the values from the original row?
You should use a VectorAssembler. If data is similar to this:
from pyspark.sql import Row
data = sc.parallelize([
    Row(par001=1.1, par002=5.5, par003=8.2),
    Row(par001=0.0, par002=5.7, par003=4.2)
]).toDF()
import the required class:
from pyspark.ml.feature import VectorAssembler
create an instance:
assembler = VectorAssembler(inputCols=data.columns, outputCol="features")
transform and select:
assembler.transform(data).select("features")
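Applied to the dataframe loaded from Hive in the question, the same steps might look like the sketch below. It assumes only the numeric columns should go into the vector, so it filters on the type strings reported by dataframe.dtypes. The helper names (numeric_columns, assemble_features, run_pca) and the pca_features output column are made up for illustration and are not part of Spark.

```python
# Spark SQL type strings treated as numeric in this sketch (an assumption;
# adjust it to the column types that actually occur in the Hive table).
NUMERIC_TYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}


def numeric_columns(dtypes):
    """Return the names of numeric columns.

    `dtypes` is a list of (name, type_string) pairs, as returned by
    DataFrame.dtypes.
    """
    return [
        name
        for name, type_string in dtypes
        if type_string in NUMERIC_TYPES or type_string.startswith("decimal")
    ]


def assemble_features(dataframe, output_col="features"):
    """Build a single-column dataframe of feature vectors (hypothetical helper)."""
    # Imported lazily so numeric_columns can be used without a Spark installation.
    from pyspark.ml.feature import VectorAssembler

    assembler = VectorAssembler(
        inputCols=numeric_columns(dataframe.dtypes),
        outputCol=output_col,
    )
    return assembler.transform(dataframe).select(output_col)


def run_pca(features, k, input_col="features", output_col="pca_features"):
    """Fit PCA with k components and return the projected dataframe (hypothetical helper)."""
    from pyspark.ml.feature import PCA

    pca = PCA(k=k, inputCol=input_col, outputCol=output_col)
    return pca.fit(features).transform(features)
```

With the dataframe from the question, run_pca(assemble_features(dataframe), k=2) would return the features column alongside a pca_features column holding the projected vectors. The value k=2 is only an example.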
You can also use a user-defined function. In Spark 1.6, import Vectors and VectorUDT from mllib:
from pyspark.mllib.linalg import Vectors, VectorUDT
and udf from sql.functions:
from pyspark.sql.functions import udf
and select:
data.select(
    udf(Vectors.dense, VectorUDT())(*data.columns)
).toDF("features")
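This is less verbose but much slower, because every row has to be passed to a Python worker to run the UDF instead of being processed inside the JVM.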