Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: ReactiveX/RxJava
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 1.x
Choose a base ref
...
head repository: dmgd/RxJava
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 1.x
Choose a head ref
Able to merge. These branches can be automatically merged.
  • 1 commit
  • 1 file changed
  • 1 contributor

Commits on Oct 18, 2014

  1. make AbstractSchedulerConcurrencyTests.testUnSubscribeForScheduler() …

    …deterministic by running interval on a test scheduler rather than the computation scheduler
    dmgd committed Oct 18, 2014
    Copy the full SHA
    e95d4a4 View commit details
Showing with 16 additions and 9 deletions.
  1. +16 −9 src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java
25 changes: 16 additions & 9 deletions src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java
Original file line number Diff line number Diff line change
@@ -54,16 +54,24 @@ public abstract class AbstractSchedulerConcurrencyTests extends AbstractSchedule
public final void testUnSubscribeForScheduler() throws InterruptedException {
final AtomicInteger countReceived = new AtomicInteger();
final AtomicInteger countGenerated = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch subscribed = new CountDownLatch(1);
final CountDownLatch unsubscribed = new CountDownLatch(1);

Observable.interval(50, TimeUnit.MILLISECONDS)
TestScheduler testScheduler = Schedulers.test();
Observable.interval(50, TimeUnit.MILLISECONDS, testScheduler)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
countGenerated.incrementAndGet();
return aLong;
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
subscribed.countDown();
}
})
.subscribeOn(getScheduler())
.observeOn(getScheduler())
.subscribe(new Subscriber<Long>() {
@@ -79,19 +87,18 @@ public void onError(Throwable e) {

@Override
public void onNext(Long args) {
System.out.println("==> Received " + args);
if (countReceived.incrementAndGet() == 2) {
unsubscribe();
latch.countDown();
unsubscribed.countDown();
}
System.out.println("==> Received " + args);
}
});

latch.await(1000, TimeUnit.MILLISECONDS);

System.out.println("----------- it thinks it is finished ------------------ ");
Thread.sleep(100);

subscribed.await(1000, TimeUnit.MILLISECONDS);
testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
unsubscribed.await(1000, TimeUnit.MILLISECONDS);
testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
assertEquals(2, countGenerated.get());
}