There are two basic types of locking mechanisms used in NoSQL storages for handling concurrent writes — optimistic and pessimistic. Pessimistic locking mechanism is based on acquiring/releasing write lock on each write operation. Optimistic locking mechanism is based on entity versioning — to make it short you can save entity only with the same version that exists in the storage. See a pseudocode:
do { D document = storage.get(key) document = update(document) storage.save(document) } while (version already modified)
Optimistic locking can be enabled for the document via adding @Version
field in the document. When saving such document into the storage — you will either get success or an exception OptimisticLockingFailureException.class
. This exception is thrown when version
field in your document is not the same as in the storage. So to save the document — you will need to enable retries for OptimisticLockingFailureException.class
.
In this example we are going to use a simple document that will store movies already watched by user:
package com.example.demo.persistence.optimisticlocking;
import lombok.Builder;
import lombok.Singular;
import lombok.Value;
import lombok.experimental.NonFinal;
import org.springframework.data.aerospike.mapping.Document;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Version;
import java.util.List;
@Value
@Builder(toBuilder = true)
@Document
public class WatchedMoviesDocument {
@Id
String key;
@Singular
List<String> watchedMovies;
@NonFinal
@Version // (1)
Long version;
}
Document explained:
-
@Version
enables optimistic locking, so that concurrent updates are not lost when saving an entity.
Next we need to have a simple repository:
package com.example.demo.persistence.optimisticlocking;
import org.springframework.data.repository.CrudRepository;
public interface WatchedMoviesDocumentRepository extends CrudRepository<WatchedMoviesDocument, String> {
}
The next part is the most interesting as it contains the update logic and retries for handling optimistic lock errors:
package com.example.demo.persistence.optimisticlocking;
import lombok.RequiredArgsConstructor;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.dao.TransientDataAccessResourceException;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
@RequiredArgsConstructor
public class WatchedMoviesOperations {
private final WatchedMoviesDocumentRepository repository;
@Retryable( // (1)
include = { // (2)
QueryTimeoutException.class,
TimeoutException.class,
TransientDataAccessResourceException.class,
OptimisticLockingFailureException.class // (3)
},
maxAttempts = 5,
backoff = @Backoff(
delay = 3,
multiplier = 2,
random = true
)
)
public void addWatchedMovie(String id, String newWatchedMovie) {
WatchedMoviesDocument watchedMoviesDocument = repository.findById(id) // (4)
.map(existingDocument -> updateExistingDocument(existingDocument, newWatchedMovie)) // (5)
.orElseGet(() -> createNewDocumentWithMovie(id, newWatchedMovie)); // (6)
repository.save(watchedMoviesDocument); // (7)
}
private WatchedMoviesDocument createNewDocumentWithMovie(String id, String newWatchedMovie) {
return WatchedMoviesDocument.builder()
.key(id)
.watchedMovie(newWatchedMovie)
.build();
}
private WatchedMoviesDocument updateExistingDocument(WatchedMoviesDocument existingDocument, String newWatchedMovie) {
// NOTE: we do not create new document here, but only update existing while retaining the version
return existingDocument.toBuilder()
.watchedMovie(newWatchedMovie)
.build();
}
public List<String> getWatchedMovies(String id) {
return repository.findById(id)
.map(WatchedMoviesDocument::getWatchedMovies)
.orElseGet(Collections::emptyList);
}
}
WatchedMoviesOperations
explained:
-
@Retryable
enables retries on the method. For more details on@Retryable
refer to: Basic error handling. -
include
specifies exception types that should be retried. -
OptimisticLockingFailureException.class
enables retries for this exception type. This means that when you have a failing concurrent update — it will be retried. -
repository.findById(id)
gets document from the storage. -
.map(existingDocument → updateExistingDocument(existingDocument, newWatchedMovie))
updates existing document according to the requirements. In current caseupdateExistingDocument
will only addnewWatchedMovie
to the list ofwatchedMovies
. Note, that we are using Lombok’s.toBuilder()
method — it copies existing document without any modifications, this is important because we need to leaveversion
and other fields of the document as is; and after that — we add onlynewWatchedMovie
to the document. -
.orElseGet) → createNewDocumentWithMovie(id, newWatchedMovie
creates new document if there is no document in the storage for the given key. -
repository.save(watchedMoviesDocument)
saves new/updated document with all modifications in the storage.
Concurrent test is not trivial, but is required for checking concurrent behavior:
package com.example.demo.persistence.optimisticlocking;
import com.example.demo.DemoApplicationTests;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static java.util.stream.IntStream.range;
import static org.assertj.core.api.Assertions.assertThat;
public class WatchedMoviesOperationsConcurrentTest extends DemoApplicationTests {
private int NUMBER_OF_THREADS = 5;
private int NUMBER_OF_TASKS = 10;
private CountDownLatch latch = new CountDownLatch(1);
private ExecutorService executor;
@Autowired
WatchedMoviesOperations watchedMoviesOperations;
@BeforeEach
public void before() {
executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
}
@AfterEach
public void after() {
executor.shutdownNow();
}
@Test
void addWatchedMovie_addsMoviesConcurrently() throws Exception {
String id = "age::" + UUID.randomUUID();
runTasksAndWaitForCompletion(() -> watchedMoviesOperations.addWatchedMovie(id, "Movie " + UUID.randomUUID()));
List<String> watchedMovies = watchedMoviesOperations.getWatchedMovies(id);
assertThat(watchedMovies).hasSize(NUMBER_OF_TASKS);
}
@RequiredArgsConstructor
private class LatchAwareTask implements Runnable {
private final Runnable task;
@Override
public void run() {
try {
latch.await();
task.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private void runTasksAndWaitForCompletion(Runnable runnable) throws Exception {
//submit tasks
CompletableFuture[] futures = range(0, NUMBER_OF_TASKS)
.mapToObj(index -> new LatchAwareTask(runnable))
.map(task -> CompletableFuture.runAsync(task, executor))
.toArray(CompletableFuture[]::new);
//start all tasks simultaneously
latch.countDown();
//wait for completion
CompletableFuture.allOf(futures).get(10, TimeUnit.SECONDS);
}
}