#docker #apache-spark #apache-kafka #docker-compose #spark-structured-streaming
#docker #apache-spark #apache-kafka #docker-compose #spark-structured-streaming
Вопрос:
Когда задание Spark выполняется локально без Docker через spark-submit
, все работает нормально. Однако запуск в контейнере docker не приводит к созданию выходных данных.
Чтобы проверить, работает ли сама Kafka, я извлек Kafka в контейнер Spark worker и заставил консольного пользователя прослушивать тот же хост, порт и тему (kafka: 9092, crypto_topic), который работал правильно и показывал выходные данные. (Производитель постоянно отправляет данные в тему в другом контейнере)
Ожидается —
20/09/11 17:35:27 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.29.10:42565 with 366.3 MB RAM, BlockManagerId(driver, 192.168.29.10, 42565, None)
20/09/11 17:35:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.29.10, 42565, None)
20/09/11 17:35:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.29.10, 42565, None)
-------------------------------------------
Batch: 0
-------------------------------------------
--------- ----------- ----------------- ------ ---------- ------------ ----- ------------------- ---------
|name_coin|symbol_coin|number_of_markets|volume|market_cap|total_supply|price|percent_change_24hr|timestamp|
--------- ----------- ----------------- ------ ---------- ------------ ----- ------------------- ---------
--------- ----------- ----------------- ------ ---------- ------------ ----- ------------------- ---------
...
...
...
followed by more output
Актуально
20/09/11 14:49:44 INFO BlockManagerMasterEndpoint: Registering block manager d7443d94165c:46203 with 366.3 MB RAM, BlockManagerId(driver, d7443d94165c, 46203, None)
20/09/11 14:49:44 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, d7443d94165c, 46203, None)
20/09/11 14:49:44 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, d7443d94165c, 46203, None)
20/09/11 14:49:44 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
no more output, stuck here
файл docker-compose.yml
version: "3"
services:
zookeeper:
image: zookeeper:3.6.1
container_name: zookeeper
hostname: zookeeper
ports:
- "2181:2181"
networks:
- crypto-network
kafka:
image: wurstmeister/kafka:2.13-2.6.0
container_name: kafka
hostname: kafka
ports:
- "9092:9092"
environment:
- KAFKA_ADVERTISED_HOST_NAME=kafka
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_PORT=9092
# topic-name:partitions:in-sync-replicas:cleanup-policy
- KAFKA_CREATE_TOPICS="crypto_topic:1:1:compact"
networks:
- crypto-network
kafka-producer:
image: python:3-alpine
container_name: kafka-producer
command: >
sh -c "pip install -r /usr/src/producer/requirements.txt
amp;amp; python3 /usr/src/producer/kafkaProducerService.py"
volumes:
- ./kafkaProducer:/usr/src/producer
networks:
- crypto-network
cassandra:
image: cassandra:3.11.8
container_name: cassandra
hostname: cassandra
ports:
- "9042:9042"
#command:
# cqlsh -f /var/lib/cassandra/cql-queries.cql
volumes:
- ./cassandraData:/var/lib/cassandra
networks:
- crypto-network
spark-master:
image: bde2020/spark-master:2.4.5-hadoop2.7
container_name: spark-master
hostname: spark-master
ports:
- "8080:8080"
- "7077:7077"
- "6066:6066"
networks:
- crypto-network
spark-consumer-worker:
image: bde2020/spark-worker:2.4.5-hadoop2.7
container_name: spark-consumer-worker
environment:
- SPARK_MASTER=spark://spark-master:7077
ports:
- "8081:8081"
volumes:
- ./sparkConsumer:/sparkConsumer
networks:
- crypto-network
networks:
crypto-network:
driver: bridge
spark-submit
выполняется
docker exec -it spark-consumer-worker bash
/spark/bin/spark-submit --master $SPARK_MASTER --class processing.SparkRealTimePriceUpdates
--packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5
/sparkConsumer/sparkconsumer_2.11-1.0-RELEASE.jar
Соответствующие части кода Spark
val inputDF: DataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "crypto_topic")
.load()
...
...
...
val queryPrice: StreamingQuery = castedDF
.writeStream
.outputMode("update")
.format("console")
.option("truncate", "false")
.start()
queryPrice.awaitTermination()
Ответ №1:
val inputDF: DataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "crypto_topic")
.load()
Эта часть кода на самом деле была
val inputDF: DataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
.option("subscribe", KAFKA_TOPIC)
.load()
Где KAFKA_BOOTSTRAP_SERVERS
и KAFKA_TOPIC
считываются из файла конфигурации при локальной упаковке jar.
Лучшим способом отладки для меня было сделать журналы более подробными.
Локально значение KAFKA_BOOTSTRAP_SERVERS
было localhost:9092
, но в контейнере Docker оно было изменено на kafka:9092
в файле конфигурации там. Однако это не отразилось, поскольку JAR уже был упакован. Поэтому изменение значения на kafka:9092
при локальной упаковке исправило его.
Я был бы признателен за любую помощь в том, как заставить JAR динамически подбирать конфигурации. Я не хочу упаковывать через SBT в контейнер Docker.