#amazon-s3 #aws-lambda #amazon-athena
Вопрос:
Я пытаюсь сохранить данные JSON, которые были сброшены во входное хранилище S3, и преобразовать файл в CSV в другом месте выходного хранилища S3, используя выполнение начального запроса Athena.
- Я использую большой запрос, который будет вставлен во временную таблицу (с помощью INSERT INTO).
- Эта таблица разделена на год, месяц, день и час.
- Используя AWS Glue, я смог настроить storage.location.template для таблицы запросов (см. Очистку экрана).
s3://prod-cog-kahala-test-output/data/landing/olo/baja/year=${year}/month=${month}/day=${day}/hour=${hour}
- Я также использую прогноз года, часа, месяца и дня, используя AWS Gue в этой таблице. (См. царапину на экране)
Этот выходной патч динамически создается на основе даты и времени, когда произошло событие. Он будет хранить CSV-файлы из JSON, которые были созданы во время этого события во время запроса Афины. Выходной путь должен выглядеть следующим образом:
Я использую python lambda для извлечения значения даты события записи события, а затем, используя запрос Athena, вывожу csv-файлы в динамический путь вывода
Примечание: Я смог успешно выполнить это только с использованием статического пути S3, но не динамического пути S3, что является обязательным требованием .
Когда я принимаю/извлекаю входной файл JSON в буклете ввода S3, я получаю следующую ошибку, когда Athena запускает запрос с использованием динамического пути S3:
(<class 'botocore.errorfactory.InvalidRequestException'>, InvalidRequestException("An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 45:64: mismatched input '{'. Expecting: '*', <expression>, <identifier>"), <traceback object at 0x7fac83b93300>).
Пожалуйста, помогите мне определить, что я делаю неправильно. Большое спасибо
Вот лямбда — код:
def lambda_handler(event, context):
try:
print('<< IN Handler >>')
print(event)
print("File size is", event['Records'][0]['s3']['object']['size'])
if(event['Records'][0]['s3']['object']['size'] > 0):
file_path = event['Records'][0]['s3']['object']['key']
print('<<FILEPATH >>' file_path)
bucket_name = os.environ['BUCKETBAJA']
file_name = file_path.split("/")[-1]
print(file_name)
#process the input file time stamp to extrac tthe year,month,day
parse_filename = file_name.split(today_year)
#Get date and hour
parse_date = event['Records'][0]['eventTime']
#print(parse_date)
dateStr = parse_date.split("-")
print('<< PROCESS DATE >>' dateStr[2])
fyear= dateStr[0]
fmonth= dateStr[1]
fday = dateStr[2].split("T")
#print('REST OF STRING ' fday[1])
parse_more = fday
#print(parse_more)
hour = fday[1].split(":")
#print(hour)
fhour = hour[0]
fday = fday[0]
print('<< DAY >>' fday)
print('<< HOUR >>' fhour)
print('<< MONTH >>' fmonth)
process_date = fyear '-' fmonth '-' fday
start_date = fyear '-' fmonth '-' fday
end_date = fyear '-' fmonth '-' fday
output_prefix= "all_dates/year=/" fyear "/month=" fmonth '/day=' fday '/hour=' fhour
print('<< OUT_PREFIX >>' output_prefix)
clients3 = boto3.client('s3')
result = clients3.list_objects(Bucket=bucket_name, Prefix=output_prefix )
exists=False
athena_out_path = 's3://' bucket_name '/all-dates/year=' fyear '/month=' fmonth '/day=' fday '/hour=' fhour
print('<<< INSIDE BAJA TABLE DUMP NEXT ... >>>>')
#list_tables = table_queries_v1
if 'coldstone' in file_name:
query = kahala_coldstone
bucket_name = os.environ['BUCKETCOLDSTONE']
athena_out_path = 's3://athena-' bucket_name '/all-dates/year=' fyear '/month=' fmonth '/day=' fday '/hour=' fhour
else:
query = kahala_baja
print('<<QUERY >>')
response = athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': database
},
ResultConfiguration={
'OutputLocation': athena_out_path,
}
)
else:
print("Empty File, exiting")
except:
print("Unknown Error")
print(sys.exc_info())
Вот код запроса:
"""insert into olo_baja_insert_to_csv
select cast(first_name as varchar) first_name,
cast(contact_number as varchar) contact_number,
cast(membership_number as varchar) membership_number,
cast(olo_customer_id as varchar) olo_customer_id,
cast(login_providers as varchar) login_providers,
cast(external_reference as varchar) external_reference,
cast(olo_email_address as varchar) olo_email_address,
cast(last_name as varchar) last_name,
cast(loyalty_scheme as varchar) loyalty_scheme,
cast(product_id as varchar) product_id,
cast(modifier_detail['modifierid'] as varchar) modifier_detail_modifier_id,
cast(modifier_detail['description'] as varchar) modifier_detail_description,
cast(modifier_detail['vendorspecificmodifierid'] as varchar) modifier_detail_vendor_specific_modifier_id,
cast(modifier_detail['modifiers'] as varchar) modifier_detail_modifiers,
cast(modifier_detail['modifierquantity'] as varchar)
modifier_detail_modifier_quantity,
cast(modifier_detail['customfields'] as varchar) modifier_detail_custom_fields,
cast(modifier_quantity as varchar) modifier_quantity,
cast(modifier_custom_fields as varchar) modifier_custom_fields,
cast(delivery as varchar) delivery,
cast(total as varchar) total,
cast(subtotal as varchar) subtotal,
cast(discount as varchar) discount,
cast(tip as varchar) tip,
cast(sales_tax as varchar) sales_tax,
cast(customer_delivery as varchar) customer_delivery,
cast(payment_amount as varchar) payment_amount,
cast(payment_description as varchar) payment_description,
cast(payment_type as varchar) payment_type,
cast(location_lat as varchar) location_lat,
cast(location_long as varchar) location_long,
cast(location_name as varchar) location_name,
cast(location_logo as varchar) location_logo,
cast(ordering_provider_name as varchar) ordering_provider_name,
cast(ordering_provider_slug as varchar) ordering_provider_slug,year,month,day, hour
from(
select cast(first_name as varchar) first_name,
cast(contact_number as varchar) contact_number,
cast(membership_number as varchar) membership_number,
cast(olo_customer_id as varchar) olo_customer_id,
cast(login_providers as varchar) login_providers,
cast(external_reference as varchar) external_reference,
cast(olo_email_address as varchar) olo_email_address,
cast(last_name as varchar) last_name,
cast(loyalty_scheme as varchar) loyalty_scheme,
cast(product_id as varchar) product_id,
cast(special_instructions as varchar) special_instructions,
cast(quantity as varchar) quantity,
cast(recipient_name as varchar) recipient_name,
cast(custom_values as varchar) custom_values,
cast(item_description as varchar) item_description,
cast(item_selling_price as varchar) item_selling_price,
cast(modifier['sellingprice'] as varchar) pre_modifier_selling_price,
cast(modifier['modifierid'] as varchar) modifier_id,
cast(modifier['description'] as varchar) modifier_description,
cast(modifier['vendorspecificmodifierid'] as varchar) vendor_specific_modifierid,
cast(modifier['modifiers'] as varchar) modifier_details,
cast(modifier['modifierquantity'] as varchar) modifier_quantity,
cast(modifier['customfields'] as varchar) modifier_custom_fields,
cast(delivery as varchar),cast(total as varchar),cast(subtotal as varchar),
cast(discount as varchar),cast(tip as varchar),cast(sales_tax as varvchar),cast(customer_delivery as varchar), cast(payment_amount as varchar), cast(payment_description as varchar),
cast(payment_type as varchar),cast(location_lat as varchar),cast(location_long as varchar), cast(location_name as varchar),
cast(location_logo as varchar),
cast(ordering_provider_name as varchar), cast(ordering_provider_slug as varchar), year,month,day,hour
from(
select
cast(json_extract(customer, '$.firstname') as varchar) as first_name,
cast(json_extract(customer, '$.contactnumber') as varchar) as contact_number,
cast(json_extract(customer, '$.membershipnumber') as varchar) as membership_number,
cast(json_extract(customer, '$.customerid') as varchar) as olo_customer_id,
cast(json_extract(customer, '$.loginproviders') as array<map<varchar,varchar>>) as login_providers,
cast(json_extract(customer, '$.externalreference') as varchar) as external_reference,
cast(json_extract(customer, '$.email') as varchar) as olo_email_address,
cast(json_extract(customer, '$.lastname') as varchar) as last_name,
cast(json_extract(customer, '$.loyaltyscheme') as varchar) as loyalty_scheme,
try_cast(json_extract("item", '$.productid') as varchar) product_id,
try_cast(json_extract("item", '$.specialinstructions') as varchar) special_instructions,
try_cast(json_extract("item", '$.quantity') as varchar) quantity,
try_cast(json_extract("item", '$.recipientname') as varchar) recipient_name,
try_cast(json_extract("item", '$.customvalues') as varchar) custom_values,
try_cast(json_extract("item", '$.description') as varchar) item_description,
try_cast(json_extract("item", '$.sellingprice') as varchar) item_selling_price,
cast(json_extract("item", '$.modifiers') as array<map<varchar,json>>) modifiers,
cast(json_extract(totals, '$.delivery') as varchar) delivery,
cast(json_extract(totals, '$.total') as varchar) total,
cast(json_extract(totals, '$.subtotal') as varchar) subtotal,
cast(json_extract(totals, '$.discount') as varchar) discount,
cast(json_extract(totals, '$.tip') as varchar) tip,
cast(json_extract(totals, '$.salestax') as varchar) sales_tax,
cast(json_extract(totals, '$.customerdelivery') as varchar) customer_delivery,
payment['amount'] payment_amount,
payment['description'] payment_description,
payment['type'] payment_type,
cast(json_extract("location", '$.latitude') as varchar) location_lat,
cast(json_extract("location", '$.longitude') as varchar) location_long,
cast(json_extract("location", '$.name') as varchar) location_name,
cast(json_extract("location", '$.logo') as varchar) location_logo,
cast(json_extract("orderingprovider", '$.name') as varchar) ordering_provider_name,
cast(json_extract("orderingprovider", '$.slug') as varchar) ordering_provider_slug,hour
from sandbox_twilliams.olo_baja_raw_john_testing
cross join unnest (payments, "items") as t (payment, item)
--cross join unnest ("items") as t (item)
--cross join unnest (payments) as t (payment)
where year = {fyear}
and month = {fmonth}
and day = {fday}
and hour = {fhour}
).format(year={fyear},month={fmonth},day={fday}, hour={fhour})
CROSS JOIN UNNEST (modifiers) as t (modifier)
)
CROSS JOIN UNNEST (cast(modifier_details as array<map<varchar,json>>)) as t (modifier_detail)
group by 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45;"""
Ответ №1:
В вашем запросе есть синтаксическая ошибка, но вы не включаете запрос в вопрос, поэтому трудно понять, что происходит не так. Похоже, вы печатаете SQL-запрос в своем журнале, поэтому я предлагаю взять этот SQL и запустить его вручную в консоли Athena и посмотреть, сможете ли вы понять из сообщения об ошибке, что не так.
С другой стороны, преобразование в CSV лучше всего выполнять через UNLOAD
. Результаты запроса Афины-это CSV, но Афина также записывает файл двоичных метаданных вместе с файлом CSV, что может все испортить, если вы ожидаете, что выходной каталог будет содержать только данные CSV.
Комментарии:
1. Извините. Я забыл добавить запрос . Он огромен, и он использует перекрестные соединения.
2. хорошо @Theo Я обновил вопрос, чтобы сделать его более читабельным, и добавил запрос.
3. Итак, что произошло, когда вы попытались исправить синтаксическую ошибку в сообщении об ошибке?
4. Это была абсолютная неразбериха, пытаясь заставить оператор insert в этом запросе функционировать. В итоге я внес некоторые изменения в запрос, и теперь файл создается. Я думаю, что единственная причина, по которой запрос не выполнялся, заключалась в том, что {месяц} должен был быть {месяц} (месяц использовался для хранения значения месяца с даты события) . Я потратил большую часть своего времени, пытаясь заставить вставку работать, но я искренне не думаю, что можно сделать эту вставку из JSON SerDe в CSV SerDe. Хотя понятия не имею, почему это так.
Ответ №2:
Как я уже прокомментировал @Theo , это была абсолютная неразбериха, пытаясь заставить insert into
оператор — по этому запросу — функционировать. Пришлось внести некоторые изменения в запрос, и теперь создаются csv — файлы. Следующие изменения были:
Этот элемент функций format() был опущен из запроса непосредственно перед присвоением его значения клиенту Athena для запроса Start Execute :
query = kahala_coldstone.format(fyear=fyear,fmonth=fmonth,fday=fday,fhour=fhour)
Другая причина, по которой запрос не выполнялся, заключалась в привязке {месяц}, которая должна была быть {месяц} (месяц-это переменная, используемая для хранения значения месяца с даты события) . Большая часть времени была потрачена на то, чтобы заставить вставку работать, но, похоже, невозможно выполнить эту вставку из JSON-файла в CSV-файл. Интересно, почему это так?
Кроме этого, девизом для начала этого исправления является «опустить INSERT INTO
оператор перед началом строки запроса».