Spark 2.4.0 Avro Java — не удается разрешить метод из_avro

#java #scala #spark-avro #spark-streaming-kafka

#java #scala #spark-avro #spark-streaming-кафка

Вопрос:

Я пытаюсь запустить поток spark из очереди kafka, содержащей сообщения Avro.

Согласно https://spark.apache.org/docs/latest/sql-data-sources-avro.html Я должен быть в состоянии использовать from_avro для преобразования значения столбца в Dataset<Row> .

Однако я не могу скомпилировать проект, поскольку он жалуется, что from_avro не может быть найден. Я вижу метод, объявленный в package.class зависимости.

Как я могу использовать from_avro метод from org.apache.spark.sql.avro в моем Java-коде локально?

 import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.avro.*;


public class AvroStreamTest {
    public static void main(String[] args) throws IOException, InterruptedException {

     // Creating local sparkSession here...

        Dataset<Row> df = sparkSession
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "host:port")
                .option("subscribe", "avro_queue")
                .load();

        // Cannot resolve method 'from_avro'...
        df.select(from_avro(col("value"), jsonFormatSchema)).writeStream().format("console")
                .outputMode("update")
                .start();


    }
}
  

pom.xml:

 <dependencies>
    <dependency> 
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
  <!-- more dependencies below -->

</dependencies>
  

Похоже, Java не может импортировать имена из sql.avro.package.class

Ответ №1:

Это из-за сгенерированных имен классов, импорт его как import org.apache.spark.sql.avro.package$; , а затем использование package$.MODULE$.from_avro(...) должно сработать

Комментарии:

1. Это сработало, спасибо! Интересно, как org.apache.spark.sql.functions импорт работает нормально, а avro one — нет.

2. @Maciej C Метод from_avro определен внутри объекта package, в то время как functions он находится внутри обычного объекта. Обычные объекты генерируют аналогичный байт-код с помощью статических методов Java, но в Java нет конструкции, подобной объектам пакета Scala.

Ответ №2:

Вам необходимо включить spark-sql-avro в свой pom.xml который доступен по

https://mvnrepository.com/artifact/org.apache.spark/spark-sql-avro_2.11/2.4.0-palantir.28-1-gdf34e2d

Комментарии:

1. Спасибо за ваш ответ! Хотя это не похоже на официальный jar?