Utworzyłem zadanie Azure Stream Analytics, które będzie pobierać dane wejściowe z usługi EventHub i zapisywać je w cosmosDB i Blob.
Widziałem, że czasami dane z eventHub się duplikują, w wyniku czego zduplikowane dane zostaną zapisane w pamięci cosmosDB i Blob.
Przykładowe dane wejściowe do usługi Stream Analytics z EventHub pokazano poniżej.
[
{
"idnum":"00011XXX01",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"04XXX",
"id":1
},
{
"sig3":"000000",
"id":61
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00026XXX03",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"000000",
"id":61
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]
W powyższym przykładzie zdarzenie z idnum: 00086XXX02 powtarza się 3 razy.
Wykonuję poniższą analizę i otrzymuję wynik z duplikatami.
temp AS (
SELECT
input.idnum AS IDNUM,
input.basetime AS BASETIME,
input.time AS TIME,
ROUND(input.sig1,5) AS SIG1,
flatArrayElement as SIG2,
udf.sgnlArrayMap(input.signals, input.basetime) AS SGNL //UDF to process the signals in input
FROM [input01] as input
CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
WHERE GetArrayLength(input.sig2) >=1
),
SIGNALS AS (
SELECT * FROM temp T JOIN master M ON T.SIG2.ArrayValue.sig3 = M.sig3
)
--Insert SIG2 to COSMOS Container
SELECT
t.IDNUM,
t.BASETIME,
t.TIME,
t.SIG1,
t.SIG2.ArrayValue.id AS ID,
t.SIG2.ArrayValue.sig3 AS SIG3,
t.SGNL
INTO [CosmosTbl]
FROM SIGNALS PARTITION BY PartitionId
Wynik będzie taki jak poniżej, gdzie występują zduplikowane zdarzenia dla "idnum": "00086XXX02"
[
{
"idnum":"00011XXX01",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00011XXX01",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"000000",
"id":61
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]
Oczekiwane dane wyjściowe będą zdarzeniami bez duplikatów (dla podanej próbki nie powinno być zduplikowanych zdarzeń dla „idnum”: „00086XXX02”)
Przed zapisaniem danych w pamięci chcę usunąć zduplikowane zdarzenia. Czy można to zrobić ze Stream Analytics?
Tworzenie kolekcji bazy danych cosmos z unikatowym identyfikatorem to rozwiązanie z końca Cosmos, ale tutaj tabela już istnieje i czy możemy cokolwiek zrobić od końca usługi Stream Analytics?
2 odpowiedzi
Upraszczam twój test sql, jak poniżej:
with t AS (
SELECT
flatArrayElement as SIG2
FROM fromblob as input
CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
WHERE GetArrayLength(input.sig2) >=1
)
SELECT
t.SIG2.ArrayValue.id AS ID,
t.SIG2.ArrayValue.sig3 AS SIG3
FROM t PARTITION BY PartitionId
I tworzy zduplikowane dane z powodu metody GetArrayElements()
, która wydaje mi się normalna. Tablica jest podzielona, wynik powinien zostać zduplikowany.
Na podstawie mojego doświadczenia i moich wyniki(oraz ta opinia), w ASA nie ma odrębnej metody. Powodem, dla którego myślę, jest to, że ASA przetwarza dane strumienia w czasie rzeczywistym, a nie dane statyczne, coś w rodzaju tabeli SQL. Nie może ocenić, czy dane są duplikowane w danej jednostce czasu.
W połączeniu z ostatnim przypadkiem bazy danych kosmosu (Jak znaleźć zduplikowane dokumenty w Cosmos DB), myślę, że kluczowym punktem rozwiązania jest uzyskanie głównej przyczyny: dlaczego centrum zdarzeń przetwarza zduplikowane dane źródłowe. Z pewnością można ustawić wyzwalacz db kosmosu, aby zapobiec przesyłaniu zduplikowanych danych do db. Ale myślę, że nie jest to skuteczny sposób.
Możesz użyć Distinct, aby usunąć zduplikowane wydarzenia. Dostępna jest dokumentacja online: https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-stream-analytics-query-patterns#remove-duplicate-events-in-a-window< /a>
Przykład:
With Temp AS (
SELECT
COUNT(DISTINCT Time) AS CountTime,
Value,
DeviceId
FROM Input TIMESTAMP BY Time
GROUP BY Value, DeviceId, SYSTEM.TIMESTAMP()
)
SELECT
AVG(Value) AS AverageValue,
DeviceId
INTO Output
FROM Temp
GROUP BY DeviceId,TumblingWindow(minute, 5)
Podobne pytania
Powiązane pytania
Nowe pytania
azure
Microsoft Azure to platforma jako usługa i infrastruktura jako usługa w chmurze. Użyj tego tagu w przypadku pytań dotyczących programowania dotyczących platformy Azure. Ogólną pomoc dotyczącą serwera można uzyskać pod adresem Super User lub Server Fault.