GlobalProcessorContextImpl.java

@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
@@ -144,6 +145,7 @@ public void logChange(final String storeName,
                           final Bytes key,
                           final byte[] value,
                           final long timestamp,
+                          final Headers headers,
                           final Position position) {
         throw new UnsupportedOperationException("this should not happen: logChange() not supported in global processor context.");
     }
InternalProcessorContext.java

@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.BytesSerializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -120,6 +121,7 @@ void logChange(final String storeName,
                    final Bytes key,
                    final byte[] value,
                    final long timestamp,
+                   final Headers headers,
                    final Position position);
 
     String changelogFor(final String storeName);
ProcessorContextImpl.java

@@ -19,7 +19,6 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -124,20 +123,14 @@ public void logChange(final String storeName,
                           final Bytes key,
                           final byte[] value,
                           final long timestamp,
+                          final Headers headers,
                           final Position position) {
         throwUnsupportedOperationExceptionIfStandby("logChange");
 
         final TopicPartition changelogPartition = stateManager().registeredChangelogPartitionFor(storeName);
 
-        final Headers headers;
-        if (!consistencyEnabled) {
-            headers = null;
-        } else {
-            // Add the vector clock to the header part of every record
-            headers = new RecordHeaders();
-            headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
-            headers.add(new RecordHeader(ChangelogRecordDeserializationHelper.CHANGELOG_POSITION_HEADER_KEY,
-                PositionSerde.serialize(position).array()));
+        if (consistencyEnabled) {
+            addVectorClockToHeaders(headers, position);
         }
 
         collector.send(
@@ -153,6 +146,12 @@
             null);
     }
 
+    private void addVectorClockToHeaders(final Headers headers, final Position position) {
+        headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
+        headers.add(new RecordHeader(ChangelogRecordDeserializationHelper.CHANGELOG_POSITION_HEADER_KEY,
+            PositionSerde.serialize(position).array()));
+    }
+
     /**
      * @throws StreamsException if an attempt is made to access this state store from an unknown node
      * @throws UnsupportedOperationException if the current streamTask type is standby
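A note on the hunk above: the Position "vector clock" is now added to the caller-supplied Headers instead of a locally created RecordHeaders, so it travels alongside any application headers in the changelog record. As a minimal sketch, not part of this PR, here is how a changelog reader could recover that position; it assumes PositionSerde.deserialize(ByteBuffer) mirrors the PositionSerde.serialize(Position) call above, and the ChangelogPositionReader class itself is hypothetical:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.state.internals.PositionSerde;

import java.nio.ByteBuffer;

// Sketch only: hypothetical helper for reading back the position header
// written by addVectorClockToHeaders() above.
final class ChangelogPositionReader {

    // Returns the Position carried in the changelog record's headers, or an
    // empty Position if the record was logged without consistency enabled.
    static Position positionOf(final ConsumerRecord<byte[], byte[]> record) {
        final Header positionHeader =
            record.headers().lastHeader(ChangelogRecordDeserializationHelper.CHANGELOG_POSITION_HEADER_KEY);
        if (positionHeader == null) {
            return Position.emptyPosition();
        }
        return PositionSerde.deserialize(ByteBuffer.wrap(positionHeader.value()));
    }
}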
ChangeLoggingKeyValueBytesStore.java

@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
@@ -52,7 +54,7 @@ private void maybeSetEvictionListener() {
         if (wrapped() instanceof MemoryLRUCache) {
             ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
                 // pass null to indicate removal
-                log(key, null, internalContext.recordContext().timestamp());
+                log(key, null, internalContext.recordContext().timestamp(), new RecordHeaders());
             });
         }
     }
@@ -66,7 +68,7 @@ public long approximateNumEntries() {
     public void put(final Bytes key,
                     final byte[] value) {
         wrapped().put(key, value);
-        log(key, value, internalContext.recordContext().timestamp());
+        log(key, value, internalContext.recordContext().timestamp(), new RecordHeaders());
     }
 
     @Override
@@ -75,7 +77,7 @@ public byte[] putIfAbsent(final Bytes key,
         final byte[] previous = wrapped().putIfAbsent(key, value);
         if (previous == null) {
             // then it was absent
-            log(key, value, internalContext.recordContext().timestamp());
+            log(key, value, internalContext.recordContext().timestamp(), new RecordHeaders());
         }
         return previous;
     }
@@ -84,7 +86,7 @@ public byte[] putIfAbsent(final Bytes key,
     public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         wrapped().putAll(entries);
         for (final KeyValue<Bytes, byte[]> entry : entries) {
-            log(entry.key, entry.value, internalContext.recordContext().timestamp());
+            log(entry.key, entry.value, internalContext.recordContext().timestamp(), new RecordHeaders());
         }
     }
 
@@ -97,7 +99,7 @@ public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(
     @Override
     public byte[] delete(final Bytes key) {
         final byte[] oldValue = wrapped().delete(key);
-        log(key, null, internalContext.recordContext().timestamp());
+        log(key, null, internalContext.recordContext().timestamp(), new RecordHeaders());
         return oldValue;
     }
 
@@ -128,7 +130,7 @@ public KeyValueIterator<Bytes, byte[]> reverseAll() {
         return wrapped().reverseAll();
     }
 
-    void log(final Bytes key, final byte[] value, final long timestamp) {
-        internalContext.logChange(name(), key, value, timestamp, wrapped().getPosition());
+    void log(final Bytes key, final byte[] value, final long timestamp, final Headers headers) {
+        internalContext.logChange(name(), key, value, timestamp, headers, wrapped().getPosition());
     }
 }
ChangeLoggingListValueBytesStore.java

@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -32,9 +33,9 @@ public void put(final Bytes key, final byte[] value) {
         // we need to log the full new list and thus call get() on the inner store below
         // if the value is a tombstone, we delete the whole list and thus can save the get call
         if (value == null) {
-            log(key, null, internalContext.recordContext().timestamp());
+            log(key, null, internalContext.recordContext().timestamp(), new RecordHeaders());
         } else {
-            log(key, wrapped().get(key), internalContext.recordContext().timestamp());
+            log(key, wrapped().get(key), internalContext.recordContext().timestamp(), new RecordHeaders());
         }
     }
 
ChangeLoggingSessionBytesStore.java

@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.StateStore;
@@ -73,13 +74,13 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Byte
     @Override
     public void remove(final Windowed<Bytes> sessionKey) {
         wrapped().remove(sessionKey);
-        internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, internalContext.recordContext().timestamp(), wrapped().getPosition());
+        internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, internalContext.recordContext().timestamp(), new RecordHeaders(), wrapped().getPosition());
     }
 
     @Override
     public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
         wrapped().put(sessionKey, aggregate);
-        internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, internalContext.recordContext().timestamp(), wrapped().getPosition());
+        internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, internalContext.recordContext().timestamp(), new RecordHeaders(), wrapped().getPosition());
     }
 
     @Override
ChangeLoggingTimestampedKeyValueBytesStore.java

@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -35,7 +36,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey
     public void put(final Bytes key,
                     final byte[] valueAndTimestamp) {
         wrapped().put(key, valueAndTimestamp);
-        log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
+        log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp), new RecordHeaders());
     }
 
     @Override
@@ -44,7 +45,7 @@ public byte[] putIfAbsent(final Bytes key,
         final byte[] previous = wrapped().putIfAbsent(key, valueAndTimestamp);
         if (previous == null) {
             // then it was absent
-            log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
+            log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp), new RecordHeaders());
         }
         return previous;
     }
@@ -54,7 +55,7 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         wrapped().putAll(entries);
         for (final KeyValue<Bytes, byte[]> entry : entries) {
             final byte[] valueAndTimestamp = entry.value;
-            log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
+            log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp), new RecordHeaders());
         }
     }
 }
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java

@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.List;
+
+import static org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
+import static org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.rawValue;
+import static org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.timestamp;
+
+/**
+ * Change-logging wrapper for a timestamped key-value bytes store whose values also carry headers.
+ * <p>
+ * It expects the header-aware serialized value format produced by {@link ValueTimestampHeadersSerializer}.
+ * <p>
+ * Semantics:
+ * - The inner store value format is:
+ *   [ varint header_length ][ header_bytes ][ 8-byte timestamp ][ value_bytes ]
+ * - The changelog record value logged via {@code log(...)} remains just {@code value_bytes}
+ *   (no timestamp, no headers), and the timestamp is logged separately.
+ */
+public class ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
+    extends ChangeLoggingKeyValueBytesStore {
+
+    ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders(final KeyValueStore<Bytes, byte[]> inner) {
+        super(inner);
+    }
+
+    @Override
+    public void put(final Bytes key,
+                    final byte[] valueTimestampHeaders) {
+        wrapped().put(key, valueTimestampHeaders);
+        log(
+            key,
+            rawValue(valueTimestampHeaders),
+            valueTimestampHeaders == null
+                ? internalContext.recordContext().timestamp()
+                : timestamp(valueTimestampHeaders),
+            headers(valueTimestampHeaders)
+        );
+    }
+
+    @Override
+    public byte[] putIfAbsent(final Bytes key,
+                              final byte[] valueTimestampHeaders) {
+        final byte[] previous = wrapped().putIfAbsent(key, valueTimestampHeaders);
+        if (previous == null) {
+            // then it was absent
+            log(
+                key,
+                rawValue(valueTimestampHeaders),
+                valueTimestampHeaders == null
+                    ? internalContext.recordContext().timestamp()
+                    : timestamp(valueTimestampHeaders),
+                headers(valueTimestampHeaders)
+            );
+        }
+        return previous;
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        wrapped().putAll(entries);
+        for (final KeyValue<Bytes, byte[]> entry : entries) {
+            final byte[] valueTimestampHeaders = entry.value;
+            log(
+                entry.key,
+                rawValue(valueTimestampHeaders),
+                valueTimestampHeaders == null
+                    ? internalContext.recordContext().timestamp()
+                    : timestamp(valueTimestampHeaders),
+                headers(valueTimestampHeaders)
+            );
+        }
+    }
+}
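The ValueTimestampHeadersSerializer and ValueTimestampHeadersDeserializer helpers that define this value format are referenced but not included in this diff. Purely as an illustration of the layout documented in the javadoc above, here is a minimal sketch of two of the accessors the new store relies on; the real helpers may differ, the header-bytes encoding is treated as opaque here, and only ByteUtils.readVarint is an existing Kafka utility:

import org.apache.kafka.common.utils.ByteUtils;

import java.nio.ByteBuffer;
import java.util.Arrays;

// Sketch only: hypothetical decoding of
// [ varint header_length ][ header_bytes ][ 8-byte timestamp ][ value_bytes ]
final class ValueTimestampHeadersLayout {

    static long timestamp(final byte[] valueTimestampHeaders) {
        final ByteBuffer buf = ByteBuffer.wrap(valueTimestampHeaders);
        final int headerLength = ByteUtils.readVarint(buf); // varint header_length
        buf.position(buf.position() + headerLength);        // skip header_bytes
        return buf.getLong();                               // 8-byte timestamp
    }

    static byte[] rawValue(final byte[] valueTimestampHeaders) {
        final ByteBuffer buf = ByteBuffer.wrap(valueTimestampHeaders);
        final int headerLength = ByteUtils.readVarint(buf);
        // value_bytes start after header_bytes and the 8-byte timestamp
        final int valueOffset = buf.position() + headerLength + Long.BYTES;
        return Arrays.copyOfRange(valueTimestampHeaders, valueOffset, valueTimestampHeaders.length);
    }
}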
ChangeLoggingTimestampedWindowBytesStore.java

@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.WindowStore;
 
@@ -37,6 +38,7 @@ void log(final Bytes key,
             key,
             rawValue(valueAndTimestamp),
             valueAndTimestamp != null ? timestamp(valueAndTimestamp) : internalContext.recordContext().timestamp(),
+            new RecordHeaders(),
             wrapped().getPosition()
         );
     }
ChangeLoggingVersionedKeyValueBytesStore.java

@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.VersionedBytesStore;
@@ -39,7 +41,7 @@ public class ChangeLoggingVersionedKeyValueBytesStore extends ChangeLoggingKeyVa
     @Override
     public long put(final Bytes key, final byte[] value, final long timestamp) {
         final long validTo = inner.put(key, value, timestamp);
-        log(key, value, timestamp);
+        log(key, value, timestamp, new RecordHeaders());
         return validTo;
     }
 
@@ -51,17 +53,18 @@ public byte[] get(final Bytes key, final long asOfTimestamp) {
     @Override
     public byte[] delete(final Bytes key, final long timestamp) {
         final byte[] oldValue = inner.delete(key, timestamp);
-        log(key, null, timestamp);
+        log(key, null, timestamp, new RecordHeaders());
         return oldValue;
     }
 
-    @Override
-    public void log(final Bytes key, final byte[] value, final long timestamp) {
+    @Override
+    public void log(final Bytes key, final byte[] value, final long timestamp, final Headers headers) {
         internalContext.logChange(
             name(),
             key,
             value,
             timestamp,
+            headers,
             wrapped().getPosition()
         );
     }
ChangeLoggingWindowBytesStore.java

@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.StateStore;
@@ -129,7 +130,7 @@ public void put(final Bytes key,
     }
 
     void log(final Bytes key, final byte[] value) {
-        internalContext.logChange(name(), key, value, internalContext.recordContext().timestamp(), wrapped().getPosition());
+        internalContext.logChange(name(), key, value, internalContext.recordContext().timestamp(), new RecordHeaders(), wrapped().getPosition());
     }
 
     private int maybeUpdateSeqnumForDups() {