#performance #apache-kafka #apache-kafka-connect #hana
#Производительность #апачи-кафка #apache-kafka-connect #хана
Вопрос:
Я использую кластер kafka, состоящий из 3 хранителей зоопарка, 3 брокеров, 3 экземпляров connect и схемы-реестра.
экземпляры broker и connect представляют собой машины aws ec2 c5.2xlarge с 8 ядрами и 16 ГБ оперативной памяти.
версии kafka и java:
kafka-dump-log --version
5.5.3-ccs (Commit:82571ebfbfc60c9c)
java --version
openjdk 11.0.7 2020-04-14 LTS
OpenJDK Runtime Environment 18.9 (build 11.0.7 10-LTS)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.7 10-LTS, mixed mode, sharing)
Итак, я публикую 25 исходных соединителей SAP HANA, каждый соединитель использует одну таблицу из базы данных и записывает ее в одну тему с одним разделом.
Проблема в том, что я не получаю все таблицы consumesd, и процессоры, похоже, всегда работают на полную мощность. Если я отключу все разъемы и опубликую один из тех разъемов, которые ранее ничего не потребляли, в конечном итоге он сделает это примерно через 1 час.
вот пример конфигурации одного из этих разъемов:
{
"connector.class": "com.sap.kafka.connect.source.hana.HANASourceConnector",
"connection.password": "xxxx",
"dialect.name": "SapHanaDatabaseDialect",
"connection.user": "xxx",
"numeric.mapping": "none",
"connection.url": "jdbc:sap://xxx",
"value.converter.schema.registry.url": "schema_url",
"value.converter.enhanced.avro.schema.support": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.enhanced.avro.schema.support": "true",
"key.converter.schema.registry.url": "schema_url",
"transforms": "castBinary,AddNamespace,TimestampConverter",
"transforms.castBinary.type": "com.costumecast.kafka.connect.transforms.Cast$Value",
"transforms.castBinary.spec": "SOME_COLUMN:string",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.target.type": "string",
"transforms.TimestampConverter.field": "SOME_COLUMN_2",
"transforms.AddNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.AddNamespace.schema.name": "com.agcs.kafka.avro.TEST_TOPIC",
"tasks.max": "1",
"mode": "incrementing",
"topics": "TEST_TOPIC",
"TEST_TOPIC.table.name": ""database"."view.test_table"",
"TEST_TOPIC.incrementing.column.name": "SOME_COLUMN_2",
"batch.max.rows": "0",
"TEST_TOPIC.poll.interval.ms": "144000000"
}
в файле /etc/kafka/connect-distributed.properties нет ничего особенного:
bootstrap.servers=xxxx:xxxx,xxxx:xxxx,xxxx:xxxx
group.id=STREAMS_SINK_dev_connect_cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://xxxx:xxxx
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://xxxx:xxxx
config.storage.topic=_STREAMS_SINK_dev_connect_configs
offset.storage.topic=_STREAMS_SINK_dev_connect_offsets
status.storage.topic=_STREAMS_SINK_dev_connect_statuses
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
plugin.path=/usr/share/java/
# Connect worker
security.protocol=SASL_SSL
ssl.truststore.location=/var/kafka-ssl/client.truststore.jks
ssl.truststore.password=xxx
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username="xxx"
password="xxx";
# Embedded producer for source connectors
producer.security.protocol=SASL_SSL
producer.ssl.truststore.location=/var/kafka-ssl/client.truststore.jks
producer.ssl.truststore.password=xxx
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username="xxx"
password="xxx";
# Embedded consumer for sink connectors
consumer.security.protocol=SASL_SSL
consumer.ssl.truststore.location=/var/kafka-ssl/client.truststore.jks
consumer.ssl.truststore.password=xxx
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username="xxx"
password="xxx";
вот скриншот использования процессоров на разъемах.
Мой вопрос в том, почему производительность такая низкая? (таблицы в исходной базе данных невелики, все записи менее 1 МЛН) являются ли 25 соединителей слишком большими в этом контексте? или я наблюдал за каким-то параметром в конфигурации. Кроме того, почему загрузка процессора не успокаивается, когда в 22 таблицах больше нет данных для выборки, а остальные 3 таблицы просто ничего не делают?
был бы признателен за ваши идеи.
Комментарии:
1. Является ли высокая загрузка процессора только во время фотосъемки или публикации? Как скоро вы проводите опрос по каждому событию: poll.interval.ms ? Сколько записей вы ожидаете в среднем в секунду для каждого соединителя? Если вариант использования допускает это, вам следует указать больший пакет и больший опрос.
2. нет, высокая загрузка процессора постоянна. poll.interval.ms = 144000000 (40 часов) «batch.max.rows»: «0» означает собрать все в таблице (в каждой таблице менее 1 млн записей)
3. Если вы увеличите ведение журнала для подключаемых работников, получаете ли вы представление о том, чем они заняты? Вы можете настроить таргетинг на определенных регистраторов, например
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/admin/loggers/com.sap -d '{"level": "TRACE"}'
4. когда я отправляю запрос, я получаю
["com.sap","com.sap.kafka.client.hana.HANAJdbcClient","com.sap.kafka.connect.source.hana.HANASourceTask","com.sap.kafka.connect.source.querier.IncrColTableQuerier","com.sap.kafka.utils.ExecuteWithExceptions$","com.sap.kafka.utils.hana.HANAJdbcTypeConverter$"]
ответ. Но я не получаю никаких сообщений о трассировке в файлах журнала. Только информация `сбрасывает 0 выдающихся сообщений для фиксации смещения (org.apache. kafka.connect.runtime. WorkerSourceTask: 447)` иXMT: Solicit on eth0, interval 111770ms.
в основном