Асинхронные сокеты Java NIO становятся чрезвычайно медленными после нескольких сотен подключений

#java #linux #kotlin #sockets #asynchronous

Вопрос:

После написания нагрузочного теста для сервера, над которым я работаю, я заметил, что тест зависает в случайных местах. После нескольких проходов удаления кода я закончил этот тест:

 import mu.KotlinLogging import org.testng.annotations.Test import java.net.InetSocketAddress import java.net.StandardSocketOptions import java.nio.ByteBuffer import java.nio.channels.AsynchronousServerSocketChannel import java.nio.channels.AsynchronousSocketChannel import java.nio.channels.CompletionHandler import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicInteger import kotlin.concurrent.thread import kotlin.test.assertEquals  class Reproduce {  private val serverPort = 1234  private val testVal: Byte = 42  private val logger = KotlinLogging.logger {}  private val acceptServer = AtomicInteger(0)  private val acceptClient = AtomicInteger(0)  private var running = CountDownLatch(1);   private fun runServer() {  AsynchronousServerSocketChannel.open().use {  it.setOption(StandardSocketOptions.SO_REUSEADDR, true)  it.bind(InetSocketAddress(serverPort))  running.countDown()  logger.info { "Server started" }  try {  it.accept(null, object : CompletionHandlerlt;AsynchronousSocketChannel, Unit?gt; {  override fun completed(client: AsynchronousSocketChannel, attachment: Unit?) {  logger.info { "${acceptServer.incrementAndGet()} server accept succeeded" }  val buf = ByteBuffer.allocate(1)  buf.put(0, testVal)  client.write(buf, null, object : CompletionHandlerlt;Int, Unit?gt; {  override fun completed(result: Int, attachment: Unit?) {  assertEquals(1, result)  client.close()  }   override fun failed(exc: Throwable, attachment: Unit?) {  logger.error(exc) { }  }   })  it.accept(null, this)   }   override fun failed(exc: Throwable, attachment: Unit?) {  logger.error { "${acceptServer.incrementAndGet()} server accept failed" }  it.accept(null, this)  }  })  } catch(e: Throwable) {  logger.error(e) { }  }  while(true) {  Thread.sleep(1000)  }  }  }   @Test  fun doTest() {  thread(name = "Server") { runServer() }  running.await()   val latch = CountDownLatch(1000)  (1..1000).forEach { _ -gt;  val socket = AsynchronousSocketChannel.open()  socket.connect(  InetSocketAddress("127.0.0.1", serverPort), null, object : CompletionHandlerlt;Void, Unit?gt; {  override fun completed(result: Void?, attachment: Unit?) {  logger.info { "${acceptClient.incrementAndGet()} client accept succeeded" }  val buf = ByteBuffer.allocate(1)  socket.read(buf, null, object : CompletionHandlerlt;Int, Unit?gt; {  override fun completed(result: Int, attachment: Unit?) {  assertEquals(1, result)  assertEquals(testVal, buf.get(0))  latch.countDown()  socket.close()  }   override fun failed(exc: Throwable, attachment: Unit?) {  logger.error(exc) { }  }  })  }   override fun failed(exc: Throwable, attachment: Unit?) {  logger.error(exc) { }  }  })  }  latch.await()  } }  

Я вижу в сообщениях журнала, что все клиенты подключаются довольно быстро, но сервер принимает журналы, останавливается примерно через 300, а затем, кажется, зависает здесь на неопределенный срок (клиенты также не работают, и latch.await вызов никогда не возвращается).

Я думаю, что проблема где-то в клиентском коде, потому что при запуске nc localhost 1234 я вижу немедленное подтверждение из журнала сервера.

Сеть должна быть идеально асинхронной без каких-либо блокирующих вызовов, что также подтверждается журналами (номера потоков регистрируются, и я вижу только около 80 различных потоков).

Обновление: Так что просто для удовольствия я попытался переписать этот асинхронный клиентский код в синхронный код C (и запустить один поток на соединение), и у меня возникла та же проблема (здесь также применяется обходной путь netcat)! Поэтому я предполагаю, что теперь проблема заключается в том, как Linux обрабатывает несколько подключений к сокетам в одном процессе. Может быть, кто-то знает, что именно вызывает замедление работы клиента и как это обойти?