Używam modułu multiprocessor.pool (), aby przyspieszyć pętlę "zawstydzająco równoległe". Właściwie mam zagnieżdżoną pętlę i używam multiprocessor.pool, aby przyspieszyć wewnętrzną pętlę. Na przykład bez równoległego pętli, mój kod byłby następujący:

outer_array=[random_array1]
inner_array=[random_array2]
output=[empty_array]    

for i in outer_array:
    for j in inner_array:
        output[j][i]=full_func(j,i)

Z równoległym:

import multiprocessing
from functools import partial

outer_array=[random_array1]
inner_array=[random_array2]
output=[empty_array]    

for i in outer_array:
    partial_func=partial(full_func,arg=i)     
    pool=multiprocessing.Pool() 
    output[:][i]=pool.map(partial_func,inner_array)
    pool.close()

Moim głównym pytaniem jest to, że jest to poprawny, a ja powinienem włączyć multiprocessing.pool () wewnątrz pętli lub jeśli zamiast tego powinienem stworzyć basen poza pętlą, tj.:

pool=multiprocessing.Pool() 
for i in outer_array:
     partial_func=partial(full_func,arg=i)     
     output[:][i]=pool.map(partial_func,inner_array)

Nie jestem pewien, czy powinienem zawierać linię "Basen.Zamknij ()" na końcu każdej pętli w drugim przykładzie powyżej; Jakie byłyby korzyści z tego?

Dzięki!

20
shadowprice 5 grudzień 2013, 02:32

3 odpowiedzi

Najlepsza odpowiedź

Idealnie, powinieneś zadzwonić do konstruktora Pool() dokładnie raz - nie przepełniony i wzmacniacz; jeszcze raz. W tworzeniu procesów roboczych są znaczne koszty ogólne, i płacisz te koszty za każdym razem, gdy wywołasz Pool(). Procesy stworzone przez single Pool() call pozostają! Kiedy kończą pracę, którą dałeś im w jednej części programu, trzymać się, czekając na więcej pracować.

Jeśli chodzi o Pool.close(), powinieneś zadzwonić do tego, kiedy - i tylko wtedy, gdy - nigdy nie zamierzasz przesłać więcej pracy na instancję Pool. Więc Pool.close() jest zazwyczaj nazywany, gdy skończy się równoległą część głównego programu. Następnie procesy robotnicze zakończy się, gdy odpowiednia zostanie zakończona.

Doskonała praktyka, aby zadzwonić Pool.join(), aby czekać na zakończenie procesów roboczych. Wśród innych powodów często nie ma dobrego sposobu, aby zgłaszać wyjątki w kodzie równoległym (wyjątki występują w kontekście tylko niejasno związane z tym, co robią główny program), a {X1}} zapewnia punkt synchronizacji, który może zgłaszać pewne wyjątki, które wystąpiły W procesach roboczych, które nigdy nie zobaczysz.

Baw się dobrze :-)

43
Tim Peters 4 grudzień 2013, 22:46
import itertools
import multiprocessing as mp

def job(params):
    a = params[0]
    b = params[1]
    return a*b

def multicore():
    a = range(1000)
    b = range(2000)
    paramlist = list(itertools.product(a,b))
    print(paramlist[0])
    pool = mp.Pool(processes = 4)
    res=pool.map(job, paramlist)
    for i in res:
        print(i)

if __name__=='__main__':
    multicore()

Co powiesz na to?

1
user9500792 16 marzec 2018, 02:59
import time
from pathos.parallel import stats
from pathos.parallel import ParallelPool as Pool


def work(x, y):
    return x * y


pool = Pool(5)
pool.ncpus = 4
pool.servers = ('localhost:5654',)
t1 = time.time()
results = pool.imap(work, range(1, 2), range(1, 11))
print("INFO: List is: %s" % list(results))
print(stats())
t2 = time.time()
print("TIMER: Function completed time is: %.5f" % (t2 - t1))
-1
Yagmur SAHIN 6 luty 2019, 12:14