Сохранение данных CSV в динамические пути с помощью запроса Athena (Начало выполнения запроса)

#amazon-s3 #aws-lambda #amazon-athena

Вопрос:

Я пытаюсь сохранить данные JSON, которые были сброшены во входное хранилище S3, и преобразовать файл в CSV в другом месте выходного хранилища S3, используя выполнение начального запроса Athena.

  1. Я использую большой запрос, который будет вставлен во временную таблицу (с помощью INSERT INTO).
  2. Эта таблица разделена на год, месяц, день и час.
  3. Используя AWS Glue, я смог настроить storage.location.template для таблицы запросов (см. Очистку экрана).

    s3://prod-cog-kahala-test-output/data/landing/olo/baja/year=${year}/month=${month}/day=${day}/hour=${hour}

  4. Я также использую прогноз года, часа, месяца и дня, используя AWS Gue в этой таблице. (См. царапину на экране)

Этот выходной патч динамически создается на основе даты и времени, когда произошло событие. Он будет хранить CSV-файлы из JSON, которые были созданы во время этого события во время запроса Афины. Выходной путь должен выглядеть следующим образом: введите описание изображения здесь

Я использую python lambda для извлечения значения даты события записи события, а затем, используя запрос Athena, вывожу csv-файлы в динамический путь вывода

Примечание: Я смог успешно выполнить это только с использованием статического пути S3, но не динамического пути S3, что является обязательным требованием .

Когда я принимаю/извлекаю входной файл JSON в буклете ввода S3, я получаю следующую ошибку, когда Athena запускает запрос с использованием динамического пути S3:

 (<class 'botocore.errorfactory.InvalidRequestException'>, InvalidRequestException("An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 45:64: mismatched input '{'. Expecting: '*', <expression>, <identifier>"), <traceback object at 0x7fac83b93300>).
 

Пожалуйста, помогите мне определить, что я делаю неправильно. Большое спасибо

Вот свойство таблицы
введите описание изображения здесь

Вот лямбда — код:

 def lambda_handler(event, context):
try:
    print('<< IN Handler >>')
    print(event)
    print("File size is", event['Records'][0]['s3']['object']['size'])
    if(event['Records'][0]['s3']['object']['size'] > 0):
        file_path = event['Records'][0]['s3']['object']['key']
        print('<<FILEPATH >>' file_path)
        bucket_name = os.environ['BUCKETBAJA']
        file_name = file_path.split("/")[-1]
        print(file_name)
        #process the  input file time stamp to extrac tthe year,month,day
        parse_filename = file_name.split(today_year)
        #Get date and hour
        parse_date = event['Records'][0]['eventTime']
        #print(parse_date)
        dateStr = parse_date.split("-")
        print('<< PROCESS DATE >>' dateStr[2])
        fyear= dateStr[0]
        fmonth= dateStr[1]
        fday =  dateStr[2].split("T")
        #print('REST OF STRING ' fday[1])
        parse_more = fday
        #print(parse_more)
        hour = fday[1].split(":")
        #print(hour)
        fhour = hour[0]
        fday = fday[0]
        print('<< DAY >>' fday)
        print('<< HOUR >>' fhour)
        print('<< MONTH >>' fmonth)
        process_date = fyear '-' fmonth '-' fday
        start_date = fyear '-' fmonth '-' fday
        end_date = fyear '-' fmonth '-' fday
        output_prefix= "all_dates/year=/" fyear "/month=" fmonth '/day=' fday '/hour=' fhour
        print('<< OUT_PREFIX  >>' output_prefix)
        clients3 = boto3.client('s3')
        result = clients3.list_objects(Bucket=bucket_name, Prefix=output_prefix )
        exists=False
        athena_out_path = 's3://' bucket_name '/all-dates/year=' fyear '/month=' fmonth '/day=' fday '/hour=' fhour 
        print('<<< INSIDE BAJA TABLE DUMP NEXT ...  >>>>')
        #list_tables = table_queries_v1
        if 'coldstone' in file_name:
            query = kahala_coldstone
            bucket_name = os.environ['BUCKETCOLDSTONE']
            athena_out_path = 's3://athena-' bucket_name '/all-dates/year=' fyear '/month=' fmonth '/day=' fday '/hour=' fhour 
        else:
            query = kahala_baja
            print('<<QUERY  >>')
            response = athena_client.start_query_execution( 
                QueryString=query,
                QueryExecutionContext={
                  'Database': database
                },
                ResultConfiguration={
                    'OutputLocation': athena_out_path,
                }
            )
       
    else:
        print("Empty File, exiting")
except:
    print("Unknown Error")
    print(sys.exc_info())
 

