 

pyspark generate row hash of specific columns and add it as a new column

I am working with spark 2.2.0 and pyspark2.

I have created a DataFrame df and am now trying to add a new column "rowhash" that is the sha2 hash of specific columns in the DataFrame.

For example, say that df has the columns: (column1, column2, ..., column10)

I require sha2((column2||column3||column4||...... column8), 256) in a new column "rowhash".

So far, I have tried the following methods:

1) Used the hash() function, but since it gives an integer output it is not of much use.

2) Tried using the sha2() function, but it is failing.

Say columnarray holds the list of column names I need.

def concat(columnarray):
    # join the column names into a single '||'-separated string
    concat_str = ''
    for val in columnarray:
        concat_str = concat_str + '||' + str(val)
    concat_str = concat_str[2:]   # drop the leading '||'
    return concat_str

and then

df1 = df1.withColumn("row_sha2", sha2(concat(columnarray),256))

This fails with a "cannot resolve" error, I suspect because concat() returns a plain Python string of the joined column names, which Spark then tries to resolve as a single column name.
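
I think the fix would be to build a single Column expression instead of a Python string. A rough, untested sketch using pyspark.sql.functions.concat and lit (columnarray is my list of column names):

from pyspark.sql import functions as F

# untested sketch: interleave the actual columns with '||' literals,
# so sha2() receives a Column expression rather than a string of joined names
def concat_cols(columnarray):
    parts = []
    for name in columnarray:
        if parts:
            parts.append(F.lit('||'))
        parts.append(F.col(name))
    return F.concat(*parts)

df1 = df1.withColumn("row_sha2", F.sha2(concat_cols(columnarray), 256))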

Thanks gaw for your answer. Since I have to hash only specific columns, I created a list of those column names (in hash_col) and changed your function as follows:

def sha_concat(row, columnarray):
    row_dict = row.asDict()      # transform row to a dict
    concat_str = ''
    for v in columnarray:
        concat_str = concat_str + '||' + str(row_dict.get(v))
    concat_str = concat_str[2:]
    # preserve concatenated value for testing (this can be removed later)
    row_dict["sha_values"] = concat_str
    row_dict["sha_hash"] = hashlib.sha256(concat_str).hexdigest()
    return Row(**row_dict)

Then I called it as:

    df1.rdd.map(lambda row: sha_concat(row,hash_col)).toDF().show(truncate=False)

However, it is now failing with the error:

    UnicodeEncodeError: 'ascii' codec can't encode character u'\ufffd' in position 8: ordinal not in range(128)

I can see the value \ufffd in one of the columns, so I am unsure if there is a way to handle this?
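
One workaround I am considering (untested, assuming Python 2 as the traceback suggests) is to keep the concatenated value as unicode and encode it to UTF-8 only when hashing, so hashlib does not fall back to the ASCII codec:

import hashlib
from pyspark.sql import Row

def sha_concat_utf8(row, columnarray):
    # untested sketch: build a unicode string, encode explicitly before hashing
    row_dict = row.asDict()
    concat_str = u'||'.join(unicode(row_dict.get(v)) for v in columnarray)
    row_dict["sha_values"] = concat_str
    row_dict["sha_hash"] = hashlib.sha256(concat_str.encode('utf-8')).hexdigest()
    return Row(**row_dict)

df1.rdd.map(lambda row: sha_concat_utf8(row, hash_col)).toDF().show(truncate=False)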

asked Sep 12 '18 by msashish



3 Answers

You can use pyspark.sql.functions.concat_ws() to concatenate your columns and pyspark.sql.functions.sha2() to get the SHA256 hash.

Using the data from @gaw:

from pyspark.sql.functions import sha2, concat_ws
df = spark.createDataFrame(
    [(1,"2",5,1),(3,"4",7,8)],
    ("col1","col2","col3","col4")
)
df.withColumn("row_sha2", sha2(concat_ws("||", *df.columns), 256)).show(truncate=False)
#+----+----+----+----+----------------------------------------------------------------+
#|col1|col2|col3|col4|row_sha2                                                        |
#+----+----+----+----+----------------------------------------------------------------+
#|1   |2   |5   |1   |1b0ae4beb8ce031cf585e9bb79df7d32c3b93c8c73c27d8f2c2ddc2de9c8edcd|
#|3   |4   |7   |8   |57f057bdc4178b69b1b6ab9d78eabee47133790cba8cf503ac1658fa7a496db1|
#+----+----+----+----+----------------------------------------------------------------+

You can pass in either 0 or 256 as the second argument to sha2(), as per the docs:

Returns the hex string result of SHA-2 family of hash functions (SHA-224, SHA-256, SHA-384, and SHA-512). The numBits indicates the desired bit length of the result, which must have a value of 224, 256, 384, 512, or 0 (which is equivalent to 256).

The function concat_ws takes a separator and a list of columns to join. I am passing in || as the separator and df.columns as the list of columns.

I am using all of the columns here, but you can specify whatever subset of columns you'd like; in your case that would be columnarray. (You need to use the * to unpack the list.) See the small sketch below for the subset case.
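
For example, a small sketch that hashes only a subset (here col2 and col3) of the example DataFrame above; in the question's case the list would hold the real column names:

columnarray = ["col2", "col3"]   # assumed subset; replace with your own column names
df.withColumn("row_sha2", sha2(concat_ws("||", *columnarray), 256)).show(truncate=False)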

answered Oct 07 '22 by pault


If you want a hash built from the values of all columns in each row of your dataset, you can apply a self-designed function via map to the RDD of your DataFrame.

import hashlib
from pyspark.sql import Row    # needed to rebuild the row from the dict

test_df = spark.createDataFrame([
    (1,"2",5,1),(3,"4",7,8),
    ], ("col1","col2","col3","col4"))

def sha_concat(row):
    row_dict = row.asDict()                             # transform row to a dict
    columnarray = row_dict.keys()                       # get the column names
    concat_str = ''
    for v in row_dict.values():
        concat_str = concat_str + '||' + str(v)         # concatenate values
    concat_str = concat_str[2:]
    row_dict["sha_values"] = concat_str                 # preserve concatenated value for testing (this can be removed later)
    row_dict["sha_hash"] = hashlib.sha256(concat_str).hexdigest()   # calculate sha256
    return Row(**row_dict)

test_df.rdd.map(sha_concat).toDF().show(truncate=False)

The result would look like:

+----+----+----+----+----------------------------------------------------------------+----------+
|col1|col2|col3|col4|sha_hash                                                        |sha_values|
+----+----+----+----+----------------------------------------------------------------+----------+
|1   |2   |5   |1   |1b0ae4beb8ce031cf585e9bb79df7d32c3b93c8c73c27d8f2c2ddc2de9c8edcd|1||2||5||1|
|3   |4   |7   |8   |cb8f8c5d9fd7165cf3c0f019e0fb10fa0e8f147960c715b7f6a60e149d3923a5|8||4||7||3|
+----+----+----+----+----------------------------------------------------------------+----------+
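
Note that sha_values for the second row comes out as 8||4||7||3 rather than 3||4||7||8, presumably because the dict returned by asDict() does not preserve column order here. A hedged variation that iterates over an explicit column list keeps the concatenation (and therefore the hash) deterministic:

def sha_concat_ordered(row, columns):
    # concatenate values in a fixed, explicit column order
    row_dict = row.asDict()
    concat_str = '||'.join(str(row_dict[c]) for c in columns)
    row_dict["sha_values"] = concat_str
    row_dict["sha_hash"] = hashlib.sha256(concat_str.encode('utf-8')).hexdigest()
    return Row(**row_dict)

test_df.rdd.map(lambda r: sha_concat_ordered(r, test_df.columns)).toDF().show(truncate=False)
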
answered Oct 07 '22 by gaw


New in version 2.0 is the hash function.

from pyspark.sql.functions import hash

(
    spark
    .createDataFrame([(1,'Abe'),(2,'Ben'),(3,'Cas')], ('id','name'))
    .withColumn('hashed_name', hash('name'))
).show()

which results in:

+---+----+-----------+
| id|name|hashed_name|
+---+----+-----------+
|  1| Abe| 1567000248|
|  2| Ben| 1604243918|
|  3| Cas| -586163893|
+---+----+-----------+

https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html#hash
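
Note that hash() returns a 32-bit integer (which is why the question found it of limited use for a rowhash), but it does accept several columns at once, so a quick row-level hash over a subset of columns could look like this sketch:

from pyspark.sql.functions import hash

# sketch: hash() accepts multiple columns and returns an int column
df = spark.createDataFrame([(1, 'Abe'), (2, 'Ben'), (3, 'Cas')], ('id', 'name'))
df.withColumn('rowhash', hash('id', 'name')).show()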

answered Oct 07 '22 by Michael H.