# #python #google-cloud-dataflow #apache-beam
Вопрос:
Я работаю над настройкой потока данных Google с помощью Python 2.31 Beam SDK. Это довольно простая задача, которая считывает данные из таблицы BigQuery, выполняет некоторую логику для добавления нескольких дополнительных столбцов к данным, а затем записывает результаты обратно в отдельную таблицу BigQuery.
Это успешно выполняется для 48 миллиардов строк данных, но завершается неудачей при выполнении примерно для 80 миллиардов строк. Проблема, по-видимому, связана с этапом записи в Большой запрос, так как я вижу ошибки на странице диагностики, в которых упоминаются шаги на этапе записи результатов в большой запрос, которые блокируются или занимают много времени. Я также вижу ошибки во время выполнения, когда идентификатор инструкции не регистрируется на том же этапе.
В документации SDK отмечается, что могут возникнуть проблемы при написании очень больших наборов данных с помощью Python. https://beam.apache.org/documentation/io/built-in/google-bigquery/#limitations. Исходя из этого, я немного изменил код, чтобы писать в 5 разных таблицах вместо одной, но столкнулся с аналогичными проблемами.
Я предполагаю, что ограничение-это то, с чем я сталкиваюсь здесь. Мне неясно, как найти, где находится это ограничение, или есть ли для этого достойный обходной путь. Я только что нашел некоторую информацию о потоковых вставках, поэтому я собираюсь провести некоторое тестирование, чтобы понять, работает ли это лучше, чем использование загрузки файлов.
Для справки, это слегка отредактированная версия кода для попытки без секционирования:
from __future__ import division
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText,WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions,
WorkerOptions
import pandas as pd
import logging
class JoiningDoFn2(beam.DoFn):
# Do lazy initializaiton here. Otherwise, error messages pop up, associated with "A large DoFn instance that is serialized for transmission to remote workers.""
def __init__(self):
#self._pickle_file = None
from google.cloud import storage
import pickle as pkl
self._storage = storage
self._pkl = pkl
def setup(self):
bucket = self._storage.Client().get_bucket("GCS BUCKET NAME")
blob = bucket.get_blob('ubi/dataflow/density_balltree2.pkl')
self._pickle_file = self._pkl.loads(blob.download_as_bytes())
def process(self, gps_element):
import numpy as np
# Get coordinates in radians
balltree = self._pickle_file[0]
density_mapping = self._pickle_file[1]
nn_lat_mapping = self._pickle_file[2]
nn_lon_mapping = self._pickle_file[3]
distances, closest_indicies = balltree.query(np.asarray((gps_element['GPSLatitude']*np.pi/180,gps_element['GPSLongitude']*np.pi/180)).reshape(1, -1),k=1)
# Use mapping to get traffic density from closest index
gps_element['traffic_density'] = density_mapping[closest_indicies[0][0]]
gps_element["distance_to_nn"] = 6371 * distances[0][0]
gps_element['nn_lat'] = nn_lat_mapping[closest_indicies[0][0]]
gps_element['nn_lon'] = nn_lon_mapping[closest_indicies[0][0]]
yield gps_element
DESTINATION_TABLE_CONFIGS = {
"table": "TARGET TABLE NAME",
"dataset": "TARGET DATASET NAME",
"project": "PROJECT NAME",
"schema": "driverid:STRING, tripid:STRING, TimeStamp:NUMERIC,
GPSLatitude:FLOAT64, GPSLongitude:FLOAT64,
traffic_density:FLOAT64, distance_to_nn:FLOAT64,
nn_lat:FLOAT64, nn_lon:FLOAT64",
"create_disposition": beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
"write_disposition": beam.io.BigQueryDisposition.WRITE_APPEND
}
pipeline_options = PipelineOptions(
runner='DataflowRunner',
project='PROJECT NAME',
#job_name=job_name,
region='us-east4',
subnetwork= 'SUBNETWORK INFO',
use_public_ips=False,
staging_location = f'gs://GCS BUCKET NAME/ubi/dataflow/' 'staging/',
temp_location = f'gs://GCS BUCKET NAME/ubi/dataflow/' 'temp/',
machine_type = "n1-standard-1",
max_num_workers = 600,
#num_workers = 500,
autoscaling_algorithm='THROUGHPUT_BASED'#None
#disk_size_gb = 30
)
class DataFlowPipeline:
"""THIS IS THE CLASS THAT ACTUALLY RUNS THE JOB"""
def run(self):
"""This is the job runner it holds the beam pipeline"""
with beam.Pipeline(options=pipeline_options) as p:
query_sql = """
SQL QUERY TEXT
"""
processed_trips = p | 'Read records from BQ' >>beam.io.ReadFromBigQuery(query=query_sql, use_standard_sql=True) | 'Add traffic density' >> beam.ParDo(JoiningDoFn2()) | 'Write result to BQ' >> WriteToBigQuery(**DESTINATION_TABLE_CONFIGS)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
print('setting up config for runner...')
trainer = DataFlowPipeline()
trainer.run()
print('The runner is done!')
И это слегка отредактированная версия попытки вывода в 5 разных таблицах:
from __future__ import division
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText,WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions,
WorkerOptions
import pandas as pd
import logging
class JoiningDoFn2(beam.DoFn):
# Do lazy initializaiton here. Otherwise, error messages pop up, associated with "A large DoFn instance that is serialized for transmission to remote workers.""
def __init__(self):
#self._pickle_file = None
from google.cloud import storage
import pickle as pkl
self._storage = storage
self._pkl = pkl
def setup(self):
bucket = self._storage.Client().get_bucket('GCS BUCKET NAME')
blob = bucket.get_blob('ubi/dataflow/density_balltree2.pkl')
self._pickle_file = self._pkl.loads(blob.download_as_bytes())
def process(self, gps_element):
import numpy as np
# Get coordinates in radians
balltree = self._pickle_file[0]
density_mapping = self._pickle_file[1]
nn_lat_mapping = self._pickle_file[2]
nn_lon_mapping = self._pickle_file[3]
distances, closest_indicies = balltree.query(np.asarray((gps_element['GPSLatitude']*np.pi/180,gps_element['GPSLongitude']*np.pi/180)).reshape(1, -1),k=1)
# Use mapping to get traffic density from closest index
gps_element['traffic_density'] = density_mapping[closest_indicies[0][0]]
gps_element["distance_to_nn"] = 6371 * distances[0][0]
gps_element['nn_lat'] = nn_lat_mapping[closest_indicies[0][0]]
gps_element['nn_lon'] = nn_lon_mapping[closest_indicies[0][0]]
yield gps_element
DESTINATION_TABLE_CONFIGS = {
"dataset": "TARGET DATASET",
"project": "PROJECT NAME",
"schema": "driverid:STRING, tripid:STRING, TimeStamp:NUMERIC,
GPSLatitude:FLOAT64, GPSLongitude:FLOAT64,
traffic_density:FLOAT64, distance_to_nn:FLOAT64,
nn_lat:FLOAT64, nn_lon:FLOAT64",
"create_disposition": beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
"write_disposition": beam.io.BigQueryDisposition.WRITE_APPEND
}
pipeline_options = PipelineOptions(
runner='DataflowRunner',
project='PROJECT NAME',
#job_name=job_name,
region='us-east4',
subnetwork= 'SUBNETWORK INFO',
use_public_ips=False,
staging_location = 'gs://GCS BUCKET/ubi/dataflow/' 'staging/',
temp_location = 'gs://GCS BUCKET/ubi/dataflow/' 'temp/',
machine_type = "n1-standard-1",
max_num_workers = 500,
#num_workers = 50,
#disk_size_gb = 30,
autoscaling_algorithm='THROUGHPUT_BASED'#None
)
class DataFlowPipeline:
"""THIS IS THE CLASS THAT ACTUALLY RUNS THE JOB"""
def run(self):
"""This is the job runner it holds the beam pipeline"""
#How many splits the data should do. Make sure to set up the appropriate amount of p_ values below to match this number if it's changed.
PARTITIONS = 5
with beam.Pipeline(options=pipeline_options) as p:
query_sql = """
SQL QUERY TEXT
"""
partition = (
p | 'Read records from BQ' >>beam.io.ReadFromBigQuery(query=query_sql, use_standard_sql=True)
| 'Add traffic density' >> beam.ParDo(JoiningDoFn2())
| 'Split output' >> beam.Partition(lambda element, PARTITIONS: round(element["TimeStamp"]) % PARTITIONS, PARTITIONS)
)
for x in range(PARTITIONS):
partition[x] | f'Write Partition {x}' >> beam.io.WriteToBigQuery(table=f'TARGET TABLE NAME_{x}', **DESTINATION_TABLE_CONFIGS)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
print('setting up config for runner...')
trainer = DataFlowPipeline()
trainer.run()
print('The runner is done!')
Комментарии:
1. 80 миллиардов строк — это около ~40 ТБ или около того?
2. Это довольно маленький столик, по ширине. Мы проходим только через 5 колонн. Таким образом, исходный запрос на 80 миллиардов строк показывает, что он будет сканировать 6,3 ТБ данных, если его запустить непосредственно в BigQuery. Результирующая таблица с дополнительными столбцами выглядит так, как будто она составляет примерно 8,1 ТБ.
3. это не должно быть слишком много для балочного трубопровода. Недавно у нас была проблема с производительностью, которая будет исправлена в версии 2.33.0 — а пока не могли бы вы попробовать работать
Read(BigQuerySource(...))
с теми же параметрами?4. Конечно, я могу попробовать с версией Read(BigQuerySource (…)). Я просто запустил его на небольшом наборе данных, чтобы проверить свой синтаксис, и это сработало. Однако мне придется отложить большой забег до завтра, так как он съедает много ресурсов. Ведет ли Read(BigQuerySource(…)) себя за кулисами иначе, чем ReadFromBigQuery(…)?
5. да — BigQuerySource работает в бэкэнде Dataflow runner — это более старая реализация, но она не страдает от проблем с производительностью, которые возникают у ReadFromBQ