Концентратор событий — не получает ожидаемой пропускной способности

#c# #azure #asynchronous #azure-eventhub

#c# #azure #асинхронный #azure-eventhub

Вопрос:

Я настроил экземпляр Event Hub с 20 единицами пропускной способности и 32 разделами на стандартном уровне. Согласно документации, каждая единица пропускной способности равна 1 МБ / с. Поэтому в идеале я должен получать пропускную способность 20 МБ / с или 1,2 ГБ / мин. Пространство имен имеет только один event hub, и я единственный пользователь. Event hub настроен на западе США, который является ближайшим вариантом к месту отправки запросов.

Однако я вижу, что для обработки данных объемом 1,77 ГБ требуется не менее 10 минут. Я использую асинхронные пакетные вызовы и упаковываю каждый запрос до предела в 1 МБ. Я вижу огромную разницу во времени, затрачиваемом вызовом SendBatchAsync — оно варьируется от 0,15 до 25 секунд.

Вот мой код : (Пожалуйста, обратите внимание : я ограничен в использовании .Net Framework 4.5)

     static EventHubClient eventHubClient;        

    static Dictionary<int, List<EventData>> events = new Dictionary<int, List<EventData>>();
    static Dictionary<int, long> batchSizes = new Dictionary<int, long>();

    static long threshold = (long)(1e6 - 1000);

    static SemaphoreSlim concurrencySemaphore;
    static int maxConcurrency = 1;
    static void Main()
    {
        eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);

        Stopwatch stopWatch = new Stopwatch();
        stopWatch.Start();            

        using (concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
        {
            foreach (string record in GetRecords())
            {
                Tuple<int, EventData> currentEventDetails = GetEventData(record);
                int partitionId = currentEventDetails.Item1;
                EventData currentEvent = currentEventDetails.Item2;

                BatchOrSendAsync(partitionId, currentEvent);

            }

            SendRemainingAsync();
        }

        stopWatch.Stop();
        Console.WriteLine(string.Format("### total time taken = {0}", stopWatch.Elapsed.TotalSeconds.ToString()));

    }

    static async void BatchOrSendAsync(int partitionId, EventData currentEvent)
    {
        long batchSize = 0;

        batchSizes.TryGetValue(partitionId, out batchSize);
        long currentEventSize = currentEvent.SerializedSizeInBytes;

        if( batchSize   currentEventSize > threshold)
        {
            List<EventData> eventsToSend = events[partitionId];
            if (eventsToSend == null || eventsToSend.Count == 0)
            {
                if (currentEventSize > threshold)
                    throw new Exception("found event with size above threshold");
                return;
            }

            concurrencySemaphore.Wait();
            Stopwatch stopWatch = new Stopwatch();
            stopWatch.Start();
            await eventHubClient.SendBatchAsync(eventsToSend);
            stopWatch.Stop();
            Console.WriteLine(stopWatch.Elapsed.TotalSeconds.ToString());
            concurrencySemaphore.Release();

            events[partitionId] = new List<EventData> { currentEvent };
            batchSizes[partitionId] = currentEventSize;
        }
        else
        {
            if (!events.ContainsKey(partitionId))
            { 
                events[partitionId] = new List<EventData>();
                batchSizes[partitionId] = 0;
            }
            events[partitionId].Add(currentEvent);
            batchSizes[partitionId]  = currentEventSize;
        }
    }

    static async void SendRemainingAsync()
    {
        foreach(int partitionId in events.Keys)
        {                
            concurrencySemaphore.Wait();
            Stopwatch stopWatch = new Stopwatch();
            stopWatch.Start();                
            await eventHubClient.SendBatchAsync(events[partitionId]);
            stopWatch.Stop();
            Console.WriteLine(stopWatch.Elapsed.TotalSeconds.ToString());
            concurrencySemaphore.Release();
        }
    }
  

Примечание : увеличение maxConcurrency для семафора только уменьшает общее затраченное время, и вызов SendBatchAsync начинает выдавать ошибку, когда maxConcurrency равен 10

Что я должен сделать, чтобы улучшить пропускную способность?

Комментарии:

1. Ваши данные равномерно распределены по разделам? Я вижу, вы установили partitionId вручную.

2. Хороший момент… Я проверю это … несмотря на это, для отправки пакета размером 1 МБ требуется не менее 0,15 секунды, тогда как я плачу за пропускную способность 20 МБ / с… В идеале 1 МБ данных должны быть отправлены за 0,05 секунды …. кстати, я должен был также добавить это — 1) время, затрачиваемое на вызов, имеет тенденцию к it…it начинается в диапазоне 0,15 секунды и продолжает увеличиваться до 0,6 секунды, а затем возвращается к диапазону 0,15 секунды… этот шаблон продолжает повторяться …. и я вижу, что вызовы ome время от времени занимают 11 секунд, 15 секунд, 25 секунд… и 2) если я удалю PartitionKey, все равно потребуется примерно столько же 0,15 с на вызов в среднем