Using RxJS Subjects

S.AsyncSubject

RxJS Subjects are one of the favorite types of Observables used by expert developers when dealing with a multi-subscriber or multi-observer implementation.

Subjects in simple terms, function both as an Observable and as an Observer. This gives them the ability to proxy values received from other Observables to its own list of Observers, which it maintains in an array called observers.

Subjects fall under the category of Observables commonly called Multi-casted Observables and to better understand them, we need to be aware that there are of two types of Observables, namely:

  1. Cold Observables:
    • The value producer is created inside the observable.
    • There can only be one observer per execution, making them unicast.
    • Examples of such include interval(), ajax().
  2. Hot Observables:
    • The value producer exists outside the observable. It produces values whether there any observers or not.
    • They can have a shared producer that allows for multiple observers that all receive the same values at the same time, since calling subscribe() is no longer what triggers the observable to start sending values.
    • They are multicast.
    • Examples of such include Observables that wrap DOM events (like fromEvent)  and WebSockets.

To multicast a cold observable to a behave like a hot observable, the RxJS team has provided a series of multicasting operators, that manage the entire process, such as:

  • multicast():
    • It takes a subject as a parameter and uses it behind-the-scenes to manage the multicasting.
    • It returns a special type of observable called Connectable Observable, whose execution can be triggered by calling the connect method.
  • refCount():
    • It automatically triggers the execution of the source observable when the number of observers is greater than 0.
  • publish():
    • Its a thin wrapper about multicast() and it does not require you to pass a subject as a parameter. It will create a subject for you internally.
  • share():
    • It is similar to using publish() and refCount() together because it executes when the number of observers is greater than 0.
    • However, unlike the publish operator, it will resubscribe to the source as necessary if the observer count goes down to 0 and then back above 0 as new observers are added.

Here’s a sample code that demonstrates the multicast operators in action using publish() and refCount():

let source$ = interval(1000).pipe(
 take(4),
 publish(),
 refCount()
);

source$.subscribe(value => console.log(`Observer 1: ${value}`));
setTimeout(() => {
 source$.subscribe(value=>console.log(`Observer 2: ${value}`));
}, 1000);

setTimeout(() => {
 source$.subscribe(value=>console.log(`Observer 3: ${value}`));
}, 2000);

Types of Subjects:

AsyncSubject:

The diagram displayed at the start of this article is a marble diagram that describes a basic sub-type of Subject called AsyncSubject, which is a special type of Subject that emits only the last value it received. This type of Subject is used internally by the publishLast() operator when it is applied to an observable.

BehaviourSubject: 

Another special type of Subject is the BehaviourSubject, which are configured with an initial seed value and will immediately emit that value to observers if the source has not yet produced that value. If an observer is added after the source has begun producing values, then it will emit the most recent value to the new observer instead of the seed value. The operator that uses the type of Subject internally is called publishBehaviour(). Below is a marble diagram describing the BehaviourSubject:

S.BehaviorSubject.e

 

ReplaySubject:

Another special type of Subject is the ReplaySubject, which stores and emits a configurable number of values to all observers. Those stored values are essentially replayed and emitted to all new observers. The publishReplay() operator uses this type of subject internally. Below is a diagram describing the ReplaySubject:

S.ReplaySubject

 

PublishSubject:

The PublishSubject emits to the observers only those items that are emitted by the source observable subsequent to the time of the subscription. Note that a PublishSubject may begin emitting values immediately it is created and so there is a risk that one or more items may be lost between the time the Subject is created and the observer subscribes to it. If you need to guarantee delivery of all items from the source Observable, you’ll need either to form that Observable with Create() so that you can manually reintroduce “cold” Observable behavior, or switch to using a ReplaySubject instead. Below is a marble diagram of the PublishSubject:

S.PublishSubject.e

 

Author: daltonwhyte

A technocrat who believes in a smart future, that will be proliferated with systems that allow us to focus on the bigger picture.

Leave a Reply

Your email address will not be published. Required fields are marked *