#spring #spring-boot #tcp #spring-integration #serversocket
#spring #spring-boot #tcp #spring-интеграция #serversocket
Вопрос:
Сейчас я использую следующую версию Spring Boot и Spring integration.
spring.boot.version 2.3.4.RELEASE
spring-integration 5.3.2.RELEASE
Мое требование — создать связь TCP-клиент-сервер, и я использую spring integration для того же. Шип отлично работает для одного соединения между клиентом и сервером, а также отлично работает ровно для 5 одновременных клиентских подключений.
В тот момент, когда я увеличил одновременные клиентские подключения с 5 до любых произвольных чисел, это не работает, но TCP-сервер принимает только 5 подключений.
Я использовал ‘ThreadAffinityClientConnectionFactory’, упомянутый @Gary Russell в одном из предыдущих комментариев (для аналогичных требований), но все равно не работает.
Ниже приведен код, который у меня есть на данный момент.
@Slf4j
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class SocketConfig {
@Value("${socket.host}")
private String clientSocketHost;
@Value("${socket.port}")
private Integer clientSocketPort;
@Bean
public TcpOutboundGateway tcpOutGate(AbstractClientConnectionFactory connectionFactory) {
TcpOutboundGateway gate = new TcpOutboundGateway();
//connectionFactory.setTaskExecutor(taskExecutor());
gate.setConnectionFactory(clientCF());
return gate;
}
@Bean
public TcpInboundGateway tcpInGate(AbstractServerConnectionFactory connectionFactory) {
TcpInboundGateway inGate = new TcpInboundGateway();
inGate.setConnectionFactory(connectionFactory);
inGate.setRequestChannel(fromTcp());
return inGate;
}
@Bean
public MessageChannel fromTcp() {
return new DirectChannel();
}
// Outgoing requests
@Bean
public ThreadAffinityClientConnectionFactory clientCF() {
TcpNetClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory(clientSocketHost, serverCF().getPort());
tcpNetClientConnectionFactory.setSingleUse(true);
ThreadAffinityClientConnectionFactory threadAffinityClientConnectionFactory = new ThreadAffinityClientConnectionFactory(
tcpNetClientConnectionFactory);
// Tested with the below too.
// threadAffinityClientConnectionFactory.setTaskExecutor(taskExecutor());
return threadAffinityClientConnectionFactory;
}
// Incoming requests
@Bean
public AbstractServerConnectionFactory serverCF() {
log.info("Server Connection Factory");
TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(clientSocketPort);
tcpNetServerConnectionFactory.setSerializer(new CustomSerializer());
tcpNetServerConnectionFactory.setDeserializer(new CustomDeserializer());
tcpNetServerConnectionFactory.setSingleUse(true);
return tcpNetServerConnectionFactory;
}
@Bean
public TaskExecutor taskExecutor () {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(50);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(50);
executor.setAllowCoreThreadTimeOut(true);
executor.setKeepAliveSeconds(120);
return executor;
}
}
У кого-нибудь была такая же проблема с несколькими одновременными подключениями Tcp-клиентов более 5?
Спасибо
Клиентский код:
@Component
@Slf4j
@RequiredArgsConstructor
public class ScheduledTaskService {
// Timeout in milliseconds
private static final int SOCKET_TIME_OUT = 18000;
private static final int BUFFER_SIZE = 32000;
private static final int ETX = 0x03;
private static final String HEADER = "ABCDEF ";
private static final String data = "FIXED DARATA"
private final AtomicInteger atomicInteger = new AtomicInteger();
@Async
@Scheduled(fixedDelay = 100000)
public void sendDataMessage() throws IOException, InterruptedException {
int numberOfRequests = 10;
Callable<String> executeMultipleSuccessfulRequestTask = () -> socketSendNReceive();
final Collection<Callable<String>> callables = new ArrayList<>();
IntStream.rangeClosed(1, numberOfRequests).forEach(i-> {
callables.add(executeMultipleSuccessfulRequestTask);
});
ExecutorService executorService = Executors.newFixedThreadPool(numberOfRequests);
List<Future<String>> taskFutureList = executorService.invokeAll(callables);
List<String> strings = taskFutureList.stream().map(future -> {
try {
return future.get(20000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return "";
}).collect(Collectors.toList());
strings.forEach(string -> log.info("Message received from the server: {} ", string));
}
public String socketSendNReceive() throws IOException{
int requestCounter = atomicInteger.incrementAndGet();
String host = "localhost";
int port = 8000;
Socket socket = new Socket();
InetSocketAddress address = new InetSocketAddress(host, port);
socket.connect(address, SOCKET_TIME_OUT);
socket.setSoTimeout(SOCKET_TIME_OUT);
//Send the message to the server
OutputStream os = socket.getOutputStream();
BufferedOutputStream bos = new BufferedOutputStream(os);
bos.write(HEADER.getBytes());
bos.write(data.getBytes());
bos.write(ETX);
bos.flush();
// log.info("Message sent to the server : {} ", envio);
//Get the return message from the server
InputStream is = socket.getInputStream();
String response = receber(is);
log.info("Received response");
return response;
}
private String receber(InputStream in) throws IOException {
final StringBuffer stringBuffer = new StringBuffer();
int readLength;
byte[] buffer;
buffer = new byte[BUFFER_SIZE];
do {
if(Objects.nonNull(in)) {
log.info("Input Stream not null");
}
readLength = in.read(buffer);
log.info("readLength : {} ", readLength);
if(readLength > 0){
stringBuffer.append(new String(buffer),0,readLength);
log.info("String ******");
}
} while (buffer[readLength-1] != ETX);
buffer = null;
stringBuffer.deleteCharAt(resposta.length()-1);
return stringBuffer.toString();
}
}
Комментарии:
1. В фреймворке нет ничего, что ограничивало бы количество подключений до 5; если вы можете предоставить небольшой полный пример, демонстрирующий такое поведение, я могу взглянуть, чтобы понять, что не так.
2. Привет, Гэри, спасибо за быстрый ответ. Теперь я добавил клиентский код к вопросу, который отправит 10 одновременных сообщений в сокет сервера. Когда клиентский код пытается отправить 10 сообщений одновременно, из сокета сервера принимается только 5 сообщений, и на них будет получено только 5 ответов.
3. Тот же клиентский код тестируется на другом стандартном сокете сервера Java (не использует интеграцию с Spring ), но отлично работает для 10 одновременных запросов на подключение к клиенту . Ссылка на ссылку baeldung.com/a-guide-to-java-sockets
4. Я вижу, что вы не закрываете сокет, но это не должно иметь значения. Я не вижу причин, по которым вы были бы ограничены 5 подключениями. Опять же, мне нужен полный пример проекта, размещенный где-нибудь, чтобы я мог видеть, что происходит.
5. Для этого требуется несколько проприетарных jar — вы можете сократить его до минимума и без Lombok?
Ответ №1:
Поскольку вы открываете все соединения одновременно, вам необходимо увеличить backlog
свойство на фабрике подключений к серверу.
По умолчанию значение равно 5.
/**
* The number of sockets in the connection backlog. Default 5;
* increase if you expect high connection rates.
* @param backlog The backlog to set.
*/
public void setBacklog(int backlog) {
Комментарии:
1. Вау. работает как шарм. Ранее вы упоминали, что текущий код не закрывает соединения. Где мы должны закрыть соединения? Я предполагаю, что, поскольку я устанавливаю tcpNetServerConnectionFactory.setSingleUse(true); а также tcpNetClientConnectionFactory.setSingleUse(true); соединения будут автоматически закрыты. Если понимание неверно, то в какой момент соединения должны быть закрыты.
2. Большое спасибо за то же самое. Очень признателен за ответ!!!
3. Да, настройка
singleUse=true
приведет к тому, что сервер закроет сокет после отправки ответа. Но я думаю, что было бы чище сделать это на стороне клиента после получения ответа. Кажется неправильным «полагаться» на то, что сервер делает это за вас. Вы не используете фабрику клиентских подключений в этом клиенте, вы создаетеSocket
ее самостоятельно.4. Примечание ниже: когда я запускаю проверку процесса на порту сокета (8000), я вижу множество состояний «CLOSE_WAIT». Похоже, что сокет сервера не закрывает эти соединения после отправки ответа? Клиентский код, который используется ранее, не может быть изменен, поскольку он используется внешним клиентом, и поэтому мы несем ответственность за закрытие подключений после отправки успешного ответа клиенту.
5. Да; если клиент не закроет сокет, он останется в CLOSE_WAIT; для приложения неправильно не закрывать его. Google CLOSE_WAIT.