#apache-spark #twitter #apache-spark-sql #streaming
#apache-spark #Twitter #apache-spark-sql #потоковая передача
Вопрос:
Я запускаю следующий .py, который создает сокет и ожидает подключения:
def get_tweets():
url = 'https://stream.twitter.com/1.1/statuses/filter.json'
query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')]
query_url = url '?' 'amp;'.join([str(t[0]) '=' str(t[1]) for t in query_data])
response = requests.get(query_url, auth=my_auth, stream=True)
print(query_url, response)
return response
def send_tweets_to_spark(http_resp, tcp_connection):
for line in http_resp.iter_lines():
try:
full_tweet = json.loads(line)
tweet_text = full_tweet['text']
print("Tweet Text: " tweet_text)
print ("------------------------------------------")
tcp_connection.send(tweet_text 'n')
except:
e = sys.exc_info()[0]
print("Error: %s" % e)
TCP_IP = '127.0.0.1'
TCP_PORT = 9009
conn = None
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)
print("Waiting for TCP connection...")
conn, addr = s.accept()
print("Connected... Starting getting tweets.")
resp = get_tweets()
print(resp, conn)
send_tweets_to_spark(resp, conn)
Затем я запускаю spark-submit
следующий скрипт Spark streaming, который должен подсчитывать твиты каждые 2 секунды:
def aggregate_tags_count(new_values, total_sum):
return sum(new_values) (total_sum or 0)
def get_sql_context_instance(spark_context):
if ('sqlContextSingletonInstance' not in globals()):
globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
return globals()['sqlContextSingletonInstance']
def process_rdd(time, rdd):
print("----------- %s -----------" % str(time))
sql_context = get_sql_context_instance(rdd.context)
row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))
hashtags_df = sql_context.createDataFrame(row_rdd)
hashtags_df.registerTempTable("hashtags")
hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10")
hashtag_counts_df.show()
send_df_to_dashboard(hashtag_counts_df)
conf = SparkConf()
conf.setAppName("TwitterStreamApp")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 2)
ssc.checkpoint("checkpoint_TwitterApp")
dataStream = ssc.socketTextStream("127.0.0.1",9009)
words = dataStream.flatMap(lambda line: line.split(" "))
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
tags_totals.foreachRDD(process_rdd)
ssc.start()
ssc.awaitTermination()
Это запускает приложения, как я вижу, в запущенном ВЕБ-интерфейсе. Моя проблема в том, что когда я запускаю приложения spark, он попадает в первый скрипт, который отправляет твит, но выводит пустой RDD. Ошибка, как показано ниже:
20/12/30 08:53:56 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20201230085356-0012/0 is now RUNNING
20/12/30 08:53:57 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
----------- 2020-12-30 08:54:20 -----------
20/12/30 08:54:24 ERROR JobScheduler: Error running job streaming job 1609318460000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/opt/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
r = self.func(t, *rdds)
File "/home/ubuntu/market_risk/utils/spark_twitter_count.py", line 26, in process_rdd
hashtags_df = sql_context.createDataFrame(row_rdd)
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 320, in createDataFrame
return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema)
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 605, in createDataFrame
return self._create_dataframe(data, schema, samplingRatio, verifySchema)
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 628, in _create_dataframe
rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 425, in _createFromRDD
struct = self._inferSchema(rdd, samplingRatio, names=schema)
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 396, in _inferSchema
first = rdd.first()
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1467, in first
raise ValueError("RDD is empty")
ValueError: RDD is empty
Комментарии:
1. почему вы используете Streaming API вместо StructuredStreaming?
2. @AlexOtt спасибо за предложение. Я не знаю всех решений. Также я считаю неудобным иметь два запущенных файла. Не могли бы вы сказать мне, возможно ли иметь соединение с сокетом И потоковый процесс ETL в одном
.py
файле? Или это стандарт — иметь отдельные процессы?3. Как правило, потребители и производители разделены
4. возьмите лучше этот скрипт для извлечения твитов и отправки их в сокет: github.com/aaronregis/TwitterSentiment/blob/master / … — он обрабатывает несколько потребителей без проблем