# #go #google-cloud-dataflow #apache-beam
#Вперед #google-облако-поток данных #apache-beam
Вопрос:
У меня есть реализация кода Apache Beam на Go SDK, как описано ниже. Конвейер состоит из 3 шагов. Один есть textio.Read
, другой есть CountLines
и последний шаг есть ProcessLines
. ProcessLines
этот шаг занимает около 10 секунд. Я просто добавил функцию ожидания для краткости.
Я вызываю конвейер с 20 рабочими. Когда я запускал конвейер, я ожидал, что 20 рабочих будут работать параллельно и textio.Read
прочитают 20 строк из файла и ProcessLines
выполнят 20 параллельных исполнений за 10 секунд. Однако конвейер работал не так. В настоящее время он работает таким образом, что textio.Read
считывает одну строку из файла, передает данные на следующий шаг и ожидает, пока ProcessLines
шаг не завершит свою 10-секундную работу. Параллелизм отсутствует, и во всем конвейере присутствует только одна строка из файла. Не могли бы вы, пожалуйста, разъяснить мне, что я делаю неправильно для параллелизма? Как мне следует обновить код для достижения параллелизма, как описано выше?
package main
import (
"context"
"flag"
"time"
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/go/pkg/beam/log"
"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)
// metrics to be monitored
var (
input = flag.String("input", "", "Input file (required).")
numberOfLines = beam.NewCounter("extract", "numberOfLines")
lineLen = beam.NewDistribution("extract", "lineLenDistro")
)
func countLines(ctx context.Context, line string) string {
lineLen.Update(ctx, int64(len(line)))
numberOfLines.Inc(ctx, 1)
return line
}
func processLines(ctx context.Context, line string) {
time.Sleep(10 * time.Second)
}
func CountLines(s beam.Scope, lines beam.PCollection) beam.PCollection
{
s = s.Scope("Count Lines")
return beam.ParDo(s, countLines, lines)
}
func ProcessLines(s beam.Scope, lines beam.PCollection) {
s = s.Scope("Process Lines")
beam.ParDo0(s, processLines, lines)
}
func main() {
// If beamx or Go flags are used, flags must be parsed first.
flag.Parse()
// beam.Init() is an initialization hook that must be called on startup. On
// distributed runners, it is used to intercept control.
beam.Init()
// Input validation is done as usual. Note that it must be after Init().
if *input == "" {
log.Fatal(context.Background(), "No input file provided")
}
p := beam.NewPipeline()
s := p.Root()
l := textio.Read(s, *input)
lines := CountLines(s, l)
ProcessLines(s, lines)
// Concept #1: The beamx.Run convenience wrapper allows a number of
// pre-defined runners to be used via the --runner flag.
if err := beamx.Run(context.Background(), p); err != nil {
log.Fatalf(context.Background(), "Failed to execute job: %v", err.Error())
}
}
Редактировать:
После того, как я получил ответ о том, что проблема может быть вызвана fusion, я изменил связанную часть кода, но это снова не сработало.
Теперь первый и второй шаг работают параллельно, однако третий шаг ProcessLines
не работает параллельно. Я внес только следующие изменения. Кто-нибудь может сказать мне, в чем проблема?
func AddRandomKey(s beam.Scope, col beam.PCollection) beam.PCollection {
return beam.ParDo(s, addRandomKeyFn, col)
}
func addRandomKeyFn(elm beam.T) (int, beam.T) {
return rand.Int(), elm
}
func countLines(ctx context.Context, _ int, lines func(*string) bool, emit func(string)) {
var line string
for lines(amp;line) {
lineLen.Update(ctx, int64(len(line)))
numberOfLines.Inc(ctx, 1)
emit(line)
}
}
func processLines(ctx context.Context, _ int, lines func(*string) bool) {
var line string
for lines(amp;line) {
time.Sleep(10 * time.Second)
numberOfLinesProcess.Inc(ctx, 1)
}
}
func CountLines(s beam.Scope, lines beam.PCollection) beam.PCollection {
s = s.Scope("Count Lines")
keyed := AddRandomKey(s, lines)
grouped := beam.GroupByKey(s, keyed)
return beam.ParDo(s, countLines, grouped)
}
func ProcessLines(s beam.Scope, lines beam.PCollection) {
s = s.Scope("Process Lines")
keyed := AddRandomKey(s, lines)
grouped := beam.GroupByKey(s, keyed)
beam.ParDo0(s, processLines, grouped)
}
Ответ №1:
Многие продвинутые исполнители конвейеров типа MapReduce объединяют этапы, которые могут выполняться в памяти вместе. Apache Beam и Dataflow не являются исключением.
Здесь происходит то, что три этапа вашего конвейера объединены и выполняются на одном компьютере. Кроме того, Go SDK в настоящее время, к сожалению, не поддерживает разделение Read
преобразования.
Для достижения параллелизма в третьем преобразовании вы можете прервать слияние между Read
и ProcessLines
. Вы можете сделать это, добавив случайный ключ к своим строкам и GroupByKey
преобразование.
В Python это было бы:
(p | beam.ReadFromText(...)
| CountLines()
| beam.Map(lambda x: (random.randint(0, 1000), x))
| beam.GroupByKey()
| beam.FlatMap(lambda k, v: v) # Discard the key, and return the values
| ProcessLines())
Это позволило бы вам выполнить распараллеливание ProcessLines
.
Комментарии:
1. Привет, Пабло, спасибо за ответ. Я отредактировал вопрос в соответствии с вашим предложением, однако у него все та же проблема. Это решило только слияние первого и второго шагов. Третий шаг все еще не работает параллельно.
2. Это странно. И вы добавили больше, чем несколько ключей? С большим количеством ключей Beam должен иметь возможность разделять и анализировать параллельно..
3. Точно, я добавил случайные ключи. Вы можете увидеть это в части вопросов. Я добавил отредактированную версию вопроса. Я использую
AddRandomKey
функцию, которая создает разные ключи. Я запустил конвейер с 20 рабочими, затем внезапно автоскалер разделил их на 5 рабочих.4. Я предоставляю последнее обновление о результате реализации. Попробовал реализацию в части редактирования с 200 000 строками данных в файле и обнаружил, что каждый шаг выполняется параллельно. Однако каждый шаг ожидает завершения выполнения предыдущего шага и начинает работать параллельно после завершения предыдущего шага. Каждый шаг выполняется параллельно, но одновременно в конвейере выполняется только один шаг. Конвейер все еще не полностью параллелен.