I created an Azure Stream Analytics job that reads input from Event Hubs and writes it to Cosmos DB and Blob storage.

I have noticed that the data coming from Event Hub is sometimes duplicated, so the duplicates end up stored in Cosmos DB and Blob storage.

A sample of the input that Stream Analytics receives from Event Hub is shown below.

[
{
        "idnum":"00011XXX01",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig2":
        [
               {
                       "sig3":"04XXX",
                       "id":1
               },
               {
                       "sig3":"000000",
                       "id":61
               }
        ],
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               },
               {
                       "timestamp":190915
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
        "idnum":"00086XXX02",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig2":
        [
               {
                       "sig3":"03XXX",
                       "id":1
               },
               {
                       "sig3":"04XXX",
                       "id":1
               }
        ],
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               },
               {
                       "timestamp":190915
               },
               {
                       "timestamp":190915,
                       "value":0
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
        "idnum":"00086XXX02",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig2":
        [
               {
                       "sig3":"03XXX",
                       "id":1
               },
               {
                       "sig3":"04XXX",
                       "id":1
               }
        ],
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               },
               {
                       "timestamp":190915
               },
               {
                       "timestamp":190915,
                       "value":0
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
        "idnum":"00086XXX02",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig2":
        [
               {
                       "sig3":"03XXX",
                       "id":1
               },
               {
                       "sig3":"04XXX",
                       "id":1
               }
        ],
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               },
               {
                       "timestamp":190915
               },
               {
                       "timestamp":190915,
                       "value":0
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
        "idnum":"00086XXX02",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig2":
        [
               {
                       "sig3":"03XXX",
                       "id":1
               },
               {
                       "sig3":"04XXX",
                       "id":1
               }
        ],
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               },
               {
                       "timestamp":190915
               },
               {
                       "timestamp":190915,
                       "value":0
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
        "idnum":"00026XXX03",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig2":
        [
               {
                       "sig3":"03XXX",
                       "id":1
               },
               {
                       "sig3":"000000",
                       "id":61
               }
        ],
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]

In the sample above, the event with idnum 00086XXX02 is duplicated: it appears four times instead of once.
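One way to confirm that the repeated 00086XXX02 records are exact copies (and not different readings that merely share an idnum) is to fingerprint each event over a canonical JSON form. This is a minimal Python sketch run locally, not in Stream Analytics; the events are trimmed-down, hypothetical stand-ins for the sample that keep only a few fields.

```python
import hashlib
import json
from collections import Counter


def fingerprint(event: dict) -> str:
    """Hash a canonical JSON form so key order and whitespace don't matter."""
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Hypothetical, trimmed-down stand-ins for the sample events.
dup = {"idnum": "00086XXX02", "basetime": 0, "time": 189834,
       "sig2": [{"sig3": "03XXX", "id": 1}, {"sig3": "04XXX", "id": 1}]}
events = [
    {"idnum": "00011XXX01", "basetime": 0, "time": 189834,
     "sig2": [{"sig3": "04XXX", "id": 1}, {"sig3": "000000", "id": 61}]},
    dict(dup), dict(dup), dict(dup), dict(dup),  # four identical copies
    {"idnum": "00026XXX03", "basetime": 0, "time": 189834,
     "sig2": [{"sig3": "03XXX", "id": 1}, {"sig3": "000000", "id": 61}]},
]

counts = Counter(fingerprint(e) for e in events)
duplicated = {e["idnum"] for e in events if counts[fingerprint(e)] > 1}
print(duplicated)  # {'00086XXX02'}
```

Identical fingerprints mean byte-for-byte identical payloads, which points at redelivery upstream rather than at the query.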

I run the query below and get output that contains duplicates.

WITH temp AS (
  SELECT
    input.idnum AS IDNUM,
    input.basetime AS BASETIME,
    input.time AS TIME,
    ROUND(input.sig1, 5) AS SIG1,
    flatArrayElement AS SIG2,
    udf.sgnlArrayMap(input.signals, input.basetime) AS SGNL -- UDF that processes the signals array
  FROM [input01] AS input
  CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
  WHERE GetArrayLength(input.sig2) >= 1
),
SIGNALS AS (
  SELECT * FROM temp T JOIN master M ON T.SIG2.ArrayValue.sig3 = M.sig3
)

-- Insert the SIG2 elements into the Cosmos DB container
SELECT
  t.IDNUM,
  t.BASETIME,
  t.TIME,
  t.SIG1,
  t.SIG2.ArrayValue.id AS ID,
  t.SIG2.ArrayValue.sig3 AS SIG3,
  t.SGNL
INTO [CosmosTbl]
FROM SIGNALS AS t PARTITION BY PartitionId

The output looks like the following, with duplicated events for "idnum": "00086XXX02":

[
{
        "idnum":"00011XXX01",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig3":"04XXX",
        "id":1,
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               },
               {
                       "timestamp":190915
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
        "idnum":"00011XXX01",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig3":"000000",
        "id":61,
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               },
               {
                       "timestamp":190915
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},              
{
        "idnum":"00086XXX02",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig3":"03XXX",
        "id":1,
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               },
               {
                       "timestamp":190915
               },
               {
                       "timestamp":190915,
                       "value":0
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
        "idnum":"00086XXX02",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig3":"04XXX",
        "id":1,
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               },
               {
                       "timestamp":190915
               },
               {
                       "timestamp":190915,
                       "value":0
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
        "idnum":"00086XXX02",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig3":"03XXX",
        "id":1,
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               },
               {
                       "timestamp":190915
               },
               {
                       "timestamp":190915,
                       "value":0
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
        "idnum":"00086XXX02",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig3":"04XXX",
        "id":1,
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               },
               {
                       "timestamp":190915
               },
               {
                       "timestamp":190915,
                       "value":0
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
        "idnum":"00086XXX02",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig3":"03XXX",
        "id":1,
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               },
               {
                       "timestamp":190915
               },
               {
                       "timestamp":190915,
                       "value":0
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
        "idnum":"00086XXX02",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig3":"04XXX",
        "id":1,
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               },
               {
                       "timestamp":190915
               },
               {
                       "timestamp":190915,
                       "value":0
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
        "idnum":"00086XXX02",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig3":"03XXX",
        "id":1,
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               },
               {
                       "timestamp":190915
               },
               {
                       "timestamp":190915,
                       "value":0
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
        "idnum":"00086XXX02",
        "basetime":0,
        "time":189834,
        "sig1":36.341587,
        "sig3":"04XXX",
        "id":1,
        "signals":
        [
               {
                       "timestamp":190915,
                       "value":45
               },
               {
                       "timestamp":190915,
                       "value":10.2
               },
               {
                       "timestamp":190915
               },
               {
                       "timestamp":190915,
                       "value":0
               }
        ],
        "sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]

The expected output is the same events without duplicates (for the sample above, there should be no duplicated events for "idnum": "00086XXX02").

I want to remove the duplicated events before the data is written to storage. Can this be done in Stream Analytics?

Creating the Cosmos DB collection with a unique key would solve this on the Cosmos DB side, but here the container already exists. Is there anything we can do on the Stream Analytics side?
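A related option is to make the writes idempotent instead of filtering them. As far as I know, when the job's Cosmos DB output has a Document ID column configured, Stream Analytics upserts on that value, so a row written twice with the same ID overwrites the first document rather than adding a second one. The Python sketch below only simulates that behaviour locally; the choice of key fields and the `docKey` name are hypothetical, and in the real job the key would have to be computed in the query (for example by concatenating the key fields).

```python
import hashlib


def doc_key(row: dict) -> str:
    """Deterministic key from the fields assumed to identify one output row.
    The field choice is a hypothetical assumption; adjust it to the real event identity."""
    parts = (row["idnum"], row["basetime"], row["time"], row["sig3"], row["id"])
    raw = "|".join(str(p) for p in parts)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def upsert_all(rows: list[dict]) -> dict[str, dict]:
    """Simulate upsert semantics with a dict keyed by document ID:
    writing the same key twice overwrites instead of adding a document."""
    container: dict[str, dict] = {}  # stands in for the Cosmos DB container
    for row in rows:
        doc = dict(row, docKey=doc_key(row))
        container[doc["docKey"]] = doc
    return container


rows = [
    {"idnum": "00086XXX02", "basetime": 0, "time": 189834, "sig3": "03XXX", "id": 1},
    {"idnum": "00086XXX02", "basetime": 0, "time": 189834, "sig3": "04XXX", "id": 1},
] * 4  # the four copies of 00086XXX02 in the sample, after CROSS APPLY
container = upsert_all(rows)
print(len(rows), len(container))  # 8 2
```

The design choice is that duplicates still arrive but collapse on write, which needs no state in the streaming job.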

Antony, 19 December 2019, 13:47

2 answers

I simplified your SQL for testing, as follows:

with t AS (
  SELECT
    flatArrayElement as SIG2
  FROM fromblob as input
  CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
  WHERE GetArrayLength(input.sig2) >=1
 )
SELECT 
  t.SIG2.ArrayValue.id AS ID,
  t.SIG2.ArrayValue.sig3 AS SIG3
FROM t PARTITION BY PartitionId

It produces duplicated data because of GetArrayElements(), which seems normal to me: the array is split into one row per element, and every row repeats the parent event's other fields, so repeated values in the result are expected.
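The row multiplication can be reproduced with a small local simulation. This Python sketch mimics what CROSS APPLY GetArrayElements() does (one row per array element, exposed as ArrayIndex/ArrayValue, with the parent fields repeated); it is an illustration under that assumption, not ASA code, and the events are trimmed-down stand-ins for the sample.

```python
from collections import Counter


def cross_apply_sig2(events: list[dict]) -> list[dict]:
    """Emit one row per sig2 element, repeating the parent's idnum and time,
    similar to CROSS APPLY GetArrayElements(input.sig2)."""
    rows = []
    for event in events:
        for index, element in enumerate(event["sig2"]):
            rows.append({
                "idnum": event["idnum"],
                "time": event["time"],
                "ArrayIndex": index,
                "ArrayValue": element,
            })
    return rows


single = {"idnum": "00011XXX01", "time": 189834,
          "sig2": [{"sig3": "04XXX", "id": 1}, {"sig3": "000000", "id": 61}]}
repeated = {"idnum": "00086XXX02", "time": 189834,
            "sig2": [{"sig3": "03XXX", "id": 1}, {"sig3": "04XXX", "id": 1}]}

# The sample contains 00086XXX02 four times.
rows = cross_apply_sig2([single, repeated, repeated, repeated, repeated])
rows_per_idnum = dict(Counter(r["idnum"] for r in rows))
distinct_86 = {(r["ArrayValue"]["sig3"], r["ArrayValue"]["id"])
               for r in rows if r["idnum"] == "00086XXX02"}
print(rows_per_idnum)    # {'00011XXX01': 2, '00086XXX02': 8}
print(len(distinct_86))  # 2
```

Each event yields one row per sig2 element (2 here); the remaining 6 rows for 00086XXX02 come from the four identical input copies, so the expansion multiplies whatever duplication already exists in the input.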

Based on my experience and my test results (and this feedback), ASA has no DISTINCT method. The reason, I think, is that ASA processes streaming data in real time rather than static data such as a SQL table, so it cannot judge whether data is duplicated within a given unit of time.
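The point about units of time can be made concrete: to flag a duplicate in an unbounded stream you must remember which keys you have already seen, so any streaming deduplication has to bound that memory with a time window, and copies arriving after the window closes get through. The following plain-Python sketch is not ASA code and makes no claim about ASA features; it assumes events arrive in non-decreasing timestamp order, and the window size and timestamps are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class WindowedDeduplicator:
    """Keep the first event per key; drop repeats seen within `window` time units.
    Assumes timestamps arrive in non-decreasing order."""
    window: int
    first_seen: dict = field(default_factory=dict, init=False)

    def accept(self, key: str, ts: int) -> bool:
        # Forget keys whose first sighting is older than the window,
        # which keeps memory bounded on an endless stream.
        expired = [k for k, seen in self.first_seen.items() if ts - seen > self.window]
        for k in expired:
            del self.first_seen[k]
        if key in self.first_seen:
            return False  # duplicate inside the window
        self.first_seen[key] = ts
        return True


dedup = WindowedDeduplicator(window=5)
stream = [("00086XXX02", 0), ("00086XXX02", 1), ("00011XXX01", 2),
          ("00086XXX02", 3), ("00086XXX02", 10)]
kept = [(key, ts) for key, ts in stream if dedup.accept(key, ts)]
print(kept)
# [('00086XXX02', 0), ('00011XXX01', 2), ('00086XXX02', 10)]
```

The last copy of 00086XXX02 arrives outside the 5-unit window and is kept, which is exactly the limitation described above.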

Combined with a recent Cosmos DB case (How to find duplicate documents in Cosmos DB), I think the key to solving this is finding the root cause: why the event hub is delivering duplicated source data. You could certainly set up a Cosmos DB trigger to prevent duplicated data from being written to the database, but I don't think that is an efficient approach.

Jay Gong, 23 December 2019, 10:14