ZeroMQ с пружиной (пружинная интеграция-zeromq)

#spring-boot #spring-integration #zeromq #messaging #spring-messaging

Вопрос:

Я использую spring-integration-zeromq и пытаюсь настроить параметры аутентификации.

     @Bean
    ZeroMqChannel zeroMqPubSubChannel(ZContext context, ObjectMapper objectMapper) {
        ZeroMqChannel channel = new ZeroMqChannel(context, true);
        channel.setConnectUrl("tcp://localhost:6001:6002");
        channel.setConsumeDelay(Duration.ofMillis(100));
        channel.setMessageConverter(new GenericMessageConverter());
        channel.setSendSocketConfigurer(socket -> {
            socket.setZAPDomain("global".getBytes());
            socket.setCurveServer(true);
            socket.setCurvePublicKey("my_public_key".getBytes());
            socket.setCurveSecretKey("my_secret_key".getBytes());
        });
        EmbeddedJsonHeadersMessageMapper mapper = new EmbeddedJsonHeadersMessageMapper(objectMapper);
        channel.setMessageMapper(mapper);
        channel.afterPropertiesSet();

        channel.subscribe(m -> System.out.println(m));
        return channel;
    }
 

Однако, похоже, что результаты setSendSocketConfigurer игнорируются.
В org.springframework.integration.zeromq.channel.ZeroMqChannel sendSocketConnectionConfigurer вводится как пустой лямбда-код и передается как таковой prepareSendSocketMono ; поэтому мой вызов setSendSocketConfigurer , следовательно, не имеет никакого эффекта, поскольку он заменяет свойство только в экземпляре ZeroMqChannel, но не применяется к уже созданному к тому времени сокету mono. Как правильно настроить аутентификацию? Я что-то упускаю?


ОБНОВЛЕНИЕ 1

После исправления, предоставленного Артемом Биланом, конфигураторы сокетов, похоже, начали применяться к каналу, но связь перестала работать. Я применил рекомендации и попробовал настроить ZeroMqProxy в надежде, что это даст мне обходной путь, но все равно безуспешно: даже моя подписка на ведение журнала в той же конфигурации не проходит проверку подлинности (хотя это работает, если я удаляю вызовы конфигураторов сокетов).

 @Configuration
public class ZeroMQConfig {
    @Bean
    ZeroMqProxy zeroMqProxy(ZContext context, @Value("${zmq.channel.port.frontend}") int frontendPort,
                            @Value("${zmq.channel.port.backend}") int backendPort) {
        ZeroMqProxy proxy = new ZeroMqProxy(context, ZeroMqProxy.Type.SUB_PUB);
        proxy.setExposeCaptureSocket(true);
        proxy.setFrontendPort(frontendPort);
        proxy.setBackendPort(backendPort);
        ZCert cert = new ZCert();
        proxy.setFrontendSocketConfigurer(socket -> {
            socket.setCurvePublicKey(cert.getPublicKey());
            socket.setCurveSecretKey(cert.getSecretKey());
            socket.setCurveServerKey(Z85.decode("my_server_public_key"));
        });
        proxy.setBackendSocketConfigurer(socket -> {
            socket.setCurvePublicKey(cert.getPublicKey());
            socket.setCurveSecretKey(cert.getSecretKey());
            socket.setCurveServerKey(Z85.decode("my_server_public_key"));
        });
        return proxy;
    }

    @Bean
    public ZContext zContext() {
        ZContext context = new ZContext();
        ZAuth auth = new ZAuth(context);
        auth.configureCurve(ZAuth.CURVE_ALLOW_ANY);
        auth.setVerbose(true);
        return context;
    }

    @Bean(name = "zeroMqPubChannel")
    ZeroMqChannel zeroMqPubChannel(ZContext context, ObjectMapper objectMapper, ZeroMqProxy proxy){
        ZeroMqChannel channel = new ZeroMqChannel(context, true);
        channel.setZeroMqProxy(proxy);
        channel.setConsumeDelay(Duration.ofMillis(100));
        channel.setMessageConverter(new GenericMessageConverter());
        EmbeddedJsonHeadersMessageMapper mapper = new EmbeddedJsonHeadersMessageMapper(objectMapper);
        channel.setMessageMapper(mapper);
        return channel;
    }

    @Bean
    @ServiceActivator(inputChannel = "zeroMqPubChannel")
    public MessageHandler subscribe() {
        return message -> System.out.println(message);
    }
}
 

Ответ №1:

Да… Я вижу, ты указываешь. Это ошибка: мы должны отложить his.sendSocketConfigurer использование до тех пор, пока действительно не произойдет взаимодействие с a socket . Я исправлю это достаточно скоро.

А пока пара замечаний к вашей конфигурации:

Вы не должны называть afterPropertiesSet() себя. Позвольте контексту приложения Spring управлять своими обратными вызовами для вас!

