I have to encode a column in a big DataFrame in PySpark (Spark 2.0). Almost all of the values are unique (about 1000 million of them). StringIndexer seemed like the best choice, but for some reason it always fails and kills my Spark session. Can I somehow write a function like this:
id_dict = dict()

def indexer(x):
    id_dict.setdefault(x, len(id_dict))
    return id_dict[x]
and map it over the DataFrame, with id_dict keeping its items()? Will this dict be synced on each executor? I need all of this to preprocess tuples like ('x', 3, 5) for a spark.mllib ALS model. Thank you.
StringIndexer keeps all labels in memory, so if the values are almost unique it simply won't scale. A plain Python dict captured in a closure won't work either: each executor task gets its own serialized copy, and updates made on one executor are never synced back to the driver or to other executors.
You can instead take the distinct values, sort them, and attach an id. This is expensive, but more robust in this case:
from pyspark.sql.functions import monotonically_increasing_id
df = spark.createDataFrame(["a", "b", "c", "a", "d"], "string").toDF("value")
indexer = (df.select("value").distinct()
.orderBy("value")
.withColumn("label", monotonically_increasing_id()))
df.join(indexer, ["value"]).show()
# +-----+-----------+
# |value| label|
# +-----+-----------+
# | d|25769803776|
# | c|17179869184|
# | b| 8589934592|
# | a| 0|
# | a| 0|
# +-----+-----------+
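To see where the huge gaps between labels come from: monotonically_increasing_id() packs the partition id into the upper bits and a per-partition row counter into the lower 33 bits. This plain-Python sketch (not Spark code; expected_id is a hypothetical helper for illustration) reproduces the ids shown above, assuming each value landed in its own partition:

```python
# monotonically_increasing_id() puts the partition id in the upper bits
# and a per-partition row counter in the lower 33 bits, so ids from
# different partitions are separated by multiples of 2**33.
PARTITION_SHIFT = 33

def expected_id(partition_id, row_in_partition):
    # Reconstruct the id monotonically_increasing_id() would assign.
    return (partition_id << PARTITION_SHIFT) + row_in_partition

# The labels in the output above match rows in partitions 0..3:
assert expected_id(0, 0) == 0            # "a"
assert expected_id(1, 0) == 8589934592   # "b"
assert expected_id(2, 0) == 17179869184  # "c"
assert expected_id(3, 0) == 25769803776  # "d"
```

This is also why the labels shift whenever the partitioning changes.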
Note that the labels are not consecutive and can differ from run to run, or change if spark.sql.shuffle.partitions changes. If that is not acceptable, you'll have to use RDDs:
from operator import itemgetter
indexer = (df.select("value").distinct()
.rdd.map(itemgetter(0)).zipWithIndex()
.toDF(["value", "label"]))
df.join(indexer, ["value"]).show()
# +-----+-----+
# |value|label|
# +-----+-----+
# | d| 0|
# | c| 1|
# | b| 2|
# | a| 3|
# | a| 3|
# +-----+-----+
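The RDD route trades speed for stability: zipWithIndex assigns consecutive labels 0 to n-1 in partition order, which is what ALS-style models want for user/item ids. As a plain-Python analogue of what it does (zip_with_index here is a hypothetical helper, not a Spark API):

```python
def zip_with_index(values):
    # Plain-Python analogue of RDD.zipWithIndex(): pair each element
    # with a consecutive 0-based index, in iteration order.
    return [(v, i) for i, v in enumerate(values)]

# The order Spark happened to produce in the output above:
distinct_values = ["d", "c", "b", "a"]
indexer = dict(zip_with_index(distinct_values))
assert indexer == {"d": 0, "c": 1, "b": 2, "a": 3}
# Labels are consecutive, unlike monotonically_increasing_id(),
# but still depend on the order distinct() returns the values.
```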