Załóżmy, że masz ramkę danych i chcesz odfiltrować wzorce wierszowe, dodając nową kolumnę pattern_name.
Typ kolumny pattern_name powinien być tablicą, ponieważ każdy wiersz może potencjalnie pasować do wielu wzorców.

# Input
df = spark.createDataFrame(
    [(1, 21, 'A foo text'), 
     (2, 42, 'A foo'),
     (2, 42, 'A foobar text'),
     (2, 42, 'barz'),],
     ['id_1', 'id_2', 'text']
)
# Patterns:
# pattern_foo_1:  id_1 = 1, id_2 = 21, text.rlike('foo')
# pattern_foo_2:  id_1 = 2, id_2 = 42, text.rlike('foo')
# pattern_foobar: id_1 = 2, id_2 = 42, text.rlike('foobar')

# Desired output: (null can also be empty string, doesn't matter)
+------+------+----------------+------------------------------------+
|  id_1|  id_2|            text|                        pattern_name|
+------+------+----------------+------------------------------------+
|     1|    21|    'A foo text'|                 ['pattern_foo_1', ]|
|     2|    42|         'A foo'|                 ['pattern_foo_2', ]|
|     2|    42| 'A foobar text'| ['pattern_foo_2', 'pattern_foobar']|
|     2|    42|          'barz'|                                null|
+------+------+----------------+------------------------------------+

Jak to zrobić w efektywny sposób (bez udf), ponieważ wejście jest bardzo duże?

W przeszłości mój df miał tylko jedno dopasowanie w wierszu na maksimum, więc użyłem kiedy funkcja (przykład poniżej). Ale to nie działa, jeśli masz wiele dopasowań w wierszu, gdzie potrzebujesz tablicy.

pattern_name_col = None
for pattern in pattern_list:
    if pattern_name_col is None:
        # pseudocode example
        pattern_name_col = when(
                                (col('id_1') == 1) & (col('id_2') == 21)
                                & (col('text').rlike('foo')),
                                'pattern_foo_1')
    else:
        pattern_name_col = pattern_name_col.when(..., ...)

df = df.withColumn('pattern_name', pattern_name_col).filter(col('pattern_name').isNotNull())
1
0vbb 5 styczeń 2022, 14:21

1 odpowiedź

Najlepsza odpowiedź

Możesz zdefiniować swoją listę patterns jako:

patterns = [
    (1, 21, "foo", "pattern_foo_1"), # (id_1, id_2, pattern, pattern_name)
    (2, 42, "foo", "pattern_foo_2"),
    (2, 42, "foobar", "pattern_foobar"),
]

Następnie używając funkcji array ze zrozumieniem listy i when możesz uzyskać kolumnę listy nazw wzorców:

import pyspark.sql.functions as F

df1 = df.withColumn(
    "pattern_name",
    F.array(*[
        F.when((F.col("id_1") == p[0]) & (F.col("id_2") == p[1]) & F.col("text").rlike(p[2]), p[3])
        for p in patterns
    ])
).withColumn(
    "pattern_name",
    F.expr("filter(pattern_name, x -> x is not null)")
)

df1.show(truncate=False)
#+----+----+-------------+-------------------------------+
#|id_1|id_2|text         |pattern_name                   |
#+----+----+-------------+-------------------------------+
#|1   |21  |A foo text   |[pattern_foo_1]                |
#|2   |42  |A foo        |[pattern_foo_2]                |
#|2   |42  |A foobar text|[pattern_foo_2, pattern_foobar]|
#|2   |42  |barz         |[]                             |
#+----+----+-------------+-------------------------------+

Możesz także utworzyć ramkę danych patterns_df z powyższej listy, a następnie użyć join, a następnie goupby + collect_list:

patterns_df = spark.createDataFrame(patterns, ["id_1", "id_2", "pattern", "pattern_name"])

df1 = df.alias("df").join(
    patterns_df.alias("p"),
    F.expr("df.id_1 = p.id_1 and df.id_2 = p.id_2 and df.text rlike p.pattern")
).groupBy("df.id_1", "df.id_2", "text").agg(
    F.collect_list("pattern_name").alias("pattern_name")
)
1
blackbishop 5 styczeń 2022, 15:01
Wielkie dzięki! Czy jest jakaś różnica w wydajności między obiema opcjami, tj. czy Spark Catalyst zdefiniowałby różne plany fizyczne? Nadal mam trudności z oceną tego za pomocą metody explain().
 – 
0vbb
5 styczeń 2022, 15:45
1
Powiedziałbym, że będzie to zależeć od twoich danych, zwłaszcza od rozmiaru twojej listy wzorów. W przypadku dużej listy lepszym rozwiązaniem może być dołączenie.
 – 
blackbishop
5 styczeń 2022, 15:48