Вы не должны подписываться на MessageChannel определение компонента в нем. Вместо этого подумайте о том, чтобы иметь @ServiceActivator(inputChannel = "zeroMqPubSubChannel") . Смотрите дополнительную информацию в документах: https://docs.spring.io/spring-integration/reference/html/messaging-endpoints.html#service-activator

К сожалению, нет способа передать эту настройку во внутренний ZMQ.Socket экземпляр…

Обновить

Рабочий тест с авторизацией кривой в ZeroMQ:

 @Test
void testPubSubWithCurve() throws InterruptedException {
    ZContext CONTEXT = new ZContext();
    new ZAuth(CONTEXT).configureCurve(ZAuth.CURVE_ALLOW_ANY).setVerbose(true);

    ZMQ.Curve.KeyPair frontendKeyPair = ZMQ.Curve.generateKeyPair();
    ZMQ.Curve.KeyPair backendKeyPair = ZMQ.Curve.generateKeyPair();

    ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
    proxy.setBeanName("subPubCurveProxy");
    proxy.setFrontendSocketConfigurer(socket -> {
        socket.setZAPDomain("global".getBytes());
        socket.setCurveServer(true);
        socket.setCurvePublicKey(frontendKeyPair.publicKey.getBytes());
        socket.setCurveSecretKey(frontendKeyPair.secretKey.getBytes());
    });
    proxy.setBackendSocketConfigurer(socket -> {
        socket.setZAPDomain("global".getBytes());
        socket.setCurveServer(true);
        socket.setCurvePublicKey(backendKeyPair.publicKey.getBytes());
        socket.setCurveSecretKey(backendKeyPair.secretKey.getBytes());
    });
    proxy.afterPropertiesSet();
    proxy.start();

    ZeroMqChannel channel = new ZeroMqChannel(CONTEXT, true);
    channel.setZeroMqProxy(proxy);
    channel.setBeanName("testChannelWithCurve");
    channel.setSendSocketConfigurer(socket -> {
        ZCert clientCert = new ZCert();
        socket.setCurvePublicKey(clientCert.getPublicKey());
        socket.setCurveSecretKey(clientCert.getSecretKey());
        socket.setCurveServerKey(frontendKeyPair.publicKey.getBytes());
    });
    channel.setSubscribeSocketConfigurer(socket -> {
                ZCert clientCert = new ZCert();
                socket.setCurvePublicKey(clientCert.getPublicKey());
                socket.setCurveSecretKey(clientCert.getSecretKey());
                socket.setCurveServerKey(backendKeyPair.publicKey.getBytes());
            }
    );
    channel.setConsumeDelay(Duration.ofMillis(10));
    channel.afterPropertiesSet();

    BlockingQueue<Message<?>> received = new LinkedBlockingQueue<>();

    channel.subscribe(received::offer);
    channel.subscribe(received::offer);

    await().until(() -> proxy.getBackendPort() > 0);

    // Give it some time to connect and subscribe
    Thread.sleep(1000);

    GenericMessage<String> testMessage = new GenericMessage<>("test1");
    assertThat(channel.send(testMessage)).isTrue();

    Message<?> message = received.poll(10, TimeUnit.SECONDS);
    assertThat(message).isNotNull().isEqualTo(testMessage);
    message = received.poll(10, TimeUnit.SECONDS);
    assertThat(message).isNotNull().isEqualTo(testMessage);

    channel.destroy();
    proxy.stop();
    CONTEXT.close();
}
 

Комментарии:

1. Спасибо за быстрый ответ! Есть ли проблема с jira/github, которую я мог бы использовать для отслеживания?

2. Вот исправление: github.com/spring-projects/spring-integration/pull/3553

3. Я смог убедиться, что аутентификация каким-то образом применяется сейчас, когда мои сообщения перестали приходить (она по — прежнему работает без конфигураторов сокетов и работает с теми же ключами в моем PoC без пружины) — не уверен, связано ли это с тем, что я где-то перепутал настройки аутентификации или настройки аутентификации применяются неправильно. Я вернусь, если что-нибудь найду

4. github.com/aquarellian/springboot_experiments — основная ветвь содержит несколько рабочий код (без авторизации). (Разбор сообщений там нарушен, потому что JsonConverter не разрешает базовое строковое сообщение, но, по крайней мере, я могу видеть свое сообщение в подписчиках-и у меня есть своя логика, работающая с моим домашним GenericMessage<запрос>). github.com/aquarellian/springboot_experiments/tree/use_auth содержит код, который использует авторизацию через ZCert — ни один подписчик не получает сообщение. STR: Запустите приложение SpringBoot, ZMQServer и ZMQSubscriber, обновите локальный хост:8080

5. Пожалуйста, найдите ОБНОВЛЕННУЮ информацию в моем ответе. Он показывает кривую аутентификации для прокси-сервера как сервера и ZeroMqChannel как клиента. Если я ошибусь с каким-то ключом, он просто ничего не покажет, но, по-видимому, он не подключается и не подписывается…