#python #apache-spark #pyspark #apache-kafka
#python #apache-spark #pyspark #apache-kafka
Вопрос:
Я использую несколько контейнеров, Kafka, spark и zookeeper. Я отправил данные в раздел Kafka (Kafka работает и обрабатывает данные) и подключил spark к этой теме, но если я использую «KafkaUtils.createDirectStream», Jupiter не выдает выходных данных. не могли бы вы мне помочь? контейнеры docker взяты из архитектуры Lambda
# # Kafka/ pyspark streaming
#Kafka retrieve topics
import sys
from kafka import KafkaClient
client = KafkaClient(bootstrap_servers='localhost:9092')
future = client.cluster.request_update()
client.poll(future=future)
metadata = client.cluster
print(metadata.topics())
#Import libraries to load csv data on topic 'test'
from kafka import KafkaProducer
import logging
from json import dumps, loads
import csv
logging.basicConfig(level=logging.INFO)
#load csv data into kafka topic 'test
producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda
K:dumps(K).encode('utf-8'))
with open('/Users/karsten/Desktop/Datensets/divvy_data.csv', 'r') as file:
reader = csv.reader(file)
for messages in reader:
producer.send('test', messages)
producer.flush()
import findspark
findspark.init()
import pyspark
def spark_context_creator():
conf = SparkConf()
#set name for our app
conf.setAppName('divvy_test')
#the master url to connect
conf.setMaster('spark://spark-master:7077')
sc = None
try:
sc.stop()
sc = SparkContext(conf=conf)
except:
sc = SparkContext(conf=conf)
return sc
sc = spark_context_creator()
ssc = StreamingContext(sc,1)
kafkastream = KafkaUtils.createDirectStream(ssc, 'zookeeper:2181', 'my-created-consumer-group', {'test':1})
#extract json data from tupil
data = kafkastream.map(lambda x: json.loads(x[1])).pprint()
print(type(data))
rdd = scc.parallelize(data)
print(type(rdd))
ssc.start()
ssc.awaitTermination()
print(rdd)
Комментарии:
1.
localhost
относится к контейнеру Jupyter, а не к брокеру Kafka…. В противном случае, если это выполняется на хосте, тоzookeeper
иspark-master
не разрешимые имена хостов2. спасибо за ваш ответ. записная книжка jupyter запускается не в контейнере, а на локальном хосте. kafka работает в контейнере docker, и обычно я должен обращаться к ним через localhost:9092, потому что я настроил KAFKA_ADVERTISED_LISTENERS: ‘PLAINTEXT:// localhost:9092’ в файле compose. Но это не работает. В файле compose я настроил spark следующим образом.. SPARK_DRIVER_HOST=192.168.1.5, но как я могу получить доступ к мастеру?
3. Тогда та же проблема. Spark фактически выполняет код. Это в контейнере, поэтому он не сможет подключиться к localhost, чтобы найти Kafka. И, как я уже сказал, zookeeper и spark-master являются неизвестными DNS-именами для вашего хоста
4. Вам действительно нужна искра? У вас уже есть библиотека Python Kafka, которая может использовать ваши данные
5. есть ли решение для этого? Я должен иметь возможность каким-то образом получить доступ к kafka в контейнере с помощью pyspark из jupyter?