Проблема с производительностью исходного разъема Kafka JDBC — постоянное полное использование процессоров

#performance #apache-kafka #apache-kafka-connect #hana

#Производительность #апачи-кафка #apache-kafka-connect #хана

Вопрос:

Я использую кластер kafka, состоящий из 3 хранителей зоопарка, 3 брокеров, 3 экземпляров connect и схемы-реестра.

экземпляры broker и connect представляют собой машины aws ec2 c5.2xlarge с 8 ядрами и 16 ГБ оперативной памяти.

версии kafka и java:

 kafka-dump-log --version
5.5.3-ccs (Commit:82571ebfbfc60c9c)

java --version
openjdk 11.0.7 2020-04-14 LTS
OpenJDK Runtime Environment 18.9 (build 11.0.7 10-LTS)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.7 10-LTS, mixed mode, sharing)
 

Итак, я публикую 25 исходных соединителей SAP HANA, каждый соединитель использует одну таблицу из базы данных и записывает ее в одну тему с одним разделом.

Проблема в том, что я не получаю все таблицы consumesd, и процессоры, похоже, всегда работают на полную мощность. Если я отключу все разъемы и опубликую один из тех разъемов, которые ранее ничего не потребляли, в конечном итоге он сделает это примерно через 1 час.

вот пример конфигурации одного из этих разъемов:

 {
    "connector.class": "com.sap.kafka.connect.source.hana.HANASourceConnector",
    "connection.password": "xxxx",
    "dialect.name": "SapHanaDatabaseDialect",
    "connection.user": "xxx",
    "numeric.mapping": "none",
    "connection.url": "jdbc:sap://xxx",

    "value.converter.schema.registry.url": "schema_url",
    "value.converter.enhanced.avro.schema.support": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.enhanced.avro.schema.support": "true",
    "key.converter.schema.registry.url": "schema_url",

    "transforms": "castBinary,AddNamespace,TimestampConverter",
    "transforms.castBinary.type": "com.costumecast.kafka.connect.transforms.Cast$Value",
    "transforms.castBinary.spec": "SOME_COLUMN:string",

    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.target.type": "string",
    "transforms.TimestampConverter.field": "SOME_COLUMN_2",

    "transforms.AddNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
    "transforms.AddNamespace.schema.name": "com.agcs.kafka.avro.TEST_TOPIC",

    "tasks.max": "1",
    "mode": "incrementing",

    "topics": "TEST_TOPIC",
    "TEST_TOPIC.table.name": ""database"."view.test_table"",
    "TEST_TOPIC.incrementing.column.name": "SOME_COLUMN_2",

    "batch.max.rows": "0",
    "TEST_TOPIC.poll.interval.ms": "144000000"

  }
 

в файле /etc/kafka/connect-distributed.properties нет ничего особенного:

 bootstrap.servers=xxxx:xxxx,xxxx:xxxx,xxxx:xxxx

group.id=STREAMS_SINK_dev_connect_cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://xxxx:xxxx
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://xxxx:xxxx

config.storage.topic=_STREAMS_SINK_dev_connect_configs
offset.storage.topic=_STREAMS_SINK_dev_connect_offsets
status.storage.topic=_STREAMS_SINK_dev_connect_statuses

config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

plugin.path=/usr/share/java/
# Connect worker
security.protocol=SASL_SSL
ssl.truststore.location=/var/kafka-ssl/client.truststore.jks
ssl.truststore.password=xxx
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required 
  username="xxx" 
  password="xxx";

# Embedded producer for source connectors
producer.security.protocol=SASL_SSL
producer.ssl.truststore.location=/var/kafka-ssl/client.truststore.jks
producer.ssl.truststore.password=xxx
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required 
  username="xxx" 
  password="xxx";

# Embedded consumer for sink connectors
consumer.security.protocol=SASL_SSL
consumer.ssl.truststore.location=/var/kafka-ssl/client.truststore.jks
consumer.ssl.truststore.password=xxx
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required 
  username="xxx" 
  password="xxx"; 
 

вот скриншот использования процессоров на разъемах.

введите описание изображения здесь

Мой вопрос в том, почему производительность такая низкая? (таблицы в исходной базе данных невелики, все записи менее 1 МЛН) являются ли 25 соединителей слишком большими в этом контексте? или я наблюдал за каким-то параметром в конфигурации. Кроме того, почему загрузка процессора не успокаивается, когда в 22 таблицах больше нет данных для выборки, а остальные 3 таблицы просто ничего не делают?

был бы признателен за ваши идеи.

Комментарии:

1. Является ли высокая загрузка процессора только во время фотосъемки или публикации? Как скоро вы проводите опрос по каждому событию: poll.interval.ms ? Сколько записей вы ожидаете в среднем в секунду для каждого соединителя? Если вариант использования допускает это, вам следует указать больший пакет и больший опрос.

2. нет, высокая загрузка процессора постоянна. poll.interval.ms = 144000000 (40 часов) «batch.max.rows»: «0» означает собрать все в таблице (в каждой таблице менее 1 млн записей)

3. Если вы увеличите ведение журнала для подключаемых работников, получаете ли вы представление о том, чем они заняты? Вы можете настроить таргетинг на определенных регистраторов, например curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/admin/loggers/com.sap -d '{"level": "TRACE"}'

4. когда я отправляю запрос, я получаю ["com.sap","com.sap.kafka.client.hana.HANAJdbcClient","com.sap.kafka.connect.source.hana.HANASourceTask","com.sap.kafka.connect.source.querier.IncrColTableQuerier","com.sap.kafka.utils.ExecuteWithExceptions$","com.sap.kafka.utils.hana.HANAJdbcTypeConverter$"] ответ. Но я не получаю никаких сообщений о трассировке в файлах журнала. Только информация `сбрасывает 0 выдающихся сообщений для фиксации смещения (org.apache. kafka.connect.runtime. WorkerSourceTask: 447)` и XMT: Solicit on eth0, interval 111770ms. в основном