diff --git a/rxjava-core/src/test/java/rx/internal/util/RxRingBufferWithoutUnsafeTest.java b/rxjava-core/src/test/java/rx/internal/util/RxRingBufferWithoutUnsafeTest.java index 02ee73969b..b146460a10 100644 --- a/rxjava-core/src/test/java/rx/internal/util/RxRingBufferWithoutUnsafeTest.java +++ b/rxjava-core/src/test/java/rx/internal/util/RxRingBufferWithoutUnsafeTest.java @@ -39,10 +39,11 @@ protected RxRingBuffer createRingBuffer() { /** * Single producer, 2 consumers. The request() ensures it gets scheduled back on the same Producer thread. */ - @Test(timeout = 2000) + @Test public void testConcurrency() throws InterruptedException { final RxRingBuffer b = createRingBuffer(); - final CountDownLatch latch = new CountDownLatch(255); + final CountDownLatch emitLatch = new CountDownLatch(255); + final CountDownLatch drainLatch = new CountDownLatch(2); final Scheduler.Worker w1 = Schedulers.newThread().createWorker(); Scheduler.Worker w2 = Schedulers.newThread().createWorker(); @@ -58,25 +59,25 @@ public void testConcurrency() throws InterruptedException { @Override public void request(final long n) { - System.out.println("request[" + c.incrementAndGet() + "]: " + n + " Thread: " + Thread.currentThread()); + // System.out.println("request[" + c.incrementAndGet() + "]: " + n + " Thread: " + Thread.currentThread()); w1.schedule(new Action0() { @Override public void call() { - if (latch.getCount() == 0) { + if (emitLatch.getCount() == 0) { return; } for (int i = 0; i < n; i++) { try { - emit.incrementAndGet(); b.onNext("one"); + emit.incrementAndGet(); } catch (MissingBackpressureException e) { System.out.println("BackpressureException => item: " + i + " requested: " + n + " emit: " + emit.get() + " poll: " + poll.get()); backpressureExceptions.incrementAndGet(); } } // we'll release after n batches - latch.countDown(); + emitLatch.countDown(); } }); @@ -84,7 +85,6 @@ public void call() { }; final TestSubscriber ts = new TestSubscriber(); - w1.schedule(new Action0() { @Override @@ -95,7 +95,7 @@ public void call() { }); - w2.schedule(new Action0() { + Action0 drainer = new Action0() { @Override public void call() { @@ -109,39 +109,30 @@ public void call() { if (emitted > 0) { ts.requestMore(emitted); emitted = 0; + } else { + if (emitLatch.getCount() == 0) { + // this works with SynchronizedQueue, if changing to a non-blocking Queue + // then this will likely need to change like the SpmcTest version + drainLatch.countDown(); + return; + } } } } } - }); - - w3.schedule(new Action0() { + }; - @Override - public void call() { - int emitted = 0; - while (true) { - Object o = b.poll(); - if (o != null) { - emitted++; - poll.incrementAndGet(); - } else { - if (emitted > 0) { - ts.requestMore(emitted); - emitted = 0; - } - } - } - } + w2.schedule(drainer); + w3.schedule(drainer); - }); + emitLatch.await(); + drainLatch.await(); - latch.await(); - w1.unsubscribe(); w2.unsubscribe(); w3.unsubscribe(); + w1.unsubscribe(); // put this one last as unsubscribing from it can cause Exceptions to be throw in w2/w3 System.out.println("emit: " + emit.get() + " poll: " + poll.get()); assertEquals(0, backpressureExceptions.get());