redux-observable is a great way for modeling asynchronous effects in Redux. Based on RxJS, it allows you to represent your actions and their effects as streams of events over time. It can be a bit tricky to test, however, especially if you're using time-based operations. In this post I'll explain some of the tools you'll need to test RxJS, specifically within the context of redux-observable.
I'll assume you're already familiar with Redux, RxJS and redux-observable. I'll begin with a brief introduction to marble testing, then build up an epic of increasing complexity over several examples, along with tests. Finally I'll give some less detailed examples of further improvements.
All examples were built using RxJS 5.5. This shouldn't make a difference to most of the code, but some of the import locations might be different if you're using a different version. For the test framework I'll be using Jest, but it should be easy to adapt the examples for any framework.
Marble testing
Introduced in RxJS 5, marble tests allow you to represent elements emitted by an Observable
over time using a diagram. For example:
--a--b---a-|
Time flows from left to right in the diagram, with each -
symbol representing 10 "frames" of time. The symbols a
and b
are the elements emitted by the observable and |
indicates the observable completing successfully. For a complete description of marble syntax, see the RxJS documentation.
The idea is to create marble diagrams to describe the inputs and the expected outputs of our epics, then use our chosen test framework to assert the result. With this in mind, we can start building our first epic.
Hello, marbles
Let's begin by creating a very simple epic and marble test to confirm our setup is working.
Our first epic will simply take an incoming request action and map it to a success action with no payload:
import 'rxjs';
export const FETCH_REQUEST = 'FETCH_REQUEST';
export const FETCH_SUCCESS = 'FETCH_SUCCESS';
export const fetchRequest = url => ({
type: FETCH_REQUEST,
payload: { url }
});
export const fetchSuccess = data => ({
type: FETCH_SUCCESS,
payload: data
});
const epic = action$ =>
action$.ofType(FETCH_REQUEST).mapTo(fetchSuccess());
export default epic;
Create a test file with the following test setup:
import 'rxjs';
import { Observable, TestScheduler } from 'rxjs';
import { ActionsObservable } from 'redux-observable';
import epic, * as actions from './epics';
const deepEquals = (actual, expected) =>
expect(actual).toEqual(expected);
const createTestScheduler = () =>
new TestScheduler(deepEquals);
This sets up a factory for RxJS's TestScheduler
, along with the function that will be used to test expectations. The scheduler is what controls time in observable operations.
Now for the actual test where we define our first marble diagrams:
test('it should return success', () => {
const marbles1 = '-a--a--';
const marbles2 = '-b--b--';
const values = {
a: actions.fetchRequest(),
b: actions.fetchSuccess()
};
const ts = createTestScheduler();
const source = ActionsObservable.from(
ts.createColdObservable(marbles1, values)
);
const actual = epic(source);
ts.expectObservable(actual).toBe(marbles2, values);
ts.flush();
});
- The marble diagrams should already be familiar, but here we add
values
, which assigns specific values to thea
andb
symbols in the diagram. createColdObservable
converts a marble diagram to anObservable
instance. We'll be using cold observables in all the examples.ActionsObservable.from(...)
converts theObservable
to anActionsObservable
, which is required for the epic input.const actual = epic(source)
sets up the epic under test. Note that nothing will execute at this point. The epic will only start doing work oncesource
starts emitting elements, which will only happen when the scheduler is flushed.expectObservable(actual).toBe(marbles2, values)
schedules a comparison between the actual epic output and the second (expected) marble diagram. Again, the comparison will only be executed when the scheduler is flushed.ts.flush()
executes the test; test observables will start emitting and expectations will be asserted. Without this line, nothing will be tested.
You should now have a passing test. If you play around with the marble diagrams you'll see how it fails when you move actions forward and backward on the timeline.
Dependencies
Now that we've established a working test setup, let's extend the functionality of our epic. Let's say we have a library called httpClient
with a get
method that takes a URL and returns a response as an observable.
Since we don't actually want to make any API calls when testing this epic, we need to be able to replace httpClient
with a test implementation during tests. To accommodate this, we use redux-observable's dependency injection.
The updated epic looks like this:
const epic = (action$, store, { httpClient }) =>
action$
.ofType(FETCH_REQUEST)
.switchMap(action =>
httpClient
.get(action.payload.url)
.map(response => fetchSuccess(response))
);
This is a fairly common sight in redux-observable: an incoming action triggers an API call with the result mapped to a new action.
Let's update the test accordingly:
test('it should return success', () => {
const marbles1 = '-a--a--';
const marbles2 = '-b--b--';
const values = {
a: actions.fetchRequest('some-url'),
b: actions.fetchSuccess({ message: 'hello, marbles' })
};
const ts = createTestScheduler();
const httpClient = {
get: () => Observable.of({ message: 'hello, marbles' })
};
const source = ActionsObservable.from(
ts.createColdObservable(marbles1, values)
);
const actual = epic(source, null, { httpClient });
ts.expectObservable(actual).toBe(marbles2, values);
ts.flush();
});
- The fake
httpClient
always returns a response immediately. fetchRequest
andfetchSuccess
now have payloads.- The fake
httpClient
is injected into the epic. - Marble diagrams have stayed exactly the same.
Introducing time
To make things more interesting (and realistic), let's slow down the response from the fake HTTP client.
Normally we would just add a delay
operator to the observable, but to make it testable we must ensure it uses the right scheduler:
const ts = createTestScheduler();
const httpClient = {
get: () => {
const duration = ts.createTime('-|');
return Observable.of({
message: 'hello, marbles'
}).delay(duration, ts);
}
};
ts.createTime
allows us to specify a delay duration in frames, rather than milliseconds.- As expected,
.delay()
is used to slow down the response, but we pass ints
to override the default scheduler.
The test now fails. To fix it, we need to update our marble diagrams:
const marbles1 = '-a--a--';
const marbles2 = '--b--b-';
Looking at these diagrams it's easy to see the expected delay.
If we increase the delay to ---|
we can even test our choice of switchMap
over mergeMap
in the epic; these diagrams will make that test pass:
const marbles1 = '-a--a---';
const marbles2 = '-------b';
The diagram shows that only one success action was emitted. The first request wasn't resolved by the time the second request arrived, so the first request was canceled. To emit both success results, use mergeMap
instead.
Failures
So far the epic handles successful API calls, but ignores failures. We'll extend the epic to handle them.
Create the new failure-handling action:
export const FETCH_FAILURE = 'FETCH_FAILURE';
export const fetchFailure = error => ({
type: FETCH_SUCCESS,
error
});
Update the epic to catch failures:
const epic = (action$, store, { httpClient }) =>
action$.ofType(FETCH_REQUEST).switchMap(action =>
httpClient
.get(action.payload.url)
.map(response => fetchSuccess(response))
.catch(error => Observable.of(fetchFailure(error)))
);
We catch the error inside the switchMap
to prevent it from reaching action$
; if it does, action$
will terminate and won't handle any more actions.
In the test, update diagrams and values:
const marbles1 = '-a--e---';
const marbles2 = '--b-x---';
const values = {
a: actions.fetchRequest('some-url'),
b: actions.fetchSuccess({ message: 'hello, marbles' }),
e: actions.fetchRequest('error-url'),
x: actions.fetchFailure('Fetch error')
};
Finally, update the fake HTTP client to immediately return an error if the URL is error-url
:
const httpClient = {
get: url => {
const duration = ts.createTime('-|');
if (url === 'error-url') {
return Observable.throw('Fetch error');
}
return Observable.of({
message: 'hello, marbles'
}).delay(duration, ts);
}
};
The test should now be passing again.
Note that although we're testing failures, we don't use the #
symbol in this case. The failures are represented purely by actions, not by actual errors thrown in the observables.
Failures with retry
As a final example we'll create an epic that retries a few times on any HTTP error, with a delay between each retry, before finally emitting a failure action.
We'll use the retryWhen
operator to retry with a delay. First attempt:
const epic = (action$, store, { httpClient }) =>
action$.ofType(FETCH_REQUEST).switchMap(action =>
httpClient
.get(action.payload.url)
.retryWhen(errors =>
errors
.delay(1000)
.take(5)
.concat(Observable.throw('Retry error'))
)
.map(response => fetchSuccess(response))
.catch(error => Observable.of(fetchFailure(error)))
);
Retry attempts are triggered when the observable inside retryWhen
(known as the "notifier") emits. We introduce a delay and only take 5 emitted values. If no retry has succeeded by then, we terminate the observable with an error.
This is a good approach, but is not very testable: delay
should be configurable with a test scheduler and duration. We'll use injection again:
const epic = (
action$,
store,
{ httpClient, retryConfig, scheduler }
) =>
action$.ofType(FETCH_REQUEST).switchMap(action =>
httpClient
.get(action.payload.url)
.retryWhen(errors =>
errors
.delay(retryConfig.delay, scheduler)
.take(retryConfig.count)
.concat(Observable.throw('Retry error'))
)
.map(response => fetchSuccess(response))
.catch(error => Observable.of(fetchFailure(error)))
);
We'll inject the new dependencies in the test and update the marble diagrams:
test('it should handle failures', () => {
const marbles1 = '-a--e-----';
const marbles2 = '--b------x';
const values = {
a: actions.fetchRequest('some-url'),
b: actions.fetchSuccess({ message: 'hello, marbles' }),
e: actions.fetchRequest('error-url'),
x: actions.fetchFailure('Fetch error')
};
const ts = createTestScheduler();
const httpClient = {
get: url => {
const duration = ts.createTime('-|');
if (url === 'error-url') {
return Observable.throw('Fetch error');
}
return Observable.of({
message: 'hello, marbles'
}).delay(duration, ts);
}
};
const source = ActionsObservable.from(
ts.createColdObservable(marbles1, values)
);
const retryConfig = {
delay: ts.createTime('-|'),
count: 5
};
const dependencies = {
httpClient,
retryConfig,
scheduler: ts
};
const actual = epic(source, null, dependencies);
ts.expectObservable(actual).toBe(marbles2, values);
ts.flush();
});
Note the marble diagrams:
const marbles1 = '-a--e-----';
const marbles2 = '--b------x';
There's a long delay between the failing request and the final failure action. This shows the retries and delay are working as expected.
Further improvements
The test could be made more sophisticated by introducing the following refinements:
- Extract retry functionality into a helper function.
- Test using an HTTP client that fails at first, but eventually succeeds.
- Use subscription testing to verify the retry behaviour.
I'll give a brief discussion and some code snippets to explain how this might be done.
Retry helper function
Creating a helper function that just handles the GET request and retry logic will make testing the retries easier. Something like:
const retryGET = (httpClient, retryConfig, scheduler) => url =>
httpClient.get(url).retryWhen(errors =>
errors
.delay(retryConfig.delay, scheduler)
.take(retryConfig.count)
.concat(Observable.throw('Retry error'))
);
Here httpClient.get
is the input observable. Use a marble diagram to define its behaviour in tests.
HTTP client fails at first, then succeeds
Observable.defer
is key to creating this sort of behaviour. A concise way of creating such an observable is:
const marbles = ['--#', '-#', '--a|'];
const marbles$ = marbles.map(m => ts.createColdObservable(m, values));
const source = Observable.defer(() => marbles$.shift());
The marble diagram snippets are mapped to an array of observables and wrapped into a single observable using Observable.defer
. Every time source
is subscribed to, it will return the next observable in the array.
Subscription tests
Since retryWhen
works by resubscribing to the source observable, it would be good to test when these subscriptions occur. Subscription tests let you define expectations for this. Example:
const subs = [
'^ !',
' ^ !',
' ^ !'
];
These diagrams show when we expect subscriptions to start (^
) and end (!
). Comparing them to side-by-side, we can see the retry delay between the end of one subscription and the start of the next. To test:
Object.keys(subs).forEach(i => {
ts.expectSubscriptions(marble$[i].subscriptions).toBe(subs[i]);
});
We're testing each observable fragment against its corresponding subscription diagram. We can't test against the combined observable, since that was created using defer
and won't have the subscriptions
property we need.
Wrap up
That should cover most of the tools and techniques you'll need to test your epics. It may seem a bit complicated at first, but if you start small with some simple tests and add to it bit by bit, you'll quickly grow your understanding of marble testing and of observables in general.