Właśnie aktualizujemy Spring-Cloud-Stream do wersji 3.0.0 i napotykamy następujące problemy:

W przypadku takiego stylu funkcjonalnego:

public class EventProcessor {

    private final PriceValidator priceValidator;

    @Bean
    public Function<Flux<EnrichedValidationRequest>, Flux<ValidationResult>> validate() {
        return enrichedValidationRequestFlux -> enrichedValidationRequestFlux
                .map(ProcessingContext::new)
                .flatMap(priceValidator::validateAndMap);
    }
}

Plik application.yaml wygląda następująco:

spring.cloud.stream:
  default-binder: kafka
  kafka:
    binder:
      brokers: ${kafka.broker.prod}
      auto-create-topics: false
  function.definition: validate

# INPUT: enrichedValidationRequests
spring.cloud.stream.bindings.validate-in-0:
  destination: ${kafka.topic.${spring.application.name}.input.enrichedValidationRequests}
  group: ${spring.application.name}.${STAGE:NOT_SET}
  consumer:
    useNativeDecoding: true


spring.cloud.stream.kafka.bindings.validate-in-0:
  consumer:
    configuration:
      key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer: de.pricevalidator.deserializer.EnrichedValidationRequestDeserializer


# OUTPUT: validationResults
spring.cloud.stream.bindings.validate-out-0:
  destination: validationResultsTmp
  producer:
    useNativeEncoding: true

spring.cloud.stream.kafka.bindings.validate-out-0:
  producer:
    compression.type: lz4
    messageKeyExpression: payload.offerKey
    configuration:
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
      value.serializer: de.pricevalidator.serializer.ValidationResultSerializer

Wygląda na to, że serializacja jest wykonywana dwukrotnie — kiedy przechwycimy wiadomości, które są tworzone w temacie kafka, konsument po prostu wyświetla je jako JSON (ciągi), ale teraz jest to nieczytelny byte[]. Ponadto odbiorcy podrzędni w produkcji nie mogą już deserializować wiadomości. O dziwo, deserializacja komunikatów wejściowych wydaje się działać dobrze, bez względu na to, co umieścimy we właściwościach konsumenta (na poziomie bindera lub domyślnego kafka) Mamy wrażenie, że ten błąd „wrócił”, ale nie możemy znaleźć dokładnego miejsca w kodzie: https://github.com/spring-cloud/spring-cloud-stream/issues/1536

Nasze (brzydkie) obejście:

@Slf4j
@Configuration
public class KafkaMessageConverterConfiguration {

    @ConditionalOnProperty(value = "spring.cloud.stream.default-binder", havingValue = "kafka")
    @Bean
    public MessageConverter validationResultConverter(BinderTypeRegistry binder, ObjectMapper objectMapper) {
        return new AbstractMessageConverter(MimeType.valueOf("application/json")) {
            @Override
            protected boolean supports(final Class<?> clazz) {
                return ValidationResult.class.isAssignableFrom(clazz);
            }

            @Override
            protected Object convertToInternal(final Object payload, final MessageHeaders headers, final Object conversionHint) {
                return payload;
            }
        };
    }
}

Czy istnieje „właściwy” sposób ustawienia niestandardowego serializatora lub uzyskania natywnego kodowania, tak jak było wcześniej?

1
smlgbl 17 grudzień 2019, 18:24
Co ciekawe, mamy prawdopodobnie powiązany problem podczas deserializacji (w innym reaktywnym projekcie, który nie używa stylu funkcjonalnego, ale podejście StreamListener) i targetClass w MappingJackson2MessageConverter.convertFromInternal kończy się na Flux - powinien po prostu zwrócić ładunek, ale Oczywiście ładunek nie jest Fluxem, ale innym obiektem. Przed wersją 3.0.0 zawsze dołączaliśmy zależność spring-cloud-stream-reactive, która zapewniała komponent o nazwie „MessageChannelToInputFluxParameterAdapter”, który dokładnie to robił.
 – 
smlgbl
17 grudzień 2019, 18:43

1 odpowiedź

To był problem zgłoszony zaraz po 3.0.0.RELEASE - https: //github.com/spring-cloud/spring-cloud-stream/commit/74aee8102898dbff96a570d2d2624571b259e141. Został rozwiązany i będzie dostępny w 3.0.1.RELEASE (Horsham.SR1) za kilka dni.

0
Oleg Zhurakousky 17 grudzień 2019, 20:52