Как узнать, обновляется ли таблица по-прежнему в BigQuery?

# #python #firebase #google-bigquery #google-analytics-firebase

Вопрос:

Я написал процесс, который извлекает данные о событиях из BigQuery в файлах JSON и вставляет их в Azure (для использования Snowflake). Каждые 10 минут процесс ищет новые данные в таблице events_intraday_yyymmdd, а также проверяет наличие новой таблицы events_yyymmdd, а затем извлекает ее. При последующем анализе пробелов я обнаруживаю, что у меня разрыв примерно на 5% (больше данных в BQ, чем в SN), и я думаю, что это связано с тем, что в то время, когда я извлекаю данные из ежедневной таблицы, они все еще обновляются. Есть ли способ узнать, используется ли таблица для процессов ВСТАВКИ, МАССОВОЙ ВСТАВКИ, ОБНОВЛЕНИЯ, чтобы я мог пропустить, если да?

Это код, который я в настоящее время использую для извлечения данных

     # Check if new daily table exists
    print(f"Checking if new daily table exists")
    logging.info(f"Checking if new daily table exists")
    sql = f'''
    begin
        create table if not exists `{tempDatasetName}` (id string, insert_date timestamp);
    
        create table if not exists `{dailyLogDatasetName}` (table_name string, insert_date timestamp);

        select distinct
            table_name
        from
            `{informationSchemaTables}`
        where
            regexp_contains(table_name, '^events_\\d{{8}}


Комментарии:

1. Я не думаю, что вы можете проверить, обновляется ли events_intraday_, но если вы можете, то, вероятно, получите его метаданные потоковой передачи.

2. Обратите внимание, что дневная таблица не является производной от внутридневной таблицы, и данные между ними могут отличаться. Примите во внимание, что: 1). внутридневной экспорт-это экспорт наилучших усилий. 2). API потокового экспорта BigQuery не может гарантировать семантику "ровно один раз", поэтому строка может быть экспортирована несколько раз во внутридневной таблице. 3). Потоковый экспорт BigQuery может привести к сбою RPC (редко, но это может произойти). И если произойдет какой-либо сбой экспорта, внутридневной экспорт не будет повторять его, так как это, по сути, потоковый экспорт с максимальными усилиями.


)
and table_name not in (select table_name from `{dailyLogDatasetName}`)
order by 1;
end
'''
query_job = bqClient.query(sql)
query_job.result(timeout=7200)
for job in bqClient.list_jobs(parent_job=query_job.job_id):
if job.statement_type == "SELECT":
results = list(job.result().to_arrow().to_pydict().values())[0]

# Get missing rows from the daily table
if results != []:
for table in results:
dailyDatasetName = f"{projectName}.{databaseName}.{table}"
print(f"New Daily table {dailyDatasetName} found. Commencing extraction")
logging.info(f"New Daily table {dailyDatasetName} found. Commencing extraction")

requested_session = types.ReadSession(
table=f"projects/{projectName}/datasets/{databaseName}/tables/{table}",
data_format=types.DataFormat.ARROW,
read_options=types.ReadSession.TableReadOptions(selected_fields=[])
)
read_session = bqStorageClient.create_read_session(
parent=f"projects/{projectName}", read_session=requested_session, max_stream_count=streamCount
)
streams = read_session.streams
obtainedStreamCount = len(streams)
print(f"Obtained Streams. Starting {obtainedStreamCount} threads")

with concurrent.futures.ThreadPoolExecutor(max_workers=obtainedStreamCount) as executor:
results = [
executor.submit(
scroller,
bqStorageClient,
stream,
table,
partition,
lowerFormat,
storageContainerName
) for stream, partition in zip(streams, range(obtainedStreamCount))
]
for f in concurrent.futures.as_completed(results):
f.result(timeout=None)
try:
streams.clear()
except:
pass

# Insert into the dailylog table the table_name processed so it will skip it on the next run
print(f"Inserting {table} into {dailyLogDatasetName}")
logging.info(f"Inserting {table} into {dailyLogDatasetName}")
sql = f'''
insert into {dailyLogDatasetName} values ('{table}', current_timestamp());
'''
query_job = bqClient.query(sql)
query_job.result(timeout=7200)

Комментарии:

1. Я не думаю, что вы можете проверить, обновляется ли events_intraday_, но если вы можете, то, вероятно, получите его метаданные потоковой передачи.

2. Обратите внимание, что дневная таблица не является производной от внутридневной таблицы, и данные между ними могут отличаться. Примите во внимание, что: 1). внутридневной экспорт-это экспорт наилучших усилий. 2). API потокового экспорта BigQuery не может гарантировать семантику «ровно один раз», поэтому строка может быть экспортирована несколько раз во внутридневной таблице. 3). Потоковый экспорт BigQuery может привести к сбою RPC (редко, но это может произойти). И если произойдет какой-либо сбой экспорта, внутридневной экспорт не будет повторять его, так как это, по сути, потоковый экспорт с максимальными усилиями.