Understanding Observables and Subjects in RxJS. Multicasting values

Hello guys!

When I first started working with Angular 2+, I ran into an issue that felt tricky and rather complicated to me. I’ve noticed that many people get into the same situation quite often. So I will try to explain what Subject and Observable mean in RxJS and what the main difference between these two terms is.

Observer pattern

One of the first things you will read about in RxJS is the Observer pattern. It is an important and fundamental pattern used in RxJS. In its standard form the pattern has two basic terms: Subject and Observer. But RxJS treats this pattern a bit differently, which can be confusing. So let’s not focus on that and take a look at a simplified version.

There are two central terms. The first one is Observer, and you can think of it as someone who receives data. But before it starts receiving data, it has to be subscribed to an Observable. That is the second term, and you can think of it as someone who sends data. I like the example with Excel cells: a cell notifies every dependent formula and asks it to re-calculate. You can read more here

As you can see it’s quite a simple concept and that’s all we need to understand from the Observer pattern.
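To make the Excel analogy concrete, here is a minimal sketch in plain JavaScript. The createCell helper and its get/set/subscribe methods are names I made up for illustration; they are not part of RxJS.

```javascript
// Hypothetical spreadsheet cell: the one who sends data in the simplified pattern.
function createCell(initialValue) {
  let value = initialValue;
  const observers = [];
  return {
    get: () => value,
    // Register an Observer that wants to hear about changes.
    subscribe(observer) {
      observers.push(observer);
    },
    // Store the new value and notify every registered Observer.
    set(newValue) {
      value = newValue;
      observers.forEach((observer) => observer.next(newValue));
    },
  };
}

const a1 = createCell(2);
const recalculated = [];

// A dependent "formula" (=A1 * 10) that re-calculates on every change.
a1.subscribe({ next: (v) => recalculated.push(v * 10) });

a1.set(3);
a1.set(5);
console.log(recalculated); // [ 30, 50 ]
```

The cell never needs to know what the formula does with the value; it only calls next on whoever subscribed.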

Observable

You may ask where the Subject is in the previous picture, but before I answer, it’s necessary to understand what an Observable does under the hood. The main thing to understand is that an Observable is just a function that connects an Observer to a data Producer. Here is a code example for better understanding:

function emitValue(observer) {
  let i = 0;
  // Emit an increasing value every second.
  const id = setInterval(() => {
    observer.next(i++);
  }, 1000);
  // The returned function stops the emission.
  return () => { clearInterval(id); };
}

function createObservable(emitValueFn) {
  return {
    subscribe: emitValueFn,
  };
}

const observer = { next: (v) => { console.log(v); } };
const observable = createObservable(emitValue);
const unsubscribe = observable.subscribe(observer);

setTimeout(() => {
  unsubscribe();
}, 4500);

In this example the emitValue function takes an Observer and every second emits an incremented value by calling the Observer’s next method. Note that emitValue returns a function that clears the interval and stops the emission.
The next function is createObservable. It returns an object whose subscribe property refers to emitValue, so calling subscribe invokes emitValue.

Then there is a simple Observer that outputs each value to the console. The most interesting part follows. We create a new Observable by calling createObservable and passing it the emitValue function. At this point it’s just an object with a subscribe property that holds emitValue. Then we call subscribe, and it gives us back the unsubscribe function (the one that calls clearInterval).
So after running this example, you will see 0…1…2…3 in your console. I’ve prepared a working example. It’s available here

To sum up, what you really need to understand:

  1. Under the hood Observable is just a function

  2. To start receiving data, you need to call subscribe

  3. To stop receiving – call unsubscribe

There is an awesome article by Ben Lesh about Observables, where you can read more about the Observable concept.

Problem with Observable

As you have already read in the previous chapter, calling subscribe invokes a function that produces data. That means every call to subscribe runs the Producer function once again. In RxJS terms, if we subscribe a new Observer to an already existing Observable, it does not join the execution of the first Observer: it gets its own execution and receives the same sequence of data from the start. Let’s see the marble diagram.
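A quick way to see this without timers is to count how many times the Producer runs. This is only a sketch that reuses the createObservable shape from the previous chapter; the producerRuns counter and the synchronous Producer are my own additions for illustration.

```javascript
let producerRuns = 0;

// Same shape as createObservable earlier: subscribe simply invokes the Producer.
function createObservable(producer) {
  return { subscribe: producer };
}

// A synchronous Producer, so the effect is visible without waiting.
const observable = createObservable((observer) => {
  producerRuns++;
  [0, 1, 2].forEach((v) => observer.next(v));
});

const receivedByA = [];
const receivedByB = [];
observable.subscribe({ next: (v) => receivedByA.push(v) });
observable.subscribe({ next: (v) => receivedByB.push(v) });

console.log(producerRuns); // 2
console.log(receivedByA, receivedByB); // [ 0, 1, 2 ] [ 0, 1, 2 ]
```

Two subscriptions mean two separate runs of the Producer, even though both Observers end up with the same values.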

The answer to these questions is Hybrid. Let's see what it means.

Hybrid

A Hybrid is an object that is both an Observable and an Observer at the same time. We can subscribe it to an Observable and also subscribe Observers to it.

What benefit do we get from it? With a Hybrid we can solve the problem of sharing the same data with many Observers. The marble diagram would then look as follows:

Here is a simple example of how this Hybrid can work under the hood.

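const observable = Rx.Observable.interval(1000).take(5);
// Any object with a next method works as an Observer here.
const observerA = { next: (x) => console.log('A next ' + x) };
const observerB = { next: (x) => console.log('B next ' + x) };

const hybrid = {
  observers: [],
  // Arrow functions do not bind `this`, so refer to the object by name.
  subscribe: (obs) => {
    hybrid.observers.push(obs);
  },
  next: (v) => {
    hybrid.observers.forEach((o) => {
      o.next(v);
    });
  },
};

observable.subscribe(hybrid);
hybrid.subscribe(observerA); // first execution
setTimeout(() => {
  hybrid.subscribe(observerB); // the same execution
}, 2000);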

Here is the Hybrid object with an internal list of Observers and subscribe and next methods. Calling subscribe pushes a new Observer onto the list. And when the data Producer emits a new value, it calls the Hybrid’s next method, which iterates over all Observers and notifies each of them. This way we can be sure that every Observer has been notified.
Now we are really close to understanding what a Subject is.
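One consequence worth seeing in code: an Observer that joins a Hybrid late shares the same execution, so it only gets the values emitted after it subscribed. Below is a small timer-free sketch under that assumption, where I call hybrid.next by hand to play the role of the Producer.

```javascript
// A Hybrid written with methods, so `this` refers to the hybrid itself.
const hybrid = {
  observers: [],
  subscribe(observer) {
    this.observers.push(observer);
  },
  next(value) {
    this.observers.forEach((observer) => observer.next(value));
  },
};

const receivedByA = [];
const receivedByB = [];

hybrid.subscribe({ next: (v) => receivedByA.push(v) });
hybrid.next(0); // the Producer emits; only A is listening
hybrid.next(1);

hybrid.subscribe({ next: (v) => receivedByB.push(v) }); // B joins late
hybrid.next(2); // both A and B are notified

console.log(receivedByA); // [ 0, 1, 2 ]
console.log(receivedByB); // [ 2 ]
```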

Subject and its types

So what is a Subject? The answer is obvious. A Subject is a Hybrid between Observable and Observer, really similar to the one we discussed in the previous chapter.
Now that we know what a Subject is and how it works, let’s see the other types of Subject available in RxJS.

BehaviorSubject differs from Subject in that it keeps the last received value and can give it to us on request. Because it stores a value, it’s necessary to provide a default value when creating it.
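Here is a rough sketch of that idea in plain JavaScript. createBehaviorSubject is a hypothetical helper written for this post, not the RxJS source; it only models next, subscribe and getValue.

```javascript
// Hypothetical minimal Subject that remembers its latest value.
function createBehaviorSubject(initialValue) {
  let current = initialValue;
  const observers = [];
  return {
    // Give the stored value on request.
    getValue: () => current,
    subscribe(observer) {
      observers.push(observer);
      observer.next(current); // a new Observer immediately gets the latest value
    },
    next(value) {
      current = value;
      observers.forEach((o) => o.next(value));
    },
  };
}

const subject = createBehaviorSubject(0); // the default value is required
const seenByA = [];
const seenByB = [];

subject.subscribe({ next: (v) => seenByA.push(v) }); // receives 0 right away
subject.next(1);
subject.next(2);
subject.subscribe({ next: (v) => seenByB.push(v) }); // receives 2 right away
subject.next(3);

console.log(seenByA); // [ 0, 1, 2, 3 ]
console.log(seenByB); // [ 2, 3 ]
console.log(subject.getValue()); // 3
```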

ReplaySubject is pretty similar to the previous one. It also stores values, but not only the latest one: it stores as many values as we set via the bufferSize or windowTime parameter.
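The buffering part can be sketched like this. createReplaySubject is again a hypothetical helper; it models only bufferSize and leaves windowTime out to keep the example short.

```javascript
// Hypothetical minimal Subject that replays the last `bufferSize` values.
function createReplaySubject(bufferSize) {
  const buffer = [];
  const observers = [];
  return {
    subscribe(observer) {
      // Replay the buffered values first, then listen for new ones.
      buffer.forEach((v) => observer.next(v));
      observers.push(observer);
    },
    next(value) {
      buffer.push(value);
      if (buffer.length > bufferSize) {
        buffer.shift(); // drop the oldest value
      }
      observers.forEach((o) => o.next(value));
    },
  };
}

const subject = createReplaySubject(2);
subject.next(1);
subject.next(2);
subject.next(3);

const seenByLateObserver = [];
subject.subscribe({ next: (v) => seenByLateObserver.push(v) }); // replays 2 and 3
subject.next(4);

console.log(seenByLateObserver); // [ 2, 3, 4 ]
```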

AsyncSubject sends only the last value to its Observers, and only when it receives the complete notification. If it receives an error instead, Observers get just the error.
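A small sketch of a Subject that holds everything back until completion could look like this. createAsyncSubject is a hypothetical helper, and error handling is left out on purpose.

```javascript
// Hypothetical minimal Subject that emits only its last value, on complete.
function createAsyncSubject() {
  let hasValue = false;
  let last;
  let completed = false;
  const observers = [];
  return {
    subscribe(observer) {
      if (completed) {
        // Late Observers still get the final value and the completion.
        if (hasValue) observer.next(last);
        observer.complete();
        return;
      }
      observers.push(observer);
    },
    next(value) {
      if (completed) return;
      hasValue = true;
      last = value; // remember it, but do not notify anyone yet
    },
    complete() {
      if (completed) return;
      completed = true;
      observers.forEach((o) => {
        if (hasValue) o.next(last);
        o.complete();
      });
    },
  };
}

const subject = createAsyncSubject();
const log = [];
subject.subscribe({
  next: (v) => log.push('next ' + v),
  complete: () => log.push('done'),
});

subject.next(1);
subject.next(2);
subject.next(3);
const logBeforeComplete = log.slice(); // still empty at this point

subject.complete();
console.log(logBeforeComplete); // []
console.log(log); // [ 'next 3', 'done' ]
```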

What you really should know is that a Subject has the features of both Observer and Observable; using it lets us share one execution between many Observers.

Multicast

Returning to the problem of sharing the same data between many Observers, let me introduce multicast.
To share data with many Observers, we create a Subject, subscribe it to the Observable and subscribe the Observers to the Subject, as shown below.

const observable = Rx.Observable.interval(1000).take(5);
// Any object with a next method works as an Observer here.
const observerA = { next: (x) => console.log('A next ' + x) };
const observerB = { next: (x) => console.log('B next ' + x) };

const subject = new Rx.Subject();

observable.subscribe(subject);

subject.subscribe(observerA); // first execution
setTimeout(() => {
  subject.subscribe(observerB); // the same execution
}, 2000);

multicast is an operator that makes this sharing process easier for us. Let’s see an example of using it:

const connectableObservable = Rx.Observable
  .interval(1000)
  .take(5)
  .multicast(new Rx.Subject());
// Any object with a next method works as an Observer here.
const observerA = { next: (x) => console.log('A next ' + x) };
const observerB = { next: (x) => console.log('B next ' + x) };

connectableObservable.connect();

connectableObservable.subscribe(observerA); // first execution
setTimeout(() => {
  connectableObservable.subscribe(observerB); // the same execution
}, 2000);

It’s pretty similar to the previous example, but instead of wiring up the Subject by hand every time, we pass it to the multicast operator. multicast returns a ConnectableObservable. It’s pretty similar to an ordinary Observable, except for one thing: the connect method. This method subscribes the inner Subject to the data source on demand. It means that we can control when the data starts to be emitted.
Using multicast gives us an opportunity to write code in a chainable style and saves us from boilerplate code.
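To show what "on demand" means, here is a simplified model of the idea in plain JavaScript. The createSubject and multicast functions below are hypothetical stand-ins written for this post, not the RxJS implementation, and the source is synchronous so no timers are needed.

```javascript
// Hypothetical minimal Subject: a list of Observers plus a next method.
function createSubject() {
  const observers = [];
  return {
    subscribe(observer) {
      observers.push(observer);
    },
    next(value) {
      observers.forEach((o) => o.next(value));
    },
  };
}

// Hypothetical multicast: wraps a source and a Subject into a connectable object.
function multicast(source, subject) {
  return {
    // Observers attach to the inner Subject, not to the source.
    subscribe: (observer) => subject.subscribe(observer),
    // connect() is the moment the Subject gets subscribed to the source.
    connect: () => source.subscribe(subject),
  };
}

let producerRuns = 0;
const source = {
  subscribe(observer) {
    producerRuns++;
    [0, 1, 2].forEach((v) => observer.next(v));
  },
};

const connectable = multicast(source, createSubject());
const seenByA = [];
const seenByB = [];
connectable.subscribe({ next: (v) => seenByA.push(v) });
connectable.subscribe({ next: (v) => seenByB.push(v) });

const runsBeforeConnect = producerRuns; // the source has not been touched yet
connectable.connect();

console.log(runsBeforeConnect, producerRuns); // 0 1
console.log(seenByA, seenByB); // [ 0, 1, 2 ] [ 0, 1, 2 ]
```

Nothing is emitted until connect is called, and the Producer runs only once no matter how many Observers are attached.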

Publish

Another useful operator that makes our lives easier is publish.
https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/publish.ts
As you can see from the source code, it’s just a wrapper around multicast. It creates a Subject and passes it to the multicast operator. There is one interesting thing called a selector function, used as a sandbox. It’s quite a tricky thing and deserves its own blog post. So let’s skip it for now.
As you have seen from the source code, the publish operator creates a Subject inside.
What if we need a different type of Subject? For this purpose there are three more variants of the publish operator: publishBehavior, publishReplay and publishLast.

In addition, the publish operator returns a ConnectableObservable. We also need to remember that it’s necessary to call unsubscribe for all listeners to avoid memory leaks. We unsubscribe not only the Observers but also the ConnectableObservable itself, which disconnects it from the source.

const sub = observable.connect();
const subA = observable.subscribe(observerA);
const subB = observable.subscribe(observerB);

subA.unsubscribe();
subB.unsubscribe();
sub.unsubscribe();

The creators of RxJS have taken care of us and implemented the refCount operator.

RefCount

As you might have guessed, refCount helps us manage subscriptions. It subscribes the ConnectableObservable to the source and unsubscribes it automatically.

const observable = Rx.Observable
  .interval(1000)
  .take(5)
  .multicast(new Rx.Subject())
  .refCount();

When a new Observer subscribes, the count of active Observers is increased, and on the first one refCount automatically connects to the source. On the other hand, when an Observer unsubscribes, the count is decreased. When the number reaches 0, refCount triggers unsubscription from the source.
So with this operator we don’t need to remember to connect and disconnect by hand; it happens automatically, which in turn saves us from memory leaks.
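The counting logic itself is small. Here is a sketch of it, assuming a connectable object whose subscribe returns an unsubscribe function and whose connect returns a disconnect function. Both the refCount helper and the fake connectable are hypothetical and only record what happens.

```javascript
// Hypothetical refCount: connects on the first Observer, disconnects on the last.
function refCount(connectable) {
  let count = 0;
  let disconnect = null;
  return {
    subscribe(observer) {
      const unsubscribeObserver = connectable.subscribe(observer);
      count++;
      if (count === 1) {
        disconnect = connectable.connect(); // first Observer: connect to the source
      }
      return () => {
        unsubscribeObserver();
        count--;
        if (count === 0) {
          disconnect(); // last Observer gone: leave the source
          disconnect = null;
        }
      };
    },
  };
}

// A fake connectable that only records connect and disconnect events.
const events = [];
const connectable = {
  subscribe() {
    return () => {};
  },
  connect() {
    events.push('connect');
    return () => events.push('disconnect');
  },
};

const shared = refCount(connectable);
const unsubscribeA = shared.subscribe({ next() {} });
const unsubscribeB = shared.subscribe({ next() {} });

unsubscribeA();
const eventsAfterA = events.slice(); // B is still active, so no disconnect yet
unsubscribeB();

console.log(eventsAfterA); // [ 'connect' ]
console.log(events); // [ 'connect', 'disconnect' ]
```

This sketch does not guard against calling the same unsubscribe function twice; a real implementation has to.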

Share

The share operator is a wrapper around refCount, but with one difference. If we use refCount, then after the complete notification all Observers are unsubscribed, the number of active Observers drops to zero and the ConnectableObservable is automatically unsubscribed from the source. After that, if we decide to subscribe a new Observer, it will receive only the complete notification.

const observable = Rx.Observable
  .interval(1000)
  .take(3)
  .publish()
  .refCount();

const observerA = {
  next: (x) => console.log('A next ' + x),
  complete: () => console.log('A done')
};

const observerB = {
  next: (x) => console.log('B next ' + x),
  complete: () => console.log('B done')
};

observable.subscribe(observerA);

setTimeout(() => {
  observable.subscribe(observerB);
}, 4000);

// Result will be
// A next 0
// A next 1
// A next 2
// A done
// B done

The share operator behaves differently. In contrast to a plain refCount, share uses a subject factory inside. A subject factory is a simple function that creates and returns a new Subject every time you call it. That means that if we subscribe a new Observer after complete has happened, share will create a new Subject and subscribe it to the source. So we will be able to receive the data once again instead of seeing only the complete event.

const observable = Rx.Observable
  .interval(1000)
  .take(3)
  .share();

// Uses a factory method inside, something like
// factory() {
//   return new Rx.Subject();
// }

const observerA = {
  next: (x) => console.log('A next ' + x),
  complete: () => console.log('A done')
};

const observerB = {
  next: (x) => console.log('B next ' + x),
  complete: () => console.log('B done')
};

observable.subscribe(observerA);

setTimeout(() => {
  observable.subscribe(observerB);
}, 4000);

// Result will be
// A next 0
// A next 1
// A next 2
// A done
// B next 0
// B next 1
// B next 2
// B done
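If you want to see the factory idea without RxJS and without timers, here is a simplified model. The createSubject and share functions are hypothetical helpers written for this post; they ignore unsubscription and reference counting and only show how a fresh Subject is created after completion.

```javascript
// Hypothetical minimal Subject that remembers whether it has completed.
function createSubject() {
  const observers = [];
  let completed = false;
  return {
    subscribe(observer) {
      if (completed) {
        observer.complete(); // a completed Subject only reports completion
        return;
      }
      observers.push(observer);
    },
    next(value) {
      observers.forEach((o) => o.next(value));
    },
    complete() {
      completed = true;
      observers.forEach((o) => o.complete());
    },
  };
}

// Hypothetical share: asks the factory for a new Subject after each completion.
function share(source, subjectFactory) {
  let subject = null;
  return {
    subscribe(observer) {
      if (subject !== null) {
        subject.subscribe(observer); // join the running execution
        return;
      }
      const current = subjectFactory();
      subject = current;
      current.subscribe(observer);
      source.subscribe({
        next: (v) => current.next(v),
        complete: () => {
          subject = null; // the next subscribe will start over
          current.complete();
        },
      });
    },
  };
}

// A synchronous source that emits 0, 1, 2 and completes.
const source = {
  subscribe(observer) {
    [0, 1, 2].forEach((v) => observer.next(v));
    observer.complete();
  },
};

const log = [];
const shared = share(source, createSubject);
shared.subscribe({ next: (x) => log.push('A next ' + x), complete: () => log.push('A done') });
shared.subscribe({ next: (x) => log.push('B next ' + x), complete: () => log.push('B done') });

console.log(log.join(', '));
// A next 0, A next 1, A next 2, A done, B next 0, B next 1, B next 2, B done
```

Because the source here is synchronous, the first execution is already complete when B subscribes, which plays the same role as the 4 second delay in the example above.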

That’s it. I’ve tried to explain what are, from my point of view, the most important and confusing things in RxJS. However, there are still some tricky things left. So leave your comments and questions, and we will try to figure them out next time.

Thank you for reading!