#java #spring-boot #websocket #apache-kafka
#java #весенняя загрузка #websocket #apache-kafka
Вопрос:
Я делаю это впервые. Где я собираюсь читать поток данных с помощью websocket.
Вот мой фрагмент кода
Приложение RsvpApplication
@SpringBootApplication
public class RsvpApplication {
private static final String MEETUP_RSVPS_ENDPOINT = "ws://stream.myapi.com/2/rsvps";
public static void main(String[] args) {
SpringApplication.run(RsvpApplication.class, args);
}
@Bean
public ApplicationRunner initializeConnection(
RsvpsWebSocketHandler rsvpsWebSocketHandler) {
return args -> {
System.out.println("initializeConnection");
WebSocketClient rsvpsSocketClient = new StandardWebSocketClient();
rsvpsSocketClient.doHandshake(
rsvpsWebSocketHandler, MEETUP_RSVPS_ENDPOINT);
};
}
}
RsvpsWebSocketHandler
@Component
class RsvpsWebSocketHandler extends AbstractWebSocketHandler {
private static final Logger logger =
Logger.getLogger(RsvpsWebSocketHandler.class.getName());
private final RsvpsKafkaProducer rsvpsKafkaProducer;
public RsvpsWebSocketHandler(RsvpsKafkaProducer rsvpsKafkaProducer) {
this.rsvpsKafkaProducer = rsvpsKafkaProducer;
}
@Override
public void handleMessage(WebSocketSession session,
WebSocketMessage<?> message) {
logger.log(Level.INFO, "New RSVP:n {0}", message.getPayload());
System.out.println("handleMessage");
rsvpsKafkaProducer.sendRsvpMessage(message);
}
}
RsvpsKafkaProducer
@Component
@EnableBinding(Source.class)
public class RsvpsKafkaProducer {
private static final int SENDING_MESSAGE_TIMEOUT_MS = 10000;
private final Source source;
public RsvpsKafkaProducer(Source source) {
this.source = source;
}
public void sendRsvpMessage(WebSocketMessage<?> message) {
System.out.println("sendRsvpMessage");
source.output()
.send(MessageBuilder.withPayload(message.getPayload())
.build(),
SENDING_MESSAGE_TIMEOUT_MS);
}
}
Насколько я знаю и читал о websocket, ему требуется одноразовое подключение, и поток данных будет течь непрерывно, пока какая-либо сторона (клиент или сервер) не остановится.
Я создаю его впервые, поэтому пытаюсь охватить основные сценарии, которые могут возникнуть при обработке более 10000 сообщений в минуту. Всего брокеров kafka два с достаточным пространством.
-
Что можно сделать, если соединение потеряется и снова начнет получать сообщения от webscoket после подключения обратно, где оно было оставлено при последнем сбое, и отправлять сообщения в дальнейший Kafka broker?
-
Что можно сделать, чтобы приостановить websocket, чтобы продолжать отправлять сообщения в брокере, если он достиг порогового значения необработанных сообщений (в брокере)?
-
Что можно сделать, когда брокер достиг своего порога, запустить отдельный процесс, чтобы проверить доступное пространство в брокере для отправки дополнительных сообщений и дать указание возобновить отправку сообщений в kafka broker?
-
Пожалуйста, поделитесь другими проблемами, которые необходимо учитывать при настройке этой вещи?
Комментарии:
1. Брокер должен прекратить чтение из websocket, когда его внутренняя очередь заполнена. Это приведет к тому, что отправитель не отправит отправку, пока она не начнет чтение снова.
2. @Marquis of Lorne, итак, при пороговом значении, если брокер перестанет получать сообщения, повлияет ли это как-либо на обработчика, который отправляет сообщения брокеру? И что произойдет с потоковыми данными websocket, которые также будут приостановлены? Прекратит ли потоковая передача данных?