Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3021,7 +3021,7 @@ public Observable<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunction)
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229154(v%3Dvs.103).aspx">MSDN: Observable.Aggregate</a>
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public Observable<T> reduce(Func2<? super T, ? super T, ? extends T> accumulator) {
public Observable<T> reduce(Func2<T, T, T> accumulator) {
return create(OperationScan.scan(this, accumulator)).takeLast(1);
}

Expand Down Expand Up @@ -3166,7 +3166,7 @@ public ConnectableObservable<T> publish() {
*
* @see #reduce(Func2)
*/
public Observable<T> aggregate(Func2<? super T, ? super T, ? extends T> accumulator) {
public Observable<T> aggregate(Func2<T, T, T> accumulator) {
return reduce(accumulator);
}

Expand All @@ -3193,7 +3193,7 @@ public Observable<T> aggregate(Func2<? super T, ? super T, ? extends T> accumula
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229154(v%3Dvs.103).aspx">MSDN: Observable.Aggregate</a>
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public <R> Observable<R> reduce(R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
public <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1);
}

Expand All @@ -3204,7 +3204,7 @@ public <R> Observable<R> reduce(R initialValue, Func2<? super R, ? super T, ? ex
*
* @see #reduce(Object, Func2)
*/
public <R> Observable<R> aggregate(R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
public <R> Observable<R> aggregate(R initialValue, Func2<R, ? super T, R> accumulator) {
return reduce(initialValue, accumulator);
}

Expand All @@ -3227,7 +3227,7 @@ public <R> Observable<R> aggregate(R initialValue, Func2<? super R, ? super T, ?
* @return an Observable that emits the results of each call to the accumulator function
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
*/
public Observable<T> scan(Func2<? super T, ? super T, ? extends T> accumulator) {
public Observable<T> scan(Func2<T, T, T> accumulator) {
return create(OperationScan.scan(this, accumulator));
}

Expand Down Expand Up @@ -3288,7 +3288,7 @@ public Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler) {
* @return an Observable that emits the results of each call to the accumulator function
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
*/
public <R> Observable<R> scan(R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
public <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
return create(OperationScan.scan(this, initialValue, accumulator));
}

Expand Down
16 changes: 8 additions & 8 deletions rxjava-core/src/main/java/rx/operators/OperationScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public final class OperationScan {
* @return An observable sequence whose elements are the result of accumulating the output from the list of Observables.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212007%28v=vs.103%29.aspx">Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource, TAccumulate))</a>
*/
public static <T, R> OnSubscribeFunc<R> scan(Observable<? extends T> sequence, R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
public static <T, R> OnSubscribeFunc<R> scan(Observable<? extends T> sequence, R initialValue, Func2<R, ? super T, R> accumulator) {
return new Accumulator<T, R>(sequence, initialValue, accumulator);
}

Expand All @@ -70,17 +70,17 @@ public static <T, R> OnSubscribeFunc<R> scan(Observable<? extends T> sequence, R
* @return An observable sequence whose elements are the result of accumulating the output from the list of Observables.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v=vs.103).aspx">Observable.Scan(TSource) Method (IObservable(TSource), Func(TSource, TSource, TSource))</a>
*/
public static <T> OnSubscribeFunc<T> scan(Observable<? extends T> sequence, Func2<? super T, ? super T, ? extends T> accumulator) {
public static <T> OnSubscribeFunc<T> scan(Observable<? extends T> sequence, Func2<T, T, T> accumulator) {
return new AccuWithoutInitialValue<T>(sequence, accumulator);
}

private static class AccuWithoutInitialValue<T> implements OnSubscribeFunc<T> {
private final Observable<? extends T> sequence;
private final Func2<? super T, ? super T, ? extends T> accumulatorFunction;
private final Func2<T, T, T> accumulatorFunction;

private AccumulatingObserver<T, T> accumulatingObserver;

private AccuWithoutInitialValue(Observable<? extends T> sequence, Func2<? super T, ? super T, ? extends T> accumulator) {
private AccuWithoutInitialValue(Observable<? extends T> sequence, Func2<T, T, T> accumulator) {
this.sequence = sequence;
this.accumulatorFunction = accumulator;
}
Expand Down Expand Up @@ -116,9 +116,9 @@ public void onCompleted() {
private static class Accumulator<T, R> implements OnSubscribeFunc<R> {
private final Observable<? extends T> sequence;
private final R initialValue;
private final Func2<? super R, ? super T, ? extends R> accumulatorFunction;
private final Func2<R, ? super T, R> accumulatorFunction;

private Accumulator(Observable<? extends T> sequence, R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
private Accumulator(Observable<? extends T> sequence, R initialValue, Func2<R, ? super T, R> accumulator) {
this.sequence = sequence;
this.initialValue = initialValue;
this.accumulatorFunction = accumulator;
Expand All @@ -133,11 +133,11 @@ public Subscription onSubscribe(final Observer<? super R> observer) {

private static class AccumulatingObserver<T, R> implements Observer<T> {
private final Observer<? super R> observer;
private final Func2<? super R, ? super T, ? extends R> accumulatorFunction;
private final Func2<R, ? super T, R> accumulatorFunction;

private R acc;

private AccumulatingObserver(Observer<? super R> observer, R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
private AccumulatingObserver(Observer<? super R> observer, R initialValue, Func2<R, ? super T, R> accumulator) {
this.observer = observer;
this.accumulatorFunction = accumulator;

Expand Down
30 changes: 30 additions & 0 deletions rxjava-core/src/test/java/rx/CovarianceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import org.junit.Test;

import rx.util.functions.Func2;

/**
* Test super/extends of generics.
*
Expand All @@ -21,6 +23,25 @@ public void testCovarianceOfFrom() {
// Observable.<HorrorMovie>from(new Movie()); // may not compile
}

@Test
public void testSortedList() {
Func2<Media, Media, Integer> SORT_FUNCTION = new Func2<Media, Media, Integer>() {

@Override
public Integer call(Media t1, Media t2) {
return 1;
}
};

// this one would work without the covariance generics
Observable<Media> o = Observable.from(new Movie(), new TVSeason(), new Album());
o.toSortedList(SORT_FUNCTION);

// this one would NOT work without the covariance generics
Observable<Movie> o2 = Observable.from(new Movie(), new ActionMovie(), new HorrorMovie());
o2.toSortedList(SORT_FUNCTION);
}

/*
* Most tests are moved into their applicable classes such as [Operator]Tests.java
*/
Expand All @@ -34,6 +55,15 @@ static class Movie extends Media {
static class HorrorMovie extends Movie {
}

static class ActionMovie extends Movie {
}

static class Album extends Media {
}

static class TVSeason extends Media {
}

static class Rating {
}

Expand Down
38 changes: 21 additions & 17 deletions rxjava-core/src/test/java/rx/ReduceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public Integer call(Integer t1, Integer t2) {
assertEquals(6, value);
}

@SuppressWarnings("unused")
@Test
public void reduceWithObjects() {
Observable<Movie> horrorMovies = Observable.<Movie> from(new HorrorMovie());
Expand All @@ -41,9 +42,15 @@ public Movie call(Movie t1, Movie t2) {
Observable<Movie> reduceResult2 = horrorMovies.reduce(chooseSecondMovie);
}

/**
* Reduce consumes and produces T so can't do covariance.
*
* https://github.com/Netflix/RxJava/issues/360#issuecomment-24203016
*/
@SuppressWarnings("unused")
@Test
public void reduceWithCovariantObjects() {
Observable<HorrorMovie> horrorMovies = Observable.from(new HorrorMovie());
Observable<Movie> horrorMovies = Observable.<Movie> from(new HorrorMovie());

Func2<Movie, Movie, Movie> chooseSecondMovie =
new Func2<Movie, Movie, Movie>() {
Expand All @@ -52,36 +59,33 @@ public Movie call(Movie t1, Movie t2) {
}
};

Observable<Movie> reduceResult = Observable.create(OperationScan.scan(horrorMovies, chooseSecondMovie)).takeLast(1);

//TODO this isn't compiling
// Observable<Movie> reduceResult2 = horrorMovies.reduce(chooseSecondMovie);
Observable<Movie> reduceResult2 = horrorMovies.reduce(chooseSecondMovie);
}

/**
* Reduce consumes and produces T so can't do covariance.
*
* https://github.com/Netflix/RxJava/issues/360#issuecomment-24203016
*/
@Test
public void reduceCovariance() {
Observable<HorrorMovie> horrorMovies = Observable.from(new HorrorMovie());

// do something with horrorMovies, relying on the fact that all are HorrorMovies
// and not just any Movies...

// pass it to library (works because it takes Observable<? extends Movie>)
// must type it to <Movie>
Observable<Movie> horrorMovies = Observable.<Movie> from(new HorrorMovie());
libraryFunctionActingOnMovieObservables(horrorMovies);
}

public void libraryFunctionActingOnMovieObservables(Observable<? extends Movie> obs) {
/*
* This accepts <Movie> instead of <? super Movie> since `reduce` can't handle covariants
*/
public void libraryFunctionActingOnMovieObservables(Observable<Movie> obs) {
Func2<Movie, Movie, Movie> chooseSecondMovie =
new Func2<Movie, Movie, Movie>() {
public Movie call(Movie t1, Movie t2) {
return t2;
}
};

Observable<Movie> reduceResult = Observable.create(OperationScan.scan(obs, chooseSecondMovie)).takeLast(1);

//TODO this isn't compiling
// Observable<Movie> reduceResult2 = obs.reduce(chooseSecondMovie);
// do something with reduceResult...
obs.reduce(chooseSecondMovie);
}

}