I have a Spring Boot Kafka Streams Maven app. I use spring-boot-starter-parent 2.4.4 for my Spring dependencies and Kafka Streams 2.7.0.

I am stuck running my tests: I get a java.lang.NullPointerException when trying to load my application configuration from resources/application.yml, test/resources/test/test.Resources, or test/resources/application.

I have a configuration class with the annotations below, plus getters and setters for the fields, which are defined with the same names as in application.yml

package com.acme.rtc.configuration;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@ConfigurationProperties(prefix = "topics")
@Component
@Configuration
public class ConfigProps {

    private String MATRIXX_ADJ_EVENT_TOPIC;

    private String OUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC;

    private String OUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC;

    private String EVENTS_NO_MVNO;


    public void setMATRIXX_ADJ_EVENT_TOPIC(String MATRIXX_ADJ_EVENT_TOPIC) {
        this.MATRIXX_ADJ_EVENT_TOPIC = MATRIXX_ADJ_EVENT_TOPIC;
    }

    public void setOUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC(String OUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC) {
        this.OUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC = OUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC;
    }

    public void setOUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC(String OUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC) {
        this.OUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC = OUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC;
    }

    public String getEVENTS_NO_MVNO() {
        return EVENTS_NO_MVNO;
    }

    public void setEVENTS_NO_MVNO(String EVENTS_NO_MVNO) {
        this.EVENTS_NO_MVNO = EVENTS_NO_MVNO;
    }

    public String getMATRIXX_ADJ_EVENT_TOPIC() {
        return MATRIXX_ADJ_EVENT_TOPIC;
    }

    public String getOUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC() {
        return OUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC;
    }

    public String getOUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC() {
        return OUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC;
    }

}

I @Autowired this class in both my test class and my application,

@Autowired
ConfigProps cp;

and try to access the fields using cp.getBootstrapServerHost(), but this throws a NullPointerException in my test class. It resolves correctly in my application class, though.

My test class looks like this

package distinct;

import com.acme.rtc.configuration.ConfigProps;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import com.acme.rtc.configuration.KafkaConfiguration;
import com.acme.rtc.configuration.TopologyConfiguration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import java.util.List;

import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertEquals;

@SpringBootTest
@ContextConfiguration(classes = TopologyConfiguration.class)
@SpringJUnitConfig
public class TestWithTopologyTestDriver {
    private TestInputTopic<String, String> inputTopicWrong;
    private TestOutputTopic<String, String> outputTopicWrong;

    private TestInputTopic<String, String> inputTopicRight;
    private TestOutputTopic<String, String> outputTopicRight;
    private TopologyTestDriver topologyTestDriver;

    @Autowired
    ConfigProps configProps;

    @BeforeEach
    public void setUp() {
        KafkaProperties properties = new KafkaProperties();
        properties.setBootstrapServers(singletonList("localhost:9092"));
        KafkaStreamsConfiguration config = new KafkaConfiguration(properties).getStreamsConfig();
        StreamsBuilder sb = new StreamsBuilder();
        Topology topology = new TopologyConfiguration().createTopology(sb);
        topologyTestDriver = new TopologyTestDriver(topology, config.asProperties());
        inputTopicWrong =
                topologyTestDriver.createInputTopic(configProps.getMATRIXX_ADJ_EVENT_TOPIC(), new StringSerializer(),
                        new StringSerializer());
        outputTopicWrong =
                topologyTestDriver.createOutputTopic(configProps.getOUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC(), new StringDeserializer(),
                        new StringDeserializer());

        inputTopicRight =
                topologyTestDriver.createInputTopic(configProps.getMATRIXX_ADJ_EVENT_TOPIC(), new StringSerializer(),
                        new StringSerializer());
        outputTopicRight =
                topologyTestDriver.createOutputTopic(configProps.getOUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC(), new StringDeserializer(),
                        new StringDeserializer());
    }

    @AfterEach
    public void tearDown() {
        topologyTestDriver.close();
    }

    @Test
    void wrongDistinctTopology() {
        testTopology(inputTopicWrong, outputTopicWrong);
    }
}

where TopologyConfiguration is my application and has this signature

package com.acme.rtc.configuration;

import com.fasterxml.jackson.databind.JsonNode;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Component;

@Configuration
@ConfigurationProperties(prefix = "topics")
@Component
@RequiredArgsConstructor
public class TopologyConfiguration {
    @Autowired
    Environment env;

    @Autowired
    ConfigProps configProps;

    private void acmeStreamsTopoloy(StreamsBuilder streamsBuilder) {

        Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
        System.out.println("ConfigProps.getMattrix: "+configProps.getMATRIXX_ADJ_EVENT_TOPIC());


        KStream<String, String> inputStream =
                streamsBuilder.stream(configProps.getMATRIXX_ADJ_EVENT_TOPIC(), Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String>[] branches = inputStream.branch(
                (key, value)-> value.contains("KGN"),
                (key, value)-> value.contains("LEB"),
                (key, value)->true);

        branches[0].to(configProps.getOUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC());
        branches[1].to(configProps.getOUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC());
        branches[2].to(configProps.getEVENTS_NO_MVNO());

    }


    @Bean
    public Topology createTopology(StreamsBuilder streamsBuilder) {
        acmeStreamsTopoloy(streamsBuilder);
        return streamsBuilder.build();
    }


}

My KafkaConfiguration class

package com.acme.rtc.configuration;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@RequiredArgsConstructor
public class KafkaConfiguration {

    public static final String APP_ID = "acme-stream-rtc";
    private final KafkaProperties kafkaProperties;


    @Autowired
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration getStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS,false);
        KafkaStreamsConfiguration streamsConfig = new KafkaStreamsConfiguration(props);
        return streamsConfig;
    }


}

My application.yml has the proper syntax, etc.

spring:
  kafka:
    bootstrap-servers: localhost:9092
  json:
    value:
      default:
        type: true
kafka:
  streams:
    properties:
      default:
        value:
          serde: org.springframework.kafka.support.serializer.JsonSerde
      admin:
    security:
      protocol: SSL
    ssl:
      trust-store-location: ${TRUSTSTORE_LOCATION}
      trust-store-password: ${TRUSTSTORE_PASSWORD}
      key-store-location: ${KEYSTORE_LOCATION}
      key-store-password: ${KEYSTORE_PASSWORD}
      key-password: ${KEY_PASSWORD}


topics:
  MATRIXX_ADJ_EVENT_TOPIC: input-matrixx-adj-event
  OUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC: output-KGN-adj-event
  OUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC: output-LEB-adj-event
  EVENTS_NO_MVNO: events-no-mvno-spec

My main class

package com.acme.rtc;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@SpringBootApplication
@EnableKafkaStreams
public class StreamProcessing {

    public static void main(String[] args) {
        SpringApplication.run(StreamProcessing.class, args);
    }

}

I am not sure whether I am missing context when autowiring the configuration class, or whether I need further annotations on my test class.

Tabber, 13 April 2021, 18:48

1 answer

Best answer

For JUnit 4 you need @RunWith(SpringJUnit4ClassRunner.class) alongside @ContextConfiguration.

For JUnit 5, use @SpringJUnitConfig.

However, to load the properties you need @SpringBootTest.

Boot 2.4 uses JUnit 5.

And you should not have @ConfigurationProperties on the test.

EDIT

I just tested it with no problems.

@Configuration
public class Config {

    @Bean
    String str() {
        return "str";
    }

}


@ConfigurationProperties(prefix = "foo")
@Component
public class MyProps {

    String bar;

    public String getBar() {
        return this.bar;
    }

    public void setBar(String bar) {
        this.bar = bar;
    }

    @Override
    public String toString() {
        return "MyProps [bar=" + this.bar + "]";
    }

}

@SpringBootApplication
public class So67078244Application {

    public static void main(String[] args) {
        SpringApplication.run(So67078244Application.class, args);
    }

}

@SpringBootTest
class So67078244ApplicationTests {

    @Autowired
    MyProps props;

    @Test
    void contextLoads() {
        System.out.println(this.props);
    }

}

Properties:
foo.bar=baz

Output:
MyProps [bar=baz]
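
A side note on the question's setUp(), offered as a likely contributor rather than a confirmed diagnosis: it builds the topology with new TopologyConfiguration() instead of using a bean from the context. Spring only populates @Autowired fields on objects it creates itself, so configProps inside that hand-made instance would stay null whatever annotations the test carries. The plain-Java sketch below illustrates the effect without Spring. Props, TopologyConfig and createManaged are hypothetical stand-ins and are not part of the original code.

```java
import java.util.Objects;

public class InjectionSketch {

    /** Hypothetical stand-in for ConfigProps: holds one bound property value. */
    static class Props {
        private final String inputTopic;

        Props(String inputTopic) {
            this.inputTopic = inputTopic;
        }

        String getInputTopic() {
            return inputTopic;
        }
    }

    /** Hypothetical stand-in for TopologyConfiguration: expects its field to be set from outside. */
    static class TopologyConfig {
        Props props; // a container would populate this; a plain `new` leaves it null

        String inputTopicName() {
            return props.getInputTopic(); // NullPointerException if props was never injected
        }
    }

    /** Hypothetical mini "container": creates the object and injects its dependency. */
    static TopologyConfig createManaged(Props props) {
        TopologyConfig config = new TopologyConfig();
        config.props = Objects.requireNonNull(props);
        return config;
    }

    /** Returns true if inputTopicName() on the given instance throws a NullPointerException. */
    static boolean failsWithNpe(TopologyConfig config) {
        try {
            config.inputTopicName();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Props props = new Props("input-matrixx-adj-event");

        // Instance created by the "container": dependency is present.
        TopologyConfig managed = createManaged(props);
        System.out.println("managed: " + managed.inputTopicName());

        // Instance created by hand: nothing injects props.
        TopologyConfig unmanaged = new TopologyConfig();
        System.out.println("unmanaged fails with NPE: " + failsWithNpe(unmanaged));
    }
}
```

In the test from the question, autowiring the Topology bean (or the TopologyConfiguration bean) instead of calling new would be one way to avoid this, assuming the test context actually contains those beans.
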
Gary Russell, 13 April 2021, 16:56