Проблема конфигурации приемника подключения Кафки — «Игнорирование недопустимой задачи при условии смещения-раздел не назначен»

# #google-bigquery #apache-kafka-connect

Вопрос:

Мы пытаемся запустить kafka connect worker в GCP с помощью kubernetes с одним исходным соединителем, настроенным на Postgresql, одним соединителем приемника, синхронизирующимся с BigQuery, и управляемым объединением kafka. Разделы Кафки для смещений, конфигурации и статуса настраиваются в соответствии со спецификацией с 25, 1, 5 разделами соответственно, политикой компактной очистки и сроком хранения 7 дней.

Соединители запускаются через REST API. Швы соединителя источника работают нормально, но соединитель приемника через некоторое время начинает регистрировать эти предупреждения:

 [2021-09-06 08:13:12,429] WARN WorkerSinkTask{id=master-gcp-bq-sink-0} Ignoring invalid task provided offset sometable-1/OffsetAndMetadata{offset=500, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[com_sync_master_dev.schema.table-1, com_sync_master_dev.schema.table-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)

 

Кроме того, каждый перезапуск соединителя приемника начинается с самого начала, как будто он не может прочитать смещение для начала.

Перед проблемой брокер теряет соединение, соединитель останавливается, затем запускается перебалансировка.

 
2021-09-09 07:55:51,291] INFO [Worker clientId=connect-1, groupId=database-sync] Group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response.isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:51,295] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:55:51,295] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 5: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:55:51,298] INFO [Worker clientId=connect-1, groupId=database-sync] Discovered group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:51,300] DEBUG Putting 500 records in the sink. (com.wepay.kafka.connect.bigquery.BigQuerySinkTask)
[2021-09-09 07:55:51,301] INFO [Worker clientId=connect-1, groupId=database-sync] Discovered group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:51,302] INFO [Worker clientId=connect-1, groupId=database-sync] Group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:56,732] DEBUG re-attempting insertion (com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter)
[2021-09-09 07:55:56,735] DEBUG table insertion completed successfully (com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter)
[2021-09-09 07:55:56,739] DEBUG Wrote 500 rows over 1 successful calls and 0 failed calls. (com.wepay.kafka.connect.bigquery.write.batch.TableWriter)
[2021-09-09 07:55:56,736] INFO [Worker clientId=connect-1, groupId=database-sync] Broker coordinator was unreachable for 3000ms. Revoking previous assignment Assignment{error=0, leader='connect-1-fd48e893-1729-4df4-8d1e-3370c1e76e1f', leaderUrl='http://confluent-bigquery-connect:8083/', offset=555, connectorIds=[master-gcp-bq-sink, master-gcp-source], taskIds=[master-gcp-bq-sink-0, master-gcp-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} to avoid running tasks while not being a member the group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)

 

Смещения для соединителя приемника всегда перезапускаются с 0, и WorkerSinkTask пропускает последнюю фиксацию, журналы:

 [2021-09-09 07:29:25,177] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, no change since last commit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:29:25,177] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 1345: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,281] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Initializing and starting task for topics com_sync_master_dev.someshema.sometable (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,300] INFO WorkerSinkTask{id=master-gcp-bq-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,595] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Partitions assigned [com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,795] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-1 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,817] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:51:39,308] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:51:39,308] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:52:39,355] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:52:39,355] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 2: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
