Потоковая передача данных Twitter в корзину S3 с помощью firehose

#python #amazon-s3 #twitter #boto3 #amazon-kinesis-firehose

#python #amazon-s3 #Twitter #boto3 #amazon-kinesis-firehose

Вопрос:

Я пытаюсь передать данные из Twitter в корзину aws. Хорошей новостью является то, что я могу передавать данные в свою корзину, но данные поступают примерно по 20 КБ порциями (я думаю, это может быть связано с некоторыми настройками firehose), и они не сохраняются в формате JSON даже после того, как я указал его в своем коде на python с помощью JSON.LOAD. Вместо сохранения в формате JSON данные в моей корзине S3 выглядят так, как будто они не имеют расширения файла и содержат длинную строку буквенно-цифровых символов. Я думаю, что это может быть как-то связано с параметрами, используемыми в client.put_record()

Любая помощь приветствуется!

Пожалуйста, найдите мой код ниже, который я получил с github здесь .

 
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json
import boto3
import time


#Variables that contains the user credentials to access Twitter API
consumer_key = "MY_CONSUMER_KEY"
consumer_secret = "MY_CONSUMER_SECRET"
access_token = "MY_ACCESS_TOKEN"
access_token_secret = "MY_SECRET_ACCESS_TOKEN"


#This is a basic listener that just prints received tweets to stdout.
class StdOutListener(StreamListener):

    def on_data(self, data):
        tweet = json.loads(data)
        try:
            if 'extended_tweet' in tweet.keys():
                #print (tweet['text'])
                message_lst = [str(tweet['id']),
                       str(tweet['user']['name']),
                       str(tweet['user']['screen_name']),
                       tweet['extended_tweet']['full_text'],
                       str(tweet['user']['followers_count']),
                       str(tweet['user']['location']),
                       str(tweet['geo']),
                       str(tweet['created_at']),
                       'n'
                       ]
                message = 't'.join(message_lst)
                print(message)
                client.put_record(
                    DeliveryStreamName=delivery_stream,
                    Record={
                    'Data': message
                    }
                )
            elif 'text' in tweet.keys():
                #print (tweet['text'])
                message_lst = [str(tweet['id']),
                       str(tweet['user']['name']),
                       str(tweet['user']['screen_name']),
                       tweet['text'].replace('n',' ').replace('r',' '),
                       str(tweet['user']['followers_count']),
                       str(tweet['user']['location']),
                       str(tweet['geo']),
                       str(tweet['created_at']),
                       'n'
                       ]
                message = 't'.join(message_lst)
                print(message)
                client.put_record(
                    DeliveryStreamName=delivery_stream,
                    Record={
                    'Data': message
                    }
                )
        except (AttributeError, Exception) as e:
                print (e)
        return True

    def on_error(self, status):
        print (status)
        
        
        
        
        
if __name__ == '__main__':

    #This handles Twitter authetification and the connection to Twitter Streaming API
    listener = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)

    #tweets = Table('tweets_ft',connection=conn)
    client = boto3.client('firehose', 
                          region_name='us-east-1',
                          aws_access_key_id='MY ACCESS KEY',
                          aws_secret_access_key='MY SECRET KEY' 
                          )

    delivery_stream = 'my_firehose'
    #This line filter Twitter Streams to capture data by the keywords: 'python', 'javascript', 'ruby'
    #stream.filter(track=['trump'], stall_warnings=True)
    while True:
        try:
            print('Twitter streaming...')
            stream = Stream(auth, listener)
            stream.filter(track=['brexit'], languages=['en'], stall_warnings=True)
        except Exception as e:
            print(e)
            print('Disconnected...')
            time.sleep(5)
            continue   
  

Ответ №1:

Возможно, вы включили сжатие S3 для вашего firehose . Пожалуйста, убедитесь, что сжатие отключено, если вы хотите сохранить необработанные данные json в своей корзине:

введите описание изображения здесь

К вам также может быть применено некоторое преобразование firehose , которое кодирует or otherwise transform ваши сообщения json в какой-либо другой формат.

Комментарии:

1. Привет, Марчин, спасибо за ответ. Я проверил настройки сжатия, и они действительно отключены, оба параметра «преобразование исходной записи» и «преобразование формата записи» отключены. Нужно ли мне создавать функцию lmda для преобразования записи в JSON?

Ответ №2:

Похоже, что файлы загружались с форматированием JSON, мне просто нужно было открыть файлы в S3 с помощью firefox, и я мог видеть содержимое файлов. Проблема с размерами файлов связана с настройками буфера firehose, у меня они установлены на наименьшее значение, поэтому файлы отправлялись такими маленькими порциями