#scala #apache-spark #apache-kafka #streaming #spark-structured-streaming
Вопрос:
Не удается устранить проблему после отправки структурированного потокового задания spark для чтения из кафки. Пример кода задания spark:
object KafkaStructuredStreaming {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName(getClass.getName)
.master("spark://spark-master:7077")
.getOrCreate()
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("startingOffsets", "earliest")
.option("subscribe", "tweet-upload-6")
.option("enable.auto.commit", false)
.option("group.id", "Structured-Streaming-Examples")
.option("failOnDataLoss", false)
.load()
df.printSchema()
val consoleOutput = df.writeStream
.outputMode("append")
.format("console")
.start()
consoleOutput.awaitTermination()
}
}
Одно примечание: узлы Kafka и spark находятся в одной и той же сети докеров, раньше все работало с потоковой передачей spark, но я переключился на структурированную потоковую передачу, потому что у меня была проблема с одним входным потоком- > несколькими выходными потоками.
Я получаю сейчас ошибку:
07721793-driver-0] Error connecting to node kafka:9092 (id: 1001 rack: null)
submit-spark-job | java.net.UnknownHostException: kafka
submit-spark-job | at java.net.InetAddress.getAllByName0(InetAddress.java:1282)
submit-spark-job | at java.net.InetAddress.getAllByName(InetAddress.java:1194)
submit-spark-job | at java.net.InetAddress.getAllByName(InetAddress.java:1128)
submit-spark-job | at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
submit-spark-job | at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:111)
submit-spark-job | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:512)
submit-spark-job | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:466)
submit-spark-job | at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:172)
submit-spark-job | at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:985)
submit-spark-job | at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:311)
submit-spark-job | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:498)
submit-spark-job | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
submit-spark-job | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306)
submit-spark-job | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1367)
submit-spark-job | 21/10/18 22:44:41 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-4d4a80ae-01a7-4679-8393-42aaa0e35e6a-307721793-driver-0-1, groupId=spark-kafka-source-4d4a80ae-01a7-4679-8393-42aaa0e35e6a-307721793-driver-0] Error connecting to node kafka:9092 (id: 1001 rack: null)
submit-spark-job | java.net.UnknownHostException: kafka
тоже
85949862d6d3-1705030665-driver-0] Group coordinator kafka:9092 (id: 2147482646 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
submit-spark-job | 21/10/18 23:03:46 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job | 21/10/18 23:03:47 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job | 21/10/18 23:03:47 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job | 21/10/18 23:03:48 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job | 21/10/18 23:03:48 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (172.18.0.7 executor 0): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
submit-spark-job | at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
submit-spark-job | at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
submit-spark-job | at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
submit-spark-job | at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.createConsumer(KafkaDataConsumer.scala:124)
submit-spark-job | at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.<init>(KafkaDataConsumer.scala:61)
submit-spark-job | at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:206)
submit-spark-job | at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:201)
submit-spark-job | at org.apache.commons.pool2.BaseKeyedPooledObjectFactory.makeObject(BaseKeyedPooledObjectFactor
Docker compose files:
version: '2'
services:
zookeeper:
image: zookeeper
ports:
- "2181:2181"
kafka:
image: linuxkitpoc/kafka:latest
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_PORT: "9092"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
tweet-producer:
image: mkovacevic/tweet-producer-app:latest
ports:
- "8080:8080"
tty: true
depends_on:
- kafka
(создано в сети по умолчанию : twitter-streaming_default)
и
version: '3'
services:
spark-master:
image: bde2020/spark-master:3.1.1-hadoop3.2
container_name: spark-master
hostname: spark-master
ports:
- "7077:7077"
environment:
- INIT_DAEMON_STEP=setup_spark
spark-worker-1:
image: bde2020/spark-worker:3.1.1-hadoop3.2
container_name: spark-worker-1
depends_on:
- spark-master
ports:
- "8081:8081"
environment:
- "SPARK_MASTER=spark://spark-master:7077"
spark-worker-2:
image: bde2020/spark-worker:3.1.1-hadoop3.2
container_name: spark-worker-2
depends_on:
- spark-master
ports:
- "8082:8081"
environment:
- "SPARK_MASTER=spark://spark-master:7077"
submit-spark:
image: mkovacevic/spark-streaming-app:latest
container_name: submit-spark-job
depends_on:
- spark-master
- spark-worker-1
- spark-worker-2
networks:
default:
external: true
name: twitter-streaming_default
Есть какие-нибудь предложения??
Комментарии:
1. Ну, прежде всего, покажите нам свой
/etc/hosts
или что-то подобное в Windows. Попробуй пингнутьkafka:9092
. Похоже, имя хостаkafka
не разрешено должным образом. Кроме того, опубликуйте свои файлы docker, чтобы мы могли ознакомиться с настройками сети. Он должен был сказать что-нибудь и без этого.2. Содержимое etc/хостов в контейнере spark-submit-job
127.0.0.1- 1 localhostroot 450 May 3 2019 krb5.conf ::1 localhost ip6-localhost ip6-loopback fe00::0 ip6-localnet ff00::0 ip6-mcastprefix ff02::1 ip6-allnodes ff02::2 ip6-allrouters 172.18.0.9 cec68fa15dfa
Также я могу пинговать кафку из всех контейнеров spark, докер создает файлы для spark и кафки, загруженные в описание вопроса.
Ответ №1:
Каждый файл compose создает свою собственную изолированную сеть мостов по умолчанию.
Если вы хотите использовать два файла, то вам явно необходимо добавить сеть в каждую службу. В противном случае вам нужно поместить все службы в один файл
Кстати, linuxkitpoc/kafka
изображение не обновлялось годами. Я предлагаю вам использовать что-нибудь другое для Кафки. Личная рекомендация была бы от Bitnami
Комментарии:
1. Похоже, изображение Bitnami помогло! @OneCricketeer спасибо!
2. Это изображение не было проблемой (если только брокер на самом деле не начинал правильно). Но если это решит вашу проблему, не стесняйтесь принять ответ, поставив галочку рядом с сообщением