In some cases, the RxJS combineLatest operator needs to work with a list of input observables that changes over time. In this article, we'll outline how to achieve this, with code examples. If you are not familiar with RxJS or would like a quick refresher, please visit our introduction to observables with RxJS.
Dynamically Changing CombineLatest Observables
The combineLatest function accepts a static array of observables as input. Once every input observable has emitted at least once, any further emission from any input invokes the combineLatest subscription with the most recent values of all inputs. This is useful and powerful, but it has a limitation: combineLatest does not observe the array itself, so an active subscription will not react to later changes in the array. In other words, we cannot dynamically add or remove observables from a combineLatest subscription once it is running. Therefore, in cases where the list of observables needs to change dynamically, we must either use combineLatest differently or use a different implementation altogether. A few example use cases for this type of implementation include:
- Users are added to a session one by one, and a subscription needs to be invoked when any user performs some action.
- The UI should display a stream of events only for events which are marked as "checked" on the UI.
- Sources of information (observables) need to be dropped based on some condition, such as a server outage.
If we held a reference to the list of observables we want combineLatest to combine, we could pass that reference in and then make changes to the array afterwards, but the running combineLatest subscription would not react to the change. Therefore, the reaction to list changes has to happen before combineLatest is called. We'll discuss two methods of doing this in the sections below.
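To make the limitation concrete, here is a minimal sketch, assuming RxJS 6 or 7. The names `a$`, `b$`, `c$`, `sources`, and `seen` are hypothetical and exist only for illustration. It pushes a third observable into the array after subscribing and shows that the running subscription never picks it up.

```typescript
import { BehaviorSubject, combineLatest, Observable } from 'rxjs';

// Hypothetical sources used only for this illustration.
const a$ = new BehaviorSubject<number>(1);
const b$ = new BehaviorSubject<number>(2);
const c$ = new BehaviorSubject<number>(3);

const sources: Observable<number>[] = [a$, b$];
const seen: number[][] = [];

// The subscription is wired to a$ and b$ at this point.
combineLatest(sources).subscribe(values => seen.push(values));

// Mutating the array afterwards does not affect the running subscription.
sources.push(c$);
c$.next(30); // ignored: c$ was never subscribed to
a$.next(10); // still produces a two-element result

console.log(seen); // [[1, 2], [10, 2]]
```

Every recorded result has two elements, even though the array now holds three observables.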
Implementation Using SwitchMap
A simple way to provide this dynamic functionality is to:
- Store the array of observables inside of a BehaviorSubject.
- Pipe that BehaviorSubject with a switchMap.
- Within the switchMap, call combineLatest with the array passed into the switchMap callback and return the observable that combineLatest produces.
- When it's time to change the array, pass the entire new array into BehaviorSubject.next(newList) and the pipes will respond and re-create the combineLatest observable.
This approach reacts to changes in the list of input observables, but because it re-creates the combineLatest observable each time, it can cause issues with when combineLatest emits its first value. combineLatest waits for all input observables to emit at least once before emitting anything itself, and each re-created instance starts that wait from scratch. So even if every input observable had already emitted before the list changed, the new instance may stay silent in situations where the output observable was expected to emit. To solve this, we can take one of the approaches below:
- Ensure all input observables are behavior subjects: behavior subjects always emit values when they are subscribed to; thus, all observables will be guaranteed to emit a value as soon as the combineLatest is subscribed to.
- Ensure all input observables are piped with startWith or defaultIfEmpty: startWith gives a result similar to behavior subjects, emitting a default value as soon as the subscription is created. defaultIfEmpty is narrower: it emits its default only when the original observable completes without having emitted anything, so it covers empty sources but not sources that are merely slow to emit.
- Ensure all input observables are piped with shareReplay or publishReplay (the latter is deprecated as of RxJS 7): this gives a result similar to behavior subjects, because the last emitted value is automatically replayed to each new subscriber. Unlike the other approaches, however, these operators have nothing to replay until the original observable has emitted at least one value.
- Combination of the above: a combination of the approaches above will work so long as every observable is covered.
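The first-emission problem and two of the remedies listed above can be sketched as follows, assuming RxJS 6 or 7. The names `clicks$`, `status$`, and the result arrays are hypothetical. `clicks$` is a plain Subject that has not emitted yet, which is exactly the kind of input that keeps combineLatest silent.

```typescript
import { BehaviorSubject, Subject, combineLatest } from 'rxjs';
import { shareReplay, startWith } from 'rxjs/operators';

// Hypothetical inputs: a plain Subject (no initial value) and a BehaviorSubject.
const clicks$ = new Subject<string>();
const status$ = new BehaviorSubject<string>('online');

// Unprotected: nothing is emitted until clicks$ emits for the first time.
const silent: string[][] = [];
combineLatest([clicks$, status$]).subscribe(v => silent.push(v));
const silentCountBeforeClick = silent.length;

// startWith: a default value guarantees an immediate first emission.
const seeded: string[][] = [];
combineLatest([clicks$.pipe(startWith('none')), status$])
  .subscribe(v => seeded.push(v));

// shareReplay: late subscribers receive the last value, but only once
// the source has emitted at least one value.
const replayedClicks$ = clicks$.pipe(shareReplay(1));
replayedClicks$.subscribe(); // connects the replay buffer to clicks$
clicks$.next('first');

const replayed: string[][] = [];
combineLatest([replayedClicks$, status$]).subscribe(v => replayed.push(v));

console.log(silentCountBeforeClick); // 0
console.log(seeded[0]);              // ['none', 'online']
console.log(replayed[0]);            // ['first', 'online']
```

The `replayed` subscription is created after `clicks$` has already emitted, yet it still receives a value straight away because shareReplay replays the buffered emission.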
The code below showcases a simple example of this implementation strategy:
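What follows is a minimal sketch rather than a definitive implementation, assuming RxJS 6 or 7. The user subjects (`alice$`, `bob$`, `carol$`), `sources$`, and `log` are hypothetical names. All inputs are behavior subjects, so each re-created combineLatest emits immediately.

```typescript
import { BehaviorSubject, combineLatest, Observable } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Hypothetical per-user activity streams.
const alice$ = new BehaviorSubject<string>('alice: idle');
const bob$ = new BehaviorSubject<string>('bob: idle');
const carol$ = new BehaviorSubject<string>('carol: idle');

// The list of observables is itself held in a BehaviorSubject.
const sources$ = new BehaviorSubject<Observable<string>[]>([alice$, bob$]);

// Each new list tears down the old combineLatest and builds a new one.
const combined$ = sources$.pipe(
  switchMap(sources => combineLatest(sources))
);

const log: string[][] = [];
const subscription = combined$.subscribe(values => log.push(values));

alice$.next('alice: typing');

// Add carol by pushing an entirely new array.
sources$.next([...sources$.getValue(), carol$]);
carol$.next('carol: typing');

// Drop alice; her later emissions no longer reach the subscriber.
sources$.next([bob$, carol$]);
alice$.next('alice: idle');

subscription.unsubscribe();
console.log(log.length); // 5
```

Note that an empty list makes combineLatest complete without emitting, so the subscriber hears nothing until a non-empty list arrives.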
Implementation Using a Custom Function
To make our code more declarative and easier to read, we can write a function which encapsulates the switchMap and other concerns discussed above. The TypeScript version below preserves the data types. This also allows us to put any other logic within the function itself, such as adding different pipes to our observables:
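One possible shape for such a function is sketched below, assuming RxJS 6 or 7. The name `combineLatestDynamic`, its `prepare` parameter, and the choice to emit an empty array for an empty list are all assumptions made for illustration, not an established API.

```typescript
import { BehaviorSubject, combineLatest, Observable, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

/**
 * Hypothetical helper: combines the latest values of a changing list of
 * observables. `prepare` is applied to every input, which is the place to
 * add operators such as startWith or shareReplay.
 */
function combineLatestDynamic<T>(
  sources$: Observable<Observable<T>[]>,
  prepare: (source: Observable<T>) => Observable<T> = source => source
): Observable<T[]> {
  return sources$.pipe(
    switchMap(sources =>
      sources.length > 0
        ? combineLatest(sources.map(prepare))
        // Design choice (assumption): emit [] for an empty list, because
        // combineLatest([]) would complete without emitting anything.
        : of([] as T[])
    )
  );
}

// Usage with hypothetical inputs.
const x$ = new BehaviorSubject<number>(1);
const y$ = new BehaviorSubject<number>(2);
const list$ = new BehaviorSubject<Observable<number>[]>([]);

const results: number[][] = [];
combineLatestDynamic(list$).subscribe(values => results.push(values));

list$.next([x$]);
list$.next([x$, y$]);
y$.next(20);

console.log(results); // [[], [1], [1, 2], [1, 20]]
```

Because the generic parameter `T` flows from the input list to the output array, callers get `Observable<number[]>` here without any casts.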