Próbuję napisać narzędzie do wyszukiwania sieci przy użyciu Dask. Funkcja obiektywna wywołuje metodę klasy, która zawiera duży plik danych. Próbuję użyć DASH, aby równoległy do obliczeń do rozwiązania wielu podstawowych bez konieczności kopiowania oryginalnej klasy / Dataframe. Nie znalazłem żadnych rozwiązań w dokumentacji, więc publikuję tutaj przykład zabawka:

import pickle
from dask.distributed import Client, LocalCluster
from multiprocessing import current_process


class TestClass:
    def __init__(self):
        self.param = 0

    def __getstate__(self):
        print("I am pickled!")
        return self.__dict__

    def loss(self, ext_param):
        self.param += 1
        print(f"{current_process().pid}: {hex(id(self))}:  {self.param}: {ext_param} ")
        return f"{self.param}_{ext_param}"


def objective_function(param):
    return test_instance.loss(param)

if __name__ == '__main__':

    test_instance = TestClass()
    print(hex(id(test_instance)))
    cluster = LocalCluster(n_workers=2)
    client = Client(cluster)
    futures = client.map(objective_function, range(20))
    result = client.gather(futures)
    print(result)
    
# ---- OUTPUT RESULTS ----
# 0x7fe0a5056d30
# I am pickled!
# I am pickled!
# 11347: 0x7fb9bcfa0588:  1: 0
# 11348: 0x7fb9bd0a2588:  1: 1
# 11347: 0x7fb9bcf94240:  1: 2
# 11348: 0x7fb9bd07b6a0:  1: 3
# 11347: 0x7fb9bcf945f8:  1: 4 
# ['1_0', '1_1', '1_2', '1_3', '1_4']

Mam następujące pytania:

  1. Dlaczego następująca funkcja marynata zwana dwukrotnie?
  2. Zauważam, że każda z iteracji funkcji mapy Użyj świeżej kopii test_instance, jak widać z różnych adresów klasy na każdym z iteracji, a także fakt, że atrybut test_instance.param jest ustawiony na 0 na każdej iteracji (to zachowanie różni się od standardowej implementacji multiprocessing.pool podświetlone Tutaj). Zakładam, że podczas każdej iteracji każdy proces otrzyma świeżą kopię klas marynowanej - jest to poprawne?
  3. Po z (2), ile kopii test_instance jest w pamięci podczas obliczeń? Czy to 1 (dla pierwotnej instancji w gwint głównym) + 1 (kopia marynowana) + 2 (instancje obecne w każdym z procesów) = 4? Czy jest jakiś sposób, aby uzyskać tę wartość do 1?

Zauważyłem, że niektóre wspólne rozwiązania pamięci są dostępne za pomocą biblioteki Ray, jak zaproponowane w Niniejsza kwestia GitHub .

1
Raven 27 październik 2020, 20:38

1 odpowiedź

Najlepsza odpowiedź

Dlaczego następująca funkcja marynata zwana dwukrotnie?

Zwykle marynata Pythona skutecznie wiązuje zmienne instancji i odniesienie do klasy w importowanym module. W __main__, może to być niewiarygodne, a Dask spada z powrotem do Cloudpickle (co również wywołuje ogniste wewnętrznie). Wygląda na mnie, jak sprawdzenie, aby być w {x1}} w distributed.protocol.pickle.dumps może się zdarzyć przed pierwszą próbą marynowania.

Podczas każdej iteracji każdy proces otrzyma świeżą kopię klasy marynowanej

Tak. Za każdym razem, gdy DASH prowadzi zadanie, opisuje dane wejściowe, tworząc kopię NW instancji. Należy pamiętać, że pracownicy DASH są prawdopodobnie tworzone przez technikę Fork_server, więc pamięć nie jest po prostu skopiowana (jest to bezpieczny sposób robienia rzeczy).

Możesz "rozpraszać" instancję dla pracowników przed obliczeniami, a mogą ponownie wykorzystać swoją lokalną kopię, ale zadania DASH nie powinny pracować przez odwzorowanie obiektów, ale zwracając wyniki (tj. Funkcjonalnie).

Ile kopii Test_instance jest w pamięci

1 w kliencie plus jeden na zadanie wykonane. Wersje serializowane mogą być również w pobliżu, prawdopodobnie jeden trzymany na wykresie, który jest tymczasowo na kliencie, a następnie przechowywany na harmonogramie; Będzie to również tymczasowo w pamięci robotniczej podczas gdy będąc pochodzi z opuszczonym. Dla niektórych typów możliwe jest zero-kopiowanie DE / SER.

Jeśli zadania są bardzo duże ze względu na rozmiar obiektu, zdecydowanie powinieneś wcześniej "rozpraszać" (client.scatter).

Czy jest jakiś sposób, aby uzyskać tę wartość do 1?

Możesz uruchomić w procesie harmonogramu i / lub pracowników do udostępniania pamięci, ale oczywiście tracisz równoległość do Gil.

Może możesz wypróbować Actor Interface? Wzór wydaje się pasować do Twojego przepływu pracy.

2
mdurant 28 październik 2020, 13:05