Вот код запроса:

     """insert into olo_baja_insert_to_csv
    select cast(first_name as varchar) first_name,
    cast(contact_number as varchar) contact_number,
    cast(membership_number as varchar) membership_number,
    cast(olo_customer_id as varchar) olo_customer_id,
    cast(login_providers as varchar) login_providers,
    cast(external_reference as varchar) external_reference,
    cast(olo_email_address as varchar) olo_email_address,
    cast(last_name as varchar) last_name,
    cast(loyalty_scheme as varchar) loyalty_scheme,
    cast(product_id as varchar) product_id,
    cast(modifier_detail['modifierid'] as varchar) modifier_detail_modifier_id,
    cast(modifier_detail['description'] as varchar) modifier_detail_description,
    cast(modifier_detail['vendorspecificmodifierid'] as varchar) modifier_detail_vendor_specific_modifier_id,
    cast(modifier_detail['modifiers'] as varchar) modifier_detail_modifiers,
    cast(modifier_detail['modifierquantity'] as varchar) 
modifier_detail_modifier_quantity,
    cast(modifier_detail['customfields'] as varchar)  modifier_detail_custom_fields,
    cast(modifier_quantity as varchar) modifier_quantity,
   cast(modifier_custom_fields as varchar) modifier_custom_fields,
    cast(delivery as varchar) delivery,
    cast(total as varchar) total,
    cast(subtotal as varchar) subtotal,
    cast(discount as varchar) discount,
    cast(tip as varchar) tip,
    cast(sales_tax as varchar) sales_tax,
    cast(customer_delivery as varchar) customer_delivery, 
    cast(payment_amount as varchar) payment_amount, 
    cast(payment_description as varchar) payment_description,
    cast(payment_type as varchar) payment_type,
    cast(location_lat as varchar) location_lat,
    cast(location_long as varchar) location_long,
    cast(location_name as varchar) location_name,
    cast(location_logo as varchar) location_logo,
    cast(ordering_provider_name as varchar) ordering_provider_name,
    cast(ordering_provider_slug as varchar) ordering_provider_slug,year,month,day, hour
    from( 
    select cast(first_name as varchar) first_name,
    cast(contact_number as varchar) contact_number, 
    cast(membership_number as varchar) membership_number,
    cast(olo_customer_id as varchar) olo_customer_id,
    cast(login_providers as varchar) login_providers,
    cast(external_reference as varchar) external_reference,
    cast(olo_email_address as varchar) olo_email_address,
    cast(last_name as varchar) last_name,
    cast(loyalty_scheme as varchar) loyalty_scheme,
    cast(product_id as varchar) product_id,
    cast(special_instructions as varchar) special_instructions,
    cast(quantity as varchar) quantity,
    cast(recipient_name as varchar) recipient_name,
    cast(custom_values as varchar) custom_values,
    cast(item_description as varchar) item_description,
    cast(item_selling_price as varchar) item_selling_price,
    cast(modifier['sellingprice'] as varchar) pre_modifier_selling_price,
    cast(modifier['modifierid'] as varchar) modifier_id,
    cast(modifier['description'] as varchar) modifier_description,
    cast(modifier['vendorspecificmodifierid'] as varchar) vendor_specific_modifierid,
    cast(modifier['modifiers'] as varchar) modifier_details,
    cast(modifier['modifierquantity'] as varchar) modifier_quantity,
    cast(modifier['customfields'] as varchar) modifier_custom_fields,
    cast(delivery as varchar),cast(total as varchar),cast(subtotal as varchar), 
    cast(discount as varchar),cast(tip as varchar),cast(sales_tax as varvchar),cast(customer_delivery as varchar), cast(payment_amount as varchar), cast(payment_description as varchar),
    cast(payment_type as varchar),cast(location_lat as varchar),cast(location_long as varchar), cast(location_name as varchar), 
    cast(location_logo as varchar),
    cast(ordering_provider_name as varchar), cast(ordering_provider_slug as varchar), year,month,day,hour
    from(
    select
    cast(json_extract(customer, '$.firstname') as varchar) as first_name,
    cast(json_extract(customer, '$.contactnumber') as varchar) as contact_number,
    cast(json_extract(customer, '$.membershipnumber') as varchar) as membership_number,
    cast(json_extract(customer, '$.customerid') as varchar) as olo_customer_id,
    cast(json_extract(customer, '$.loginproviders') as array<map<varchar,varchar>>) as login_providers,
    cast(json_extract(customer, '$.externalreference') as varchar) as external_reference,
    cast(json_extract(customer, '$.email') as varchar) as olo_email_address,
    cast(json_extract(customer, '$.lastname') as varchar) as last_name,
    cast(json_extract(customer, '$.loyaltyscheme') as varchar) as loyalty_scheme,
    try_cast(json_extract("item", '$.productid') as varchar) product_id,
    try_cast(json_extract("item", '$.specialinstructions') as varchar) special_instructions,
    try_cast(json_extract("item", '$.quantity') as varchar) quantity,
    try_cast(json_extract("item", '$.recipientname') as varchar) recipient_name,
    try_cast(json_extract("item", '$.customvalues') as varchar) custom_values,
    try_cast(json_extract("item", '$.description') as varchar) item_description,
    try_cast(json_extract("item", '$.sellingprice') as varchar) item_selling_price,
    cast(json_extract("item", '$.modifiers') as array<map<varchar,json>>) modifiers,
    cast(json_extract(totals, '$.delivery') as varchar) delivery,
    cast(json_extract(totals, '$.total') as varchar) total,
    cast(json_extract(totals, '$.subtotal') as varchar) subtotal,
    cast(json_extract(totals, '$.discount') as varchar) discount,
    cast(json_extract(totals, '$.tip') as varchar) tip,
    cast(json_extract(totals, '$.salestax') as varchar) sales_tax,
    cast(json_extract(totals, '$.customerdelivery') as varchar) customer_delivery,
    payment['amount'] payment_amount,
    payment['description'] payment_description,
    payment['type'] payment_type,
    cast(json_extract("location", '$.latitude') as varchar) location_lat,
    cast(json_extract("location", '$.longitude') as varchar) location_long,
    cast(json_extract("location", '$.name') as varchar) location_name,
    cast(json_extract("location", '$.logo') as varchar) location_logo,
    cast(json_extract("orderingprovider", '$.name') as varchar) ordering_provider_name,
    cast(json_extract("orderingprovider", '$.slug') as varchar) ordering_provider_slug,hour
    from sandbox_twilliams.olo_baja_raw_john_testing
    cross join unnest (payments, "items") as t (payment, item)
    --cross join unnest ("items") as t (item)
    --cross join unnest (payments) as t (payment)
    where year = {fyear}
    and month = {fmonth}
    and day = {fday}
    and hour = {fhour}
    ).format(year={fyear},month={fmonth},day={fday}, hour={fhour})  
    CROSS JOIN UNNEST (modifiers) as t (modifier)
    )
    CROSS JOIN UNNEST (cast(modifier_details as array<map<varchar,json>>)) as t (modifier_detail)
    group by 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45;"""      
 

