Accessing Flink Classloader before Stream Start

In my project I would like to access the Flink User Classloader before the stream is executed. I have been instantiating my own Classloader to deserialize classes (doing my best to avoid issues related to multiple classloaders) prior to stream execution.
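Deserializing with a class loader you control usually comes down to an `ObjectInputStream` whose `resolveClass` looks classes up through that loader. Below is a minimal plain-JDK sketch of the idea. The class and method names are hypothetical illustrations, not Flink API, although Flink's own `org.apache.flink.util.InstantiationUtil.deserializeObject(byte[], ClassLoader)` works along similar lines. Classes are resolved through whichever loader you pass in, so mixing this loader with a different one at job runtime is what produces the multiple-classloader problems (for example a `ClassCastException` between two identically named classes).

```java
import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical helper: Java deserialization that resolves classes
// through an explicitly supplied ClassLoader.
public class ClassLoaderAwareDeserializer {

    /** ObjectInputStream that looks classes up via the given loader. */
    static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in); // reads and validates the stream header
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // initialize=false: load the class without running static initializers
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types
                return super.resolveClass(desc);
            }
        }
    }

    /** Serializes an object with standard Java serialization. */
    public static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Deserializes bytes, resolving every class through {@code loader}. */
    public static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                     new LoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader loader = ClassLoaderAwareDeserializer.class.getClassLoader();
        byte[] data = serialize(new ArrayList<>(Arrays.asList("a", "b")));
        Object copy = deserialize(data, loader);
        System.out.println(copy); // prints [a, b]
    }
}
```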

However, the further I progress, the more (bad) code I have to write to work around these issues.

This could be solved if I could access the Flink user classloader and use it directly; however, I don't see a mechanism to do so outside of "RichFunctions" (https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/functions/RichFunction.html), which require the stream to be running.

Any guidance here would be appreciated.

Luka Jurukovski asked Aug 08 '18 16:08



1 Answer

You can use your own class loader in Flink.

Build the job graph yourself and submit it, together with the class loader, through the client.

The code looks like this:

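import java.net.URL;
import java.util.Collections;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;

// `configuration` (a Flink Configuration), `jobGraph` (the JobGraph you built) and
// `uploadedJarUrl` (URL of the jar with your user classes) are assumed to exist.
final StandaloneClusterClient client;
try {
    client = new StandaloneClusterClient(configuration);
} catch (final Exception e) {
    throw new RuntimeException("Could not establish a connection to the job manager", e);
}

try {
    // Build the user-code class loader yourself, so you control it before submission.
    ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
            Collections.<URL>singletonList(uploadedJarUrl), // job jars
            Collections.<URL>emptyList(),                   // extra classpath URLs
            this.getClass().getClassLoader());              // parent loader
    // Submit the job graph with that class loader, without waiting for the result.
    client.runDetached(jobGraph, classLoader);
} catch (final ProgramInvocationException e) {
    throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
}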
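For intuition about what `JobWithJars.buildUserCodeClassLoader` hands back, here is a plain-JDK approximation: the job jar URLs and extra classpath URLs are combined into one `URLClassLoader` with the given parent. This is an assumption for illustration, not Flink's implementation (depending on version and the `classloader.resolve-order` setting, Flink may use child-first delegation instead of the JDK's parent-first default), and `UserCodeLoaderSketch` and `buildLoader` are hypothetical names. The point is that once you construct the loader yourself, the same instance can serve pre-submission work such as deserialization and then be passed to `runDetached`.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: a parent-first user-code class loader built from URLs.
public class UserCodeLoaderSketch {

    /** Combines job jars and classpath URLs into one loader with the given parent. */
    public static URLClassLoader buildLoader(List<URL> jars, List<URL> classpaths,
                                             ClassLoader parent) {
        List<URL> all = new ArrayList<>(jars);
        all.addAll(classpaths);
        // Standard URLClassLoader: asks the parent first, then searches its own URLs.
        return new URLClassLoader(all.toArray(new URL[0]), parent);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader parent = UserCodeLoaderSketch.class.getClassLoader();
        try (URLClassLoader loader =
                     buildLoader(Collections.emptyList(), Collections.emptyList(), parent)) {
            // With no URLs of its own, every lookup is delegated to the parent.
            Class<?> c = Class.forName("java.lang.String", false, loader);
            System.out.println(c.getName()); // prints java.lang.String
        }
    }
}
```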

That said, I still wonder why you need your own class loader; there may be another way to implement what you want.

yunfan answered Sep 22 '22 09:09