Jestem zdezorientowany z wieloprocesującym Pythona.

Próbuję przyspieszyć funkcję, które przetwarzają struny z bazy danych, ale musiałem źle zrozumieć, w jaki sposób działanie wieloprocesowe, ponieważ funkcja trwa dłużej, gdy podano do puli pracowników niż z "normalnym przetwarzaniem".

Tutaj przykład tego, co próbuję osiągnąć.

from time import clock, time
from multiprocessing import Pool, freeze_support

from random import choice


def foo(x):
    TupWerteMany = []
    for i in range(0,len(x)):
         TupWerte = []
          s = list(x[i][3])
          NewValue = choice(s)+choice(s)+choice(s)+choice(s)
          TupWerte.append(NewValue)
          TupWerte = tuple(TupWerte)

          TupWerteMany.append(TupWerte)
     return TupWerteMany



 if __name__ == '__main__':
     start_time = time()
     List = [(u'1', u'aa', u'Jacob', u'Emily'),
        (u'2', u'bb', u'Ethan', u'Kayla')]
     List1 = List*1000000

     # METHOD 1 : NORMAL (takes 20 seconds) 
     x2 = foo(List1)
     print x2[1:3]

     # METHOD 2 : APPLY_ASYNC (takes 28 seconds)
     #    pool = Pool(4)
     #    Werte = pool.apply_async(foo, args=(List1,))
     #    x2 = Werte.get()
     #    print '--------'
     #    print x2[1:3]
     #    print '--------'

     # METHOD 3: MAP (!! DOES NOT WORK !!)

     #    pool = Pool(4)
     #    Werte = pool.map(foo, args=(List1,))
     #    x2 = Werte.get()
     #    print '--------'
     #    print x2[1:3]
     #    print '--------'


     print 'Time Elaspse: ', time() - start_time

Moje pytania:

  1. Dlaczego aplikacja_Sync trwa dłużej niż "normalny sposób"?
  2. Co robię źle z mapą?
  3. Czy ma sens, aby przyspieszyć takie zadania w ogóle wieloprocesowe?
  4. Wreszcie: W końcu przeczytałem tutaj, zastanawiam się, czy wieloprocessing w Pythonie w ogóle działa w oknach?
3
user1043144 25 sierpień 2012, 00:12

3 odpowiedzi

Najlepsza odpowiedź

Więc twój pierwszy problem polega na tym, że nie ma rzeczywistego równoległości, co dzieje się w foo(x), przechodzisz całą listę do funkcji raz.

1) Ideą basenu procesowego jest posiadanie wielu procesów wykonywania obliczeń na oddzielnych kawałkach niektórych danych.

 # METHOD 2 : APPLY_ASYNC
 jobs = 4
 size = len(List1)
 pool = Pool(4)
 results = []
 # split the list into 4 equally sized chunks and submit those to the pool
 heads = range(size/jobs, size, size/jobs) + [size]
 tails = range(0,size,size/jobs)
 for tail,head in zip(tails, heads):
      werte = pool.apply_async(foo, args=(List1[tail:head],))
      results.append(werte)

 pool.close()
 pool.join() # wait for the pool to be done

 for result in results:
      werte = result.get() # get the return value from the sub jobs

Daje to tylko rzeczywiste przyspieszenie, jeśli czas potrzebny na przetworzenie każdego kawałka jest większy niż czas, aby rozpocząć proces, w przypadku czterech procesów i czterech miejsc pracy, oczywiście, oczywiście te dynamika zmienia się, jeśli jesteś " Mam 4 procesy i 100 miejsc pracy do wykonania. Pamiętaj, że tworzysz zupełnie nowy interpreter Pythona cztery razy, to nie jest bezpłatne.

2) Problem, który masz z mapą, jest to, że stosuje się foo do każdego elementu w List1 w oddzielnym procesie zajmie to sporo czasu. Więc jeśli jesteś basenem ma 4 procesy {X2}} Will pojawi się element listy cztery razy i wyślij go do procesu, który zostanie rozwiązany - czekać na proces, aby zakończyć - popu proces do końca. Ma to sens, jeśli przetwarzanie pojedynczego przedmiotu zajmuje długi czas, podobnie jak na przykład, jeśli każdy element jest nazwą pliku wskazującym na jeden plik tekstowy GIGABYTE. Ale jak stoi mapę po prostu weźmie pojedynczy ciąg liście i przejdzie go do foo, gdzie apply_async zajmuje kawałek listy. Wypróbuj następujący kod

def foo(thing):
    print thing

map(foo, ['a','b','c','d'])

To wbudowana mapa Pythona i uruchomi pojedynczy proces, ale pomysł jest dokładnie taki sam dla wersji wielostronowej.

Dodano zgodnie z J.F.Sebastian komentarzem: Możesz jednak użyć argumentu chunksize do map, aby określić przybliżoną wielkość dla każdego kawałka.

pool.map(foo, List1, chunksize=size/jobs) 

Nie wiem jednak, jeśli jest problem z map w systemie Windows, ponieważ nie mam dostępnych do testowania.

3) Tak, biorąc pod uwagę, że twój problem jest wystarczająco duży, aby usprawiedliwić rozwidlenie nowych tłumaczy Pythona

4) Nie można dać ostatecznej odpowiedzi na tym, ponieważ zależy od liczby rdzeni / procesorów itp., Ale ogólnie powinno być dobrze w oknach.

2
Matti Lyra 25 sierpień 2012, 08:46

W pytaniu (2) z wytycznymi DUGAL i MATTI, zorientowałem się, co poszło nie tak. Oryginalna funkcja FOO przetwarza listę list, podczas gdy mapa wymaga funkcji do przetwarzania pojedynczych elementów.

Nowa funkcja powinna być

def foo2 (x):
    TupWerte = []
    s = list(x[3])
    NewValue = choice(s)+choice(s)+choice(s)+choice(s)
    TupWerte.append(NewValue)
    TupWerte = tuple(TupWerte)
    return TupWerte

I blok do tego zadzwonienia:

jobs = 4
size = len(List1)
pool = Pool()
#Werte = pool.map(foo2, List1, chunksize=size/jobs)
Werte = pool.map(foo2, List1)
pool.close()
print Werte[1:3]

Dzięki was wszystkim, którzy pomogli mi to zrozumieć.

Wyniki wszystkich metod: W przypadku listy * 2 rekordy Mio: normalne 13,3 sekundy, równolegle z async: 7,5 sekundy, równolegle z mapą z ChunckSize: 7.3, bez chunksyzacji 5,2 sekundy

0
user1043144 25 sierpień 2012, 11:54

Oto ogólny szablon wieloprocesowy, jeśli jesteś zainteresowany.

import multiprocessing as mp
import time

def worker(x):
    time.sleep(0.2)
    print "x= %s, x squared = %s" % (x, x*x)
    return x*x

def apply_async():
    pool = mp.Pool()
    for i in range(100):
        pool.apply_async(worker, args = (i, ))
    pool.close()
    pool.join()

if __name__ == '__main__':
    apply_async()

Wynik wygląda następująco:

x= 0, x squared = 0
x= 1, x squared = 1
x= 2, x squared = 4
x= 3, x squared = 9
x= 4, x squared = 16
x= 6, x squared = 36
x= 5, x squared = 25
x= 7, x squared = 49
x= 8, x squared = 64
x= 10, x squared = 100
x= 11, x squared = 121
x= 9, x squared = 81
x= 12, x squared = 144

Jak widać, liczby nie są w porządku, ponieważ są one wykonywane asynchronicznie.

-1
IT Ninja 24 sierpień 2012, 22:41