I struggled for a while figuring out how to do some work in the middle of a chain of Observables.
I needed to make four HTTP requests. The second and third can run at the same time, and the fourth needs their combined results. The application itself, however, needed that combined result from the second and third calls, not the response from the fourth.
Observable<User> source = sessionService
    // First HTTP request
    .createSessionRx(username, password)
    // Second and third HTTP requests
    // When both succeed, emit one combined item
    .flatMap(new Func1<AuthorizationToken, Observable<User>>()
    {
        @Override
        public Observable<User> call(final AuthorizationToken token)
        {
            // Result from first request
            String authorization = token.AuthorizationToken;
            // Second and third requests
            final Observable<Session> session =
                sessionService.getSessionRx(authorization);
            final Observable<SessionBindings> bindings =
                sessionService.getSessionBindingsRx(authorization);
            // Combine those together like the teeth of a zipper
            return Observable.zip(
                // second result
                session,
                // third result
                bindings,
                // combining function
                new Func2<Session, SessionBindings, User>()
                {
                    @Override
                    public User call(Session t1, SessionBindings t2)
                    {
                        // Build the user from the emitted values,
                        // not from the Observables themselves
                        return new User(username, t1, t2);
                    }
                });
        }
    })
    // Now we need to do a little work and perform a fourth HTTP request
    // but the subscription wants the zipped result.
    .flatMap(new Func1<User, Observable<User>>()
    {
        @Override
        public Observable<User> call(final User user)
        {
            doTheWorkRequiredBeforeFourthCall(user);
            // This was the confusing part - I need to return the "prior"
            // result after another successful HTTP request and finally figured
            // out you can map the fourth response to the zipped response.
            return sessionService
                .fourthRequestRx(user.EmailAddress)
                .map(new Func1<ResponseBody, User>()
                {
                    @Override
                    public User call(final ResponseBody responseBody)
                    {
                        // We didn't care what the response from the
                        // fourth request was, as long as it succeeded.
                        // Everything else wants the user emitted earlier.
                        return user;
                    }
                });
        }
    });
Subscription subscription = source
    // Run the requests off the main thread
    .subscribeOn(Schedulers.io())
    // Deliver the result on the Android main thread
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>()
    {
        @Override
        public void onCompleted()
        {
            Log.d("RX", "Completed");
        }
        @Override
        public void onError(final Throwable e)
        {
            // Any failed request in the chain ends up here
            Log.e("RX", "Failed", e);
        }
        @Override
        public void onNext(final User user)
        {
            Log.d("RX", "Success");
        }
    });
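To try out the shape of this chain without Android or a real backend, here is a minimal sketch of the same idea. It uses the JDK's CompletableFuture instead of RxJava, so it is an analogy and not the code above: thenCompose stands in for flatMap, thenCombine for zip, and thenApply for map. The class ChainSketch, the four stand-in "requests" and the plain strings they return are all made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ChainSketch {
    // Hypothetical stand-ins for the four HTTP requests.
    static CompletableFuture<String> createSession(String username) {
        return CompletableFuture.supplyAsync(() -> "token-for-" + username);
    }

    static CompletableFuture<String> getSession(String token) {
        return CompletableFuture.supplyAsync(() -> "session");
    }

    static CompletableFuture<String> getBindings(String token) {
        return CompletableFuture.supplyAsync(() -> "bindings");
    }

    static CompletableFuture<Integer> fourthRequest(String user) {
        return CompletableFuture.supplyAsync(() -> 200);
    }

    static CompletableFuture<String> login(String username) {
        return createSession(username)
            // Second and third requests start from the first result
            // and are combined into one "user" value.
            .thenCompose(token -> getSession(token)
                .thenCombine(getBindings(token),
                    (session, bindings) -> username + "|" + session + "|" + bindings))
            // Fourth request: ignore its response and pass the
            // earlier "user" value downstream once it succeeds.
            .thenCompose(user -> fourthRequest(user)
                .thenApply(ignoredResponse -> user));
    }

    public static void main(String[] args) {
        // join() blocks until the whole chain has finished.
        System.out.println(login("alice").join()); // prints "alice|session|bindings"
    }
}
```

The last step is the part that confused me in the Rx version: the inner lambda closes over the value emitted earlier and returns it in place of the fourth response, so a failure in the fourth request still fails the whole chain.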