Tworzę wniosek, który zużywa dane z Kafki i wykonuje na niej pewne przetwarzanie. Ponieważ skala, na której ta aplikacja powinna pracować, jest ogromna, używam współbieżnego słuchacza wsadowego (max.poll.records = 20) z usługą wykonawcą Java 8. Poniżej znajduje się kodu z komentarzami, w których utknąłem:

    //Below method is invoked by spring kafka concurrent batch listener
public void onMessage(List<ConsumerRecord<String,String>> records , Acknowledgement ack)
{
    CompletableFuture.runAsync(() -> records.stream().forEach(record -> 
    doSomeProcessingAsynchronously()),new ThreadPoolExecutor(10,10,0,TimeUnit.MILLISECONDS. new 
    LinkedBlockingQueue<>(20)));
    //I want to execute all consumed records first only then flow should come after this line
    //and acknowledge complete batch
    ack.acknowledge();
}

Jakąkolwiek sugestię, jak to zrobić? Z góry dziękuję

0
Vatsalya Saxena 18 marzec 2020, 13:39

1 odpowiedź

Najlepsza odpowiedź

Dodaj

CountDownLatch latch = new CountDownLatch(records.size());

Następnie w Lambda zlicz go dla każdego zakończonego zadania.

Przed potwierdzeniem boolean ok = latch.await(60, TimeUnit.SECONDS);.

(Sprawdź odpowiednio na limit czasu i działania).

records.stream().forEach(...

Powiedział, że nie prowadzisz przetwarzania równolegle; Po prostu używasz wszystkich 20 sekwencyjnie na innym wątku.

Musisz przenieść foreach na zewnątrz.

0
Gary Russell 18 marzec 2020, 17:37