I'm using Flink (latest, built from git) to stream from Kafka to Cassandra. To ease unit testing, I'm adding dependency injection via Dagger.
The ObjectGraph seems to set itself up properly, but the 'inner objects' are being flagged as 'not serializable' by Flink. If I include these objects directly, they work, so what's the difference?
The class in question implements MapFunction and uses @Inject to get a module for Cassandra and one for reading config files.
Is there a way to build this so I can use late binding, or does Flink make this impossible?
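Roughly, the class looks like this (a simplified sketch; CassandraWriter and ConfigReader are placeholders for my injected Cassandra and config modules):

import javax.inject.Inject;
import org.apache.flink.api.common.functions.MapFunction;

// Simplified sketch of the failing class; the injected field types
// are placeholders, not the real module names.
public class SaveMap implements MapFunction<String, String> {

    @Inject CassandraWriter cassandra;
    @Inject ConfigReader config;

    @Override
    public String map(String value) throws Exception {
        return cassandra.write(config.apply(value));
    }
}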
FWIW: dependency injection (via Dagger) and RichMapFunction can't coexist. Dagger won't let you inject into any class that has extends in its definition.
Objects instantiated via Dagger's Lazy<T> won't serialize either.
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.someapp.SaveMap@2e029d61 not serializable
...
Caused by: java.io.NotSerializableException: dagger.internal.LazyBinding$1
Before diving into the specifics of the question, a bit of background on serializability of functions in Apache Flink:
Apache Flink uses Java serialization (java.io.Serializable) to ship the function objects (here the MapFunction) to the workers that execute them in parallel. Because of that, the functions need to be serializable: the function may not contain any non-serializable fields, i.e., fields whose types are neither primitive (int, long, double, ...) nor implementations of java.io.Serializable.
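For example, a function like the following is rejected at submission time with an InvalidProgramException similar to the one in the question (CassandraClient here stands in for any type that does not implement java.io.Serializable):

import org.apache.flink.api.common.functions.MapFunction;

public class BadMap implements MapFunction<String, String> {

    // non-serializable field: submitting the program fails with
    // a NotSerializableException wrapped in an InvalidProgramException
    private final CassandraClient client = new CassandraClient();

    @Override
    public String map(String value) {
        return client.save(value);
    }
}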
The typical way to use non-serializable types in Flink functions is to lazily initialize them: the fields that hold these types are still null when the function is serialized for shipping, and are only set after the function has been deserialized by the workers.
In Scala, you can simply use lazy fields, for example lazy val x = new NonSerializableType(). The NonSerializableType is actually only created upon first access to the variable x, which usually happens on the worker. Consequently, the type can be non-serializable, because x is null when the function is serialized for shipping to the workers.
In Java, you can initialize the non-serializable fields in the open() method of the function, if you make it a rich function. Rich functions (like RichMapFunction) are extended versions of the basic functions (here MapFunction) and give you access to life-cycle methods like open() and close().
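As a sketch of that pattern (CassandraClient again stands in for any non-serializable type):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class MyMapFunction extends RichMapFunction<String, String> {

    // transient, so the field is skipped during serialization
    private transient CassandraClient client;

    @Override
    public void open(Configuration parameters) {
        // called on the worker after deserialization, once per parallel instance
        client = new CassandraClient();
    }

    @Override
    public String map(String value) {
        return client.save(value);
    }
}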
I am not too familiar with dependency injection, but Dagger seems to provide something like a lazy dependency as well, which may help as a workaround, much like lazy variables in Scala:
new MapFunction<Long, Long>() {

    @Inject Lazy<MyDependency> dep;

    @Override
    public Long map(Long value) {
        return dep.get().doSomething(value);
    }
}
I faced a similar issue. There are two ways to avoid serializing your dependency:
1. Make your dependency static. This is not always possible, and it can also mess up your code design.
2. Declare it transient: by declaring your dependency transient, you are saying that it is not part of the persistent state of the object and should not be serialized.
public class ClassA implements Serializable {
    // class A code here
}

public class ClassB {
    // class B code here
}

public class MySinkFunction implements SinkFunction<MyData> {
    private ClassA mySerializableDependency;
    private transient ClassB nonSerializableDependency;
}
This is especially useful when you are using external libraries whose implementations you cannot change to make them serializable.
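Note that a transient field is null after the function is deserialized on the worker, so it has to be re-initialized there, typically in open() if you use the rich variant of the function. A sketch, reusing the class names above:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MySinkFunction extends RichSinkFunction<MyData> {

    private ClassA mySerializableDependency = new ClassA();   // shipped with the function
    private transient ClassB nonSerializableDependency;       // null after deserialization

    @Override
    public void open(Configuration parameters) {
        // re-create the transient dependency on the worker
        nonSerializableDependency = new ClassB();
    }

    @Override
    public void invoke(MyData value) throws Exception {
        // use both dependencies to handle the record
    }
}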