I have a DataFrame to which I want to add a column of distinct uuid4() values. My code:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StringType
from uuid import uuid4
spark_session = SparkSession.builder.getOrCreate()
df = spark_session.createDataFrame([
    [1, 1, 'teste'],
    [2, 2, 'teste'],
    [3, 0, 'teste'],
    [4, 5, 'teste'],
], list('abc'))
df = df.withColumn("_tmp", f.lit(1))
uuids = [str(uuid4()) for _ in range(df.count())]
df1 = spark_session.createDataFrame(uuids, StringType())
df1 = df1.withColumn("_tmp", f.lit(1))
df2 = df.join(df1, "_tmp", "inner").drop("_tmp")
df2.show()
But I got this error:
Py4JJavaError: An error occurred while calling o1571.showString.
: org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans
I already tried using an alias and monotonically_increasing_id as the join column, but I've read here that I cannot trust monotonically_increasing_id as a merge column. I'm expecting:
+---+---+-----+------+
| a| b| c| value|
+---+---+-----+------+
| 1| 1|teste| uuid4|
| 2| 2|teste| uuid4|
| 3| 0|teste| uuid4|
| 4| 5|teste| uuid4|
+---+---+-----+------+
What's the correct approach in this case?
You can get duplicate UUIDs from uuid1 if you create more than 16384 of them within one 100 ns clock tick, and you shouldn't use uuid1 when you don't want to make your machine's MAC address visible. uuid4() instead generates a random UUID using a cryptographically secure random number generator.
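For illustration, a minimal plain-Python sketch of the difference (the printed values will differ on every run):
from uuid import uuid1, uuid4
print(uuid1())  # time-based; the last field is derived from the MAC address
print(uuid4())  # fully random, drawn from a secure random source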
Use monotonically_increasing_id() for unique, but not consecutive, numbers. It generates monotonically increasing 64-bit integers: the generated ids are guaranteed to be increasing and unique, but they are not guaranteed to be consecutive.
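A minimal sketch of how it is typically used (assuming df is any existing DataFrame; the column name "row_id" is just an example):
from pyspark.sql import functions as f
df_with_id = df.withColumn("row_id", f.monotonically_increasing_id())
df_with_id.show()  # ids are unique and increasing, but can jump between partitions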
I used two approaches. The first uses Spark SQL's built-in uuid() function; the second uses row_number as @Tetlanesh suggested, which requires creating an ID column to ensure that row_number counts every row of the Window. The first approach:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from uuid import uuid4
from pyspark.sql.window import Window
from pyspark.sql.types import StringType
from pyspark.sql.functions import row_number
spark_session = SparkSession.builder.getOrCreate()
df = spark_session.createDataFrame([
    [1, 1, 'teste'],
    [2, 2, 'teste'],
    [3, 0, 'teste'],
    [4, 5, 'teste'],
], list('abc'))
df = df.alias("_tmp")
df.registerTempTable("_tmp")
df2 = self.spark_session.sql("select *, uuid() as uuid from _tmp")
df2.show()
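If you'd rather skip the temp view, the same built-in SQL uuid() function can be reached through expr() (this assumes Spark 2.3+, where uuid() is available):
df2 = df.withColumn("uuid", f.expr("uuid()"))
df2.show()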
The second approach uses a window, but it's not as efficient as the first one, because Window.orderBy without a partitionBy moves all rows to a single partition:
df = df.withColumn("_id", f.lit(1))
df = df.withColumn("_tmp", row_number().over(Window.orderBy('_id')))
uuids = [(str(uuid4()), 1) for _ in range(df.count())]
df1 = spark_session.createDataFrame(uuids, ['uuid', '_id'])
df1 = df1.withColumn("_tmp", row_number().over(Window.orderBy('_id')))
df2 = df.join(df1, "_tmp", "inner").drop("_id", "_tmp")
df2.show()
Both approaches output:
+---+---+-----+------+
| a| b| c| uuid|
+---+---+-----+------+
| 1| 1|teste| uuid4|
| 2| 2|teste| uuid4|
| 3| 0|teste| uuid4|
| 4| 5|teste| uuid4|
+---+---+-----+------+
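Either way, a quick sanity check that the generated UUIDs are in fact distinct:
assert df2.select("uuid").distinct().count() == df2.count()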