@@ -23,8 +23,6 @@ public static class KafkaClientExtensions
2323 private const string KafkaContentTypeAttributeName = "content-type" ;
2424 private const string SpecVersionKafkaHeader = KafkaHeaderPrefix + "specversion" ;
2525
26- // TODO: Avoid all the byte[] -> string conversions? If we didn't care about case-sensitivity, we could prepare byte arrays to perform comparisons with.
27-
2826 /// <summary>
2927 /// Indicates whether this message holds a single CloudEvent.
3028 /// </summary>
@@ -35,7 +33,7 @@ public static class KafkaClientExtensions
3533 /// <returns>true, if the request is a CloudEvent</returns>
3634 public static bool IsCloudEvent ( this Message < string , byte [ ] > message ) =>
3735 GetHeaderValue ( message , SpecVersionKafkaHeader ) is object ||
38- MimeUtilities . IsCloudEventsContentType ( ExtractContentType ( message ) ) ;
36+ MimeUtilities . IsCloudEventsContentType ( GetHeaderValue ( message , KafkaContentTypeAttributeName ) ) ;
3937
4038 /// <summary>
4139 /// Converts this Kafka message into a CloudEvent object.
@@ -66,7 +64,7 @@ public static CloudEvent ToCloudEvent(this Message<string, byte[]> message,
6664 throw new InvalidOperationException ( ) ;
6765 }
6866
69- var contentType = ExtractContentType ( message ) ;
67+ var contentType = GetHeaderValue ( message , KafkaContentTypeAttributeName ) ;
7068
7169 CloudEvent cloudEvent ;
7270
@@ -78,11 +76,10 @@ public static CloudEvent ToCloudEvent(this Message<string, byte[]> message,
7876 else
7977 {
8078 // Binary mode
81- if ( ! ( GetHeaderValue ( message , SpecVersionKafkaHeader ) is byte [ ] versionIdBytes ) )
79+ if ( ! ( GetHeaderValue ( message , SpecVersionKafkaHeader ) is string versionId ) )
8280 {
8381 throw new ArgumentException ( "Request is not a CloudEvent" ) ;
8482 }
85- string versionId = Encoding . UTF8 . GetString ( versionIdBytes ) ;
8683 CloudEventsSpecVersion version = CloudEventsSpecVersion . FromVersionId ( versionId )
8784 ?? throw new ArgumentException ( $ "Unknown CloudEvents spec version '{ versionId } '", nameof ( message ) ) ;
8885
@@ -115,12 +112,6 @@ public static CloudEvent ToCloudEvent(this Message<string, byte[]> message,
115112 return Validation . CheckCloudEventArgument ( cloudEvent , nameof ( message ) ) ;
116113 }
117114
118- private static string ExtractContentType ( Message < string , byte [ ] > message )
119- {
120- var headerValue = GetHeaderValue ( message , KafkaContentTypeAttributeName ) ;
121- return headerValue is null ? null : Encoding . UTF8 . GetString ( headerValue ) ;
122- }
123-
124115 private static void InitPartitioningKey ( Message < string , byte [ ] > message , CloudEvent cloudEvent )
125116 {
126117 if ( ! string . IsNullOrEmpty ( message . Key ) )
@@ -129,9 +120,13 @@ private static void InitPartitioningKey(Message<string, byte[]> message, CloudEv
129120 }
130121 }
131122
132- private static byte [ ] GetHeaderValue ( MessageMetadata message , string headerName ) =>
133- message . Headers . FirstOrDefault ( x => string . Equals ( x . Key , headerName , StringComparison . InvariantCultureIgnoreCase ) )
134- ? . GetValueBytes ( ) ;
123+ /// <summary>
124+ /// Returns the last header value with the given name, decoded using UTF-8, or null if there is no such header.
125+ /// </summary>
126+ private static string GetHeaderValue ( MessageMetadata message , string headerName ) =>
127+ Validation . CheckNotNull ( message , nameof ( message ) ) . Headers is null
128+ ? null
129+ : message . Headers . TryGetLastBytes ( headerName , out var bytes ) ? Encoding . UTF8 . GetString ( bytes ) : null ;
135130
136131 /// <summary>
137132 /// Converts a CloudEvent to a Kafka message.
0 commit comments