Reactor Netty и многоадресный (multicast) UDP — правильно ли я это делаю?

#java #udp #netty #multicast #reactor

#java #udp #netty #многоадресная-рассылка #reactor

Вопрос:

Я написал простой UDP-сервер на Java, который использует пул потоков фиксированного размера для всей обработки. Один поток принимает пакеты, а остальные потоки обрабатывают пакеты и отправляют ответы.

Я пытаюсь добиться того же с помощью Reactor Netty, но не уверен, что делаю это правильно.

Вот мой UDP-сервер на чистой Java. Я знаю, что он уродливый, но дело не в этом. 😀

 package test;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;

import static org.apache.commons.io.IOUtils.closeQuietly;

import static java.net.NetworkInterface.getNetworkInterfaces;
import static java.nio.charset.Charset.defaultCharset;
import static java.util.Collections.list;
import static java.util.concurrent.Executors.newFixedThreadPool;

/**
 * Plain-Java SSDP responder.
 *
 * <p>One task on the fixed-size pool loops on {@link MulticastSocket#receive}; every received
 * datagram is handed to the same pool as a separate task, which parses it and, for a matching
 * {@code M-SEARCH} request, sends a unicast response back to the sender.
 */
public class UDPServer {

    static final String SSDP_ADDRESS = "239.255.255.250";
    static final int SSDP_PORT = 1900;

    private static MulticastSocket listenSocket = null;
    private static DatagramSocket responseSocket = null;

    private static final ExecutorService executor;

    static {
        final ThreadFactory threadFactory =
                new ThreadFactoryBuilder().setNameFormat("java-udp-%s").setDaemon(false).build();
        executor = newFixedThreadPool(8, threadFactory);
    }

    // Read by the receive loop and the packet handlers, cleared by the shutdown hook.
    private static volatile boolean shouldBeRunning = false;

    /**
     * Opens the listen and response sockets, joins the SSDP multicast group and starts the
     * receive loop on the executor.
     *
     * @param args ignored
     * @throws IOException if a socket cannot be created, bound or joined to the group
     */
    public static void main(final String[] args) throws IOException {
        Runtime.getRuntime().addShutdownHook(new Thread(UDPServer::shutdown));

        final NetworkInterface networkInterface = getNetworkInterface();

        listenSocket = new MulticastSocket(SSDP_PORT);
        listenSocket.setLoopbackMode(true);
        listenSocket.joinGroup(new InetSocketAddress(SSDP_ADDRESS, SSDP_PORT), networkInterface);

        // Separate socket for replies, bound to the same port with SO_REUSEADDR.
        responseSocket = new DatagramSocket(null);
        responseSocket.setReuseAddress(true);
        responseSocket.bind(new InetSocketAddress(SSDP_PORT));

        executor.submit(UDPServer::handlePackets);
    }

    /**
     * Prints {@code message} to standard output, prefixed with the current thread's name.
     *
     * @param message text to print
     */
    static void log(final String message) {
        System.out.println("[" + Thread.currentThread().getName() + "] " + message);
    }

    /**
     * Returns the first network interface that has at least one site-local IPv4 address.
     *
     * @return the first matching interface
     * @throws SocketException if the interfaces cannot be enumerated
     * @throws java.util.NoSuchElementException if no interface matches
     */
    static NetworkInterface getNetworkInterface() throws SocketException {
        return list(getNetworkInterfaces())
                .stream()
                .filter(ni -> list(ni.getInetAddresses()).stream()
                        .anyMatch(address -> (address instanceof Inet4Address) && address.isSiteLocalAddress()))
                .findFirst()
                .orElseThrow();
    }

    /** Shutdown hook: clears the running flag, closes both sockets and shuts the executor down. */
    private static void shutdown() {
        shouldBeRunning = false;

        if (listenSocket != null) {
            log("Closing the listen socket");
            closeQuietly(listenSocket);
        }

        if (responseSocket != null) {
            log("Closing the response socket");
            closeQuietly(responseSocket);
        }

        log("Shutting down the executor");
        executor.shutdown();
    }

    /**
     * Receive loop: blocks on the listen socket and submits each received datagram to the
     * executor. Returns on any receive failure other than a timeout; the failure is only
     * reported while the server is still supposed to be running.
     */
    private static void handlePackets() {
        log("Waiting for packets");

        shouldBeRunning = true;

        while (shouldBeRunning) {
            // A fresh buffer per datagram, because the packet is handed off to another thread.
            final byte[] inPacketPayload = new byte[8192];
            final DatagramPacket inPacket = new DatagramPacket(inPacketPayload, inPacketPayload.length);

            try {
                listenSocket.receive(inPacket);
            } catch (SocketTimeoutException e) {
                continue;
            } catch (Exception e) {
                if (shouldBeRunning) {
                    log("Fatal error");
                    e.printStackTrace(System.out);
                }

                return;
            }

            log("Got a packet");

            executor.submit(() -> safeHandlePacket(inPacket));
        }
    }

