Reimplemented the 'reduce' operator#436
Conversation
|
RxJava-pull-requests #348 SUCCESS |
There was a problem hiding this comment.
Why does this need to be synchronized? We have a single Observable sequence that will not interleave onNext calls (by contract) so I don't think we need this here.
What is the overhead of
Can we not achieve this check without re-implementing the entire operator? This duplicates logic between The guiding principle for this decision is: |
|
Hi, @benjchristensen I removed the synchronized. However, I'm still confused about concurrency in RxJava. Could you review my discussion in #417 and help me solve the problems? I also updated the unit tests. Finally, I reviewed the current operators and could not find a way to throw an exception when the observable is empty. Could you provide some suggestion? |
|
RxJava-pull-requests #360 FAILURE |
|
RxJava-pull-requests #361 FAILURE |
|
RxJava-pull-requests #362 FAILURE |
Does this do what you need? .flatMap( o -> {
if(good) {
return Observable.just(goodValue);
} else {
// if bad ... return an error
return Observable.error(new RuntimeException());
}
}) |
|
Sorry that I still have no idea about how to use |
|
Maybe it would be nice to have an |
|
Materialize and dematerialize are good tool for implementing operators that have behavior based on onError and onCompleted. x = from([])
hadValue = false;
x.materialize().map({ n ->
if (n.kind == Notification.Kind.OnNext) {
hadValue = true
}
else if (n.kind == Notification.Kind.OnCompleted && !hadValue) {
return new Notification(new Exception())
}
return n
}).dematerialize() |
|
Thanks, @abersnaze . It works. @samuelgruetter , I added an private |
|
RxJava-pull-requests #375 FAILURE |
|
I just had this idea. replace the takeLast(1) with takeLast(2) and use a second scan to figure out what to do at the end of the sequence. public Observable<T> reduce(Func2<T, T, T> accumulator) {
Func2<Notification<T>, Notification<T>, Notification<T>> func = new Func2<Notification<T>, Notification<T>, Notification<T>>() {
@Override
public Notification<T> call(Notification<T> value, Notification<T> end) {
if (end.isOnError())
return end;
if (value == null)
return new Notification<T>(new UnsupportedOperationException("Can not apply on an empty sequence"));
return value;
}
};
return create(OperationScan.scan(this, accumulator)).materialize().takeLast(2).scan(null, func).dematerialize();
} |
|
Sounds good. But I'm not sure if it's better that using one more |
|
I updated this PR to use the new |
|
RxJava-pull-requests #397 FAILURE |
|
It has been fixed in #474 |

Hi,
I reimplemented the 'reduce' operator. The improvements are as follow: