Mam dataframe z dwiema wieloma kolumnami, z których dwa są identyfikator i etykieta, jak pokazano poniżej.

+---+---+---+
| id| label|
+---+---+---+
|  1| "abc"|
|  1| "abc"|
|  1| "def"|
|  2| "def"|
|  2| "def"|
+---+---+---+

Chcę grupować "ID" i kruszyć kolumnę etykiety według liczby (zignoruj null) etykiety w strukturze danych mapy, a oczekiwany wynik jest taki, jak pokazano poniżej:

+---+---+--+--+--+--+--+--
| id| label             |
+---+-----+----+----+----+
|  1| {"abc":2, "def":1}|
|  2| {"def":2}         |
+---+-----+----+----+----+

Czy można to zrobić bez korzystania z funkcji agregujących zdefiniowanych przez użytkownika? Widziałem podobną odpowiedź Oto, ale nie jest ona agreguje na podstawie liczby każda sztuka.

Przepraszam, jeśli to pytanie jest głupie, jestem nowy zarówno w skali, jak i iskry.

Dzięki

-1
Karthik 15 styczeń 2020, 22:56

1 odpowiedź

Najlepsza odpowiedź

Bez niestandardowych UDFS.

import org.apache.spark.sql.functions.{map, collect_list}

df.groupBy("id", "label")
  .count
  .select($"id", map($"label", $"count").as("map"))
  .groupBy("id")
  .agg(collect_list("map"))
  .show(false)

+---+------------------------+                                                  
|id |collect_list(map)       |
+---+------------------------+
|1  |[[def -> 1], [abc -> 2]]|
|2  |[[def -> 2]]            |
+---+------------------------+

Za pomocą niestandardowego UDF,

import org.apache.spark.sql.functions.udf
val customUdf = udf((seq: Seq[String]) => {
  seq.groupBy(x => x).map(x => x._1 -> x._2.size)
})

df.groupBy("id")
  .agg(collect_list("label").as("list"))
  .select($"id", customUdf($"list").as("map"))
  .show(false)

+---+--------------------+
|id |map                 |
+---+--------------------+
|1  |[abc -> 2, def -> 1]|
|2  |[def -> 2]          |
+---+--------------------+
1
sen 15 styczeń 2020, 21:24