Pyspark’s df.writeStream не генерирует выходных данных

#python #apache-spark #elasticsearch #pyspark #spark-streaming

#python #apache-spark #elasticsearch #pyspark #искровая потоковая передача

Вопрос:

Я пытаюсь сохранить твиты из моего кластера kafka в Elastic Search. Изначально я установил формат вывода 'org.elasticsearch.spark.sql' равным . Но он не создал индекс.

Я попытался изменить формат на «консоль», чтобы проверить работу потоковой передачи. Но он также ничего не выводит на консоль.

Я предполагаю, что это проблема с моими потоковыми фреймами данных. Но, похоже, я не могу выяснить, в чем именно проблема.

Это мой полный код для потребителя (потоковая передача Spark):

 import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.elasticsearch:elasticsearch-hadoop:7.6.2 pyspark-shell'

from pyspark import SparkContext,SparkConf
#    Spark Streaming
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
#    Kafka
from pyspark.streaming.kafka import KafkaUtils
#    json parsing
import json
import nltk
import logging
from datetime import datetime
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

def evaluate_sentiment(avg):
    try:
        if avg < 0:
            return 'Negative'
        elif avg > 0:
            return 'Positive'
        else:
            return 'Neutral'
    except TypeError:
        return 'Neutral'
    
eval_udf = udf(evaluate_sentiment,StringType())

def start_stream(df):
    df.writeStream.format('console').start()


conf = SparkConf().setAppName('twitter_analysis')
spark = SparkSession.builder.appName('twitter_analysis').getOrCreate()
conf.set("es.index.auto.create", "true")

schema = StructType([StructField("date", TimestampType(), True),
                    StructField("user", StringType(), True),
                    StructField("text", StringType(), True),
                    StructField("reply_count", IntegerType(), True),
                    StructField("retweet_count", IntegerType(), True),
                    StructField("favorite_count", IntegerType(), True),
                    StructField("sentiment_score", DecimalType(), True)])

kafkaStream = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "192.168.0.10:9092").option("subscribe", "twitter_analysis")
.option('failOnDataLoss',False).load()

parsed_df = kafkaStream.select(from_json(col('value').cast('string'),schema).alias('parsed_value')) 
            .withColumn('timestamp', lit(current_timestamp()))

mdf = parsed_df.select('parsed_value.*', 'timestamp')


evaluated_df = mdf.withColumn('status',eval_udf('sentiment_score'))
               .withColumn('date',to_date(col('timestamp')))

start_stream(evaluated_df)

  

Что может быть причиной этой проблемы? Должен ли он что-либо делать со схемой, которую я определил?

Пример данных JSON, которые отправляются из кластера Kafka в потоковую передачу spark :

 {"date": "2020-11-07 21:02:39", "user": "TalhianeM", "text": "RT @amin_goat: Non, des problu00e8mes de vote dans une du00e9mocratie occidentale ?nnOn mu2019avait assuru00e9 que cela nu2019arrivait quu2019en Afrique pourtant.", "reply_count": 0, "retweet_count": 0, "favorite_count": 0, "sentiment_score": 0.0}

  

Может кто-нибудь, пожалуйста, помочь мне решить эту проблему? Я пробовал несколько методов, но, похоже, ничего не работает при отправке потоков данных в Elastic Search.

ОБНОВЛЕНИЕ: я решил это. Похоже, возникла проблема с хостом .

Комментарии:

1. Просто для подтверждения: print(evaluated_df.isStream) возвращает True, верно?

2. ДА. Я не пробовал . Но, поскольку это потоковый набор данных, я полагаю, он должен возвращать true .

3. Можете ли вы закрыть вопрос, поскольку это ошибка пользователя?

4. Поскольку я запустил щедрость, это не позволяет мне закрыть вопрос. Итак, я сделаю это, как только срок действия вознаграждения истечет.