I have the following code:
source
  .mapValues(value -> value + " Stream it!!!")
  .print(Printed.toSysOut());
As you can see, mapValues expects a lambda expression.
I am using a Java library, but the application is written in Scala. How do I pass a Scala lambda to Java code?
I tried the following:
source
  .mapValues(value => value + "hello")
  .print(Printed.toSysOut)
But the compiler complains:
[error] (x$1: org.apache.kafka.streams.kstream.Printed[String,?0(in value x$1)])Unit <and>
[error] (x$1: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: ?0(in value x$1), String])Unit <and>
[error] (x$1: String)Unit
[error] cannot be applied to (org.apache.kafka.streams.kstream.Printed[Nothing,Nothing])
[error] .print(Printed.toSysOut)
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 2 s, completed Nov 19, 2017 7:53:44 PM
It depends on your version of Scala.
In 2.12 Scala functions can be used in places where Java functions are expected and vice versa.
App1.java
import java.util.function.Function;
public class App1 {
    public static void method(Function<String, String> function) {
        System.out.println(function.apply("a"));
    }
    public static void main(String[] args) {
        App.method1((String s) -> s.toUpperCase());
    }
}
App.scala
object App {
  def main(args: Array[String]): Unit = {
    App1.method((s: String) => s.toUpperCase)
  }
  def method1(function: String => String): Unit = {
    println(function("xyz"))
  }
}
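The same 2.12 mechanism (SAM conversion) is not limited to java.util.function.Function: it applies to any type with a single abstract method, which is why a Scala lambda can be passed where a library-specific interface is expected. The sketch below illustrates this with a hypothetical Mapper trait and mapAll method; both are stand-ins invented for this example, not part of Kafka or any other library.

```scala
// Hypothetical stand-in for a library interface with a single abstract method.
trait Mapper[V, VR] {
  def apply(value: V): VR
}

object SamDemo {
  // Stand-in for an API method that expects the interface, not a Scala function.
  def mapAll[V, VR](values: List[V], mapper: Mapper[V, VR]): List[VR] =
    values.map(v => mapper.apply(v))

  def main(args: Array[String]): Unit = {
    // Scala 2.12+ converts the function literal to a Mapper instance (SAM conversion).
    val appendHello: Mapper[String, String] = value => value + "hello"
    println(mapAll(List("a", "b"), appendHello)) // prints List(ahello, bhello)
  }
}
```

On 2.11 the val definition above would not compile without extra compiler flags, so an anonymous class implementing Mapper would be needed instead.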
In 2.11 you can use scala-java8-compat
libraryDependencies += "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0"
App1.java
import java.util.function.Function;
import static scala.compat.java8.JFunction.func;
public class App1 {
    public static void method(Function<String, String> function) {
        System.out.println(function.apply("a"));
    }
    public static void main(String[] args) {
        App.method1(func((String s) -> s.toUpperCase()));
    }
}
App.scala
import scala.compat.java8.FunctionConverters._
object App {
  def main(args: Array[String]): Unit = {
    App1.method(((s: String) => s.toUpperCase).asJava)
  }
  def method1(function: String => String): Unit = {
    println(function("xyz"))
  }
}
Alternatively, in 2.11 you can define your own implicit converters in Scala between java.util.function.Function and scala.Function1.
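A minimal sketch of such hand-written converters might look like the following. The names FunctionInterop, scalaToJava, javaToScala and callJava are made up for illustration, and callJava only stands in for a Java method that takes a java.util.function.Function.

```scala
import java.util.function.{Function => JFunction}
import scala.language.implicitConversions

object FunctionInterop {
  // Hypothetical converter: wraps a Scala function in a java.util.function.Function.
  implicit def scalaToJava[A, B](f: A => B): JFunction[A, B] =
    new JFunction[A, B] {
      override def apply(a: A): B = f(a)
    }

  // Hypothetical converter for the opposite direction.
  implicit def javaToScala[A, B](f: JFunction[A, B]): A => B =
    (a: A) => f.apply(a)
}

object ImplicitDemo {
  import FunctionInterop._

  // Stand-in for a Java API that expects java.util.function.Function.
  def callJava(f: JFunction[String, String]): String = f.apply("a")

  def main(args: Array[String]): Unit = {
    val upper: String => String = _.toUpperCase
    // scalaToJava is applied implicitly because a JFunction is expected here.
    println(callJava(upper)) // prints A
  }
}
```

Implicit conversions like these are convenient but easy to overlook when reading code, so calling the converter explicitly is a reasonable alternative.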
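Note that .asJava produces a java.util.function.Function, whereas KStream.mapValues takes Kafka's own ValueMapper interface, so on 2.11 the converter alone will not type-check there. A lambda without a parameter type, such as (value => value + "hello").asJava, cannot be typed at all. So if you use 2.11, try implementing the interface explicitly (with org.apache.kafka.streams.kstream.ValueMapper imported):
source
  .mapValues(new ValueMapper[String, String] {
    override def apply(value: String): String = value + "hello"
  })
  .print(Printed.toSysOut[String, String])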
The error message lists the argument types that print supports. One of them is:
org.apache.kafka.streams.kstream.Printed[String,?0(in value x$1)]
From the error message you can see that you're passing Printed.toSysOut with a type of:
org.apache.kafka.streams.kstream.Printed[Nothing,Nothing]
According to the Kafka 1.0 javadoc (Printed was not present in earlier releases), toSysOut is defined as:
public static <K,V> Printed<K,V> toSysOut()
So the problem is that Scala is inferring both K and V as Nothing. You need to provide the types explicitly.
The following will probably work:
source
  .mapValues[String](value => value + " Stream it!!!")
  .print(Printed.toSysOut[String,String])
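To see the inference issue in isolation, here is a small sketch that mimics the shape of the API without depending on Kafka. MiniPrinted and MiniStream are hypothetical stand-ins written for this example; the assumption is that the overloaded print is what makes Scala 2 type the argument on its own, before it knows which K and V are wanted.

```scala
// Hypothetical stand-ins that mimic the shape of the Kafka API; not the real classes.
final class MiniPrinted[K, V]

object MiniPrinted {
  // Like Printed.toSysOut: K and V appear only in the result type.
  def toSysOut[K, V]: MiniPrinted[K, V] = new MiniPrinted[K, V]
}

final class MiniStream[K, V] {
  // Two overloads, as with KStream.print in the error message.
  def print(printed: MiniPrinted[K, V]): String = "print(Printed)"
  def print(label: String): String = "print(String)"
}

object InferenceDemo {
  def main(args: Array[String]): Unit = {
    val stream = new MiniStream[String, String]

    // stream.print(MiniPrinted.toSysOut)
    // Expected to be rejected by Scala 2: the argument is typed on its own as
    // MiniPrinted[Nothing, Nothing], which matches neither overload.

    // Option 1: spell out the type arguments at the call site.
    println(stream.print(MiniPrinted.toSysOut[String, String]))

    // Option 2: give the value a declared type so K and V are inferred from it.
    val printed: MiniPrinted[String, String] = MiniPrinted.toSysOut
    println(stream.print(printed))
  }
}
```

Either option gives the compiler concrete types for K and V before overload resolution runs, which is the same effect as writing Printed.toSysOut[String,String] in the real code.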