Проблема с декодированием Apache Beam ReadFromSpanner

#apache-beam #dataflow #google-cloud-spanner

#apache-beam #поток данных #google-cloud-spanner

Вопрос:

Я пытаюсь запустить следующий скрипт в конвейере потока данных GCP.

 import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from typing import NamedTuple, Optional
from apache_beam.io.gcp.spanner import *
from past.builtins import unicode
import logging

class ItemRow(NamedTuple):
    item_id: unicode


class LogResults(beam.DoFn):
  """Just log the results"""
  def process(self, element):
    logging.info("row: %s", element)
    yield element

class SpannerToSpannerAndBigQueryPipelineOptions(PipelineOptions):
    """
    Runtime Parameters given during template execution
    path parameter is necessary for execution of pipeline
    """
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--SOURCE_SPANNER_PROJECT_ID', type=str, help='Source Spanner project ID',
            default='project_id')
        parser.add_argument(
            '--SOURCE_SPANNER_DATASET_ID', type=str, help='Source Spanner dataset ID',
            default='dataset_id')
        parser.add_argument(
            '--SOURCE_SPANNER_INSTANCE_ID', type=str, help='Source Spanner instance ID',
            default='instance_id')
        parser.add_argument(
            '--SOURCE_QUERY', type=str, help='SQL to run in Source Spanner Instance',
            required=True)


# Setup pipeline

def run():

    beam.coders.registry.register_coder(ItemRow, beam.coders.RowCoder)
    pipeline_options = PipelineOptions()
    p = beam.Pipeline(options=pipeline_options)
    importer_options = pipeline_options.view_as(
        SpannerToSpannerAndBigQueryPipelineOptions)

    
    rows = (
        p
        | "Read from source Spanner" >> ReadFromSpanner(
            project_id=importer_options.SOURCE_SPANNER_PROJECT_ID,
            instance_id=importer_options.SOURCE_SPANNER_INSTANCE_ID,
            database_id=importer_options.SOURCE_SPANNER_DATASET_ID,
            row_type=ItemRow,
            sql='Select item_id from Items WHERE created_ts BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 5 SECOND) AND CURRENT_TIMESTAMP()',
            timestamp_bound_mode=TimestampBoundMode.MAX_STALENESS,
            staleness=3,
            time_unit=TimeUnit.HOURS,
        ).with_output_types(ItemRow)
    )

    rows | 'Log results' >> beam.ParDo(LogResults())

    result = p.run()
    result.wait_until_finish()



if __name__ == "__main__":
    run()

 

Тем не менее, я столкнулся с проблемами при декодировании результатов, полученных с помощью Spanner. Это выходные журналы из моего задания потока данных:

 
"An exception was raised when trying to execute the workitem 6665479626992209510 : Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 48, in dataflow_worker.native_operations.NativeReadOperation.start
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/inmemory.py", line 108, in __iter__
    yield self._source.coder.decode(value)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 468, in decode
    return self.get_impl().decode(encoded)
  File "apache_beam/coders/coder_impl.py", line 226, in apache_beam.coders.coder_impl.StreamCoderImpl.decode
  File "apache_beam/coders/coder_impl.py", line 228, in apache_beam.coders.coder_impl.StreamCoderImpl.decode
  File "apache_beam/coders/coder_impl.py", line 123, in apache_beam.coders.coder_impl.CoderImpl.decode_from_stream
  File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/row_coder.py", line 215, in decode_from_stream
    is_null in zip(self.components, nulls)))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/row_coder.py", line 215, in <genexpr>
    is_null in zip(self.components, nulls)))
  File "apache_beam/coders/coder_impl.py", line 259, in apache_beam.coders.coder_impl.CallbackCoderImpl.decode_from_stream
  File "apache_beam/coders/coder_impl.py", line 261, in apache_beam.coders.coder_impl.CallbackCoderImpl.decode_from_stream
  File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 414, in decode
    return value.decode('utf-8')
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x83 in position 9: invalid start byte
"
 

Я не уверен, как решить эту проблему. Я использую это https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.gcp.spanner.html ?выделите = гаечный ключ #модуль-пример apache_beam.io.gcp.spanner в качестве отправной точки. Проблема, по-видимому, заключается в декодировании результатов, полученных с помощью Spanner. Практически нет документации о том, как указать схему для таблицы / таблиц Spanner, которые я пытаюсь запросить.

Существует также экспериментальный модуль ввода-вывода для Spanner, который не использует модуль расширения Java. Рекомендуется ли переключаться на экспериментальную версию?

Спасибо

Комментарии:

1. Схема объявлена в аргументе «row_type». В вашем случае ItemRow с именем tuple . Можете ли вы попробовать изменить тип item_id на int из unicode?

2. Фактическим идентификатором является СТРОКОВОЕ поле <MAX> в Cloud Spanner. Почему это должно быть int?

3. Я подумал, что если ваша схема была int в Spanner, то попытка прочитать ее как unicode создаст эту ошибку. Но поскольку ваш идентификатор на самом деле является строкой, что-то еще не так.

4. Может быть, вы можете попробовать изменить схему на байты и посмотреть, что она возвращает? Это не должно требовать декодирования, чтобы вы могли видеть, что Beam считает идентификатором.

5. Привет @Cubez, я изменил тип item_id на bytes , бегун выводит только следующую строку: row: BeamSchema_0beb6e87_96a5_4128_a80c_78e1697e7674(item_id=b'') я уверен, что этого не должно быть. Мне удалось прочитать из Spanner, используя экспериментальный модуль ввода-вывода Spanner, используя те же учетные данные.

Ответ №1:

Я не смог запустить конвейер с использованием apache_beam.io.gcp.spanner модуля, поэтому вместо этого я использовал apache_beam.io.gcp.experimental.spannerio модуль.