#apache-kafka #spark-streaming
#apache-kafka #искровая потоковая передача
Вопрос:
Я добавил пакеты sbt для потоковой передачи kafka и spark следующим образом:
"org.apache.spark" % "spark-streaming_2.10" % "1.6.1",
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1"
однако, когда я хочу использовать потоки kafkadirect..Я не могу получить к нему доступ..
val topics="CCN_TOPIC,GGSN_TOPIC"
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaBrokers)
val messages= org.apache.spark.streaming.kafka.KafkaUtils[String, String, kafka.serializer.StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
Компилятор не распознает kafka.serializer.Строковый декодер..
object serializer is not a member of package org.apache.spark.streaming.kafka
Редактировать:
Я также пробовал
import _root_.kafka.serializer
..но в этом нет StringDecoder ..
Комментарии:
1. где вы вызываете
createDirectStream
метод из класса KafkaUtils?
Ответ №1:
Пожалуйста, попробуйте сделать следующее: добавьте файл
kafka_2.11-0.10.0.0.jar
к зависимости вашего проекта. Это должно исправить вашу ошибку.
Ответ №2:
Прошу прощения, если я использую неправильную терминологию, я не специалист в области зависимостей, ссылок и т.д. Приведенный ниже метод работает с использованием HDP 2.4.3.
1. Найдите правильную банку
Вам необходимо найти правильные банки Kafka в вашей среде.
Следующий сценарий оболочки полезен для создания общего списка всех классов во всех Jar и сброса их в файл (который затем можно искать с помощью egrep, когда вам нужно найти Jar для определенных классов).
Я использую HDP, поэтому я указал корневой каталог установки HDP в качестве точки для поиска jars. Ваши банки платформы могут быть в другом месте.
all_hdp_classes () {
find -L /usr/hdp/current -maxdepth 20 -name "*.jar" -print | while read line; do
for i in `jar tf $line | grep .class`
do
echo $line : $i
done
done
}
all_hdp_classes > ~/all_hdp_classes
Получив список, вы можете выполнить поиск в файле следующим образом:
egrep 'kafka' ~/all_hdp_classes | grep Decoder
ПРИМЕЧАНИЕ: Ambari устанавливает библиотеки брокера Kafka только на том узле, на котором находится брокер Kafka. Поэтому вам нужно выполнить поиск на этом узле, иначе вы ничего не найдете (или вы найдете только spark-examples).
Затем скопируйте Jar на узел / ы, на которых вы запускаете Spark.
На узле брокера Kafka я нашел следующий Jar, который содержит класс StringDecoder:
/usr/hdp/current/kafka-broker/libs/kafka_2.10-0.10.1.2.1.2.0-10.jar
Обратите внимание, что это другой файл, чем kafka_2.11-0.10.0.0.jar как было предложено в другом ответе, который не содержал класс декодера (по крайней мере, в моей среде).
Также проверьте свою версию scala на 2.10 или 2.11 и убедитесь, что Jar выровнен.
2. Добавление Jar в ваш путь к классу Spark
Включите Jar в свой путь к классу Spark.
Метод зависит от того, хотите ли вы обновить путь к классу для одного сеанса только с помощью spark-shell или spark-submit, или если вы хотите обновить путь к классу для всех сеансов Spark (например, с помощью Ambari).
Для сеанса spark-shell:
spark-shell --jars /path/to/jar/on/this/node/kafka_2.10-0.10.1.2.1.2.0-10.jar
Обратите внимание, что зависимость spark-streaming-kafka maven, упомянутая в документации Spark, может вызвать конфликт на этапе импорта, как будет объяснено позже.
Вы все равно можете продолжить и добавить его, используя опцию —packages, если вам это нужно.
Пример (Spark 1.6.2 и scala 2.10, ваш может отличаться):
spark-shell --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 --jars kafka_2.10-0.10.1.2.1.2.0-10.jar
3. Импорт в ваш сеанс
import org.apache.spark.streaming._
import kafka.serializer.StringDecoder
Вы можете получить следующую ошибку:
error: object serializer is not a member of package org.apache.spark.streaming.kafka
В моем примере выше в пакет maven включен другой пакет kafka, и поэтому он был импортирован как часть «org.apache.spark.streaming._»
Для решения выполните следующие действия:
import org.apache.spark.streaming._
import _root_.kafka.serializer.StringDecoder
Комментарии:
1. Добавление root . помогло
Ответ №3:
Одной из возможных причин возникновения этой проблемы является то, что вы импортировали
раньше
import org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream
.
import kafka.serializer.StringDecoder
Правильный порядок
import kafka.serializer.StringDecoder //first
import org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream