Нет вывода, когда одна сторона ОБЪЕДИНЕНИЯ пуста

#azure #iot #azure-eventhub #azure-stream-analytics #stream-analytics

Вопрос:

У меня есть задание Azure Stream Analytics, которое объединяет результаты нескольких запросов и выводит их в один приемник. Для этого я определяю свои запросы в инструкции WITH, затем объединяю их с помощью UNION и затем записываю их в свой приемник. Однако, к сожалению, я получаю выходные данные в приемник только в том случае, когда все мои запросы действительно имеют выходные данные, и именно здесь все идет не так.

У меня есть несколько запросов, которые непрерывно (каждые 5 минут) выдают результат, но у меня также есть некоторые запросы, которые редко выдают результат (возможно, несколько раз в день). Это приводит к тому, что выходные данные не получают никаких результатов, пока все запросы не получат что-то, что нужно вернуть. Кто — нибудь знает, почему это происходит и как я могу это исправить? Разве ОБЪЕДИНЕНИЕ не должно также давать результаты, когда набор A имеет результаты, а набор B-нет? Я запускаю это локально в коде VS, кстати, с подключением в реальном времени к концентратору событий.

Вот упрощенный пример 2 запросов (один с частым выводом, другой с нечастым выводом), которые идут не так:

 WITH
HarmonizedMeasurements AS (
    SELECT
        CAST(EHHARM.TimeStamp AS datetime) AS "TimeStamp",
        CAST(EHHARM.ValueNumber AS float) AS "ValueNumber",
        EHHARM.ValueBit AS "ValueBit",
        EHHARM.MeasurementName,
        EHHARM.PartName,
        EHHARM.ElementId,
        EHHARM.ElementName,
        EHHARM.ObjectName,
        EHHARM.TranslationTableId
    FROM EventHubHarmonizedMeasurements AS EHHARM TIMESTAMP BY "TimeStamp"
    PARTITION BY TranslationTableId
),

