Quarkus SSE Redis подписаться

#java #redis #publish-subscribe #server-sent-events #quarkus

#java #redis #опубликовать-подписаться #сервер-отправленные-события #quarkus

Вопрос:

Мне нравится выполнять SSE с ответом redis.subscribe в quarkus.

У меня есть образец из quarkus-quickstart для простого SSE

  @GET
  @Produces(MediaType.SERVER_SENT_EVENTS)
  @SseElementType(MediaType.TEXT_PLAIN)
  @Path("{name}/streaming")
  public Multi<String> greeting(@org.jboss.resteasy.annotations.jaxrs.PathParam String name) {
    return Multi.createFrom().publisher(vertx.periodicStream(2000).toMulti())
        .map(l -> String.format("Hello %s! (%s)%n", name, new Date()));
  }
 

Это работает хорошо, каждые 2 секунды я получал привет …. в моем веб-браузере

Теперь я пытаюсь подписаться на Redis, поэтому я должен получить сообщение от Redis.

Пример Redis :

 (cmd window 1)
SUBSCRIBE message-channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "message-channel"
3) (integer) 1

(cmd window 2)
PUBLISH  message-channel HelloWorld
(integer) 1

(cmd window 1)
1) "message"
2) "message-channel"
3) "HelloWorld"

 

Теперь я пробую это с quarkus SSE:

   @Inject
  ReactiveRedisClient reactiveRedisClient;

 @GET
  @Produces(MediaType.SERVER_SENT_EVENTS)
  @SseElementType(MediaType.TEXT_PLAIN)
  @Path("sse/redissse")
  public Multi<String> redissse() {
    List<String> subscriberList = new ArrayList();
    subscriberList.add("message-channel");

    return reactiveRedisClient.subscribe(subscriberList)
        .onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
        .onItem().castTo(String.class);
  }

 

то, что я получил, было исключением:

 WARNING [io.ver.red.cli.imp.RedisConnectionImpl] (vert.x-eventloop-thread-0) No handler waiting for message: [subscribe, message-channel, 1]
 

Кто-нибудь может меня поддержать?
Есть ли простой пример?
Я понятия не имею об этом, я не могу получать сообщения Redis с публикацией «подписаться».

любые предложения…

Ответ №1:

Я не использовал Redis pub-sub, но я использовал Redis streams, и мне нужно было сделать что-то вроде этого:

`

 return Multi.createBy().repeating()
    .supplier(() -> this.reactiveRedisClient.subscribe(subscriberList)
                        .onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
                        .onItem().castTo(String.class))
        .indefinitely()
        .onItem().disjoint();
 

`

Я думаю, поскольку pub-sub неблокирующий, он запускается один раз, а затем не ждет, пока поступит другое сообщение. Вы должны реализовать свой собственный while(true) цикл реактивным способом.

Ответ №2:

Теперь я делаю следующее:

   @Inject
  @RedisClientName("second")
  RedisClient redisClient2;

void onStart(@Observes StartupEvent ev) throws IOException {
  this.redisClient2.subscribe(List.of("message-channel"));
}


  @GET
  @Produces(MediaType.SERVER_SENT_EVENTS)
  @SseElementType(MediaType.TEXT_PLAIN)
  @Path("/redis/subscribe")
  public Publisher<String> subscribechannel(){
     return eventBus.<String>consumer("io.vertx.redis.message-channel").toPublisherBuilder()
        .map(Message::body)
        .buildRs();
  }
 

Теперь это работает, но если я выполняю SSE не только из браузеров, они разделяют события. Таким образом, только один из них получил событие после каждого другого потребителя (браузера).