Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ protected RxRingBuffer createRingBuffer() {
/**
* Single producer, 2 consumers. The request() ensures it gets scheduled back on the same Producer thread.
*/
@Test(timeout = 2000)
@Test
public void testConcurrency() throws InterruptedException {
final RxRingBuffer b = createRingBuffer();
final CountDownLatch latch = new CountDownLatch(255);
final CountDownLatch emitLatch = new CountDownLatch(255);
final CountDownLatch drainLatch = new CountDownLatch(2);

final Scheduler.Worker w1 = Schedulers.newThread().createWorker();
Scheduler.Worker w2 = Schedulers.newThread().createWorker();
Expand All @@ -58,33 +59,32 @@ public void testConcurrency() throws InterruptedException {

@Override
public void request(final long n) {
System.out.println("request[" + c.incrementAndGet() + "]: " + n + " Thread: " + Thread.currentThread());
// System.out.println("request[" + c.incrementAndGet() + "]: " + n + " Thread: " + Thread.currentThread());
w1.schedule(new Action0() {

@Override
public void call() {
if (latch.getCount() == 0) {
if (emitLatch.getCount() == 0) {
return;
}
for (int i = 0; i < n; i++) {
try {
emit.incrementAndGet();
b.onNext("one");
emit.incrementAndGet();
} catch (MissingBackpressureException e) {
System.out.println("BackpressureException => item: " + i + " requested: " + n + " emit: " + emit.get() + " poll: " + poll.get());
backpressureExceptions.incrementAndGet();
}
}
// we'll release after n batches
latch.countDown();
emitLatch.countDown();
}

});
}

};
final TestSubscriber<String> ts = new TestSubscriber<String>();

w1.schedule(new Action0() {

@Override
Expand All @@ -95,7 +95,7 @@ public void call() {

});

w2.schedule(new Action0() {
Action0 drainer = new Action0() {

@Override
public void call() {
Expand All @@ -109,39 +109,30 @@ public void call() {
if (emitted > 0) {
ts.requestMore(emitted);
emitted = 0;
} else {
if (emitLatch.getCount() == 0) {
// this works with SynchronizedQueue, if changing to a non-blocking Queue
// then this will likely need to change like the SpmcTest version
drainLatch.countDown();
return;
}
}
}
}

}

});

w3.schedule(new Action0() {
};

@Override
public void call() {
int emitted = 0;
while (true) {
Object o = b.poll();
if (o != null) {
emitted++;
poll.incrementAndGet();
} else {
if (emitted > 0) {
ts.requestMore(emitted);
emitted = 0;
}
}
}
}
w2.schedule(drainer);
w3.schedule(drainer);

});
emitLatch.await();
drainLatch.await();

latch.await();
w1.unsubscribe();
w2.unsubscribe();
w3.unsubscribe();
w1.unsubscribe(); // put this one last as unsubscribing from it can cause Exceptions to be throw in w2/w3

System.out.println("emit: " + emit.get() + " poll: " + poll.get());
assertEquals(0, backpressureExceptions.get());
Expand Down