Ross Coded Classes

Programming tips with no rose colored glasses.

Doing work in the middle of a ReactiveX chain

I struggled for a while figuring out how to do some work in the middle of a chain of Observables.

I needed to make four HTTP requests. The second and third can run at the same time, and the fourth needs their combined results. The catch was that the rest of the application wanted that combined result, not the response from the fourth call.

Observable<User> source = sessionService
        // First HTTP request
        .createSessionRx(username, password)
        // Second and third HTTP requests
        // When both succeed, emit one combined item
        .flatMap(new Func1<AuthorizationToken, Observable<User>>() {
            public Observable<User> call(final AuthorizationToken token) {
                // Result from first request
                String authorization = token.AuthorizationToken;

                // Second and third requests
                final Observable<Session> session = 
                final Observable<SessionBindings> bindings = 
                // Combine those together like the teeth of a zipper
                return Observable.zip(
                        // second result
                        session,
                        // third result
                        bindings,
                        // combining function
                        new Func2<Session, SessionBindings, User>() {
                            public User call(Session t1, SessionBindings t2) {
                                // t1 and t2 are the emitted values; session and
                                // bindings above are the Observables themselves.
                                return new User(username, t1, t2);
                            }
                        });
            }
        })
        // Now we need to do a little work and perform a fourth HTTP request,
        // but the subscription wants the zipped result.
        .flatMap(new Func1<User, Observable<User>>() {
            public Observable<User> call(final User user) {

                // This was the confusing part - I need to return the "prior"
                // result after another successful HTTP request, and finally figured
                // out you can map the fourth response to the zipped result.
                return sessionService
                        .map(new Func1<ResponseBody, User>() {
                            public User call(final ResponseBody responseBody) {
                                // We didn't care what the response from the
                                // fourth request was, as long as it succeeded.

                                // Everything else wants the user emitted earlier.
                                return user;
                            }
                        });
            }
        });
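For anyone who wants to run the shape of this chain without RxJava or a real service, here is a minimal sketch that swaps in the JDK's CompletableFuture for Observables. It is not the code above: thenCompose stands in for flatMap, thenCombine for zip, and thenApply for map. The class ChainSketch, its four request methods, and the string values are all made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class ChainSketch {
    // Hypothetical stand-ins for the four HTTP requests.
    static CompletableFuture<String> createSession(String username, String password) {
        return CompletableFuture.supplyAsync(() -> "token-for-" + username);
    }

    static CompletableFuture<String> fetchSession(String token) {
        return CompletableFuture.supplyAsync(() -> "session(" + token + ")");
    }

    static CompletableFuture<String> fetchBindings(String token) {
        return CompletableFuture.supplyAsync(() -> "bindings(" + token + ")");
    }

    static CompletableFuture<Integer> fourthRequest(String user) {
        // The response value is ignored by the caller; only success matters.
        return CompletableFuture.supplyAsync(() -> 204);
    }

    static CompletableFuture<String> login(String username, String password) {
        return createSession(username, password)
                // Like flatMap: start the second and third requests once the token arrives.
                .thenCompose(token -> fetchSession(token)
                        // Like zip: combine both results into one "user" value.
                        .thenCombine(fetchBindings(token),
                                (session, bindings) -> username + ":" + session + "+" + bindings))
                // Like flatMap + map: run the fourth request, then re-emit the earlier user.
                .thenCompose(user -> fourthRequest(user).thenApply(ignored -> user));
    }

    public static void main(String[] args) {
        System.out.println(login("ross", "secret").join());
    }
}
```

Running main prints ross:session(token-for-ross)+bindings(token-for-ross). The lambda in the last step closes over user, which is the same trick as the final user captured by the inner Func1 above.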

Subscription subscription = source
        .subscribe(new Observer<User>() {
            public void onCompleted() {
                Log.e("RX", "Completed");
            }

            public void onError(final Throwable e) {
                Log.e("RX", "Failed");
            }

            public void onNext(final User user) {
                Log.e("RX", "Success");
            }
        });
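The "as long as it succeeded" part matters: if the fourth request fails, the earlier user is never emitted and the failure reaches the subscriber instead. Below is a small sketch of that behavior, again using CompletableFuture as a stand-in rather than RxJava. FailureSketch, fourthRequest, and the fail flag are hypothetical names used only for this example, and handle plays roughly the role of the Observer's onNext and onError.

```java
import java.util.concurrent.CompletableFuture;

final class FailureSketch {
    // Hypothetical fourth request that can be told to fail.
    static CompletableFuture<Integer> fourthRequest(boolean fail) {
        CompletableFuture<Integer> response = new CompletableFuture<>();
        if (fail) {
            response.completeExceptionally(new IllegalStateException("fourth request failed"));
        } else {
            response.complete(200);
        }
        return response;
    }

    // Re-emit the earlier result only after the fourth request succeeds.
    static CompletableFuture<String> finish(String user, boolean fail) {
        return fourthRequest(fail).thenApply(ignored -> user);
    }

    // Rough analogue of the Observer: turn the outcome into a log-style message.
    static String outcome(String user, boolean fail) {
        return finish(user, fail)
                .handle((value, error) -> error == null ? "Success: " + value : "Failed")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(outcome("ross", false));
        System.out.println(outcome("ross", true));
    }
}
```

Running main prints Success: ross and then Failed. The mapping step never runs when the fourth request fails, so the user cannot slip through on an error.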

