Skip to content

Commit 7055561

Browse files
committed
Add sample operator
1 parent f315462 commit 7055561

File tree

2 files changed

+136
-0
lines changed

2 files changed

+136
-0
lines changed

source/concurrency/sequence.d

+112
Original file line numberDiff line numberDiff line change
@@ -1085,6 +1085,118 @@ private auto getSome(T)(ref T t) {
10851085
return t.get();
10861086
}
10871087

1088+
auto proxyNext(Sequence, Receiver)(Sequence sequence, Receiver receiver) {
1089+
return ProxyNextSequence!(Sequence, Receiver)(sequence, receiver);
1090+
}
1091+
1092+
struct ProxyNextSequence(Sequence, NextReceiver) {
1093+
alias Value = Sequence.Value;
1094+
1095+
Sequence sequence;
1096+
NextReceiver nextReceiver;
1097+
1098+
auto connect(Receiver)(return Receiver receiver) @safe return scope {
1099+
// ensure NRVO
1100+
auto op = sequence.connect(ProxyNextReceiver!(NextReceiver, Receiver)(nextReceiver, receiver));
1101+
return op;
1102+
}
1103+
}
1104+
1105+
struct ProxyNextReceiver(NextReceiver, Receiver) {
1106+
NextReceiver nextReceiver;
1107+
Receiver receiver;
1108+
1109+
auto setNext(Sender)(Sender sender) {
1110+
return nextReceiver.setNext(sender);
1111+
}
1112+
auto setValue() {
1113+
receiver.setValue();
1114+
}
1115+
auto setDone() nothrow @safe {
1116+
receiver.setDone();
1117+
}
1118+
auto setError(Throwable t) nothrow @safe {
1119+
receiver.setError(t);
1120+
}
1121+
import concurrency.receiver : ForwardExtensionPoints;
1122+
mixin ForwardExtensionPoints!receiver;
1123+
}
1124+
1125+
auto sample(BaseSequence, TriggerSequence)(BaseSequence base, TriggerSequence trigger) {
1126+
return SampleSequence!(BaseSequence, TriggerSequence)(base, trigger);
1127+
}
1128+
1129+
struct SampleSequence(BaseSequence, TriggerSequence) {
1130+
alias Value = void;
1131+
alias Element = BaseSequence.Element;
1132+
1133+
BaseSequence base;
1134+
TriggerSequence trigger;
1135+
1136+
auto connect(Receiver)(return Receiver receiver) @safe return scope {
1137+
// ensure NRVO
1138+
auto op = SampleSequenceOp!(BaseSequence, TriggerSequence, Receiver)(base, trigger, receiver);
1139+
return op;
1140+
}
1141+
}
1142+
1143+
struct SampleSequenceOp(BaseSequence, TriggerSequence, Receiver) {
1144+
import concurrency.bitfield : SharedBitField;
1145+
import concurrency.sender : OpType;
1146+
import concurrency.operations : RaceSender;
1147+
1148+
import std.typecons : Nullable;
1149+
enum Flags : size_t {
1150+
locked = 0x1,
1151+
valid = 0x2
1152+
}
1153+
shared SharedBitField!Flags state;
1154+
alias Element = BaseSequence.Element;
1155+
Element item;
1156+
alias RaceAllSender = RaceSender!(
1157+
SequenceCollect!(BaseSequence, void delegate(Element) shared @safe nothrow @nogc),
1158+
ProxyNextSequence!(FilterMapSequence!(TriggerSequence, Nullable!Element delegate() shared @safe nothrow @nogc), Receiver)
1159+
);
1160+
alias Op = OpType!(RaceAllSender, Receiver);
1161+
1162+
Op op;
1163+
1164+
@disable this(ref return scope typeof(this) rhs);
1165+
@disable this(this);
1166+
1167+
@disable void opAssign(typeof(this) rhs) nothrow @safe @nogc;
1168+
@disable void opAssign(ref typeof(this) rhs) nothrow @safe @nogc;
1169+
1170+
this(BaseSequence base, TriggerSequence trigger, return Receiver receiver) @trusted return scope {
1171+
import concurrency.operations : raceAll;
1172+
op = raceAll(
1173+
base.collect(&(cast(shared)this).produced),
1174+
trigger.filterMap(&(cast(shared)this).triggered).proxyNext(receiver)
1175+
).connect(receiver);
1176+
}
1177+
1178+
void start() {
1179+
op.start();
1180+
}
1181+
1182+
private void produced(Element item) shared @safe nothrow @nogc {
1183+
with (state.lock(Flags.valid)) {
1184+
this.item = item;
1185+
}
1186+
}
1187+
1188+
private Nullable!Element triggered() shared @safe nothrow @nogc{
1189+
with (state.lock()) {
1190+
if (was(Flags.valid)) {
1191+
auto localElement = item;
1192+
release(Flags.valid);
1193+
return Nullable!Element(localElement);
1194+
}
1195+
return Nullable!Element.init;
1196+
}
1197+
}
1198+
}
1199+
10881200

10891201
// cron - create a sequence like interval but using cron spec
10901202

tests/ut/concurrency/sequence.d

+24
Original file line numberDiff line numberDiff line change
@@ -173,3 +173,27 @@ import unit_threaded;
173173
return Nullable!int(1);
174174
}).take(4).toList().syncWait.value.should == [1,1,1,1];
175175
}
176+
177+
@("sample")
178+
@safe unittest {
179+
import core.time : msecs;
180+
import concurrency.scheduler : ManualTimeWorker;
181+
import concurrency.operations : then, whenAll;
182+
183+
auto worker = new shared ManualTimeWorker();
184+
auto driver = just(worker).then((shared ManualTimeWorker worker) shared {
185+
for(;;) {
186+
auto span = worker.timeUntilNextEvent();
187+
if (span.isNull)
188+
break;
189+
worker.advance(span.get());
190+
}
191+
});
192+
whenAll(
193+
driver,
194+
sample(
195+
interval(1.msecs, false).scan((int acc) => acc + 1, 0),
196+
interval(2.msecs, false)
197+
).take(4).toList()
198+
).syncWait.value.should == [1,3,5,7];
199+
}

0 commit comments

Comments
 (0)