#java #scala #spark-avro #spark-streaming-kafka
#java #scala #spark-avro #spark-streaming-кафка
Вопрос:
Я пытаюсь запустить поток spark из очереди kafka, содержащей сообщения Avro.
Согласно https://spark.apache.org/docs/latest/sql-data-sources-avro.html Я должен быть в состоянии использовать from_avro
для преобразования значения столбца в Dataset<Row>
.
Однако я не могу скомпилировать проект, поскольку он жалуется, что from_avro
не может быть найден. Я вижу метод, объявленный в package.class зависимости.
Как я могу использовать from_avro
метод from org.apache.spark.sql.avro
в моем Java-коде локально?
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.avro.*;
public class AvroStreamTest {
public static void main(String[] args) throws IOException, InterruptedException {
// Creating local sparkSession here...
Dataset<Row> df = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "avro_queue")
.load();
// Cannot resolve method 'from_avro'...
df.select(from_avro(col("value"), jsonFormatSchema)).writeStream().format("console")
.outputMode("update")
.start();
}
}
pom.xml:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<!-- more dependencies below -->
</dependencies>
Похоже, Java не может импортировать имена из sql.avro.package.class
Ответ №1:
Это из-за сгенерированных имен классов, импорт его как import org.apache.spark.sql.avro.package$;
, а затем использование package$.MODULE$.from_avro(...)
должно сработать
Комментарии:
1. Это сработало, спасибо! Интересно, как
org.apache.spark.sql.functions
импорт работает нормально, а avro one — нет.2. @Maciej C Метод
from_avro
определен внутри объекта package, в то время какfunctions
он находится внутри обычного объекта. Обычные объекты генерируют аналогичный байт-код с помощью статических методов Java, но в Java нет конструкции, подобной объектам пакета Scala.
Ответ №2:
Вам необходимо включить spark-sql-avro в свой pom.xml который доступен по
https://mvnrepository.com/artifact/org.apache.spark/spark-sql-avro_2.11/2.4.0-palantir.28-1-gdf34e2d
Комментарии:
1. Спасибо за ваш ответ! Хотя это не похоже на официальный jar?