    /**
     * Calls {@link #handlePacket} and reports an {@link IOException} instead of propagating it;
     * the report is skipped once shutdown has cleared the running flag.
     *
     * @param inPacket the received datagram
     */
    private static void safeHandlePacket(final DatagramPacket inPacket) {
        try {
            handlePacket(inPacket);
        } catch (IOException e) {
            if (shouldBeRunning) {
                log("Failed to handle a packet");
                e.printStackTrace(System.out);
            }
        }
    }

    /**
     * Decodes the datagram as text, ignores anything that does not start with {@code M-SEARCH},
     * logs the request, and answers it when it is an {@code ssdp:discover} search for
     * {@code upnp:rootdevice}.
     *
     * @param inPacket the received datagram
     * @throws IOException if sending the response fails
     */
    private static void handlePacket(final DatagramPacket inPacket) throws IOException {
        final String ssdpRequest =
                new String(inPacket.getData(), 0, inPacket.getLength(), defaultCharset()).strip();

        if (!ssdpRequest.startsWith("M-SEARCH")) {
            return;
        }

        log("--\n==================================================\n" +
                "The packet is from " + inPacket.getAddress().getHostAddress() + ":" + inPacket.getPort() + "\n" +
                "==================================================\n" +
                ssdpRequest + "\n==================================================");

        if (ssdpRequest.contains("MAN: \"ssdp:discover\"") && ssdpRequest.contains("ST: upnp:rootdevice")) {
            sendResponse(inPacket);
        }
    }

    /**
     * Sends the fixed SSDP response to the socket address the request came from.
     *
     * @param inPacket the request being answered; only its socket address is used
     * @throws IOException if the response cannot be sent
     */
    private static void sendResponse(final DatagramPacket inPacket) throws IOException {
        log("Sending the response");

        final String response = "HTTP/1.1 200 OK\r\n" +
                "EXT: \r\n" +
                "ST: upnp:rootdevice\r\n" +
                "USN: uuid:abc123::upnp:rootdevice\r\n" +
                "SERVER: MyTestServer\r\n" +
                "CACHE-CONTROL: max-age=1800\r\n\r\n";

        final byte[] responseBytes = response.getBytes(defaultCharset());

        final DatagramPacket responsePacket =
                new DatagramPacket(responseBytes, responseBytes.length, inPacket.getSocketAddress());
        responseSocket.send(responsePacket);

        log("Response sent");
    }
}

Затем мой UDP-клиент, который я использую для тестирования сервера. И сервер, и клиент работают как положено.

 package test;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;

import static test.UDPServer.SSDP_ADDRESS;
import static test.UDPServer.SSDP_PORT;

import static java.nio.charset.Charset.defaultCharset;

/**
 * Test client: sends a single SSDP {@code M-SEARCH} query to the multicast group and prints the
 * first datagram it receives in reply.
 */
public class UDPClient {

    /**
     * Sends the query and blocks until one response datagram arrives.
     *
     * @param args ignored
     * @throws IOException if the socket cannot be created, or sending or receiving fails
     */
    public static void main(String[] args) throws IOException {
        final String query = "M-SEARCH * HTTP/1.1\r\n" +
                "Host: 239.255.255.250:1900\r\n" +
                "MAN: \"ssdp:discover\"\r\n" +
                "ST: upnp:rootdevice\r\n\r\n";

        final byte[] queryBytes = query.getBytes();
        final DatagramPacket queryPacket =
                new DatagramPacket(queryBytes, queryBytes.length, InetAddress.getByName(SSDP_ADDRESS), SSDP_PORT);

        System.out.println("Creating the socket");

        // try-with-resources closes the socket even when send() or receive() throws.
        try (DatagramSocket socket = new DatagramSocket()) {
            System.out.println("Sending the query");
            socket.send(queryPacket);

            final byte[] responseBytes = new byte[8192];
            final DatagramPacket receivePacket = new DatagramPacket(responseBytes, responseBytes.length);

            System.out.println("Waiting for a response");
            socket.receive(receivePacket);

            System.out.println("Got a response");
            System.out.println(
                    new String(receivePacket.getData(), 0, receivePacket.getLength(), defaultCharset()));
        }
    }
}

И, наконец, мой UDP-сервер на Reactor Netty:

 package test;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.socket.DatagramPacket;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.resources.LoopResources;
import reactor.netty.udp.UdpServer;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;

import static io.netty.buffer.Unpooled.copiedBuffer;
import static io.netty.channel.ChannelOption.SO_REUSEADDR;
import static io.netty.channel.socket.InternetProtocolFamily.IPv4;
import static test.UDPServer.SSDP_ADDRESS;
import static test.UDPServer.SSDP_PORT;
import static test.UDPServer.getNetworkInterface;
import static test.UDPServer.log;

import static java.nio.charset.Charset.defaultCharset;
import static java.util.concurrent.Executors.newFixedThreadPool;

/**
 * Reactor Netty version of the SSDP responder: joins the multicast group, maps every inbound
 * datagram to zero or one response datagram, and writes the responses back through the same
 * connection.
 */
public class NettyUDPServer {

