Каковы основные проблемы при использовании websocket для получения потока данных?

#java #spring-boot #websocket #apache-kafka

#java #весенняя загрузка #websocket #apache-kafka

Вопрос:

Я делаю это впервые. Где я собираюсь читать поток данных с помощью websocket.

Вот мой фрагмент кода

Приложение RsvpApplication

 @SpringBootApplication
public class RsvpApplication {

    private static final String MEETUP_RSVPS_ENDPOINT = "ws://stream.myapi.com/2/rsvps";

    public static void main(String[] args) {
        SpringApplication.run(RsvpApplication.class, args);
    }

    @Bean
    public ApplicationRunner initializeConnection(
        RsvpsWebSocketHandler rsvpsWebSocketHandler) {
            return args -> {
                
                System.out.println("initializeConnection");
                
                WebSocketClient rsvpsSocketClient = new StandardWebSocketClient();

                rsvpsSocketClient.doHandshake(
                    rsvpsWebSocketHandler, MEETUP_RSVPS_ENDPOINT);           
            };
        }
}
  

RsvpsWebSocketHandler

 @Component
class RsvpsWebSocketHandler extends AbstractWebSocketHandler {

    private static final Logger logger = 
            Logger.getLogger(RsvpsWebSocketHandler.class.getName());
        
    private final RsvpsKafkaProducer rsvpsKafkaProducer;

    public RsvpsWebSocketHandler(RsvpsKafkaProducer rsvpsKafkaProducer) {    
        this.rsvpsKafkaProducer = rsvpsKafkaProducer;
    }

    @Override
    public void handleMessage(WebSocketSession session,
            WebSocketMessage<?> message) {
        logger.log(Level.INFO, "New RSVP:n {0}", message.getPayload());
        
        System.out.println("handleMessage");
        
        rsvpsKafkaProducer.sendRsvpMessage(message);        
    }
}
  

RsvpsKafkaProducer

 @Component
@EnableBinding(Source.class)
public class RsvpsKafkaProducer {
  
    private static final int SENDING_MESSAGE_TIMEOUT_MS = 10000;

    private final Source source;

    public RsvpsKafkaProducer(Source source) {
        this.source = source;
    }

    public void sendRsvpMessage(WebSocketMessage<?> message) {
        
         System.out.println("sendRsvpMessage");
        source.output()
                .send(MessageBuilder.withPayload(message.getPayload())
                        .build(),
                        SENDING_MESSAGE_TIMEOUT_MS);   
    }
}
  

Насколько я знаю и читал о websocket, ему требуется одноразовое подключение, и поток данных будет течь непрерывно, пока какая-либо сторона (клиент или сервер) не остановится.

Я создаю его впервые, поэтому пытаюсь охватить основные сценарии, которые могут возникнуть при обработке более 10000 сообщений в минуту. Всего брокеров kafka два с достаточным пространством.

  1. Что можно сделать, если соединение потеряется и снова начнет получать сообщения от webscoket после подключения обратно, где оно было оставлено при последнем сбое, и отправлять сообщения в дальнейший Kafka broker?

  2. Что можно сделать, чтобы приостановить websocket, чтобы продолжать отправлять сообщения в брокере, если он достиг порогового значения необработанных сообщений (в брокере)?

  3. Что можно сделать, когда брокер достиг своего порога, запустить отдельный процесс, чтобы проверить доступное пространство в брокере для отправки дополнительных сообщений и дать указание возобновить отправку сообщений в kafka broker?

  4. Пожалуйста, поделитесь другими проблемами, которые необходимо учитывать при настройке этой вещи?

Комментарии:

1. Брокер должен прекратить чтение из websocket, когда его внутренняя очередь заполнена. Это приведет к тому, что отправитель не отправит отправку, пока она не начнет чтение снова.

2. @Marquis of Lorne, итак, при пороговом значении, если брокер перестанет получать сообщения, повлияет ли это как-либо на обработчика, который отправляет сообщения брокеру? И что произойдет с потоковыми данными websocket, которые также будут приостановлены? Прекратит ли потоковая передача данных?