Интеграция с Spring TCP Server количество подключений более 5

#spring #spring-boot #tcp #spring-integration #serversocket

#spring #spring-boot #tcp #spring-интеграция #serversocket

Вопрос:

Сейчас я использую следующую версию Spring Boot и Spring integration.

 spring.boot.version 2.3.4.RELEASE
spring-integration  5.3.2.RELEASE
  

Мое требование — создать связь TCP-клиент-сервер, и я использую spring integration для того же. Шип отлично работает для одного соединения между клиентом и сервером, а также отлично работает ровно для 5 одновременных клиентских подключений.

В тот момент, когда я увеличил одновременные клиентские подключения с 5 до любых произвольных чисел, это не работает, но TCP-сервер принимает только 5 подключений.

Я использовал ‘ThreadAffinityClientConnectionFactory’, упомянутый @Gary Russell в одном из предыдущих комментариев (для аналогичных требований), но все равно не работает.

Ниже приведен код, который у меня есть на данный момент.

 @Slf4j
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class SocketConfig {

    @Value("${socket.host}")
    private String clientSocketHost;

    @Value("${socket.port}")
    private Integer clientSocketPort;

    @Bean
    public TcpOutboundGateway tcpOutGate(AbstractClientConnectionFactory connectionFactory) {
        TcpOutboundGateway gate = new TcpOutboundGateway();
        //connectionFactory.setTaskExecutor(taskExecutor());
        gate.setConnectionFactory(clientCF());
        return gate;
    }

    @Bean
    public TcpInboundGateway tcpInGate(AbstractServerConnectionFactory connectionFactory)  {
        TcpInboundGateway inGate = new TcpInboundGateway();
        inGate.setConnectionFactory(connectionFactory);
        inGate.setRequestChannel(fromTcp());
        return inGate;
    }

    @Bean
    public MessageChannel fromTcp() {
        return new DirectChannel();
    }

    // Outgoing requests
    @Bean
    public ThreadAffinityClientConnectionFactory clientCF() {
        TcpNetClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory(clientSocketHost, serverCF().getPort());
        tcpNetClientConnectionFactory.setSingleUse(true);
        ThreadAffinityClientConnectionFactory threadAffinityClientConnectionFactory = new ThreadAffinityClientConnectionFactory(
            tcpNetClientConnectionFactory);
        // Tested with the below too.
        // threadAffinityClientConnectionFactory.setTaskExecutor(taskExecutor());
        return threadAffinityClientConnectionFactory;
    }


    // Incoming requests
    @Bean
    public AbstractServerConnectionFactory serverCF() {
        log.info("Server Connection Factory");
        TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(clientSocketPort);
        tcpNetServerConnectionFactory.setSerializer(new CustomSerializer());
        tcpNetServerConnectionFactory.setDeserializer(new CustomDeserializer());
        tcpNetServerConnectionFactory.setSingleUse(true);
        return tcpNetServerConnectionFactory;
    }


    @Bean
    public TaskExecutor taskExecutor () {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(50);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(50);
        executor.setAllowCoreThreadTimeOut(true);
        executor.setKeepAliveSeconds(120);
        return executor;
    }

}
  

У кого-нибудь была такая же проблема с несколькими одновременными подключениями Tcp-клиентов более 5?

Спасибо

Клиентский код:

 @Component
@Slf4j
@RequiredArgsConstructor
public class ScheduledTaskService {

    // Timeout in milliseconds
    private static final int SOCKET_TIME_OUT = 18000;
    private static final int BUFFER_SIZE = 32000;
    private static final int ETX = 0x03;
    private static final String HEADER = "ABCDEF             ";
    private static final String data = "FIXED DARATA"
    private final AtomicInteger atomicInteger = new AtomicInteger();

    @Async
    @Scheduled(fixedDelay = 100000)
    public void sendDataMessage() throws IOException, InterruptedException {
        int numberOfRequests = 10;

        Callable<String> executeMultipleSuccessfulRequestTask = () -> socketSendNReceive();

        final Collection<Callable<String>> callables = new ArrayList<>();
        IntStream.rangeClosed(1, numberOfRequests).forEach(i-> {
            callables.add(executeMultipleSuccessfulRequestTask);
        });
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfRequests);

