I struggled for a while figuring out how to do some work in the middle of a chain of Observables.
I needed to make four HTTP requests. The second and third can run at the same time, and the fourth call needs their combined result. The catch is that the rest of the application wants that combined result, not the response from the fourth call.

    Observable<User> source = sessionService
        // First HTTP request
        .createSessionRx(username, password)
        // Second and third HTTP requests.
        // When both succeed, emit a single combined User.
        .flatMap(new Func1<AuthorizationToken, Observable<User>>() {
            @Override
            public Observable<User> call(final AuthorizationToken token) {
                // Result from the first request
                String authorization = token.AuthorizationToken;

                // Second and third requests
                final Observable<Session> session = sessionService.getSessionRx(authorization);
                final Observable<SessionBindings> bindings = sessionService.getSessionBindingsRx(authorization);

                // Combine those together like the teeth of a zipper
                return Observable.zip(
                    // second result
                    session,
                    // third result
                    bindings,
                    // combining function
                    new Func2<Session, SessionBindings, User>() {
                        @Override
                        public User call(Session t1, SessionBindings t2) {
                            // Build the User from the emitted values (t1, t2),
                            // not from the Observables that produced them.
                            return new User(username, t1, t2);
                        }
                    });
            }
        })
        // Now we need to do a little work and perform a fourth HTTP request,
        // but the subscription wants the zipped result.
        .flatMap(new Func1<User, Observable<User>>() {
            @Override
            public Observable<User> call(final User user) {
                doTheWorkRequiredBeforeFourthCall(user);

                // This was the confusing part - I need to return the "prior"
                // result after another successful HTTP request, and finally figured
                // out you can map the fourth response to the zipped response.
                return sessionService
                    .fourthRequestRx(user.EmailAddress)
                    .map(new Func1<ResponseBody, User>() {
                        @Override
                        public User call(final ResponseBody responseBody) {
                            // We didn't care what the response from the
                            // fourth request was, as long as it succeeded.
                            // Everything else wants the user emitted earlier.
                            return user;
                        }
                    });
            }
        });

    Subscription subscription = source
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<User>() {
            @Override
            public void onCompleted() {
                Log.d("RX", "Completed");
            }

            @Override
            public void onError(final Throwable e) {
                // Include the throwable so the stack trace reaches the log
                Log.e("RX", "Failed", e);
            }

            @Override
            public void onNext(final User user) {
                Log.d("RX", "Success");
            }
        });
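
If you want to play with the shape of this chain without an Android project or RxJava on the classpath, here is a rough analogue I'd sketch with the JDK's CompletableFuture. It is not RxJava and not the code above: thenCompose stands in for flatMap, thenCombine for zip, and thenApply for map. Every method name and all the string values are made-up stand-ins for the four HTTP calls.

```java
import java.util.concurrent.CompletableFuture;

class ChainSketch {
    // Hypothetical stand-ins for the four HTTP requests.
    static CompletableFuture<String> createSession(String username) {
        return CompletableFuture.supplyAsync(() -> "token-" + username);
    }

    static CompletableFuture<String> getSession(String token) {
        return CompletableFuture.supplyAsync(() -> "session(" + token + ")");
    }

    static CompletableFuture<String> getBindings(String token) {
        return CompletableFuture.supplyAsync(() -> "bindings(" + token + ")");
    }

    static CompletableFuture<Integer> fourthRequest(String user) {
        // Pretend status code; the caller ignores it.
        return CompletableFuture.supplyAsync(() -> 200);
    }

    static CompletableFuture<String> loadUser(String username) {
        // First request, then the second and third side by side,
        // combined into one "user" value (a plain String here).
        CompletableFuture<String> user = createSession(username)
            .thenCompose(token -> getSession(token)
                .thenCombine(getBindings(token),
                    (s, b) -> username + "|" + s + "|" + b));

        // Fourth request: wait for it to succeed, throw its response
        // away, and hand back the value captured from the earlier step.
        return user.thenCompose(u -> fourthRequest(u).thenApply(ignored -> u));
    }

    public static void main(String[] args) {
        // prints alice|session(token-alice)|bindings(token-alice)
        System.out.println(loadUser("alice").join());
    }
}
```

The trick is the same in both versions: the inner lambda closes over the earlier value, so the last step can ignore its own input and return the captured one. If the fourth call fails, the failure still propagates and the earlier value is never emitted.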