Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.WindowStore;
Expand Down Expand Up @@ -61,7 +62,9 @@ public void close() {
}

static StateStore wrapWithReadWriteStore(final StateStore store) {
if (store instanceof TimestampedKeyValueStore) {
if (store instanceof TimestampedKeyValueStoreWithHeaders) {
return new TimestampedKeyValueStoreReadWriteDecoratorWithHeaders<>((TimestampedKeyValueStoreWithHeaders<?, ?>) store);
} else if (store instanceof TimestampedKeyValueStore) {
return new TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) store);
} else if (store instanceof VersionedKeyValueStore) {
return new VersionedKeyValueStoreReadWriteDecorator<>((VersionedKeyValueStore<?, ?>) store);
Expand Down Expand Up @@ -326,4 +329,13 @@ public KeyValueIterator<Windowed<K>, AGG> fetch(final K keyFrom,
return wrapped().fetch(keyFrom, keyTo);
}
}

static class TimestampedKeyValueStoreReadWriteDecoratorWithHeaders<K, V>
extends KeyValueStoreReadWriteDecorator<K, ValueTimestampHeaders<V>>
implements TimestampedKeyValueStoreWithHeaders<K, V> {

TimestampedKeyValueStoreReadWriteDecoratorWithHeaders(final TimestampedKeyValueStoreWithHeaders<K, V> inner) {
super(inner);
}
}
}
25 changes: 23 additions & 2 deletions streams/src/main/java/org/apache/kafka/streams/state/Stores.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public final class Stores {
*/
public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {
Objects.requireNonNull(name, "name cannot be null");
return new RocksDBKeyValueBytesStoreSupplier(name, false);
return new RocksDBKeyValueBytesStoreSupplier(name, false, false);
}

/**
Expand All @@ -113,7 +113,28 @@ public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String na
*/
public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name) {
Objects.requireNonNull(name, "name cannot be null");
return new RocksDBKeyValueBytesStoreSupplier(name, true);
return new RocksDBKeyValueBytesStoreSupplier(name, true, false);
}

}

