Storm Kafka не распараллеливается должным образом

#apache-kafka #apache-storm

#apache-kafka #apache-storm

Вопрос:

У нас возникла проблема, касающаяся параллелизма задач внутри одной топологии. Нам не удается добиться хорошей, плавной скорости обработки.

Мы используем Kafka и Storm для построения системы с разными топологиями, где данные обрабатываются по цепочке топологий, связанных с использованием тем Kafka.

Мы используем Kafka 1.0.0 и Storm 1.2.1.

Загрузка невелика по количеству сообщений, около 2000 в день, однако каждая задача может занимать довольно много времени. В частности, для обработки каждой задачи в одной топологии может потребоваться разное количество времени, обычно от 1 до 20 минут. При последовательной обработке пропускной способности недостаточно для обработки всех входящих сообщений. Все топологии и система Kafka установлены на одном компьютере (16 ядер, 16 ГБ оперативной памяти).

Поскольку сообщения независимы и могут обрабатываться параллельно, мы пытаемся использовать параллельные возможности Storm для повышения пропускной способности.

Для этого топология была настроена следующим образом:

  • 4 рабочих
  • значение подсказки параллелизма равно 10
  • Размер сообщения при чтении из Kafka достаточно велик, чтобы прочитать около 8 задач в каждом сообщении.
  • В разделах Kafka используется коэффициент репликации = 1 и разделы = 10.

При такой конфигурации мы наблюдаем следующее поведение в этой топологии.

  • Топология Storm считывает из Kafka около 7-8 задач за один пакет (размер задачи не фиксирован), максимальный размер сообщения 128 КБ.
  • Одновременно вычисляется около 4-5 задач. Работа более или менее равномерно распределена между работниками. Некоторые рабочие выполняют 1 задачу, другие — 2 и обрабатывают их одновременно.
  • По мере завершения задач запускаются оставшиеся задачи.
  • Проблема нехватки ресурсов возникает, когда для обработки остается всего 1-2 задачи. Другие рабочие ожидают простоя, пока все задачи не будут завершены.
  • Когда все задачи завершены, сообщение подтверждается и отправляется на следующую топологию.
  • Из Kafka считывается новый пакет, и процесс запускается снова.

У нас есть две основные проблемы. Во-первых, даже с 4 рабочими элементами и 10 подсказками о параллелизме запускаются только 4-5 задач. Во-вторых, больше никакие пакеты не запускаются, пока есть незавершенная работа, даже если это всего лишь 1 задача.

Проблема не в том, что у нас недостаточно работы, поскольку мы пытались вставить 2000 задач в начале, так что работы еще много.

Мы попытались увеличить параметр «maxSpoutsPending», ожидая, что топология будет считывать больше пакетов и ставить их в очередь одновременно, но, похоже, они передаются по конвейеру внутри, а не обрабатываются одновременно.

Топология создается с использованием следующего кода:

 private static StormTopology buildTopologyOD() {
    //This is the marker interface BrokerHosts.
    BrokerHosts hosts = new ZkHosts(configuration.getProperty(ZKHOSTS));
    TridentKafkaConfig tridentConfigCorrelation = new TridentKafkaConfig(hosts, configuration.getProperty(TOPIC_FROM_CORRELATOR_NAME));

    tridentConfigCorrelation.scheme = new RawMultiScheme();
    tridentConfigCorrelation.fetchSizeBytes = Integer.parseInt(configuration.getProperty(MAX_SIZE_BYTES_CORRELATED_STREAM));

    OpaqueTridentKafkaSpout spoutCorrelator = new OpaqueTridentKafkaSpout(tridentConfigCorrelation);

    TridentTopology topology = new TridentTopology();

    Stream existingObject = topology.newStream("kafka_spout_od1", spoutCorrelator)
            .shuffle()
            .each(new Fields("bytes"), new ProcessTask(), new Fields(RESULT_FIELD, OBJECT_FIELD))
            .parallelismHint(Integer.parseInt(configuration.getProperty(PARALLELISM_HINT)));

    //Create a state Factory to produce outputs to kafka topics.
    TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
            .withProducerProperties(kafkaProperties)
            .withKafkaTopicSelector(new ODTopicSelector())
            .withTridentTupleToKafkaMapper(new ODTupleToKafkaMapper());

    existingObject.partitionPersist(stateFactory, new Fields(RESULT_FIELD, OBJECT_FIELD), new TridentKafkaUpdater(), new Fields(OBJECT_FIELD));

    return topology.build();
}
  

