Что считается очень большим набором данных для Python Beam SDK?

# #python #google-cloud-dataflow #apache-beam

Вопрос:

Я работаю над настройкой потока данных Google с помощью Python 2.31 Beam SDK. Это довольно простая задача, которая считывает данные из таблицы BigQuery, выполняет некоторую логику для добавления нескольких дополнительных столбцов к данным, а затем записывает результаты обратно в отдельную таблицу BigQuery.

Это успешно выполняется для 48 миллиардов строк данных, но завершается неудачей при выполнении примерно для 80 миллиардов строк. Проблема, по-видимому, связана с этапом записи в Большой запрос, так как я вижу ошибки на странице диагностики, в которых упоминаются шаги на этапе записи результатов в большой запрос, которые блокируются или занимают много времени. Я также вижу ошибки во время выполнения, когда идентификатор инструкции не регистрируется на том же этапе.

В документации SDK отмечается, что могут возникнуть проблемы при написании очень больших наборов данных с помощью Python. https://beam.apache.org/documentation/io/built-in/google-bigquery/#limitations. Исходя из этого, я немного изменил код, чтобы писать в 5 разных таблицах вместо одной, но столкнулся с аналогичными проблемами.

Я предполагаю, что ограничение-это то, с чем я сталкиваюсь здесь. Мне неясно, как найти, где находится это ограничение, или есть ли для этого достойный обходной путь. Я только что нашел некоторую информацию о потоковых вставках, поэтому я собираюсь провести некоторое тестирование, чтобы понять, работает ли это лучше, чем использование загрузки файлов.

Для справки, это слегка отредактированная версия кода для попытки без секционирования:

 from __future__ import division
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText,WriteToBigQuery

from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, 
    WorkerOptions
import pandas as pd
import logging

class JoiningDoFn2(beam.DoFn):
    # Do lazy initializaiton here. Otherwise, error messages pop up, associated with "A large DoFn instance that is serialized for transmission to remote workers.""
    def __init__(self):
        #self._pickle_file = None
        from google.cloud import storage
        import pickle as pkl
        self._storage = storage
        self._pkl = pkl

    def setup(self):
        bucket = self._storage.Client().get_bucket("GCS BUCKET NAME")
        blob = bucket.get_blob('ubi/dataflow/density_balltree2.pkl')
        self._pickle_file = self._pkl.loads(blob.download_as_bytes())        

    def process(self, gps_element):
        import numpy as np 

        # Get coordinates in radians
        balltree = self._pickle_file[0]
        density_mapping = self._pickle_file[1]
        nn_lat_mapping  = self._pickle_file[2]
        nn_lon_mapping  = self._pickle_file[3]
        
        distances, closest_indicies = balltree.query(np.asarray((gps_element['GPSLatitude']*np.pi/180,gps_element['GPSLongitude']*np.pi/180)).reshape(1, -1),k=1)

    # Use mapping to get traffic density from closest index
        gps_element['traffic_density'] = density_mapping[closest_indicies[0][0]]
        gps_element["distance_to_nn"] = 6371 * distances[0][0]

        gps_element['nn_lat'] = nn_lat_mapping[closest_indicies[0][0]]
        gps_element['nn_lon'] = nn_lon_mapping[closest_indicies[0][0]]
 
        yield gps_element
        
        
DESTINATION_TABLE_CONFIGS = {
    "table": "TARGET TABLE NAME",

    "dataset": "TARGET DATASET NAME",

    "project":  "PROJECT NAME",

    "schema": "driverid:STRING,    tripid:STRING,        TimeStamp:NUMERIC,
                         GPSLatitude:FLOAT64,        GPSLongitude:FLOAT64, 
                         traffic_density:FLOAT64,    distance_to_nn:FLOAT64,
                         nn_lat:FLOAT64,        nn_lon:FLOAT64",

    "create_disposition": beam.io.BigQueryDisposition.CREATE_IF_NEEDED,

    "write_disposition": beam.io.BigQueryDisposition.WRITE_APPEND

}


pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project='PROJECT NAME',
    #job_name=job_name,
    region='us-east4',
    subnetwork= 'SUBNETWORK INFO',
    use_public_ips=False,
    staging_location = f'gs://GCS BUCKET NAME/ubi/dataflow/'   'staging/',
    temp_location = f'gs://GCS BUCKET NAME/ubi/dataflow/'   'temp/',    
    machine_type = "n1-standard-1",
    max_num_workers = 600,
    #num_workers = 500,
    autoscaling_algorithm='THROUGHPUT_BASED'#None
    #disk_size_gb = 30

)

class DataFlowPipeline:
    """THIS IS THE CLASS THAT ACTUALLY RUNS THE JOB"""
    def run(self):
        """This is the job runner it holds the beam pipeline"""

        with beam.Pipeline(options=pipeline_options) as p:

            query_sql =  """
SQL QUERY TEXT
"""            
            
            processed_trips = p | 'Read records from BQ' >>beam.io.ReadFromBigQuery(query=query_sql, use_standard_sql=True) | 'Add traffic density' >> beam.ParDo(JoiningDoFn2()) | 'Write result to BQ' >> WriteToBigQuery(**DESTINATION_TABLE_CONFIGS)


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    print('setting up config for runner...')
    trainer = DataFlowPipeline()
    trainer.run()
    print('The runner is done!')
 

