#java #azure #gradle #azure-eventhub
Вопрос:
Я пытаюсь отправить много данных с сервера на свой концентратор событий, однако, когда я пытаюсь использовать пример кода из документа Microsoft, данные отправляются, но приложение никогда не останавливается. Я заполняю свой список массивов 100 данными одновременно, а затем запускаю метод.
public static void publishEvents(List<EventData> allEvents) {
// create a producer client
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient();
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
for (EventData eventData : allEvents) {
// try to add the event from the array to the batch
if (!eventDataBatch.tryAdd(eventData)) {
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
eventDataBatch.getMaxSizeInBytes());
}
}
}
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}
producer.close();
}
Я попытался использовать асинхронный подход, который, кажется, работает нормально, но создание нового соединения для каждых отправляемых данных кажется плохой идеей и при использовании этого для отправки большого количества данных.
EventHubProducerAsyncClient producer = new EventHubClientBuilder()
.connectionString(connectionString)
.buildAsyncProducerClient();
// Create a batch and add a sample log to it
producer.createBatch().flatMap(batch ->
{
batch.tryAdd(new EventData(event));
return producer.send(batch);
}).subscribe(unused -> {
},
error -> System.err.println("Couldn't send logs, there's an error: " error.getStackTrace()),
() ->
{
System.out.println("Send complete!");
// Close the connection
producer.close();
});
}
Потребуется некоторая помощь, чтобы устранить проблему, когда приложение никогда не выходит.
Ответ №1:
Я полагаю, что это может быть связано с ошибкой здесь, когда мы не распоряжаемся ресурсом, чтобы позволить ему закрыться должным образом. Ваш упрек выглядит так: https://github.com/Azure/azure-sdk-for-java/issues/22305
Для асинхронного кода .subscribe()
вызов просто настраивает подписку и вызывает вызов, прежде чем перейти к следующей строке кода. (Это неблокирующий вызов.) Если бы ваш основной метод выглядел так, как показано ниже, программа, вероятно, завершилась бы до того, как она успела создать соединение/сеанс / ссылку AMQP на концентратор событий. Так вот почему программа закончилась так изящно.
public static void main(String[] args) {
EventHubProducerAsyncClient producer = new EventHubClientBuilder()
.connectionString(connectionString)
.buildAsyncProducerClient();
// Create a batch and add a sample log to it
producer.createBatch().flatMap(batch -> {
batch.tryAdd(new EventData(event));
return producer.send(batch);
}).subscribe(unused -> {
},
error -> System.err.println("Couldn't send logs, there's an error: " error.getStackTrace()),
() -> {
System.out.println("Send complete!");
// Close the connection
producer.close();
});
}