и конфигурация создана как:

 private static Config createConfig(boolean local) {
    Config conf = new Config();
    conf.setMaxSpoutPending(1); // Also tried 2..6
    conf.setNumWorkers(4);

    return conf;
}
  

Можем ли мы что-нибудь сделать для повышения производительности, либо за счет увеличения количества параллельных задач, либо / и избежания «голодания» при завершении обработки пакета?

Ответ №1:

Я нашел старый пост о storm-пользователях Натана Марца, касающийся настройки параллелизма для Trident:

Я рекомендую использовать функцию «name» для присвоения имен частям вашего потока, чтобы пользовательский интерфейс показывал вам, какие болты каким разделам соответствуют.

Trident упаковывает операции в как можно меньшее количество блоков. Кроме того, он никогда не перераспределяет ваш поток, если вы не выполнили операцию, которая явно включает перераспределение (например, shuffle, groupBy, partitionBy, глобальную агрегацию и т.д.). Это свойство Trident гарантирует, что вы можете контролировать порядок / полуупорядоченность того, как обрабатываются данные. Таким образом, в этом случае все, что находится перед groupBy, должно иметь одинаковый параллелизм, иначе Trident пришлось бы перераспределять поток. И поскольку вы не сказали, что хотите перераспределить поток, он не может этого сделать. Вы можете получить другой параллелизм для spout по сравнению каждый из них следует за введением операции перераспределения, вот так:

stream.parallelismHint(1).shuffle().each(…).каждый (…).parallelismHint(3).groupBy(…);

Я думаю, вы могли бы захотеть установить parallelismHint как для вашего spout, так и для вашего .each .

Что касается одновременной обработки нескольких пакетов, вы правы, что это то, для чего maxSpoutPending предназначено в Trident. Попробуйте проверить в пользовательском интерфейсе Storm, действительно ли выбрано ваше максимальное ожидающее значение потока. Также попробуйте включить ведение журнала отладки для MasterBatchCoordinator. Из этого протоколирования вы должны быть в состоянии определить, находятся ли несколько пакетов в процессе выполнения одновременно или нет.

Когда вы говорите, что несколько пакетов не обрабатываются одновременно, вы имеете в виду ProcessTask? Имейте в виду, что одним из свойств Trident является то, что обновления состояния упорядочиваются с учетом пакетов. Если у вас, например, maxSpoutPending = 3 и пакеты 1, 2 и 3 находятся в процессе выполнения, Trident не будет отправлять больше пакетов для обработки, пока не будет записан пакет 1, после чего он отправит еще один. Таким образом, медленные пакеты могут блокировать передачу большего количества данных, даже если 2 и 3 полностью обработаны, им приходится ждать завершения 1 и записи.

Если вам не нужно пакетное и упорядочивающее поведение Trident, вы могли бы попробовать обычный Storm вместо этого.

Скорее побочное замечание, но вы, возможно, захотите рассмотреть возможность перехода с storm-kafka на storm-kafka-client . Это не важно для этого вопроса, но вы не сможете перейти на Kafka 2.x, не сделав этого, и это проще, прежде чем вы получите кучу состояний для миграции.

Комментарии:

1. Спасибо за ваш комментарий. Я попытался установить parallelismHint также, как вы сказали, с теми же результатами. Значение для maxSpoutPending подбирается, но, по-видимому, не используется, или, по крайней мере, не так, как я ожидал. Мы используем Trident, потому что, AFAIK, обычная топология может не обрабатывать некоторые задачи или обрабатывать некоторые более одного раза. Я бы не возражал, чтобы Trident блокировал эмиссию, если они обрабатываются и отправляются в конце, но, похоже, они даже не обрабатываются.

2. В таком случае я не уверен, в чем проблема. Попробуйте использовать список рассылки storm-user, там, вероятно, найдутся люди, у которых есть опыт настройки топологий Trident storm.apache.org/getting-help.html

3. Хорошо, я попробую! Спасибо