Próbuję słuchać kolejki RabbitMq w mojej aplikacji Spring Boot. W moim pliku RabbitConfig.java mam następujące ziarna:

// RabbitConfig.java
// ...Queue, exchange, binding beans, etc

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setConcurrentConsumers(2);
    container.setPrefetchCount(100);
    container.setQueueNames(QUEUE_NAME);
    return container;
}

@Bean
public AmqpTemplate getTemplate(ConnectionFactory connectionFactory) {
    final RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(new Jackson2JsonMessageConverter(mapper));
    return template;
}

Utworzyłem kontener nasłuchiwania, ponieważ muszę wprowadzić w nim pewne ustawienia, takie jak licznik pobierania wstępnego.

Stworzyłem również inną klasę jako odbiornik wiadomości, która wygląda następująco:

@Component
public class MyMessageListener {
    @RabbitListener(queues = QUEUE_NAME)
    public void messageHandler(MyMessageObj message, Channel channel) throws IOException {
        // process message...
}

Jednak gdy uruchamiam aplikację, generuje ona błąd Error Handler converted exception to fatal. Wygląda na to, że błąd występuje, ponieważ kontener oczekiwał, że messageHandler będzie miał inną sygnaturę metody.

Myślę, że mogłem zrobić coś nieprawidłowego w kontenerze, ponieważ kiedy go usuwam, aplikacja jest w stanie uruchomić i nasłuchiwać kolejki, z wyjątkiem tego, że nie mogę skonfigurować ustawień w kontenerze.

Co zrobiłem źle i co powinienem zrobić, aby kontener używał prawidłowo messageHandler?

1
Carven 20 grudzień 2019, 04:33
>@Bean >SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) Co to za kontener? Nie ma słuchacza. Kontener odbiornika dla @RabbitListener jest tworzony przez fabrykę kontenera odbiornika skonfigurowaną przy użyciu właściwości Boot. Musisz pokazać pełny ślad stosu, abyśmy mogli zrozumieć, czym jest wyjątek konwersji.
 – 
Gary Russell
20 grudzień 2019, 17:41

1 odpowiedź

Czy mógłbyś spróbować zaimplementować klasę MessageListener, która implementuje org.springframework.amqp.core.MessageListener i implementuje metodę "onMessage(Message message)"

Gdy masz już klasę @Configuration, w której zdefiniowałeś SimpleMessageListenerContainer, musisz mieć odpowiednią klasę, która implementuje org.springframework.amqp.core.MessageListener i implementuje metodę "onMessage(Message message)".

Daj mi znać, jeśli to pomoże.

@Konfiguracja public class MessageConfig {

private static String queue = "ARRQueue";
@Bean
ConnectionFactory connectionFactory(){
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("localhost");
    cachingConnectionFactory.setUsername("guest");
    cachingConnectionFactory.setPassword("guest");
    return cachingConnectionFactory;
}

@Bean
MessageListenerContainer messageListenerContainer(){
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setConnectionFactory(connectionFactory());
    simpleMessageListenerContainer.setQueues(new Queue(queue, true));
    simpleMessageListenerContainer.setMessageListener(new MessageListener());
    return simpleMessageListenerContainer;
}

}

Public class MessageListener implementuje org.springframework.amqp.core.MessageListener { @Override public void onMessage(Message message) { System.out.println("MessageListener.onMessage : " + new String(message.getBody())); }

1
Rahul A R 15 luty 2020, 16:11