#apache-spark #google-bigquery #connector #dataproc
# #apache-spark #google-bigquery #соединитель #dataproc
Вопрос:
Я использую spark 2.4, работающий в Dataproc, и запускаю пакетное задание каждые 15 минут, чтобы взять некоторые данные из таблицы bq, объединить их (суммировать) и сохранить в другой таблице bq (перезаписать) через pyspark.sql.
Если я запрашиваю таблицу в spark, похоже, что данные отстают примерно на час. Или, скорее, он отключается примерно за час до этого. Если я использую точно такой же запрос к таблице, который я запрашиваю в Spark, но вместо этого в веб-консоли BQ, все данные будут там и актуальны. Я делаю что-то не так? Или это ожидаемое поведение соединителя?
Вот, по сути, код, который я использую:
orders_by_hour_query = """
SELECT
_id as app_id,
from_utc_timestamp(DATE_TRUNC('HOUR', created_at), 'America/Los_Angeles') as ts_hour,
SUM(total_price_usd) as gmv,
COUNT(order_id) as orders
FROM `orders`
WHERE DATE(from_utc_timestamp(created_at, 'America/Los_Angeles')) BETWEEN "2020-11-23" AND "2020-11-27"
GROUP BY 1, 2
ORDER BY 1, 2 ASC
"""
orders_df = spark.read.format("bigquery").load(bq_dataset ".orders")
orders_df.createOrReplaceTempView("orders")
orders_by_hour_df = spark.sql(orders_by_hour_query)
РЕДАКТИРОВАТЬ: Похоже, что почасовое ограничение кажется почти произвольным. Например, в настоящее время это «2020-11-25 06:31 UTC», но максимальная временная метка, которая запрашивает у BQ через Spark connector, равна: «2020-11-25 05:56:39 UTC».
Дополнительная информация об этой таблице:
Table size 2.65 GB
Long-term storage size 1.05 GB
Number of rows 4,120,280
Created Jun 3, 2020, 4:56:11 PM
Table expiration Never
Last modified Nov 24, 2020, 10:07:54 PM
Data location US
Table type Partitioned
Partitioned by Day
Partitioned on field created_at
Partition filter Not required
Streaming buffer statistics
Estimated size 1.01 MB
Estimated rows 1,393
Earliest entry time Nov 24, 2020, 9:57:00 PM
Заранее спасибо!
Ответ №1:
Похоже, что недостающие данные могут находиться в потоковом буфере и еще не попали в хранилище BQ.
Это означает, что вы можете запросить его из BQ напрямую, но не с помощью BQ Spark Connector, поскольку он работает через API хранилища (https://cloud.google.com/bigquery/docs/reference/storage )
В качестве обходного пути вы можете попробовать что-то вроде приведенного ниже. Поскольку это всего лишь час данных, если эти данные достаточно малы, вы также можете просто использовать BQ API напрямую и просто преобразовать фрейм данных pandas в фрейм данных spark.
`def bq2df(QUERY):
bq = bigquery.Client()
query_job = bq.query(QUERY)
query_job.result()
df = spark.read.format('bigquery')
.option('dataset', query_job.destination.dataset_id)
.load(query_job.destination.table_id)
.persist(StorageLevel.MEMORY_AND_DISK)
return df