Структурированная потоковая передача Spark не может считываться из кафки внутри docker

#scala #apache-spark #apache-kafka #streaming #spark-structured-streaming

Вопрос:

Не удается устранить проблему после отправки структурированного потокового задания spark для чтения из кафки. Пример кода задания spark:

 object KafkaStructuredStreaming {


  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName(getClass.getName)
      .master("spark://spark-master:7077")
      .getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("startingOffsets", "earliest")
      .option("subscribe", "tweet-upload-6")
      .option("enable.auto.commit", false)
      .option("group.id", "Structured-Streaming-Examples")
      .option("failOnDataLoss", false)
      .load()

    df.printSchema()

    val consoleOutput = df.writeStream
      .outputMode("append")
      .format("console")
      .start()
    consoleOutput.awaitTermination()

  }
}
 

Одно примечание: узлы Kafka и spark находятся в одной и той же сети докеров, раньше все работало с потоковой передачей spark, но я переключился на структурированную потоковую передачу, потому что у меня была проблема с одним входным потоком- > несколькими выходными потоками.

Я получаю сейчас ошибку:

 07721793-driver-0] Error connecting to node kafka:9092 (id: 1001 rack: null)
submit-spark-job  | java.net.UnknownHostException: kafka
submit-spark-job  |     at java.net.InetAddress.getAllByName0(InetAddress.java:1282)
submit-spark-job  |     at java.net.InetAddress.getAllByName(InetAddress.java:1194)
submit-spark-job  |     at java.net.InetAddress.getAllByName(InetAddress.java:1128)
submit-spark-job  |     at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
submit-spark-job  |     at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:111)
submit-spark-job  |     at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:512)
submit-spark-job  |     at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:466)
submit-spark-job  |     at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:172)
submit-spark-job  |     at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:985)
submit-spark-job  |     at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:311)
submit-spark-job  |     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:498)
submit-spark-job  |     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
submit-spark-job  |     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306)
submit-spark-job  |     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1367)
submit-spark-job  | 21/10/18 22:44:41 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-4d4a80ae-01a7-4679-8393-42aaa0e35e6a-307721793-driver-0-1, groupId=spark-kafka-source-4d4a80ae-01a7-4679-8393-42aaa0e35e6a-307721793-driver-0] Error connecting to node kafka:9092 (id: 1001 rack: null)
submit-spark-job  | java.net.UnknownHostException: kafka
 

тоже

 85949862d6d3-1705030665-driver-0] Group coordinator kafka:9092 (id: 2147482646 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
submit-spark-job  | 21/10/18 23:03:46 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job  | 21/10/18 23:03:47 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job  | 21/10/18 23:03:47 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job  | 21/10/18 23:03:48 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job  | 21/10/18 23:03:48 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (172.18.0.7 executor 0): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
submit-spark-job  |     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
submit-spark-job  |     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
submit-spark-job  |     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
submit-spark-job  |     at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.createConsumer(KafkaDataConsumer.scala:124)
submit-spark-job  |     at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.<init>(KafkaDataConsumer.scala:61)
submit-spark-job  |     at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:206)
submit-spark-job  |     at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:201)
submit-spark-job  |     at org.apache.commons.pool2.BaseKeyedPooledObjectFactory.makeObject(BaseKeyedPooledObjectFactor
 

Docker compose files:

 version: '2'

services:
  zookeeper:
    image: zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: linuxkitpoc/kafka:latest
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: "9092"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

  tweet-producer:
    image: mkovacevic/tweet-producer-app:latest
    ports:
      - "8080:8080"
    tty: true
    depends_on:
      - kafka

 

(создано в сети по умолчанию : twitter-streaming_default)
и

 version: '3'
services:
  spark-master:
    image: bde2020/spark-master:3.1.1-hadoop3.2
    container_name: spark-master
    hostname:  spark-master
    ports:
      - "7077:7077"
    environment:
      - INIT_DAEMON_STEP=setup_spark
  spark-worker-1:
    image: bde2020/spark-worker:3.1.1-hadoop3.2
    container_name: spark-worker-1
    depends_on:
      - spark-master
    ports:
      - "8081:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
  spark-worker-2:
    image: bde2020/spark-worker:3.1.1-hadoop3.2
    container_name: spark-worker-2
    depends_on:
      - spark-master
    ports:
      - "8082:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
  submit-spark:
    image: mkovacevic/spark-streaming-app:latest
    container_name: submit-spark-job
    depends_on:
      - spark-master
      - spark-worker-1
      - spark-worker-2

networks:
  default:
    external: true
    name: twitter-streaming_default

 

Есть какие-нибудь предложения??

Комментарии:

1. Ну, прежде всего, покажите нам свой /etc/hosts или что-то подобное в Windows. Попробуй пингнуть kafka:9092 . Похоже, имя хоста kafka не разрешено должным образом. Кроме того, опубликуйте свои файлы docker, чтобы мы могли ознакомиться с настройками сети. Он должен был сказать что-нибудь и без этого.

2. Содержимое etc/хостов в контейнере spark-submit-job 127.0.0.1- 1 localhostroot 450 May 3 2019 krb5.conf ::1 localhost ip6-localhost ip6-loopback fe00::0 ip6-localnet ff00::0 ip6-mcastprefix ff02::1 ip6-allnodes ff02::2 ip6-allrouters 172.18.0.9 cec68fa15dfa Также я могу пинговать кафку из всех контейнеров spark, докер создает файлы для spark и кафки, загруженные в описание вопроса.

Ответ №1:

Каждый файл compose создает свою собственную изолированную сеть мостов по умолчанию.

Если вы хотите использовать два файла, то вам явно необходимо добавить сеть в каждую службу. В противном случае вам нужно поместить все службы в один файл

Кстати, linuxkitpoc/kafka изображение не обновлялось годами. Я предлагаю вам использовать что-нибудь другое для Кафки. Личная рекомендация была бы от Bitnami

Комментарии:

1. Похоже, изображение Bitnami помогло! @OneCricketeer спасибо!

2. Это изображение не было проблемой (если только брокер на самом деле не начинал правильно). Но если это решит вашу проблему, не стесняйтесь принять ответ, поставив галочку рядом с сообщением