#azure #iot #azure-eventhub #azure-stream-analytics #stream-analytics
Вопрос:
У меня есть задание Azure Stream Analytics, которое объединяет результаты нескольких запросов и выводит их в один приемник. Для этого я определяю свои запросы в инструкции WITH, затем объединяю их с помощью UNION и затем записываю их в свой приемник. Однако, к сожалению, я получаю выходные данные в приемник только в том случае, когда все мои запросы действительно имеют выходные данные, и именно здесь все идет не так.
У меня есть несколько запросов, которые непрерывно (каждые 5 минут) выдают результат, но у меня также есть некоторые запросы, которые редко выдают результат (возможно, несколько раз в день). Это приводит к тому, что выходные данные не получают никаких результатов, пока все запросы не получат что-то, что нужно вернуть. Кто — нибудь знает, почему это происходит и как я могу это исправить? Разве ОБЪЕДИНЕНИЕ не должно также давать результаты, когда набор A имеет результаты, а набор B-нет? Я запускаю это локально в коде VS, кстати, с подключением в реальном времени к концентратору событий.
Вот упрощенный пример 2 запросов (один с частым выводом, другой с нечастым выводом), которые идут не так:
WITH
HarmonizedMeasurements AS (
SELECT
CAST(EHHARM.TimeStamp AS datetime) AS "TimeStamp",
CAST(EHHARM.ValueNumber AS float) AS "ValueNumber",
EHHARM.ValueBit AS "ValueBit",
EHHARM.MeasurementName,
EHHARM.PartName,
EHHARM.ElementId,
EHHARM.ElementName,
EHHARM.ObjectName,
EHHARM.TranslationTableId
FROM EventHubHarmonizedMeasurements AS EHHARM TIMESTAMP BY "TimeStamp"
PARTITION BY TranslationTableId
),
ToerenAandrijvingCategoriesMeasurements AS (
SELECT
AANDRCAT.TimeStamp AS "TimeStamp",
AANDRCAT.ValueNumber AS "ValueNumber",
AANDRCAT.ValueBit AS "ValueBit",
AANDRCAT.MeasurementName AS "MeasurementName",
AANDRCAT.PartName AS "PartName",
AANDRCAT.ElementId AS "ElementId",
AANDRCAT.ElementName AS "ElementName",
AANDRCAT.ObjectName AS "ObjectName",
AANDRCAT.TranslationTableId AS "TranslationTableId",
CASE
WHEN (-5000 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= -1000) THEN 'Dalen'
WHEN (-1000 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= -200) THEN 'Dalen Retarderen'
WHEN (-200 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 0) THEN 'Stilstand'
WHEN (0 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 250) THEN 'Nivelleren'
WHEN (250 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 400) THEN 'Heffen Retarderen'
WHEN (400 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 5000) THEN 'Heffen'
ELSE 'NoCategory'
END AS "Category"
FROM HarmonizedMeasurements AS AANDRCAT
WHERE
AANDRCAT.ObjectName LIKE 'Schutsluis%' AND
AANDRCAT.MeasurementName = 'Motortoerental terugkoppeling' AND
AANDRCAT.ValueNumber <> 0
),
AandrijvingCatStartMeasurements AS (
SELECT
AANDRCAT.TimeStamp AS "StartTime",
AANDRCAT.Category AS "Category",
AANDRCAT.ElementId AS "ElementId",
AANDRCAT.TranslationTableId AS "TranslationTableId"
FROM ToerenAandrijvingCategoriesMeasurements AS AANDRCAT
WHERE
LAG(Category, 1) OVER (PARTITION BY ElementId LIMIT DURATION(day, 1)) <> Category
),
AandrijvingCatEndMeasurements AS (
SELECT
AANDRST.StartTime AS "EndTime",
LAG(AANDRST.StartTime, 1) OVER (PARTITION BY ElementId LIMIT DURATION(day, 1)) AS "StartTime",
LAG(AANDRST.Category, 1) OVER (PARTITION BY ElementId LIMIT DURATION(day, 1)) AS "Category",
AANDRST.ElementId AS "ElementId",
AANDRST.TranslationTableId AS "TranslationTableId"
FROM AandrijvingCatStartMeasurements AS AANDRST
),
VermogenAandrijvingMeasurements AS (
SELECT
AANDRVER.TimeStamp AS "TimeStamp",
AANDRVER.ValueNumber AS "ValueNumber",
AANDRVER.ValueBit AS "ValueBit",
CONCAT(AANDRVER.MeasurementName, ' ', AANDREN.Category) AS "MeasurementName",
AANDRVER.PartName AS "PartName",
AANDRVER.ElementId AS "ElementId",
AANDRVER.ElementName AS "ElementName",
AANDRVER.ObjectName AS "ObjectName",
AANDRVER.TranslationTableId AS "TranslationTableId"
FROM HarmonizedMeasurements AS AANDRVER
LEFT JOIN AandrijvingCatEndMeasurements AS AANDREN
ON DATEDIFF(minute, AANDRVER, AANDREN) BETWEEN 0 AND 30 AND
AANDRVER.TimeStamp >= AANDREN.StartTime AND
AANDRVER.Timestamp < AANDREN.EndTime AND
AANDRVER.ElementId = AANDREN.ElementId AND
AANDRVER.TranslationTableId = AANDREN.TranslationTableId
INNER JOIN SQLMeasurementType AS MEAS
ON MEAS.Name = CONCAT(AANDRVER.MeasurementName, ' ', AANDREN.Category)
WHERE
AANDRVER.ObjectName LIKE 'Schutsluis%' AND
AANDRVER.MeasurementName = 'Vermogen'
),
LockDoorTop AS (
SELECT
Lock.TimeStamp AS "TimeStamp",
Lock.ValueNumber AS "ValueNumber",
Lock.ValueBit AS "ValueBit",
Lock.MeasurementName,
Lock.PartName,
Lock.ElementId,
Lock.ElementName,
Lock.ObjectName,
Lock.TranslationTableId
FROM HarmonizedMeasurements AS Lock
WHERE
Lock.MeasurementName = 'Sluisdeur open' AND
Lock.ElementName = 'Deur sluiskolk 1' AND
Lock.PartName = 'Bovenhoofd' AND
Lock.ValueBit = '1'
),
WaterLTop AS (
SELECT
WaterTop.TimeStamp AS "TimeStamp",
WaterTop.ValueNumber AS "ValueNumber",
WaterTop.ValueBit AS "ValueBit",
WaterTop.MeasurementName,
WaterTop.PartName,
WaterTop.ElementId,
WaterTop.ElementName,
WaterTop.ObjectName,
WaterTop.TranslationTableId
FROM HarmonizedMeasurements AS WaterTop
WHERE
WaterTop.MeasurementName = 'Waterniveaumeting' AND
WaterTop.ElementName = 'Sluiskolk 1' AND
WaterTop.PartName = 'Opvaartzijde'
),
WaterLLock AS (
SELECT
WaterLock.TimeStamp AS "TimeStamp",
WaterLock.ValueNumber AS "ValueNumber",
WaterLock.ValueBit AS "ValueBit",
WaterLock.MeasurementName,
WaterLock.PartName,
WaterLock.ElementId,
WaterLock.ElementName,
WaterLock.ObjectName,
WaterLock.TranslationTableId
FROM HarmonizedMeasurements AS WaterLock
WHERE
WaterLock.MeasurementName = 'Waterniveaumeting' AND
WaterLock.ElementName = 'Sluiskolk 1' AND
WaterLock.PartName = 'Sluiskamer'
),
WaterLevelTopMeasurements AS (
SELECT
LockDoor.TimeStamp AS "TimeStamp",
CAST(ROUND((WaterLevelLock.ValueNumber - WaterLevelTop.ValueNumber), 2) AS float) AS "ValueNumber",
null AS "ValueBit",
MEAS.Name AS "MeasurementName",
LockDoor.PartName AS "PartName",
LockDoor.ElementId AS "ElementId",
LockDoor.ElementName AS "ElementName",
LockDoor.ObjectName AS "ObjectName",
LockDoor.TranslationTableId AS "TranslationTableId"
FROM LockDoorTop AS LockDoor
JOIN WaterLTop AS WaterLevelTop
ON DATEDIFF(minute, LockDoor, WaterLevelTop) BETWEEN 0 AND 1 AND
LockDoor.ObjectName = WaterLevelTop.ObjectName
JOIN WaterLLock AS WaterLevelLock
ON DATEDIFF(minute, LockDoor, WaterLevelLock) BETWEEN 0 AND 1 AND
LockDoor.ObjectName = WaterLevelLock.ObjectName
INNER JOIN SQLMeasurementType AS MEAS
ON MEAS.Name = 'Waterniveauverschil'
),
-- Combine queries
DatalakeCombinedMeasurements AS (
SELECT * FROM VermogenAandrijvingMeasurements
UNION
SELECT * FROM WaterLevelTopMeasurements
)
-- Output data
SELECT *
INTO DatalakeHarmonizedMeasurements
FROM DatalakeCombinedMeasurements
PARTITION BY TranslationTableId
Комментарии:
1. Я попытаюсь взглянуть на это сегодня. Если это возможно, вам будет проще, если вы поделитесь образцами своих входных данных. Вы можете сделать это в VSCode довольно легко.
2. Я быстро взглянул, и мне интересно, не утомляет ли эта работа читателей в группе потребителей этого Центра событий (см. Здесь ). Вы видели что-нибудь в этом роде в журналах? Не могли бы вы попробовать создать 2 входа в ASA, каждый с другой группой потребителей, для каждого пути, который у вас здесь есть?
3. Привет @FlorianEiden спасибо, что изучили это, завтра я вернусь с образцом данных. Что касается истощения потребителей: первый запрос после оператора WITH должен предотвратить это, верно? Во всех моих запросах я ссылаюсь только на «Согласованные измерения», после этого я больше ничего не получаю из «EventHubHarmonizedMeasurements».
4. Этот подход работает, если у вас есть несколько выходов на одном и том же входе. Вы заставляете одного читателя пересекать их, используя WITH. Это неприменимо здесь, так как истощение будет исходить от СОЮЗА и множества самостоятельных объединений. Обратите внимание, что я все еще не на 100% уверен, что причина здесь в этом, хотя, даже если это действительно выглядит так, потому что вы должны получать предупреждения, если не ошибки, из-за истощения потребителей.
5. Я проверил вашу топологию запросов, и вы правы, не похоже, что должно произойти истощение потребителей. Сейчас я изучаю махинации с временной шкалой. Будет держать вас в курсе.