Operator Repeat and other operator fixes#807
Operator Repeat and other operator fixes#807akarnokd wants to merge 1 commit intoReactiveX:masterfrom
Conversation
|
RxJava-pull-requests #733 FAILURE |
What is the unit test for this scenario? |
There was a problem hiding this comment.
What do you think about this accepting Notification instead of Action0? The reason is that we now have 2 wrappers ... the Notification and Action0 around a type T and 2 object allocations for each onNext.
The use of Action0 is definitely more generic, but as we've seen by your CompositeSubscription changes, we're at the point where we're moving away from generic to achieve performance and memory gains, and this class will be involved in very high throughput scenarios.
There was a problem hiding this comment.
Possible. I can't think of any scenario right now where Action0 is necessary. I'll update the code.
It breaks because The fix I pasted above resolves the |
There was a problem hiding this comment.
I like how the QueueDrain class abstracts this away. The concern as stated in another comment is the double wrapping of Action0 and Notification we now have.
There was a problem hiding this comment.
Will lock into the specialization.
|
The test case with Observable.range(1, 20000000).observeOn(Schedulers.newThread()).take(10).subscribe(System.out::println);It prints 1..10, but then it doesn't stop and spins up a lot of threads. Basically, the |
- merging changes from ReactiveX#807
|
I merged |
Isn't that because we haven't re-implemented This works: Observable.range(1, 20000000).take(10).toBlockingObservable().forEach(new Action1<Integer>() {
@Override
public void call(Integer t1) {
System.out.println(t1);
}
}); |
|
Do you intend on submitting updated code soon, or should I merge the observeOn changes as discussed above? |
|
I skip on this one. |
|
Okay, thanks @akarnokd |
Is the |
|
The issue was with observeOn+take together. This fixes take, but observeOn in its current form doesn't work. This spins: Observable.range(1, 20000000)
.observeOn(Schedulers.newThread())
.take(10).toBlockingObservable().forEach(System.out::println);I guess I need to revisit the issue once ObserveOn has been updated. |
|
That code works on the new |
|
Excellent. I'm looking at the new |
|
Thanks @akarnokd I would appreciate that. My brain is done for the night! |
Reimplemented repeat() which exposed some other problems.
QueueDrainbased on this.takewas unable to unsubscribe the upstream as there was nothing in itscs.OperationParallelMergeTestis broken with this PR. I don't quite understand why it doesn't work nor have a clue how to fix it or the newobserveOn. Maybe the parallel tests relied on thread timing and not expiring too fast so they could continue one them.Performance
from+repeat: 3.8MOps/s
from+repeat+observeOn: 1.5MOps/s
range: 31.8MOps/s
from+observeOn+repeat: 15 kOps/s