From 0f19eb2225e9bb26abde3fb6d2378b69bb3e792c Mon Sep 17 00:00:00 2001 From: MarkVanDerVoort Date: Sat, 8 Mar 2014 22:17:59 +0100 Subject: [PATCH 1/3] OperationAll to OperatorAll --- rxjava-core/src/main/java/rx/Observable.java | 85 +---------------- .../main/java/rx/operators/OperationAll.java | 91 ------------------- .../main/java/rx/operators/OperatorAll.java | 58 ++++++++++++ ...ationAllTest.java => OperatorAllTest.java} | 50 ++++------ 4 files changed, 79 insertions(+), 205 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationAll.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorAll.java rename rxjava-core/src/test/java/rx/operators/{OperationAllTest.java => OperatorAllTest.java} (60%) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 42da02f62b..06cf28cc9b 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -49,88 +49,7 @@ import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; import rx.observers.SafeSubscriber; -import rx.operators.OnSubscribeFromIterable; -import rx.operators.OnSubscribeRange; -import rx.operators.OperationAll; -import rx.operators.OperationAmb; -import rx.operators.OperationAny; -import rx.operators.OperationAsObservable; -import rx.operators.OperationAverage; -import rx.operators.OperationBuffer; -import rx.operators.OperationCache; -import rx.operators.OperationCombineLatest; -import rx.operators.OperationConcat; -import rx.operators.OperationDebounce; -import rx.operators.OperationDefaultIfEmpty; -import rx.operators.OperationDefer; -import rx.operators.OperationDelay; -import rx.operators.OperationDematerialize; -import rx.operators.OperationDistinct; -import rx.operators.OperationDistinctUntilChanged; -import rx.operators.OperationElementAt; -import rx.operators.OperationFinally; -import rx.operators.OperationFlatMap; -import rx.operators.OperationGroupByUntil; -import rx.operators.OperationGroupJoin; -import rx.operators.OperationInterval; -import rx.operators.OperationJoin; -import rx.operators.OperationJoinPatterns; -import rx.operators.OperationMaterialize; -import rx.operators.OperationMergeDelayError; -import rx.operators.OperationMergeMaxConcurrent; -import rx.operators.OperationMinMax; -import rx.operators.OperationMulticast; -import rx.operators.OperationOnErrorResumeNextViaObservable; -import rx.operators.OperationOnErrorReturn; -import rx.operators.OperationOnExceptionResumeNextViaObservable; -import rx.operators.OperationParallelMerge; -import rx.operators.OperationReplay; -import rx.operators.OperationSample; -import rx.operators.OperationSequenceEqual; -import rx.operators.OperationSingle; -import rx.operators.OperationSkip; -import rx.operators.OperationSkipLast; -import rx.operators.OperationSkipUntil; -import rx.operators.OperationSkipWhile; -import rx.operators.OperationSum; -import rx.operators.OperationSwitch; -import rx.operators.OperationSynchronize; -import rx.operators.OperationTakeLast; -import rx.operators.OperationTakeTimed; -import rx.operators.OperationTakeUntil; -import rx.operators.OperationTakeWhile; -import rx.operators.OperationThrottleFirst; -import rx.operators.OperationTimeInterval; -import rx.operators.OperationTimer; -import rx.operators.OperationToMap; -import rx.operators.OperationToMultimap; -import rx.operators.OperationToObservableFuture; -import rx.operators.OperationUsing; -import rx.operators.OperationWindow; -import rx.operators.OperatorCast; -import rx.operators.OperatorDoOnEach; -import rx.operators.OperatorFilter; -import rx.operators.OperatorGroupBy; -import rx.operators.OperatorMap; -import rx.operators.OperatorMerge; -import rx.operators.OperatorObserveOn; -import rx.operators.OperatorOnErrorFlatMap; -import rx.operators.OperatorOnErrorResumeNextViaFunction; -import rx.operators.OperatorParallel; -import rx.operators.OperatorRepeat; -import rx.operators.OperatorRetry; -import rx.operators.OperatorScan; -import rx.operators.OperatorSkip; -import rx.operators.OperatorSubscribeOn; -import rx.operators.OperatorTake; -import rx.operators.OperatorTimeout; -import rx.operators.OperatorTimeoutWithSelector; -import rx.operators.OperatorTimestamp; -import rx.operators.OperatorToObservableList; -import rx.operators.OperatorToObservableSortedList; -import rx.operators.OperatorUnsubscribeOn; -import rx.operators.OperatorZip; -import rx.operators.OperatorZipIterable; +import rx.operators.*; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.schedulers.Schedulers; @@ -3461,7 +3380,7 @@ public final Observable aggregate(R initialValue, Func2 * @see RxJava Wiki: all() */ public final Observable all(Func1 predicate) { - return create(OperationAll.all(this, predicate)); + return lift(new OperatorAll(predicate)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationAll.java b/rxjava-core/src/main/java/rx/operators/OperationAll.java deleted file mode 100644 index 681486f13c..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationAll.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.atomic.AtomicBoolean; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscription; -import rx.functions.Func1; - -/** - * Returns an Observable that emits a Boolean that indicates whether all items emitted by an - * Observable satisfy a condition. - *

- * - */ -public class OperationAll { - - public static OnSubscribeFunc all(Observable sequence, Func1 predicate) { - return new AllObservable(sequence, predicate); - } - - private static class AllObservable implements OnSubscribeFunc { - private final Observable sequence; - private final Func1 predicate; - - private final SafeObservableSubscription subscription = new SafeObservableSubscription(); - - private AllObservable(Observable sequence, Func1 predicate) { - this.sequence = sequence; - this.predicate = predicate; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - return subscription.wrap(sequence.subscribe(new AllObserver(observer))); - - } - - private class AllObserver implements Observer { - private final Observer underlying; - - private final AtomicBoolean status = new AtomicBoolean(true); - - public AllObserver(Observer underlying) { - this.underlying = underlying; - } - - @Override - public void onCompleted() { - if (status.get()) { - underlying.onNext(true); - underlying.onCompleted(); - } - } - - @Override - public void onError(Throwable e) { - underlying.onError(e); - } - - @Override - public void onNext(T args) { - boolean result = predicate.call(args); - boolean changed = status.compareAndSet(true, result); - - if (changed && !result) { - underlying.onNext(false); - underlying.onCompleted(); - subscription.unsubscribe(); - } - } - } - - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorAll.java b/rxjava-core/src/main/java/rx/operators/OperatorAll.java new file mode 100644 index 0000000000..b1b0535127 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorAll.java @@ -0,0 +1,58 @@ +package rx.operators; + +import rx.Observable; +import rx.Subscriber; +import rx.exceptions.OnErrorThrowable; +import rx.functions.Func1; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static rx.Observable.Operator; + +/** + * Returns an Observable that emits a Boolean that indicates whether all items emitted by an + * Observable satisfy a condition. + *

+ * + */ +public class OperatorAll implements Operator{ + + private final Func1 predicate; + + public OperatorAll(Func1 predicate) { + this.predicate = predicate; + } + + @Override + public Subscriber call(final Subscriber child) { + return new Subscriber() { + private AtomicBoolean status = new AtomicBoolean(true); + + @Override + public void onCompleted() { + child.onNext(status.get()); + child.onCompleted(); + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(T t) { + try { + final Boolean result = predicate.call(t); + boolean changed = status.compareAndSet(true, result); + + if (changed && !result) { + child.onNext(false); + child.onCompleted(); + } + } catch (Throwable e) { + child.onError(OnErrorThrowable.addValueAsLastCause(e,t)); + } + } + }; + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationAllTest.java b/rxjava-core/src/test/java/rx/operators/OperatorAllTest.java similarity index 60% rename from rxjava-core/src/test/java/rx/operators/OperationAllTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorAllTest.java index e7940a0583..f97504f884 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationAllTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorAllTest.java @@ -16,28 +16,31 @@ package rx.operators; import static org.mockito.Mockito.*; -import static rx.operators.OperationAll.*; - import org.junit.Test; import rx.Observable; import rx.Observer; import rx.functions.Func1; +import rx.observers.TestSubscriber; + +import java.util.Arrays; + +public class OperatorAllTest { -public class OperationAllTest { + final Func1 hasLength3 = new Func1() { + @Override + public Boolean call(String s) { + return s.length() == 3; + } + }; @Test @SuppressWarnings("unchecked") public void testAll() { - Observable obs = Observable.from("one", "two", "six"); + Observable obs = Observable.from(Arrays.asList("one", "two", "six")).all(hasLength3); Observer observer = mock(Observer.class); - Observable.create(all(obs, new Func1() { - @Override - public Boolean call(String s) { - return s.length() == 3; - } - })).subscribe(observer); + obs.subscribe(new TestSubscriber(observer)); verify(observer).onNext(true); verify(observer).onCompleted(); @@ -47,15 +50,10 @@ public Boolean call(String s) { @Test @SuppressWarnings("unchecked") public void testNotAll() { - Observable obs = Observable.from("one", "two", "three", "six"); + Observable obs = Observable.from(Arrays.asList("one", "two", "three", "six")).all(hasLength3); Observer observer = mock(Observer.class); - Observable.create(all(obs, new Func1() { - @Override - public Boolean call(String s) { - return s.length() == 3; - } - })).subscribe(observer); + obs.subscribe(new TestSubscriber(observer)); verify(observer).onNext(false); verify(observer).onCompleted(); @@ -65,15 +63,10 @@ public Boolean call(String s) { @Test @SuppressWarnings("unchecked") public void testEmpty() { - Observable obs = Observable.empty(); + Observable obs = Observable.empty().all(hasLength3); Observer observer = mock(Observer.class); - Observable.create(all(obs, new Func1() { - @Override - public Boolean call(String s) { - return s.length() == 3; - } - })).subscribe(observer); + obs.subscribe(new TestSubscriber(observer)); verify(observer).onNext(true); verify(observer).onCompleted(); @@ -84,15 +77,10 @@ public Boolean call(String s) { @SuppressWarnings("unchecked") public void testError() { Throwable error = new Throwable(); - Observable obs = Observable.error(error); + Observable obs = Observable.error(error).all(hasLength3); Observer observer = mock(Observer.class); - Observable.create(all(obs, new Func1() { - @Override - public Boolean call(String s) { - return s.length() == 3; - } - })).subscribe(observer); + obs.subscribe(new TestSubscriber(observer)); verify(observer).onError(error); verifyNoMoreInteractions(observer); From f4d468fc1dac20244e5ec6631cfcc1274ae43e24 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 22 Apr 2014 10:56:11 +0200 Subject: [PATCH 2/3] OperatorAll --- rxjava-core/src/main/java/rx/Observable.java | 4 +- .../main/java/rx/operators/OperationAll.java | 90 ------------------- .../main/java/rx/operators/OperatorAll.java | 73 +++++++++++++++ ...ationAllTest.java => OperatorAllTest.java} | 19 ++-- 4 files changed, 84 insertions(+), 102 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationAll.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorAll.java rename rxjava-core/src/test/java/rx/operators/{OperationAllTest.java => OperatorAllTest.java} (84%) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 0c7e64802e..e5eccfe598 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -48,7 +48,7 @@ import rx.observers.SafeSubscriber; import rx.operators.OnSubscribeFromIterable; import rx.operators.OnSubscribeRange; -import rx.operators.OperationAll; +import rx.operators.OperatorAll; import rx.operators.OperationAny; import rx.operators.OperationAsObservable; import rx.operators.OperationBuffer; @@ -2941,7 +2941,7 @@ public final static Observable zip(Ob * @see RxJava Wiki: all() */ public final Observable all(Func1 predicate) { - return create(OperationAll.all(this, predicate)); + return lift(new OperatorAll(predicate)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationAll.java b/rxjava-core/src/main/java/rx/operators/OperationAll.java deleted file mode 100644 index 9ad2d942ad..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationAll.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.atomic.AtomicBoolean; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Func1; - -/** - * Returns an Observable that emits a Boolean that indicates whether all items emitted by an - * Observable satisfy a condition. - *

- * - */ -public class OperationAll { - - public static OnSubscribeFunc all(Observable sequence, Func1 predicate) { - return new AllObservable(sequence, predicate); - } - - private static class AllObservable implements OnSubscribeFunc { - private final Observable sequence; - private final Func1 predicate; - - private AllObservable(Observable sequence, Func1 predicate) { - this.sequence = sequence; - this.predicate = predicate; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - return sequence.unsafeSubscribe(new AllObserver(observer)); - - } - - private class AllObserver extends Subscriber { - private final Observer underlying; - - private final AtomicBoolean status = new AtomicBoolean(true); - - public AllObserver(Observer underlying) { - this.underlying = underlying; - } - - @Override - public void onCompleted() { - if (status.get()) { - underlying.onNext(true); - underlying.onCompleted(); - } - } - - @Override - public void onError(Throwable e) { - underlying.onError(e); - } - - @Override - public void onNext(T args) { - boolean result = predicate.call(args); - boolean changed = status.compareAndSet(true, result); - - if (changed && !result) { - underlying.onNext(false); - underlying.onCompleted(); - unsubscribe(); - } - } - } - - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorAll.java b/rxjava-core/src/main/java/rx/operators/OperatorAll.java new file mode 100644 index 0000000000..7b1124836f --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorAll.java @@ -0,0 +1,73 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.atomic.AtomicBoolean; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observable.Operator; +import rx.Observer; +import rx.Subscriber; +import rx.Subscription; +import rx.functions.Func1; + +/** + * Returns an Observable that emits a Boolean that indicates whether all items emitted by an + * Observable satisfy a condition. + *

+ * + */ +public final class OperatorAll implements Operator { + private final Func1 predicate; + + public OperatorAll(Func1 predicate) { + this.predicate = predicate; + } + + @Override + public Subscriber call(final Subscriber child) { + Subscriber s = new Subscriber() { + boolean done; + @Override + public void onNext(T t) { + boolean result = predicate.call(t); + if (!result && !done) { + done = true; + child.onNext(false); + child.onCompleted(); + unsubscribe(); + } + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onCompleted() { + if (!done) { + done = true; + child.onNext(true); + child.onCompleted(); + } + } + }; + child.add(s); + return s; + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationAllTest.java b/rxjava-core/src/test/java/rx/operators/OperatorAllTest.java similarity index 84% rename from rxjava-core/src/test/java/rx/operators/OperationAllTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorAllTest.java index c76fad7a13..a36933ce27 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationAllTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorAllTest.java @@ -18,7 +18,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static rx.operators.OperationAll.all; import org.junit.Test; @@ -26,7 +25,7 @@ import rx.Observer; import rx.functions.Func1; -public class OperationAllTest { +public class OperatorAllTest { @Test @SuppressWarnings("unchecked") @@ -34,12 +33,12 @@ public void testAll() { Observable obs = Observable.from("one", "two", "six"); Observer observer = mock(Observer.class); - Observable.create(all(obs, new Func1() { + obs.all(new Func1() { @Override public Boolean call(String s) { return s.length() == 3; } - })).subscribe(observer); + }).subscribe(observer); verify(observer).onNext(true); verify(observer).onCompleted(); @@ -52,12 +51,12 @@ public void testNotAll() { Observable obs = Observable.from("one", "two", "three", "six"); Observer observer = mock(Observer.class); - Observable.create(all(obs, new Func1() { + obs.all(new Func1() { @Override public Boolean call(String s) { return s.length() == 3; } - })).subscribe(observer); + }).subscribe(observer); verify(observer).onNext(false); verify(observer).onCompleted(); @@ -70,12 +69,12 @@ public void testEmpty() { Observable obs = Observable.empty(); Observer observer = mock(Observer.class); - Observable.create(all(obs, new Func1() { + obs.all(new Func1() { @Override public Boolean call(String s) { return s.length() == 3; } - })).subscribe(observer); + }).subscribe(observer); verify(observer).onNext(true); verify(observer).onCompleted(); @@ -89,12 +88,12 @@ public void testError() { Observable obs = Observable.error(error); Observer observer = mock(Observer.class); - Observable.create(all(obs, new Func1() { + obs.all(new Func1() { @Override public Boolean call(String s) { return s.length() == 3; } - })).subscribe(observer); + }).subscribe(observer); verify(observer).onError(error); verifyNoMoreInteractions(observer); From dea5af5c50ddbeb16adc0c8d7bee7efde9946a67 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 22 Apr 2014 20:57:02 -0700 Subject: [PATCH 3/3] Simplify Subscription Chain --- .../src/main/java/rx/operators/OperatorAll.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperatorAll.java b/rxjava-core/src/main/java/rx/operators/OperatorAll.java index 7b1124836f..6539296fe7 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorAll.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorAll.java @@ -15,14 +15,8 @@ */ package rx.operators; -import java.util.concurrent.atomic.AtomicBoolean; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; import rx.Observable.Operator; -import rx.Observer; import rx.Subscriber; -import rx.Subscription; import rx.functions.Func1; /** @@ -40,8 +34,9 @@ public OperatorAll(Func1 predicate) { @Override public Subscriber call(final Subscriber child) { - Subscriber s = new Subscriber() { + return new Subscriber(child) { boolean done; + @Override public void onNext(T t) { boolean result = predicate.call(t); @@ -67,7 +62,5 @@ public void onCompleted() { } } }; - child.add(s); - return s; } }