Oto uproszczona wersja Kodeksu, z którą pracuję z: Mam klasę Python z metodą instancji, która zajmuje listę ciągów i oblicza wynik dla każdego ciągu, ostatecznie łącząc wyniki przed powrotem, jak:

class Foo(object):
    def do_task(stringList):
        for s in stringList:
            result = computeResult(s)
        # combine results below...

Ponieważ obliczenia z ciążem są niezależne (i dość drogie), staram się równoleglić działanie z klasą basenu w module wieloprocessującego. W ten sposób zdefiniowałem równoległe wersję do_task w następujący sposób (obecnie drukujemy oddzielne wyniki zamiast je łączyć):

def do_task_parallel(stringList):
    numProcs = 2
    pool = Pool(processes=numProcs)
    chunksize = int(math.ceil(len(stringList) / float(numProcs)))
    for result in pool.imap(self.do_task, stringList, chunksize):
        print result
    pool.close()

Zgodnie z moim rozumieniem, w jaki sposób działa basen w oparciu o dokumentację i przykłady, które przeczytałem, powinno to podzielić listę ratunkową na kawałki z grubsza wielkości Chunksyze, z których każdy jest składany jako zadanie do jednego z procesów w basenie. W ten sposób, jeśli mam listę stringList = ["foo1", "foo2", "foo3", "foo4"] podzielił się wśród 2 procesów (dając kawałek 2), basen powinien podzielić to w stringList1 = ["foo1", "foo2"] i stringList2 = ["foo3", "foo4"], które będą obsługiwane przez te dwa różne procesy równolegle.

Jednakże, gdy tworzę foo () obiekt i zadzwonić foo.do_task_parallel(stringList), wydaje się, że basen przechodzi każdy element mojego stringList oddzielnie do do_task (jako część jednego). Nie tylko to nie przyspiesza mojego kodu, ale to sprawia, że jest niepoprawne i faktycznie spowalnia go, jak do_task następnie wzywa computeResult na każdym znaku jednego ciągu wejściowego przeszedł na każdym z czterech oddzielnych połączenia. Spodziewałem się dwóch połączeń z każdym połączeniami wkładu z listą rozmiarów 2, a nie cztery połączenia obsługujące pojedynczy ciąg wejściowy. Sprawdziłem i chunksize jest rzeczywiście 2. Co robię źle? Jeśli pomoże, działa Python 2.7.3 w systemie Windows 7 przez Cygwin.

2
Tom Swift 24 listopad 2013, 05:39

2 odpowiedzi

Najlepsza odpowiedź

Twoje zrozumienie jest wyłączone ;-) {x0}} jest wyłącznie opcjonalną optymalizacją: zmienia się ona Nic o tym, co jest przekazywane do funkcji pracowników, daje tylko wskazówkę do maszyn {X1}} Ile zadań wysyłania wewnętrznych rur międzyprodukcyjnych na raz.

Jeśli chcesz, aby Twoja funkcja pracownika została przekazana listę ciągów, musisz wyraźnie kodować. Na przykład i trzymanie go na wielu liniach dla jasności:

chunks = [stringList[i: i+chunksize]
          for i in xrange(0, len(stringList), chunksize)]

for result in pool.imap(self.do_task, chunks):
    print result
9
Tim Peters 24 listopad 2013, 01:49

Pool.map i Pool.imap są zaprojektowane, aby zachowywać się równoważnie do wbudowanej funkcji Python map (z wyjątkiem równolegle). Jako taki stosują funkcję, którą dajesz im indywidualnie każdemu przedmiotowi w wejściu.

chunksize Oferty, w jaki przedmioty są zablokowane do zadania multiprocessingowe , ale nie wpływa na to, w jaki sposób funkcja jest wywoływana na poszczególnych elementach. (Zasadniczo, dla mapy / IMAP Handler zadań ma już wbudowany for item in input: ....)

Wygląda na to, co naprawdę chcesz zrobić, jest mapowanie wywołania computeResult za pomocą mapy równoległej, a następnie łącząc po nabyciu wyników.

2
Amber 24 listopad 2013, 01:44