Настройка моего сериализатора Apache Storm для повышения производительности

#performance #serialization #apache-storm #kryo

#Производительность #сериализация #apache-storm #kryo

Вопрос:

я новичок в Java и Apache Storm, и я хочу знать, как я могу ускорить работу! Я настраиваю кластер Storm с 2 физическими машинами по 8 ядер на каждой. Кластер работает отлично. Я настраиваю следующую тестовую топологию для измерения производительности:

 builder.setSpout("spout", new RandomNumberSpoutSingle(sizeOfArray), 10);
builder.setBolt("null", new NullBolt(), 4).allGrouping("spout");
 

RandomNumberSpoutSingle создает массив следующим образом:

   ArrayList<Integer> array = new ArrayList<Integer>();
 

Я заполняю его целыми числами sizeOfArray. Этот массив в сочетании с идентификатором создает мой кортеж.
Теперь я измеряю, сколько кортежей в секунду поступает в bolt с помощью allGrouping (я смотрю на «переданное» значение Storm GUI).

Если я поставлю sizeOfArray = 1024, будет отправлено около 173000 кортежей в секунду. Поскольку 1 кортеж должен составлять около 4 * 1024 байт, перемещается около 675 МБ / с.

Я прав до сих пор? Теперь мой вопрос: способен ли Storm / Kryo двигаться дальше? Как я могу это настроить? Есть ли настройки, которые я проигнорировал? Я хочу сериализовать больше кортежей в секунду! Если я использую локальную перетасовку, значения взлетают до небес, потому что ничего не нужно сериализовать, но мне нужны кортежи для всех рабочих. Ни процессор, ни память, ни сеть не заняты полностью.

Ответ №1:

Я думаю, что вы правильно рассчитали, хотя я не уверен, учитываются ли при сериализации накладные расходы Java для непримитивного целочисленного типа, что добавило бы еще несколько байтов к уравнению. Тем не менее, я также не уверен, что это лучший способ анализа производительности storm, поскольку это больше измеряется количеством кортежей в секунду, чем пропускной способностью.

Storm встроил сериализацию для примитивных типов, строк, байтовых массивов, ArrayList, HashMap и HashSet (source). Когда я программирую Java для максимальной производительности, я стараюсь максимально использовать примитивные типы. Было бы целесообразно использовать int[] вместо ArrayList<Integer> ? Я ожидал бы получить некоторую производительность от этого, если это возможно в вашей настройке.

Учитывая вышеупомянутые типы, которые storm может сериализовать «из коробки», я бы, скорее всего, воздержался от попыток улучшить производительность сериализации. Я предполагаю, что kryo довольно оптимизирован, и здесь будет очень сложно добиться чего-либо быстрее. Я также не уверен, является ли сериализация настоящим узким местом здесь или, скорее, что-то в вашей настройке топологии (см. Ниже).

Я бы посмотрел на другие настраиваемые параметры, которые связаны с внутри- и межрабочим взаимодействием. Хороший обзор можно найти здесь . В одной топологии, для которой производительность имеет решающее значение, я использую следующий установочный код для настройки таких параметров. Что лучше всего работает в вашем случае, нужно выяснить с помощью тестирования.

 int topology_executor_receive_buffer_size = 32768; // intra-worker messaging, default: 32768
int topology_transfer_buffer_size = 2048; // inter-worker messaging, default: 1000
int topology_producer_batch_size = 10; // intra-worker batch, default: 1
int topology_transfer_batch_size = 20; // inter-worker batch, default: 1
int topology_batch_flush_interval_millis = 10; // flush tuple creation ms, default: 1
double topology_stats_sample_rate = 0.001; // calculate metrics every 1000 messages, default: 0.05
conf.put("topology.executor.receive.buffer.size", topology_executor_receive_buffer_size);
conf.put("topology.transfer.buffer.size", topology_transfer_buffer_size);
conf.put("topology.producer.batch.size", topology_producer_batch_size);
conf.put("topology.transfer.batch.size", topology_transfer_batch_size);
conf.put("topology.batch.flush.interval.millis", topology_batch_flush_interval_millis);
conf.put("topology.stats.sample.rate", topology_stats_sample_rate);
 

Как вы заметили, производительность значительно возрастает, когда storm может использовать внутрирабочую обработку, поэтому я бы всегда советовал использовать это, если это возможно. Вы уверены, что вам нужна allGrouping? Если нет, я бы предложил использовать группировку в случайном порядке, которая фактически будет использовать локальную связь, если storm сочтет это целесообразным, если topology.disable.loadaware.messaging не установлено значение false . Я не уверен, будет ли allGrouping использовать локальную связь для тех компонентов, которые находятся на одном и том же работнике.

Еще одна вещь, которая меня интересует, — это конфигурация вашей топологии: у вас есть 10 носиков и 4 потребительских болта. Если болты не потребляют входящие кортежи намного быстрее, чем они создаются, может быть целесообразно использовать равное число для обоих компонентов. Из того, как вы описываете свой процесс, кажется, что вы используете подтверждение и сбой, потому что вы написали, что назначаете идентификатор своим кортежам. В случае, если гарантированная обработка отдельных кортежей не является абсолютным требованием, производительность, вероятно, можно повысить, переключившись на не привязанные кортежи. Подтверждение и сбой приводят к некоторым накладным расходам, поэтому я бы предположил более высокую пропускную способность кортежа, если он отключен.

И, наконец, вы также можете поэкспериментировать со значением максимального количества ожидающих кортежей (настраивается с помощью method .setMaxSpoutPending из носиков). Не уверен, что storm использует по умолчанию, однако, исходя из моего опыта, установка немного большего числа, чем то, что болты могут принимать вниз по потоку, обеспечивает более высокую пропускную способность. Посмотрите на емкость метрик и количество переданных кортежей в пользовательском интерфейсе storm.