Skip to content

Commit aaecfa8

Browse files
authored
Merge pull request #37549: Move CausedByDrain to top level, move types
2 parents 7df489c + 4a2afe8 commit aaecfa8

File tree

37 files changed

+218
-237
lines changed

37 files changed

+218
-237
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.beam.sdk.state.Timers;
3434
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
3535
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
36+
import org.apache.beam.sdk.values.CausedByDrain;
3637
import org.apache.beam.sdk.values.PCollectionView;
3738
import org.apache.beam.sdk.values.WindowingStrategy;
3839
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -136,22 +137,19 @@ public TimersImpl(StateNamespace namespace) {
136137
@Override
137138
public void setTimer(Instant timestamp, TimeDomain timeDomain) {
138139
timerInternals.setTimer(
139-
TimerData.of(
140-
namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL));
140+
TimerData.of(namespace, timestamp, timestamp, timeDomain, CausedByDrain.NORMAL));
141141
}
142142

143143
@Override
144144
public void setTimer(Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
145145
timerInternals.setTimer(
146-
TimerData.of(
147-
namespace, timestamp, outputTimestamp, timeDomain, TimerData.CausedByDrain.NORMAL));
146+
TimerData.of(namespace, timestamp, outputTimestamp, timeDomain, CausedByDrain.NORMAL));
148147
}
149148

150149
@Override
151150
public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
152151
timerInternals.deleteTimer(
153-
TimerData.of(
154-
namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL));
152+
TimerData.of(namespace, timestamp, timestamp, timeDomain, CausedByDrain.NORMAL));
155153
}
156154

157155
@Override

runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.beam.sdk.util.construction.SplittableParDo;
5252
import org.apache.beam.sdk.util.construction.SplittableParDo.ProcessKeyedElements;
5353
import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
54+
import org.apache.beam.sdk.values.CausedByDrain;
5455
import org.apache.beam.sdk.values.KV;
5556
import org.apache.beam.sdk.values.PCollection;
5657
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -604,7 +605,7 @@ public String getErrorContext() {
604605
wakeupTime,
605606
wakeupTime,
606607
TimeDomain.PROCESSING_TIME,
607-
TimerInternals.TimerData.CausedByDrain.NORMAL));
608+
CausedByDrain.NORMAL));
608609
}
609610

