#python #etl #pipeline
Вопрос:
Шансы на ответ невелики, так как до этого о PipelineWise было задано всего 4 вопроса, но попробую.
Я следовал этой документации https://transferwise.github.io/pipelinewise/installation_guide/installation.html и дошёл до этапа, когда готов запустить свой конвейер. Я запускаю команду `pipelinewise run_tap --tap jira --target s3`
и получаю крайне неприятную ошибку:
time=2021-08-13 04:19:07 logger_name=pipelinewise log_level=INFO message=Profiling mode not enabled
time=2021-08-13 04:19:07 logger_name=pipelinewise.cli.pipelinewise log_level=INFO message=Running jira tap in s3 target
Traceback (most recent call last):
File "/opt/pipelinewise/pipelinewise/cli/pipelinewise.py", line 155, in create_filtered_tap_properties
for stream_idx, stream in enumerate(properties.get('streams', tap_properties)):
AttributeError: 'NoneType' object has no attribute 'get'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/pipelinewise/dev-project/.virtualenvs/pipelinewise/bin/pipelinewise", line 33, in <module>
sys.exit(load_entry_point('pipelinewise', 'console_scripts', 'pipelinewise')())
File "/opt/pipelinewise/pipelinewise/cli/__init__.py", line 230, in main
getattr(ppw_instance, args.command)()
File "/opt/pipelinewise/pipelinewise/cli/pipelinewise.py", line 985, in run_tap
create_fallback=True)
File "/opt/pipelinewise/pipelinewise/cli/pipelinewise.py", line 271, in create_filtered_tap_properties
raise Exception(f'Cannot create JSON file - {exc}') from exc
Exception: Cannot create JSON file - 'NoneType' object has no attribute 'get'
(pipelinewise) root@0ffe0939e1aa:/opt/pipelinewise/dev-project# pipelinewise run_tap --tap jira --target s3
time=2021-08-13 04:21:03 logger_name=pipelinewise log_level=INFO message=Profiling mode not enabled
time=2021-08-13 04:21:03 logger_name=pipelinewise.cli.pipelinewise log_level=INFO message=Running jira tap in s3 target
Traceback (most recent call last):
File "/opt/pipelinewise/pipelinewise/cli/pipelinewise.py", line 155, in create_filtered_tap_properties
for stream_idx, stream in enumerate(properties.get('streams', tap_properties)):
AttributeError: 'NoneType' object has no attribute 'get'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/pipelinewise/dev-project/.virtualenvs/pipelinewise/bin/pipelinewise", line 33, in <module>
sys.exit(load_entry_point('pipelinewise', 'console_scripts', 'pipelinewise')())
File "/opt/pipelinewise/pipelinewise/cli/__init__.py", line 230, in main
getattr(ppw_instance, args.command)()
File "/opt/pipelinewise/pipelinewise/cli/pipelinewise.py", line 985, in run_tap
create_fallback=True)
File "/opt/pipelinewise/pipelinewise/cli/pipelinewise.py", line 271, in create_filtered_tap_properties
raise Exception(f'Cannot create JSON file - {exc}') from exc
Exception: Cannot create JSON file - 'NoneType' object has no attribute 'get'
Глядя на файл pipelinewise.py, я вижу, что ошибка возникает где-то здесь:
try:
# Load JSON files
properties = utils.load_json(tap_properties)
state = utils.load_json(tap_state)
# Create a dictionary for tables that don't meet filter criteria
fallback_properties = copy.deepcopy(properties) if create_fallback else {}
# Foreach stream (table) in the original properties
for stream_idx, stream in enumerate(properties.get('streams', tap_properties)):
initial_sync_required = False
# Collect required properties from the properties file
tap_stream_id = stream.get('tap_stream_id')
table_name = stream.get('table_name')
metadata = stream.get('metadata', [])
# Collect further properties from the properties file under the metadata key
table_meta = {}
meta_idx = 0
for meta_idx, meta in enumerate(metadata):
if isinstance(meta, dict) and len(meta.get('breadcrumb', [])) == 0:
table_meta = meta.get('metadata')
break
# Can we make sure that the stream has the right metadata?
# To be safe, check if no right metadata has been found, then throw an exception.
if not table_meta:
self.logger.error('Stream %s has no metadata with no breadcrumbs: %s.', tap_stream_id, metadata)
raise Exception(f'Missing metadata in stream {tap_stream_id}')
selected = table_meta.get('selected', False)
replication_method = table_meta.get('replication-method', None)
# Detect if initial sync is required. Look into the state file, get the bookmark
# for the current stream (table) and if valid bookmark doesn't exist then
# initial sync is required
bookmarks = state.get('bookmarks', {}) if isinstance(state, dict) else {}
new_stream = False
# if stream not in bookmarks, then it's a new table
if tap_stream_id not in bookmarks:
new_stream = True
initial_sync_required = True
else:
stream_bookmark = bookmarks[tap_stream_id]
if self._is_initial_sync_required(replication_method, stream_bookmark):
initial_sync_required = True
# Compare actual values to the filter conditions.
# Set the "selected" key to True if actual values meet the filter criteria
# Set the "selected" key to False if the actual values don't meet the filter criteria
# pylint: disable=too-many-boolean-expressions
if (
(f_selected is None or selected == f_selected) and
(f_target_type is None or target_type in f_target_type) and
(f_tap_type is None or tap_type in f_tap_type) and
(f_replication_method is None or replication_method in f_replication_method) and
(f_initial_sync_required is None or initial_sync_required == f_initial_sync_required)
):
self.logger.debug("""Filter condition(s) matched:
Table : %s
Tap Stream ID : %s
Selected : %s
Replication Method : %s
Init Sync Required : %s
""", table_name, tap_stream_id, selected, replication_method, initial_sync_required)
# Filter condition matched: mark table as selected to sync
properties['streams'][stream_idx]['metadata'][meta_idx]['metadata']['selected'] = True
filtered_tap_stream_ids.append(tap_stream_id)
# Filter condition matched:
# if the stream is a new table and is a singer stream, then mark it as selected to sync in the
# the fallback properties as well if the table is selected in the original properties.
# Otherwise, mark it as not selected
if create_fallback:
if new_stream and replication_method in [self.INCREMENTAL, self.LOG_BASED]:
fallback_properties['streams'][stream_idx]['metadata'][meta_idx]['metadata'][
'selected'] = True
if selected:
fallback_filtered_stream_ids.append(tap_stream_id)
else:
fallback_properties['streams'][stream_idx]['metadata'][meta_idx]['metadata'][
'selected'] = False
else:
# Filter condition didn't match: mark table as not selected to sync
properties['streams'][stream_idx]['metadata'][meta_idx]['metadata']['selected'] = False
# Filter condition didn't match: mark table as selected to sync in the fallback properties
# Fallback only if the table is selected in the original properties
if create_fallback and selected is True:
fallback_properties['streams'][stream_idx]['metadata'][meta_idx]['metadata']['selected'] = True
fallback_filtered_stream_ids.append(tap_stream_id)
# Save the generated properties file(s) and return
# Fallback required: Save filtered and fallback properties JSON
if create_fallback:
# Save to files: filtered and fallback properties
temp_properties_path = utils.create_temp_file(dir=self.get_temp_dir(),
prefix='properties_',
suffix='.json')[1]
utils.save_json(properties, temp_properties_path)
temp_fallback_properties_path = utils.create_temp_file(dir=self.get_temp_dir(),
prefix='properties_',
suffix='.json')[1]
utils.save_json(fallback_properties, temp_fallback_properties_path)
return temp_properties_path,
filtered_tap_stream_ids,
temp_fallback_properties_path,
fallback_filtered_stream_ids
# Fallback not required: Save only the filtered properties JSON
temp_properties_path = utils.create_temp_file(dir=self.get_temp_dir(),
prefix='properties_',
suffix='.json')[1]
utils.save_json(properties, temp_properties_path)
return temp_properties_path, filtered_tap_stream_ids
except Exception as exc:
raise Exception(f'Cannot create JSON file - {exc}') from exc
Я подозреваю, что, возможно, tap (коннектор) Jira установлен неправильно. Но я понятия не имею, как его установить. В моём Dockerfile буквально написано `ARG connectors=all`, а ниже — `./install.sh --connectors=$connectors`. Согласно документации, Jira входит в число коннекторов по умолчанию.
Возможно, проблема также в этом цикле:
for stream_idx, stream in enumerate(properties.get('streams', tap_properties)):
initial_sync_required = False
Как я уже сказал, шансы невелики, но, возможно, у кого-то есть идеи, почему я постоянно сталкиваюсь с этой проблемой. Спасибо.
Правка: добавляю подробности:
В классе из pipelinewise.py есть `tap_properties = self.tap['files']['properties']`, а ранее в классе я вижу `self.tap = self.get_tap(args.target, args.tap)`, и, насколько я понимаю, позже идёт `taps = target['taps']` — вот это значение:
{
"alert_handlers": null,
"targets": [
{
"id": "s3",
"name": "S3 Target connector",
"status": "ready",
"taps": [
{
"enabled": true,
"id": "jira",
"name": "Jira",
"owner": "foo@bar.com",
"send_alert": true,
"stream_buffer_size": 0,
"type": "tap-jira"
}
],
"type": "target-s3-csv"
}
]
}
Комментарии:
1. Проблема в том, что `properties` равно `None`. Это значит, что `tap_properties` оказался пустым. Дальше по трассировке проследить нельзя. Вы знаете, откуда должен был браться `tap_properties`?
2. Спасибо. Я вижу, что в классе из pipelinewise.py есть
`tap_properties = self.tap['files']['properties']`,
а ранее в определении класса я вижу `self.tap = self.get_tap(args.target, args.tap)`,
и, насколько я понимаю, позже идёт `taps = target['taps']` — вот это значение (см. следующий комментарий).
3.
{ "alert_handlers": null, "targets": [ { "id": "s3", "name": "S3 Target connector", "status": "ready", "taps": [ { "enabled": true, "id": "jira", "name": "Jira", "owner": "foo@bar.com", "send_alert": true, "stream_buffer_size": 0, "type": "tap-jira" } ], "type": "target-s3-csv" } ] }
4. Добавьте это к вашему вопросу. Это слишком запутанно без форматирования.
5. Добавил. Ещё раз спасибо.