    // Created and shut down here, but nothing in the pipeline below submits work to it.
    private static final ExecutorService executor;

    static {
        final ThreadFactory threadFactory =
                new ThreadFactoryBuilder().setNameFormat("netty-udp-%s").setDaemon(false).build();
        executor = newFixedThreadPool(8, threadFactory);
    }

    private static Connection disposableServer = null;

    /**
     * Configures and binds the UDP server, then blocks until the connection is disposed.
     *
     * @param args ignored
     * @throws UnknownHostException if the SSDP group address cannot be resolved
     * @throws SocketException if the network interfaces cannot be enumerated
     */
    public static void main(String[] args) throws UnknownHostException, SocketException {
        Runtime.getRuntime().addShutdownHook(new Thread(NettyUDPServer::shutdown));

        final NetworkInterface networkInterface = getNetworkInterface();

        final InetAddress ssdpAddress = InetAddress.getByName(SSDP_ADDRESS);

        final UdpServer server =
                UdpServer.create()
                        .option(SO_REUSEADDR, true)
                        .bindAddress(() -> new InetSocketAddress(SSDP_PORT))
                        .runOn(LoopResources.create("udp"), IPv4)
                        // Join the group first, then turn each inbound datagram into its response.
                        .handle((in, out) -> out.sendObject(in.join(ssdpAddress, networkInterface)
                                .thenMany(in.receiveObject())
                                .map(DatagramPacket.class::cast)
                                .flatMap(NettyUDPServer::safeHandlePacket)));

        server.warmup().block();

        disposableServer = server.bindNow();
        disposableServer.onDispose().block();
    }

    /** Shutdown hook: disposes the server connection if it was bound and shuts the executor down. */
    private static void shutdown() {
        log("Closing the server");

        if (disposableServer != null) {
            log("Disposing the server");
            disposableServer.disposeNow();
        }

        log("Shutting down the executor");
        executor.shutdown();
    }

    /**
     * Calls {@link #handlePacket} and converts a thrown {@link IOException} into an error signal.
     *
     * @param inPacket the received datagram
     * @return the publisher produced by {@code handlePacket}, or an error publisher
     */
    private static Publisher<DatagramPacket> safeHandlePacket(final DatagramPacket inPacket) {
        try {
            return handlePacket(inPacket);
        } catch (IOException e) {
            return Mono.error(e);
        }
    }

    /**
     * Decodes the datagram as text and produces the response for an {@code ssdp:discover} search
     * for {@code upnp:rootdevice}. Any other packet yields an empty publisher.
     *
     * <p>Ignored packets must complete rather than use {@code Mono.never()}: this publisher is
     * subscribed inside {@code flatMap}, which limits the number of in-flight inner publishers,
     * so inner publishers that never terminate would eventually stop it from requesting more
     * datagrams.
     *
     * @param inPacket the received datagram
     * @return a publisher emitting the response datagram, or an empty one
     * @throws IOException declared by the signature; nothing in the current body throws it
     */
    private static Publisher<DatagramPacket> handlePacket(final DatagramPacket inPacket) throws IOException {
        log("Got a packet");

        final ByteBuf inPacketContent = inPacket.content();
        final byte[] inPacketPayload = new byte[inPacketContent.readableBytes()];
        inPacketContent.readBytes(inPacketPayload);

        final String request = new String(inPacketPayload, defaultCharset()).strip();

        if (!request.startsWith("M-SEARCH")) {
            return Mono.empty();
        }

        final InetSocketAddress sender = inPacket.sender();

        log("--\n==================================================\n" +
                "The packet is from " + sender.getAddress().getHostAddress() + ":" + sender.getPort() + "\n" +
                "==================================================\n" +
                request + "\n==================================================");

        if (request.contains("MAN: \"ssdp:discover\"") && request.contains("ST: upnp:rootdevice")) {
            return sendResponse(inPacket);
        } else {
            return Mono.empty();
        }
    }

    /**
     * Builds the fixed SSDP response datagram addressed to the sender of {@code inPacket}.
     * The datagram is only assembled here; the caller's pipeline writes it out.
     *
     * @param inPacket the request being answered; only its sender address is used
     * @return a publisher emitting the single response datagram
     */
    private static Publisher<DatagramPacket> sendResponse(final DatagramPacket inPacket) {
        log("Assembling the response");

        final String response = "HTTP/1.1 200 OK\r\n" +
                "EXT: \r\n" +
                "ST: upnp:rootdevice\r\n" +
                "USN: uuid:abc123::upnp:rootdevice\r\n" +
                "SERVER: MyTestServer\r\n" +
                "CACHE-CONTROL: max-age=1800\r\n\r\n";

        return Mono.just(new DatagramPacket(copiedBuffer(response, defaultCharset()), inPacket.sender()));
    }
}

Это тоже работает, но является ли это правильным способом его реализации?

Кроме того, когда я вызываю handlePacket внутри safeHandlePacket, IntelliJ выдаёт предупреждение о недопустимом вызове блокирующего метода.

И ещё не хватает части с ExecutorService (пулом потоков).