Использование Spark Bigquery Connector в Dataproc и данные, по-видимому, задерживаются на час

#apache-spark #google-bigquery #connector #dataproc

# #apache-spark #google-bigquery #соединитель #dataproc

Вопрос:

Я использую spark 2.4, работающий в Dataproc, и запускаю пакетное задание каждые 15 минут, чтобы взять некоторые данные из таблицы bq, объединить их (суммировать) и сохранить в другой таблице bq (перезаписать) через pyspark.sql.

Если я запрашиваю таблицу в spark, похоже, что данные отстают примерно на час. Или, скорее, он отключается примерно за час до этого. Если я использую точно такой же запрос к таблице, который я запрашиваю в Spark, но вместо этого в веб-консоли BQ, все данные будут там и актуальны. Я делаю что-то не так? Или это ожидаемое поведение соединителя?

Вот, по сути, код, который я использую:

 orders_by_hour_query = """
SELECT

_id as app_id,
from_utc_timestamp(DATE_TRUNC('HOUR', created_at), 'America/Los_Angeles') as ts_hour,
SUM(total_price_usd) as gmv,
COUNT(order_id) as orders

FROM `orders`

WHERE DATE(from_utc_timestamp(created_at, 'America/Los_Angeles')) BETWEEN "2020-11-23" AND "2020-11-27"

GROUP BY 1, 2

ORDER BY 1, 2 ASC
"""

orders_df = spark.read.format("bigquery").load(bq_dataset ".orders")
orders_df.createOrReplaceTempView("orders")
orders_by_hour_df = spark.sql(orders_by_hour_query)
 

РЕДАКТИРОВАТЬ: Похоже, что почасовое ограничение кажется почти произвольным. Например, в настоящее время это «2020-11-25 06:31 UTC», но максимальная временная метка, которая запрашивает у BQ через Spark connector, равна: «2020-11-25 05:56:39 UTC».

Дополнительная информация об этой таблице:

 Table size  2.65 GB
Long-term storage size  1.05 GB
Number of rows  4,120,280
Created Jun 3, 2020, 4:56:11 PM
Table expiration    Never
Last modified   Nov 24, 2020, 10:07:54 PM
Data location   US
Table type  Partitioned
Partitioned by  Day
Partitioned on field    created_at
Partition filter    Not required

Streaming buffer statistics

Estimated size  1.01 MB
Estimated rows  1,393
Earliest entry time Nov 24, 2020, 9:57:00 PM
 

Заранее спасибо!

Ответ №1:

Похоже, что недостающие данные могут находиться в потоковом буфере и еще не попали в хранилище BQ.

Это означает, что вы можете запросить его из BQ напрямую, но не с помощью BQ Spark Connector, поскольку он работает через API хранилища (https://cloud.google.com/bigquery/docs/reference/storage )

В качестве обходного пути вы можете попробовать что-то вроде приведенного ниже. Поскольку это всего лишь час данных, если эти данные достаточно малы, вы также можете просто использовать BQ API напрямую и просто преобразовать фрейм данных pandas в фрейм данных spark.

 `def bq2df(QUERY):
    bq = bigquery.Client()
    query_job = bq.query(QUERY)
    query_job.result()

    df = spark.read.format('bigquery') 
        .option('dataset', query_job.destination.dataset_id) 
        .load(query_job.destination.table_id) 
        .persist(StorageLevel.MEMORY_AND_DISK)

    return df