Как принудительно сжать журнал темы Кафки?

#apache-kafka

#апач-кафка

Вопрос:

Используя Кафку 2.7.0 (в K8s), я создаю тестовую тему с cleanup.policy=compact :

 ./kafka-topics.sh --create --bootstrap-server kafka.core-kafka.svc.cluster.local:9092 --topic _test_quick_compaction_2021_12_02 --partitions 1 --replication-factor 3 --config cleanup.policy=compact  

Напишите ему несколько сообщений:

 kafkacat -b kafka.core-kafka.svc.cluster.local:9092 -P -t _test_quick_compaction_2021_12_02 -K: 1:a 2:b 3:c 1:d 2:e  

Измените настройки темы таким образом, чтобы уплотнение началось через 10 секунд:

 ./kafka-topics.sh --alter --zookeeper zookeeper.core-kafka.svc.cluster.local --topic _test_quick_compaction_2021_12_02 --config max.compaction.lag.ms=10000 --config min.cleanable.dirty.ratio=0.0 --config segment.ms=10000 --config delete.retention.ms=10000  

Подождите минутку, просто чтобы убедиться:

 sleep 60  

Проверьте содержание темы:

 kafkacat -C -e -o beginning -b kafka.core-kafka.svc.cluster.local:9092 -t _test_quick_compaction_2021_12_02 -K:  

И, к моему удивлению, содержание все еще

 1:a 2:b 3:c 1:d 2:e  

вместо того, чтобы

 3:c 1:d 2:e  

чего я и ожидал.

Почему тема не уплотнена, и что я могу сделать, чтобы ее форсировать?

Комментарии:

1. Я не думаю, что изменение segment.ms ретроактивно изменяет существующие сегменты, поэтому вам нужно будет создать тему с этим, создать данные, а затем подождать 10 секунд

2. @OneCricketeer Ах, это имеет смысл. Спасибо! Это также помогло мне найти способ принудительного уплотнения старых сегментов. После изменения настроек мне просто нужно было создать еще одно сообщение по темам, которое, как я полагаю, создает новый сегмент, делая старые пригодными для уплотнения. Таким образом, это работает так, как я и предполагал. 🙂

3. Круто. Не стесняйтесь указывать эти шаги в качестве ответа ниже

4. Активный сегмент не подлежит уплотнению.

Ответ №1:

Поскольку активные сегменты не подлежат уплотнению, хитрость заключалась в том, чтобы снова написать что-нибудь в теме, чтобы принудительно создать новый сегмент.

 # Create a test topic. ./kafka-topics.sh --create --bootstrap-server kafka.core-kafka.svc.cluster.local:9092 --topic _test_quick_compaction_2021_12_02 --partitions 1 --replication-factor 3 --config cleanup.policy=compact  # Write some messages to it. echo "1:an2:bn3:c" | kafkacat -b kafka.core-kafka.svc.cluster.local:9092 -P -t _test_quick_compaction_2021_12_02 -K:  # Check the topic content. kafkacat -C -e -o beginning -b kafka.core-kafka.svc.cluster.local:9092 -t _test_quick_compaction_2021_12_02 -K:  # Change the topic settings in a way such that compaction should kick in after 10 seconds. ./kafka-topics.sh --alter --zookeeper zookeeper.core-kafka.svc.cluster.local --topic _test_quick_compaction_2021_12_02 --config max.compaction.lag.ms=10000 --config min.cleanable.dirty.ratio=0.0 --config segment.ms=10000 --config delete.retention.ms=10000  # Wait for the last segment to outdate sleep 11  # Write new messages. echo "1:dn2:e" | kafkacat -b kafka.core-kafka.svc.cluster.local:9092 -P -t _test_quick_compaction_2021_12_02 -K:  # Check the topic content. kafkacat -C -e -o beginning -b kafka.core-kafka.svc.cluster.local:9092 -t _test_quick_compaction_2021_12_02 -K:  # Wait for this segment to outdate. sleep 11  # Write new messages again. echo "1:dn2:e" | kafkacat -b kafka.core-kafka.svc.cluster.local:9092 -P -t _test_quick_compaction_2021_12_02 -K:  # Check the topic content. kafkacat -C -e -o beginning -b kafka.core-kafka.svc.cluster.local:9092 -t _test_quick_compaction_2021_12_02 -K:  # Wait for compaction to happen. sleep 11  # Check the topic content to validate that it has been compacted. kafkacat -C -e -o beginning -b kafka.core-kafka.svc.cluster.local:9092 -t _test_quick_compaction_2021_12_02 -K:  # Revert the setting changes. ./kafka-topics.sh --alter --zookeeper zookeeper.core-kafka.svc.cluster.local --topic _test_quick_compaction_2021_12_02 --delete-config max.compaction.lag.ms --delete-config min.cleanable.dirty.ratio --delete-config segment.ms --delete-config delete.retention.ms  # Delete the topic # /home/th/kafka_2.13-2.7.0/bin/kafka-topics.sh --delete --bootstrap-server kafka.core-kafka.svc.cluster.local:9092 --topic _test_quick_compaction_2021_12_02