
On the subject of RxJS: multiple subscribers to a single Observable

A while ago, I answered this question on Stack Overflow about multiple subscriptions to an RxJS Observable.

As with much of RxJS, the answer is simple and elegant.

But first, let's start with the actual problem. Here's the author's question:

let observer = null;

const notificationArrayStream = Rx.Observable.create(function (obs) {
  observer = obs;
  return () => {}; // teardown function (nothing to clean up here)
});

function trigger(something) {
  observer.next(something);
}

notificationArrayStream.subscribe((x) => console.log('a: ' + x));
notificationArrayStream.subscribe((x) => console.log('b: ' + x));

trigger('TEST');

 

The author's expected output:

a: TEST
b: TEST

The actual output:

b: TEST

 

Why is that? And how can multiple functions subscribe to a single Observable?


Problem:

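Observables are unicast by default: every call to subscribe() runs the function passed to Observable.create again, with a new observer. In the code above, the second subscription therefore overwrites the shared observer variable, and trigger() only reaches subscriber b. People sometimes think of an Observable as a Promise, whose single result is shared by every then() callback. That's not the case with Observables.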
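The question's output can be reproduced without RxJS. The following is a minimal sketch, assuming a hand-written createObservable helper (hypothetical, a simplified stand-in and not RxJS's real implementation) that, like a plain Observable, runs its subscriber function once per subscribe() call:

```javascript
// Hypothetical, simplified stand-in for a plain (unicast) Observable.
// This is NOT how RxJS is implemented; it only mimics the relevant behavior.
function createObservable(subscriberFn) {
  return {
    subscribe(next) {
      // Each subscribe() call runs the subscriber function again,
      // handing it a fresh observer that wraps only this callback.
      subscriberFn({ next });
    },
  };
}

const output = [];
let observer = null;
let runs = 0;

const stream = createObservable((obs) => {
  runs += 1;
  observer = obs; // the second subscription overwrites the first observer
});

stream.subscribe((x) => output.push('a: ' + x));
stream.subscribe((x) => output.push('b: ' + x));

observer.next('TEST'); // only the most recently stored observer is reached

console.log(runs);   // 2
console.log(output); // [ 'b: TEST' ]
```

The subscriber function runs twice, once per subscription, and the shared variable keeps only the last observer it was given.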

Solution:

Subject

A simple solution for this problem is to use a Subject. A Subject lets you share a single execution among multiple observers by acting as a proxy between a source and a group of subscribers. In other words, a Subject is an Observer and an Observable at the same time, which lets it multicast events from a single source to multiple subscribers. You can also think of it as an event bus.

Here's the example rewritten with a Subject:

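import { Subject } from 'rxjs'; // assumes RxJS is installed as the 'rxjs' package

// A Subject keeps a list of subscribers and forwards each value to all of them.
const subject = new Subject();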

function trigger(something) {
    subject.next(something);
}

subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));

trigger('TEST');

Result:

a: TEST
b: TEST
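To see why both subscribers are reached, it can help to look at the mechanism in isolation. This is a minimal sketch, assuming a hypothetical MiniSubject class written for illustration only; RxJS's real Subject also handles errors, completion, and more:

```javascript
// Hypothetical MiniSubject: a simplified illustration of multicasting,
// not RxJS's actual Subject implementation.
class MiniSubject {
  constructor() {
    this.observers = []; // every subscriber callback is kept in one list
  }

  subscribe(next) {
    this.observers.push(next);
    // Return a function that removes this subscriber again.
    return () => {
      this.observers = this.observers.filter((o) => o !== next);
    };
  }

  next(value) {
    // Iterate over a copy so that unsubscribing during delivery is safe.
    for (const observer of this.observers.slice()) {
      observer(value);
    }
  }
}

const lines = [];
const bus = new MiniSubject();

const unsubscribeA = bus.subscribe((x) => lines.push('a: ' + x));
bus.subscribe((x) => lines.push('b: ' + x));

bus.next('TEST');  // delivered to a and b
unsubscribeA();
bus.next('AGAIN'); // delivered to b only

console.log(lines); // [ 'a: TEST', 'b: TEST', 'b: AGAIN' ]
```

Unlike the plain Observable from the question, subscribing here does not start a new execution; it only adds a listener to a shared list.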

 

Pitfall: Observers arriving too late

Note that the order of subscribing and broadcasting matters. A plain Subject does not store values, so anything sent before a subscriber arrives never reaches it:

const subject = new Subject();

function trigger(something) {
    subject.next(something);
}

// Sent before anyone has subscribed, so the value is lost.
trigger('TEST');

subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));

Result:

(empty)

 

ReplaySubject & BehaviorSubject

If you want late subscribers to receive values that were sent before they subscribed, you can use a ReplaySubject or a BehaviorSubject instead.

A ReplaySubject replays up to a given number of past values to each new subscriber, whereas a BehaviorSubject remembers only the latest value and must be created with an initial one. Here's an example using a ReplaySubject with a buffer size of 5:

import { ReplaySubject } from 'rxjs'; // assumes RxJS is installed as the 'rxjs' package

const subject = new ReplaySubject(5); // buffer size is 5

function trigger(something) {
    subject.next(something);
}

trigger('TEST');

subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));

Result:

a: TEST
b: TEST
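How the buffer size shapes what a late subscriber sees can also be sketched without RxJS. The hypothetical MiniReplaySubject below is a simplified illustration, not RxJS's ReplaySubject, which additionally handles errors, completion, and an optional time window:

```javascript
// Hypothetical MiniReplaySubject: illustrates replaying buffered values.
// Simplified sketch only; not RxJS's actual ReplaySubject implementation.
class MiniReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];    // the most recent values, oldest first
    this.observers = [];
  }

  next(value) {
    this.buffer.push(value);
    // Drop the oldest value once the buffer exceeds its size.
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift();
    }
    for (const observer of this.observers.slice()) {
      observer(value);
    }
  }

  subscribe(next) {
    // A new subscriber first receives the buffered values, in order...
    for (const value of this.buffer) {
      next(value);
    }
    // ...and then every value sent from now on.
    this.observers.push(next);
  }
}

const received = [];
const replay = new MiniReplaySubject(2);

replay.next('one');
replay.next('two');
replay.next('three'); // 'one' falls out of the buffer here

replay.subscribe((x) => received.push('late: ' + x));
replay.next('four');

console.log(received); // [ 'late: two', 'late: three', 'late: four' ]
```

With a buffer size of 1, the late subscriber in this sketch would see only 'three' before 'four', which is close to what a BehaviorSubject does, apart from its required initial value.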
