I'm trying to run a Spark stream from a Kafka queue containing Avro messages. According to https://spark.apache.org/docs/latest/sql-data-sources-avro.html I should be able to use from_avro to convert the value column into a Dataset<Row>.

However, I'm unable to compile the project because it complains that from_avro cannot be found, even though I can see the method declared in the package.class of the dependency. How can I use the from_avro method from org.apache.spark.sql.avro in my Java code locally?
import java.io.IOException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.avro.*;

import static org.apache.spark.sql.functions.*;

public class AvroStreamTest {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Creating local sparkSession here...

        Dataset<Row> df = sparkSession
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "host:port")
                .option("subscribe", "avro_queue")
                .load();

        // jsonFormatSchema holds the Avro schema as a JSON string
        // Cannot resolve method 'from_avro'...
        df.select(from_avro(col("value"), jsonFormatSchema))
                .writeStream()
                .format("console")
                .outputMode("update")
                .start();
    }
}
pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- more dependencies below -->
</dependencies>
It seems that Java is unable to import names from sql.avro's package.class.
Since the Spark 2.4 release, Spark SQL provides built-in support for reading and writing Apache Avro data.

Apache Avro is an open-source, row-based data serialization and data exchange framework from the Hadoop projects. Spark's Avro support originated as an open-source library from Databricks for reading and writing data in the Avro file format, and it is widely used with Apache Spark, especially in Kafka-based data pipelines.

Read and write options: when reading or writing Avro data in Spark via DataFrameReader or DataFrameWriter, there are a few options we can specify: avroSchema - an optional Avro schema provided as a JSON string. recordName - the top-level record name in the write result.
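To illustrate those options, here is a minimal batch read/write sketch. The schema string and the file paths are made-up placeholders, not values from the question:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AvroOptionsExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("AvroOptionsExample")
                .master("local[*]")
                .getOrCreate();

        // Hypothetical Avro schema as a JSON string; replace with your own.
        String avroSchema = "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
                + "[{\"name\":\"name\",\"type\":\"string\"}]}";

        // avroSchema tells the reader which schema to use when decoding.
        Dataset<Row> users = spark.read()
                .format("avro")
                .option("avroSchema", avroSchema)
                .load("/tmp/users.avro");          // example input path

        // recordName sets the top-level record name in the written files.
        users.write()
                .format("avro")
                .option("recordName", "User")
                .save("/tmp/users_out.avro");      // example output path

        spark.stop();
    }
}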
This is because of the class names the Scala compiler generates: from_avro is declared in the package object of org.apache.spark.sql.avro, which is compiled to a class named package$. Importing it as

import org.apache.spark.sql.avro.package$;

and then calling

package$.MODULE$.from_avro(...)

should work.
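A minimal sketch of the workaround applied to the code from the question (the schema string, topic name, and bootstrap servers are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Scala compiles the package object of org.apache.spark.sql.avro
// into a class named package$; import it explicitly.
import org.apache.spark.sql.avro.package$;

import static org.apache.spark.sql.functions.col;

public class AvroStreamTest {
    public static void main(String[] args) throws Exception {
        SparkSession sparkSession = SparkSession.builder()
                .appName("AvroStreamTest")
                .master("local[*]")
                .getOrCreate();

        // Hypothetical schema string; replace with your actual Avro schema.
        String jsonFormatSchema = "{\"type\":\"record\",\"name\":\"Msg\",\"fields\":"
                + "[{\"name\":\"id\",\"type\":\"string\"}]}";

        Dataset<Row> df = sparkSession
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "host:port")
                .option("subscribe", "avro_queue")
                .load();

        // Call from_avro through the package object's singleton instance.
        df.select(package$.MODULE$.from_avro(col("value"), jsonFormatSchema))
                .writeStream()
                .format("console")
                .outputMode("update")
                .start()
                .awaitTermination();
    }
}

For what it's worth, Spark 3.0 and later also expose a Java-friendly org.apache.spark.sql.avro.functions class, so on newer versions import static org.apache.spark.sql.avro.functions.from_avro; works directly and the package$ workaround is only needed on 2.4.x.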