...
[2021-09-09 08:01:03,158] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Initializing and starting task for topics com_sync_master_dev.someshema.sometable (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,168] INFO WorkerSinkTask{id=master-gcp-bq-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,381] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Partitions assigned [com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,410] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-1 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,762] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:02:03,145] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:02:03,145] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
....
[2021-09-09 08:09:17,085] WARN WorkerSinkTask{id=master-gcp-bq-sink-0} Ignoring invalid task provided offset sometable-0/OffsetAndMetadata{offset=395300, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:09:17,085] WARN WorkerSinkTask{id=master-gcp-bq-sink-0} Ignoring invalid task provided offset sometable-1/OffsetAndMetadata{offset=380428, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
 

Source configuration:

 {
"name": "master-gcp-source",
"config": {
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "plugin.name": "pgoutput",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.basic.auth.credentials.source": "******",
  "key.converter.schema.registry.basic.auth.user.info":"*****",
  "key.converter.schema.registry.url": "https://************.gcp.confluent.cloud",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "errors.tolerance": "none",
  "errors.deadletterqueue.topic.name":"dlq_postgres_source",
  "errors.deadletterqueue.topic.replication.factor": 1,
  "errors.deadletterqueue.context.headers.enable":true,
  "errors.log.enable":true,
  "errors.log.include.messages":true,
  "value.converter.basic.auth.credentials.source": "******",
  "value.converter.schema.registry.basic.auth.user.info":"***************",
  "value.converter.schema.registry.url": "https://************.gcp.confluent.cloud",
  "transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
  "database.hostname": "hostname",
  "database.port": "5432",
  "database.user": "some_db_user",
  "database.password": "***********",
  "database.dbname" : "master",
  "database.server.name": "com_master_dev",
  "database.sslmode": "require",
  "table.include.list": "schema.table",
  "table.ignore.builtin": true,
  "heartbeat.interval.ms": "5000",
  "tasks.max": "1",
  "slot.drop.on.stop": false,
  "xmin.fetch.interval.ms": 0,
  "interval.handling.mode": "numeric",
  "binary.handling.mode": "bytes",
  "sanitize.field.names": true,
  "slot.max.retries":6,
  "slot.retry.delay.ms": 10000,
  "event.processing.failure.handling.mode": "fail",
  "slot.name": "debezium",
  "publication.name": "dbz_publication",
  "decimal.handling.mode": "precise",
  "snapshot.lock.timeout.ms": "10000",
  "snapshot.mode":"initial",
  "output.data.format": "AVRO",
  "transforms": "unwrap",
  "offset.flush.interval.ms": "0",
  "offset.flush.timeout.ms" : "20000",
  "max.batch.size": "1024",
  "max.queue.size":"4096",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
}
 

Конфигурация раковины:

 {
"name": "master-gcp-bq-sink",
"config": {
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "tasks.max": "1",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.basic.auth.credentials.source": "*********",
  "key.converter.schema.registry.basic.auth.user.info":"************",
  "key.converter.schema.registry.url": "https://*********.europe-west3.gcp.confluent.cloud",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.basic.auth.credentials.source": "*******",
  "value.converter.schema.registry.basic.auth.user.info":"****************************",
  "value.converter.schema.registry.url": "https://*********.europe-west3.gcp.confluent.cloud",
  "config.action.reload": "restart",
  "topics": "com_master_dev.schema.table",
  "project": "dev",
  "defaultDataset": "schema",
  "keyfile": "{********}",
  "keySource": "JSON",
  "errors.tolerance": "none",
  "errors.deadletterqueue.topic.name":"dlq_bigquery_sink",
  "errors.deadletterqueue.topic.replication.factor": 3,
  "errors.deadletterqueue.context.headers.enable":true,
  "errors.log.enable":true,
  "errors.log.include.messages":true,
   "data.format":"AVRO",
  "upsertEnabled": true,
  "deleteEnabled": false,
  "allowNewBigQueryFields": "true",
  "sanitizeTopics": true,
  "sanitizeFieldNames": true,
  "autoCreateTables": true,
  "timePartitioningType": "DAY",
  "kafkaKeyFieldName":"key_placeholder",
  "mergeIntervalMs": "60000",
  "mergeRecordsThreshold": "-1",
  "transforms": "unwrap",
  "consumer.override.session.timeout.ms":"60000",
  "consumer.override.fetch.max.bytes": "1048576",
  "consumer.override.request.timeout.ms":"60000",   
  "consumer.override.reconnect.backoff.max.ms":"10000",
  "consumer.override.reconnect.backoff.ms":"250",
  "consumer.override.partition.assignment.strategy":"org.apache.kafka.clients.consumer.CooperativeStickyAssignor", // also tried with RoundRobinAssignor
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms": "RegexTransformation",
  "transforms.RegexTransformation.type":"org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.RegexTransformation.regex":"(com_sync_master_dev.schema.)(.*)",
  "transforms.RegexTransformation.replacement": "$2"
}
}
 

Что мы упускаем? Как избежать недопустимых смещений задач и убедиться, что соединитель приемника продолжается с предыдущего смещения?

Ответ №1:

Поэтому в Кафке вам нужно настроить группу потребителей для потребителя темы, чтобы воспользоваться смещением. В противном случае неисправный приемник просто возродится и не будет знать, с какого смещения он считывал, когда был жив в последний раз.

Когда член потребительской группы читает из темы, он сообщает о своем прогрессе брокеру, чтобы он знал, откуда он прочитал.

Похоже, что фиксация смещения по какой-то причине пропущена, и именно поэтому приемник всегда начинается со смещения 0.