Найти строку с максимальным количеством токенов с помощью Apache Beam Python SDK

#python #apache-beam

#python #apache-beam

Вопрос:

У меня есть PCollection, содержащий строки. Я хочу разделить каждую строку на пробел и найти список токенов с максимальным размером и сохранить размер в переменной как int .

Рассмотрим этот пример ввода:

 sentences = ['This is the first sentence',
             'Second sentence',
             'Yet another sentence']

with beam.Pipeline(options=PipelineOptions()) as p:
       pcoll = p | 'Create' >> beam.Create(sentences)
  

Предложения после разделения будут:

 ['This', 'is', 'the', 'first', 'sentence'] -> 5
['Second', 'sentence'] -> 2
['Yet', 'another', 'sentence'] -> 3
  

Я хочу сохранить значение 5 в переменной.

Как мне это сделать? Я наткнулся на этот пост в блоге, но он не полностью соответствует моей цели. Автор распечатывает результирующий PCollection, но я хочу использовать это значение позже на других этапах конвейера.

Ответ №1:

Вы можете сделать это с Top.of помощью преобразования. Вкратце, мы разделяем каждое предложение, а затем вычисляем длину токена. С Top помощью мы просто хотим получить результат номер один, и мы передаем лямбда-функцию в качестве критерия сравнения, чтобы отсортировать их по длине слова:

 sentences = sentences = ['This is the first sentence',
       'Second sentence',
       'Yet another sentence']

longest_sentence = (p
  | 'Read Sentences' >> beam.Create(sentences)
  | 'Split into Words' >> beam.Map(lambda x: x.split(' '))
  | 'Map Token Length'      >> beam.Map(lambda x: (x, len(x)))
  | 'Top Sentence' >> combine.Top.Of(1, lambda a,b: a[1]<b[1])
  | 'Save Variable'         >> beam.ParDo(SaveMaxFn()))
  

где SaveMaxFn() :

 class SaveMaxFn(beam.DoFn):
  """Stores max in global variables"""
  def process(self, element):
    length = element[0][1]
    logging.info("Longest sentence: %s tokens", length)

    return element
  

и length является глобальной переменной:

 global length
  

Результат:

 INFO:root:Longest sentence: 5 token(s)
  

Полный код:

 import argparse, logging

import apache_beam as beam
import apache_beam.transforms.combiners as combine
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class SaveMaxFn(beam.DoFn):
  """Stores max in global variables"""
  def process(self, element):
    length = element[0][1]
    logging.info("Longest sentence: %s token(s)", length)

    return element


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  global length

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  p = beam.Pipeline(options=pipeline_options)

  sentences = sentences = ['This is the first sentence',
             'Second sentence',
             'Yet another sentence']

  longest_sentence = (p
    | 'Read Sentences' >> beam.Create(sentences)
    | 'Split into Words' >> beam.Map(lambda x: x.split(' '))
    | 'Map Token Length'      >> beam.Map(lambda x: (x, len(x)))
    | 'Top Sentence' >> combine.Top.Of(1, lambda a,b: a[1]<b[1])
    | 'Save Variable'         >> beam.ParDo(SaveMaxFn()))

  result = p.run()
  result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
  

Комментарии:

1. @xennygrimmato не могли бы вы помочь мне понять, что принимает аргумент ‘b’ в лямбда-функции? Что мы передаем за ‘b’?