Jestem nowy w Pythonie i Rapids.AI i próbuję odtworzyć SKLearn KMean w wielowęzłowym GPU (mam 2 GPU) przy użyciu Dask i RAPIDs (używam rapids z jego dockerem, który również montuje Jupyter Notebook).

Kod, który pokazuję poniżej (również pokazuję przykład zestawu danych Iris) zawiesza się, a komórka notebooka jupyter nigdy się nie kończy. Próbowałem użyć magicznego klucza %debug, a także pulpitu nawigacyjnego Dask, ale nie wyciągnąłem żadnych jasnych wniosków (jedyny wniosek, który wydaje mi się, że może wynikać z device_m_csv.iloc, ale nie jestem co do tego pewien). Inną rzeczą, która może być, jest to, że zapominam o niektórych wait(), compute() lub persistent() (naprawdę, nie jestem pewien, w jakich sytuacjach należy ich używać poprawnie).

Wytłumaczę kod dla lepszego czytania:

  • Przede wszystkim potrzebny import
  • Następnie zaczyna się od algorytmu K średnich (separator: ####################### ...)
  • Utwórz klaster CUDA z 2 pracownikami, po jednym na GPU (mam 2 GPU) i 1 wątek dla pracownika (przeczytałem, że jest to zalecana wartość) i uruchom klienta
  • Odczytaj zestaw danych z pliku CSV, tworząc 2 partycje (chunksize = '2kb')
  • Podziel poprzedni zbiór danych na dane (bardziej znane jako X) i etykiety ((bardziej znane jako y)
  • Utwórz wystąpienie cu_KMeans przy użyciu Dask
  • Dopasuj model
  • Przewiduj wartości
  • Sprawdź uzyskany wynik

Przepraszam, że nie mogę zaoferować więcej danych, ale nie mogłem ich uzyskać. Cokolwiek jest konieczne do rozwiązania wątpliwości, z przyjemnością to udzielę.

Gdzie lub w czym myślisz, że jest problem?

Z góry bardzo dziękuję.

%%time

# Import libraries and show its versions
import numpy as np; print('NumPy Version:', np.__version__)
import pandas as pd; print('Pandas Version:', pd.__version__)
import sklearn; print('Scikit-Learn Version:', sklearn.__version__)
import nvstrings, nvcategory
import cupy; print('cuPY Version:', cupy.__version__)
import cudf; print('cuDF Version:', cudf.__version__)
import cuml; print('cuML Version:', cuml.__version__)
import dask; print('Dask Version:', dask.__version__)
import dask_cuda; print('DaskCuda Version:', dask_cuda.__version__)
import dask_cudf; print('DaskCuDF Version:', dask_cudf.__version__)
import matplotlib; print('MatPlotLib Version:', matplotlib.__version__)
import seaborn as sns; print('SeaBorn Version:', sns.__version__)
#import timeimport warnings

from dask import delayed
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, wait
from dask_ml.cluster import KMeans as skmKMeans
from dask_cuda import LocalCUDACluster

from sklearn import metrics
from sklearn.cluster import KMeans as skKMeans
from sklearn.metrics import adjusted_rand_score as sk_adjusted_rand_score, silhouette_score as sk_silhouette_score
from cuml.cluster import KMeans as cuKMeans
from cuml.dask.cluster.kmeans import KMeans as cumKMeans
from cuml.metrics import adjusted_rand_score as cu_adjusted_rand_score

# Configure matplotlib library
import matplotlib.pyplot as plt
%matplotlib inline

# Configure seaborn library
sns.set()
#sns.set(style="white", color_codes=True)
%config InlineBackend.figure_format = 'svg'

# Configure warnings
#warnings.filterwarnings("ignore")


####################################### KMEANS #############################################################
# Create local cluster
cluster = LocalCUDACluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)

# Identify number of workers
n_workers = len(client.has_what().keys())

# Read data in host memory
device_m_csv = dask_cudf.read_csv('./DataSet/iris.csv', header = 0, delimiter = ',', chunksize='2kB') # Get complete CSV. Chunksize is 2kb for getting 2 partitions
#x = host_data.iloc[:, [0,1,2,3]].values
device_m_data = device_m_csv.iloc[:, [0, 1, 2, 3]] # Get data columns
device_m_labels = device_m_csv.iloc[:, 4] # Get labels column

# Plot data
#sns.pairplot(device_csv.to_pandas(), hue='variety');

# Define variables
label_type = { 'Setosa': 1, 'Versicolor': 2, 'Virginica': 3 } # Dictionary of variables type

# Create KMeans
cu_m_kmeans = cumKMeans(init = 'k-means||',
                     n_clusters = len(device_m_labels.unique()),
                     oversampling_factor = 40,
                     random_state = 0)
# Fit data in KMeans
cu_m_kmeans.fit(device_m_data)

# Predict data
cu_m_kmeans_labels_predicted = cu_m_kmeans.predict(device_m_data).compute()

# Check score
#print('Cluster centers:\n',cu_m_kmeans.cluster_centers_)
#print('adjusted_rand_score: ', sk_adjusted_rand_score(device_m_labels, cu_m_kmeans.labels_))
#print('silhouette_score: ', sk_silhouette_score(device_m_data.to_pandas(), cu_m_kmeans_labels_predicted))

# Close local cluster
client.close()
cluster.close()

Przykład zbioru danych Iris:

IrisDatasetExample


EDYCJA 1

@Corey, to jest mój wynik przy użyciu twojego kodu:

NumPy Version: 1.17.5
Pandas Version: 0.25.3
Scikit-Learn Version: 0.22.1
cuPY Version: 6.7.0
cuDF Version: 0.12.0
cuML Version: 0.12.0
Dask Version: 2.10.1
DaskCuda Version: 0+unknown
DaskCuDF Version: 0.12.0
MatPlotLib Version: 3.1.3
SeaBorn Version: 0.10.0
Cluster centers:
           0         1         2         3
0  5.006000  3.428000  1.462000  0.246000
1  5.901613  2.748387  4.393548  1.433871
2  6.850000  3.073684  5.742105  2.071053
adjusted_rand_score:  0.7302382722834697
silhouette_score:  0.5528190123564102
4
JuMoGar 6 marzec 2020, 14:56

2 odpowiedzi

Najlepsza odpowiedź

Zmodyfikowałem nieco twój odtwarzalny przykład i byłem w stanie stworzyć wynik w najnowszym nocnym wydaniu RAPIDS.

To jest wynik działania skryptu.

(cuml_dev_2) cjnolet@deeplearn ~ $ python ~/kmeans_mnmg_reproduce.py 
NumPy Version: 1.18.1
Pandas Version: 0.25.3
Scikit-Learn Version: 0.22.2.post1
cuPY Version: 7.2.0
cuDF Version: 0.13.0a+3237.g61e4d9c
cuML Version: 0.13.0a+891.g4f44f7f
Dask Version: 2.11.0+28.g10db6ba
DaskCuda Version: 0+unknown
DaskCuDF Version: 0.13.0a+3237.g61e4d9c
MatPlotLib Version: 3.2.0
SeaBorn Version: 0.10.0
/share/software/miniconda3/envs/cuml_dev_2/lib/python3.7/site-packages/dask/array/random.py:27: FutureWarning: dask.array.random.doc_wraps is deprecated and will be removed in a future version
  FutureWarning,
/share/software/miniconda3/envs/cuml_dev_2/lib/python3.7/site-packages/distributed/dashboard/core.py:79: UserWarning: 
Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.
  warnings.warn("\n" + msg)
bokeh.server.util - WARNING - Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
Cluster centers:
           0         1         2         3
0  5.883607  2.740984  4.388525  1.434426
1  5.006000  3.428000  1.462000  0.246000
2  6.853846  3.076923  5.715385  2.053846
adjusted_rand_score:  0.7163421126838475
silhouette_score:  0.5511916046195927

A oto zmodyfikowany skrypt, który wygenerował takie dane wyjściowe:

    # Import libraries and show its versions
    import numpy as np; print('NumPy Version:', np.__version__)
    import pandas as pd; print('Pandas Version:', pd.__version__)
    import sklearn; print('Scikit-Learn Version:', sklearn.__version__)
    import nvstrings, nvcategory
    import cupy; print('cuPY Version:', cupy.__version__)
    import cudf; print('cuDF Version:', cudf.__version__)
    import cuml; print('cuML Version:', cuml.__version__)
    import dask; print('Dask Version:', dask.__version__)
    import dask_cuda; print('DaskCuda Version:', dask_cuda.__version__)
    import dask_cudf; print('DaskCuDF Version:', dask_cudf.__version__)
    import matplotlib; print('MatPlotLib Version:', matplotlib.__version__)
    import seaborn as sns; print('SeaBorn Version:', sns.__version__)
    #import timeimport warnings

    from dask import delayed
    import dask.dataframe as dd
    from dask.distributed import Client, LocalCluster, wait
    from dask_ml.cluster import KMeans as skmKMeans
    from dask_cuda import LocalCUDACluster

    from sklearn import metrics
    from sklearn.cluster import KMeans as skKMeans
    from sklearn.metrics import adjusted_rand_score as sk_adjusted_rand_score, silhouette_score as sk_silhouette_score
    from cuml.cluster import KMeans as cuKMeans
    from cuml.dask.cluster.kmeans import KMeans as cumKMeans
    from cuml.metrics import adjusted_rand_score as cu_adjusted_rand_score
    # Configure matplotlib library
    import matplotlib.pyplot as plt

    # Configure seaborn library
    sns.set()
    #sns.set(style="white", color_codes=True)
    # Configure warnings
    #warnings.filterwarnings("ignore")


    ####################################### KMEANS #############################################################
    # Create local cluster
    cluster = LocalCUDACluster(n_workers=2, threads_per_worker=1)
    client = Client(cluster)

    # Identify number of workers
    n_workers = len(client.has_what().keys())

    # Read data in host memory
    from sklearn.datasets import load_iris

    loader = load_iris()

    #x = host_data.iloc[:, [0,1,2,3]].values
    device_m_data = dask_cudf.from_cudf(cudf.from_pandas(pd.DataFrame(loader.data)), npartitions=2) # Get data columns
    device_m_labels = dask_cudf.from_cudf(cudf.from_pandas(pd.DataFrame(loader.target)), npartitions=2)

    # Plot data
    #sns.pairplot(device_csv.to_pandas(), hue='variety');

    # Define variables
    label_type = { 'Setosa': 1, 'Versicolor': 2, 'Virginica': 3 } # Dictionary of variables type

    # Create KMeans
    cu_m_kmeans = cumKMeans(init = 'k-means||',
                     n_clusters = len(np.unique(loader.target)),
                     oversampling_factor = 40,
                     random_state = 0)
    # Fit data in KMeans
    cu_m_kmeans.fit(device_m_data)

    # Predict data
    cu_m_kmeans_labels_predicted = cu_m_kmeans.predict(device_m_data).compute()

    # Check score
    print('Cluster centers:\n',cu_m_kmeans.cluster_centers_)
    print('adjusted_rand_score: ', sk_adjusted_rand_score(loader.target, cu_m_kmeans_labels_predicted.values.get()))
    print('silhouette_score: ', sk_silhouette_score(device_m_data.compute().to_pandas(), cu_m_kmeans_labels_predicted))

    # Close local cluster
    client.close()
    cluster.close()

Czy możesz podać wyniki dotyczące wersji tych bibliotek? Poleciłbym również uruchomienie zmodyfikowanego skryptu i sprawdzenie, czy działa to pomyślnie. Jeśli nie, możemy zagłębić się dalej, aby dowiedzieć się, czy jest to związane z Dockerem, wersją RAPIDS, czy coś innego.

Jeśli masz dostęp do wiersza poleceń, w którym działa Twój notatnik Jupyter, pomocne może być włączenie logowania, przekazując verbose=True podczas konstruowania obiektu KMeans. Może to pomóc nam wyodrębnić miejsca, w których utknęły.

3
Corey J. Nolet 6 marzec 2020, 20:55

Dokumentacja Dask jest naprawdę dobra i obszerna, chociaż przyznaję, że czasami elastyczność i ilość funkcji, jeśli zapewnia, może być nieco przytłaczająca. Myślę, że pomaga postrzeganie Dask jako interfejsu API do przetwarzania rozproszonego, który daje użytkownikowi kontrolę nad kilkoma różnymi warstwami wykonywania, a każda warstwa zapewnia bardziej precyzyjną kontrolę.

compute(), wait() i persist() to pojęcia wynikające ze sposobu, w jaki zadania leżące u podstaw serii obliczeń rozproszonych są planowane na zbiorze pracowników. To, co jest wspólne dla wszystkich tych obliczeń, to wykres wykonania, który przedstawia zdalne zadania i ich wzajemne zależności. W pewnym momencie ten wykres wykonania zostaje zaplanowany na zestawie procesów roboczych. Dask zapewnia dwa interfejsy API, w zależności od tego, czy zadania leżące u podstaw wykresu są planowane natychmiast (chętnie), czy też obliczenia muszą być uruchamiane ręcznie (leniwie).

Oba te interfejsy API tworzą wykres wykonywania w miarę tworzenia zadań zależnych od wyników innych zadań. Pierwsza z nich używa interfejsu API dask.futures do natychmiastowego wykonania asynchronicznego, którego wyniki czasami możesz chcieć wait() włączyć przed wykonaniem innych operacji. Interfejs API dask.delayed jest używany do leniwych wykonań i wymaga wywołania metod takich jak compute() lub persist() w celu rozpoczęcia obliczeń.

Najczęściej użytkownicy bibliotek takich jak RAPIDS są bardziej zainteresowani manipulowaniem swoimi danymi i nie przejmują się tym, jak te manipulacje są planowane na zbiorze pracowników. Obiekty dask.dataframe i dask.array są zbudowane na bazie interfejsów API delayed i futures. Większość użytkowników wchodzi w interakcję z tymi strukturami danych zamiast interakcji z obiektami delayed i futures, ale nie jest złym pomysłem bycie ich świadomym, jeśli kiedykolwiek zajdzie potrzeba dokonania jakichś transformacji danych poza tym, co jest dystrybuowane dataframe i array zapewniają.

dask.dataframe i dask.array budują leniwe wykresy wykonania tam, gdzie jest to w ogóle możliwe, i zapewniają metodę compute() do materializacji wykresu i zwrócenia wyniku klientowi. Oba zapewniają również metodę persist() do asynchronicznego rozpoczynania obliczeń w tle. wait() jest przydatne, jeśli chcesz rozpocząć obliczenia w tle, ale nie chcesz zwracać wyników do klienta.

Mam nadzieję, że to jest pomocne.

1
Corey J. Nolet 12 marzec 2020, 14:51