#amazon-web-services #amazon-s3 #apache-kafka #apache-kafka-connect #s3-kafka-connector
#amazon-web-services #amazon-s3 #apache-kafka #apache-kafka-connect #s3-kafka-connector
Вопрос:
Я использую соединитель S3 уже пару недель, и я хочу изменить способ, которым соединитель называет каждый файл. Я использую HourlyBasedPartition, поэтому пути к каждому файлу уже достаточно, чтобы я мог найти каждый файл, и я хочу, чтобы имена файлов были чем-то общим для всех файлов, например, просто ‘Data.json.gzip’ (с соответствующим путем от разделителя).
Например, я хочу перейти от этого:
<prefix>/<topic>/<HourlyBasedPartition>/<topic> <kafkaPartition> <startOffset>.<format>
Для этого:
<prefix>/<topic>/<HourlyBasedPartition>/Data.<format>
Цель этого состоит в том, чтобы выполнить только один вызов S3 для последующей загрузки файлов, вместо того, чтобы сначала искать имя файла, а затем загружать его.
Просматривая файлы из папки с именем «kafka-connect-s3», я нашел этот файл: https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java который в конце имеет некоторые из следующих функций:
private RecordWriter getWriter(SinkRecord record, String encodedPartition)
throws ConnectException {
if (writers.containsKey(encodedPartition)) {
return writers.get(encodedPartition);
}
String commitFilename = getCommitFilename(encodedPartition);
log.debug(
"Creating new writer encodedPartition='{}' filename='{}'",
encodedPartition,
commitFilename
);
RecordWriter writer = writerProvider.getRecordWriter(connectorConfig, commitFilename);
writers.put(encodedPartition, writer);
return writer;
}
private String getCommitFilename(String encodedPartition) {
String commitFile;
if (commitFiles.containsKey(encodedPartition)) {
commitFile = commitFiles.get(encodedPartition);
} else {
long startOffset = startOffsets.get(encodedPartition);
String prefix = getDirectoryPrefix(encodedPartition);
commitFile = fileKeyToCommit(prefix, startOffset);
commitFiles.put(encodedPartition, commitFile);
}
return commitFile;
}
private String fileKey(String topicsPrefix, String keyPrefix, String name) {
String suffix = keyPrefix dirDelim name;
return StringUtils.isNotBlank(topicsPrefix)
? topicsPrefix dirDelim suffix
: suffix;
}
private String fileKeyToCommit(String dirPrefix, long startOffset) {
String name = tp.topic()
fileDelim
tp.partition()
fileDelim
String.format(zeroPadOffsetFormat, startOffset)
extension;
return fileKey(topicsDir, dirPrefix, name);
}
Я не знаю, можно ли это настроить в соответствии с тем, что я хочу сделать, но, похоже, это как-то близко / связано с моими намерениями. Надеюсь, это поможет.
(Также отправил проблему на Github: https://github.com/confluentinc/kafka-connect-storage-cloud/issues/369 )
Комментарии:
1. Невозможно объединить несколько тематических разделов для записи в один файл. В противном случае вы можете установить почасовую частоту вращения так, чтобы она просто сбрасывалась в конце каждого часа, тогда у вас будет один раздел темы файла в час
2. Привет @Onecricket возможно, мой пост был неясным, но я уже использую эту конфигурацию, и она работает нормально. Каждый час я получаю новый файл для каждой темы, которая у меня есть, но я хочу изменить то, как соединитель называет эти файлы. Например, у меня есть тема в Twitter, и соединитель размещает файлы по этому пути (соответственно для каждого часа): topics/twitter/year=2020/month=10/day=07/hour=11/twitter 0 0002070632.json.gz . Фактическое имя файла — это то, что я хочу изменить.
3. Что ж, вам придется переопределить
fileKeyToCommit
метод, но он является закрытым и не предоставляется никаким интерфейсом в разделителе, например