Spark streaming Twitter API

#apache-spark #twitter #apache-spark-sql #streaming

#apache-spark #Twitter #apache-spark-sql #потоковая передача

Вопрос:

Я запускаю следующий .py, который создает сокет и ожидает подключения:

 def get_tweets():
    url = 'https://stream.twitter.com/1.1/statuses/filter.json'
    query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')]
    query_url = url   '?'   'amp;'.join([str(t[0])   '='   str(t[1]) for t in query_data])
    response = requests.get(query_url, auth=my_auth, stream=True)
    print(query_url, response)
    return response


def send_tweets_to_spark(http_resp, tcp_connection):
    for line in http_resp.iter_lines():
        try:
            full_tweet = json.loads(line)
            tweet_text = full_tweet['text']
            print("Tweet Text: "   tweet_text)
            print ("------------------------------------------")
            tcp_connection.send(tweet_text   'n')
        except:
            e = sys.exc_info()[0]
            print("Error: %s" % e)


TCP_IP = '127.0.0.1'
TCP_PORT = 9009
conn = None
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)
print("Waiting for TCP connection...")
conn, addr = s.accept()
print("Connected... Starting getting tweets.")
resp = get_tweets()
print(resp, conn)
send_tweets_to_spark(resp, conn)
 

Затем я запускаю spark-submit следующий скрипт Spark streaming, который должен подсчитывать твиты каждые 2 секунды:

 def aggregate_tags_count(new_values, total_sum):
    return sum(new_values)   (total_sum or 0)


def get_sql_context_instance(spark_context):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
    return globals()['sqlContextSingletonInstance']


def process_rdd(time, rdd):
    print("----------- %s -----------" % str(time))
    sql_context = get_sql_context_instance(rdd.context)
    row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))
    hashtags_df = sql_context.createDataFrame(row_rdd)
    hashtags_df.registerTempTable("hashtags")
    hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10")
    hashtag_counts_df.show()
    send_df_to_dashboard(hashtag_counts_df)


conf = SparkConf()
conf.setAppName("TwitterStreamApp")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 2)
ssc.checkpoint("checkpoint_TwitterApp")
dataStream = ssc.socketTextStream("127.0.0.1",9009)

words = dataStream.flatMap(lambda line: line.split(" "))
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
tags_totals.foreachRDD(process_rdd)
ssc.start()
ssc.awaitTermination()
 

Это запускает приложения, как я вижу, в запущенном ВЕБ-интерфейсе. Моя проблема в том, что когда я запускаю приложения spark, он попадает в первый скрипт, который отправляет твит, но выводит пустой RDD. Ошибка, как показано ниже:

 20/12/30 08:53:56 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20201230085356-0012/0 is now RUNNING
20/12/30 08:53:57 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
----------- 2020-12-30 08:54:20 -----------
20/12/30 08:54:24 ERROR JobScheduler: Error running job streaming job 1609318460000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
    r = self.func(t, *rdds)
  File "/home/ubuntu/market_risk/utils/spark_twitter_count.py", line 26, in process_rdd
    hashtags_df = sql_context.createDataFrame(row_rdd)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 320, in createDataFrame
    return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 605, in createDataFrame
    return self._create_dataframe(data, schema, samplingRatio, verifySchema)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 628, in _create_dataframe
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 425, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio, names=schema)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 396, in _inferSchema
    first = rdd.first()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1467, in first
    raise ValueError("RDD is empty")
ValueError: RDD is empty
 

Комментарии:

1. почему вы используете Streaming API вместо StructuredStreaming?

2. @AlexOtt спасибо за предложение. Я не знаю всех решений. Также я считаю неудобным иметь два запущенных файла. Не могли бы вы сказать мне, возможно ли иметь соединение с сокетом И потоковый процесс ETL в одном .py файле? Или это стандарт — иметь отдельные процессы?

3. Как правило, потребители и производители разделены

4. возьмите лучше этот скрипт для извлечения твитов и отправки их в сокет: github.com/aaronregis/TwitterSentiment/blob/master / … — он обрабатывает несколько потребителей без проблем