diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java index 9c682ffd..fd26969e 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java @@ -7,6 +7,7 @@ import tech.ydb.yoj.databind.expression.OrderExpression; import tech.ydb.yoj.databind.schema.ObjectSchema; import tech.ydb.yoj.databind.schema.Schema; +import tech.ydb.yoj.repository.db.TableUtils; import tech.ydb.yoj.repository.db.Entity; import tech.ydb.yoj.repository.db.EntityExpressions; import tech.ydb.yoj.repository.db.EntityIdSchema; @@ -184,6 +185,11 @@ public V find(Class viewType, Entity.Id id) { return transaction.doInTransaction("find(" + id + ")", type, shard -> shard.find(id, viewType)); } + @Override + public > List find(Set ids) { + return TableUtils.find(transaction.getTransactionLocal(), this, ids); + } + @Override @SuppressWarnings("unchecked") public > List find(Range range) { @@ -534,11 +540,6 @@ private > boolean readTableFilter(T e, ReadTableParams V find(Class viewType, Entity.Id id) { return res.isEmpty() ? null : res.get(0); } + @Override + public > List find(Set ids) { + return TableUtils.find(executor.getTransactionLocal(), this, ids); + } + @Override public > List find(Range range) { return postLoad(executor.execute(YqlStatement.findRange(type, range), range)); @@ -461,11 +466,6 @@ public > void migrate(ID id) { executor.getTransactionLocal().projectionCache().save(entityToSave); } - @Override - public FirstLevelCache getFirstLevelCache() { - return executor.getTransactionLocal().firstLevelCache(); - } - @Override @NonNull public T postLoad(T e) { diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java index ae58f271..7477ed11 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java @@ -7,6 +7,7 @@ import lombok.NonNull; import tech.ydb.yoj.databind.expression.FilterExpression; import tech.ydb.yoj.databind.expression.OrderExpression; +import tech.ydb.yoj.repository.db.TableUtils; import tech.ydb.yoj.repository.db.Entity; import tech.ydb.yoj.repository.db.Entity.Id; import tech.ydb.yoj.repository.db.EntityIdSchema; @@ -16,7 +17,6 @@ import tech.ydb.yoj.repository.db.Tx; import tech.ydb.yoj.repository.db.ViewSchema; import tech.ydb.yoj.repository.db.bulk.BulkParams; -import tech.ydb.yoj.repository.db.cache.FirstLevelCache; import tech.ydb.yoj.repository.db.cache.TransactionLocal; import tech.ydb.yoj.repository.db.readtable.ReadTableParams; import tech.ydb.yoj.repository.db.statement.Changeset; @@ -235,6 +235,11 @@ public V find(Class viewType, Entity.Id id) { return res.isEmpty() ? null : res.get(0); } + @Override + public > List find(Set ids) { + return TableUtils.find(executor.getTransactionLocal(), this, ids); + } + @Override public > List find(Range range) { return postLoad(executor.execute(YqlStatement.findRange(type, range), range)); @@ -461,11 +466,6 @@ public > void migrate(ID id) { executor.getTransactionLocal().projectionCache().save(entityToSave); } - @Override - public FirstLevelCache getFirstLevelCache() { - return executor.getTransactionLocal().firstLevelCache(); - } - @Override @NonNull public T postLoad(T e) { diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/Table.java b/repository/src/main/java/tech/ydb/yoj/repository/db/Table.java index 10d8e7df..a2140eea 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/Table.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/Table.java @@ -1,11 +1,9 @@ package tech.ydb.yoj.repository.db; -import com.google.common.collect.Sets; import lombok.NonNull; import tech.ydb.yoj.databind.expression.FilterExpression; import tech.ydb.yoj.databind.expression.OrderExpression; import tech.ydb.yoj.repository.db.bulk.BulkParams; -import tech.ydb.yoj.repository.db.cache.FirstLevelCache; import tech.ydb.yoj.repository.db.list.ListRequest; import tech.ydb.yoj.repository.db.list.ListResult; import tech.ydb.yoj.repository.db.list.ViewListResult; @@ -15,8 +13,6 @@ import javax.annotation.CheckForNull; import javax.annotation.Nullable; import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; @@ -26,7 +22,6 @@ import java.util.stream.Stream; import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toSet; import static java.util.stream.Stream.concat; public interface Table> { @@ -43,6 +38,8 @@ public interface Table> { V find(Class viewType, Entity.Id id); + > List find(Set ids); + > List find(Range range); > List findIds(Range range); @@ -158,10 +155,6 @@ default > Stream readTableIds() { return readTableIds(ReadTableParams.getDefault()); } - default FirstLevelCache getFirstLevelCache() { - return null; - } - @NonNull default T find(Entity.Id id, Supplier throwIfAbsent) throws X { T found = find(id); @@ -270,54 +263,6 @@ default ViewListResult list(Class viewType, List return ViewListResult.forPage(request, viewType, nextPage); } - default > List find(Set ids) { - if (ids.isEmpty()) { - return List.of(); - } - - var orderBy = EntityExpressions.defaultOrder(getType()); - var cache = getFirstLevelCache(); - var isPartialIdMode = ids.iterator().next().isPartial(); - - var foundInCache = ids.stream() - .filter(cache::containsKey) - .map(cache::peek) - .flatMap(Optional::stream) - .collect(Collectors.toMap(Entity::getId, Function.identity())); - var remainingIds = Sets.difference(ids, foundInCache.keySet()); - var foundInDb = findUncached(remainingIds, null, orderBy, null); - - var merged = new HashMap, T>(); - - // some entries found in db with partial id query may already be in cache (after update/delete), - // so we must return actual entries from cache - for (var entry : foundInDb) { - var id = entry.getId(); - if (cache.containsKey(id)) { - var cached = cache.peek(id); - cached.ifPresent(t -> merged.put(id, t)); - // not present means marked as deleted in cache - } else { - merged.put(id, this.postLoad(entry)); - } - } - - // add entries found in cache and not fetched from db - for (var pair : foundInCache.entrySet()) { - var id = pair.getKey(); - var entry = pair.getValue(); - merged.put(id, entry); - } - - if (!isPartialIdMode) { - Set> foundInDbIds = foundInDb.stream().map(Entity::getId).collect(toSet()); - Set> foundInCacheIds = new HashSet<>(foundInCache.keySet()); - Sets.difference(Sets.difference(ids, foundInDbIds), foundInCacheIds).forEach(cache::putEmpty); - } - - return merged.values().stream().sorted(EntityIdSchema.SORT_ENTITY_BY_ID).collect(Collectors.toList()); - } - default void bulkUpsert(List input, BulkParams params) { throw new UnsupportedOperationException(); } diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/TableUtils.java b/repository/src/main/java/tech/ydb/yoj/repository/db/TableUtils.java new file mode 100644 index 00000000..ea2fb0c1 --- /dev/null +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/TableUtils.java @@ -0,0 +1,68 @@ +package tech.ydb.yoj.repository.db; + +import com.google.common.collect.Sets; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import tech.ydb.yoj.repository.db.cache.TransactionLocal; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.util.stream.Collectors.toSet; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class TableUtils { + public static , ID extends Entity.Id> List find( + TransactionLocal transactionLocal, Table table, Set ids) { + if (ids.isEmpty()) { + return List.of(); + } + + var orderBy = EntityExpressions.defaultOrder(table.getType()); + var cache = transactionLocal.firstLevelCache(); + var isPartialIdMode = ids.iterator().next().isPartial(); + + var foundInCache = ids.stream() + .filter(cache::containsKey) + .map(cache::peek) + .flatMap(Optional::stream) + .collect(Collectors.toMap(Entity::getId, Function.identity())); + var remainingIds = Sets.difference(ids, foundInCache.keySet()); + var foundInDb = table.findUncached(remainingIds, null, orderBy, null); + + var merged = new HashMap, E>(); + + // some entries found in db with partial id query may already be in cache (after update/delete), + // so we must return actual entries from cache + for (var entry : foundInDb) { + var id = entry.getId(); + if (cache.containsKey(id)) { + var cached = cache.peek(id); + cached.ifPresent(t -> merged.put(id, t)); + // not present means marked as deleted in cache + } else { + merged.put(id, table.postLoad(entry)); + } + } + + // add entries found in cache and not fetched from db + for (var pair : foundInCache.entrySet()) { + var id = pair.getKey(); + var entry = pair.getValue(); + merged.put(id, entry); + } + + if (!isPartialIdMode) { + Set> foundInDbIds = foundInDb.stream().map(Entity::getId).collect(toSet()); + Set> foundInCacheIds = new HashSet<>(foundInCache.keySet()); + Sets.difference(Sets.difference(ids, foundInDbIds), foundInCacheIds).forEach(cache::putEmpty); + } + + return merged.values().stream().sorted(EntityIdSchema.SORT_ENTITY_BY_ID).collect(Collectors.toList()); + } +}