I created an Azure Stream Analytics job that reads input from Event Hubs and writes it to Cosmos DB and Blob storage.

I have noticed that data from the Event Hub is sometimes duplicated, and as a result the duplicates are written to Cosmos DB and Blob storage.

Sample input to Stream Analytics from the Event Hub is shown below.

[
{
               "idnum":"00011XXX01",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"000000",
                                             "id":61
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"04XXX",
                                             "id":1
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00026XXX03",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               [
                              {
                                             "sig3":"03XXX",
                                             "id":1
                              },
                              {
                                             "sig3":"000000",
                                             "id":61
                              }
               ],
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]

In the sample above, the event with idnum 00086XXX02 is duplicated 3 times (it appears 4 times in total).

I run the query below and get output that contains the duplicates.

WITH temp AS (
    SELECT
        input.idnum AS IDNUM,
        input.basetime AS BASETIME,
        input.time AS TIME,
        ROUND(input.sig1,5) AS SIG1,
        flatArrayElement as SIG2,
        udf.sgnlArrayMap(input.signals, input.basetime) AS SGNL -- UDF that processes the signals array in the input
    FROM [input01] as input
    CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
    WHERE GetArrayLength(input.sig2) >=1
 ),
SIGNALS AS (
  SELECT * FROM temp T JOIN master M ON T.SIG2.ArrayValue.sig3 = M.sig3 
)

--Insert SIG2 to COSMOS Container
SELECT 
    t.IDNUM,
    t.BASETIME,
    t.TIME,
    t.SIG1,
    t.SIG2.ArrayValue.id AS ID,
    t.SIG2.ArrayValue.sig3 AS SIG3,
    t.SGNL
INTO [CosmosTbl]
FROM SIGNALS PARTITION BY PartitionId

The output looks like the following, with duplicate events for "idnum": "00086XXX02".

[
{
               "idnum":"00011XXX01",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00011XXX01",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"000000",
               "id":61
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},                           
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"03XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
               "idnum":"00086XXX02",
               "basetime":0,
               "time":189834,
               "sig1":36.341587,
               "sig2":
               "sig3":"04XXX",
               "id":1
               "signals":
               [
                              {
                                             "timestamp":190915,
                                             "value":45,
                              },
                              {
                                             "timestamp":190915,
                                             "value":10.2,
                              },
                              {
                                             "timestamp":190915,
                              },
                              {
                                             "timestamp":190915,
                                             "value":0,
                              }
               ],
               "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]

The expected output is the same events without duplicates (for the given sample, there should be no duplicate events for "idnum": "00086XXX02").

I want to remove the duplicate events before the data is written to storage. Can this be done in Stream Analytics?

Creating the Cosmos DB collection with a unique key would solve this on the Cosmos DB side, but here the collection already exists. Is there anything we can do on the Stream Analytics side?

1
Antony, 19 December 2019, 13:47

2 answers

I simplified your test SQL as follows:

with t AS (
    SELECT
        flatArrayElement as SIG2
    FROM fromblob as input
    CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
    WHERE GetArrayLength(input.sig2) >=1
 )
SELECT 
    t.SIG2.ArrayValue.id AS ID,
    t.SIG2.ArrayValue.sig3 AS SIG3
FROM t PARTITION BY PartitionId

It still produces duplicated data, which seems normal to me given how GetArrayElements() works. CROSS APPLY splits the sig2 array and emits one row per element, so every duplicated input event is multiplied by the length of its array.
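To make the row multiplication concrete, here is a minimal Python sketch that imitates what CROSS APPLY GetArrayElements does to a trimmed version of the sample input. It is only a simulation, not ASA code: the function name cross_apply_sig2 and the reduced event shape are assumptions made for illustration, and the join against master is left out.

```python
from typing import Any, Dict, List


def cross_apply_sig2(events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Emit one output row per element of each event's sig2 array."""
    rows: List[Dict[str, Any]] = []
    for event in events:
        # An empty or missing array yields no rows, like the
        # WHERE GetArrayLength(input.sig2) >= 1 filter in the query.
        for element in event.get("sig2") or []:
            rows.append({
                "IDNUM": event["idnum"],
                "ID": element["id"],
                "SIG3": element["sig3"],
            })
    return rows


# Trimmed sample: the 00086XXX02 event is delivered four times.
duplicated = {
    "idnum": "00086XXX02",
    "sig2": [{"sig3": "03XXX", "id": 1}, {"sig3": "04XXX", "id": 1}],
}
events = [
    {"idnum": "00011XXX01",
     "sig2": [{"sig3": "04XXX", "id": 1}, {"sig3": "000000", "id": 61}]},
    duplicated, duplicated, duplicated, duplicated,
    {"idnum": "00026XXX03",
     "sig2": [{"sig3": "03XXX", "id": 1}, {"sig3": "000000", "id": 61}]},
]

rows = cross_apply_sig2(events)
dup_rows = [r for r in rows if r["IDNUM"] == "00086XXX02"]
print(len(rows), len(dup_rows))
```

Four copies of an event with a two-element sig2 array become eight rows, which matches the eight 00086XXX02 records in the output shown in the question. The flattening itself is not the source of the duplicates; it only amplifies the ones already present in the input.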

Based on my experience and my findings (and this feedback), there is no DISTINCT method in ASA. I think the reason is that ASA processes streaming data in real time rather than static data such as a SQL table, so it cannot judge whether an event is a duplicate outside of a given unit of time.
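The constraint can be illustrated with a small Python sketch, assuming events arrive in timestamp order and can be identified by some key (for this data, a hypothetical key such as idnum plus time). A stream processor can only compare a new event against the keys it still remembers, so the sketch keeps state for a bounded retention period and lets through any duplicate that arrives after that. The function name dedup_within and the retention parameter are illustrative, not part of ASA.

```python
from typing import Dict, Hashable, Iterable, Iterator, Tuple


def dedup_within(
    events: Iterable[Tuple[int, Hashable]], retention: int
) -> Iterator[Tuple[int, Hashable]]:
    """Yield (timestamp, key) pairs, dropping repeats of a key that arrive
    within `retention` time units of its first remembered occurrence.

    Assumes timestamps are non-decreasing.
    """
    first_seen: Dict[Hashable, int] = {}
    for ts, key in events:
        # Forget keys older than the retention period so state stays bounded.
        expired = [k for k, t in first_seen.items() if ts - t > retention]
        for k in expired:
            del first_seen[k]
        if key in first_seen:
            continue  # duplicate inside the retention period: drop it
        first_seen[key] = ts
        yield ts, key


stream = [(0, "a"), (1, "a"), (2, "b"), (10, "a")]
kept = list(dedup_within(stream, retention=5))
print(kept)
```

The repeat of "a" at time 1 is dropped, but the one at time 10 passes because the key was already forgotten. Any streaming deduplication has this shape: it is only exact within whatever period of state it is allowed to keep.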

Combined with your other recent Cosmos DB question (How to find duplicate documents in Cosmos DB), I think the key to a solution is finding the root cause: why is the event hub delivering duplicated source data? You could certainly set up a Cosmos DB trigger to prevent duplicate data from being written to the database, but I don't think that is an efficient approach.
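If the duplicates cannot be removed at the source, one common alternative to a trigger is to make the write idempotent, so that writing the same row twice leaves a single document. The Python sketch below shows the idea only: a plain dict stands in for the Cosmos DB container, no Cosmos DB SDK is used, and the choice of IDNUM, TIME, SIG3 and ID as the identifying fields is an assumption about this data.

```python
import hashlib
import json
from typing import Any, Dict, Sequence


def document_id(
    row: Dict[str, Any],
    key_fields: Sequence[str] = ("IDNUM", "TIME", "SIG3", "ID"),
) -> str:
    """Derive a deterministic id from the fields that identify a row."""
    payload = json.dumps([row[f] for f in key_fields], separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def upsert(store: Dict[str, Dict[str, Any]], row: Dict[str, Any]) -> str:
    """Insert the row, or overwrite the existing document with the same id."""
    doc = dict(row, id=document_id(row))
    store[doc["id"]] = doc
    return doc["id"]


store: Dict[str, Dict[str, Any]] = {}
row_a = {"IDNUM": "00086XXX02", "TIME": 189834, "SIG3": "03XXX", "ID": 1}
row_b = {"IDNUM": "00086XXX02", "TIME": 189834, "SIG3": "04XXX", "ID": 1}

# The same logical row delivered three times, plus one distinct row.
for row in (row_a, row_a, row_a, row_b):
    upsert(store, row)

print(len(store))
```

Because the id depends only on the identifying fields, repeated deliveries overwrite the same document instead of creating new ones. Whether this fits depends on the real data: if two legitimately different events can share those four fields, the key needs more fields.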

0
Jay Gong, 23 December 2019, 10:14