#java #linux #kotlin #sockets #asynchronous
Вопрос:
После написания нагрузочного теста для сервера, над которым я работаю, я заметил, что тест зависает в случайных местах. После нескольких проходов удаления кода я закончил этот тест:
import mu.KotlinLogging import org.testng.annotations.Test import java.net.InetSocketAddress import java.net.StandardSocketOptions import java.nio.ByteBuffer import java.nio.channels.AsynchronousServerSocketChannel import java.nio.channels.AsynchronousSocketChannel import java.nio.channels.CompletionHandler import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicInteger import kotlin.concurrent.thread import kotlin.test.assertEquals class Reproduce { private val serverPort = 1234 private val testVal: Byte = 42 private val logger = KotlinLogging.logger {} private val acceptServer = AtomicInteger(0) private val acceptClient = AtomicInteger(0) private var running = CountDownLatch(1); private fun runServer() { AsynchronousServerSocketChannel.open().use { it.setOption(StandardSocketOptions.SO_REUSEADDR, true) it.bind(InetSocketAddress(serverPort)) running.countDown() logger.info { "Server started" } try { it.accept(null, object : CompletionHandlerlt;AsynchronousSocketChannel, Unit?gt; { override fun completed(client: AsynchronousSocketChannel, attachment: Unit?) { logger.info { "${acceptServer.incrementAndGet()} server accept succeeded" } val buf = ByteBuffer.allocate(1) buf.put(0, testVal) client.write(buf, null, object : CompletionHandlerlt;Int, Unit?gt; { override fun completed(result: Int, attachment: Unit?) { assertEquals(1, result) client.close() } override fun failed(exc: Throwable, attachment: Unit?) { logger.error(exc) { } } }) it.accept(null, this) } override fun failed(exc: Throwable, attachment: Unit?) { logger.error { "${acceptServer.incrementAndGet()} server accept failed" } it.accept(null, this) } }) } catch(e: Throwable) { logger.error(e) { } } while(true) { Thread.sleep(1000) } } } @Test fun doTest() { thread(name = "Server") { runServer() } running.await() val latch = CountDownLatch(1000) (1..1000).forEach { _ -gt; val socket = AsynchronousSocketChannel.open() socket.connect( InetSocketAddress("127.0.0.1", serverPort), null, object : CompletionHandlerlt;Void, Unit?gt; { override fun completed(result: Void?, attachment: Unit?) { logger.info { "${acceptClient.incrementAndGet()} client accept succeeded" } val buf = ByteBuffer.allocate(1) socket.read(buf, null, object : CompletionHandlerlt;Int, Unit?gt; { override fun completed(result: Int, attachment: Unit?) { assertEquals(1, result) assertEquals(testVal, buf.get(0)) latch.countDown() socket.close() } override fun failed(exc: Throwable, attachment: Unit?) { logger.error(exc) { } } }) } override fun failed(exc: Throwable, attachment: Unit?) { logger.error(exc) { } } }) } latch.await() } }
Я вижу в сообщениях журнала, что все клиенты подключаются довольно быстро, но сервер принимает журналы, останавливается примерно через 300, а затем, кажется, зависает здесь на неопределенный срок (клиенты также не работают, и latch.await
вызов никогда не возвращается).
Я думаю, что проблема где-то в клиентском коде, потому что при запуске nc localhost 1234
я вижу немедленное подтверждение из журнала сервера.
Сеть должна быть идеально асинхронной без каких-либо блокирующих вызовов, что также подтверждается журналами (номера потоков регистрируются, и я вижу только около 80 различных потоков).
Обновление: Так что просто для удовольствия я попытался переписать этот асинхронный клиентский код в синхронный код C (и запустить один поток на соединение), и у меня возникла та же проблема (здесь также применяется обходной путь netcat)! Поэтому я предполагаю, что теперь проблема заключается в том, как Linux обрабатывает несколько подключений к сокетам в одном процессе. Может быть, кто-то знает, что именно вызывает замедление работы клиента и как это обойти?