ToerenAandrijvingCategoriesMeasurements AS (
    SELECT
        AANDRCAT.TimeStamp AS "TimeStamp",
        AANDRCAT.ValueNumber AS "ValueNumber",
        AANDRCAT.ValueBit AS "ValueBit",
        AANDRCAT.MeasurementName AS "MeasurementName",
        AANDRCAT.PartName AS "PartName",
        AANDRCAT.ElementId AS "ElementId",
        AANDRCAT.ElementName AS "ElementName",
        AANDRCAT.ObjectName AS "ObjectName",
        AANDRCAT.TranslationTableId AS "TranslationTableId",
        CASE 
            WHEN (-5000 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= -1000) THEN 'Dalen'
            WHEN (-1000 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= -200) THEN 'Dalen Retarderen'
            WHEN (-200 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 0) THEN 'Stilstand'
            WHEN (0 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 250) THEN 'Nivelleren'
            WHEN (250 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 400) THEN 'Heffen Retarderen'
            WHEN (400 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 5000) THEN 'Heffen'
            ELSE 'NoCategory'
        END AS "Category"
    FROM HarmonizedMeasurements AS AANDRCAT
    WHERE
        AANDRCAT.ObjectName LIKE 'Schutsluis%' AND
        AANDRCAT.MeasurementName = 'Motortoerental terugkoppeling' AND
        AANDRCAT.ValueNumber <> 0
),
AandrijvingCatStartMeasurements AS (
    SELECT
        AANDRCAT.TimeStamp AS "StartTime",
        AANDRCAT.Category AS "Category",
        AANDRCAT.ElementId AS "ElementId",
        AANDRCAT.TranslationTableId AS "TranslationTableId"
    FROM ToerenAandrijvingCategoriesMeasurements AS AANDRCAT
    WHERE
        LAG(Category, 1) OVER (PARTITION BY ElementId LIMIT DURATION(day, 1)) <> Category
),
AandrijvingCatEndMeasurements AS (
    SELECT
        AANDRST.StartTime AS "EndTime",
        LAG(AANDRST.StartTime, 1) OVER (PARTITION BY ElementId LIMIT DURATION(day, 1)) AS "StartTime",
        LAG(AANDRST.Category, 1) OVER (PARTITION BY ElementId LIMIT DURATION(day, 1)) AS "Category",
        AANDRST.ElementId AS "ElementId",
        AANDRST.TranslationTableId AS "TranslationTableId"
    FROM AandrijvingCatStartMeasurements AS AANDRST
),
VermogenAandrijvingMeasurements AS (
    SELECT
        AANDRVER.TimeStamp AS "TimeStamp",
        AANDRVER.ValueNumber AS "ValueNumber",
        AANDRVER.ValueBit AS "ValueBit",
        CONCAT(AANDRVER.MeasurementName, ' ', AANDREN.Category) AS "MeasurementName",
        AANDRVER.PartName AS "PartName",
        AANDRVER.ElementId AS "ElementId",
        AANDRVER.ElementName AS "ElementName",
        AANDRVER.ObjectName AS "ObjectName",
        AANDRVER.TranslationTableId AS "TranslationTableId"
    FROM HarmonizedMeasurements AS AANDRVER
    LEFT JOIN AandrijvingCatEndMeasurements AS AANDREN
    ON DATEDIFF(minute, AANDRVER, AANDREN) BETWEEN 0 AND 30 AND
        AANDRVER.TimeStamp >= AANDREN.StartTime AND
        AANDRVER.Timestamp < AANDREN.EndTime AND
        AANDRVER.ElementId = AANDREN.ElementId AND
        AANDRVER.TranslationTableId = AANDREN.TranslationTableId
    INNER JOIN SQLMeasurementType AS MEAS
    ON MEAS.Name = CONCAT(AANDRVER.MeasurementName, ' ', AANDREN.Category)
    WHERE
        AANDRVER.ObjectName LIKE 'Schutsluis%' AND
        AANDRVER.MeasurementName = 'Vermogen'
),
LockDoorTop AS (
    SELECT
        Lock.TimeStamp AS "TimeStamp",
        Lock.ValueNumber AS "ValueNumber",
        Lock.ValueBit AS "ValueBit",
        Lock.MeasurementName,
        Lock.PartName,
        Lock.ElementId,
        Lock.ElementName,
        Lock.ObjectName,
        Lock.TranslationTableId
    FROM HarmonizedMeasurements AS Lock
    WHERE
        Lock.MeasurementName = 'Sluisdeur open' AND
        Lock.ElementName = 'Deur sluiskolk 1' AND
        Lock.PartName = 'Bovenhoofd' AND
        Lock.ValueBit = '1'
),
WaterLTop AS (
    SELECT
        WaterTop.TimeStamp AS "TimeStamp",
        WaterTop.ValueNumber AS "ValueNumber",
        WaterTop.ValueBit AS "ValueBit",
        WaterTop.MeasurementName,
        WaterTop.PartName,
        WaterTop.ElementId,
        WaterTop.ElementName,
        WaterTop.ObjectName,
        WaterTop.TranslationTableId
    FROM HarmonizedMeasurements AS WaterTop
    WHERE
        WaterTop.MeasurementName = 'Waterniveaumeting' AND
        WaterTop.ElementName = 'Sluiskolk 1' AND
        WaterTop.PartName = 'Opvaartzijde'
),
WaterLLock AS (
    SELECT
        WaterLock.TimeStamp AS "TimeStamp",
        WaterLock.ValueNumber AS "ValueNumber",
        WaterLock.ValueBit AS "ValueBit",
        WaterLock.MeasurementName,
        WaterLock.PartName,
        WaterLock.ElementId,
        WaterLock.ElementName,
        WaterLock.ObjectName,
        WaterLock.TranslationTableId
    FROM HarmonizedMeasurements AS WaterLock
    WHERE
        WaterLock.MeasurementName = 'Waterniveaumeting' AND
        WaterLock.ElementName = 'Sluiskolk 1' AND
        WaterLock.PartName = 'Sluiskamer'
),
WaterLevelTopMeasurements AS (
    SELECT
        LockDoor.TimeStamp AS "TimeStamp",
        CAST(ROUND((WaterLevelLock.ValueNumber - WaterLevelTop.ValueNumber), 2) AS float) AS "ValueNumber",
        null AS "ValueBit",
        MEAS.Name AS "MeasurementName",
        LockDoor.PartName AS "PartName",
        LockDoor.ElementId AS "ElementId",
        LockDoor.ElementName AS "ElementName",
        LockDoor.ObjectName AS "ObjectName",
        LockDoor.TranslationTableId AS "TranslationTableId"
    FROM LockDoorTop AS LockDoor
    JOIN WaterLTop AS WaterLevelTop
    ON  DATEDIFF(minute, LockDoor, WaterLevelTop) BETWEEN 0 AND 1 AND
        LockDoor.ObjectName = WaterLevelTop.ObjectName
    JOIN WaterLLock AS WaterLevelLock
    ON  DATEDIFF(minute, LockDoor, WaterLevelLock) BETWEEN 0 AND 1 AND
        LockDoor.ObjectName = WaterLevelLock.ObjectName
    INNER JOIN SQLMeasurementType AS MEAS
    ON MEAS.Name = 'Waterniveauverschil'
),

-- Combine queries
DatalakeCombinedMeasurements AS (
    SELECT * FROM VermogenAandrijvingMeasurements
    UNION
    SELECT * FROM WaterLevelTopMeasurements
)

-- Output data
SELECT *
INTO DatalakeHarmonizedMeasurements
FROM DatalakeCombinedMeasurements
PARTITION BY TranslationTableId
 

Комментарии:

1. Я попытаюсь взглянуть на это сегодня. Если это возможно, вам будет проще, если вы поделитесь образцами своих входных данных. Вы можете сделать это в VSCode довольно легко.

2. Я быстро взглянул, и мне интересно, не утомляет ли эта работа читателей в группе потребителей этого Центра событий (см. Здесь ). Вы видели что-нибудь в этом роде в журналах? Не могли бы вы попробовать создать 2 входа в ASA, каждый с другой группой потребителей, для каждого пути, который у вас здесь есть?

3. Привет @FlorianEiden спасибо, что изучили это, завтра я вернусь с образцом данных. Что касается истощения потребителей: первый запрос после оператора WITH должен предотвратить это, верно? Во всех моих запросах я ссылаюсь только на «Согласованные измерения», после этого я больше ничего не получаю из «EventHubHarmonizedMeasurements».

4. Этот подход работает, если у вас есть несколько выходов на одном и том же входе. Вы заставляете одного читателя пересекать их, используя WITH. Это неприменимо здесь, так как истощение будет исходить от СОЮЗА и множества самостоятельных объединений. Обратите внимание, что я все еще не на 100% уверен, что причина здесь в этом, хотя, даже если это действительно выглядит так, потому что вы должны получать предупреждения, если не ошибки, из-за истощения потребителей.

5. Я проверил вашу топологию запросов, и вы правы, не похоже, что должно произойти истощение потребителей. Сейчас я изучаю махинации с временной шкалой. Будет держать вас в курсе.