И это слегка отредактированная версия попытки вывода в 5 разных таблицах:

 from __future__ import division
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText,WriteToBigQuery

from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, 
    WorkerOptions
import pandas as pd
import logging

class JoiningDoFn2(beam.DoFn):
    # Do lazy initializaiton here. Otherwise, error messages pop up, associated with "A large DoFn instance that is serialized for transmission to remote workers.""
    def __init__(self):
        #self._pickle_file = None
        from google.cloud import storage
        import pickle as pkl
        self._storage = storage
        self._pkl = pkl

    def setup(self):
        bucket = self._storage.Client().get_bucket('GCS BUCKET NAME')
        blob = bucket.get_blob('ubi/dataflow/density_balltree2.pkl')
        self._pickle_file = self._pkl.loads(blob.download_as_bytes())        

    def process(self, gps_element):
        import numpy as np

        # Get coordinates in radians
        balltree = self._pickle_file[0]
        density_mapping = self._pickle_file[1]
        nn_lat_mapping  = self._pickle_file[2]
        nn_lon_mapping  = self._pickle_file[3]
        
        distances, closest_indicies = balltree.query(np.asarray((gps_element['GPSLatitude']*np.pi/180,gps_element['GPSLongitude']*np.pi/180)).reshape(1, -1),k=1)

    # Use mapping to get traffic density from closest index
        gps_element['traffic_density'] = density_mapping[closest_indicies[0][0]]
        gps_element["distance_to_nn"] = 6371 * distances[0][0]

        gps_element['nn_lat'] = nn_lat_mapping[closest_indicies[0][0]]
        gps_element['nn_lon'] = nn_lon_mapping[closest_indicies[0][0]]
 
        yield gps_element
        
        
DESTINATION_TABLE_CONFIGS = {
    "dataset": "TARGET DATASET",

    "project":  "PROJECT NAME",

    "schema": "driverid:STRING,    tripid:STRING,        TimeStamp:NUMERIC,
                         GPSLatitude:FLOAT64,        GPSLongitude:FLOAT64, 
                         traffic_density:FLOAT64,    distance_to_nn:FLOAT64,
                         nn_lat:FLOAT64,        nn_lon:FLOAT64",

    "create_disposition": beam.io.BigQueryDisposition.CREATE_IF_NEEDED,

    "write_disposition": beam.io.BigQueryDisposition.WRITE_APPEND
}


pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project='PROJECT NAME',
    #job_name=job_name,
    region='us-east4',
    subnetwork= 'SUBNETWORK INFO',
    use_public_ips=False,
    staging_location = 'gs://GCS BUCKET/ubi/dataflow/'   'staging/',
    temp_location = 'gs://GCS BUCKET/ubi/dataflow/'   'temp/',        
    machine_type = "n1-standard-1",
    max_num_workers = 500,
    #num_workers = 50,
    #disk_size_gb = 30,
    autoscaling_algorithm='THROUGHPUT_BASED'#None


)

class DataFlowPipeline:
    """THIS IS THE CLASS THAT ACTUALLY RUNS THE JOB"""        
    
    def run(self):
        """This is the job runner it holds the beam pipeline"""   

        #How many splits the data should do. Make sure to set up the appropriate amount of p_ values below to match this number if it's changed.
        PARTITIONS = 5

        with beam.Pipeline(options=pipeline_options) as p:

            query_sql =  """
SQL QUERY TEXT
"""            

            partition = (
                p | 'Read records from BQ' >>beam.io.ReadFromBigQuery(query=query_sql, use_standard_sql=True) 
                | 'Add traffic density' >> beam.ParDo(JoiningDoFn2())
                | 'Split output' >> beam.Partition(lambda element, PARTITIONS: round(element["TimeStamp"]) % PARTITIONS, PARTITIONS) 
            )
            
            for x in range(PARTITIONS):
                partition[x] | f'Write Partition {x}' >> beam.io.WriteToBigQuery(table=f'TARGET TABLE NAME_{x}', **DESTINATION_TABLE_CONFIGS)

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    print('setting up config for runner...')
    trainer = DataFlowPipeline()
    trainer.run()
    print('The runner is done!')
 

Комментарии:

1. 80 миллиардов строк — это около ~40 ТБ или около того?

2. Это довольно маленький столик, по ширине. Мы проходим только через 5 колонн. Таким образом, исходный запрос на 80 миллиардов строк показывает, что он будет сканировать 6,3 ТБ данных, если его запустить непосредственно в BigQuery. Результирующая таблица с дополнительными столбцами выглядит так, как будто она составляет примерно 8,1 ТБ.

3. это не должно быть слишком много для балочного трубопровода. Недавно у нас была проблема с производительностью, которая будет исправлена в версии 2.33.0 — а пока не могли бы вы попробовать работать Read(BigQuerySource(...)) с теми же параметрами?

4. Конечно, я могу попробовать с версией Read(BigQuerySource (…)). Я просто запустил его на небольшом наборе данных, чтобы проверить свой синтаксис, и это сработало. Однако мне придется отложить большой забег до завтра, так как он съедает много ресурсов. Ведет ли Read(BigQuerySource(…)) себя за кулисами иначе, чем ReadFromBigQuery(…)?

5. да — BigQuerySource работает в бэкэнде Dataflow runner — это более старая реализация, но она не страдает от проблем с производительностью, которые возникают у ReadFromBQ