Ответ №1:

В вашем запросе есть синтаксическая ошибка, но вы не включаете запрос в вопрос, поэтому трудно понять, что происходит не так. Похоже, вы печатаете SQL-запрос в своем журнале, поэтому я предлагаю взять этот SQL и запустить его вручную в консоли Athena и посмотреть, сможете ли вы понять из сообщения об ошибке, что не так.

С другой стороны, преобразование в CSV лучше всего выполнять через UNLOAD . Результаты запроса Афины-это CSV, но Афина также записывает файл двоичных метаданных вместе с файлом CSV, что может все испортить, если вы ожидаете, что выходной каталог будет содержать только данные CSV.

Комментарии:

1. Извините. Я забыл добавить запрос . Он огромен, и он использует перекрестные соединения.

2. хорошо @Theo Я обновил вопрос, чтобы сделать его более читабельным, и добавил запрос.

3. Итак, что произошло, когда вы попытались исправить синтаксическую ошибку в сообщении об ошибке?

4. Это была абсолютная неразбериха, пытаясь заставить оператор insert в этом запросе функционировать. В итоге я внес некоторые изменения в запрос, и теперь файл создается. Я думаю, что единственная причина, по которой запрос не выполнялся, заключалась в том, что {месяц} должен был быть {месяц} (месяц использовался для хранения значения месяца с даты события) . Я потратил большую часть своего времени, пытаясь заставить вставку работать, но я искренне не думаю, что можно сделать эту вставку из JSON SerDe в CSV SerDe. Хотя понятия не имею, почему это так.

Ответ №2:

Как я уже прокомментировал @Theo , это была абсолютная неразбериха, пытаясь заставить insert into оператор — по этому запросу — функционировать. Пришлось внести некоторые изменения в запрос, и теперь создаются csv — файлы. Следующие изменения были:

Этот элемент функций format() был опущен из запроса непосредственно перед присвоением его значения клиенту Athena для запроса Start Execute : query = kahala_coldstone.format(fyear=fyear,fmonth=fmonth,fday=fday,fhour=fhour)

Другая причина, по которой запрос не выполнялся, заключалась в привязке {месяц}, которая должна была быть {месяц} (месяц-это переменная, используемая для хранения значения месяца с даты события) . Большая часть времени была потрачена на то, чтобы заставить вставку работать, но, похоже, невозможно выполнить эту вставку из JSON-файла в CSV-файл. Интересно, почему это так?

Кроме этого, девизом для начала этого исправления является «опустить INSERT INTO оператор перед началом строки запроса».