#java #apache-flink
Вопрос:
У меня есть программа, выполняющая 2-фазную агрегацию, чтобы устранить перекос данных в моей работе. И я использовал простой ThreadLocalRandom
для создания суффикса к моему оригиналу, например :
private class KeyByTileWithSalt implements KeySelector<Type, String> {
@Override
public Long getKey(Type value) {
return value.toString() ThreadLocalRandom.current().nextLong(1, 8);
}
}
Но Flink выдает исключение NullPointerException при добавлении соли для ключа, который я выполняю агрегацию окон в каком-либо поле.
Я нашел аналогичное сообщение в списке рассылки flink и узнал причину, по которой может возникнуть исключение, но я все еще не могу найти ошибку в своей программе unstable of hash value
. Есть какие-нибудь идеи?
Ответ №1:
Flink полагается на результат keyBy
детерминированности во всем кластере. Это необходимо для того, чтобы каждый узел в кластере имел согласованное представление о том, какой узел отвечает за обработку каждого ключа. Имея ключ, зависящий от ThreadLocalRandom
вас, вы нарушили это предположение.
Вместо этого вы можете добавить поле в каждую запись, которую вы заполняете случайным значением во время приема, а затем использовать это поле в качестве ключа.