I have seen several mentions of an "upsert mode" for dynamic tables based on a unique key in the Flink documentation and on the official Flink blog. However, I do not see any examples or documentation on how to enable this mode for a dynamic table.
Examples:
Blog post:
When defining a dynamic table on a stream via update mode, we can specify a unique key attribute on the table. In that case, update and delete operations are performed with respect to the key attribute. The update mode is visualized in the following figure.
Documentation:
A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key.
So my questions are:
The linked resources describe two different scenarios: DataStream -> Table conversion and Table -> DataStream conversion. The following discussion is based on Flink 1.4.0 (Jan. 2018).
Upsert DataStream -> Table Conversion
Converting a DataStream into a Table by upsert on keys is not natively supported but is on the roadmap. Meanwhile, you can emulate this behavior using an append Table and a query with a user-defined aggregation function.
If you have an append Table Logins with the schema (user, loginTime, ip) that tracks logins of users, you can convert it into an upsert Table keyed on user with the following query:
SELECT user, LAST_VAL(loginTime), LAST_VAL(ip) FROM Logins GROUP BY user
The LAST_VAL aggregation function is a user-defined aggregation function that always returns the latest added value.
Native support for upsert DataStream -> Table conversion would work basically the same way, although providing a more concise API.
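LAST_VAL does not exist in Flink 1.4, so you would have to implement it yourself. The sketch below is a standalone illustration (it does not use the Flink API) of the accumulator contract such a function follows; in a real job the class would extend org.apache.flink.table.functions.AggregateFunction and be registered with the table environment under the name LAST_VAL. The class and field names here are assumptions for illustration.

```java
// Standalone sketch of a "last value" aggregation, mirroring the
// createAccumulator / accumulate / getValue contract of a Flink
// user-defined AggregateFunction. Not the actual Flink API.
class LastVal {

    // Mutable accumulator holding the most recently seen value.
    static class Acc {
        String last; // null until the first value arrives
    }

    Acc createAccumulator() {
        return new Acc();
    }

    // Called once per input row; later rows overwrite earlier ones,
    // so the accumulator always holds the latest value.
    void accumulate(Acc acc, String value) {
        acc.last = value;
    }

    String getValue(Acc acc) {
        return acc.last;
    }

    public static void main(String[] args) {
        LastVal f = new LastVal();
        Acc acc = f.createAccumulator();
        f.accumulate(acc, "10.0.0.1");
        f.accumulate(acc, "10.0.0.2");
        System.out.println(f.getValue(acc)); // prints 10.0.0.2
    }
}
```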
Upsert Table -> DataStream Conversion
Converting a Table into an upsert DataStream is not supported. This is also properly reflected in the documentation:
Please note that only append and retract streams are supported when converting a dynamic table into a DataStream.
We deliberately chose not to support upsert Table -> DataStream conversions, because an upsert DataStream can only be processed if the key attributes are known. These depend on the query and are not always straightforward to identify. It would be the developer's responsibility to make sure that the key attributes are correctly interpreted; failing to do so would result in faulty programs. To avoid such problems, we decided not to offer the upsert Table -> DataStream conversion.
Instead, users can convert a Table into a retraction DataStream. Moreover, Flink supports an UpsertTableSink that writes an upsert DataStream to an external system, such as a database or key-value store.
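A retraction DataStream delivers (flag, row) pairs: true means "add this row", false means "retract a previously added row", and an updated key is encoded as a retraction of the old result followed by an addition of the new one. The standalone sketch below (not using the Flink API; the user/ip schema is borrowed from the Logins example) shows how a consumer can materialize such a stream back into a keyed view:

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch: materializing a retraction stream into keyed state.
// Flink's retraction streams carry (flag, row) pairs; flag=true adds the
// row, flag=false retracts it. An update arrives as retract-old, add-new.
class RetractionMaterializer {

    // Keyed view for the Logins example: user -> latest ip.
    private final Map<String, String> view = new HashMap<>();

    void apply(boolean add, String user, String ip) {
        if (add) {
            view.put(user, ip);    // add (or re-add after a retraction)
        } else {
            view.remove(user);     // retract the previous result for the key
        }
    }

    String get(String user) {
        return view.get(user);
    }

    public static void main(String[] args) {
        RetractionMaterializer m = new RetractionMaterializer();
        m.apply(true, "alice", "10.0.0.1");   // +(alice, 10.0.0.1)
        m.apply(false, "alice", "10.0.0.1");  // -(alice, 10.0.0.1)
        m.apply(true, "alice", "10.0.0.2");   // +(alice, 10.0.0.2)
        System.out.println(m.get("alice"));   // prints 10.0.0.2
    }
}
```

Note that the consumer has to know the key (user here) to build the view, which is exactly the knowledge the upsert conversion would have required.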
Update: since Flink 1.9, LAST_VALUE is one of the built-in aggregate functions when using the Blink planner (the default planner since Flink 1.11). Assuming the Logins table from Fabian Hueske's answer above, we can now convert it to an upsert table as simply as:
SELECT
user,
LAST_VALUE(loginTime),
LAST_VALUE(ip)
FROM Logins
GROUP BY user
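Semantically, every row appended to Logins produces an upsert on its user key, so the result always holds each user's latest login. The standalone sketch below (not using the Flink API; names are illustrative) simulates what this query emits as the append table grows:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of the upsert semantics of
//   SELECT user, LAST_VALUE(loginTime), LAST_VALUE(ip)
//   FROM Logins GROUP BY user
// Each append-only input row upserts the result for its key.
class LastValueUpsert {

    // One row of the append-only Logins table.
    static class Login {
        final String user; final long loginTime; final String ip;
        Login(String user, long loginTime, String ip) {
            this.user = user; this.loginTime = loginTime; this.ip = ip;
        }
    }

    // Keyed result view: user -> latest row for that user.
    private final Map<String, Login> view = new HashMap<>();
    // Upsert messages emitted downstream, one per input row.
    private final List<Login> emitted = new ArrayList<>();

    void append(Login row) {
        view.put(row.user, row); // overwrite = upsert on the key
        emitted.add(row);
    }

    Login current(String user) { return view.get(user); }
    int emittedCount() { return emitted.size(); }

    public static void main(String[] args) {
        LastValueUpsert q = new LastValueUpsert();
        q.append(new Login("alice", 1L, "10.0.0.1"));
        q.append(new Login("bob",   2L, "10.0.0.9"));
        q.append(new Login("alice", 3L, "10.0.0.2"));
        System.out.println(q.current("alice").ip); // prints 10.0.0.2
        System.out.println(q.emittedCount());      // prints 3
    }
}
```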