#spring-boot #spring-integration #zeromq #messaging #spring-messaging
Вопрос:
Я использую spring-integration-zeromq и пытаюсь настроить параметры аутентификации.
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context, ObjectMapper objectMapper) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://localhost:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
channel.setMessageConverter(new GenericMessageConverter());
channel.setSendSocketConfigurer(socket -> {
socket.setZAPDomain("global".getBytes());
socket.setCurveServer(true);
socket.setCurvePublicKey("my_public_key".getBytes());
socket.setCurveSecretKey("my_secret_key".getBytes());
});
EmbeddedJsonHeadersMessageMapper mapper = new EmbeddedJsonHeadersMessageMapper(objectMapper);
channel.setMessageMapper(mapper);
channel.afterPropertiesSet();
channel.subscribe(m -> System.out.println(m));
return channel;
}
Однако, похоже, что результаты setSendSocketConfigurer игнорируются.
В org.springframework.integration.zeromq.channel.ZeroMqChannel sendSocketConnectionConfigurer
вводится как пустой лямбда-код и передается как таковой prepareSendSocketMono
; поэтому мой вызов setSendSocketConfigurer
, следовательно, не имеет никакого эффекта, поскольку он заменяет свойство только в экземпляре ZeroMqChannel, но не применяется к уже созданному к тому времени сокету mono. Как правильно настроить аутентификацию? Я что-то упускаю?
ОБНОВЛЕНИЕ 1
После исправления, предоставленного Артемом Биланом, конфигураторы сокетов, похоже, начали применяться к каналу, но связь перестала работать. Я применил рекомендации и попробовал настроить ZeroMqProxy в надежде, что это даст мне обходной путь, но все равно безуспешно: даже моя подписка на ведение журнала в той же конфигурации не проходит проверку подлинности (хотя это работает, если я удаляю вызовы конфигураторов сокетов).
@Configuration
public class ZeroMQConfig {
@Bean
ZeroMqProxy zeroMqProxy(ZContext context, @Value("${zmq.channel.port.frontend}") int frontendPort,
@Value("${zmq.channel.port.backend}") int backendPort) {
ZeroMqProxy proxy = new ZeroMqProxy(context, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(frontendPort);
proxy.setBackendPort(backendPort);
ZCert cert = new ZCert();
proxy.setFrontendSocketConfigurer(socket -> {
socket.setCurvePublicKey(cert.getPublicKey());
socket.setCurveSecretKey(cert.getSecretKey());
socket.setCurveServerKey(Z85.decode("my_server_public_key"));
});
proxy.setBackendSocketConfigurer(socket -> {
socket.setCurvePublicKey(cert.getPublicKey());
socket.setCurveSecretKey(cert.getSecretKey());
socket.setCurveServerKey(Z85.decode("my_server_public_key"));
});
return proxy;
}
@Bean
public ZContext zContext() {
ZContext context = new ZContext();
ZAuth auth = new ZAuth(context);
auth.configureCurve(ZAuth.CURVE_ALLOW_ANY);
auth.setVerbose(true);
return context;
}
@Bean(name = "zeroMqPubChannel")
ZeroMqChannel zeroMqPubChannel(ZContext context, ObjectMapper objectMapper, ZeroMqProxy proxy){
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setZeroMqProxy(proxy);
channel.setConsumeDelay(Duration.ofMillis(100));
channel.setMessageConverter(new GenericMessageConverter());
EmbeddedJsonHeadersMessageMapper mapper = new EmbeddedJsonHeadersMessageMapper(objectMapper);
channel.setMessageMapper(mapper);
return channel;
}
@Bean
@ServiceActivator(inputChannel = "zeroMqPubChannel")
public MessageHandler subscribe() {
return message -> System.out.println(message);
}
}
Ответ №1:
Да… Я вижу, ты указываешь. Это ошибка: мы должны отложить his.sendSocketConfigurer
использование до тех пор, пока действительно не произойдет взаимодействие с a socket
. Я исправлю это достаточно скоро.
А пока пара замечаний к вашей конфигурации:
Вы не должны называть afterPropertiesSet()
себя. Позвольте контексту приложения Spring управлять своими обратными вызовами для вас!
Вы не должны подписываться на MessageChannel
определение компонента в нем. Вместо этого подумайте о том, чтобы иметь @ServiceActivator(inputChannel = "zeroMqPubSubChannel")
. Смотрите дополнительную информацию в документах: https://docs.spring.io/spring-integration/reference/html/messaging-endpoints.html#service-activator
К сожалению, нет способа передать эту настройку во внутренний ZMQ.Socket
экземпляр…
Обновить
Рабочий тест с авторизацией кривой в ZeroMQ:
@Test
void testPubSubWithCurve() throws InterruptedException {
ZContext CONTEXT = new ZContext();
new ZAuth(CONTEXT).configureCurve(ZAuth.CURVE_ALLOW_ANY).setVerbose(true);
ZMQ.Curve.KeyPair frontendKeyPair = ZMQ.Curve.generateKeyPair();
ZMQ.Curve.KeyPair backendKeyPair = ZMQ.Curve.generateKeyPair();
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setBeanName("subPubCurveProxy");
proxy.setFrontendSocketConfigurer(socket -> {
socket.setZAPDomain("global".getBytes());
socket.setCurveServer(true);
socket.setCurvePublicKey(frontendKeyPair.publicKey.getBytes());
socket.setCurveSecretKey(frontendKeyPair.secretKey.getBytes());
});
proxy.setBackendSocketConfigurer(socket -> {
socket.setZAPDomain("global".getBytes());
socket.setCurveServer(true);
socket.setCurvePublicKey(backendKeyPair.publicKey.getBytes());
socket.setCurveSecretKey(backendKeyPair.secretKey.getBytes());
});
proxy.afterPropertiesSet();
proxy.start();
ZeroMqChannel channel = new ZeroMqChannel(CONTEXT, true);
channel.setZeroMqProxy(proxy);
channel.setBeanName("testChannelWithCurve");
channel.setSendSocketConfigurer(socket -> {
ZCert clientCert = new ZCert();
socket.setCurvePublicKey(clientCert.getPublicKey());
socket.setCurveSecretKey(clientCert.getSecretKey());
socket.setCurveServerKey(frontendKeyPair.publicKey.getBytes());
});
channel.setSubscribeSocketConfigurer(socket -> {
ZCert clientCert = new ZCert();
socket.setCurvePublicKey(clientCert.getPublicKey());
socket.setCurveSecretKey(clientCert.getSecretKey());
socket.setCurveServerKey(backendKeyPair.publicKey.getBytes());
}
);
channel.setConsumeDelay(Duration.ofMillis(10));
channel.afterPropertiesSet();
BlockingQueue<Message<?>> received = new LinkedBlockingQueue<>();
channel.subscribe(received::offer);
channel.subscribe(received::offer);
await().until(() -> proxy.getBackendPort() > 0);
// Give it some time to connect and subscribe
Thread.sleep(1000);
GenericMessage<String> testMessage = new GenericMessage<>("test1");
assertThat(channel.send(testMessage)).isTrue();
Message<?> message = received.poll(10, TimeUnit.SECONDS);
assertThat(message).isNotNull().isEqualTo(testMessage);
message = received.poll(10, TimeUnit.SECONDS);
assertThat(message).isNotNull().isEqualTo(testMessage);
channel.destroy();
proxy.stop();
CONTEXT.close();
}
Комментарии:
1. Спасибо за быстрый ответ! Есть ли проблема с jira/github, которую я мог бы использовать для отслеживания?
2. Вот исправление: github.com/spring-projects/spring-integration/pull/3553
3. Я смог убедиться, что аутентификация каким-то образом применяется сейчас, когда мои сообщения перестали приходить (она по — прежнему работает без конфигураторов сокетов и работает с теми же ключами в моем PoC без пружины) — не уверен, связано ли это с тем, что я где-то перепутал настройки аутентификации или настройки аутентификации применяются неправильно. Я вернусь, если что-нибудь найду
4. github.com/aquarellian/springboot_experiments — основная ветвь содержит несколько рабочий код (без авторизации). (Разбор сообщений там нарушен, потому что JsonConverter не разрешает базовое строковое сообщение, но, по крайней мере, я могу видеть свое сообщение в подписчиках-и у меня есть своя логика, работающая с моим домашним GenericMessage<запрос>). github.com/aquarellian/springboot_experiments/tree/use_auth содержит код, который использует авторизацию через ZCert — ни один подписчик не получает сообщение. STR: Запустите приложение SpringBoot, ZMQServer и ZMQSubscriber, обновите локальный хост:8080
5. Пожалуйста, найдите ОБНОВЛЕННУЮ информацию в моем ответе. Он показывает кривую аутентификации для прокси-сервера как сервера и
ZeroMqChannel
как клиента. Если я ошибусь с каким-то ключом, он просто ничего не покажет, но, по-видимому, он не подключается и не подписывается…