#java #udp #netty #multicast #reactor
#Ява #udp #нетти #многоадресная рассылка #реактор
Вопрос:
Я построил простой сервер Java UDP, который использует пул потоков фиксированного размера для обработки всего. Один поток используется для приема пакетов, а остальные потоки используются для обработки пакетов и отправки ответа.
Я пытаюсь добиться того же с реактором Нетти, но я не уверен, что делаю это правильно.
Вот мой UDP-сервер только для Java. Я знаю, что он уродливый, дело не в этом. 😀
package test; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.Inet4Address; import java.net.InetSocketAddress; import java.net.MulticastSocket; import java.net.NetworkInterface; import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import static org.apache.commons.io.IOUtils.closeQuietly; import static java.net.NetworkInterface.getNetworkInterfaces; import static java.nio.charset.Charset.defaultCharset; import static java.util.Collections.list; import static java.util.concurrent.Executors.newFixedThreadPool; public class UDPServer { static final String SSDP_ADDRESS = "239.255.255.250"; static final int SSDP_PORT = 1900; private static MulticastSocket listenSocket = null; private static DatagramSocket responseSocket = null; private static final ExecutorService executor; static { final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("java-udp-%s").setDaemon(false).build(); executor = newFixedThreadPool(8, threadFactory); } private static volatile boolean shouldBeRunning = false; public static void main(final String[] args) throws IOException { Runtime.getRuntime().addShutdownHook(new Thread(UDPServer::shutdown)); final NetworkInterface networkInterface = getNetworkInterface(); listenSocket = new MulticastSocket(SSDP_PORT); listenSocket.setLoopbackMode(true); listenSocket.joinGroup(new InetSocketAddress(SSDP_ADDRESS, SSDP_PORT), networkInterface); responseSocket = new DatagramSocket(null); responseSocket.setReuseAddress(true); responseSocket.bind(new InetSocketAddress(SSDP_PORT)); executor.submit(UDPServer::handlePackets); } static void log(final String message) { System.out.println("[" Thread.currentThread().getName() "] " message); } static NetworkInterface getNetworkInterface() throws SocketException { return list(getNetworkInterfaces()) .stream() .filter(ni -gt; list(ni.getInetAddresses()).stream().anyMatch(address -gt; (address instanceof Inet4Address) amp;amp; address.isSiteLocalAddress())) .findFirst() .orElseThrow(); } private static void shutdown() { shouldBeRunning = false; if (listenSocket != null) { log("Closing the listen socket"); closeQuietly(listenSocket); } if (responseSocket != null) { log("Closing the response socket"); closeQuietly(responseSocket); } log("Shutting down the executor"); executor.shutdown(); } private static void handlePackets() { log("Waiting for packets"); shouldBeRunning = true; while (shouldBeRunning) { final byte[] inPacketPayload = new byte[8192]; final DatagramPacket inPacket = new DatagramPacket(inPacketPayload, inPacketPayload.length); try { listenSocket.receive(inPacket); } catch (SocketTimeoutException e) { continue; } catch (Exception e) { if (shouldBeRunning) { log("Fatal error"); e.printStackTrace(System.out); } return; } log("Got a packet"); executor.submit(() -gt; safeHandlePacket(inPacket)); } } private static void safeHandlePacket(final DatagramPacket inPacket) { try { handlePacket(inPacket); } catch (IOException e) { if (shouldBeRunning) { log("Failed to handle a packet"); e.printStackTrace(System.out); } } } private static void handlePacket(final DatagramPacket inPacket) throws IOException { final String ssdpRequest = new String(inPacket.getData(), 0, inPacket.getLength(), defaultCharset()).strip(); if (!ssdpRequest.startsWith("M-SEARCH")) { return; } log("--n==================================================n" "The packet is from " inPacket.getAddress().getHostAddress() ":" inPacket.getPort() "n" "==================================================n" ssdpRequest "n=================================================="); if (ssdpRequest.contains("MAN: "ssdp:discover"") amp;amp; ssdpRequest.contains("ST: upnp:rootdevice")) { sendResponse(inPacket); } } private static void sendResponse(final DatagramPacket inPacket) throws IOException { log("Sending the response"); final String response = "HTTP/1.1 200 OKrn" "EXT: rn" "ST: upnp:rootdevicern" "USN: uuid:abc123::upnp:rootdevicern" "SERVER: MyTestServerrn" "CACHE-CONTROL: max-age=1800rnrn"; final byte[] responseBytes = response.getBytes(defaultCharset()); final DatagramPacket responsePacket = new DatagramPacket(responseBytes, responseBytes.length, inPacket.getSocketAddress()); responseSocket.send(responsePacket); log("Response sent"); } }
затем мой UDP-клиент, который я использую для его тестирования. И сервер, и клиент работают должным образом.
package test; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import static test.UDPServer.SSDP_ADDRESS; import static test.UDPServer.SSDP_PORT; import static java.nio.charset.Charset.defaultCharset; public class UDPClient { public static void main(String[] args) throws IOException { final String query = "M-SEARCH * HTTP/1.1rn" "Host: 239.255.255.250:1900rn" "MAN: "ssdp:discover"rn" "ST: upnp:rootdevicernrn"; final byte[] queryBytes = query.getBytes(); final DatagramPacket queryPacket = new DatagramPacket(queryBytes, queryBytes.length, InetAddress.getByName(SSDP_ADDRESS), SSDP_PORT); System.out.println("Creating the socket"); final DatagramSocket socket = new DatagramSocket(); System.out.println("Sending the query"); socket.send(queryPacket); final byte[] responseBytes = new byte[8192]; final DatagramPacket receivePacket = new DatagramPacket(responseBytes, responseBytes.length); System.out.println("Waiting for a response"); socket.receive(receivePacket); System.out.println("Got a response"); System.out.println(new String(receivePacket.getData(), 0, receivePacket.getLength(), defaultCharset())); socket.close(); } }
And finally my Reactor Netty UDP server:
package test; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.buffer.ByteBuf; import io.netty.channel.socket.DatagramPacket; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.resources.LoopResources; import reactor.netty.udp.UdpServer; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.net.SocketException; import java.net.UnknownHostException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import static io.netty.buffer.Unpooled.copiedBuffer; import static io.netty.channel.ChannelOption.SO_REUSEADDR; import static io.netty.channel.socket.InternetProtocolFamily.IPv4; import static test.UDPServer.SSDP_ADDRESS; import static test.UDPServer.SSDP_PORT; import static test.UDPServer.getNetworkInterface; import static test.UDPServer.log; import static java.nio.charset.Charset.defaultCharset; import static java.util.concurrent.Executors.newFixedThreadPool; public class NettyUDPServer { private static final ExecutorService executor; static { final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("netty-udp-%s").setDaemon(false).build(); executor = newFixedThreadPool(8, threadFactory); } private static Connection disposableServer = null; public static void main(String[] args) throws UnknownHostException, SocketException { Runtime.getRuntime().addShutdownHook(new Thread(NettyUDPServer::shutdown)); final NetworkInterface networkInterface = getNetworkInterface(); final InetAddress ssdpAddress = InetAddress.getByName(SSDP_ADDRESS); final UdpServer server = UdpServer.create() .option(SO_REUSEADDR, true) .bindAddress(() -gt; new InetSocketAddress(SSDP_PORT)) .runOn(LoopResources.create("udp"), IPv4) .handle((in, out) -gt; out.sendObject(in.join(ssdpAddress, networkInterface) .thenMany(in.receiveObject()) .map(DatagramPacket.class::cast) .flatMap(NettyUDPServer::safeHandlePacket))); server.warmup().block(); disposableServer = server.bindNow(); disposableServer.onDispose().block(); } private static void shutdown() { log("Closing the server"); if (disposableServer != null) { log("Disposing the server"); disposableServer.disposeNow(); } log("Shutting down the executor"); executor.shutdown(); } private static Publisherlt;DatagramPacketgt; safeHandlePacket(final DatagramPacket inPacket) { try { return handlePacket(inPacket); } catch (IOException e) { return Mono.error(e); } } private static Publisherlt;DatagramPacketgt; handlePacket(final DatagramPacket inPacket) throws IOException { log("Got a packet"); final ByteBuf inPacketContent = inPacket.content(); final byte[] inPacketPayload = new byte[inPacketContent.readableBytes()]; inPacketContent.readBytes(inPacketPayload); final String request = new String(inPacketPayload, defaultCharset()).strip(); if (!request.startsWith("M-SEARCH")) { return Mono.never(); } final InetSocketAddress sender = inPacket.sender(); log("--n==================================================n" "The packet is from " sender.getAddress().getHostAddress() ":" sender.getPort() "n" "==================================================n" request "n=================================================="); if (request.contains("MAN: "ssdp:discover"") amp;amp; request.contains("ST: upnp:rootdevice")) { return sendResponse(inPacket); } else { return Mono.never(); } } private static Publisherlt;DatagramPacketgt; sendResponse(final DatagramPacket inPacket) { log("Assembling the response"); final String response = "HTTP/1.1 200 OKrn" "EXT: rn" "ST: upnp:rootdevicern" "USN: uuid:abc123::upnp:rootdevicern" "SERVER: MyTestServerrn" "CACHE-CONTROL: max-age=1800rnrn"; return Mono.just(new DatagramPacket(copiedBuffer(response, defaultCharset()), inPacket.sender())); } }
Это тоже работает, но является ли это правильным способом его реализации?
Кроме того, когда я вызываю handlePacket
внутри safeHandlePacket
IntelliJ, мне выдается предупреждение о недопустимом вызове метода блокировки.
И часть услуг исполнителя также отсутствует.