Skip to content

Commit b6ef86e

Browse files
committed
Add filterMap Sequence operator
1 parent b3a2ca3 commit b6ef86e

File tree

2 files changed

+121
-0
lines changed

2 files changed

+121
-0
lines changed

source/concurrency/sequence.d

+110
Original file line numberDiff line numberDiff line change
@@ -941,6 +941,116 @@ auto iotaSequence(T)(T start, T end) {
941941
return iota(start, end).sequence();
942942
}
943943

944+
auto filterMap(Sequence, Fun)(Sequence s, Fun f) {
945+
return FilterMapSequence!(Sequence, Fun)(s, f);
946+
}
947+
948+
struct FilterMapSequence(Sequence, Fun) {
949+
import std.traits : ReturnType;
950+
alias Value = void;
951+
alias Element = Sequence.Element;
952+
Sequence s;
953+
Fun f;
954+
auto connect(Receiver)(return Receiver receiver) @safe return scope {
955+
auto op = s.connect(FilterMapSequenceReceiver!(Fun, Receiver)(f, receiver));
956+
return op;
957+
}
958+
}
959+
960+
struct FilterMapSequenceReceiver(Fun, Receiver) {
961+
Fun fun;
962+
Receiver receiver;
963+
auto setNext(Sender)(Sender sender) {
964+
return FilterMapSequenceNextSender!(Sender, Fun, Receiver)(sender, fun, receiver);
965+
}
966+
auto setValue() {
967+
receiver.setValue();
968+
}
969+
auto setDone() nothrow @safe {
970+
receiver.setDone();
971+
}
972+
auto setError(Throwable t) nothrow @safe {
973+
receiver.setError(t);
974+
}
975+
import concurrency.receiver : ForwardExtensionPoints;
976+
mixin ForwardExtensionPoints!receiver;
977+
}
978+
979+
struct FilterMapSequenceNextSender(Sender, Fun, NextReceiver) {
980+
alias Value = Sender.Value;
981+
Sender sender;
982+
Fun fun;
983+
NextReceiver nextReceiver;
984+
auto connect(Receiver)(return Receiver receiver) @safe return scope {
985+
auto op = FilterMapSequenceNextOp!(Sender, Fun, NextReceiver, Receiver)(sender, fun, nextReceiver, receiver);
986+
return op;
987+
}
988+
}
989+
990+
struct FilterMapSequenceNextOp(Sender, Fun, NextReceiver, Receiver) {
991+
import concurrency.sender : OpType;
992+
993+
alias Op = OpType!(Sender, FilterMapSequenceNextReceiver!(Sender.Value, Fun, NextReceiver, Receiver));
994+
Op op;
995+
FilterMapSequenceNextState!(Fun, NextReceiver, Receiver) state;
996+
this(Sender sender, Fun fun, NextReceiver nextReceiver, Receiver receiver) @trusted {
997+
state = FilterMapSequenceNextState!(Fun, NextReceiver, Receiver)(fun, nextReceiver, receiver);
998+
op = sender.connect(FilterMapSequenceNextReceiver!(Sender.Value, Fun, NextReceiver, Receiver)(&state));
999+
}
1000+
@disable this(ref return scope typeof(this) rhs);
1001+
@disable this(this);
1002+
1003+
@disable void opAssign(typeof(this) rhs) nothrow @safe @nogc;
1004+
@disable void opAssign(ref typeof(this) rhs) nothrow @safe @nogc;
1005+
1006+
void start() @trusted scope nothrow {
1007+
op.start();
1008+
}
1009+
}
1010+
1011+
struct FilterMapSequenceNextState(Fun, NextReceiver, Receiver) {
1012+
Fun fun;
1013+
NextReceiver nextReceiver;
1014+
Receiver receiver;
1015+
}
1016+
1017+
struct FilterMapSequenceNextReceiver(Value, Fun, NextReceiver, Receiver) {
1018+
FilterMapSequenceNextState!(Fun, NextReceiver, Receiver)* state;
1019+
1020+
auto setValue(Value value) {
1021+
import concurrency : just;
1022+
import concurrency : connectHeap;
1023+
auto result = state.fun(value);
1024+
if (result.isNone) {
1025+
state.receiver.setValue();
1026+
} else {
1027+
auto sender = state.nextReceiver.setNext(just(result.getSome));
1028+
// TODO: put state in FilterMapSequenceNextOp
1029+
sender.connectHeap(state.receiver).start();
1030+
}
1031+
}
1032+
auto setError(Throwable t) nothrow @safe {
1033+
state.nextReceiver.setError(t);
1034+
}
1035+
auto setDone() nothrow @safe {
1036+
state.nextReceiver.setDone();
1037+
}
1038+
auto receiver() nothrow @safe {
1039+
return &state.nextReceiver;
1040+
}
1041+
import concurrency.receiver : ForwardExtensionPoints;
1042+
mixin ForwardExtensionPoints!receiver;
1043+
}
1044+
1045+
private bool isNone(T)(ref const T t) {
1046+
return t.isNull();
1047+
}
1048+
1049+
private auto getSome(T)(ref T t) {
1050+
return t.get();
1051+
}
1052+
1053+
9441054
// cron - create a sequence like interval but using cron spec
9451055

9461056
// flatmap{latest,concat} - create a sequence that flattens

tests/ut/concurrency/sequence.d

+11
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,14 @@ import unit_threaded;
144144
@safe unittest {
145145
iotaSequence(5, 10).toList().syncWait.value.should == [5,6,7,8,9];
146146
}
147+
148+
@("filterMap")
149+
@safe unittest {
150+
import std.typecons : Nullable;
151+
[1,2,3,4].sequence.filterMap((int i) {
152+
if (i > 2)
153+
return Nullable!(int)(i*3);
154+
else
155+
return Nullable!(int).init;
156+
}).toList().syncWait.value.should == [9,12];
157+
}

0 commit comments

Comments
 (0)