forked from strongback/strongback-java
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAsyncSwitchReactor.java
137 lines (117 loc) · 5.65 KB
/
AsyncSwitchReactor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
* Strongback
* Copyright 2015, Strongback and individual contributors by the @authors tag.
* See the COPYRIGHT.txt in the distribution for a full listing of individual
* contributors.
*
* Licensed under the MIT License; you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.strongback;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import org.strongback.annotation.Immutable;
import org.strongback.annotation.ThreadSafe;
import org.strongback.components.Switch;
/**
* A threadsafe {@link SwitchReactor} implementation that relies upon being periodically {@link Executable#execute(long)
* executed}. This class is carefully written to ensure that all functions are registered atomically even while
* {@link #execute(long)} is being called.
*
* @author Randall Hauch
*/
@ThreadSafe
final class AsyncSwitchReactor implements Executable, SwitchReactor {
private final ConcurrentMap<Switch, Container> listeners = new ConcurrentHashMap<>();
@Override
public void execute(long time) {
listeners.forEach((swtch, container) -> container.notifyListeners(swtch.isTriggered()));
}
@Override
public void onTriggered(Switch swtch, Runnable function) {
listeners.computeIfAbsent(swtch,(s)->new Container()).addWhenTriggered(function);
}
@Override
public void onUntriggered(Switch swtch, Runnable function) {
listeners.computeIfAbsent(swtch,(s)->new Container()).addWhenUntriggered(function);
}
@Override
public void whileTriggered(Switch swtch, Runnable function) {
listeners.computeIfAbsent(swtch,(s)->new Container()).addWhileTriggered(function);
}
@Override
public void whileUntriggered(Switch swtch, Runnable function) {
listeners.computeIfAbsent(swtch,(s)->new Container()).addWhileUntriggered(function);
}
/**
* A container class for all listener functions associated with a specific {@link Switch}. The class is threadsafe to allow
* for new listener functions to be added while the existing functions are called based upon the switch's current state.
* <p>
* To achieve efficient and lock-free concurrent operations, each of the functions for a specific Switch state or transition
* are maintained in a simple linked-list structure (see {@link Listener}). Each immutable Listener is created to hold one
* function and an optional "next" listener. To add a new function, a new Listener object is created with the function and
* the current Listener object for that state, and the Container's reference to that state's listeners is updated with the
* new Listener object. In essence, new functions are added to the front of the linked list without using any locking.
* <p>
* It is not currently possible to remove functions that have been registered.
*
* @author Randall Hauch
*/
@ThreadSafe
private static final class Container {
private boolean previouslyTriggered;
private final AtomicReference<Listener> whenTriggered = new AtomicReference<>();
private final AtomicReference<Listener> whenUntriggered = new AtomicReference<>();
private final AtomicReference<Listener> whileTriggered = new AtomicReference<>();
private final AtomicReference<Listener> whileUntriggered = new AtomicReference<>();
public void notifyListeners(boolean nowTriggered) {
notifyAtomicallyWhen(()->!previouslyTriggered && nowTriggered, whenTriggered);
notifyAtomicallyWhen(()->previouslyTriggered && !nowTriggered, whenUntriggered);
notifyAtomicallyWhen(()->previouslyTriggered && nowTriggered, whileTriggered);
notifyAtomicallyWhen(()->!previouslyTriggered && !nowTriggered, whileUntriggered);
previouslyTriggered = nowTriggered;
}
private void notifyAtomicallyWhen(BooleanSupplier criteria, AtomicReference<Listener> listenerRef ) {
Listener listener = listenerRef.get();
if ( listener != null && criteria.getAsBoolean() ) listener.fire();
}
public void addWhenTriggered(Runnable function) {
whenTriggered.updateAndGet((existing)->new Listener(function,existing));
}
public void addWhenUntriggered(Runnable function) {
whenUntriggered.updateAndGet((existing)->new Listener(function,existing));
}
public void addWhileTriggered(Runnable function) {
whileTriggered.updateAndGet((existing)->new Listener(function,existing));
}
public void addWhileUntriggered(Runnable function) {
whileUntriggered.updateAndGet((existing)->new Listener(function,existing));
}
}
/**
* One node in a linked list of listener functions.
*
* @author Randall Hauch
*/
@Immutable
private static final class Listener {
private final Runnable function;
private final Listener next;
public Listener(Runnable function, Listener next) {
this.function = function;
this.next = next;
}
public void fire() {
function.run();
if (next != null) next.fire();
}
}
}