PipelineWise (Singer) not running pipelines

#python #etl #pipeline

Question:

This is a long shot, since only 4 questions about pipelinewise have been asked before, but here goes.

I've been following this documentation https://transferwise.github.io/pipelinewise/installation_guide/installation.html and I'm at the point where I'm ready to run my pipeline. I run the command pipelinewise run_tap --tap jira --target s3 and get an incredibly frustrating error:

time=2021-08-13 04:19:07 logger_name=pipelinewise log_level=INFO message=Profiling mode not enabled
time=2021-08-13 04:19:07 logger_name=pipelinewise.cli.pipelinewise log_level=INFO message=Running jira tap in s3 target
Traceback (most recent call last):
  File "/opt/pipelinewise/pipelinewise/cli/pipelinewise.py", line 155, in create_filtered_tap_properties
    for stream_idx, stream in enumerate(properties.get('streams', tap_properties)):
AttributeError: 'NoneType' object has no attribute 'get'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/pipelinewise/dev-project/.virtualenvs/pipelinewise/bin/pipelinewise", line 33, in <module>
    sys.exit(load_entry_point('pipelinewise', 'console_scripts', 'pipelinewise')())
  File "/opt/pipelinewise/pipelinewise/cli/__init__.py", line 230, in main
    getattr(ppw_instance, args.command)()
  File "/opt/pipelinewise/pipelinewise/cli/pipelinewise.py", line 985, in run_tap
    create_fallback=True)
  File "/opt/pipelinewise/pipelinewise/cli/pipelinewise.py", line 271, in create_filtered_tap_properties
    raise Exception(f'Cannot create JSON file - {exc}') from exc
Exception: Cannot create JSON file - 'NoneType' object has no attribute 'get'
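The traceback above can be reproduced in isolation. This is a minimal sketch, assuming (not confirmed here) that PipelineWise's utils.load_json returns None when the properties file does not exist instead of raising; load_json and create_filtered_tap_properties below are simplified hypothetical stand-ins, not the library's code. It shows how "raise ... from exc" produces the two chained tracebacks and hides the original AttributeError behind the generic "Cannot create JSON file" message.

```python
import json
import os


def load_json(path):
    # Hypothetical stand-in for pipelinewise's utils.load_json, ASSUMING it
    # returns None (rather than raising) when the file does not exist.
    if not os.path.isfile(path):
        return None
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def create_filtered_tap_properties(tap_properties):
    # Simplified copy of the failing pattern: load the file, then call .get() on the result.
    try:
        properties = load_json(tap_properties)
        return [s.get("tap_stream_id") for s in properties.get("streams", [])]
    except Exception as exc:
        # "from exc" stores the AttributeError in __cause__; an uncaught traceback then
        # prints "The above exception was the direct cause of the following exception".
        raise Exception(f"Cannot create JSON file - {exc}") from exc


try:
    create_filtered_tap_properties("/nonexistent/properties.json")
except Exception as err:
    print(err)
    print(type(err.__cause__).__name__)
```

The second print shows that the real error is an AttributeError on None, i.e. nothing was loaded from the properties path.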

Looking at pipelinewise.py, I can see that the error comes from somewhere in here:

        try:
            # Load JSON files
            properties = utils.load_json(tap_properties)
            state = utils.load_json(tap_state)

            # Create a dictionary for tables that don't meet filter criteria
            fallback_properties = copy.deepcopy(properties) if create_fallback else {}

            # Foreach stream (table) in the original properties
            for stream_idx, stream in enumerate(properties.get('streams', tap_properties)):
                initial_sync_required = False

                # Collect required properties from the properties file
                tap_stream_id = stream.get('tap_stream_id')
                table_name = stream.get('table_name')
                metadata = stream.get('metadata', [])

                # Collect further properties from the properties file under the metadata key
                table_meta = {}
                meta_idx = 0
                for meta_idx, meta in enumerate(metadata):
                    if isinstance(meta, dict) and len(meta.get('breadcrumb', [])) == 0:
                        table_meta = meta.get('metadata')
                        break

                # Can we make sure that the stream has the right metadata?
                # To be safe, check if no right metadata has been found, then throw an exception.
                if not table_meta:
                    self.logger.error('Stream %s has no metadata with no breadcrumbs: %s.', tap_stream_id, metadata)
                    raise Exception(f'Missing metadata in stream {tap_stream_id}')

                selected = table_meta.get('selected', False)
                replication_method = table_meta.get('replication-method', None)

                # Detect if initial sync is required. Look into the state file, get the bookmark
                # for the current stream (table) and if valid bookmark doesn't exist then
                # initial sync is required
                bookmarks = state.get('bookmarks', {}) if isinstance(state, dict) else {}

                new_stream = False

                # if stream not in bookmarks, then it's a new table
                if tap_stream_id not in bookmarks:
                    new_stream = True
                    initial_sync_required = True
                else:
                    stream_bookmark = bookmarks[tap_stream_id]

                    if self._is_initial_sync_required(replication_method, stream_bookmark):
                        initial_sync_required = True

                # Compare actual values to the filter conditions.
                # Set the "selected" key to True if actual values meet the filter criteria
                # Set the "selected" key to False if the actual values don't meet the filter criteria
                # pylint: disable=too-many-boolean-expressions
                if (
                        (f_selected is None or selected == f_selected) and
                        (f_target_type is None or target_type in f_target_type) and
                        (f_tap_type is None or tap_type in f_tap_type) and
                        (f_replication_method is None or replication_method in f_replication_method) and
                        (f_initial_sync_required is None or initial_sync_required == f_initial_sync_required)
                ):
                    self.logger.debug("""Filter condition(s) matched:
                        Table              : %s
                        Tap Stream ID      : %s
                        Selected           : %s
                        Replication Method : %s
                        Init Sync Required : %s
                    """, table_name, tap_stream_id, selected, replication_method, initial_sync_required)

                    # Filter condition matched: mark table as selected to sync
                    properties['streams'][stream_idx]['metadata'][meta_idx]['metadata']['selected'] = True
                    filtered_tap_stream_ids.append(tap_stream_id)

                    # Filter condition matched:
                    # if the stream is a new table and is a singer stream, then mark it as selected to sync in the
                    # the fallback properties as well if the table is selected in the original properties.
                    # Otherwise, mark it as not selected
                    if create_fallback:
                        if new_stream and replication_method in [self.INCREMENTAL, self.LOG_BASED]:
                            fallback_properties['streams'][stream_idx]['metadata'][meta_idx]['metadata'][
                                'selected'] = True
                            if selected:
                                fallback_filtered_stream_ids.append(tap_stream_id)
                        else:
                            fallback_properties['streams'][stream_idx]['metadata'][meta_idx]['metadata'][
                                'selected'] = False
                else:
                    # Filter condition didn't match: mark table as not selected to sync
                    properties['streams'][stream_idx]['metadata'][meta_idx]['metadata']['selected'] = False

                    # Filter condition didn't match: mark table as selected to sync in the fallback properties
                    # Fallback only if the table is selected in the original properties
                    if create_fallback and selected is True:
                        fallback_properties['streams'][stream_idx]['metadata'][meta_idx]['metadata']['selected'] = True
                        fallback_filtered_stream_ids.append(tap_stream_id)

            # Save the generated properties file(s) and return
            # Fallback required: Save filtered and fallback properties JSON
            if create_fallback:
                # Save to files: filtered and fallback properties
                temp_properties_path = utils.create_temp_file(dir=self.get_temp_dir(),
                                                              prefix='properties_',
                                                              suffix='.json')[1]
                utils.save_json(properties, temp_properties_path)

                temp_fallback_properties_path = utils.create_temp_file(dir=self.get_temp_dir(),
                                                                       prefix='properties_',
                                                                       suffix='.json')[1]
                utils.save_json(fallback_properties, temp_fallback_properties_path)

                return (temp_properties_path,
                        filtered_tap_stream_ids,
                        temp_fallback_properties_path,
                        fallback_filtered_stream_ids)

            # Fallback not required: Save only the filtered properties JSON
            temp_properties_path = utils.create_temp_file(dir=self.get_temp_dir(),
                                                          prefix='properties_',
                                                          suffix='.json')[1]
            utils.save_json(properties, temp_properties_path)

            return temp_properties_path, filtered_tap_stream_ids

        except Exception as exc:
            raise Exception(f'Cannot create JSON file - {exc}') from exc 
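One way to make this failure self-explanatory is to check the loaded file before iterating over it. This is a sketch with a hypothetical helper, load_properties_or_explain, which is not part of PipelineWise; it only assumes that the properties file is a Singer catalog with a top-level streams key. The demo at the bottom uses a throwaway temporary directory.

```python
import json
import os
import tempfile


def load_properties_or_explain(path):
    """Load a Singer properties (catalog) file, failing loudly if it is missing.

    Hypothetical helper for illustration; not part of PipelineWise.
    """
    if not os.path.isfile(path):
        raise FileNotFoundError(
            f"Properties file not found: {path}. "
            "In Singer, a catalog is normally produced by running the tap in discovery mode."
        )
    with open(path, encoding="utf-8") as f:
        properties = json.load(f)
    if not isinstance(properties, dict) or "streams" not in properties:
        raise ValueError(f"{path} does not look like a catalog: no 'streams' key")
    return properties


# Demo: a missing file fails with a clear message; a minimal catalog loads.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "properties.json")
    try:
        load_properties_or_explain(path)
    except FileNotFoundError as err:
        missing_error = str(err)
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"streams": []}, f)
    loaded = load_properties_or_explain(path)
```

Failing at load time points straight at the missing file, instead of surfacing later as a 'NoneType' error inside the loop.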

I suspect the Jira tap (connector) might not be installed correctly, but I have no idea how to install it. My Dockerfile literally says ARG connectors=all and later ./install.sh --connectors=$connectors. According to the docs, Jira is one of the default connectors.
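To check whether the Jira connector actually got installed, one can look for its executable. This sketch rests on an assumption inferred only from the traceback path (/opt/pipelinewise/dev-project/.virtualenvs/pipelinewise): that each connector lives in a sibling virtualenv such as .virtualenvs/tap-jira, with an executable at bin/tap-jira. find_connector is a hypothetical helper, and the demo builds a fake layout in a temporary directory.

```python
import os
import tempfile


def find_connector(venvs_dir, connector):
    """Return <venvs_dir>/<connector>/bin/<connector> if it is an executable file, else None.

    ASSUMED layout, inferred from the traceback path; not a documented PipelineWise contract.
    """
    exe = os.path.join(venvs_dir, connector, "bin", connector)
    return exe if os.path.isfile(exe) and os.access(exe, os.X_OK) else None


# Demo against a throwaway directory that mimics the assumed layout.
with tempfile.TemporaryDirectory() as tmp:
    bin_dir = os.path.join(tmp, "tap-jira", "bin")
    os.makedirs(bin_dir)
    exe_path = os.path.join(bin_dir, "tap-jira")
    with open(exe_path, "w", encoding="utf-8") as f:
        f.write("#!/bin/sh\n")
    os.chmod(exe_path, 0o755)
    found_jira = find_connector(tmp, "tap-jira") is not None
    found_s3 = find_connector(tmp, "target-s3-csv") is not None
```

Inside the container the call would be something like find_connector("/opt/pipelinewise/dev-project/.virtualenvs", "tap-jira"), again only if that layout holds.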

It could also be a problem with this loop:

for stream_idx, stream in enumerate(properties.get('streams', tap_properties)):
    initial_sync_required = False
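The loop itself only fails when properties is not a dict. The sketch below shows what it expects, using a hypothetical minimal properties file in the Singer catalog format (the stream name "issues" is made up for illustration); table_level_metadata reproduces the empty-breadcrumb rule from the code above.

```python
# Hypothetical minimal properties.json content in the Singer catalog format.
properties = {
    "streams": [
        {
            "tap_stream_id": "issues",  # illustrative stream name only
            "table_name": "issues",
            "metadata": [
                {"breadcrumb": [], "metadata": {"selected": True, "replication-method": "INCREMENTAL"}},
                {"breadcrumb": ["properties", "id"], "metadata": {"inclusion": "automatic"}},
            ],
        }
    ]
}


def table_level_metadata(stream):
    # Same rule as the loop above: the entry with an empty breadcrumb describes the table itself.
    for meta in stream.get("metadata", []):
        if isinstance(meta, dict) and len(meta.get("breadcrumb", [])) == 0:
            return meta.get("metadata")
    return {}


selected = {}
for stream in properties.get("streams", []):
    table_meta = table_level_metadata(stream)
    selected[stream["tap_stream_id"]] = table_meta.get("selected", False)

print(selected)  # {'issues': True}
```

With a dict like this the loop runs normally; with properties = None the very first properties.get(...) raises the AttributeError from the traceback.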
 

As I said, it's a long shot, but maybe someone has a clue why I keep running into this problem. Thanks.

Edit: adding details:

In the pipelinewise.py class we have tap_properties = self.tap['files']['properties'], and earlier in the class I see self.tap = self.get_tap(args.target, args.tap), and I think taps = target['taps'] comes later, which is here:

{
    "alert_handlers": null,
    "targets": [
        {
            "id": "s3",
            "name": "S3 Target connector",
            "status": "ready",
            "taps": [
                {
                    "enabled": true,
                    "id": "jira",
                    "name": "Jira",
                    "owner": "foo@bar.com",
                    "send_alert": true,
                    "stream_buffer_size": 0,
                    "type": "tap-jira"
                }
            ],
            "type": "target-s3-csv"
        }
    ]
}
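The tap entry in this JSON has no files key, so self.tap['files'] must be filled in at runtime by get_tap. Below is a sketch of that lookup, assuming (not confirmed by the source) that files['properties'] points at a properties.json inside a per-target, per-tap directory; find_tap, properties_path and /hypothetical/config_dir are illustrative names only. If that file was never generated, load_json would have nothing to read.

```python
import json
import os

# The config shown above, abbreviated to the fields used here.
CONFIG_JSON = """
{
  "alert_handlers": null,
  "targets": [
    {
      "id": "s3",
      "type": "target-s3-csv",
      "taps": [{"id": "jira", "type": "tap-jira", "enabled": true}]
    }
  ]
}
"""


def find_tap(config, target_id, tap_id):
    # Hypothetical lookup mirroring what get_tap(target_id, tap_id) appears to do.
    for target in config.get("targets", []):
        if target.get("id") == target_id:
            for tap in target.get("taps", []):
                if tap.get("id") == tap_id:
                    return tap
    return None


def properties_path(config_dir, target_id, tap_id):
    # ASSUMED layout: one directory per target/tap pair holding properties.json.
    return os.path.join(config_dir, target_id, tap_id, "properties.json")


config = json.loads(CONFIG_JSON)
tap = find_tap(config, "s3", "jira")
path = properties_path("/hypothetical/config_dir", "s3", "jira")
print(tap["type"], path, os.path.isfile(path))
```

The last value printed is whether the expected file exists; checking the real path the same way would confirm or rule out a missing properties file.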
 

Comments:

1. The problem is that properties is None. That means tap_properties came up empty. That's as far back as we can trace it. Do you know how tap_properties was supposed to be supplied?

2. Thanks. I see that in the pipelinewise.py class we have tap_properties = self.tap['files']['properties'], and earlier in the class definition I see self.tap = self.get_tap(args.target, args.tap), and I think taps = target['taps'] comes later, which is here: (see next comment)

3. { "alert_handlers": null, "targets": [ { "id": "s3", "name": "S3 Target connector", "status": "ready", "taps": [ { "enabled": true, "id": "jira", "name": "Jira", "owner": "foo@bar.com", "send_alert": true, "stream_buffer_size": 0, "type": "tap-jira" } ], "type": "target-s3-csv" } ] }

4. Add this to your question. It's too confusing without formatting.

5. Added. Thanks again.