Mam funkcję, która pokazuje pasek postępu, podczas gdy inna funkcja działa, używając concurrent.futures.ThreadPoolExecutor.

def run_with_loading(function, args=[], kwargs={}, phrase="Loading...", bar_length=5):
    '''
    Run a function while showing a loading bar.
    '''
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        f = executor.submit(function, *args, **kwargs)
        while True:
            for i in range(bar_length):
                if f.done():
                    result = f.result()
                    if f.exception() or not result[0]:
                        c = "✗"
                    else:
                        c = "✔"
                    print(f"\33[2K\r{phrase} {c}")
                    return result
                sys.stdout.write(
                    f"\r{phrase} {'□' * i}{'■'}{'□' * (bar_length - i - 1)}")
                sys.stdout.flush()
                time.sleep(0.2)

Jednak to nadal sondaje, aby sprawdzić, czy funkcja pojawiła się, co 0,2 sekundy. Chociaż to działa, zastanawiam się, czy jest jakiś efektywny sposób, aby powiadomić funkcję run_with_loading, że rozpoczął się function. Muszę zachować, czy istnieje wyjątek, czy nie, który jest wyraźny w kodzie, więc może drukować .

0
Zeke Marffy 11 październik 2020, 22:57

1 odpowiedź

Najlepsza odpowiedź

Zamiast odpytywania rezultatu, należy użyć współbieżnych.Futures.as_completed do pętli przez wyniki:

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    futures = []
    futures.append(executor.submit(function, *args, **kwargs))
    for future in concurrent.futures.as_completed(futures):
        result = f.result()
        if f.exception() or not result[0]:
            c = "✗"
        else:
            c = "✔"
        print(f"\33[2K\r{phrase} {c}")

Możesz tutaj znaleźć dokumentację: Dokument futures funkcja as_completed iterator przejść przez wszystkie gotowe futures, które zakończyły się pomyślnie.

Musisz dostosować podejście do bar_length, ale mam nadzieję, że to pomaga ci zdobyć kolejny pomysł, jak czekać na swoje wyniki.

1
KimKulling 13 październik 2020, 08:01