RxJava
RxJava
The Helix programming model for Cortex uses a small subset of RxJava Observable types as expected return types for all prototype operations. This document covers the specific parts of RxJava used by Cortex, but more in-depth documentation can be found at reactivex.io.
Prototype Operations and RxJava
The expected return types for all prototype operation interfaces are Observable types: rx.Observable<T>, rx.Single<T> or rx.Completable.
rx.Observable
Where a prototype expects zero or more values to be returned, the generated operation interfaces return rx.Observable.
To return an Observable from a collection:
Observable.just(someList)
In the event of zero values being returned, an empty Observable can be returned:
Observable.empty();
Observable can also return an error:
Observable.error(new MyException())
rx.Single
In cases where a prototype expects a single return value, the generated operation interfaces return rx.Single. Single can be used to express that either a value has been emitted or that the operation has failed.
To express a value:
Single.just("foo")
To express a failure:
Single.error(new MyException())
Note that in contrast to rx.Observable<T>, there is no concept of an empty Single. rx.Single must either return a value or an error.
rx.Completable
In cases where a prototype is not interested in a return value of generated operation interfaces, rx.Completable is returned. rx.Completable indicates whether an operation was successful or it failed.
To express a successful computation:
Completable.complete()
To express a failure:
Completable.error(new MyException())
Operation Failures
The only runtime exceptions that prototypes understand are ResourceOperationFailure exceptions. These should be used.
For example, to return a NOT_FOUND failure:
return Observable.error(ResourceOperationFailure.notFound());
Rx Repositories
In some of Helix examples, you'll see repositories that also return RxJava Observable, Single or Completable. While this is not necessary, returning these allows prototype implementations to make use of the various Rx combinators.
For example:
@Override public Single<ThingEntity> onRead() { return repository .retrieveThing(thingId.getValue()) .map(thing -> ThingEntity.builder() .withName(thing.getName()) .withDescription(thing.getDescription()) .build()); }
In the preceding example, the repository returns a Single<Thing> which allows us to use the map operator to convert the returned value into a ThingEntity.
The same example, but with a "regular" return type:
@Override public Single<ThingEntity> onRead() { Thing thing = repository.retrieveThing(thingId.getValue()); ThingEntity thingEntity = ThingEntity.builder() .withName(thing.getName()) .withDescription(thing.getDescription()) .build(); return Single.just(thingEntity); }
In addition to being able to use Rx combinators, maintaining the "Observable chain" makes handling and propagating errors fairly simple, without need for try/catch blocks.
Rx Unit Testing
Unit testing of RxJava Observable types is fairly simple. RxJava contains a standard TestSubscriber, which allows execution and assertion of observable types.
The typical pattern for using the TestSubscriber is to create an instance and subscribe with this instance to an Observable. After the subscription, various xunit style assertions can be performed on the TestSubscriber instance:
TestSubscriber<OperationResult> testSubscriber = new TestSubscriber<>(); //TestSubscriber subscribes to observable observable.subscribe(testSubscriber); //assert if observable completed testSubscriber.assertCompleted(); //count number of values emitted by observable testSubscriber.assertValueCount(2);
The observable values can also be transformed by the TestSubscriber to a java.util.List:
//emit all values and collect values in a List List<MyResourceIdentier> resourceIdentifiers = testSubscriber.getOnNextEvents();
This enables convenient list style assertions with advanced assertion frameworks like AsssertJ or Hamcrest.
Note: The TestSubscriber can be used with all observable types: rx.Observable, rx.Single and rx.Completable.
Debugging via Stack Traces
To debug RxJava code via stack traces, use the RxJavaHooks plugin, available from RxJava's github.
RxJava code is executed upon subscription to an observable type. Because of this, when a failure occurs, an exception stack trace may not tell which Rx operation failed. RxJavaHooks captures the stack trace when Observable, Single and Completable operators are instantiated at assembly-time. Whenever an error is signaled via onError(), this assembly-time stack trace is attached as the last cause of that exception.
Example: Enabling Assembly Tracking for the Entire Application
Enabling assembly tracking has severe performance impact, and it should never be enabled in production. It should only be used for debugging purposes in development.
@Component public class RxAssemplyHookInitializer { @Activate public void initialize() { RxJavaHooks.enableAssemblyTracking(); } }
Once enabled, the following snippet:
Observable.empty().single() .subscribe(System.out::println, Throwable::printStackTrace);
Will result in a stacktrace similar to shown below:
java.lang.NoSuchElementException at rx.internal.operators.OnSubscribeSingle(OnSubscribeSingle.java:57) ... Assembly trace: at com.example.TrackingExample(TrackingExample:10)