diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 0c7e64802e..e5eccfe598 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -48,7 +48,7 @@ import rx.observers.SafeSubscriber; import rx.operators.OnSubscribeFromIterable; import rx.operators.OnSubscribeRange; -import rx.operators.OperationAll; +import rx.operators.OperatorAll; import rx.operators.OperationAny; import rx.operators.OperationAsObservable; import rx.operators.OperationBuffer; @@ -2941,7 +2941,7 @@ public final static Observable zip(Ob * @see RxJava Wiki: all() */ public final Observable all(Func1 predicate) { - return create(OperationAll.all(this, predicate)); + return lift(new OperatorAll(predicate)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationAll.java b/rxjava-core/src/main/java/rx/operators/OperationAll.java deleted file mode 100644 index 9ad2d942ad..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationAll.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.atomic.AtomicBoolean; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Func1; - -/** - * Returns an Observable that emits a Boolean that indicates whether all items emitted by an - * Observable satisfy a condition. - *

- * - */ -public class OperationAll { - - public static OnSubscribeFunc all(Observable sequence, Func1 predicate) { - return new AllObservable(sequence, predicate); - } - - private static class AllObservable implements OnSubscribeFunc { - private final Observable sequence; - private final Func1 predicate; - - private AllObservable(Observable sequence, Func1 predicate) { - this.sequence = sequence; - this.predicate = predicate; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - return sequence.unsafeSubscribe(new AllObserver(observer)); - - } - - private class AllObserver extends Subscriber { - private final Observer underlying; - - private final AtomicBoolean status = new AtomicBoolean(true); - - public AllObserver(Observer underlying) { - this.underlying = underlying; - } - - @Override - public void onCompleted() { - if (status.get()) { - underlying.onNext(true); - underlying.onCompleted(); - } - } - - @Override - public void onError(Throwable e) { - underlying.onError(e); - } - - @Override - public void onNext(T args) { - boolean result = predicate.call(args); - boolean changed = status.compareAndSet(true, result); - - if (changed && !result) { - underlying.onNext(false); - underlying.onCompleted(); - unsubscribe(); - } - } - } - - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorAll.java b/rxjava-core/src/main/java/rx/operators/OperatorAll.java new file mode 100644 index 0000000000..6539296fe7 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorAll.java @@ -0,0 +1,66 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Observable.Operator; +import rx.Subscriber; +import rx.functions.Func1; + +/** + * Returns an Observable that emits a Boolean that indicates whether all items emitted by an + * Observable satisfy a condition. + *

+ * + */ +public final class OperatorAll implements Operator { + private final Func1 predicate; + + public OperatorAll(Func1 predicate) { + this.predicate = predicate; + } + + @Override + public Subscriber call(final Subscriber child) { + return new Subscriber(child) { + boolean done; + + @Override + public void onNext(T t) { + boolean result = predicate.call(t); + if (!result && !done) { + done = true; + child.onNext(false); + child.onCompleted(); + unsubscribe(); + } + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onCompleted() { + if (!done) { + done = true; + child.onNext(true); + child.onCompleted(); + } + } + }; + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationAllTest.java b/rxjava-core/src/test/java/rx/operators/OperatorAllTest.java similarity index 84% rename from rxjava-core/src/test/java/rx/operators/OperationAllTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorAllTest.java index c76fad7a13..a36933ce27 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationAllTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorAllTest.java @@ -18,7 +18,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static rx.operators.OperationAll.all; import org.junit.Test; @@ -26,7 +25,7 @@ import rx.Observer; import rx.functions.Func1; -public class OperationAllTest { +public class OperatorAllTest { @Test @SuppressWarnings("unchecked") @@ -34,12 +33,12 @@ public void testAll() { Observable obs = Observable.from("one", "two", "six"); Observer observer = mock(Observer.class); - Observable.create(all(obs, new Func1() { + obs.all(new Func1() { @Override public Boolean call(String s) { return s.length() == 3; } - })).subscribe(observer); + }).subscribe(observer); verify(observer).onNext(true); verify(observer).onCompleted(); @@ -52,12 +51,12 @@ public void testNotAll() { Observable obs = Observable.from("one", "two", "three", "six"); Observer observer = mock(Observer.class); - Observable.create(all(obs, new Func1() { + obs.all(new Func1() { @Override public Boolean call(String s) { return s.length() == 3; } - })).subscribe(observer); + }).subscribe(observer); verify(observer).onNext(false); verify(observer).onCompleted(); @@ -70,12 +69,12 @@ public void testEmpty() { Observable obs = Observable.empty(); Observer observer = mock(Observer.class); - Observable.create(all(obs, new Func1() { + obs.all(new Func1() { @Override public Boolean call(String s) { return s.length() == 3; } - })).subscribe(observer); + }).subscribe(observer); verify(observer).onNext(true); verify(observer).onCompleted(); @@ -89,12 +88,12 @@ public void testError() { Observable obs = Observable.error(error); Observer observer = mock(Observer.class); - Observable.create(all(obs, new Func1() { + obs.all(new Func1() { @Override public Boolean call(String s) { return s.length() == 3; } - })).subscribe(observer); + }).subscribe(observer); verify(observer).onError(error); verifyNoMoreInteractions(observer);