Azure Stream Analytics — Как сортировать данные концентратора событий на основе временных меток и пользовательских полей идентификаторов

#azure-data-lake #azure-eventhub #azure-stream-analytics

Вопрос:

Я пытаюсь отсортировать входные данные концентратора событий с помощью Stream Analytics и заполнить выходные данные в ADLS 2 в формате CSV, но данные не отсортированы.

Я использовал функцию azure (триггер таймера), которая последовательно извлекает данные из SQL Server (поле «Порядок по идентификатору«) и отправляет данные о пакетных событиях в концентратор событий.

В Stream Analytics я использую поток концентратора событий в качестве входных данных и ADLS 2 в качестве выходных.

Я попробовал и CollectTop (), и TopOne() для сортировки данных.

Кроме того, я использую «ОТМЕТКУ ВРЕМЕНИ ПО времени просмотра».

Ниже приведен запрос ASA:

данные о событиях: Ввод

adls2: Вывод

 With 
    Step1 as (   
        SELECT eventdata.Id as Id, eventdata.ClientMacAddress, eventdata.SeenEpoch, 
        eventdata.SeenTime, System.Timestamp() t   
        FROM eventdata
        TIMESTAMP BY SeenTime
    ),
    Step2 as (
        SELECT TopOne() OVER (ORDER BY Id asc) as topEvent  
    FROM Step1 
    Group by TumblingWindow(minute, 10)
    )
    SELECT udf.convertJsonToString(topEvent)
        INTO
        [adls2]
    FROM Step2
 

Заранее благодарен вам за помощь.

Комментарии:

1. Если я правильно читаю этот запрос, вы получаете только 1 запись в окне. Вы хотите сказать, что запись не является наименьшим идентификатором (идентификатор верхнего ПОРЯДКА BU Id ASC) этого окна? Можете ли вы подтвердить, что идентификатор относится к типам bigint/float/datetime (которые являются единственными поддерживаемыми типами здесь, согласно документу ). Также вы пытались сопоставить ПРЕДЕЛЬНУЮ ПРОДОЛЖИТЕЛЬНОСТЬ с окном (10 минут)?

2. Спасибо Флориану за ваш ответ. Да, идентификатор имеет тип BigInt. Нет, я не пробовал ОГРАНИЧИТЬ ПРОДОЛЖИТЕЛЬНОСТЬ. Кроме того, в настоящее время я отправляю случайное число событий каждые 1 минуту и упорядочиваю их по идентификатору asc. Теперь та же последовательность должна присутствовать в ASAL каждые 10 минут, когда окно рушится и нажимается на ADLS 2. Надеюсь, вы в состоянии понять мой вопрос!

3. С помощью этой функции TOPONE() вы будете получать только 1 запись за 10 минут. Вот чего я не понимаю. Не могли бы вы обновить запрос до такого, который возвращает весь ожидаемый набор данных? Спасибо!