#python #apache-spark #elasticsearch #pyspark #spark-streaming
#python #apache-spark #elasticsearch #pyspark #искровая потоковая передача
Вопрос:
Я пытаюсь сохранить твиты из моего кластера kafka в Elastic Search. Изначально я установил формат вывода 'org.elasticsearch.spark.sql'
равным . Но он не создал индекс.
Я попытался изменить формат на «консоль», чтобы проверить работу потоковой передачи. Но он также ничего не выводит на консоль.
Я предполагаю, что это проблема с моими потоковыми фреймами данных. Но, похоже, я не могу выяснить, в чем именно проблема.
Это мой полный код для потребителя (потоковая передача Spark):
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.elasticsearch:elasticsearch-hadoop:7.6.2 pyspark-shell'
from pyspark import SparkContext,SparkConf
# Spark Streaming
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# json parsing
import json
import nltk
import logging
from datetime import datetime
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
def evaluate_sentiment(avg):
try:
if avg < 0:
return 'Negative'
elif avg > 0:
return 'Positive'
else:
return 'Neutral'
except TypeError:
return 'Neutral'
eval_udf = udf(evaluate_sentiment,StringType())
def start_stream(df):
df.writeStream.format('console').start()
conf = SparkConf().setAppName('twitter_analysis')
spark = SparkSession.builder.appName('twitter_analysis').getOrCreate()
conf.set("es.index.auto.create", "true")
schema = StructType([StructField("date", TimestampType(), True),
StructField("user", StringType(), True),
StructField("text", StringType(), True),
StructField("reply_count", IntegerType(), True),
StructField("retweet_count", IntegerType(), True),
StructField("favorite_count", IntegerType(), True),
StructField("sentiment_score", DecimalType(), True)])
kafkaStream = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "192.168.0.10:9092").option("subscribe", "twitter_analysis")
.option('failOnDataLoss',False).load()
parsed_df = kafkaStream.select(from_json(col('value').cast('string'),schema).alias('parsed_value'))
.withColumn('timestamp', lit(current_timestamp()))
mdf = parsed_df.select('parsed_value.*', 'timestamp')
evaluated_df = mdf.withColumn('status',eval_udf('sentiment_score'))
.withColumn('date',to_date(col('timestamp')))
start_stream(evaluated_df)
Что может быть причиной этой проблемы? Должен ли он что-либо делать со схемой, которую я определил?
Пример данных JSON, которые отправляются из кластера Kafka в потоковую передачу spark :
{"date": "2020-11-07 21:02:39", "user": "TalhianeM", "text": "RT @amin_goat: Non, des problu00e8mes de vote dans une du00e9mocratie occidentale ?nnOn mu2019avait assuru00e9 que cela nu2019arrivait quu2019en Afrique pourtant.", "reply_count": 0, "retweet_count": 0, "favorite_count": 0, "sentiment_score": 0.0}
Может кто-нибудь, пожалуйста, помочь мне решить эту проблему? Я пробовал несколько методов, но, похоже, ничего не работает при отправке потоков данных в Elastic Search.
ОБНОВЛЕНИЕ: я решил это. Похоже, возникла проблема с хостом .
Комментарии:
1. Просто для подтверждения: print(evaluated_df.isStream) возвращает True, верно?
2. ДА. Я не пробовал . Но, поскольку это потоковый набор данных, я полагаю, он должен возвращать true .
3. Можете ли вы закрыть вопрос, поскольку это ошибка пользователя?
4. Поскольку я запустил щедрость, это не позволяет мне закрыть вопрос. Итак, я сделаю это, как только срок действия вознаграждения истечет.