Skip to content

Commit

Permalink
new-tx-i: new Tx interface pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Lavrukov committed Feb 15, 2024
1 parent 51c3007 commit ab3eaf1
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -288,13 +291,45 @@ public void deferFinallyRollbackRetryable() {

@Test
public void deferFinallyNotInTxContext() {
db.tx(() -> Tx.Current.get().deferFinally(() -> assertFalse(Tx.Current.exists())));
db.txC(tx -> tx.deferFinally(() -> assertFalse(Tx.Current.exists())));
}

@Test
public void myTest() {
db.txC(tx -> {
// new. Usefull because you see that table depends on tx;
Db1.project(tx).findAll();

// new. Usefull because you can initialize db one time in tx and use call table without pass tx any time
Db2.of(tx).project().findAll();

// old
db.projects().findAll();
});
}

// Examples of new DB patterns. User can use one or both;

@AllArgsConstructor(access = AccessLevel.PRIVATE)
final static class Db1 {
public static TestEntityOperations.ProjectTable project(Tx tx) {
return new TestEntityOperations.ProjectTable(tx.table(Project.class));
}
}

@RequiredArgsConstructor(staticName = "of")
final static class Db2 {
private final Tx tx;

TestEntityOperations.ProjectTable project() {
return new TestEntityOperations.ProjectTable(tx.table(Project.class));
}
}

@Test
public void deferFinallyRollbackNotInTxContext() {
assertThatExceptionOfType(RuntimeException.class).isThrownBy(() -> db.tx(() -> {
Tx.Current.get().deferFinally(() -> assertFalse(Tx.Current.exists()));
assertThatExceptionOfType(RuntimeException.class).isThrownBy(() -> db.txC(tx -> {
tx.deferFinally(() -> assertFalse(Tx.Current.exists()));
throw new RuntimeException();
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -181,23 +182,25 @@ public ScanBuilder scan() {

@Override
public void tx(Runnable runnable) {
tx(() -> {
runnable.run();
return null;
});
txC(__ -> runnable.run());
}

@Override
public <T> T tx(Supplier<T> supplier) {
return tx(__ -> supplier.get());
}

@Override
public <T> T tx(Function<Tx, T> func) {
if (name == null) {
return withGeneratedNameAndLine().tx(supplier);
return withGeneratedNameAndLine().tx(func);
}

checkSeparatePolicy(separatePolicy, name);
return txImpl(supplier);
return txImpl(func);
}

private <T> T txImpl(Supplier<T> supplier) {
private <T> T txImpl(Function<Tx, T> func) {
RetryableException lastRetryableException = null;
TxImpl lastTx = null;
try (Timer ignored = totalDuration.labels(name).startTimer()) {
Expand All @@ -207,7 +210,7 @@ private <T> T txImpl(Supplier<T> supplier) {
T result;
try (var ignored1 = attemptDuration.labels(name).startTimer()) {
lastTx = new TxImpl(name, repository.startTransaction(options), options);
result = runAttempt(supplier, lastTx);
result = runAttempt(lastTx, func);
}

if (options.isDryRun()) {
Expand Down Expand Up @@ -258,11 +261,11 @@ private String getExceptionNameForMetric(RetryableException e) {
return Strings.removeSuffix(e.getClass().getSimpleName(), "Exception");
}

private <T> T runAttempt(Supplier<T> supplier, TxImpl tx) {
private <T> T runAttempt(TxImpl tx, Function<Tx, T> func) {
try (var ignored2 = MDC.putCloseable("tx", formatTx());
var ignored3 = MDC.putCloseable("tx-id", formatTxId());
var ignored4 = MDC.putCloseable("tx-name", formatTxName(false))) {
return tx.run(supplier);
return tx.run(func);
}
}

Expand Down
4 changes: 4 additions & 0 deletions repository/src/main/java/tech/ydb/yoj/repository/db/Tx.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
import java.util.function.Supplier;

public interface Tx {
default <T extends Entity<T>> Table<T> table(Class<T> cls) {
return getRepositoryTransaction().table(cls);
}

void defer(Runnable runnable);

void deferFinally(Runnable runnable);
Expand Down
10 changes: 5 additions & 5 deletions repository/src/main/java/tech/ydb/yoj/repository/db/TxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.Function;

final class TxImpl implements Tx {
private static final Logger log = LoggerFactory.getLogger(TxImpl.class);
Expand All @@ -32,10 +32,10 @@ public TxImpl(String name, RepositoryTransaction repositoryTransaction, TxOption
this.logStatementOnSuccess = options.isLogStatementOnSuccess();
}

<R> R run(Supplier<R> supplier) {
<R> R run(Function<Tx, R> func) {
R value;
try {
value = Current.runInTx(this, () -> runImpl(supplier));
value = Current.runInTx(this, () -> runImpl(func));
} catch (Exception e) {
if (Interrupts.isInterruptException(e)) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -72,11 +72,11 @@ public void deferBeforeCommit(Runnable runnable) {
deferredBeforeCommit.add(runnable);
}

private <R> R runImpl(Supplier<R> supplier) {
private <R> R runImpl(Function<Tx, R> func) {
Stopwatch sw = Stopwatch.createStarted();
R res;
try {
res = supplier.get();
res = func.apply(this);
deferredBeforeCommit.forEach(Runnable::run);
} catch (Throwable t) {
doRollback(isBusinessException(t),
Expand Down
11 changes: 11 additions & 0 deletions repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import tech.ydb.yoj.repository.db.exception.RetryableException;

import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public interface TxManager {
Expand Down Expand Up @@ -148,6 +150,15 @@ default TxManager noLogging() {
*/
void tx(Runnable runnable);

<T> T tx(Function<Tx, T> supplier);

default void txC(Consumer<Tx> consumer) {
tx(tx -> {
consumer.accept(tx);
return null;
});
}

/**
* Start a transaction-like session of read-only statements. Each statement will be executed <em>separately</em>,
* with the specified isolation level (online consistent read-only, by default).
Expand Down

0 comments on commit ab3eaf1

Please sign in to comment.