#java #google-cloud-platform #parallel-processing #apache-beam
#java #google-облачная платформа #параллельная обработка #apache-beam
Вопрос:
нам нужно выполнить сложные вычисления на простых, но многочисленных данных.
Входные данные — это строки в таблице BigQuery, состоящей из двух столбцов: ID (целое число) и DATA (СТРОКА). Значения ДАННЫХ имеют вид «1#2#3#4#…» с 36 значениями.
Выходные данные имеют ту же форму, но ДАННЫЕ просто преобразуются алгоритмом.
Это преобразование «один к одному».
Мы пробовали Apache Beam с Google Cloud Dataflow, но это не работает, возникают ошибки, как только создается несколько рабочих экземпляров.
Для нашего POC мы используем только 18 тыс. входных строк, целевой показатель составляет около 1 млн.
Вот облегченная версия класса (я удалил часть записи, поведение осталось прежним):
public class MyClass {
static MyService myService = new MyService();
static class ExtractDataFn extends DoFn<TableRow, KV<Long, String>> {
@ProcessElement
public void processElement(ProcessContext c) {
Long id = Long.parseLong((String) c.element().get("ID"));
String data = (String) c.element().get("DATA");
c.output(KV.of(id, data));
}
}
public interface Options extends PipelineOptions {
String getInput();
void setInput(String value);
@Default.Enum("EXPORT")
TypedRead.Method getReadMethod();
void setReadMethod(TypedRead.Method value);
@Validation.Required
String getOutput();
void setOutput(String value);
}
static void run(Options options) {
Pipeline p = Pipeline.create(options);
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("ID").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("DATA").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
PCollection<TableRow> rowsFromBigQuery = p.apply(
BigQueryIO.readTableRows().from(options.getInput()).withMethod(options.getReadMethod())
);
PCollection<KV<Long, String>> inputdata = rowsFromBigQuery.apply(ParDo.of(new ExtractDataFn()));
PCollection<KV<Long, String>> outputData = applyTransform(inputdata);
// Here goes the part where data are written in a BQ table
p.run().waitUntilFinish();
}
static PCollection<KV<Long, String>> applyTransform(PCollection<KV<Long, String>> inputData) {
PCollection<KV<Long, String>> forecasts = inputData.apply(ParDo.of(new DoFn<KV<Long, String>, KV<Long, String>> () {
@ProcessElement
public void processElement(@Element KV<Long, String> element, OutputReceiver<KV<Long, String>> receiver, ProcessContext c) {
MyDto dto = new MyDto();
List<Double> inputData = Arrays.asList(element.getValue().split("#")).stream().map(Double::valueOf).collect(Collectors.toList());
dto.setInputData(inputData);
dto = myService.calculate(dto); // here is the time consuming operation
String modifiedData = dto.getModifiedData().stream().map(Object::toString).collect(Collectors.joining(","));
receiver.output(KV.of(element.getKey(), modifiedData));
}
}))
;
return forecasts;
}
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
}
В консоли GCP Logs мы видим, что количество рабочих увеличивается до 10, в течение примерно 5 минут оно уменьшается до 3 или 4, а затем у нас появляются такого рода сообщения (их несколько сотен), а CPU составляет около 0%:
Proposing dynamic split of work unit myproject;2020-10-06_06_18_27-12689839210406435299;1231063355075246317 at {"fractionConsumed":0.5,"position":{"shufflePosition":"f_8A_wD_AAAB"}}
и
Operation ongoing in step BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Read for at least 05m00s without outputting or completing in state read-shuffle at app//org.apache.beam.runners.dataflow.worker.ApplianceShuffleReader.readIncludingPosition(Native Method)
Если мы позволим ему запустить, он завершится с ошибкой такого рода :
Error message from worker: java.lang.RuntimeException: unexpected org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader.read(CachingShuffleBatchReader.java:77)
Если я изменю метод MyService.calculate на более быстрый, все данные обрабатываются только одним работником, и проблем не возникает. Проблема, похоже, возникает только при распараллеливании обработки.
Спасибо за вашу помощь
Комментарии:
1. Прежде всего, почему вы хотите распараллелить обработку? В настоящее время вы столкнулись с узким местом? Не могли бы вы подробнее рассказать о 1 миллионе данных, вы имеете в виду строки? Как строятся строки, какой тип данных — это числа. И, наконец, как форматируется вывод. Пожалуйста, приведите какой-нибудь пример ввода -> вывода
2. Мы хотим распараллелить, чтобы сократить время выполнения, которое в настоящее время является последовательным. В настоящее время это занимает более одного часа, и это слишком долго. Входные данные — это просто строки базы данных с двумя полями, числовым идентификатором и строкой типа 12 #20 # 32 # … содержащей 36 значений. Формат вывода тот же, но числовые значения преобразуются алгоритмом.
3. Требует ли работа, выполняемая в MyService, больших объемов памяти? Если да, можете ли вы проверить наличие ошибок ООМ в журналах.
4. Я не могу найти никаких ошибок ООМ ни в каких журналах. Я думаю, что объем памяти невелик. Не могли бы вы объяснить, что означает сообщение «Предложение динамического разделения рабочей единицы»? Я думаю, что это то, что поток данных пытается сделать и терпит неудачу. Но я не понимаю, что это значит.
5. Другая информация, я попытался выполнить конвейер со следующими параметрами: —diskSizeGb= 250 —workerDiskType=compute.googleapis.com/projects/myproject/zones/europe-west1-d/diskTypes/pd-ssd но это ничего не меняет. Есть ли какие-то атрибуты виртуальной машины, которую я создал для размещения приложения, которые можно было бы более адаптировать? Я думал, что рабочие были другими экземплярами, независимыми от виртуальной машины, на которой выполняется приложение.
Ответ №1:
Решением было настроить брандмауэр, добавив правило, разрешающее общение между рабочими.
https://cloud.google.com/dataflow/docs/guides/routes-firewall