Ross Coded Classes

Programming tips with no rose colored glasses.

Doing work in the middle of a ReactiveX chain

I struggled for a while figuring out how to do some work in the middle of a chain of Observables.

I needed to make four HTTP requests. The second and third can be done at the same time and I need the combined results for the fourth call, but the application needed the results of the third call.

Observable<User> source = sessionService
        // First HTTP request
        .createSessionRx(username, password)
        
        // Second and third HTTP request
        // When they both are successful emit one observable
        .flatMap(new Func1<AuthorizationToken, Observable<User>>()
        {
            @Override
            public Observable<User> call(final AuthorizationToken token)
            {
                // Result from first request
                String authorization = token.AuthorizationToken;

                // Second and third requests
                final Observable<Session> session = 
                    sessionService.getSessionRx(authorization);
                final Observable<SessionBindings> bindings = 
                    sessionService.getSessionBindingsRx(authorization);
                
                // Combine those together like the teeth of a zipper
                return Observable.zip(
                        // second result
                        session, 
                        // third result
                        bindings,
                        // combining funciton
                        new Func2<Session, SessionBindings, User>()
                        {
                            @Override
                            public User call(Session t1, SessionBindings t2)
                            {
                                return new User(username, session, bindings);
                            }
                        });
            }
        })

        // Now we need to do a little work and perform a fourth HTTP request
        // but the subscription wants the zipped result.
        .flatMap(new Func1<User, Observable<User>>()
        {
            @Override
            public Observable<User> call(final User user)
            {
                doTheWorkRequiredBeforeFourthCall(user);

                // This was the confusing part - I need to return the "prior" 
                // result after another successful HTTP request and finally figured 
                // out you can map the fourth resonse to the zipped response.
                return sessionService
                        .fourthRequestRx(user.EmailAddress)
                        .map(new Func1<ResponseBody, User>()
                        {
                            @Override
                            public User call(final ResponseBody responseBody)
                            {
                                // We didn't care what the response from the 
                                // fourth request was, as long as it succeeded.

                                // Everything else wants the user emitted earlier.
                                return user;
                            }
                        });
            }
        });

Subscription subscription = source
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<User>()
        {
            @Override
            public void onCompleted()
            {
                Log.e("RX", "Completed");
            }

            @Override
            public void onError(final Throwable e)
            {
                Log.e("RX", "Failed");
            }

            @Override
            public void onNext(final User user)
            {
                Log.e("RX", "Success");
            }
        });

Leave a Reply

Your email address will not be published.

This site uses Akismet to reduce spam. Learn how your comment data is processed.