Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Reimplement 'subscribeOn' using 'lift'#822

Merged
benjchristensen merged 3 commits intoReactiveX:masterfrom
zsxwing:subscribeOn
Feb 7, 2014
Merged

Reimplement 'subscribeOn' using 'lift'#822
benjchristensen merged 3 commits intoReactiveX:masterfrom
zsxwing:subscribeOn

Conversation

@zsxwing
Copy link
Member

@zsxwing zsxwing commented Feb 6, 2014

hi, this PR reimplemented the subscribeOn using lift. However, both the original and current implementation can not guarantee that unsubscribe is always called in the scheduler. An extreme example is:

    public static void main(String[] args) throws InterruptedException {
        Observable.create(new OnSubscribe<Integer>() {

            @Override
            public void call(final Subscriber<? super Integer> t1) {
                final Subscription s = Subscriptions.create(new Action0() {

                    @Override
                    public void call() {
                        System.out.println(Thread.currentThread().getName());
                    }
                });
                t1.add(s);
                new Thread(new Runnable() {

                    @Override
                    public void run() {
                        t1.unsubscribe();
                    }

                }, "test").start();
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                t1.onCompleted();
            }
        }).subscribeOn(Schedulers.newThread()).subscribe();
        Thread.sleep(10000);
    }

will output "test".

@cloudbees-pull-request-builder

RxJava-pull-requests #742 FAILURE
Looks like there's a problem with this pull request

@cloudbees-pull-request-builder

RxJava-pull-requests #743 SUCCESS
This pull request looks good

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use inner here I guess.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a recursive schedule. As you said, if using inner, it may prevents further scheduled actions.

@akarnokd
Copy link
Member

akarnokd commented Feb 6, 2014

I've played around with this and couldn't get the scheduled unsubscription to run because when the subscription is scheduled, it exposes the Inner.MAS, which then gets unsubscribed and prevents further scheduling on Inner completely.

@zsxwing
Copy link
Member Author

zsxwing commented Feb 6, 2014

Could you provide an example? The return value of scheduler.schedule isn't used.

@akarnokd
Copy link
Member

akarnokd commented Feb 6, 2014

This example doesn't print unsub and doesn't terminate after 100ms as it should (I know it isn't your subscribeOn, but this would ensure the unsubscription happens on the same newThread() thread):

public class SubscribeOnTest {
    static <T> Observable<T> subscribeOn1(Observable<T> source, Scheduler scheduler) {
        return Observable.create((Subscriber<? super T> s) -> {
            Subscriber<T> s0 = new Subscriber<T>() {
                @Override
                public void onCompleted() {
                    s.onCompleted();
                }
                @Override
                public void onError(Throwable e) {
                    s.onError(e);
                }
                @Override
                public void onNext(T t) {
                    s.onNext(t);
                }
            };
            MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
            s.add(mas);
            mas.set(scheduler.schedule(i -> {
                s.add(Subscriptions.create(() -> {
                    i.schedule(j -> {
                        System.out.println("Unsub: " + Thread.currentThread().getName());
                        s0.unsubscribe();
                    });
                }));
                System.out.println("Sub  : " + Thread.currentThread().getName());
                source.subscribe(s0);
            }));
        });
    }
    public static void main(String[] args) throws Exception {
        Subscription s = subscribeOn1(Observable.interval(10, TimeUnit.MILLISECONDS)
                , Schedulers.computation()).subscribe(v -> {
                    System.out.printf("%s: %s%n", Thread.currentThread().getName(), v);
                });

        Thread.sleep(105);
        s.unsubscribe();
        Thread.sleep(100);
    }
}

The problem is that the first schedule call once run should be gently removed to preserve the Inner's isUnsubscribed==false status. I had to modify MultipleAssignmentSubscription to allow setting and unsubscribing without calling unsubscribe in the inner subscription:

https://gist.github.com/akarnokd/8843694

https://gist.github.com/akarnokd/8843709

With the latter, it correctly subscribes and unsubscribes.

@zsxwing
Copy link
Member Author

zsxwing commented Feb 6, 2014

If the Subscription of scheduler.schedule is not added to the subscriber or the mas, so the unsubscribe can be always called, although there is a little performance cost.

@cloudbees-pull-request-builder

RxJava-pull-requests #745 SUCCESS
This pull request looks good

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this line necessary? As I understand subscriber chains, this is what should happen:

subscriber.unsubscribe -> cs.unsubscribe -> o.unsubscribe

benjchristensen added a commit that referenced this pull request Feb 7, 2014
Reimplement 'subscribeOn' using 'lift'
@benjchristensen benjchristensen merged commit d40b684 into ReactiveX:master Feb 7, 2014
@benjchristensen
Copy link
Member

Thanks @zsxwing for this and @akarnokd for the review.

@zsxwing zsxwing deleted the subscribeOn branch February 7, 2014 04:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants