In my project I would like to access the Flink User Classloader before the stream is executed. I have been instantiating my own Classloader to deserialize classes (doing my best to avoid issues related to multiple classloaders) prior to stream execution.
However, the further I progress, the more (bad) code I have to write to work around this.
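Roughly what I am doing today (the jar URL and class name are placeholders for illustration), which is what I would like to replace:

    import java.net.URL;
    import java.net.URLClassLoader;

    // Placeholder sketch: a hand-rolled class loader used only to deserialize
    // user classes before the stream is executed.
    Class<?> loadUserClass(URL userJarUrl, String className) throws Exception {
        URLClassLoader tempLoader = new URLClassLoader(
                new URL[] { userJarUrl },
                Thread.currentThread().getContextClassLoader());
        return Class.forName(className, true, tempLoader);
    }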
This could be solved if I could access the Flink user classloader and use that; however, I don't see a mechanism to do so outside of RichFunctions (https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/functions/RichFunction.html), which require the stream to be running.
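For reference, this is the only access point I have found so far; a minimal sketch assuming a trivial RichMapFunction, which only works once the job is already executing:

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    public class MyMapper extends RichMapFunction<String, String> {

        @Override
        public void open(Configuration parameters) throws Exception {
            // Only reachable here, i.e. after the stream has started running.
            ClassLoader userLoader = getRuntimeContext().getUserCodeClassLoader();
            // ... deserialize user classes with userLoader ...
        }

        @Override
        public String map(String value) {
            return value;
        }
    }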
Any guidance here would be appreciated.
You can use your own class loader in Flink.
Build the job graph yourself and submit it together with your class loader via the cluster client.
The code looks like this:
// configuration, uploadedJarUrl and jobGraph are created elsewhere in your code:
// configuration holds the JobManager address, uploadedJarUrl points at the user jar,
// and jobGraph is the job to submit.
final StandaloneClusterClient client;
try {
    client = new StandaloneClusterClient(configuration);
} catch (final Exception e) {
    throw new RuntimeException("Could not establish a connection to the job manager", e);
}

try {
    // Build a user-code class loader from the uploaded jar, with the current
    // class loader as its parent, and submit the job with it.
    ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
            Collections.<URL>singletonList(uploadedJarUrl),
            Collections.<URL>emptyList(),
            this.getClass().getClassLoader());
    client.runDetached(jobGraph, classLoader);
} catch (final ProgramInvocationException e) {
    throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
}
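If it helps, this is one way (a sketch, for the older Flink releases that still ship StandaloneClusterClient) to obtain the jobGraph used above from a StreamExecutionEnvironment without calling execute(); uploadedJarUrl is assumed to be a local file:// URL for the user jar:

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Build the pipeline as usual, but do not call env.execute().
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // ... add sources, transformations and sinks to env ...

    // Extract the JobGraph directly and attach the user jar so that the
    // TaskManagers can load the user classes at runtime.
    JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    jobGraph.addJar(new Path(uploadedJarUrl.getPath()));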
But I still wonder why you want your own class loader; it can probably be achieved some other way.