I get the following error when trying to convert an RDD made of numpy arrays to a DataFrame in PySpark.
Below is the piece of code leading to this error; I can't even pinpoint where the error actually occurs, even reading the trace...
Does anyone know how this could be worked around?
Thanks a lot!
In [111]: rddUser.take(5)
Out[111]:
[array([u'1008798262000292538', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'102254941859441333', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'1035609083097069747', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'10363297284472000', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'1059178934871294116', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32')]
Then here comes the mess:
In [110]: rddUser.toDF(schema=None).show()
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-110-073037afd70e> in <module>()
----> 1 rddUser.toDF(schema=None).show()
62 [Row(name=u'Alice', age=1)]
63 """
---> 64 return sqlContext.createDataFrame(self, schema, sampleRatio)
65
66 RDD.toDF = toDF
421
422 if isinstance(data, RDD):
--> 423 rdd, schema = self._createFromRDD(data, schema, samplingRatio)
424 else:
425 rdd, schema = self._createFromLocal(data, schema)
308 """
309 if schema is None or isinstance(schema, (list, tuple)):
--> 310 struct = self._inferSchema(rdd, samplingRatio)
311 converter = _create_converter(struct)
312 rdd = rdd.map(converter)
253 """
254 first = rdd.first()
--> 255 if not first:
256 raise ValueError("The first row in RDD is empty, "
257 "can not infer schema")
ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()
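The ValueError is raised by Spark's schema inference: _inferSchema calls rdd.first() and then evaluates "if not first:", and the truth value of a multi-element numpy array is ambiguous. A minimal sketch of the failing coercion, in plain numpy with nothing Spark-specific:

import numpy as np

first = np.array([u'1059178934871294116', u'1.0', u'0.0'])
try:
    if not first:  # numpy refuses boolean coercion of multi-element arrays
        pass
except ValueError as e:
    print(e)  # The truth value of an array with more than one element is ambiguous...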
If the RDD is defined as above, just map with tolist:
import numpy as np

rdd = spark.sparkContext.parallelize([
    np.array([u'1059178934871294116', u'1.0', u'0.0', u'0.0', u'0.0', u'1.0']),
    np.array([u'102254941859441333', u'1.0', u'0.0', u'0.0', u'0.0', u'1.0'])
])

df = rdd.map(lambda x: x.tolist()).toDF(["user_id"])
df.show()
# +-------------------+---+---+---+---+---+
# | user_id| _2| _3| _4| _5| _6|
# +-------------------+---+---+---+---+---+
# |1059178934871294116|1.0|0.0|0.0|0.0|1.0|
# | 102254941859441333|1.0|0.0|0.0|0.0|1.0|
# +-------------------+---+---+---+---+---+
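Note that tolist() on a '<U32' array yields Python strings, so every column above, flags included, is still a string. If you need numeric columns, one option is to cast after the fact; a sketch, assuming the auto-generated names _2.._6 from the frame above:

from pyspark.sql.functions import col

# Cast the auto-named string columns to doubles; user_id stays a string.
numeric = df.select(
    col("user_id"),
    *[col(c).cast("double") for c in df.columns[1:]])
numeric.printSchema()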
But considering your comment, I assume you want to use it with ml. Then this might be better:
from pyspark.ml.linalg import DenseVector

(rdd
    # keep the id as a plain Python string, pack the numeric part into a vector
    .map(lambda x: (x[0].tolist(), DenseVector(x[1:])))
    .toDF(["user_id", "features"])
    .show(2, False))
# +-------------------+---------------------+
# |user_id |features |
# +-------------------+---------------------+
# |1059178934871294116|[1.0,0.0,0.0,0.0,1.0]|
# |102254941859441333 |[1.0,0.0,0.0,0.0,1.0]|
# +-------------------+---------------------+
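With a features column in place, the frame plugs directly into pyspark.ml estimators. A hypothetical sketch with KMeans (the choice of estimator is an assumption; any ml algorithm that reads a "features" column works the same way):

from pyspark.ml.clustering import KMeans

df = (rdd
    .map(lambda x: (x[0].tolist(), DenseVector(x[1:])))
    .toDF(["user_id", "features"]))

# KMeans is only a placeholder here; it reads the "features" column by default.
model = KMeans(k=2, seed=1).fit(df)
model.transform(df).show()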
You should also take a look at pyspark.ml.feature.OneHotEncoder.
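For reference, a minimal OneHotEncoder sketch (the toy data and column names are made up for illustration; the encoder expects a numeric index column, so StringIndexer usually comes first):

from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Hypothetical toy input with one categorical string column.
data = spark.createDataFrame([("a",), ("b",), ("a",), ("c",)], ["category"])

indexed = (StringIndexer(inputCol="category", outputCol="category_idx")
           .fit(data)
           .transform(data))

# Spark 2.x: OneHotEncoder is a plain Transformer.
# On Spark 3.x it is an Estimator, so add .fit(indexed) before .transform(...).
OneHotEncoder(inputCol="category_idx", outputCol="category_vec") \
    .transform(indexed).show()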