как автоматизировать ИЗМЕНЕНИЕ ТАБЛИЦЫ ДОБАВЛЕНИЯ РАЗДЕЛОВ с/без обходчика клея

#amazon-web-services #aws-glue #amazon-athena #aws-glue-data-catalog

Вопрос:

Я заметил, что ДОБАВЛЕНИЕ РАЗДЕЛОВ намного эффективнее, так как я работаю с данными JSON в s3. Краулер, который я настроил, занимает слишком много времени для завершения. Когда я прошу его только «Сканировать только новые папки», на самом деле это не добавление новых разделов. Я также снимаю флажок «Обновить все новые и существующие разделы метаданными из таблицы».

Что мне, по сути, нужно, так это автоматизировать ДОБАВЛЕНИЕ РАЗДЕЛОВ по мере добавления новых разделов (это должно произойти за считанные секунды). Я не могу сделать это из базы кода, которая отправляет данные в s3, поскольку это другая команда, отвечающая только за заполнение данных s3. Я не хочу, чтобы задача автоматизации просматривала каждую запись / метаданные и исправляла записи (сейчас это занимает ~40 минут).

Ценю любую помощь, так как я новичок в Клею и Афине.

Ответ №1:

MSCK REPAIR TABLE table_name это самый простой способ обновить новые разделы в существующей таблице. Вы можете отправить этот запрос из различных SDK, таких как boto3 для python:

 import boto3

client = boto3.client('athena')
client.start_query_execution(QueryString='MSCK REPAIR TABLE table_name')

 

Вы можете запустить этот код в Лямбде с помощью триггера при добавлении новых файлов в корзину S3 или с помощью событий-запланированных событий шины.

Ответ №2:

Используя цикл for, вы можете просмотреть разделы таблицы, чтобы извлечь общее количество используемых разделов. При этом вы можете использовать следующий код для лямбды, например

   import time
  import boto3

  DATABASE = "name_database"
  TABLE = "name_table"
  query_base = "ALTER TABLE " name_database "." name_table " ADD IF NOT EXISTS PARTITION (dt = '2016-05-14', country = 'IN')"
  
  output='s3://aws-athena-query-results-example'

  def lambda_handler(event, context):
      query = query_base
      client = boto3.client('athena')

# Execution
  response = client.start_query_execution(
      QueryString=query,
      QueryExecutionContext={
        'Database': DATABASE
      },
      ResultConfiguration={
        'OutputLocation': output,
      }
  )
  return response
  return