/**
* Create a persistent {@link KeyValueBytesStoreSupplier}.
* <p>
* This store supplier can be passed into a
* {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
* If you want to create a {@link KeyValueStore} or a {@link VersionedKeyValueStore}
* you should use {@link #KeyValueStore(String)} or
* {@link #persistentVersionedKeyValueStore(String, Duration)}, respectively,
* to create a store supplier instead.
*
* @param name name of the store (cannot be {@code null})
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
* to build a persistent key-(headers/timestamp/value) store
*/
public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStoreWithHeaders(final String name) {
Objects.requireNonNull(name, "name cannot be null");
return new RocksDBKeyValueBytesStoreSupplier(name, true, true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ public class RocksDBKeyValueBytesStoreSupplier implements KeyValueBytesStoreSupp

private final String name;
private final boolean returnTimestampedStore;
private final boolean returnHeadersStore;

public RocksDBKeyValueBytesStoreSupplier(final String name,
final boolean returnTimestampedStore) {
final boolean returnTimestampedStore,
final boolean returnHeadersStore) {
this.name = name;
this.returnTimestampedStore = returnTimestampedStore;
this.returnHeadersStore = returnHeadersStore;
}

@Override
Expand All @@ -38,7 +41,9 @@ public String name() {

@Override
public KeyValueStore<Bytes, byte[]> get() {
return returnTimestampedStore ?
return returnHeadersStore ?
new RocksDBTimestampedStoreWithHeaders(name, metricsScope()) :
returnTimestampedStore ?
new RocksDBTimestampedStore(name, metricsScope()) :
new RocksDBStore(name, metricsScope());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.HeadersBytesStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.ValueTimestampHeaders;

import java.util.List;
import java.util.Objects;

/**
* Builder for {@link TimestampedKeyValueStoreWithHeaders} instances.
*
* This is analogous to {@link TimestampedKeyValueStoreBuilder}, but uses
* {@link ValueTimestampHeaders} as the value wrapper and wires up the
* header-aware store stack (change-logging, caching, metering).
*/
public class TimestampedKeyValueStoreBuilderWithHeaders<K, V>
extends AbstractStoreBuilder<K, ValueTimestampHeaders<V>, TimestampedKeyValueStoreWithHeaders<K, V>> {

private final KeyValueBytesStoreSupplier storeSupplier;

public TimestampedKeyValueStoreBuilderWithHeaders(final KeyValueBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time) {
super(
storeSupplier.name(),
keySerde,
valueSerde == null ? null : new ValueTimestampHeadersSerde<>(valueSerde),
time
);
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null");
this.storeSupplier = storeSupplier;
}

@Override
public TimestampedKeyValueStoreWithHeaders<K, V> build() {
KeyValueStore<Bytes, byte[]> store = storeSupplier.get();

if (!(store instanceof HeadersBytesStore)) {
if (store.persistent()) {
store = new TimestampedToHeadersStoreAdapter(store);
} else {
store = new InMemoryTimestampedKeyValueStoreWithHeadersMarker(store);
}
}

return new MeteredTimestampedKeyValueStoreWithHeaders<>(
maybeWrapCaching(maybeWrapLogging(store)),
storeSupplier.metricsScope(),
time,
keySerde,
valueSerde
);
}

private KeyValueStore<Bytes, byte[]> maybeWrapCaching(final KeyValueStore<Bytes, byte[]> inner) {
if (!enableCaching) {
return inner;
}
return new CachingKeyValueStore(inner, true);
}

private KeyValueStore<Bytes, byte[]> maybeWrapLogging(final KeyValueStore<Bytes, byte[]> inner) {
if (!enableLogging) {
return inner;
}
return new ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders(inner);
}

private static final class InMemoryTimestampedKeyValueStoreWithHeadersMarker
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, Bytes, byte[]>
implements KeyValueStore<Bytes, byte[]>, HeadersBytesStore {

private InMemoryTimestampedKeyValueStoreWithHeadersMarker(final KeyValueStore<Bytes, byte[]> wrapped) {
super(wrapped);
if (wrapped.persistent()) {
throw new IllegalArgumentException("Provided store must not be a persistent store, but it is.");
}
}

@Override
public void put(final Bytes key,
final byte[] value) {
wrapped().put(key, value);
}

@Override
public byte[] putIfAbsent(final Bytes key,
final byte[] value) {
return wrapped().putIfAbsent(key, value);
}

@Override
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
wrapped().putAll(entries);
}

@Override
public byte[] delete(final Bytes key) {
return wrapped().delete(key);
}

@Override
public byte[] get(final Bytes key) {
return wrapped().get(key);
}

@Override
public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to) {
throw new UnsupportedOperationException("Range queries are not supported by in-memory timestamped key-value stores with headers");
}

@Override
public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
final Bytes to) {
throw new UnsupportedOperationException("Range queries are not supported by in-memory timestamped key-value stores with headers");
}

@Override
public KeyValueIterator<Bytes, byte[]> all() {
throw new UnsupportedOperationException("Range queries are not supported by in-memory timestamped key-value stores with headers");
}

@Override
public KeyValueIterator<Bytes, byte[]> reverseAll() {
throw new UnsupportedOperationException("Range queries are not supported by in-memory timestamped key-value stores with headers");
}

@Override
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
final PS prefixKeySerializer) {
throw new UnsupportedOperationException("Range queries are not supported by in-memory timestamped key-value stores with headers");
}

@Override
public long approximateNumEntries() {
return wrapped().approximateNumEntries();
}

@Override
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {

throw new UnsupportedOperationException("Queries are not supported by in-memory timestamped key-value stores with headers");
}

@Override
public boolean persistent() {
return false;
}
}
}
Loading
Loading