From 89f3218d84ef679c661b01822d110477f2b06ab1 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 10 Sep 2013 21:36:15 -0700 Subject: [PATCH 1/2] Remove covariance of scan/reduce since consumer/producer are the same See https://github.com/Netflix/RxJava/issues/360#issuecomment-24203016 --- rxjava-core/src/main/java/rx/Observable.java | 6 +-- .../main/java/rx/operators/OperationScan.java | 2 +- .../src/test/java/rx/CovarianceTest.java | 30 +++++++++++++++ rxjava-core/src/test/java/rx/ReduceTests.java | 38 ++++++++++--------- 4 files changed, 55 insertions(+), 21 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 2ac0f42754..41d40e6217 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -3021,7 +3021,7 @@ public Observable onErrorReturn(Func1 resumeFunction) * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ - public Observable reduce(Func2 accumulator) { + public Observable reduce(Func2 accumulator) { return create(OperationScan.scan(this, accumulator)).takeLast(1); } @@ -3166,7 +3166,7 @@ public ConnectableObservable publish() { * * @see #reduce(Func2) */ - public Observable aggregate(Func2 accumulator) { + public Observable aggregate(Func2 accumulator) { return reduce(accumulator); } @@ -3227,7 +3227,7 @@ public Observable aggregate(R initialValue, Func2MSDN: Observable.Scan */ - public Observable scan(Func2 accumulator) { + public Observable scan(Func2 accumulator) { return create(OperationScan.scan(this, accumulator)); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationScan.java b/rxjava-core/src/main/java/rx/operators/OperationScan.java index fae739dfd8..b71c804729 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationScan.java +++ b/rxjava-core/src/main/java/rx/operators/OperationScan.java @@ -70,7 +70,7 @@ public static OnSubscribeFunc scan(Observable sequence, R * @return An observable sequence whose elements are the result of accumulating the output from the list of Observables. * @see Observable.Scan(TSource) Method (IObservable(TSource), Func(TSource, TSource, TSource)) */ - public static OnSubscribeFunc scan(Observable sequence, Func2 accumulator) { + public static OnSubscribeFunc scan(Observable sequence, Func2 accumulator) { return new AccuWithoutInitialValue(sequence, accumulator); } diff --git a/rxjava-core/src/test/java/rx/CovarianceTest.java b/rxjava-core/src/test/java/rx/CovarianceTest.java index 69110b6c6a..01f2030c52 100644 --- a/rxjava-core/src/test/java/rx/CovarianceTest.java +++ b/rxjava-core/src/test/java/rx/CovarianceTest.java @@ -4,6 +4,8 @@ import org.junit.Test; +import rx.util.functions.Func2; + /** * Test super/extends of generics. * @@ -21,6 +23,25 @@ public void testCovarianceOfFrom() { // Observable.from(new Movie()); // may not compile } + @Test + public void testSortedList() { + Func2 SORT_FUNCTION = new Func2() { + + @Override + public Integer call(Media t1, Media t2) { + return 1; + } + }; + + // this one would work without the covariance generics + Observable o = Observable.from(new Movie(), new TVSeason(), new Album()); + o.toSortedList(SORT_FUNCTION); + + // this one would NOT work without the covariance generics + Observable o2 = Observable.from(new Movie(), new ActionMovie(), new HorrorMovie()); + o2.toSortedList(SORT_FUNCTION); + } + /* * Most tests are moved into their applicable classes such as [Operator]Tests.java */ @@ -34,6 +55,15 @@ static class Movie extends Media { static class HorrorMovie extends Movie { } + static class ActionMovie extends Movie { + } + + static class Album extends Media { + } + + static class TVSeason extends Media { + } + static class Rating { } diff --git a/rxjava-core/src/test/java/rx/ReduceTests.java b/rxjava-core/src/test/java/rx/ReduceTests.java index 1f7ea5550c..b812ba3638 100644 --- a/rxjava-core/src/test/java/rx/ReduceTests.java +++ b/rxjava-core/src/test/java/rx/ReduceTests.java @@ -25,6 +25,7 @@ public Integer call(Integer t1, Integer t2) { assertEquals(6, value); } + @SuppressWarnings("unused") @Test public void reduceWithObjects() { Observable horrorMovies = Observable. from(new HorrorMovie()); @@ -41,9 +42,15 @@ public Movie call(Movie t1, Movie t2) { Observable reduceResult2 = horrorMovies.reduce(chooseSecondMovie); } + /** + * Reduce consumes and produces T so can't do covariance. + * + * https://github.com/Netflix/RxJava/issues/360#issuecomment-24203016 + */ + @SuppressWarnings("unused") @Test public void reduceWithCovariantObjects() { - Observable horrorMovies = Observable.from(new HorrorMovie()); + Observable horrorMovies = Observable. from(new HorrorMovie()); Func2 chooseSecondMovie = new Func2() { @@ -52,24 +59,25 @@ public Movie call(Movie t1, Movie t2) { } }; - Observable reduceResult = Observable.create(OperationScan.scan(horrorMovies, chooseSecondMovie)).takeLast(1); - - //TODO this isn't compiling - // Observable reduceResult2 = horrorMovies.reduce(chooseSecondMovie); + Observable reduceResult2 = horrorMovies.reduce(chooseSecondMovie); } + /** + * Reduce consumes and produces T so can't do covariance. + * + * https://github.com/Netflix/RxJava/issues/360#issuecomment-24203016 + */ @Test public void reduceCovariance() { - Observable horrorMovies = Observable.from(new HorrorMovie()); - - // do something with horrorMovies, relying on the fact that all are HorrorMovies - // and not just any Movies... - - // pass it to library (works because it takes Observable) + // must type it to + Observable horrorMovies = Observable. from(new HorrorMovie()); libraryFunctionActingOnMovieObservables(horrorMovies); } - public void libraryFunctionActingOnMovieObservables(Observable obs) { + /* + * This accepts instead of since `reduce` can't handle covariants + */ + public void libraryFunctionActingOnMovieObservables(Observable obs) { Func2 chooseSecondMovie = new Func2() { public Movie call(Movie t1, Movie t2) { @@ -77,11 +85,7 @@ public Movie call(Movie t1, Movie t2) { } }; - Observable reduceResult = Observable.create(OperationScan.scan(obs, chooseSecondMovie)).takeLast(1); - - //TODO this isn't compiling - // Observable reduceResult2 = obs.reduce(chooseSecondMovie); - // do something with reduceResult... + obs.reduce(chooseSecondMovie); } } From 3501fe74ad2588757e1b4d72ca321990a70610dd Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 11 Sep 2013 09:52:16 -0700 Subject: [PATCH 2/2] Simplify reduce/scan generics Updates based on discussion at https://github.com/Netflix/RxJava/pull/369#issuecomment-24255958 --- rxjava-core/src/main/java/rx/Observable.java | 6 +++--- .../src/main/java/rx/operators/OperationScan.java | 14 +++++++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 41d40e6217..e1e1e59eea 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -3193,7 +3193,7 @@ public Observable aggregate(Func2 accumulator) { * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ - public Observable reduce(R initialValue, Func2 accumulator) { + public Observable reduce(R initialValue, Func2 accumulator) { return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1); } @@ -3204,7 +3204,7 @@ public Observable reduce(R initialValue, Func2 Observable aggregate(R initialValue, Func2 accumulator) { + public Observable aggregate(R initialValue, Func2 accumulator) { return reduce(initialValue, accumulator); } @@ -3288,7 +3288,7 @@ public Observable sample(long period, TimeUnit unit, Scheduler scheduler) { * @return an Observable that emits the results of each call to the accumulator function * @see MSDN: Observable.Scan */ - public Observable scan(R initialValue, Func2 accumulator) { + public Observable scan(R initialValue, Func2 accumulator) { return create(OperationScan.scan(this, initialValue, accumulator)); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationScan.java b/rxjava-core/src/main/java/rx/operators/OperationScan.java index b71c804729..513646b8ef 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationScan.java +++ b/rxjava-core/src/main/java/rx/operators/OperationScan.java @@ -55,7 +55,7 @@ public final class OperationScan { * @return An observable sequence whose elements are the result of accumulating the output from the list of Observables. * @see Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource, TAccumulate)) */ - public static OnSubscribeFunc scan(Observable sequence, R initialValue, Func2 accumulator) { + public static OnSubscribeFunc scan(Observable sequence, R initialValue, Func2 accumulator) { return new Accumulator(sequence, initialValue, accumulator); } @@ -76,11 +76,11 @@ public static OnSubscribeFunc scan(Observable sequence, Func private static class AccuWithoutInitialValue implements OnSubscribeFunc { private final Observable sequence; - private final Func2 accumulatorFunction; + private final Func2 accumulatorFunction; private AccumulatingObserver accumulatingObserver; - private AccuWithoutInitialValue(Observable sequence, Func2 accumulator) { + private AccuWithoutInitialValue(Observable sequence, Func2 accumulator) { this.sequence = sequence; this.accumulatorFunction = accumulator; } @@ -116,9 +116,9 @@ public void onCompleted() { private static class Accumulator implements OnSubscribeFunc { private final Observable sequence; private final R initialValue; - private final Func2 accumulatorFunction; + private final Func2 accumulatorFunction; - private Accumulator(Observable sequence, R initialValue, Func2 accumulator) { + private Accumulator(Observable sequence, R initialValue, Func2 accumulator) { this.sequence = sequence; this.initialValue = initialValue; this.accumulatorFunction = accumulator; @@ -133,11 +133,11 @@ public Subscription onSubscribe(final Observer observer) { private static class AccumulatingObserver implements Observer { private final Observer observer; - private final Func2 accumulatorFunction; + private final Func2 accumulatorFunction; private R acc; - private AccumulatingObserver(Observer observer, R initialValue, Func2 accumulator) { + private AccumulatingObserver(Observer observer, R initialValue, Func2 accumulator) { this.observer = observer; this.accumulatorFunction = accumulator;