I am trying to get a small Apache Flink example running in Clojure, but I am stuck because of Clojure's type hinting and a strange quirk in Flink.
Here is my code:
(ns pipeline.core
  (:import
   (org.apache.flink.api.java ExecutionEnvironment)
   (org.apache.flink.api.common.functions FlatMapFunction)
   (org.apache.flink.api.java.tuple Tuple2)
   (org.apache.flink.util Collector)
   (java.lang String)))

(def flink-env (ExecutionEnvironment/createLocalEnvironment))
(def dataset (.fromElements flink-env (to-array ["please test me"])))

(defn tokenizer []
  (reify FlatMapFunction
    (flatMap [this value collector]
      (println value))))

(.flatMap dataset (tokenizer))
If I do not provide type hints, I get an error from the Flink API:
Caused by: java.lang.IllegalArgumentException: The types of the interface org.apache.flink.api.common.functions.FlatMapFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic types is limited at this point.
at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:662)
If I provide type hints:
(defn tokenizer []
  (reify FlatMapFunction
    (^void flatMap [this ^String value ^Collector collector]
      (println value))))
I get an error from the Clojure compiler:
Caused by: java.lang.IllegalArgumentException: Can't find matching method: flatMap, leave off hints for auto match.
at clojure.lang.Compiler$NewInstanceMethod.parse(Compiler.java:8065)
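The compiler error comes from generic type erasure: at the bytecode level, FlatMapFunction<T,O>.flatMap takes (Object, Collector), so hints on a reify method have to name the erased types, and Clojure type hints have no syntax for generic type arguments at all. Presumably a hinted version would therefore use ^Object value ^Collector collector rather than ^String value. The sketch below illustrates the rule with the JDK only: java.util.function.BiFunction stands in for FlatMapFunction so it runs without Flink on the classpath, and erased-param-types and joiner are illustrative names.

```clojure
(ns erasure-demo
  (:import (java.lang.reflect Method)
           (java.util.function BiFunction)))

;; Parameter types of the first public method named `method-name` on `iface`,
;; as the JVM sees them after generic type erasure.
(defn erased-param-types [^Class iface ^String method-name]
  (->> (.getMethods iface)
       (filter #(= method-name (.getName ^Method %)))
       (map #(vec (.getParameterTypes ^Method %)))
       first))

;; BiFunction<T,U,R> stands in for FlatMapFunction<T,O>: T and U erase to Object.
(erased-param-types BiFunction "apply")
;; => [java.lang.Object java.lang.Object]

;; Hints must name the erased types. Hinting ^String a here instead would fail
;; to compile with "Can't find matching method: apply, leave off hints for auto match."
(def joiner
  (reify BiFunction
    (apply [_ ^Object a ^Object b]
      (str a " " b))))

(.apply ^BiFunction joiner "please" "test")
;; => "please test"
```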
Is there a way to add type hints for generic classes in Clojure? It should be something like this:
(defn tokenizer []
  (reify FlatMapFunction
    (^void flatMap [this ^String value ^Collector<Tuple2<String, Integer>> collector]
      (println value))))
But that doesn't work. Any ideas?
The Leiningen project.clj looks like this:
(defproject pipeline "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.7.0"]
                 [org.apache.flink/flink-java "0.9.0"]]
  :aot :all)
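Flink infers a function's output type by reflecting on the generic type arguments of the interface the function implements, and classes generated by Clojure do not carry that information. Thus you need to specify the return type manually via Flink's returns method: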
(.returns (.flatMap dataset (tokenizer)) String)
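The missing information can be observed with plain JDK reflection. As far as I understand it, Flink's TypeExtractor looks for a parameterized interface (something like FlatMapFunction<String, Tuple2<...>>) among the interfaces a class implements; a Java compiler records that, while Clojure's reify and deftype implement the raw interface only. This is a sketch, not Flink code: records-type-args? is an illustrative helper and java.util.Comparator stands in for FlatMapFunction.

```clojure
(ns generics-demo
  (:import (java.lang.reflect ParameterizedType)
           (java.util Comparator)))

;; True if the object's class records generic type arguments
;; (e.g. Comparable<String>) for any interface it implements.
(defn records-type-args? [obj]
  (boolean (some #(instance? ParameterizedType %)
                 (.getGenericInterfaces (class obj)))))

;; A class compiled from Java source: String implements Comparable<String>.
(records-type-args? "some string")
;; => true

;; A class generated by reify implements the raw interface only.
(records-type-args?
  (reify Comparator
    (compare [_ a b] 0)))
;; => false
```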
Furthermore, you need to use deftype to define tokenizer and instantiate a new object when using it, because Flink cannot handle anonymous classes:
(deftype tokenizer [] FlatMapFunction
  (flatMap [this value collector]
    (println value)))

(.flatMap dataset (tokenizer.))
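To see what switching from reify to deftype changes on the JVM side, the sketch below compares the classes the two forms generate, with Runnable as a stand-in interface and illustrative names. It only shows that reify yields a generated, anonymous-style class while deftype yields a named class you instantiate yourself; it does not reproduce Flink's behavior.

```clojure
(ns demo.types)

;; reify: an anonymous class whose generated name contains "reify__"
;; followed by a counter, e.g. something like "demo.types$reify__142".
(def anonymous-task (reify Runnable (run [_] nil)))

;; deftype: a named class, demo.types.NamedTask, instantiated with
;; (NamedTask.) or the generated factory function (->NamedTask).
(deftype NamedTask [] Runnable (run [_] nil))

(.getName (class anonymous-task))  ; generated name, varies between runs
(.getName NamedTask)               ;; => "demo.types.NamedTask"
(instance? Runnable (NamedTask.))  ;; => true
```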
Here is a full word count example that can be packaged into a jar and executed.
Pay attention to the casts and argument types. For the tokenizer output, (int 1) is required; otherwise Long would be the second type of the Tuple2. Furthermore, we use a String to declare the output type of tokenizer: a class alone such as Tuple2 is not sufficient, because Flink also needs the tuple's field types, which "Tuple2<String,Integer>" spells out. Finally, we pass the grouping key as (int-array [0]) so that groupBy resolves to its int-array (field position) overload; without the array, the Clojure compiler cannot resolve which groupBy overload to call.
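The two interop details in the paragraph above can be checked without Flink. A minimal sketch: token-pairs and key-positions are illustrative names, and java.util.Arrays/toString stands in for an overloaded Java method such as groupBy (it has separate overloads for int[], long[], Object[] and so on).

```clojure
(ns interop-demo
  (:require [clojure.string :as str]))

;; 1. Boxing: Clojure integer literals are longs, so a bare 1 boxes to
;;    java.lang.Long, while (int 1) boxes to java.lang.Integer, which is what
;;    a declared Tuple2<String,Integer> expects in its second field.
(class 1)        ;; => java.lang.Long
(class (int 1))  ;; => java.lang.Integer

(defn token-pairs
  "Split a line on whitespace into [word 1] pairs with an Integer count."
  [line]
  (mapv (fn [w] [w (int 1)]) (str/split line #"\s")))

;; 2. Varargs: a Java parameter declared as int... is an int[] to Clojure,
;;    so the caller builds the array. (int-array [0]) yields an int[], which
;;    matches only the int[] overload among several candidates.
(def key-positions (int-array [0]))

(.getName (class key-positions))           ;; => "[I"
(java.util.Arrays/toString key-positions)  ;; => "[0]"
```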
(ns org.apache.flink.flink-clojure.WordCount
  (:import
   (org.apache.flink.api.common.functions FlatMapFunction)
   (org.apache.flink.api.java DataSet ExecutionEnvironment)
   (org.apache.flink.api.java.tuple Tuple2)
   (org.apache.flink.util Collector))
  (:require [clojure.string :as str])
  (:gen-class))

(def flink-env (ExecutionEnvironment/createLocalEnvironment))

;; A single-element DataSet holding the input line.
(def text (.fromElements flink-env (to-array ["please test me and me too"])))

;; deftype yields a named class (Flink does not accept reify's anonymous class).
;; Each word is emitted as a Tuple2; (int 1) keeps the count an Integer.
(deftype tokenizer [] FlatMapFunction
  (flatMap [this value collector]
    (doseq [v (str/split value #"\s")]
      (.collect collector (Tuple2. v (int 1))))))

;; Declare the output type explicitly; Flink cannot infer it from the Clojure class.
(def tokens (.returns (.flatMap text (tokenizer.)) "Tuple2<String,Integer>"))

;; Group by field 0 (the word) and sum field 1 (the count).
(def counts (.sum (.groupBy tokens (int-array [0])) 1))

;; Accept command-line args so the gen-class main does not throw an ArityException.
(defn -main [& args]
  (.print counts))
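To know what the job should report, the same split-and-sum logic can be run on plain Clojure data. This sketch only models the counts (Flink prints Tuple2 values and may order them differently), and word-counts is an illustrative name.

```clojure
(ns wordcount-check
  (:require [clojure.string :as str]))

;; Same tokenize-then-sum logic as the Flink job, on plain Clojure data:
;; emit [word 1] pairs, then group by the word and sum the counts.
(defn word-counts [line]
  (->> (str/split line #"\s")
       (map (fn [w] [w (int 1)]))
       (reduce (fn [acc [w n]] (update acc w (fnil + 0) n)) {})))

(word-counts "please test me and me too")
;; => {"please" 1, "test" 1, "me" 2, "and" 1, "too" 1}
```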