
Apache Flink: class file for org.apache.flink.streaming.api.scala.DataStream not found

Using Apache Flink 1.3.2 and Cassandra 3.11, I wrote a simple program that writes data into Cassandra through the Apache Flink Cassandra connector. This is the code:

final Collection<String> collection = new ArrayList<>(50);
        for (int i = 1; i <= 50; ++i) {
            collection.add("element " + i);
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<UUID, String>> dataStream = env
                .fromCollection(collection)
                .map(new MapFunction<String, Tuple2<UUID, String>>() {

                    final String mapped = " mapped ";

                    @Override
                    public Tuple2<UUID, String> map(String s) throws Exception {
                        // Split on whitespace: "element 1" -> ["element", "1"]
                        String[] splitted = s.split("\\s+");
                        // Diamond operator avoids the raw Tuple2 type
                        return new Tuple2<>(
                                UUID.randomUUID(),
                                splitted[0] + mapped + splitted[1]
                        );
                    }
                });
        dataStream.print();
        CassandraSink.addSink(dataStream)
                .setQuery("INSERT INTO test.phases (id, text) values (?, ?);")
                .setHost("127.0.0.1")
                .build();
        env.execute();
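
For readers who want to see what ends up in the text column, the string handling inside the map function can be tried on its own, without Flink or Cassandra. This is only a minimal sketch: the class name PhaseText and the helper remap are made up for illustration and are not part of the original job.

```java
// Hypothetical standalone helper; mirrors only the string logic of the MapFunction above.
class PhaseText {

    // Splits the input on whitespace and inserts " mapped " between the first two tokens.
    static String remap(String s) {
        String[] parts = s.split("\\s+");
        return parts[0] + " mapped " + parts[1];
    }

    public static void main(String[] args) {
        // "element 1" splits into ["element", "1"]
        System.out.println(remap("element 1")); // prints "element mapped 1"
    }
}
```

Note that an input with fewer than two whitespace-separated tokens would throw ArrayIndexOutOfBoundsException, both here and in the original map function.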

When I compile the same code against Apache Flink 1.4.2 (1.4.x), I get this error:

Error:(36, 22) java: cannot access org.apache.flink.streaming.api.scala.DataStream
  class file for org.apache.flink.streaming.api.scala.DataStream not found

on the line

CassandraSink.addSink(dataStream)
                    .setQuery("INSERT INTO test.phases (id, text) values (?, ?);")
                    .setHost("127.0.0.1")
                    .build();

I suspect that a dependency change in Apache Flink 1.4.2 is causing the problem.

The code uses the following imports:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

How can I solve the error in Apache Flink version 1.4.2?

Update: In Flink 1.3.2, the class org.apache.flink.streaming.api.scala.DataStream<T> is listed in the Javadoc, but in version 1.4.2 it is not.

I also tried the Cassandra connector example from the Flink 1.4.2 documentation and got the same error, yet the same example works with the Flink 1.3.2 dependencies.

Soheil Pourbafrani asked Dec 23 '22 08:12


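1 Answer

Besides all the other dependencies, make sure you have the Flink Scala streaming dependency. As far as I can tell, CassandraSink in 1.4.x also has an addSink overload that takes an org.apache.flink.streaming.api.scala.DataStream, so the Java compiler needs that class on the classpath to resolve the call, even if you only use the Java API: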

Maven

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.4.2</version>
</dependency>

Gradle

dependencies {
    compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: '1.4.2'
..
}
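
To confirm that the dependency was actually picked up by your build, you can check at runtime whether the class named in the error message is loadable. This is a minimal sketch, not something from the Flink API: the class ClasspathCheck and the method isOnClasspath are hypothetical helpers that only use the standard Class.forName call.

```java
// Hypothetical diagnostic helper; uses only the JDK, no Flink classes are referenced directly.
class ClasspathCheck {

    // Returns true if the named class can be located by this class's class loader.
    // The class is not initialized, so no static initializers run.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class the compiler complained about in the question
        String name = "org.apache.flink.streaming.api.scala.DataStream";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

Run it with the same classpath as your job. If it reports false, the flink-streaming-scala artifact is still missing, or it was resolved with a different Scala suffix than the rest of your Flink dependencies.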

I managed to get your example working with the following imports and Maven dependencies:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

Maven

<dependencies>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-cassandra_2.11</artifactId>
        <version>1.4.2</version>
    </dependency>

</dependencies>

Alex answered May 25 '23 01:05