#apache-kafka #kafka-topic #apache-kafka-security
Вопрос:
У меня есть следующая логика топологии кафки, построенная на java.
KStreamlt;String, Eventgt; stream = bldr.stream(topicName,Consumed.with(Serdes.String(), eventSerde())); KGroupedStreamlt;String, Eventgt; aggregateByOrderID = stream.map((key, event) -gt; new KeyValuelt;gt;( event.getOrderId())).groupByKey(with(Serdes.String(), eventSerde())); final KTablelt;String, StoreValuegt; store = aggregateByOrderID.aggregate(() -gt; new event("", 0.0, "", "", true), (key, value, aggregate) -gt; { try { aggregate.setQuantity(value.getQuantity()); aggregate.setFlag(Boolean.TRUE); aggregate.setOrderId(key); aggregate.setEventId(value.getId()); return aggregate; } catch(NumberFormatException | NullPointerException ex) { LOGGER.error("Error occured"); } return aggregate; }, Materialized.with(Serdes.String(), eventSerde())); KGroupedStreamlt;String, Storegt; aggregateAccountQuantityStore = store.toStream().map((key, value) -gt; new KeyValuelt;gt;(value.getAccountNumber(), value)) .groupByKey(with(Serdes.String(), eventSerde())); final KTablelt;String, SellingQuantitygt; finalQuantity = aggregateAccountQuantityStore.aggregate(() -gt; new SellingQuantity("", 0.0, ""), (key, env, aggr) -gt; { Double b = aggr.getQuantity() env.getQuantity(); aggr.setQuantity(b); aggr.setAccountNumber(env.getAccountNumber()); aggr.setEventId(env.getEventId()); return aggr; }, Materialized.with(Serdes.String(), eventSerde())); finalQuantity.toStream().map((k, event) -gt; new KeyValuelt;gt;(k, event)).to(anotherTopic, Produced.with(Serdes.String(), eventSerde()));
И он реализован по протоколу SASL и использует аутентификацию Kerberos.
Проблема в том , что всякий раз, когда я изменяю логику топологии, я получаю исключение, подобное
Не авторизован для доступа к темам: [KSTREAM-АГРЕГАТ-ХРАНИЛИЩЕ СОСТОЯНИЙ-0000000009-перераспределение, KSTREAM-АГРЕГАТ-ХРАНИЛИЩЕ СОСТОЯНИЙ-0000000002-перераспределение]
Мы не знаем, как смягчить эту проблему или как решить ее навсегда.
Примечание: При локальной настройке мы не реализуем протокол безопасности, и это не вызывает никаких проблем.