#apache-beam #dataflow #google-cloud-spanner
#apache-beam #поток данных #google-cloud-spanner
Вопрос:
Я пытаюсь запустить следующий скрипт в конвейере потока данных GCP.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from typing import NamedTuple, Optional
from apache_beam.io.gcp.spanner import *
from past.builtins import unicode
import logging
class ItemRow(NamedTuple):
item_id: unicode
class LogResults(beam.DoFn):
"""Just log the results"""
def process(self, element):
logging.info("row: %s", element)
yield element
class SpannerToSpannerAndBigQueryPipelineOptions(PipelineOptions):
"""
Runtime Parameters given during template execution
path parameter is necessary for execution of pipeline
"""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--SOURCE_SPANNER_PROJECT_ID', type=str, help='Source Spanner project ID',
default='project_id')
parser.add_argument(
'--SOURCE_SPANNER_DATASET_ID', type=str, help='Source Spanner dataset ID',
default='dataset_id')
parser.add_argument(
'--SOURCE_SPANNER_INSTANCE_ID', type=str, help='Source Spanner instance ID',
default='instance_id')
parser.add_argument(
'--SOURCE_QUERY', type=str, help='SQL to run in Source Spanner Instance',
required=True)
# Setup pipeline
def run():
beam.coders.registry.register_coder(ItemRow, beam.coders.RowCoder)
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
importer_options = pipeline_options.view_as(
SpannerToSpannerAndBigQueryPipelineOptions)
rows = (
p
| "Read from source Spanner" >> ReadFromSpanner(
project_id=importer_options.SOURCE_SPANNER_PROJECT_ID,
instance_id=importer_options.SOURCE_SPANNER_INSTANCE_ID,
database_id=importer_options.SOURCE_SPANNER_DATASET_ID,
row_type=ItemRow,
sql='Select item_id from Items WHERE created_ts BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 5 SECOND) AND CURRENT_TIMESTAMP()',
timestamp_bound_mode=TimestampBoundMode.MAX_STALENESS,
staleness=3,
time_unit=TimeUnit.HOURS,
).with_output_types(ItemRow)
)
rows | 'Log results' >> beam.ParDo(LogResults())
result = p.run()
result.wait_until_finish()
if __name__ == "__main__":
run()
Тем не менее, я столкнулся с проблемами при декодировании результатов, полученных с помощью Spanner. Это выходные журналы из моего задания потока данных:
"An exception was raised when trying to execute the workitem 6665479626992209510 : Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 48, in dataflow_worker.native_operations.NativeReadOperation.start
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/inmemory.py", line 108, in __iter__
yield self._source.coder.decode(value)
File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 468, in decode
return self.get_impl().decode(encoded)
File "apache_beam/coders/coder_impl.py", line 226, in apache_beam.coders.coder_impl.StreamCoderImpl.decode
File "apache_beam/coders/coder_impl.py", line 228, in apache_beam.coders.coder_impl.StreamCoderImpl.decode
File "apache_beam/coders/coder_impl.py", line 123, in apache_beam.coders.coder_impl.CoderImpl.decode_from_stream
File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/row_coder.py", line 215, in decode_from_stream
is_null in zip(self.components, nulls)))
File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/row_coder.py", line 215, in <genexpr>
is_null in zip(self.components, nulls)))
File "apache_beam/coders/coder_impl.py", line 259, in apache_beam.coders.coder_impl.CallbackCoderImpl.decode_from_stream
File "apache_beam/coders/coder_impl.py", line 261, in apache_beam.coders.coder_impl.CallbackCoderImpl.decode_from_stream
File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 414, in decode
return value.decode('utf-8')
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x83 in position 9: invalid start byte
"
Я не уверен, как решить эту проблему. Я использую это https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.gcp.spanner.html ?выделите = гаечный ключ #модуль-пример apache_beam.io.gcp.spanner в качестве отправной точки. Проблема, по-видимому, заключается в декодировании результатов, полученных с помощью Spanner. Практически нет документации о том, как указать схему для таблицы / таблиц Spanner, которые я пытаюсь запросить.
Существует также экспериментальный модуль ввода-вывода для Spanner, который не использует модуль расширения Java. Рекомендуется ли переключаться на экспериментальную версию?
Спасибо
Комментарии:
1. Схема объявлена в аргументе «row_type». В вашем случае ItemRow с именем tuple . Можете ли вы попробовать изменить тип item_id на int из unicode?
2. Фактическим идентификатором является СТРОКОВОЕ поле <MAX> в Cloud Spanner. Почему это должно быть int?
3. Я подумал, что если ваша схема была int в Spanner, то попытка прочитать ее как unicode создаст эту ошибку. Но поскольку ваш идентификатор на самом деле является строкой, что-то еще не так.
4. Может быть, вы можете попробовать изменить схему на байты и посмотреть, что она возвращает? Это не должно требовать декодирования, чтобы вы могли видеть, что Beam считает идентификатором.
5. Привет @Cubez, я изменил тип item_id на
bytes
, бегун выводит только следующую строку:row: BeamSchema_0beb6e87_96a5_4128_a80c_78e1697e7674(item_id=b'')
я уверен, что этого не должно быть. Мне удалось прочитать из Spanner, используя экспериментальный модуль ввода-вывода Spanner, используя те же учетные данные.
Ответ №1:
Я не смог запустить конвейер с использованием apache_beam.io.gcp.spanner
модуля, поэтому вместо этого я использовал apache_beam.io.gcp.experimental.spannerio
модуль.