#java #redis #publish-subscribe #server-sent-events #quarkus
#java #redis #опубликовать-подписаться #сервер-отправленные-события #quarkus
Вопрос:
Мне нравится выполнять SSE с ответом redis.subscribe в quarkus.
У меня есть образец из quarkus-quickstart для простого SSE
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.TEXT_PLAIN)
@Path("{name}/streaming")
public Multi<String> greeting(@org.jboss.resteasy.annotations.jaxrs.PathParam String name) {
return Multi.createFrom().publisher(vertx.periodicStream(2000).toMulti())
.map(l -> String.format("Hello %s! (%s)%n", name, new Date()));
}
Это работает хорошо, каждые 2 секунды я получал привет …. в моем веб-браузере
Теперь я пытаюсь подписаться на Redis, поэтому я должен получить сообщение от Redis.
Пример Redis :
(cmd window 1)
SUBSCRIBE message-channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "message-channel"
3) (integer) 1
(cmd window 2)
PUBLISH message-channel HelloWorld
(integer) 1
(cmd window 1)
1) "message"
2) "message-channel"
3) "HelloWorld"
Теперь я пробую это с quarkus SSE:
@Inject
ReactiveRedisClient reactiveRedisClient;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.TEXT_PLAIN)
@Path("sse/redissse")
public Multi<String> redissse() {
List<String> subscriberList = new ArrayList();
subscriberList.add("message-channel");
return reactiveRedisClient.subscribe(subscriberList)
.onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
.onItem().castTo(String.class);
}
то, что я получил, было исключением:
WARNING [io.ver.red.cli.imp.RedisConnectionImpl] (vert.x-eventloop-thread-0) No handler waiting for message: [subscribe, message-channel, 1]
Кто-нибудь может меня поддержать?
Есть ли простой пример?
Я понятия не имею об этом, я не могу получать сообщения Redis с публикацией «подписаться».
любые предложения…
Ответ №1:
Я не использовал Redis pub-sub, но я использовал Redis streams, и мне нужно было сделать что-то вроде этого:
`
return Multi.createBy().repeating()
.supplier(() -> this.reactiveRedisClient.subscribe(subscriberList)
.onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
.onItem().castTo(String.class))
.indefinitely()
.onItem().disjoint();
`
Я думаю, поскольку pub-sub неблокирующий, он запускается один раз, а затем не ждет, пока поступит другое сообщение. Вы должны реализовать свой собственный while(true)
цикл реактивным способом.
Ответ №2:
Теперь я делаю следующее:
@Inject
@RedisClientName("second")
RedisClient redisClient2;
void onStart(@Observes StartupEvent ev) throws IOException {
this.redisClient2.subscribe(List.of("message-channel"));
}
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.TEXT_PLAIN)
@Path("/redis/subscribe")
public Publisher<String> subscribechannel(){
return eventBus.<String>consumer("io.vertx.redis.message-channel").toPublisherBuilder()
.map(Message::body)
.buildRs();
}
Теперь это работает, но если я выполняю SSE не только из браузеров, они разделяют события. Таким образом, только один из них получил событие после каждого другого потребителя (браузера).