        List<Future<String>> taskFutureList = executorService.invokeAll(callables);
        List<String> strings = taskFutureList.stream().map(future -> {
            try {
                return future.get(20000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            return "";
        }).collect(Collectors.toList());

        strings.forEach(string -> log.info("Message received from the server: {} ", string));

    }

    public String socketSendNReceive() throws IOException{
        int requestCounter = atomicInteger.incrementAndGet();

        String host = "localhost";
        int port = 8000;

        Socket socket = new Socket();
        InetSocketAddress address = new InetSocketAddress(host, port);
        socket.connect(address, SOCKET_TIME_OUT);
        socket.setSoTimeout(SOCKET_TIME_OUT);

        //Send the message to the server
        OutputStream os = socket.getOutputStream();
        BufferedOutputStream bos = new BufferedOutputStream(os);

        bos.write(HEADER.getBytes());
        bos.write(data.getBytes());
        bos.write(ETX);
        bos.flush();
//        log.info("Message sent to the server : {} ",  envio);

        //Get the return message from the server
        InputStream is = socket.getInputStream();
        String response =  receber(is);
        log.info("Received response");
        return response;
    }

    private String receber(InputStream in) throws IOException {
        final StringBuffer stringBuffer = new StringBuffer();
        int readLength;
        byte[] buffer;
        buffer = new byte[BUFFER_SIZE];
        do {
            if(Objects.nonNull(in)) {
                log.info("Input Stream not null");
            }
            readLength = in.read(buffer);
            log.info("readLength : {}  ", readLength);
            if(readLength > 0){
                stringBuffer.append(new String(buffer),0,readLength);
                log.info("String ******");
            }
        } while (buffer[readLength-1] != ETX);
        buffer = null;
        stringBuffer.deleteCharAt(resposta.length()-1);
        return stringBuffer.toString();
    }
}
  

Комментарии:

1. В фреймворке нет ничего, что ограничивало бы количество подключений до 5; если вы можете предоставить небольшой полный пример, демонстрирующий такое поведение, я могу взглянуть, чтобы понять, что не так.

2. Привет, Гэри, спасибо за быстрый ответ. Теперь я добавил клиентский код к вопросу, который отправит 10 одновременных сообщений в сокет сервера. Когда клиентский код пытается отправить 10 сообщений одновременно, из сокета сервера принимается только 5 сообщений, и на них будет получено только 5 ответов.

3. Тот же клиентский код тестируется на другом стандартном сокете сервера Java (не использует интеграцию с Spring ), но отлично работает для 10 одновременных запросов на подключение к клиенту . Ссылка на ссылку baeldung.com/a-guide-to-java-sockets

4. Я вижу, что вы не закрываете сокет, но это не должно иметь значения. Я не вижу причин, по которым вы были бы ограничены 5 подключениями. Опять же, мне нужен полный пример проекта, размещенный где-нибудь, чтобы я мог видеть, что происходит.

5. Для этого требуется несколько проприетарных jar — вы можете сократить его до минимума и без Lombok?

Ответ №1:

Поскольку вы открываете все соединения одновременно, вам необходимо увеличить backlog свойство на фабрике подключений к серверу.

По умолчанию значение равно 5.

 /**
 * The number of sockets in the connection backlog. Default 5;
 * increase if you expect high connection rates.
 * @param backlog The backlog to set.
 */
public void setBacklog(int backlog) {
  

Комментарии:

1. Вау. работает как шарм. Ранее вы упоминали, что текущий код не закрывает соединения. Где мы должны закрыть соединения? Я предполагаю, что, поскольку я устанавливаю tcpNetServerConnectionFactory.setSingleUse(true); а также tcpNetClientConnectionFactory.setSingleUse(true); соединения будут автоматически закрыты. Если понимание неверно, то в какой момент соединения должны быть закрыты.

2. Большое спасибо за то же самое. Очень признателен за ответ!!!

3. Да, настройка singleUse=true приведет к тому, что сервер закроет сокет после отправки ответа. Но я думаю, что было бы чище сделать это на стороне клиента после получения ответа. Кажется неправильным «полагаться» на то, что сервер делает это за вас. Вы не используете фабрику клиентских подключений в этом клиенте, вы создаете Socket ее самостоятельно.

4. Примечание ниже: когда я запускаю проверку процесса на порту сокета (8000), я вижу множество состояний «CLOSE_WAIT». Похоже, что сокет сервера не закрывает эти соединения после отправки ответа? Клиентский код, который используется ранее, не может быть изменен, поскольку он используется внешним клиентом, и поэтому мы несем ответственность за закрытие подключений после отправки успешного ответа клиенту.

5. Да; если клиент не закроет сокет, он останется в CLOSE_WAIT; для приложения неправильно не закрывать его. Google CLOSE_WAIT.