Как вернуть кортеж вместо объединения

#python #python-3.x #typeerror #apache-beam

Вопрос:

Я пытаюсь написать некоторые базовые apache_beam конвейеры для учебных целей, но я не могу найти причину, по которой один из типов моих преобразований возвращается как Union[int,str] вместо Tuple[str,int] . Почему это происходит?

Ошибка возникает непосредственно перед Format этапом в конвейере. Я использую типографские знаки, поэтому получаю ошибку :

 Exception has occurred: TypeCheckError
Type hint violation for 'Format': requires Tuple[str, int] but got Union[int, str] for element
 

Когда я отлаживаю, временно удаляя шрифт , я нахожу, что element это действительно тип Union[int,str] , но я не могу объяснить, почему.

Трубопровод :

 def run(argv=None):
    """Main entry point; runs a word_count pipeline"""

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input",
        dest="input",
        default="data/input/count_words.txt",
        help="Input file to process",
    )
    parser.add_argument(
        "--output",
        dest="output",
        default="data/output/filtered_output_count_words.txt",
        help="Processed output file",
    )
    parser.add_argument(
        "--runner",
        dest="runner",
        default="DirectRunner",
        help="Runner used to process the apache beam pipeline",
    )
    args = parser.parse_known_args(argv)[0]
    beam_options = PipelineOptions(runner=args.runner)
    with beam.Pipeline(options=beam_options) as pipeline:
        filtered_words = (
            pipeline
            | "Read" >> beam.io.ReadFromText(args.input)
            | "CountWords" >> CountWords()
            | "Filter" >> beam.ParDo(FilterWords(pattern="I'm|trying|example"))
        )
        assert_that(
            filtered_words,
            equal_to([("I'm", 4), ("trying", 4), ("example", 4)]),
        )
        (
            filtered_words
            # | "Print" >> beam.Map(print)
            | "Format" >> beam.ParDo(FormatCountText())
            
 

Другие элементы трубопровода :

 class FilterWords(beam.DoFn):
    def __init__(self, pattern: str):
        """ParDo to filter a bunch o' words amp; their number of occurrences

        :param pattern: regexp pattern to use for filtering
        :type pattern: str
        """
        super()
        self.pattern = pattern

    def process(
        self,
        element: Tuple[str, int],
        # words_to_keep: List[str],
    ) -> Tuple[str, int]:
        word, _ = element
        if re.match(self.pattern, word):
            logging.info(
                f"The word '{word}' matches the pattern {self.pattern}"
            )
            yield element
            # yield Tuple(word, _)
        else:
            logging.debug(
                f"The word '{word}' does not match the pattern {self.pattern}"
            )

class CountWords(beam.PTransform):
    def expand(
        self,
        pcoll: beam.PCollection,
    ) -> beam.PCollection:
        return (
            pcoll
            | "Extract" >> beam.ParDo(ExtractWordsFromRow(), delimiter=" ")
            | "Unpuncutate"
            >> beam.ParDo(RemovePunctuation(), symbols=[",", "."])
            | "Count" >> beam.combiners.Count.PerElement()
        )


class ExtractWordsFromRow(beam.DoFn):
    def process(
        self,
        element: str,
        delimiter: str,
    ) -> List[str]:
        # Extract items within element, in this case 1 line into multiple words
        words = str(element).split(delimiter)
        return words


class RemovePunctuation(beam.DoFn):
    def process(
        self,
        element: str,
        symbols: List[str],
    ) -> Iterable[str]:
        word = element
        for symbol in symbols:
            word = word.replace(symbol, "")
        yield word


class FormatCountText(beam.DoFn):
    def process(
        self,
        element: Tuple[str, int],
    ):
        word, count = element
        yield f"{word}: {count}"
 

Ответ №1:

Это потому, что в вашей декларации FilterWords.process вы заявляете об этом как о возвращении Tuple[str, int] . process Метод DoFn должен возвращать an Iterable[T] для получения a PCollection[T] , поэтому он должен быть объявлен как возвращаемый -> Iterable[Tuple[str, int]] (из-за оператора yield возвращаемое значение process фактически является генератором).

( Union[str, int] Это связано с тем фактом, что если бы вы действительно вернули a Tuple[str, int] , строка и int были бы добавлены по отдельности в результирующую коллекцию PC.)

Комментарии:

1. Это совершенно верно 🙂 Спасибо вам за понимание