# #google-bigquery #apache-kafka-connect
Question:
We are trying to run a Kafka Connect worker on GCP on Kubernetes, with one source connector configured against PostgreSQL, one sink connector syncing to BigQuery, and Confluent-managed Kafka. The Connect internal topics for offsets, config, and status are set up per the spec with 25, 1, and 5 partitions respectively, a compact cleanup policy, and a 7-day retention period.
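For reference, a minimal sketch of the corresponding distributed-mode worker properties, assuming the default internal topic names (the topic names are illustrative, not taken from the actual deployment):

```properties
# Kafka Connect distributed worker: internal topic settings
offset.storage.topic=connect-offsets
offset.storage.partitions=25
config.storage.topic=connect-configs
status.storage.topic=connect-status
status.storage.partitions=5
# All three topics must use cleanup.policy=compact;
# the config topic must have exactly one partition.
```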
The connectors are started via the REST API. The source connector seems to work fine, but after a while the sink connector starts logging these warnings:
[2021-09-06 08:13:12,429] WARN WorkerSinkTask{id=master-gcp-bq-sink-0} Ignoring invalid task provided offset sometable-1/OffsetAndMetadata{offset=500, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[com_sync_master_dev.schema.table-1, com_sync_master_dev.schema.table-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
In addition, every restart of the sink connector starts from the very beginning, as if it cannot read the offset to resume from.
Before the problem occurs, the broker connection is lost, the connector stops, and then a rebalance starts:
[2021-09-09 07:55:51,291] INFO [Worker clientId=connect-1, groupId=database-sync] Group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response.isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:51,295] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:55:51,295] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 5: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:55:51,298] INFO [Worker clientId=connect-1, groupId=database-sync] Discovered group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:51,300] DEBUG Putting 500 records in the sink. (com.wepay.kafka.connect.bigquery.BigQuerySinkTask)
[2021-09-09 07:55:51,301] INFO [Worker clientId=connect-1, groupId=database-sync] Discovered group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:51,302] INFO [Worker clientId=connect-1, groupId=database-sync] Group coordinator *************.europe-west3.gcp.confluent.cloud:9092 (id: 2147483636 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-09-09 07:55:56,732] DEBUG re-attempting insertion (com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter)
[2021-09-09 07:55:56,735] DEBUG table insertion completed successfully (com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter)
[2021-09-09 07:55:56,739] DEBUG Wrote 500 rows over 1 successful calls and 0 failed calls. (com.wepay.kafka.connect.bigquery.write.batch.TableWriter)
[2021-09-09 07:55:56,736] INFO [Worker clientId=connect-1, groupId=database-sync] Broker coordinator was unreachable for 3000ms. Revoking previous assignment Assignment{error=0, leader='connect-1-fd48e893-1729-4df4-8d1e-3370c1e76e1f', leaderUrl='http://confluent-bigquery-connect:8083/', offset=555, connectorIds=[master-gcp-bq-sink, master-gcp-source], taskIds=[master-gcp-bq-sink-0, master-gcp-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} to avoid running tasks while not being a member the group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
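The "session timed out without receiving a heartbeat response" and "coordinator unavailable" messages above concern the Connect worker's own group membership (`connect-1` in group `database-sync`), which is separate from the sink task's consumer. If the network to Confluent Cloud is flaky, the worker-level coordination timeouts can be raised in the worker properties; a hedged sketch with illustrative values:

```properties
# Worker-level group coordination timeouts (illustrative values);
# these govern the Connect worker group itself, not the
# consumer.override.* settings of the sink task.
session.timeout.ms=60000
heartbeat.interval.ms=5000
rebalance.timeout.ms=120000
```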
The offsets for the sink connector always restart from 0, and WorkerSinkTask skips the last commit. Logs:
[2021-09-09 07:29:25,177] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, no change since last commit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:29:25,177] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 1345: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,281] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Initializing and starting task for topics com_sync_master_dev.someshema.sometable (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,300] INFO WorkerSinkTask{id=master-gcp-bq-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,595] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Partitions assigned [com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,795] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-1 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:50:39,817] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:51:39,308] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:51:39,308] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:52:39,355] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 07:52:39,355] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 2: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
...
[2021-09-09 08:01:03,158] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Initializing and starting task for topics com_sync_master_dev.someshema.sometable (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,168] INFO WorkerSinkTask{id=master-gcp-bq-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,381] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Partitions assigned [com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,410] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-1 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:01:03,762] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Assigned topic partition com_sync_master_dev.someshema.sometable-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:02:03,145] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Skipping offset commit, task opted-out by returning no offsets from preCommit (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:02:03,145] DEBUG WorkerSinkTask{id=master-gcp-bq-sink-0} Finished offset commit successfully in 0 ms for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
....
[2021-09-09 08:09:17,085] WARN WorkerSinkTask{id=master-gcp-bq-sink-0} Ignoring invalid task provided offset sometable-0/OffsetAndMetadata{offset=395300, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-09-09 08:09:17,085] WARN WorkerSinkTask{id=master-gcp-bq-sink-0} Ignoring invalid task provided offset sometable-1/OffsetAndMetadata{offset=380428, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[com_sync_master_dev.someshema.sometable-1, com_sync_master_dev.someshema.sometable-0] (org.apache.kafka.connect.runtime.WorkerSinkTask)
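Connect sink tasks commit their offsets under a consumer group named `connect-<connector name>` by default, so the committed position can be inspected directly against the cluster (the broker address and the credentials file here are placeholders; this needs a live cluster to run):

```shell
# Describe the sink's consumer group to see what is actually committed
# (Connect derives the group id as connect-<connector name>).
kafka-consumer-groups --bootstrap-server <broker>:9092 \
  --command-config client.properties \
  --describe --group connect-master-gcp-bq-sink
```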
Source configuration:
{
  "name": "master-gcp-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.basic.auth.credentials.source": "******",
    "key.converter.schema.registry.basic.auth.user.info": "*****",
    "key.converter.schema.registry.url": "https://************.gcp.confluent.cloud",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "errors.tolerance": "none",
    "errors.deadletterqueue.topic.name": "dlq_postgres_source",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "value.converter.basic.auth.credentials.source": "******",
    "value.converter.schema.registry.basic.auth.user.info": "***************",
    "value.converter.schema.registry.url": "https://************.gcp.confluent.cloud",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "database.hostname": "hostname",
    "database.port": "5432",
    "database.user": "some_db_user",
    "database.password": "***********",
    "database.dbname": "master",
    "database.server.name": "com_master_dev",
    "database.sslmode": "require",
    "table.include.list": "schema.table",
    "table.ignore.builtin": true,
    "heartbeat.interval.ms": "5000",
    "tasks.max": "1",
    "slot.drop.on.stop": false,
    "xmin.fetch.interval.ms": 0,
    "interval.handling.mode": "numeric",
    "binary.handling.mode": "bytes",
    "sanitize.field.names": true,
    "slot.max.retries": 6,
    "slot.retry.delay.ms": 10000,
    "event.processing.failure.handling.mode": "fail",
    "slot.name": "debezium",
    "publication.name": "dbz_publication",
    "decimal.handling.mode": "precise",
    "snapshot.lock.timeout.ms": "10000",
    "snapshot.mode": "initial",
    "output.data.format": "AVRO",
    "transforms": "unwrap",
    "offset.flush.interval.ms": "0",
    "offset.flush.timeout.ms": "20000",
    "max.batch.size": "1024",
    "max.queue.size": "4096",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}
Sink configuration:
{
  "name": "master-gcp-bq-sink",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.basic.auth.credentials.source": "*********",
    "key.converter.schema.registry.basic.auth.user.info": "************",
    "key.converter.schema.registry.url": "https://*********.europe-west3.gcp.confluent.cloud",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.basic.auth.credentials.source": "*******",
    "value.converter.schema.registry.basic.auth.user.info": "****************************",
    "value.converter.schema.registry.url": "https://*********.europe-west3.gcp.confluent.cloud",
    "config.action.reload": "restart",
    "topics": "com_master_dev.schema.table",
    "project": "dev",
    "defaultDataset": "schema",
    "keyfile": "{********}",
    "keySource": "JSON",
    "errors.tolerance": "none",
    "errors.deadletterqueue.topic.name": "dlq_bigquery_sink",
    "errors.deadletterqueue.topic.replication.factor": 3,
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "data.format": "AVRO",
    "upsertEnabled": true,
    "deleteEnabled": false,
    "allowNewBigQueryFields": "true",
    "sanitizeTopics": true,
    "sanitizeFieldNames": true,
    "autoCreateTables": true,
    "timePartitioningType": "DAY",
    "kafkaKeyFieldName": "key_placeholder",
    "mergeIntervalMs": "60000",
    "mergeRecordsThreshold": "-1",
    "transforms": "unwrap",
    "consumer.override.session.timeout.ms": "60000",
    "consumer.override.fetch.max.bytes": "1048576",
    "consumer.override.request.timeout.ms": "60000",
    "consumer.override.reconnect.backoff.max.ms": "10000",
    "consumer.override.reconnect.backoff.ms": "250",
    "consumer.override.partition.assignment.strategy": "org.apache.kafka.clients.consumer.CooperativeStickyAssignor", // also tried with RoundRobinAssignor
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms": "RegexTransformation",
    "transforms.RegexTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.RegexTransformation.regex": "(com_sync_master_dev.schema.)(.*)",
    "transforms.RegexTransformation.replacement": "$2"
  }
}
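Note that the sink config above defines the `transforms` key twice, first as `"unwrap"` and later as `"RegexTransformation"`. Most JSON parsers keep only the last occurrence of a duplicated key, so the `unwrap` SMT is silently dropped. A short Python sketch demonstrating both the duplicate-key behavior and what the `RegexRouter` regex does to the topic name (the topic name is taken from the logs above; the regex semantics are approximated with Python's `re` rather than Java's regex engine):

```python
import json
import re

# Duplicate keys in a JSON object: json.loads keeps the LAST value,
# so "transforms": "unwrap" is silently shadowed.
config = json.loads('{"transforms": "unwrap", "transforms": "RegexTransformation"}')
print(config["transforms"])  # -> RegexTransformation

# RegexRouter rewrites the topic name; a rough Python equivalent of
# regex "(com_sync_master_dev.schema.)(.*)" with replacement "$2".
topic = "com_sync_master_dev.schema.table"
routed = re.sub(r"(com_sync_master_dev.schema.)(.*)", r"\2", topic)
print(routed)  # -> table
```

To apply both SMTs, they would need to be chained in a single key, e.g. `"transforms": "unwrap,RegexTransformation"`.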
What are we missing? How can we avoid the invalid task offsets and make sure the sink connector resumes from its previous offset?
Answer #1:
So in Kafka, you need a consumer group configured for the topic's consumer in order to take advantage of committed offsets. Otherwise, a failed sink will simply respawn without knowing which offset it had reached the last time it was alive.
When a member of a consumer group reads from a topic, it reports its progress to the broker, so the broker knows how far it has read.
It looks like the offset commit is being skipped for some reason, and that is why the sink always starts from offset 0.
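The commit/resume mechanics described above can be sketched with a toy model (illustration only, not the real Kafka client API): the broker stores the last committed offset per group and partition; on restart a member resumes from that offset, and if nothing was ever committed it falls back to the beginning.

```python
# Toy model of consumer-group offset semantics (illustration only,
# not the Kafka client API).
class Broker:
    def __init__(self):
        self.committed = {}  # (group, partition) -> offset

    def commit(self, group, partition, offset):
        self.committed[(group, partition)] = offset

    def fetch_committed(self, group, partition):
        # No committed offset -> start from the beginning (offset 0).
        return self.committed.get((group, partition), 0)

broker = Broker()

# A sink task whose commits succeed resumes where it left off.
broker.commit("connect-master-gcp-bq-sink", 0, 500)
print(broker.fetch_committed("connect-master-gcp-bq-sink", 0))  # -> 500

# A task whose commits are skipped (as in the preCommit logs) restarts at 0.
print(broker.fetch_committed("connect-master-gcp-bq-sink", 1))  # -> 0
```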