Napisałem niestandardowy komponent w Apache Camel. Wielbłąd z powodzeniem tworzy swojego konsumenta i za pomocą mojego URI, ale nie zostałeś powołany procesor. Oto mój fragment kodu konsumenckiego (w Kotlinie):

class SoroushBotConsumer (private val endpoint: MyEndpoint, processor: Processor) : DefaultConsumer(endpoint, processor) {
    val objectMapper:ObjectMapper = ObjectMapper();
    init {
        startListening()
    }
    private fun startListening() {
        val client = ClientBuilder.newBuilder().register(SseFeature::class.java).build()
        val target = client.target("MY_URL"))
        while(true){
            var e: EventInput?  target.request().get(EventInput::class.java)!!

            val inboundEvent = e.read()
            val exchange = endpoint.createExchange()
            exchange.getIn().body = objectMapper.readValue(inboundEvent.rawData,MessageModel::class.java)
            try {
                processor.process(exchange)
            } catch (e: Exception) {
                if (exchange.exception != null) {
                exceptionHandler.handleException("Error processing exchange",exchange, exchange.exception)
            }
        }
    }
}

Wszystko działa dobrze w konsumenta, ale nie zostanie wykonany procesor. Oto jak tworzę trasę!

var context = DefaultCamelContext()
context.addRoutes(object : RouteBuilder() {
    override fun configure() {
        from("myapp://getMessage/).process{
            println(it.getIn())
        }.to("myapp://sendMessage/")
    }
})
context.start();
Thread.sleep(100000);
context.stop();

Nie dzwoni do tego procesu ani nie tworzy mojego producenta. (Nawet nie zadzwoń do MyEndpoint::createProducer())

Kiedy zastępowałem oświadczenie from z innym punktem końcowym, takim jak file, wszystko działa dobrze.

Aktualizacja: Kiedy rozszerzam mojego konsumenta z ScheduledPollConsumer Zaimplementuj metodę pull, wszystko pójdzie dobrze.

-1
Hossein Nasr 25 luty 2019, 10:40

2 odpowiedzi

Najlepsza odpowiedź

Jego, ponieważ musimy ukończyć konstruktor konsumenta i napisać logikę odbierania wiadomości w doStart()

class SoroushBotConsumer (private val endpoint: MyEndpoint, processor: Processor) : DefaultConsumer(endpoint, processor) {
    val objectMapper:ObjectMapper = ObjectMapper();
    override fun doStart() {
        val client = ClientBuilder.newBuilder().register(SseFeature::class.java).build()
        val target = client.target("MY_URL"))
        while(true){
            var e: EventInput?  target.request().get(EventInput::class.java)!!

            val inboundEvent = e.read()
            val exchange = endpoint.createExchange()
            exchange.getIn().body = objectMapper.readValue(inboundEvent.rawData,MessageModel::class.java)
            try {
                processor.process(exchange)
            } catch (e: Exception) {
                if (exchange.exception != null) {
                    exceptionHandler.handleException("Error processing exchange",exchange, exchange.exception)
                }
            }
        }
    }
}
0
Hossein Nasr 25 luty 2019, 11:47

Nie jest to dobry pomysł, aby mieć niekończącą się pętlę w metodzie Dostarta, gdzie porwasz obecny wątek, który nigdy nie kończy. Zamiast tego powinieneś skonfigurować wątek w tła, który prowadzi tę pracę i od Dostartu można skonfigurować ten wątek i pozwolić mu uruchomić. Innymi słowy, w jaki sposób składnik "odbiera" komunikaty są w 100% specyficzne, ponieważ każdy z nich ma swój własny sposób. A w metodzie Dostop masz następnie logikę, aby zatrzymać tę wątek w tle i czyszczenia dowolnego zasobów.

1
Claus Ibsen 26 luty 2019, 04:15