I asked a similar question yesterday - Matrix Multiplication between two RDD[Array[Double]] in Spark - however I've decided to shift to pyspark to do this. I've made some progress loading and reformatting the data - Pyspark map from RDD of strings to RDD of list of doubles - however the matrix multiplcation is difficult. Let me share my progress first:
matrix1.txt
1.2 3.4 2.3
2.3 1.1 1.5
3.3 1.8 4.5
5.3 2.2 4.5
9.3 8.1 0.3
4.5 4.3 2.1
it's difficult to share files, however this is what my matrix1.txt file looks like. It is a space-delimited text file including the values of a matrix. Next is the code:
# do the imports for pyspark and numpy
from pyspark import SparkConf, SparkContext
import numpy as np
# loadmatrix is a helper function used to read matrix1.txt and format
# from RDD of strings to RDD of list of floats
def loadmatrix(sc):
data = sc.textFile("matrix1.txt").map(lambda line: line.split(' ')).map(lambda line: [float(x) for x in line])
return(data)
# this is the function I am struggling with, it should take a line of the
# matrix (formatted as list of floats), compute an outer product with itself
def AtransposeA(line):
# pseudocode for this would be...
# outerprod = compute line * line^transpose
# return(outerprod)
# here is the main body of my file
if __name__ == "__main__":
# create the conf, sc objects, then use loadmatrix to read data
conf = SparkConf().setAppName('SVD').setMaster('local')
sc = SparkContext(conf = conf)
mymatrix = loadmatrix(sc)
# this is pseudocode for calling AtransposeA
ATA = mymatrix.map(lambda line: AtransposeA(line)).reduce(elementwise add all the outerproducts)
# the SVD of ATA is computed below
U, S, V = np.linalg.svd(ATA)
# ...
My approach is as follows - to do matrix multiplication A^T * A, I create a function that computes outer products of rows of A. The elementwise sum of all of the outerproducts is the product I want. I then call AtransposeA() in a map function, that way is it performed on each row of the matrix, and finally I use a reduce() to add the resulting matrices.
I'm struggling thinking about how the AtransposeA function should look. How can I do an outerproduct in pyspark like this? Thanks in advance for help!
First, consider why you want to use Spark for this. It sounds like all your data fits in memory, in which case you can use numpy and pandas in a very straight-forward way.
If your data isn't structured so that rows are independent, then it probably can't be parallelized by sending groups of rows to different nodes, which is the whole point of using Spark.
Having said that... here is some pyspark (2.1.1) code that I think does what you want.
# read the matrix file
df = spark.read.csv("matrix1.txt",sep=" ",inferSchema=True)
df.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|1.2|3.4|2.3|
|2.3|1.1|1.5|
|3.3|1.8|4.5|
|5.3|2.2|4.5|
|9.3|8.1|0.3|
|4.5|4.3|2.1|
+---+---+---+
# do the sum of the multiplication that we want, and get
# one data frame for each column
colDFs = []
for c2 in df.columns:
colDFs.append( df.select( [ F.sum(df[c1]*df[c2]).alias("op_{0}".format(i)) for i,c1 in enumerate(df.columns) ] ) )
# now union those separate data frames to build the "matrix"
mtxDF = reduce(lambda a,b: a.select(a.columns).union(b.select(a.columns)), colDFs )
mtxDF.show()
+------------------+------------------+------------------+
| op_0| op_1| op_2|
+------------------+------------------+------------------+
| 152.45|118.88999999999999| 57.15|
|118.88999999999999|104.94999999999999| 38.93|
| 57.15| 38.93|52.540000000000006|
+------------------+------------------+------------------+
This seems to be the same result that you get from numpy.
a = numpy.genfromtxt("matrix1.txt")
numpy.dot(a.T, a)
array([[ 152.45, 118.89, 57.15],
[ 118.89, 104.95, 38.93],
[ 57.15, 38.93, 52.54]])
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With