#google-cloud-dataflow #apache-beam
#google-cloud-dataflow #apache-beam
Вопрос:
У меня есть PCollection[str]
и я хочу сгенерировать случайные пары.
Исходя из Apache Spark, моя стратегия заключалась в том, чтобы:
- скопируйте исходную PCollection
- случайным образом перетасуйте его
- заархивируйте его с помощью оригинальной PCollection
Однако, похоже, я не могу найти способ заархивировать 2 PCollections…
Ответ №1:
Это интересный и не очень распространенный вариант использования, потому что, как говорит @chamikara, в потоке данных нет гарантии упорядоченности. Однако я подумал о реализации решения, в котором вы перетасовываете входную PCollection, а затем соединяете последовательные элементы на основе состояния . Я нашел некоторые оговорки в пути, но я подумал, что, возможно, стоит поделиться в любом случае.
Во-первых, я использовал Python SDK, но средство запуска потока данных пока не поддерживает DoFn с сохранением состояния. Это работает с прямым запуском, но: 1) это не масштабируемо и 2) сложно перетасовывать записи без многопоточности. Конечно, простым решением для последнего является передача уже перетасованной PCollection в конвейер (мы можем использовать другое задание для предварительной обработки данных). В противном случае мы можем адаптировать этот пример к Java SDK.
На данный момент я решил попробовать перемешать и соединить его с одним конвейером. Я действительно не знаю, помогает ли это или усложняет ситуацию, но код можно найти здесь .
Вкратце, DoFn с отслеживанием состояния просматривает буфер и, если он пуст, помещает в него текущий элемент. В противном случае из буфера извлекается предыдущий элемент и выводится кортеж (previous_element, current_element):
class PairRecordsFn(beam.DoFn):
"""Pairs two consecutive elements after shuffle"""
BUFFER = BagStateSpec('buffer', PickleCoder())
def process(self, element, buffer=beam.DoFn.StateParam(BUFFER)):
try:
previous_element = list(buffer.read())[0]
except:
previous_element = []
unused_key, value = element
if previous_element:
yield (previous_element, value)
buffer.clear()
else:
buffer.add(value)
Конвейер добавляет ключи к элементам ввода, как требуется для использования DoFn с сохранением состояния. Здесь будет компромисс, потому что вы потенциально можете назначить один и тот же ключ всем элементам с помощью beam.Map(lambda x: (1, x))
. Это не будет хорошо распараллеливаться, но это не проблема, поскольку мы в любом случае используем Direct Runner (имейте это в виду, если используете Java SDK). Однако это не приведет к перетасовке записей. Если вместо этого мы перетасуем большое количество ключей, мы получим большее количество «потерянных» элементов, которые нельзя соединить (поскольку состояние сохраняется для каждого ключа, и мы назначаем их случайным образом, у нас может быть нечетное количество записей для каждого ключа):
pairs = (p
| 'Create Events' >> beam.Create(data)
| 'Add Keys' >> beam.Map(lambda x: (randint(1,4), x))
| 'Pair Records' >> beam.ParDo(PairRecordsFn())
| 'Check Results' >> beam.ParDo(LogFn()))
В моем случае я получил что-то вроде:
INFO:root:('one', 'three')
INFO:root:('two', 'five')
INFO:root:('zero', 'six')
INFO:root:('four', 'seven')
INFO:root:('ten', 'twelve')
INFO:root:('nine', 'thirteen')
INFO:root:('eight', 'fourteen')
INFO:root:('eleven', 'sixteen')
...
РЕДАКТИРОВАТЬ: Я подумал о другом способе сделать это с помощью Sample.FixedSizeGlobally
объединителя. Хорошо то, что он лучше перемешивает данные, но вам нужно знать количество элементов априори (иначе нам потребовался бы первоначальный проход данных), и, похоже, он возвращает все элементы вместе. Вкратце, я дважды инициализирую одну и ту же PCollection, но применяю разные порядки перемещения в случайном порядке и назначаю индексы в DoFn с сохранением состояния. Это гарантирует, что индексы уникальны для элементов в одной и той же PCollection (даже если порядок не гарантируется). В моем случае обе PCollections будут иметь ровно по одной записи для каждого ключа в диапазоне [0, 31]. Преобразование CoGroupByKey объединит обе PCollections по одному и тому же индексу, таким образом, получая случайные пары элементов:
pc1 = (p
| 'Create Events 1' >> beam.Create(data)
| 'Sample 1' >> combine.Sample.FixedSizeGlobally(NUM_ELEMENTS)
| 'Split Sample 1' >> beam.ParDo(SplitFn())
| 'Add Dummy Key 1' >> beam.Map(lambda x: (1, x))
| 'Assign Index 1' >> beam.ParDo(IndexAssigningStatefulDoFn()))
pc2 = (p
| 'Create Events 2' >> beam.Create(data)
| 'Sample 2' >> combine.Sample.FixedSizeGlobally(NUM_ELEMENTS)
| 'Split Sample 2' >> beam.ParDo(SplitFn())
| 'Add Dummy Key 2' >> beam.Map(lambda x: (2, x))
| 'Assign Index 2' >> beam.ParDo(IndexAssigningStatefulDoFn()))
zipped = ((pc1, pc2)
| 'Zip Shuffled PCollections' >> beam.CoGroupByKey()
| 'Drop Index' >> beam.Map(lambda (x, y):y)
| 'Check Results' >> beam.ParDo(LogFn()))
Полный код здесь
Результаты:
INFO:root:(['ten'], ['nineteen'])
INFO:root:(['twenty-three'], ['seven'])
INFO:root:(['twenty-five'], ['twenty'])
INFO:root:(['twelve'], ['twenty-one'])
INFO:root:(['twenty-six'], ['twenty-five'])
INFO:root:(['zero'], ['twenty-three'])
...
Ответ №2:
Как насчет применения преобразования ParDo к обеим PCollections, которые прикрепляют ключи к элементам, и запуска двух PCollections с помощью преобразования CoGroupByKey?
Пожалуйста, обратите внимание, что Beam не гарантирует порядок элементов в PCollection, поэтому выходные элементы могут быть переупорядочены после любого шага, но, похоже, это должно быть нормально для вашего варианта использования, поскольку вам просто нужен некоторый случайный порядок.