diff --git a/vlib/datatypes/README.md b/vlib/datatypes/README.md index 5c1251e5681027..48420158ec7986 100644 --- a/vlib/datatypes/README.md +++ b/vlib/datatypes/README.md @@ -28,4 +28,5 @@ println(stack) - [x] Min heap (priority queue) - [x] Set - [x] Quadtree +- [x] CRDT Conflict-free replicated data type - [ ] ... diff --git a/vlib/datatypes/crdt/README.md b/vlib/datatypes/crdt/README.md new file mode 100644 index 00000000000000..09f3ed58f5d1f9 --- /dev/null +++ b/vlib/datatypes/crdt/README.md @@ -0,0 +1,160 @@ +# A V implementation of CRDTs + +The following state-based counters and sets have currently been implemented. + +## Counters + +### G-Counter + +A grow-only counter (G-Counter) can only increase in one direction. +The increment operation increases the value of current replica by 1. +The merge operation combines values from distinct replicas by taking the maximum of each replica. + +```v +import datatypes.crdt { new_gcounter } + +mut gcounter := new_gcounter() + +// We can increase the counter monotonically by 1. +gcounter.increment() + +// Twice. +gcounter.increment() + +// Or we can pass in an arbitrary delta to apply as an increment. +gcounter.increment_value(2) + +// Should print '4' as the result. +println(gcounter.value()) + +mut another_counter := new_gcounter() + +// We can merge counter between each other +another_counter.merge(gcounter) +gcounter.merge(another_counter) +assert another_counter.value() == gcounter.value() +``` + +### PN-Counter + +A positive-negative counter (PN-Counter) is a CRDT that can both increase or +decrease and converge correctly in the light of commutative +operations. Both `.increment()` and `.decrement()` operations are allowed and thus +negative values are possible as a result. + +```v +import datatypes.crdt { new_pncounter } + +mut pncounter := new_pncounter() + +// We can increase the counter by 1. +pncounter.increment() + +// Or more. +pncounter.increment_value(100) + +// And similarly decrease its value by 1. +pncounter.decrement() + +// Or more. +pncounter.decrement_value(100) + +// End result should equal '0' here. +assert pncounter.value() == 0 +``` + +## Sets + +### G-Set + +A grow-only (G-Set) set to which element/s can be added to. Removing element +from the set is not possible. + +```v +import datatypes.crdt { new_gset } + +mut obj := 'dummy-object' +mut gset := new_gset[string]() + +gset.add(obj) + +// Should always print 'true' as `obj` exists in the g-set. +println(gset.lookup(obj)) +``` + +### 2P-Set + +Two-phase set (2P-Set) allows both additions and removals to the set. +Internally it comprises of two G-Sets, one to keep track of additions +and the other for removals. + +```v +import datatypes.crdt { new_two_phase_set } + +obj := 'dummy-object' + +mut twophaseset := new_two_phase_set[string]() + +twophaseset.add(obj) + +// Remove the object that we just added to the set, emptying it. +twophaseset.remove(obj) + +// Should return 'false' as the obj doesn't exist within the set. +assert twophaseset.lookup(obj) == false +``` + +### LWW-Element-Set + +Last-write-wins element set (LWW-Element-Set) keeps track of element additions +and removals but with respect to the timestamp that is attached to each +element. Timestamps should be unique and have ordering properties. + +```v +import datatypes.crdt { new_lwweset } + +obj := 'dummy-object' +mut lwweset := new_lwweset[string]() + +// Here, we remove the object first before we add it in. For a +// 2P-set the object would be deemed absent from the set. But for +// a LWW-set the object should be present because `.add()` follows +// `.remove()`. +lwweset.remove(obj) +lwweset.add(obj) + +// This should print 'true' because of the above. +assert lwweset.lookup(obj) == true +``` + +### OR-Set + +An OR-Set (Observed-Removed-Set) allows deletion and addition of +elements similar to LWW-e-Set, but does not surface only the most recent one. +Additions are uniquely tracked via tags and an element is considered member of the set. +If the deleted set consists of all the tags present within additions. + +```v +import datatypes.crdt { new_orset } + +// object 1 == object 2 +obj1 := 'dummy-object' +obj2 := 'dummy-object' + +mut orset := new_orset[string]() + +orset.add(obj1) +orset.add(obj2) + +// Removing any one of the above two objects should remove both +// because they contain the same value. +orset.remove(obj1) + +// Should return 'false'. +assert orset.lookup(obj2) == false +``` + +## ToDo + +- [x] Add `compare` and `merge` methods to G-Set (`gset.v`) +- [ ] Add `compare` and `merge` methods to 2P-Set (`two_phase_set.v`) diff --git a/vlib/datatypes/crdt/crdt.v b/vlib/datatypes/crdt/crdt.v new file mode 100644 index 00000000000000..ed076bfe56add8 --- /dev/null +++ b/vlib/datatypes/crdt/crdt.v @@ -0,0 +1 @@ +module crdt diff --git a/vlib/datatypes/crdt/gcounter.v b/vlib/datatypes/crdt/gcounter.v new file mode 100644 index 00000000000000..99d37ba64274e6 --- /dev/null +++ b/vlib/datatypes/crdt/gcounter.v @@ -0,0 +1,63 @@ +module crdt + +import rand + +// GCounter represent a G-counter in CRDT, which is +// a state-based grow-only counter that only supports +// increments. +pub struct GCounter { + // identity provides a unique identity to each replica. + identity string + // counter maps identity of each replica to their + // entry values i.e. the counter value they individually + // have. +mut: + counter map[string]int +} + +// new_gcounter returns a GCounter by pre-assigning a unique +// identity to it. +pub fn new_gcounter() GCounter { + return GCounter{ + identity: rand.ulid().str() + counter: map[string]int{} + } +} + +// increment increments the GCounter by the value of 1 everytime it +// is called. +pub fn (mut g GCounter) increment() { + g.increment_value(1) +} + +// inc_val allows passing in an arbitrary delta to increment the +// current value of counter by. Only positive values are accepted. +// If a negative value is provided the implementation will panic. +pub fn (mut g GCounter) increment_value(value int) { + if value < 0 { + panic('Cannot decrement a gcounter') + } + g.counter[g.identity] += value +} + +// count returns the total count of this counter across all the +// present replicas. +pub fn (mut g GCounter) value() int { + mut total := 0 + for key in g.counter.keys() { + total += g.counter[key] + } + return total +} + +// Merge combines the counter values across multiple replicas. +// The property of idempotency is preserved here across +// multiple merges as when no state is changed across any replicas, +// the result should be exactly the same everytime. +pub fn (mut g GCounter) merge(c &GCounter) { + for key, value in c.counter { + if key !in g.counter || g.counter[key] < value { + g.counter[key] = value + } + } +} diff --git a/vlib/datatypes/crdt/gcounter_test.v b/vlib/datatypes/crdt/gcounter_test.v new file mode 100644 index 00000000000000..1e92d0b4e114dc --- /dev/null +++ b/vlib/datatypes/crdt/gcounter_test.v @@ -0,0 +1,16 @@ +module crdt + +fn test_increment() { + mut counter := new_gcounter() + counter.increment() + assert counter.value() == 1 +} + +fn test_merge() { + mut first_counter := new_gcounter() + first_counter.increment() + assert first_counter.value() == 1 + mut second_counter := new_gcounter() + second_counter.merge(first_counter) + assert second_counter.value() == 1 +} diff --git a/vlib/datatypes/crdt/gset.v b/vlib/datatypes/crdt/gset.v new file mode 100644 index 00000000000000..77a01597082aea --- /dev/null +++ b/vlib/datatypes/crdt/gset.v @@ -0,0 +1,53 @@ +module crdt + +// Gset is a grow-only set. +struct GSet[T] { +mut: + main_set map[T]T +} + +// new_gset returns an instance of GSet. +pub fn new_gset[T]() GSet[T] { + return GSet[T]{ + main_set: map[T]T{} + } +} + +// add lets you add an element to grow-only set. +pub fn (mut g GSet[T]) add(elem T) { + g.main_set[elem] = T{} +} + +// lookup returns true if an element exists within the +// set or false otherwise. +pub fn (mut g GSet[T]) lookup(elem T) bool { + return elem in g.main_set +} + +// len returns the no. of elements present within GSet. +pub fn (mut g GSet[T]) len() int { + return g.main_set.len +} + +// elements returns all the elements present in the set. +pub fn (mut g GSet[T]) elements() []T { + mut elements := []T{} + for _, element in g.main_set { + elements << element + } + return elements +} + +// compare returns true if both of of sets are same, false otherwise. +pub fn (mut g GSet[T]) compare(c GSet[T]) bool { + return g == c +} + +// merge function to merge the GSet object's payload with the argument's payload. +pub fn (mut g GSet[T]) merge(c GSet[T]) { + for key, _ in c.main_set { + if key !in g.main_set { + g.main_set[key] = c.main_set[key] + } + } +} diff --git a/vlib/datatypes/crdt/gset_test.v b/vlib/datatypes/crdt/gset_test.v new file mode 100644 index 00000000000000..888282e3db867c --- /dev/null +++ b/vlib/datatypes/crdt/gset_test.v @@ -0,0 +1,41 @@ +module crdt + +fn test_lookup() { + mut gset := new_gset[string]() + element := 'some-test-element' + assert gset.lookup(element) == false +} + +fn test_add() { + mut gset := new_gset[string]() + element := 'some-test-element' + assert gset.lookup(element) == false + gset.add(element) + assert gset.lookup(element) +} + +fn test_compare() { + mut first_gset := new_gset[string]() + element := 'some-test-element' + assert first_gset.lookup(element) == false + first_gset.add(element) + assert first_gset.lookup(element) == true + mut second_gset := new_gset[string]() + assert first_gset.compare(second_gset) == false + second_gset.add(element) + assert second_gset.compare(first_gset) + assert first_gset.compare(second_gset) +} + +fn test_merge() { + mut first_gset := new_gset[string]() + element := 'some-test-element' + assert first_gset.lookup(element) == false + first_gset.add(element) + assert first_gset.lookup(element) == true + mut second_gset := new_gset[string]() + assert first_gset.compare(second_gset) == false + second_gset.merge(first_gset) + assert second_gset.compare(first_gset) + assert first_gset.compare(second_gset) +} diff --git a/vlib/datatypes/crdt/lww_element_set.v b/vlib/datatypes/crdt/lww_element_set.v new file mode 100644 index 00000000000000..de6ffcc4b4ee0e --- /dev/null +++ b/vlib/datatypes/crdt/lww_element_set.v @@ -0,0 +1,74 @@ +module crdt + +import time + +// LWW-Element-Set is similar to 2P-Set in that it consists of an "add set" and a "remove set", with a timestamp for each element. +struct LWWESet[T] { +mut: + add_map map[T]time.Time + rm_map map[T]time.Time + time time.Time +} + +// new_orset returns an instance of LWWESet. +pub fn new_lwweset[T]() LWWESet[T] { + return LWWESet[T]{ + add_map: map[T]time.Time{} + rm_map: map[T]time.Time{} + } +} + +// add lets you add an element to set. +pub fn (mut s LWWESet[T]) add(value T) { + s.add_map[value] = time.now() +} + +// remove deletes the element from the set. +pub fn (mut s LWWESet[T]) remove(value T) { + s.rm_map[value] = time.now() +} + +// lookup returns true if an element exists within the +// set or false otherwise. +pub fn (mut s LWWESet[T]) lookup(value T) bool { + if value in s.add_map.keys() { + if value in s.rm_map.keys() { + return s.rm_map[value].unix < s.add_map[value].unix + } + return true + } else { + return false + } + if value in s.add_map { + return true + } + if value in s.rm_map { + return true + } + return false +} + +// merge function to merge the LWWESet object's payload with the argument's payload. +pub fn (mut s LWWESet[T]) merge(r LWWESet[T]) { + for value, ts in r.add_map { + if value in s.add_map { + t := s.add_map[value] + if t < ts { + s.add_map[value] = ts + } + } else { + s.add_map[value] = ts + } + } + + for value, ts in r.rm_map { + if value in s.rm_map { + t := s.rm_map[value] + if t < ts { + s.rm_map[value] = ts + } + } else { + s.rm_map[value] = ts + } + } +} diff --git a/vlib/datatypes/crdt/lww_element_set_test.v b/vlib/datatypes/crdt/lww_element_set_test.v new file mode 100644 index 00000000000000..908b6a586c9dd4 --- /dev/null +++ b/vlib/datatypes/crdt/lww_element_set_test.v @@ -0,0 +1,39 @@ +module crdt + +fn test_lookup() { + mut lwweset := new_lwweset[string]() + assert lwweset.lookup('some-test-element') == false +} + +fn test_add() { + mut lwweset := new_lwweset[string]() + assert lwweset.lookup('some-test-element') == false + lwweset.add('some-test-element') + assert lwweset.lookup('some-test-element') +} + +fn test_remove() { + mut lwweset := new_lwweset[string]() + assert lwweset.lookup('some-test-element') == false + lwweset.add('some-test-element') + assert lwweset.lookup('some-test-element') + lwweset.remove('some-test-element') + assert lwweset.lookup('some-test-element') == false +} + +fn test_merge() { + mut lwweset := new_lwweset[string]() + assert lwweset.lookup('some-test-element') == false + lwweset.add('some-test-element') + assert lwweset.lookup('some-test-element') + lwweset.remove('some-test-element') + assert lwweset.lookup('some-test-element') == false + mut another_lwweset := new_lwweset[string]() + another_lwweset.add('another-test-element') + assert another_lwweset.lookup('another-test-element') + lwweset.merge(another_lwweset) + assert lwweset.lookup('another-test-element') + another_lwweset.merge(lwweset) + assert another_lwweset.add_map == lwweset.add_map + assert another_lwweset.rm_map == lwweset.rm_map +} diff --git a/vlib/datatypes/crdt/orset.v b/vlib/datatypes/crdt/orset.v new file mode 100644 index 00000000000000..5251ee109a0186 --- /dev/null +++ b/vlib/datatypes/crdt/orset.v @@ -0,0 +1,76 @@ +module crdt + +import rand + +// ORSet resembles LWWESet, but using unique tags instead of timestamps. +struct ORSet[T] { +mut: + add_map map[T]map[string]T + rm_map map[T]map[string]T +} + +// new_orset returns an instance of ORSet. +pub fn new_orset[T]() ORSet[T] { + return ORSet[T]{ + add_map: map[T]map[string]T{} + rm_map: map[T]map[string]T{} + } +} + +// add lets you add an element to set. +pub fn (mut o ORSet[T]) add(value T) { + if value in o.add_map { + o.add_map[value][rand.ulid().str()] = value + return + } + o.add_map[value][rand.ulid().str()] = value +} + +// remove deletes the element from the set. +pub fn (mut o ORSet[T]) remove(value T) { + if value in o.add_map { + for uid, _ in o.add_map[value] { + o.rm_map[value][uid] = value + } + } + o.rm_map[value][rand.ulid().str()] = value +} + +// lookup returns true if an element exists within the +// set or false otherwise. +pub fn (mut o ORSet[T]) lookup(value T) bool { + if value in o.add_map { + if value in o.rm_map { + for uid, _ in o.add_map { + if uid !in o.rm_map { + return true + } + } + } else { + return true + } + } else { + return false + } + return false +} + +// merge function to merge the ORSet object's payload with the argument's payload. +pub fn (mut o ORSet[T]) merge(r ORSet[T]) { + for value, m in r.add_map { + if add_map := o.add_map[value] { + for uid, _ in m { + add_map[uid] + } + } + o.add_map[value] = m.clone() + } + for value, m in r.rm_map { + if rm_map := o.rm_map[value] { + for uid, _ in m { + rm_map[uid] + } + } + o.rm_map[value] = m.clone() + } +} diff --git a/vlib/datatypes/crdt/orset_test.v b/vlib/datatypes/crdt/orset_test.v new file mode 100644 index 00000000000000..2df6c5514d115b --- /dev/null +++ b/vlib/datatypes/crdt/orset_test.v @@ -0,0 +1,33 @@ +module crdt + +fn test_add() { + mut orset := new_orset[string]() + element := 'some-test-element' + assert orset.lookup(element) == false + orset.add(element) + assert orset.lookup(element) +} + +fn test_remove() { + mut orset := new_orset[string]() + element := 'some-test-element' + assert orset.lookup(element) == false + orset.add(element) + assert orset.lookup(element) + orset.remove(element) + assert orset.lookup(element) == false +} + +fn test_merge() { + mut orset := new_orset[string]() + element := 'some-test-element' + assert orset.lookup(element) == false + orset.add(element) + assert orset.lookup(element) + mut other_orset := new_orset[string]() + other_orset.merge(orset) + assert other_orset.lookup(element) + other_orset.remove(element) + orset.merge(other_orset) + assert orset.lookup(element) == false +} diff --git a/vlib/datatypes/crdt/pncounter.v b/vlib/datatypes/crdt/pncounter.v new file mode 100644 index 00000000000000..cc6ee24fbb2372 --- /dev/null +++ b/vlib/datatypes/crdt/pncounter.v @@ -0,0 +1,64 @@ +module crdt + +// PNCounter represents a state-based PN-Counter. It is +// implemented as sets of two G-Counters, one that tracks +// increments while the other decrements. +struct PNCounter { +mut: + p_counter GCounter + n_counter GCounter +} + +// new_pncounter returns a new *PNCounter with both its +// G-Counters initialized. +pub fn new_pncounter() PNCounter { + return PNCounter{ + p_counter: new_gcounter() + n_counter: new_gcounter() + } +} + +// increment monotonically increments the current value of the +// PN-Counter by one. +pub fn (mut pn PNCounter) increment() { + pn.increment_value(1) +} + +// increment_value increments the current value of the PN-Counter +// by the delta incr that is provided. The value of delta +// has to be >= 0. If the value of delta is < 0, then this +// implementation panics. +pub fn (mut pn PNCounter) increment_value(incr int) { + pn.p_counter.increment_value(incr) +} + +// decrement monotonically decrements the current value of the +// PN-Counter by one. +pub fn (mut pn PNCounter) decrement() { + pn.decrement_value(1) +} + +// decrement_value adds a decrement to the current value of +// PN-Counter by the value of delta decr. Similar to +// increment_value, the value of decr cannot be less than 0. +pub fn (mut pn PNCounter) decrement_value(decr int) { + pn.n_counter.increment_value(decr) +} + +// value returns the current value of the counter. It +// subtracts the value of negative G-Counter from the +// positive grow-only counter and returns the result. +// Because this counter can grow in either direction, +// negative integers as results are possible. +pub fn (mut pn PNCounter) value() int { + return pn.p_counter.value() - pn.n_counter.value() +} + +// Merge combines both the PN-Counters and saves the result +// in the invoking counter. Respective G-Counters are merged +// i.e. +ve with +ve and -ve with -ve, but not computation +// is actually performed. +pub fn (mut pn PNCounter) merge(pnpn PNCounter) { + pn.p_counter.merge(pnpn.p_counter) + pn.n_counter.merge(pnpn.n_counter) +} diff --git a/vlib/datatypes/crdt/pncounter_test.v b/vlib/datatypes/crdt/pncounter_test.v new file mode 100644 index 00000000000000..8a78985f4add58 --- /dev/null +++ b/vlib/datatypes/crdt/pncounter_test.v @@ -0,0 +1,27 @@ +module crdt + +fn test_increment() { + mut pncounter := new_pncounter() + pncounter.increment() + assert pncounter.value() == 1 +} + +fn test_decrement() { + mut pncounter := new_pncounter() + pncounter.decrement() + assert pncounter.value() == -1 +} + +fn test_merge() { + mut first_pncounter := new_pncounter() + first_pncounter.increment() + assert first_pncounter.value() == 1 + mut second_pncounter := new_pncounter() + second_pncounter.decrement() + assert second_pncounter.value() == -1 + first_pncounter.merge(second_pncounter) + assert first_pncounter.value() == 0 + assert second_pncounter.value() == -1 + second_pncounter.merge(first_pncounter) + assert second_pncounter.value() == 0 +} diff --git a/vlib/datatypes/crdt/two_phase_set.v b/vlib/datatypes/crdt/two_phase_set.v new file mode 100644 index 00000000000000..11f9e51407755a --- /dev/null +++ b/vlib/datatypes/crdt/two_phase_set.v @@ -0,0 +1,34 @@ +module crdt + +// TwoPhaseSet supports both addition and removal of +// elements to set. +struct TwoPhaseSet[T] { +mut: + add_set GSet[T] + rm_set GSet[T] +} + +// new_two_phase_set returns a new instance of TwoPhaseSet. +pub fn new_two_phase_set[T]() TwoPhaseSet[T] { + return TwoPhaseSet[T]{ + add_set: new_gset[T]() + rm_set: new_gset[T]() + } +} + +// add inserts element into the TwoPhaseSet. +pub fn (mut t TwoPhaseSet[T]) add(elem T) { + t.add_set.add(elem) +} + +// remove deletes the element from the set. +pub fn (mut t TwoPhaseSet[T]) remove(elem T) { + t.rm_set.add(elem) +} + +// lookup returns true if the set contains the element. +// The set is said to contain the element if it is present +// in the add-set and not in the remove-set. +pub fn (mut t TwoPhaseSet[T]) lookup(elem T) bool { + return t.add_set.lookup(elem) && !t.rm_set.lookup(elem) +} diff --git a/vlib/datatypes/crdt/two_phase_set_test.v b/vlib/datatypes/crdt/two_phase_set_test.v new file mode 100644 index 00000000000000..26db5933eb6b3f --- /dev/null +++ b/vlib/datatypes/crdt/two_phase_set_test.v @@ -0,0 +1,19 @@ +module crdt + +fn test_add() { + mut twophaseset := new_two_phase_set[string]() + element := 'some-test-element' + assert twophaseset.lookup(element) == false + twophaseset.add(element) + assert twophaseset.lookup(element) +} + +fn test_remove() { + mut twophaseset := new_two_phase_set[string]() + element := 'some-test-element' + assert twophaseset.lookup(element) == false + twophaseset.add(element) + assert twophaseset.lookup(element) + twophaseset.remove(element) + assert twophaseset.lookup(element) == false +}