#apache-spark #pyspark #apache-kafka #apache-spark-sql
#apache-spark #pyspark #apache-kafka #apache-spark-sql
Вопрос:
Написал код, который направляет количество слов streams (kafka) при передаче файла (в producer)
код :
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
##OTHER FUNCTIONS/CLASSES
def main():
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" "))
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
Необходимо преобразовать входной файл json в spark Dataframe с помощью Dstream.
Ответ №1:
Это должно сработать:
Как только у вас будет переменная, содержащая преобразованный DStream kvs
, вы можете просто создать карту DStreams и передать данные в функцию-обработчик, подобную этой:
data = kvs.map( lambda tuple: tuple[1] )
data.foreachRDD( lambda yourRdd: readMyRddsFromKafkaStream( yourRdd ) )
Вы должны определить функцию-обработчик, которая должна создать фрейм данных, используя ваши данные JSON:
def readMyRddsFromKafkaStream( readRdd ):
# Put RDD into a Dataframe
df = spark.read.json( readRdd )
df.registerTempTable( "temporary_table" )
df = spark.sql( """
SELECT
*
FROM
temporary_table
""" )
df.show()
Надеюсь, это поможет моим друзьям