Apache Beam Поток данных слишком медленный только для данных 18k

#java #google-cloud-platform #parallel-processing #apache-beam

#java #google-облачная платформа #параллельная обработка #apache-beam

Вопрос:

нам нужно выполнить сложные вычисления на простых, но многочисленных данных.
Входные данные — это строки в таблице BigQuery, состоящей из двух столбцов: ID (целое число) и DATA (СТРОКА). Значения ДАННЫХ имеют вид «1#2#3#4#…» с 36 значениями.
Выходные данные имеют ту же форму, но ДАННЫЕ просто преобразуются алгоритмом.
Это преобразование «один к одному».

Мы пробовали Apache Beam с Google Cloud Dataflow, но это не работает, возникают ошибки, как только создается несколько рабочих экземпляров.
Для нашего POC мы используем только 18 тыс. входных строк, целевой показатель составляет около 1 млн.

Вот облегченная версия класса (я удалил часть записи, поведение осталось прежним):

 public class MyClass {

static MyService myService = new MyService();

static class ExtractDataFn extends DoFn<TableRow, KV<Long, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        Long id = Long.parseLong((String) c.element().get("ID"));  
        String data = (String) c.element().get("DATA");         
        c.output(KV.of(id, data));
    }
}

public interface Options extends PipelineOptions {
    String getInput();
    void setInput(String value);

    @Default.Enum("EXPORT")
    TypedRead.Method getReadMethod();
    void setReadMethod(TypedRead.Method value);

    @Validation.Required
    String getOutput();
    void setOutput(String value);
}

static void run(Options options) {
    Pipeline p = Pipeline.create(options);

    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("ID").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("DATA").setType("STRING"));
    TableSchema schema = new TableSchema().setFields(fields);

    PCollection<TableRow> rowsFromBigQuery = p.apply(
            BigQueryIO.readTableRows().from(options.getInput()).withMethod(options.getReadMethod())
    );              
    
    PCollection<KV<Long, String>> inputdata = rowsFromBigQuery.apply(ParDo.of(new ExtractDataFn()));
    PCollection<KV<Long, String>> outputData = applyTransform(inputdata);
    // Here goes the part where data are written in a BQ table
    p.run().waitUntilFinish();
}

static PCollection<KV<Long, String>> applyTransform(PCollection<KV<Long, String>> inputData) {      
    PCollection<KV<Long, String>> forecasts = inputData.apply(ParDo.of(new DoFn<KV<Long, String>, KV<Long, String>> () {
                    
        @ProcessElement
        public void processElement(@Element KV<Long, String> element, OutputReceiver<KV<Long, String>> receiver, ProcessContext c) {
            MyDto dto = new MyDto();
            List<Double> inputData = Arrays.asList(element.getValue().split("#")).stream().map(Double::valueOf).collect(Collectors.toList());
            dto.setInputData(inputData);                
            dto = myService.calculate(dto); // here is the time consuming operation
            String modifiedData = dto.getModifiedData().stream().map(Object::toString).collect(Collectors.joining(","));
            receiver.output(KV.of(element.getKey(), modifiedData));
        }
      }))
    ;
    return forecasts;
}

public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options);
}
  

}

В консоли GCP Logs мы видим, что количество рабочих увеличивается до 10, в течение примерно 5 минут оно уменьшается до 3 или 4, а затем у нас появляются такого рода сообщения (их несколько сотен), а CPU составляет около 0%:

 Proposing dynamic split of work unit myproject;2020-10-06_06_18_27-12689839210406435299;1231063355075246317 at {"fractionConsumed":0.5,"position":{"shufflePosition":"f_8A_wD_AAAB"}}
  

и

 Operation ongoing in step BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Read for at least 05m00s without outputting or completing in state read-shuffle at app//org.apache.beam.runners.dataflow.worker.ApplianceShuffleReader.readIncludingPosition(Native Method)
  

Если мы позволим ему запустить, он завершится с ошибкой такого рода :

 Error message from worker: java.lang.RuntimeException: unexpected org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader.read(CachingShuffleBatchReader.java:77)
  

Если я изменю метод MyService.calculate на более быстрый, все данные обрабатываются только одним работником, и проблем не возникает. Проблема, похоже, возникает только при распараллеливании обработки.

Спасибо за вашу помощь

Комментарии:

1. Прежде всего, почему вы хотите распараллелить обработку? В настоящее время вы столкнулись с узким местом? Не могли бы вы подробнее рассказать о 1 миллионе данных, вы имеете в виду строки? Как строятся строки, какой тип данных — это числа. И, наконец, как форматируется вывод. Пожалуйста, приведите какой-нибудь пример ввода -> вывода

2. Мы хотим распараллелить, чтобы сократить время выполнения, которое в настоящее время является последовательным. В настоящее время это занимает более одного часа, и это слишком долго. Входные данные — это просто строки базы данных с двумя полями, числовым идентификатором и строкой типа 12 #20 # 32 # … содержащей 36 значений. Формат вывода тот же, но числовые значения преобразуются алгоритмом.

3. Требует ли работа, выполняемая в MyService, больших объемов памяти? Если да, можете ли вы проверить наличие ошибок ООМ в журналах.

4. Я не могу найти никаких ошибок ООМ ни в каких журналах. Я думаю, что объем памяти невелик. Не могли бы вы объяснить, что означает сообщение «Предложение динамического разделения рабочей единицы»? Я думаю, что это то, что поток данных пытается сделать и терпит неудачу. Но я не понимаю, что это значит.

5. Другая информация, я попытался выполнить конвейер со следующими параметрами: —diskSizeGb= 250 —workerDiskType=compute.googleapis.com/projects/myproject/zones/europe-west1-d/diskTypes/pd-ssd но это ничего не меняет. Есть ли какие-то атрибуты виртуальной машины, которую я создал для размещения приложения, которые можно было бы более адаптировать? Я думал, что рабочие были другими экземплярами, независимыми от виртуальной машины, на которой выполняется приложение.

Ответ №1:

Решением было настроить брандмауэр, добавив правило, разрешающее общение между рабочими.

https://cloud.google.com/dataflow/docs/guides/routes-firewall