#python #python-3.x #typeerror #apache-beam
Вопрос:
Я пытаюсь написать некоторые базовые apache_beam
конвейеры для учебных целей, но я не могу найти причину, по которой один из типов моих преобразований возвращается как Union[int,str]
вместо Tuple[str,int]
. Почему это происходит?
Ошибка возникает непосредственно перед Format
этапом в конвейере. Я использую типографские знаки, поэтому получаю ошибку :
Exception has occurred: TypeCheckError
Type hint violation for 'Format': requires Tuple[str, int] but got Union[int, str] for element
Когда я отлаживаю, временно удаляя шрифт , я нахожу, что element
это действительно тип Union[int,str]
, но я не могу объяснить, почему.
Трубопровод :
def run(argv=None):
"""Main entry point; runs a word_count pipeline"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--input",
dest="input",
default="data/input/count_words.txt",
help="Input file to process",
)
parser.add_argument(
"--output",
dest="output",
default="data/output/filtered_output_count_words.txt",
help="Processed output file",
)
parser.add_argument(
"--runner",
dest="runner",
default="DirectRunner",
help="Runner used to process the apache beam pipeline",
)
args = parser.parse_known_args(argv)[0]
beam_options = PipelineOptions(runner=args.runner)
with beam.Pipeline(options=beam_options) as pipeline:
filtered_words = (
pipeline
| "Read" >> beam.io.ReadFromText(args.input)
| "CountWords" >> CountWords()
| "Filter" >> beam.ParDo(FilterWords(pattern="I'm|trying|example"))
)
assert_that(
filtered_words,
equal_to([("I'm", 4), ("trying", 4), ("example", 4)]),
)
(
filtered_words
# | "Print" >> beam.Map(print)
| "Format" >> beam.ParDo(FormatCountText())
Другие элементы трубопровода :
class FilterWords(beam.DoFn):
def __init__(self, pattern: str):
"""ParDo to filter a bunch o' words amp; their number of occurrences
:param pattern: regexp pattern to use for filtering
:type pattern: str
"""
super()
self.pattern = pattern
def process(
self,
element: Tuple[str, int],
# words_to_keep: List[str],
) -> Tuple[str, int]:
word, _ = element
if re.match(self.pattern, word):
logging.info(
f"The word '{word}' matches the pattern {self.pattern}"
)
yield element
# yield Tuple(word, _)
else:
logging.debug(
f"The word '{word}' does not match the pattern {self.pattern}"
)
class CountWords(beam.PTransform):
def expand(
self,
pcoll: beam.PCollection,
) -> beam.PCollection:
return (
pcoll
| "Extract" >> beam.ParDo(ExtractWordsFromRow(), delimiter=" ")
| "Unpuncutate"
>> beam.ParDo(RemovePunctuation(), symbols=[",", "."])
| "Count" >> beam.combiners.Count.PerElement()
)
class ExtractWordsFromRow(beam.DoFn):
def process(
self,
element: str,
delimiter: str,
) -> List[str]:
# Extract items within element, in this case 1 line into multiple words
words = str(element).split(delimiter)
return words
class RemovePunctuation(beam.DoFn):
def process(
self,
element: str,
symbols: List[str],
) -> Iterable[str]:
word = element
for symbol in symbols:
word = word.replace(symbol, "")
yield word
class FormatCountText(beam.DoFn):
def process(
self,
element: Tuple[str, int],
):
word, count = element
yield f"{word}: {count}"
Ответ №1:
Это потому, что в вашей декларации FilterWords.process
вы заявляете об этом как о возвращении Tuple[str, int]
. process
Метод DoFn должен возвращать an Iterable[T]
для получения a PCollection[T]
, поэтому он должен быть объявлен как возвращаемый -> Iterable[Tuple[str, int]]
(из-за оператора yield возвращаемое значение process
фактически является генератором).
( Union[str, int]
Это связано с тем фактом, что если бы вы действительно вернули a Tuple[str, int]
, строка и int были бы добавлены по отдельности в результирующую коллекцию PC.)
Комментарии:
1. Это совершенно верно 🙂 Спасибо вам за понимание