This repository has been archived by the owner on Nov 20, 2024. It is now read-only.

Cannot convert from [[B] to [com.example.DummyMessage] for GenericMessage #1147

metalpalo opened this issue Aug 27, 2021 · 13 comments
@metalpalo

Describe the issue
After upgrading spring-cloud-starter-parent from Hoxton.SR1 to Hoxton.SR2, we get the following error when consuming Avro-encoded Kafka messages produced by legacy microservices that use Spring Boot 1.5.x:

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.example.DummyMessage] for GenericMessage [payload=byte[49], headers={kafka_timestampType=CREATE_TIME, kafka_receivedTopic=Topic_DummyMessages, originalContentType=application/vnd.dummymessage.v1+avro, spanTraceId=b12de318aedd5501, spanId=b12de318aedd5501, nativeHeaders={X-B3-TraceId=[b12de318aedd5501], spanTraceId=[b12de318aedd5501], X-B3-SpanId=[b12de318aedd5501], spanId=[b12de318aedd5501], X-B3-Sampled=[0], spanSampled=[0]}, kafka_offset=11, X-B3-SpanId=b12de318aedd5501, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@398b3b8d, X-B3-Sampled=0, X-B3-TraceId=b12de318aedd5501, id=0b716343-7904-258b-d4b6-12263b6a5216, kafka_receivedPartitionId=0, spanSampled=0, contentType=application/octet-stream, kafka_receivedTimestamp=1629983108296, kafka_groupId=DummyService, timestamp=1630042306997}]
	at org.springframework.cloud.stream.config.SmartPayloadArgumentResolver.resolveArgument(SmartPayloadArgumentResolver.java:123) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) ~[spring-cloud-stream-3.0.2.RELEASE.jar:3.0.2.RELEASE]
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198) ~[spring-integration-core-5.2.3.RELEASE.jar:5.2.3.RELEASE]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:75) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1696) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1679) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1634) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]

The message payloads on Kafka carry the Avro content type wrapped within application/octet-stream:
contentType="application/octet-stream" originalContentType="application/vnd.dummymessage.v1+avro"

Both the producer and the consumer service set headerMode to embeddedHeaders.
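
To illustrate why the header pair matters, here is a minimal plain-Java sketch of the two lookup strategies. It uses no Spring types; the class name, the method names, and the simplified "+avro" support check are all assumptions made for illustration, not the actual converter logic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of the header lookup; all names here are hypothetical.
class ContentTypeLookupSketch {

    // Models a resolver that only looks at the "contentType" header.
    static String resolveDefault(Map<String, Object> headers) {
        Object value = headers.get("contentType");
        return value == null ? null : value.toString();
    }

    // Models a resolver that prefers "originalContentType" and falls back to "contentType".
    static String resolveOriginalFirst(Map<String, Object> headers) {
        Object value = headers.get("originalContentType");
        return value != null ? value.toString() : resolveDefault(headers);
    }

    // Simplified, assumed support check: the Avro converter accepts only "+avro" types.
    static boolean avroConverterSupports(String mimeType) {
        return mimeType != null && mimeType.endsWith("+avro");
    }

    public static void main(String[] args) {
        // Header values copied from the failing message above.
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("contentType", "application/octet-stream");
        headers.put("originalContentType", "application/vnd.dummymessage.v1+avro");

        System.out.println(avroConverterSupports(resolveDefault(headers)));       // false
        System.out.println(avroConverterSupports(resolveOriginalFirst(headers))); // true
    }
}
```

Under these assumptions, a converter that only sees application/octet-stream declines the message, and the payload stays a byte[] that cannot be bound to DummyMessage.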

To Reproduce
Steps to reproduce the behavior:

  1. Send an Avro Kafka message from a legacy SB 1.5.x microservice
  2. Confirm that the Kafka message exists on the topic
  3. Start the consuming SB microservice with Hoxton.SR2
  4. The consumer fails with the error mentioned above

Version of the framework
spring-cloud-starter-parent:Hoxton.SR2
spring-boot-starter-parent: 2.2.4.RELEASE
spring-cloud-stream-binder-kafka: 3.0.2.RELEASE

Expected behavior
The Kafka message should be processed successfully, as it is with the following configuration:
spring-cloud-starter-parent:Hoxton.SR1
spring-boot-starter-parent: 2.2.2.RELEASE
spring-cloud-stream-binder-kafka: 3.0.1.RELEASE

Can somebody tell me what changed there? Is there some configuration for backward compatibility?
thanks
br, metalpalo

@metalpalo
Author

metalpalo commented Sep 17, 2021

When I hardcode this in dependencyManagement, it works and can also send messages out, but I am afraid of incompatibility issues.

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>3.0.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
    <version>3.0.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>3.0.1.RELEASE</version>
</dependency>

I debugged it. With version 3.0.1, the AvroSchemaRegistryClientMessageConverter has an OriginalContentTypeResolver injected, but with 3.0.2 it has a DefaultContentTypeResolver.

The OriginalContentTypeResolver is originally set on AvroSchemaRegistryClientMessageConverter during Avro auto-configuration, but is later reset to DefaultContentTypeResolver in the CompositeMessageConverterFactory class.

The class where this happens, and which differs between the two versions, is named ContentTypeConfiguration.

  1. It constructs a ConfigurableCompositeMessageConverter (3.0.1) or a CompositeMessageConverter (3.0.2).
  2. ContentTypeConfiguration (3.0.1) sets DefaultContentTypeResolver via CompositeMessageConverterFactory only for the default message converters.
  3. ContentTypeConfiguration (3.0.2) sets DefaultContentTypeResolver via CompositeMessageConverterFactory for custom converters as well, which includes AvroSchemaRegistryClientMessageConverter.
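
The difference described in the list above can be modeled with a small plain-Java sketch. It is not taken from the Spring source: `Resolver`, `Converter`, `assembleLike301`, and `assembleLike302` are hypothetical stand-ins that only mirror the behavior as I observed it while debugging.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model; every type and method here is a hypothetical stand-in.
class ResolverResetSketch {

    interface Resolver {
        String name();
    }

    static final Resolver DEFAULT = () -> "DefaultContentTypeResolver";
    static final Resolver ORIGINAL = () -> "OriginalContentTypeResolver";

    static class Converter {
        final String label;
        Resolver resolver;

        Converter(String label, Resolver resolver) {
            this.label = label;
            this.resolver = resolver;
        }
    }

    // Models the 3.0.1 observation: only built-in converters receive the default resolver.
    static List<Converter> assembleLike301(List<Converter> custom, List<Converter> builtIn) {
        builtIn.forEach(c -> c.resolver = DEFAULT);
        List<Converter> all = new ArrayList<>(custom);
        all.addAll(builtIn);
        return all;
    }

    // Models the 3.0.2 observation: custom converters are reset as well.
    static List<Converter> assembleLike302(List<Converter> custom, List<Converter> builtIn) {
        List<Converter> all = new ArrayList<>(custom);
        all.addAll(builtIn);
        all.forEach(c -> c.resolver = DEFAULT);
        return all;
    }

    public static void main(String[] args) {
        List<Converter> before = assembleLike301(
                Arrays.asList(new Converter("avro", ORIGINAL)),
                Arrays.asList(new Converter("json", null)));
        List<Converter> after = assembleLike302(
                Arrays.asList(new Converter("avro", ORIGINAL)),
                Arrays.asList(new Converter("json", null)));

        System.out.println(before.get(0).resolver.name()); // OriginalContentTypeResolver
        System.out.println(after.get(0).resolver.name());  // DefaultContentTypeResolver
    }
}
```

In this model the resolver that the custom converter set for itself survives the first assembly but is silently overwritten in the second.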

Can somebody explain why this was changed?

@olegz olegz transferred this issue from spring-cloud/spring-cloud-stream Sep 17, 2021
@metalpalo
Author

metalpalo commented Sep 20, 2021

I have the following workaround, which works for me, but I don't know whether it is OK. Can somebody confirm whether it could cause any problems?

@Slf4j
@Configuration
@ConditionalOnProperty(value = {"avro.configuration.use-original-content-type-resolver"}, matchIfMissing = true)
public class AvroConfiguration {

    @Autowired
    void fixContentTypeResolver4AvroSchemaRegistryClientMessageConverter(
            @Qualifier(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME) CompositeMessageConverter converter) {
        log.info("replacing DefaultContentTypeResolver...");
        List<MessageConverter> converters = converter.getConverters();
        List<AvroSchemaRegistryClientMessageConverter> asrcmConverters = converters.stream()
                .filter(mc -> mc instanceof AvroSchemaRegistryClientMessageConverter)
                .filter(mc -> ((AvroSchemaRegistryClientMessageConverter) mc).getContentTypeResolver() instanceof DefaultContentTypeResolver)
                .map(mc -> ((AvroSchemaRegistryClientMessageConverter) mc))
                .collect(Collectors.toList());
        if (!CollectionUtils.isEmpty(asrcmConverters)) {
            log.info("replacing DefaultContentTypeResolver with custom OriginalContentTypeResolver for AvroSchemaRegistryClientMessageConverter instances {}", asrcmConverters);
            OriginalContentTypeResolver originalContentTypeResolver = new OriginalContentTypeResolver();
            for (AvroSchemaRegistryClientMessageConverter asrcmConverter : asrcmConverters) {
                asrcmConverter.setContentTypeResolver(originalContentTypeResolver);
            }
        }
    }

    //TODO: copied from the schema registry client because the original class is package-private
    public static class OriginalContentTypeResolver implements ContentTypeResolver {

        private static final String BINDER_ORIGINAL_CONTENT_TYPE = "originalContentType";

        private ConcurrentMap<String, MimeType> mimeTypeCache = new ConcurrentHashMap<>();

        @Override
        public MimeType resolve(MessageHeaders headers) {
            Object contentType = headers
                    .get(BINDER_ORIGINAL_CONTENT_TYPE) != null
                    ? headers.get(BINDER_ORIGINAL_CONTENT_TYPE)
                    : headers.get(MessageHeaders.CONTENT_TYPE);
            MimeType mimeType = null;
            if (contentType instanceof MimeType) {
                mimeType = (MimeType) contentType;
            } else if (contentType instanceof String) {
                mimeType = this.mimeTypeCache.get(contentType);
                if (mimeType == null) {
                    String valueAsString = (String) contentType;
                    mimeType = MimeType.valueOf(valueAsString);
                    this.mimeTypeCache.put(valueAsString, mimeType);
                }
            }
            return mimeType;
        }
    }
}

@metalpalo
Author

metalpalo commented Sep 20, 2021

This commit also replaces the resolver with the default one for all custom converters:
spring-cloud/spring-cloud-stream@cce66f3#diff-262c0de1002df243e36569ab0fcb1b8590a04fac57bc91a377221cb5e4ee7ee4

@olegz Why was this changed? If I understand correctly, I am no longer able to define a custom converter with a custom content type resolver?

@sobychacko
Contributor

@metalpalo Could you create a small reproducible sample and share that with us? Thank you!

@sobychacko
Contributor

@metalpalo Closing the issue now. Feel free to re-open with a reproducible sample if you are still facing issues with this.

@metalpalo
Author

Sorry for the late answer, I was very busy.
I am still thinking about whether I can create an example, because I don't have any sample Avro object.

But the question is simple: why do you reset the content type resolver on custom converters?
A good example is AvroSchemaRegistryClientMessageConverter, which supports wrapped content types like
contentType="application/octet-stream" originalContentType="application/vnd.dummymessage.v1+avro"
via its own OriginalContentTypeResolver.

But in the commit mentioned above it is reset to DefaultContentTypeResolver.

@sobychacko sobychacko reopened this Oct 12, 2021
@sobychacko
Contributor

@metalpalo Could you point out which commit made that change? It's not clear where that change is made, and where do you see DefaultContentTypeResolver? It seems like this needs to be debugged further. If it happens with any custom converter, it should be easy to reproduce (a sample app where we can see the problem will help a lot; you may not need the AVRO converter if it happens with other custom converters). Another thing: the Spring Cloud Schema Registry project is no longer actively maintained, so you may not want to rely on AvroSchemaRegistryClientMessageConverter going forward. Are you using Spring Cloud Schema Registry? The Kafka binder can support other schema registries, such as the one from Confluent.

@metalpalo
Author

@sobychacko
Regarding the example: maybe the original content type is used only for some binary messages. I can check later whether a JSON content type is wrapped too, but I'm not sure.

I use AvroSchemaRegistryClientMessageConverter from this library, version 1.0.13.RELEASE:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-schema-registry-client</artifactId>
</dependency> 

The ancestor of this class, namely AbstractAvroMessageConverter, implicitly sets OriginalContentTypeResolver as the content type resolver:

protected AbstractAvroMessageConverter(Collection<MimeType> supportedMimeTypes, AvroSchemaServiceManager manager) {
	super(supportedMimeTypes);
	setContentTypeResolver(new OriginalContentTypeResolver());
	this.avroSchemaServiceManager = manager;
}

This converter is recognized as a custom one, and its resolver is replaced with DefaultContentTypeResolver by the CompositeMessageConverterFactory class, whose first parameter is the list of custom converters.
Before this commit, the first parameter was an empty ArrayList, so custom converters were not modified there.
Look at the following commit, class ContentTypeConfiguration:
spring-cloud/spring-cloud-stream@cce66f3#diff-262c0de1002df243e36569ab0fcb1b8590a04fac57bc91a377221cb5e4ee7ee4

@sobychacko
Contributor

I cannot locate the class DefaultContentTypeResolver in that commit or in the codebase. Also, do we need to run the schema registry from Spring for this? In order to help you further triaging this issue, I would really like to see a small sample application with steps to reproduce the issue. Could you create one on GitHub and share it with us? Or you can send a PR with the possible fix that you think is appropriate, which we can then review.

@olegz
Contributor

olegz commented Oct 14, 2021

@metalpalo as Soby suggested several times, please provide a reproducible sample.
The originalContentType is not used by spring-cloud-stream since 2.0, so not exactly sure what is going on here. So a small reproducible sample with only a bare minimum would help us understand the issue.

@metalpalo
Author

metalpalo commented Oct 21, 2021

I created a sample Spring Boot application based on
spring-cloud-starter-parent:Hoxton.SR1
spring-boot-starter-parent: 2.2.2.RELEASE
spring-cloud-stream-binder-kafka: 3.0.1.RELEASE

This application does nothing special; it just logs the content type resolver assigned to each message converter it finds.
I added the schema registry client and Avro dependencies that AvroMessageConverterAutoConfiguration needs in order to register the custom AvroSchemaRegistryClientMessageConverter. AvroSchemaRegistryClientMessageConverter has OriginalContentTypeResolver assigned implicitly.

The logs from SR1 are as follows:

Converter: org.springframework.cloud.schema.registry.avro.AvroSchemaRegistryClientMessageConverter@63cd2cd2 -> ContentTypeResolver: org.springframework.cloud.schema.registry.avro.OriginalContentTypeResolver@44aa2e13
Converter: org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter@4b55ff0a -> ContentTypeResolver: DefaultContentTypeResolver[defaultMimeType=application/json]
Converter: org.springframework.cloud.stream.converter.CompositeMessageConverterFactory$1@256a0d95 -> ContentTypeResolver: DefaultContentTypeResolver[defaultMimeType=application/json]
Converter: org.springframework.cloud.stream.converter.ObjectStringMessageConverter@2f3928ac -> ContentTypeResolver: DefaultContentTypeResolver[defaultMimeType=application/json]

The logs from SR2 are as follows:

Converter: org.springframework.cloud.schema.registry.avro.AvroSchemaRegistryClientMessageConverter@6de6faa6 -> ContentTypeResolver: DefaultContentTypeResolver[defaultMimeType=application/json]
Converter: org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter@1625789b -> ContentTypeResolver: DefaultContentTypeResolver[defaultMimeType=application/json]
Converter: org.springframework.cloud.stream.converter.CompositeMessageConverterFactory$1@5a1dddba -> ContentTypeResolver: DefaultContentTypeResolver[defaultMimeType=application/json]
Converter: org.springframework.cloud.stream.converter.ObjectStringMessageConverter@c4d2c44 -> ContentTypeResolver: DefaultContentTypeResolver[defaultMimeType=application/json]

So SR2 replaces the implicit content type resolver with the default one. Everything happens in the ContentTypeConfiguration class mentioned above:
spring-cloud/spring-cloud-stream@cce66f3#diff-262c0de1002df243e36569ab0fcb1b8590a04fac57bc91a377221cb5e4ee7ee4

AvroSchemaRegistryClientMessageConverter@63cd2cd2 -> OriginalContentTypeResolver@44aa2e13
vs
AvroSchemaRegistryClientMessageConverter@6de6faa6 -> DefaultContentTypeResolver

I think this will also affect other custom message converters.

ResolverTypeProject.zip

@sobychacko
Contributor

@metalpalo Thanks for the sample. Why are you using a very old version of spring-cloud here? Even in the Hoxton line, the latest is SR11. You are testing with SR1/SR2. However, this is not the problem you are running into. My guess is that even if you update to a later version, you might see the same issues.

Here are my suggestions. Update to the latest Hoxton if you can or, better yet, to the latest 3.1 version of SCSt (spring-cloud 2020.0.x). You are using AvroSchemaRegistryClientMessageConverter, which is supposed to work with Spring's schema registry, but it seems like you are using the Confluent schema registry. We no longer recommend using this message converter against the Confluent schema registry. If you are using Confluent's schema registry, consider using the AVRO serializers from Kafka/Confluent directly, and make sure to set useNativeEncoding/useNativeDecoding to true on the Spring Cloud Stream side so that it doesn't try to convert the messages.

Do you have a strong reason to use AvroSchemaRegistryClientMessageConverter against Confluent Schema Registry? If that is the case, I think the best course of action is to patch things on your end rather than making changes on the framework side.

Here is a sample that demonstrates SCSt + Confluent Schema Registry + AVRO: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/schema-registry-samples/schema-registry-confluent-avro-serializer

Please update how you want to proceed with this issue so that we can close it or take further actions. Thanks!

@metalpalo
Author

@sobychacko I also tried the latest Hoxton version available at the moment, namely SR12, and got the same problem. I just wanted to illustrate where the problem started, that is, with the changes between SR1 and SR2.

Regarding your suggested upgrades to a newer SCSt, using the Avro serializers from Kafka/Confluent, and so on:
we must first analyze whether that is possible without breaking compatibility with the other producer/consumer microservices.
For the moment I will probably keep my patch class, and we will see later.
Also, thanks for the example, I will check it.

But in summary, I wanted to focus on the general problem: what if somebody implements a custom message converter with its own resolver, and releases >= SR2 override it with the default one?

Thanks