610611
private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapOptionsAsSetup(

runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.beam.sdk.state.TimeDomain;
3232
import org.apache.beam.sdk.transforms.DoFn;
3333
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
34+
import org.apache.beam.sdk.values.CausedByDrain;
3435
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ComparisonChain;
3536
import org.checkerframework.checker.nullness.qual.Nullable;
3637
import org.joda.time.Instant;
@@ -168,10 +169,6 @@ void deleteTimer(
168169
/** Data about a timer as represented within {@link TimerInternals}. */
169170
@AutoValue
170171
abstract class TimerData implements Comparable<TimerData> {
171-
public enum CausedByDrain {
172-
CAUSED_BY_DRAIN,
173-
NORMAL
174-
}
175172

176173
public abstract String getTimerId();
177174

@@ -245,7 +242,7 @@ public static TimerData of(
245242
timestamp,
246243
outputTimestamp,
247244
domain,
248-
TimerData.CausedByDrain.NORMAL);
245+
CausedByDrain.NORMAL);
249246
}
250247

251248
/**
@@ -355,7 +352,7 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException
355352
timestamp,
356353
outputTimestamp,
357354
domain,
358-
TimerData.CausedByDrain.NORMAL);
355+
CausedByDrain.NORMAL);
359356
}
360357

361358
@Override
@@ -401,8 +398,7 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException
401398
StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
402399
Instant timestamp = INSTANT_CODER.decode(inStream);
403400
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
404-
return TimerData.of(
405-
timerId, namespace, timestamp, timestamp, domain, TimerData.CausedByDrain.NORMAL);
401+
return TimerData.of(timerId, namespace, timestamp, timestamp, domain, CausedByDrain.NORMAL);
406402
}
407403

408404
@Override

runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.apache.beam.runners.core.TimerInternals.TimerData;
2525
import org.apache.beam.sdk.state.TimeDomain;
26+
import org.apache.beam.sdk.values.CausedByDrain;
2627
import org.joda.time.Duration;
2728
import org.joda.time.Instant;
2829
import org.junit.Test;
@@ -47,15 +48,15 @@ public void testFiringEventTimers() throws Exception {
4748
new Instant(19),
4849
new Instant(19),
4950
TimeDomain.EVENT_TIME,
50-
TimerData.CausedByDrain.NORMAL);
51+
CausedByDrain.NORMAL);
5152
TimerData eventTimer2 =
5253
TimerData.of(
5354
ID2,
5455
NS1,
5556
new Instant(29),
5657
new Instant(29),
5758
TimeDomain.EVENT_TIME,
58-
TimerData.CausedByDrain.NORMAL);
59+
CausedByDrain.NORMAL);
5960

6061
underTest.setTimer(eventTimer1);
6162
underTest.setTimer(eventTimer2);
@@ -128,14 +129,14 @@ public void testFiringProcessingTimeTimers() throws Exception {
128129
new Instant(19),
129130
new Instant(19),
130131
TimeDomain.PROCESSING_TIME,
131-
TimerData.CausedByDrain.NORMAL);
132+
CausedByDrain.NORMAL);
132133
TimerData processingTime2 =
133134
TimerData.of(
134135
NS1,
135136
new Instant(29),
136137
new Instant(29),
137138
TimeDomain.PROCESSING_TIME,
138-
TimerData.CausedByDrain.NORMAL);
139+
CausedByDrain.NORMAL);
139140

140141
underTest.setTimer(processingTime1);
141142
underTest.setTimer(processingTime2);
@@ -165,46 +166,38 @@ public void testTimerOrdering() throws Exception {
165166
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
166167
TimerData eventTime1 =
167168
TimerData.of(
168-
NS1,
169-
new Instant(19),
170-
new Instant(19),
171-
TimeDomain.EVENT_TIME,
172-
TimerData.CausedByDrain.NORMAL);
169+
NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, CausedByDrain.NORMAL);
173170
TimerData processingTime1 =
174171
TimerData.of(
175172
NS1,
176173
new Instant(19),
177174
new Instant(19),
178175
TimeDomain.PROCESSING_TIME,
179-
TimerData.CausedByDrain.NORMAL);
176+
CausedByDrain.NORMAL);
180177
TimerData synchronizedProcessingTime1 =
181178
TimerData.of(
182179
NS1,
183180
new Instant(19),
184181
new Instant(19),
185182
TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
186-
TimerData.CausedByDrain.NORMAL);
183+
CausedByDrain.NORMAL);
187184
TimerData eventTime2 =
188185
TimerData.of(
189-
NS1,
190-
new Instant(29),
191-
new Instant(29),
192-
TimeDomain.EVENT_TIME,
193-
TimerData.CausedByDrain.NORMAL);
186+
NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME, CausedByDrain.NORMAL);
194187
TimerData processingTime2 =
195188
TimerData.of(
196189
NS1,
197190
new Instant(29),
198191
new Instant(29),
199192
TimeDomain.PROCESSING_TIME,
200-
TimerData.CausedByDrain.NORMAL);
193+
CausedByDrain.NORMAL);
201194
TimerData synchronizedProcessingTime2 =
202195
TimerData.of(
203196
NS1,
204197
new Instant(29),
205198
new Instant(29),
206199
TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
207-
TimerData.CausedByDrain.NORMAL);
200+
CausedByDrain.NORMAL);
208201

209202
underTest.setTimer(processingTime1);
210203
underTest.setTimer(eventTime1);
@@ -239,18 +232,14 @@ public void testDeduplicate() throws Exception {
239232
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
240233
TimerData eventTime =
241234
TimerData.of(
242-
NS1,
243-
new Instant(19),
244-
new Instant(19),
245-
TimeDomain.EVENT_TIME,
246-
TimerData.CausedByDrain.NORMAL);
235+
NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, CausedByDrain.NORMAL);
247236
TimerData processingTime =
248237
TimerData.of(
249238
NS1,
250239
new Instant(19),
251240
new Instant(19),
252241
TimeDomain.PROCESSING_TIME,
253-
TimerData.CausedByDrain.NORMAL);
242+
CausedByDrain.NORMAL);
254243
underTest.setTimer(eventTime);
255244
underTest.setTimer(eventTime);
256245
underTest.setTimer(processingTime);

runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.beam.sdk.state.TimeDomain;
2424
import org.apache.beam.sdk.testing.CoderProperties;
2525
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
26+
import org.apache.beam.sdk.values.CausedByDrain;
2627
import org.apache.beam.sdk.values.WindowedValue;
2728
import org.apache.beam.sdk.values.WindowedValues;
2829
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -49,7 +50,7 @@ public void testEncodeDecodeEqual() throws Exception {
4950
new Instant(500L),
5051
new Instant(500L),
5152
TimeDomain.EVENT_TIME,
52-
TimerData.CausedByDrain.NORMAL));
53+
CausedByDrain.NORMAL));
5354
Iterable<WindowedValue<Integer>> elements =
5455
ImmutableList.of(
5556
WindowedValues.valueInGlobalWindow(1),

runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.beam.sdk.util.WindowTracing;
6060
import org.apache.beam.sdk.util.WindowedValueReceiver;
6161
import org.apache.beam.sdk.util.construction.TriggerTranslation;
62+
import org.apache.beam.sdk.values.CausedByDrain;
6263
import org.apache.beam.sdk.values.KV;
6364
import org.apache.beam.sdk.values.TimestampedValue;
6465
import org.apache.beam.sdk.values.WindowedValue;
@@ -578,7 +579,7 @@ public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exc
578579
timestamp,
579580
timestamp,
580581
domain,
581-
TimerData.CausedByDrain.NORMAL));
582+
CausedByDrain.NORMAL));
582583
runner.onTimers(timers);
583584
runner.persist();
584585
}
@@ -593,7 +594,7 @@ public void fireTimers(W window, TimestampedValue<TimeDomain>... timers) throws
593594
timer.getTimestamp(),
594595
timer.getTimestamp(),
595596
timer.getValue(),
596-
TimerData.CausedByDrain.NORMAL));
597+
CausedByDrain.NORMAL));
597598
}
598599
runner.onTimers(timerData);
599600
runner.persist();

runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.beam.sdk.transforms.windowing.WindowFn;
4747
import org.apache.beam.sdk.util.UserCodeException;
4848
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
49+
import org.apache.beam.sdk.values.CausedByDrain;
4950
import org.apache.beam.sdk.values.KV;
5051
import org.apache.beam.sdk.values.TupleTag;
5152
import org.apache.beam.sdk.values.WindowedValue;
@@ -702,7 +703,7 @@ public void onTimer(OnTimerContext context) {
702703
context.fireTimestamp(),
703704
context.timestamp(),
704705
context.timeDomain(),
705-
TimerData.CausedByDrain.NORMAL));
706+
CausedByDrain.NORMAL));
706707
}
707708
}
708709

runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
5757
import org.apache.beam.sdk.transforms.windowing.Window;
5858
import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
59+
import org.apache.beam.sdk.values.CausedByDrain;
5960
import org.apache.beam.sdk.values.KV;
6061
import org.apache.beam.sdk.values.PCollection;
6162
import org.apache.beam.sdk.values.PCollectionView;
@@ -318,7 +319,7 @@ public void testOnTimerCalled() {
318319
timestamp,
319320
timestamp,
320321
TimeDomain.EVENT_TIME,
321-
TimerData.CausedByDrain.NORMAL)));
322+
CausedByDrain.NORMAL)));
322323
}
323324

324325
private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
@@ -361,7 +362,7 @@ public <KeyT> void onTimer(
361362
timestamp,
362363
outputTimestamp,
363364
timeDomain,
364-
TimerData.CausedByDrain.NORMAL));
365+
CausedByDrain.NORMAL));
365366
}
366367

367368
@Override

0 commit comments

Comments
 (0)