# #elasticsearch #google-bigquery #apache-beam
Вопрос:
У меня есть конвейер потока данных Google Cloud, который считывает данные из кластера Elasticsearch 5.x в AWS.
Общие этапы обработки в моем конвейере-чтение из Elasticsearch и запись в BigQuery.
Код, который я использую, похож на приведенный ниже код
pipeline.apply( ElasticsearchIO.read().withConnectionConfiguration(...) .withBatchSize(10000) .withQuery("lt;small-querygt;")
Теперь, когда я запускаю свою lt;small-querygt;
программу непосредственно в Кибане, она возвращает 700 документов. Однако при развертывании моего конвейера потоков данных требуется:
- очень много времени, чтобы прочитать этот небольшой запрос (скажем, около ~15 минут)
- он читает МНОГО дубликатов документов. Я заметил это, когда наблюдал график выполнения в консоли потока данных. Шаг чтения Elasticsearch показывает ~70 тыс. элементов, добавленных в конвейер по завершении. Я подтвердил, что есть тонны повторяющихся строк, когда я также посмотрел на свою таблицу BigQuery.
Теперь у меня есть пара вопросов о моем трубопроводе:
- Как мне добиться точной однократной обработки/чтения с помощью операции чтения Elasticsearch?
- Должен ли я реализовать шаг дедупликации после чтения из Elasticsearch?
Мы высоко ценим любую помощь в улучшении целостности данных и обработки конвейера.