Java отправляет на EventHub (не завершая работу изящно)

#java #azure #gradle #azure-eventhub

Вопрос:

Я пытаюсь отправить много данных с сервера на свой концентратор событий, однако, когда я пытаюсь использовать пример кода из документа Microsoft, данные отправляются, но приложение никогда не останавливается. Я заполняю свой список массивов 100 данными одновременно, а затем запускаю метод.

     public static void publishEvents(List<EventData> allEvents) {
        // create a producer client
        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .buildProducerClient();

       

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                          eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }
 

Я попытался использовать асинхронный подход, который, кажется, работает нормально, но создание нового соединения для каждых отправляемых данных кажется плохой идеей и при использовании этого для отправки большого количества данных.

         EventHubProducerAsyncClient producer = new EventHubClientBuilder()
                .connectionString(connectionString)
                .buildAsyncProducerClient();

        // Create a batch and add a sample log to it
        producer.createBatch().flatMap(batch ->
        {
            batch.tryAdd(new EventData(event));


            return producer.send(batch);
        }).subscribe(unused -> {
                },
                error -> System.err.println("Couldn't send logs, there's an error: "   error.getStackTrace()),
                () ->
                {
                    System.out.println("Send complete!");
                    // Close the connection
                    producer.close();
                });

    }
 

Потребуется некоторая помощь, чтобы устранить проблему, когда приложение никогда не выходит.

Ответ №1:

Я полагаю, что это может быть связано с ошибкой здесь, когда мы не распоряжаемся ресурсом, чтобы позволить ему закрыться должным образом. Ваш упрек выглядит так: https://github.com/Azure/azure-sdk-for-java/issues/22305

Для асинхронного кода .subscribe() вызов просто настраивает подписку и вызывает вызов, прежде чем перейти к следующей строке кода. (Это неблокирующий вызов.) Если бы ваш основной метод выглядел так, как показано ниже, программа, вероятно, завершилась бы до того, как она успела создать соединение/сеанс / ссылку AMQP на концентратор событий. Так вот почему программа закончилась так изящно.

 public static void main(String[] args) {
    EventHubProducerAsyncClient producer = new EventHubClientBuilder()
            .connectionString(connectionString)
            .buildAsyncProducerClient();

    // Create a batch and add a sample log to it
    producer.createBatch().flatMap(batch -> {
        batch.tryAdd(new EventData(event));
        return producer.send(batch);
    }).subscribe(unused -> {
        },
        error -> System.err.println("Couldn't send logs, there's an error: "   error.getStackTrace()),
        () -> {
            System.out.println("Send complete!");
            // Close the connection
            producer.close();
        });
}