Flink создает исключение NullPointerException при добавлении соли для ключа и агрегации окон в каком-либо поле

#java #apache-flink

Вопрос:

У меня есть программа, выполняющая 2-фазную агрегацию, чтобы устранить перекос данных в моей работе. И я использовал простой ThreadLocalRandom для создания суффикса к моему оригиналу, например :

   private class KeyByTileWithSalt implements KeySelector<Type, String> {
    @Override
    public Long getKey(Type value) {
      return value.toString()   ThreadLocalRandom.current().nextLong(1, 8);
    }
  }
 

Но Flink выдает исключение NullPointerException при добавлении соли для ключа, который я выполняю агрегацию окон в каком-либо поле.

Я нашел аналогичное сообщение в списке рассылки flink и узнал причину, по которой может возникнуть исключение, но я все еще не могу найти ошибку в своей программе unstable of hash value . Есть какие-нибудь идеи?

Ответ №1:

Flink полагается на результат keyBy детерминированности во всем кластере. Это необходимо для того, чтобы каждый узел в кластере имел согласованное представление о том, какой узел отвечает за обработку каждого ключа. Имея ключ, зависящий от ThreadLocalRandom вас, вы нарушили это предположение.

Вместо этого вы можете добавить поле в каждую запись, которую вы заполняете случайным значением во время приема, а затем использовать это поле в качестве ключа.