Spark в контейнере Docker не считывает входные данные Kafka — структурированная потоковая передача

#docker #apache-spark #apache-kafka #docker-compose #spark-structured-streaming

#docker #apache-spark #apache-kafka #docker-compose #spark-structured-streaming

Вопрос:

Когда задание Spark выполняется локально без Docker через spark-submit , все работает нормально. Однако запуск в контейнере docker не приводит к созданию выходных данных.

Чтобы проверить, работает ли сама Kafka, я извлек Kafka в контейнер Spark worker и заставил консольного пользователя прослушивать тот же хост, порт и тему (kafka: 9092, crypto_topic), который работал правильно и показывал выходные данные. (Производитель постоянно отправляет данные в тему в другом контейнере)

Ожидается —

 20/09/11 17:35:27 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.29.10:42565 with 366.3 MB RAM, BlockManagerId(driver, 192.168.29.10, 42565, None)
20/09/11 17:35:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.29.10, 42565, None)
20/09/11 17:35:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.29.10, 42565, None)
-------------------------------------------
Batch: 0
-------------------------------------------
 --------- ----------- ----------------- ------ ---------- ------------ ----- ------------------- --------- 
|name_coin|symbol_coin|number_of_markets|volume|market_cap|total_supply|price|percent_change_24hr|timestamp|
 --------- ----------- ----------------- ------ ---------- ------------ ----- ------------------- --------- 
 --------- ----------- ----------------- ------ ---------- ------------ ----- ------------------- --------- 
...
...
...
followed by more output

  

Актуально

 20/09/11 14:49:44 INFO BlockManagerMasterEndpoint: Registering block manager d7443d94165c:46203 with 366.3 MB RAM, BlockManagerId(driver, d7443d94165c, 46203, None)
20/09/11 14:49:44 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, d7443d94165c, 46203, None)
20/09/11 14:49:44 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, d7443d94165c, 46203, None)
20/09/11 14:49:44 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0

no more output, stuck here
  

файл docker-compose.yml

 version: "3"

services:

    zookeeper:
        image: zookeeper:3.6.1
        container_name: zookeeper
        hostname: zookeeper
        ports:
            - "2181:2181"
        networks:
            - crypto-network
      
    kafka:
        image: wurstmeister/kafka:2.13-2.6.0
        container_name: kafka
        hostname: kafka
        ports:
            - "9092:9092"
        environment:
            - KAFKA_ADVERTISED_HOST_NAME=kafka
            - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 
            - KAFKA_ADVERTISED_PORT=9092
            # topic-name:partitions:in-sync-replicas:cleanup-policy
            - KAFKA_CREATE_TOPICS="crypto_topic:1:1:compact"
        networks:
            - crypto-network

    kafka-producer:
        image: python:3-alpine
        container_name: kafka-producer
        command: >
                sh -c "pip install -r /usr/src/producer/requirements.txt
                amp;amp; python3 /usr/src/producer/kafkaProducerService.py"
        volumes:
            - ./kafkaProducer:/usr/src/producer
        networks: 
            - crypto-network
      
            
    cassandra:
        image: cassandra:3.11.8
        container_name: cassandra
        hostname: cassandra
        ports:
            - "9042:9042"
        #command:
        #    cqlsh -f /var/lib/cassandra/cql-queries.cql
        volumes:
            - ./cassandraData:/var/lib/cassandra

        networks:
            - crypto-network
            
    spark-master:
        image: bde2020/spark-master:2.4.5-hadoop2.7
        container_name: spark-master
        hostname: spark-master
        ports:
            - "8080:8080"
            - "7077:7077"
            - "6066:6066"
        networks:
            - crypto-network
            
    spark-consumer-worker:
        image: bde2020/spark-worker:2.4.5-hadoop2.7
        container_name: spark-consumer-worker
        environment:
            - SPARK_MASTER=spark://spark-master:7077
        ports:
            - "8081:8081"
        volumes:
            - ./sparkConsumer:/sparkConsumer
        networks:
            - crypto-network
    
            
networks:
  crypto-network:
    driver: bridge
  

spark-submit выполняется

 docker exec -it spark-consumer-worker bash

/spark/bin/spark-submit --master $SPARK_MASTER --class processing.SparkRealTimePriceUpdates 
--packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 
/sparkConsumer/sparkconsumer_2.11-1.0-RELEASE.jar 
  

Соответствующие части кода Spark

   val inputDF: DataFrame = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "crypto_topic")
    .load()

...
...
...

  val queryPrice: StreamingQuery = castedDF
    .writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", "false")
    .start()

    queryPrice.awaitTermination()
  

Ответ №1:

   val inputDF: DataFrame = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "crypto_topic")
    .load()
  

Эта часть кода на самом деле была

   val inputDF: DataFrame = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_TOPIC)
    .load()
  

Где KAFKA_BOOTSTRAP_SERVERS и KAFKA_TOPIC считываются из файла конфигурации при локальной упаковке jar.

Лучшим способом отладки для меня было сделать журналы более подробными.

Локально значение KAFKA_BOOTSTRAP_SERVERS было localhost:9092 , но в контейнере Docker оно было изменено на kafka:9092 в файле конфигурации там. Однако это не отразилось, поскольку JAR уже был упакован. Поэтому изменение значения на kafka:9092 при локальной упаковке исправило его.

Я был бы признателен за любую помощь в том, как заставить JAR динамически подбирать конфигурации. Я не хочу упаковывать через SBT в контейнер Docker.