
Running into exception while using cloudevent values in kafka streams #597

Open
supriya-albal-polestar opened this issue Oct 10, 2023 · 1 comment
Labels
needs-user-input The author of an issue or PR should provide more details

Comments

@supriya-albal-polestar

I want to join a Kafka stream with a KTable. The POC works fine with plain string values. However, when I use CloudEvent values, I keep running into one serialization issue or another.

Here is my code sample:

import java.util.HashMap;
import java.util.Map;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.CloudEventDeserializer;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import static io.cloudevents.kafka.CloudEventSerializer.ENCODING_CONFIG;
import static io.cloudevents.kafka.CloudEventSerializer.EVENT_FORMAT_CONFIG;

// Structured (JSON) encoding for both serializer and deserializer.
Map<String, Object> ceSerializerConfigs = new HashMap<>();
ceSerializerConfigs.put(ENCODING_CONFIG, Encoding.STRUCTURED);
ceSerializerConfigs.put(EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);

CloudEventSerializer serializer = new CloudEventSerializer();
serializer.configure(ceSerializerConfigs, false);

CloudEventDeserializer deserializer = new CloudEventDeserializer();
deserializer.configure(ceSerializerConfigs, false);
Serde<CloudEvent> cloudEventSerde = Serdes.serdeFrom(serializer, deserializer);

KStream<String, CloudEvent> kStream = builder.stream("stream-topic", Consumed.with(Serdes.String(), cloudEventSerde));
KTable<String, CloudEvent> kTable = builder.table("ktable-topic", Consumed.with(Serdes.String(), cloudEventSerde));

// Join: the output event id is the concatenation of the two input ids.
KStream<String, CloudEvent> joined = kStream
    .join(kTable, (left, right) -> CloudEventBuilder.v1().withId(left.getId().concat(right.getId())).build());
joined.to(output, Produced.with(Serdes.String(), cloudEventSerde)); // "output" is the sink topic name

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamProps);
kafkaStreams.start();

I also tried the WrapperSerde approach from "Issue with configuring Serdes for Kafka Streams".

However, I keep running into this exception:

18:12:08.691 [basic-streams-updated-0630c691-0080-4e02-8c85-7bff650f34e9-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [basic-streams-updated-0630c691-0080-4e02-8c85-7bff650f34e9] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000002, topic=cloudevent-ktable, partition=0, offset=80, stacktrace=java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)

Caused by: java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:84) ~[cloudevents-kafka-2.5.0.jar:?]
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:38) ~[cloudevents-kafka-2.5.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:82) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:73) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:30) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$4(MeteredKeyValueStore.java:200) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:200) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:120) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:122) ~[kafka-streams-2.8.0.jar:?]

Am I missing anything? Any help from the CloudEvents team is appreciated.
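
For context on where this fails: the stack trace points at the KTable's state store, where ValueAndTimestampSerializer invokes the two-argument serialize(String, CloudEvent). CloudEventSerializer implements only the header-aware three-argument variant, in both binary and structured mode, so any code path without access to record headers hits this exception. Below is a minimal sketch of a serde that sidesteps headers by writing structured JSON through the EventFormat API directly; the class name CloudEventJsonSerde is hypothetical, and it assumes the cloudevents-json-jackson module is on the classpath:

import io.cloudevents.CloudEvent;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical workaround serde: (de)serializes CloudEvents as structured
// JSON via the EventFormat API, so the plain serialize(topic, data)
// signature used by Kafka Streams state stores works. No headers involved.
public class CloudEventJsonSerde implements Serde<CloudEvent> {

    // Resolves the JSON format registered by cloudevents-json-jackson.
    private final EventFormat format =
        EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE);

    @Override
    public Serializer<CloudEvent> serializer() {
        return (topic, event) -> event == null ? null : format.serialize(event);
    }

    @Override
    public Deserializer<CloudEvent> deserializer() {
        return (topic, data) -> data == null ? null : format.deserialize(data);
    }
}

It would replace the Serdes.serdeFrom(...) serde above, e.g. Consumed.with(Serdes.String(), new CloudEventJsonSerde()). The trade-off is that output records carry no content-type header, so downstream consumers have to know the topic contains structured JSON CloudEvents.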

@pierDipi
Member

pierDipi commented Feb 8, 2024

The deserializer only supports the Kafka protocol binding for CloudEvents, as per the spec: https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/kafka-protocol-binding.md. I believe the records are not formatted as the spec expects?
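
One way to check that, sketched here with the plain consumer API (the topic name comes from the stack trace above; the broker address and group id are placeholder assumptions), is to dump the headers and raw value of a few records. Per the binding spec, structured-mode records carry a content-type header of application/cloudevents+json, while binary-mode records carry ce_-prefixed headers:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class InspectCloudEventRecords {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "ce-inspect");              // throwaway group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("cloudevent-ktable"));
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                // Binary mode: ce_-prefixed headers. Structured mode:
                // content-type = application/cloudevents+json with a JSON body.
                for (Header header : record.headers()) {
                    System.out.println(header.key() + " = " + new String(header.value()));
                }
                System.out.println("value: " + record.value());
                System.out.println("---");
            }
        }
    }
}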

@pierDipi added the needs-user-input label on Feb 8, 2024