Как направить поток (kafka) файла JSON в spark и преобразовать его в RDD?

#apache-spark #pyspark #apache-kafka #apache-spark-sql

#apache-spark #pyspark #apache-kafka #apache-spark-sql

Вопрос:

Написал код, который направляет количество слов streams (kafka) при передаче файла (в producer)

код :

 from pyspark import SparkConf, SparkContext

from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
##OTHER FUNCTIONS/CLASSES

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) 
        .map(lambda word: (word, 1)) 
        .reduceByKey(lambda a, b: a b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()
  

Необходимо преобразовать входной файл json в spark Dataframe с помощью Dstream.

Ответ №1:

Это должно сработать:

Как только у вас будет переменная, содержащая преобразованный DStream kvs , вы можете просто создать карту DStreams и передать данные в функцию-обработчик, подобную этой:

 data = kvs.map( lambda tuple: tuple[1] )
data.foreachRDD( lambda yourRdd: readMyRddsFromKafkaStream( yourRdd ) )
  

Вы должны определить функцию-обработчик, которая должна создать фрейм данных, используя ваши данные JSON:

 def readMyRddsFromKafkaStream( readRdd ):
  # Put RDD into a Dataframe
  df = spark.read.json( readRdd )
  df.registerTempTable( "temporary_table" )
  df = spark.sql( """
    SELECT
      *
    FROM
      temporary_table
  """ )
  df.show()
  

Надеюсь, это поможет моим друзьям 🙂