Поток Kafka-spark не работает в Jupyter

#python #apache-spark #pyspark #apache-kafka

#python #apache-spark #pyspark #apache-kafka

Вопрос:

Я использую несколько контейнеров, Kafka, spark и zookeeper. Я отправил данные в раздел Kafka (Kafka работает и обрабатывает данные) и подключил spark к этой теме, но если я использую «KafkaUtils.createDirectStream», Jupiter не выдает выходных данных. не могли бы вы мне помочь? контейнеры docker взяты из архитектуры Lambda

 # # Kafka/ pyspark streaming

#Kafka retrieve topics

import sys

from kafka import KafkaClient

client = KafkaClient(bootstrap_servers='localhost:9092')

future = client.cluster.request_update()
client.poll(future=future)

metadata = client.cluster
print(metadata.topics())

#Import libraries to load csv data on topic 'test'

from kafka import KafkaProducer

import logging
from json import dumps, loads
import csv
logging.basicConfig(level=logging.INFO)

#load csv data into kafka topic 'test

producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda 
   K:dumps(K).encode('utf-8'))

with open('/Users/karsten/Desktop/Datensets/divvy_data.csv', 'r') as file:
    reader = csv.reader(file)
    for messages in reader:
        producer.send('test', messages)
        producer.flush()

import findspark
findspark.init()
import pyspark


def spark_context_creator():
    conf = SparkConf()
    #set name for our app
    conf.setAppName('divvy_test')
    #the master url to connect
    conf.setMaster('spark://spark-master:7077')
    sc = None
    try:
        sc.stop()
        sc = SparkContext(conf=conf)
    except:
        sc = SparkContext(conf=conf)
    return sc

sc = spark_context_creator()

    
ssc = StreamingContext(sc,1)


kafkastream = KafkaUtils.createDirectStream(ssc, 'zookeeper:2181', 'my-created-consumer-group', {'test':1})
#extract json data from tupil
data = kafkastream.map(lambda x: json.loads(x[1])).pprint()
print(type(data))
rdd = scc.parallelize(data)
print(type(rdd))
ssc.start()
ssc.awaitTermination()


print(rdd)
 

Комментарии:

1. localhost относится к контейнеру Jupyter, а не к брокеру Kafka…. В противном случае, если это выполняется на хосте, то zookeeper и spark-master не разрешимые имена хостов

2. спасибо за ваш ответ. записная книжка jupyter запускается не в контейнере, а на локальном хосте. kafka работает в контейнере docker, и обычно я должен обращаться к ним через localhost:9092, потому что я настроил KAFKA_ADVERTISED_LISTENERS: ‘PLAINTEXT:// localhost:9092’ в файле compose. Но это не работает. В файле compose я настроил spark следующим образом.. SPARK_DRIVER_HOST=192.168.1.5, но как я могу получить доступ к мастеру?

3. Тогда та же проблема. Spark фактически выполняет код. Это в контейнере, поэтому он не сможет подключиться к localhost, чтобы найти Kafka. И, как я уже сказал, zookeeper и spark-master являются неизвестными DNS-именами для вашего хоста

4. Вам действительно нужна искра? У вас уже есть библиотека Python Kafka, которая может использовать ваши данные

5. есть ли решение для этого? Я должен иметь возможность каким-то образом получить доступ к kafka в контейнере с помощью pyspark из jupyter?