Добавление новых пользовательских свойств в Kafka Connect

#java #apache-kafka #apache-kafka-connect

#java #apache-kafka #apache-kafka-connect

Вопрос:

Я пытаюсь добавить новый пользовательский конвертер Kafka, который является модификацией JsonConverterConfig в connect-json. Я пытаюсь добавить какое-то новое пользовательское свойство, скажем, «schemas.modifications.enable» в конвертер, который расширяет JsonConverterConfig. Но Kafka connect не может найти подробную информацию о конвертере. Мой фрагмент кода :

 public class ModifiedJsonConfig extends JsonConverterConfig {

public static final String SCHEMAS_MODIFY_CONFIG = "schemas.modifications.enable";
public static final boolean SCHEMAS_MODIFY_CONFIG_DEFAULT = true;
private static final String SCHEMAS_MODIFY_CONFIG_DOC = "The maximum number of schemas that can be cached in this converter instance.";
private static final String SCHEMAS_MODIFY_CONFIG_DISPLAY = "Schema Cache Size";


private final static ConfigDef CONFIG;

static {
    String group = "Schemas-modification";
    int orderInGroup = 0;
    CONFIG = ConverterConfig.newConfigDef();
    CONFIG.define(SCHEMAS_MODIFY_CONFIG, Type.BOOLEAN, SCHEMAS_MODIFY_CONFIG_DEFAULT, Importance.HIGH, SCHEMAS_MODIFY_CONFIG_DOC, group,
            orderInGroup  , Width.MEDIUM, SCHEMAS_MODIFY_CONFIG_DISPLAY);
}

public static ConfigDef configDef() {
    return CONFIG;
}

public ModifiedJsonConfig(Map<String, ?> props) {
    super(props);
}

public boolean schemasModified() {
    return getBoolean(SCHEMAS_MODIFY_CONFIG);
    }
}
  

Но я получаю сообщение об ошибке здесь :

ОШИБКА остановки из-за ошибки (org.apache.kafka.connect.cli.ConnectDistributed:83) org.apache.kafka.common.config.Исключение ConfigException: неизвестная конфигурация ‘schemas.modifications.enable’

Но я определил эту конфигурацию. Было бы очень полезно, если бы вы помогли мне установить здесь пользовательское свойство конвертера.

Заранее спасибо.

Комментарии:

1. Какую версию connect вы используете? Знаете ли вы об этом: issues.apache.org/jira/browse/KAFKA-6981

2. Да. Я наткнулся на проблему, пытаясь ее решить. Но я использую Kafka 2.1.1, и я думаю, что это разрешено с Kafka 2.0.0.

3. @james.bondu, есть ли реализация целиком Converter или только добавление ModifiedJsonConfig ? Пожалуйста, добавьте также больше деталей: конфигурация kafka-connect, в конечном итоге пример конфигурации соединителя, немного больше трассировки стека и т.д.

4. Согласен. Неясно, как ваш конвертер использует этот класс конфигурации