Как изменить имя файла объекта S3, загруженного с помощью соединителя Kafka Connect S3?

#amazon-web-services #amazon-s3 #apache-kafka #apache-kafka-connect #s3-kafka-connector

#amazon-web-services #amazon-s3 #apache-kafka #apache-kafka-connect #s3-kafka-connector

Вопрос:

Я использую соединитель S3 уже пару недель, и я хочу изменить способ, которым соединитель называет каждый файл. Я использую HourlyBasedPartition, поэтому пути к каждому файлу уже достаточно, чтобы я мог найти каждый файл, и я хочу, чтобы имена файлов были чем-то общим для всех файлов, например, просто ‘Data.json.gzip’ (с соответствующим путем от разделителя).

Например, я хочу перейти от этого:

 <prefix>/<topic>/<HourlyBasedPartition>/<topic> <kafkaPartition> <startOffset>.<format>
  

Для этого:

 <prefix>/<topic>/<HourlyBasedPartition>/Data.<format>
  

Цель этого состоит в том, чтобы выполнить только один вызов S3 для последующей загрузки файлов, вместо того, чтобы сначала искать имя файла, а затем загружать его.

Просматривая файлы из папки с именем «kafka-connect-s3», я нашел этот файл: https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java который в конце имеет некоторые из следующих функций:

 private RecordWriter getWriter(SinkRecord record, String encodedPartition)
      throws ConnectException {
    if (writers.containsKey(encodedPartition)) {
      return writers.get(encodedPartition);
    }
    String commitFilename = getCommitFilename(encodedPartition);
    log.debug(
        "Creating new writer encodedPartition='{}' filename='{}'",
        encodedPartition,
        commitFilename
    );
    RecordWriter writer = writerProvider.getRecordWriter(connectorConfig, commitFilename);
    writers.put(encodedPartition, writer);
    return writer;
  }

  private String getCommitFilename(String encodedPartition) {
    String commitFile;
    if (commitFiles.containsKey(encodedPartition)) {
      commitFile = commitFiles.get(encodedPartition);
    } else {
      long startOffset = startOffsets.get(encodedPartition);
      String prefix = getDirectoryPrefix(encodedPartition);
      commitFile = fileKeyToCommit(prefix, startOffset);
      commitFiles.put(encodedPartition, commitFile);
    }
    return commitFile;
  }

  private String fileKey(String topicsPrefix, String keyPrefix, String name) {
    String suffix = keyPrefix   dirDelim   name;
    return StringUtils.isNotBlank(topicsPrefix)
           ? topicsPrefix   dirDelim   suffix
           : suffix;
  }

  private String fileKeyToCommit(String dirPrefix, long startOffset) {
    String name = tp.topic()
                        fileDelim
                        tp.partition()
                        fileDelim
                        String.format(zeroPadOffsetFormat, startOffset)
                        extension;
    return fileKey(topicsDir, dirPrefix, name);
  }
  

Я не знаю, можно ли это настроить в соответствии с тем, что я хочу сделать, но, похоже, это как-то близко / связано с моими намерениями. Надеюсь, это поможет.

(Также отправил проблему на Github: https://github.com/confluentinc/kafka-connect-storage-cloud/issues/369 )

Комментарии:

1. Невозможно объединить несколько тематических разделов для записи в один файл. В противном случае вы можете установить почасовую частоту вращения так, чтобы она просто сбрасывалась в конце каждого часа, тогда у вас будет один раздел темы файла в час

2. Привет @Onecricket возможно, мой пост был неясным, но я уже использую эту конфигурацию, и она работает нормально. Каждый час я получаю новый файл для каждой темы, которая у меня есть, но я хочу изменить то, как соединитель называет эти файлы. Например, у меня есть тема в Twitter, и соединитель размещает файлы по этому пути (соответственно для каждого часа): topics/twitter/year=2020/month=10/day=07/hour=11/twitter 0 0002070632.json.gz . Фактическое имя файла — это то, что я хочу изменить.

3. Что ж, вам придется переопределить fileKeyToCommit метод, но он является закрытым и не предоставляется никаким интерфейсом в разделителе, например