#amazon-web-services #pyspark #amazon-sqs #aws-glue
#amazon-web-services #pyspark #amazon-sqs #aws-glue
Вопрос:
Привет, я работаю с AWS glue spark. Я собираю данные из таблицы dynamodb и создаю из нее динамический фрейм. Я хочу иметь возможность отправлять все данные из этой таблицы, запись за записью в sqs. Я видел другое предложение по преобразованию динамического фрейма в spark dataframe. Но это будет таблица с миллионами записей. Преобразование в фрейм данных может занять некоторое время. Я хочу иметь возможность просто отправлять все записи в динамическом фрейме в очередь sqs.
Вот мой код:
sqs = boto3.resource('sqs')
sqs_queue_url = f"https://sqs.us-east-1.amazonaws.com/{account_id}/my-stream-queue"
queue = sqs.Queue(sqs_queue_url)
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job = Job(glueContext)
## @params: [JOB_NAME]
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()
df = glueContext.create_dynamic_frame.from_options("dynamodb",
connection_options={
"dynamodb.input.tableName": "my_table",
"dynamodb.throughput.read.percent": "1.5",
"dynamodb.splits": "500"
},
numSlots=2368)
job.commit()
# iterate over dynamic frame and send each record over the sqs queue
for record in df:
queue.send_message(MessageBody=record)
Ответ №1:
Я делаю что-то очень похожее. Вот что я обнаружил:
datasource0 = glueContext.create_dynamic_frame.from_catalog(
database="athena",
table_name=str(args['value']),
transformation_ctx="datasource0")
job.commit()
df = datasource0.toDF()
pandasDF = df.toPandas()
for index, row in pandasDF.iterrows():
message_body = generate_message(
row['bucket'], row['key'], row['version_id'])
send_message(sqs_queue, json.loads(json.dumps(message_body)))
Комментарии:
1. Вместо того, чтобы выбрасывать кучу кода, вы должны дать краткое представление о том, как будет работать ваше решение, любая связанная документация и т. Д