 

Primary keys with Apache Spark

I am having a JDBC connection with Apache Spark and PostgreSQL and I want to insert some data into my database. When I use append mode I need to specify id for each DataFrame.Row. Is there any way for Spark to create primary keys?
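For reference, here is a minimal PySpark sketch of the kind of append-mode JDBC write the question describes; the connection URL, credentials, and table name are placeholders, and the generated id column is just one possible way to fill the key:

from pyspark.sql.functions import monotonically_increasing_id

# Hypothetical connection details -- replace with your own PostgreSQL settings.
jdbc_url = "jdbc:postgresql://localhost:5432/mydb"
props = {"user": "spark", "password": "secret", "driver": "org.postgresql.Driver"}

# Add a generated id column before appending, since the target table expects one per row.
df_with_id = df.withColumn("id", monotonically_increasing_id())
df_with_id.write.jdbc(url=jdbc_url, table="my_table", mode="append", properties=props)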

Asked Oct 13 '15 by Nhor

People also ask

How do I select the first 10 rows in Spark SQL?

In Spark/PySpark, you can use the show() action to display the top/first N rows (5, 10, 100, ...) of a DataFrame on the console or in a log. There are also several Spark actions such as take(), tail(), collect(), head(), and first() that return the first or last n rows as a list of Rows (Array[Row] in Scala).
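For example, a quick illustration in PySpark (an existing DataFrame df is assumed):

df.show(10)              # print the first 10 rows to the console
first_ten = df.take(10)  # first 10 rows as a list of Row objects
first = df.first()       # the very first Row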

What is the key concept of Apache Spark?

At the core of Apache Spark is the notion of data abstraction as distributed collection of objects. This data abstraction, called Resilient Distributed Dataset (RDD), allows you to write programs that transform these distributed datasets.
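As a small sketch of that idea in PySpark (sc is an assumed SparkContext):

rdd = sc.parallelize([1, 2, 3, 4])   # distributed collection of objects
squared = rdd.map(lambda x: x * x)   # transformation, evaluated lazily
print(squared.collect())             # action that returns [1, 4, 9, 16]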

How do you find the primary key?

The primary key of a table is the column whose values are different in every row. Because they are different, they make each row unique. If no single such column exists, the primary key is a composite of two or more columns whose values, taken together, are different in every row.
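One way to sanity-check a candidate key from Spark itself, as a sketch with an assumed column name:

# If this returns no rows, candidate_col is unique in df and could serve as a key.
df.groupBy("candidate_col").count().filter("count > 1").show()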


2 Answers

from pyspark.sql.functions import monotonically_increasing_id

df.withColumn("id", monotonically_increasing_id()).show()

Note that the second argument of df.withColumn is monotonically_increasing_id() (a function call), not monotonically_increasing_id.

Answered Oct 06 '22 by Allyn


Scala:

If all you need is unique numbers, you can use zipWithUniqueId and recreate the DataFrame. First some imports and dummy data:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Extract the schema for later use:

val schema = df.schema 

Add id field:

val rows = df.rdd.zipWithUniqueId.map{
  case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

Create DataFrame:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

The same thing in Python:

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

row_with_index = Row(*["id"] + df.columns)

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

If you prefer consecutive numbers, you can replace zipWithUniqueId with zipWithIndex, but it is a little more expensive.
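For completeness, a sketch of the consecutive-number variant in PySpark, reusing the df defined above:

from pyspark.sql.types import StructType, StructField, LongType

# zipWithIndex assigns consecutive 0-based indices, at the cost of an extra Spark job.
rows_with_index = df.rdd.zipWithIndex().map(lambda pair: [pair[1]] + list(pair[0]))
df_consecutive = rows_with_index.toDF(
    StructType([StructField("id", LongType(), False)] + df.schema.fields))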

Directly with DataFrame API:

(universal Scala, Python, Java, R with pretty much the same syntax)

Previously I had missed the monotonicallyIncreasingId function, which should work just fine as long as you don't require consecutive numbers:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

While useful, monotonicallyIncreasingId is non-deterministic. Not only can the ids differ from execution to execution, but without additional tricks they cannot be used to identify rows when subsequent operations contain filters.
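One common workaround (an assumption on my part, not something guaranteed by Spark) is to materialize the generated ids, for example by caching or writing them out, before applying filters, so later operations reuse the same values:

from pyspark.sql.functions import monotonically_increasing_id

# Caching pins the generated ids for subsequent operations in most cases,
# although recomputation after executor loss can still change them.
df_with_id = df.withColumn("id", monotonically_increasing_id()).cache()
df_with_id.count()                          # force materialization
filtered = df_with_id.filter("bar < -1.5")  # filters now see the cached ids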

Note:

It is also possible to use the rowNumber window function:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

Unfortunately:

WARN Window: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

So unless you have a natural way to partition your data and ensure uniqueness, it is not particularly useful at this moment.
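If you do have such a partitioning column, a sketch of the partitioned variant using the newer row_number name (the group and ts columns are assumed); note that only the pair (group, id), not id alone, is unique:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Numbers rows within each partition; uniqueness holds only per (group, id) pair.
w = Window.partitionBy("group").orderBy("ts")
df.withColumn("id", row_number().over(w)).show()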

Answered Oct 06 '22 by zero323