#c# #azure #asynchronous #azure-eventhub
#c# #azure #асинхронный #azure-eventhub
Вопрос:
Я настроил экземпляр Event Hub с 20 единицами пропускной способности и 32 разделами на стандартном уровне. Согласно документации, каждая единица пропускной способности равна 1 МБ / с. Поэтому в идеале я должен получать пропускную способность 20 МБ / с или 1,2 ГБ / мин. Пространство имен имеет только один event hub, и я единственный пользователь. Event hub настроен на западе США, который является ближайшим вариантом к месту отправки запросов.
Однако я вижу, что для обработки данных объемом 1,77 ГБ требуется не менее 10 минут. Я использую асинхронные пакетные вызовы и упаковываю каждый запрос до предела в 1 МБ. Я вижу огромную разницу во времени, затрачиваемом вызовом SendBatchAsync — оно варьируется от 0,15 до 25 секунд.
Вот мой код : (Пожалуйста, обратите внимание : я ограничен в использовании .Net Framework 4.5)
static EventHubClient eventHubClient;
static Dictionary<int, List<EventData>> events = new Dictionary<int, List<EventData>>();
static Dictionary<int, long> batchSizes = new Dictionary<int, long>();
static long threshold = (long)(1e6 - 1000);
static SemaphoreSlim concurrencySemaphore;
static int maxConcurrency = 1;
static void Main()
{
eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
using (concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
foreach (string record in GetRecords())
{
Tuple<int, EventData> currentEventDetails = GetEventData(record);
int partitionId = currentEventDetails.Item1;
EventData currentEvent = currentEventDetails.Item2;
BatchOrSendAsync(partitionId, currentEvent);
}
SendRemainingAsync();
}
stopWatch.Stop();
Console.WriteLine(string.Format("### total time taken = {0}", stopWatch.Elapsed.TotalSeconds.ToString()));
}
static async void BatchOrSendAsync(int partitionId, EventData currentEvent)
{
long batchSize = 0;
batchSizes.TryGetValue(partitionId, out batchSize);
long currentEventSize = currentEvent.SerializedSizeInBytes;
if( batchSize currentEventSize > threshold)
{
List<EventData> eventsToSend = events[partitionId];
if (eventsToSend == null || eventsToSend.Count == 0)
{
if (currentEventSize > threshold)
throw new Exception("found event with size above threshold");
return;
}
concurrencySemaphore.Wait();
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
await eventHubClient.SendBatchAsync(eventsToSend);
stopWatch.Stop();
Console.WriteLine(stopWatch.Elapsed.TotalSeconds.ToString());
concurrencySemaphore.Release();
events[partitionId] = new List<EventData> { currentEvent };
batchSizes[partitionId] = currentEventSize;
}
else
{
if (!events.ContainsKey(partitionId))
{
events[partitionId] = new List<EventData>();
batchSizes[partitionId] = 0;
}
events[partitionId].Add(currentEvent);
batchSizes[partitionId] = currentEventSize;
}
}
static async void SendRemainingAsync()
{
foreach(int partitionId in events.Keys)
{
concurrencySemaphore.Wait();
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
await eventHubClient.SendBatchAsync(events[partitionId]);
stopWatch.Stop();
Console.WriteLine(stopWatch.Elapsed.TotalSeconds.ToString());
concurrencySemaphore.Release();
}
}
Примечание : увеличение maxConcurrency для семафора только уменьшает общее затраченное время, и вызов SendBatchAsync начинает выдавать ошибку, когда maxConcurrency равен 10
Что я должен сделать, чтобы улучшить пропускную способность?
Комментарии:
1. Ваши данные равномерно распределены по разделам? Я вижу, вы установили partitionId вручную.
2. Хороший момент… Я проверю это … несмотря на это, для отправки пакета размером 1 МБ требуется не менее 0,15 секунды, тогда как я плачу за пропускную способность 20 МБ / с… В идеале 1 МБ данных должны быть отправлены за 0,05 секунды …. кстати, я должен был также добавить это — 1) время, затрачиваемое на вызов, имеет тенденцию к it…it начинается в диапазоне 0,15 секунды и продолжает увеличиваться до 0,6 секунды, а затем возвращается к диапазону 0,15 секунды… этот шаблон продолжает повторяться …. и я вижу, что вызовы ome время от времени занимают 11 секунд, 15 секунд, 25 секунд… и 2) если я удалю PartitionKey, все равно потребуется примерно столько же 0,15 с на вызов в среднем