Regarding generating a surrogate key: the first step is to get the distinct elements and then build an incremental key for each tuple.
I first used a Java Set to get the distinct elements, and it ran out of heap space. Then I used Flink's distinct(), and it works.
Could I ask what makes this difference?
A related question: can Flink generate a surrogate key in a mapper?
Flink executes a distinct() internally as a GroupBy followed by a ReduceGroup operator, where the reduce operator returns only the first element of each group.
The GroupBy is done by sorting the data. Sorting is done on a binary data representation, in memory if possible, but it may spill to disk if not enough memory is available. This blog post gives some insight about that. GroupBy and Sort are memory-safe in Flink and will not fail with an OutOfMemoryError. A Java Set, by contrast, keeps every distinct element as an object on the JVM heap, which is why it ran out of heap space.
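To make the sort-then-take-first idea concrete, here is a minimal plain-Java sketch. It is not Flink's implementation: it sorts an in-memory list rather than binary data and cannot spill to disk, and the class and method names (SortDistinctSketch, sortDistinct) are made up for illustration. It only shows why sorting yields the distinct elements: equal values end up adjacent, so keeping the first element of each run is enough.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; not part of Flink.
final class SortDistinctSketch {

    // Sort a copy of the input so equal values become adjacent, then keep
    // only the first element of each run of equal values (the "group").
    static <T extends Comparable<T>> List<T> sortDistinct(List<T> input) {
        List<T> sorted = new ArrayList<>(input);
        Collections.sort(sorted);

        List<T> result = new ArrayList<>();
        T previous = null;
        for (T current : sorted) {
            // A new group starts whenever the value differs from the previous one.
            if (previous == null || current.compareTo(previous) != 0) {
                result.add(current);
            }
            previous = current;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(sortDistinct(Arrays.asList("b", "a", "b", "c", "a")));
    }
}
```

Apart from the sorted data itself, the loop only needs to remember the previous element, which is what lets a sort-based approach work on data that is sorted externally.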
You can also do a distinct on a custom key by using DataSet.distinct(KeySelector ks). The key selector is basically a MapFunction that generates a custom key.
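As a rough illustration of how a key selector and the incremental key from the question fit together, the following plain-Java sketch extracts a custom key from each record, deduplicates on that key by sorting, and numbers the distinct keys from 0. It is an assumption-laden sketch, not Flink code: it runs in a single JVM on an in-memory list, and SurrogateKeySketch and assignSurrogateKeys are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration only; not part of Flink.
final class SurrogateKeySketch {

    // Map each distinct custom key to an incremental surrogate key (0, 1, 2, ...).
    // keySelector plays the role of the key selector: it derives the key from a record.
    static <T, K extends Comparable<K>> Map<K, Long> assignSurrogateKeys(
            List<T> records, Function<T, K> keySelector) {
        // Sort a copy by the extracted key so records with equal keys are adjacent.
        List<T> sorted = new ArrayList<>(records);
        sorted.sort(Comparator.comparing(keySelector));

        Map<K, Long> surrogateKeys = new LinkedHashMap<>();
        long next = 0L;
        K previous = null;
        for (T record : sorted) {
            K key = keySelector.apply(record);
            // Only the first record of each key group receives a new surrogate key.
            if (previous == null || key.compareTo(previous) != 0) {
                surrogateKeys.put(key, next++);
            }
            previous = key;
        }
        return surrogateKeys;
    }
}
```

For example, calling assignSurrogateKeys with a list of words and the selector s -> s.substring(0, 1) assigns one surrogate key per distinct first letter, in sorted order of the letters.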